From ec843eb9e05c3ec02d30acc57eb58fea266c7895 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 9 May 2018 19:59:31 +0200 Subject: [PATCH] Adding locking for statQueues, fixes #1067 --- engine/stats.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/engine/stats.go b/engine/stats.go index ced12136b..910ca1480 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -107,6 +107,8 @@ func (sS *StatService) storeStats() { if sID == "" { break // no more keys, backup completed } + lkID := utils.StatQueuePrefix + sID + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID) if sqIf, ok := Cache.Get(utils.CacheStatQueues, sID); !ok || sqIf == nil { utils.Logger.Warning( fmt.Sprintf("<%s> failed retrieving from cache stat queue with ID: %s", @@ -114,6 +116,7 @@ func (sS *StatService) storeStats() { } else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil { failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup } + guardian.Guardian.UnguardIDs(lkID) // randomize the CPU load and give up thread control time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) } @@ -150,6 +153,7 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *utils.CGREvent) (sqs StatQ return nil, err } lockIDs := utils.PrefixSliceItems(sqIDs.Slice(), utils.StatFilterIndexes) + lockIDs = append(lockIDs, utils.PrefixSliceItems(sqIDs.Slice(), utils.StatQueuePrefix)...) // add also lock for statQueue instances guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) defer guardian.Guardian.UnguardIDs(lockIDs...) for sqID := range sqIDs { @@ -169,7 +173,10 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *utils.CGREvent) (sqs StatQ } else if !pass { continue } + lkID := utils.StatQueuePrefix + utils.ConcatenatedKey(sqPrfl.Tenant, sqPrfl.ID) + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID) s, err := sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, false, "") + guardian.Guardian.UnguardIDs(lkID) if err != nil { return nil, err } @@ -217,7 +224,11 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) { } var withErrors bool for _, sq := range matchSQs { - if err = sq.ProcessEvent(ev); err != nil { + lkID := utils.StatQueuePrefix + sq.TenantID() + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID) + err = sq.ProcessEvent(ev) + guardian.Guardian.UnguardIDs(lkID) + if err != nil { utils.Logger.Warning( fmt.Sprintf(" Queue: %s, ignoring event: %s, error: %s", sq.TenantID(), ev.TenantID(), err.Error())) @@ -302,6 +313,9 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[ if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } + lkID := utils.StatQueuePrefix + args.TenantID() + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID) + defer guardian.Guardian.UnguardIDs(lkID) sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "") if err != nil { if err != utils.ErrNotFound { @@ -322,6 +336,9 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } + lkID := utils.StatQueuePrefix + args.TenantID() + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID) + defer guardian.Guardian.UnguardIDs(lkID) sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "") if err != nil { if err != utils.ErrNotFound {