diff --git a/engine/statsmetrics.go b/engine/statsmetrics.go index e478557d2..c4172e95b 100644 --- a/engine/statsmetrics.go +++ b/engine/statsmetrics.go @@ -42,7 +42,7 @@ type StatsMetric interface { addEvent(ev StatsEvent) error remEvent(ev StatsEvent) error getStoredValues() ([]byte, error) // used to generate the values which are stored into DB - loadStoredValues([]byte) error // load the values from DB + loadStoredValues([]byte) error // load the values from DB data } func NewStatsASR() (StatsMetric, error) { diff --git a/engine/statsqueue.go b/engine/statsqueue.go index 28e25269e..40abc08e7 100644 --- a/engine/statsqueue.go +++ b/engine/statsqueue.go @@ -18,7 +18,6 @@ along with this program. If not, see package engine import ( - "errors" "fmt" "sync" "time" @@ -34,7 +33,8 @@ type SQItem struct { // SQStored contains values saved in DB on store type StoredSQ struct { - SQItems []*SQItem + SEvents map[string]StatsEvent // Events used by SQItems + SQItems []*SQItem // SQItems SQMetrics map[string][]byte } @@ -63,6 +63,9 @@ func (sq *StatsQueue) Init(sec *StatsEventCache, storedSQ *StoredSQ) (err error) if storedSQ == nil { return } + for evID, ev := range storedSQ.SEvents { + sq.sec.Cache(evID, ev, sq.ID) + } sq.sqItems = storedSQ.SQItems for metricID := range sq.sqMetrics { if sq.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { @@ -78,23 +81,46 @@ func (sq *StatsQueue) Init(sec *StatsEventCache, storedSQ *StoredSQ) (err error) } // GetStoredSQ retrieves the data used for store to DB -func (sq *StatsQueue) GetStoredSQ() (sSQ *StoredSQ, err error) { +func (sq *StatsQueue) GetStoredSQ() (sSQ *StoredSQ) { sq.RLock() defer sq.RUnlock() - if !sq.Store { - return nil, errors.New("not storable") + sEvents := make(map[string]StatsEvent) + var sItems []*SQItem + for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache + ev := sq.sec.GetEvent(sqItem.EventID) + if ev == nil { + utils.Logger.Warning(fmt.Sprintf(" querying for storage eventID: %s, error: event not cached", + sqItem.EventID)) + continue + } + sEvents[sqItem.EventID] = ev + sItems = append(sItems, sqItem) } sSQ = &StoredSQ{ - SQItems: sq.sqItems, + SEvents: sEvents, + SQItems: sItems, SQMetrics: make(map[string][]byte, len(sq.sqMetrics))} for metricID, metric := range sq.sqMetrics { + var err error if sSQ.SQMetrics[metricID], err = metric.getStoredValues(); err != nil { - return nil, err + utils.Logger.Warning(fmt.Sprintf(" querying for storage metricID: %s, error: %s", + metricID, err.Error())) + continue } } return } +// ProcessEvent processes a StatsEvent, returns true if processed +func (sq *StatsQueue) ProcessEvent(ev StatsEvent) (err error) { + sq.Lock() + sq.remExpired() + sq.remOnQueueLength() + sq.addStatsEvent(ev) + sq.Unlock() + return +} + // remExpired expires items in queue func (sq *StatsQueue) remExpired() { var expIdx *int // index of last item to be expired @@ -153,13 +179,3 @@ func (sq *StatsQueue) remEventWithID(evID string) { } } } - -// ProcessEvent processes a StatsEvent, returns true if processed -func (sq *StatsQueue) ProcessEvent(ev StatsEvent) (err error) { - sq.Lock() - sq.remExpired() - sq.remOnQueueLength() - sq.addStatsEvent(ev) - sq.Unlock() - return -}