Updated getStatQueue

This commit is contained in:
Trial97
2020-11-09 10:22:05 +02:00
committed by Dan Christian Bogos
parent c3c9539da9
commit dd1e70b36d
3 changed files with 15 additions and 19 deletions

View File

@@ -626,7 +626,7 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters,
}
// if the user define a statQueue with an existing metric check if we need to update it based on queue length
sq.ttl = ttl
if err = sq.remExpired(); err != nil {
if _, err = sq.remExpired(); err != nil {
return
}
if len(sq.SQItems) > queueLength {

View File

@@ -176,7 +176,7 @@ func (sq *StatQueue) TenantID() string {
// ProcessEvent processes a utils.CGREvent, returns true if processed
func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) {
if err = sq.remExpired(); err != nil {
if _, err = sq.remExpired(); err != nil {
return
}
if err = sq.remOnQueueLength(); err != nil {
@@ -201,7 +201,7 @@ func (sq *StatQueue) remEventWithID(evID string) (err error) {
}
// remExpired expires items in queue
func (sq *StatQueue) remExpired() (err error) {
func (sq *StatQueue) remExpired() (removed int, err error) {
var expIdx *int // index of last item to be expired
for i, item := range sq.SQItems {
if item.ExpiryTime == nil {
@@ -218,7 +218,8 @@ func (sq *StatQueue) remExpired() (err error) {
if expIdx == nil {
return
}
sq.SQItems = sq.SQItems[*expIdx+1:]
removed = *expIdx + 1
sq.SQItems = sq.SQItems[removed:]
return
}

View File

@@ -257,13 +257,17 @@ func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent {
}
}
func (sS *StatService) removeExpiredWithStore(sq *StatQueue) (err error) {
func (sS *StatService) getStatQueue(tnt, id string) (sq *StatQueue, err error) {
if sq, err = sS.dm.GetStatQueue(tnt, id, true, true, utils.EmptyString); err != nil {
return
}
lkID := utils.StatQueuePrefix + sq.TenantID()
var removed int
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
err = sq.remExpired()
removed, err = sq.remExpired()
return
}, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID)
if err = sq.remExpired(); err != nil {
if err != nil || removed == 0 {
return
}
sS.storeStatQueue(sq)
@@ -424,13 +428,10 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithOpts, reply *StatQ
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
}
sq, err := sS.dm.GetStatQueue(tnt, args.ID, true, true, "")
sq, err := sS.getStatQueue(tnt, args.ID)
if err != nil {
return err
}
if err = sS.removeExpiredWithStore(sq); err != nil {
return
}
*reply = *sq
return
}
@@ -444,16 +445,13 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
}
sq, err := sS.dm.GetStatQueue(tnt, args.ID, true, true, "")
sq, err := sS.getStatQueue(tnt, args.ID)
if err != nil {
if err != utils.ErrNotFound {
err = utils.NewErrServerError(err)
}
return err
}
if err = sS.removeExpiredWithStore(sq); err != nil {
return
}
sq.RLock()
metrics := make(map[string]string, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
@@ -473,16 +471,13 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
}
sq, err := sS.dm.GetStatQueue(tnt, args.ID, true, true, "")
sq, err := sS.getStatQueue(tnt, args.ID)
if err != nil {
if err != utils.ErrNotFound {
err = utils.NewErrServerError(err)
}
return err
}
if err = sS.removeExpiredWithStore(sq); err != nil {
return
}
sq.RLock()
metrics := make(map[string]float64, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {