diff --git a/engine/datamanager.go b/engine/datamanager.go index d1f5722f3..90818d1b3 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -626,7 +626,7 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters, } // if the user define a statQueue with an existing metric check if we need to update it based on queue length sq.ttl = ttl - if err = sq.remExpired(); err != nil { + if _, err = sq.remExpired(); err != nil { return } if len(sq.SQItems) > queueLength { diff --git a/engine/libstats.go b/engine/libstats.go index 1f71f3120..50a24f178 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -176,7 +176,7 @@ func (sq *StatQueue) TenantID() string { // ProcessEvent processes a utils.CGREvent, returns true if processed func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) { - if err = sq.remExpired(); err != nil { + if _, err = sq.remExpired(); err != nil { return } if err = sq.remOnQueueLength(); err != nil { @@ -201,7 +201,7 @@ func (sq *StatQueue) remEventWithID(evID string) (err error) { } // remExpired expires items in queue -func (sq *StatQueue) remExpired() (err error) { +func (sq *StatQueue) remExpired() (removed int, err error) { var expIdx *int // index of last item to be expired for i, item := range sq.SQItems { if item.ExpiryTime == nil { @@ -218,7 +218,8 @@ func (sq *StatQueue) remExpired() (err error) { if expIdx == nil { return } - sq.SQItems = sq.SQItems[*expIdx+1:] + removed = *expIdx + 1 + sq.SQItems = sq.SQItems[removed:] return } diff --git a/engine/stats.go b/engine/stats.go index 798ff5386..daad7aa4d 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -257,13 +257,17 @@ func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent { } } -func (sS *StatService) removeExpiredWithStore(sq *StatQueue) (err error) { +func (sS *StatService) getStatQueue(tnt, id string) (sq *StatQueue, err error) { + if sq, err = sS.dm.GetStatQueue(tnt, id, true, true, utils.EmptyString); err != nil { + return + } lkID := utils.StatQueuePrefix + sq.TenantID() + var removed int guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { - err = sq.remExpired() + removed, err = sq.remExpired() return }, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID) - if err = sq.remExpired(); err != nil { + if err != nil || removed == 0 { return } sS.storeStatQueue(sq) @@ -424,13 +428,10 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithOpts, reply *StatQ if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - sq, err := sS.dm.GetStatQueue(tnt, args.ID, true, true, "") + sq, err := sS.getStatQueue(tnt, args.ID) if err != nil { return err } - if err = sS.removeExpiredWithStore(sq); err != nil { - return - } *reply = *sq return } @@ -444,16 +445,13 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[ if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - sq, err := sS.dm.GetStatQueue(tnt, args.ID, true, true, "") + sq, err := sS.getStatQueue(tnt, args.ID) if err != nil { if err != utils.ErrNotFound { err = utils.NewErrServerError(err) } return err } - if err = sS.removeExpiredWithStore(sq); err != nil { - return - } sq.RLock() metrics := make(map[string]string, len(sq.SQMetrics)) for metricID, metric := range sq.SQMetrics { @@ -473,16 +471,13 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - sq, err := sS.dm.GetStatQueue(tnt, args.ID, true, true, "") + sq, err := sS.getStatQueue(tnt, args.ID) if err != nil { if err != utils.ErrNotFound { err = utils.NewErrServerError(err) } return err } - if err = sS.removeExpiredWithStore(sq); err != nil { - return - } sq.RLock() metrics := make(map[string]float64, len(sq.SQMetrics)) for metricID, metric := range sq.SQMetrics {