From 0c3ad179b2543e3e20347df10401a9c0a88235aa Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 11 Sep 2017 17:54:33 +0200 Subject: [PATCH] StatQueue methods for processing events --- engine/libstats.go | 79 ++++++++++++------------------------------- engine/statmetrics.go | 12 ++++--- 2 files changed, 28 insertions(+), 63 deletions(-) diff --git a/engine/libstats.go b/engine/libstats.go index 8bca9d491..0a6c53189 100755 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -20,6 +20,7 @@ package engine import ( "errors" + "fmt" "sort" "time" @@ -49,6 +50,11 @@ type StatEvent struct { Fields map[string]interface{} } +// TenantID returns the unique identifier based on Tenant and ID +func (se StatEvent) TenantID() string { + return utils.ConcatenatedKey(se.Tenant, se.ID) +} + // AnswerTime returns the AnswerTime of StatEvent func (se StatEvent) AnswerTime(timezone string) (at time.Time, err error) { atIf, has := se.Fields[utils.ANSWER_TIME] @@ -149,52 +155,18 @@ func (sq *StatQueue) SqID() string { return utils.ConcatenatedKey(sq.Tenant, sq.ID) } -/* -// GetSQStoredMetrics retrieves the data used for store to DB -func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { - sq.RLock() - defer sq.RUnlock() - sEvents := make(map[string]engine.StatEvent) - var sItems []*engine.SQItem - for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache - ev := sq.sec.GetEvent(sqItem.EventID) - if ev == nil { - utils.Logger.Warning(fmt.Sprintf(" querying for storage eventID: %s, error: event not cached", - sqItem.EventID)) - continue - } - sEvents[sqItem.EventID] = ev - sItems = append(sItems, sqItem) - } - sqSM = &engine.SQStoredMetrics{ - SEvents: sEvents, - SQItems: sItems, - SQMetrics: make(map[string][]byte, len(sq.sqMetrics))} - for metricID, metric := range sq.sqMetrics { - var err error - if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil { - utils.Logger.Warning(fmt.Sprintf(" querying for storage metricID: %s, error: %s", - metricID, err.Error())) - continue - } - } - return -} - // ProcessEvent processes a StatEvent, returns true if processed -func (sq *StatQueue) ProcessEvent(ev engine.StatEvent) (err error) { - sq.Lock() +func (sq *StatQueue) ProcessEvent(ev StatEvent) (err error) { sq.remExpired() sq.remOnQueueLength() sq.addStatEvent(ev) - sq.Unlock() return } // remExpired expires items in queue func (sq *StatQueue) remExpired() { var expIdx *int // index of last item to be expired - for i, item := range sq.sqItems { + for i, item := range sq.SQItems { if item.ExpiryTime == nil { break } @@ -202,54 +174,45 @@ func (sq *StatQueue) remExpired() { break } sq.remEventWithID(item.EventID) - item = nil // garbage collected asap expIdx = &i } if expIdx == nil { return } nextValidIdx := *expIdx + 1 - sq.sqItems = sq.sqItems[nextValidIdx:] + sq.SQItems = sq.SQItems[nextValidIdx:] } -// remOnQueueLength rems elements based on QueueLength setting +// remOnQueueLength removes elements based on QueueLength setting func (sq *StatQueue) remOnQueueLength() { - if sq.cfg.QueueLength == 0 { + if sq.sqPrfl.QueueLength <= 0 { // infinite length return } - if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element - itm := sq.sqItems[0] + if len(sq.SQItems) == sq.sqPrfl.QueueLength { // reached limit, rem first element + itm := sq.SQItems[0] sq.remEventWithID(itm.EventID) - itm = nil - sq.sqItems = sq.sqItems[1:] + sq.SQItems = sq.SQItems[1:] } } // addStatEvent computes metrics for an event -func (sq *StatQueue) addStatEvent(ev engine.StatEvent) { - evID := ev.ID() - for metricID, metric := range sq.sqMetrics { +func (sq *StatQueue) addStatEvent(ev StatEvent) { + for metricID, metric := range sq.SQMetrics { if err := metric.AddEvent(ev); err != nil { utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s", - metricID, evID, err.Error())) + metricID, ev.TenantID(), err.Error())) } } } // remStatEvent removes an event from metrics -func (sq *StatQueue) remEventWithID(evID string) { - ev := sq.sec.GetEvent(evID) - if ev == nil { - utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID)) - return - } - for metricID, metric := range sq.sqMetrics { - if err := metric.RemEvent(ev); err != nil { - utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error())) +func (sq *StatQueue) remEventWithID(evTenantID string) { + for metricID, metric := range sq.SQMetrics { + if err := metric.RemEvent(evTenantID); err != nil { + utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evTenantID, err.Error())) } } } -*/ // StatQueues is a sortable list of StatQueue type StatQueues []*StatQueue diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 2e82a87dc..262fd8c02 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -45,7 +45,7 @@ type StatMetric interface { GetStringValue(fmtOpts string) (val string) GetFloat64Value() (val float64) AddEvent(ev StatEvent) error - RemEvent(evID string) error + RemEvent(evTenantID string) error Marshal(ms Marshaler) (marshaled []byte, err error) LoadMarshaled(ms Marshaler, marshaled []byte) (err error) } @@ -58,7 +58,7 @@ func NewASR() (StatMetric, error) { type StatASR struct { Answered float64 Count float64 - Events map[string]bool // map[EventID]Answered + Events map[string]bool // map[EventTenantID]Answered val *float64 // cached ASR value } @@ -101,6 +101,7 @@ func (asr *StatASR) AddEvent(ev StatEvent) (err error) { } else if !at.IsZero() { answered = true } + asr.Events[ev.TenantID()] = answered asr.Count += 1 if answered { asr.Answered += 1 @@ -109,8 +110,8 @@ func (asr *StatASR) AddEvent(ev StatEvent) (err error) { return } -func (asr *StatASR) RemEvent(evID string) (err error) { - answered, has := asr.Events[evID] +func (asr *StatASR) RemEvent(evTenantID string) (err error) { + answered, has := asr.Events[evTenantID] if !has { return utils.ErrNotFound } @@ -118,6 +119,7 @@ func (asr *StatASR) RemEvent(evID string) (err error) { asr.Answered -= 1 } asr.Count -= 1 + delete(asr.Events, evTenantID) asr.val = nil return } @@ -158,7 +160,7 @@ func (acd *StatACD) AddEvent(ev StatEvent) (err error) { return } -func (acd *StatACD) RemEvent(evID string) (err error) { +func (acd *StatACD) RemEvent(evTenantID string) (err error) { return }