Adding locking for statQueues, fixes #1067

This commit is contained in:
DanB
2018-05-09 19:59:31 +02:00
parent 0708890f78
commit ec843eb9e0

View File

@@ -107,6 +107,8 @@ func (sS *StatService) storeStats() {
if sID == "" {
break // no more keys, backup completed
}
lkID := utils.StatQueuePrefix + sID
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID)
if sqIf, ok := Cache.Get(utils.CacheStatQueues, sID); !ok || sqIf == nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed retrieving from cache stat queue with ID: %s",
@@ -114,6 +116,7 @@ func (sS *StatService) storeStats() {
} else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil {
failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup
}
guardian.Guardian.UnguardIDs(lkID)
// randomize the CPU load and give up thread control
time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond)
}
@@ -150,6 +153,7 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *utils.CGREvent) (sqs StatQ
return nil, err
}
lockIDs := utils.PrefixSliceItems(sqIDs.Slice(), utils.StatFilterIndexes)
lockIDs = append(lockIDs, utils.PrefixSliceItems(sqIDs.Slice(), utils.StatQueuePrefix)...) // add also lock for statQueue instances
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for sqID := range sqIDs {
@@ -169,7 +173,10 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *utils.CGREvent) (sqs StatQ
} else if !pass {
continue
}
lkID := utils.StatQueuePrefix + utils.ConcatenatedKey(sqPrfl.Tenant, sqPrfl.ID)
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID)
s, err := sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, false, "")
guardian.Guardian.UnguardIDs(lkID)
if err != nil {
return nil, err
}
@@ -217,7 +224,11 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) {
}
var withErrors bool
for _, sq := range matchSQs {
if err = sq.ProcessEvent(ev); err != nil {
lkID := utils.StatQueuePrefix + sq.TenantID()
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID)
err = sq.ProcessEvent(ev)
guardian.Guardian.UnguardIDs(lkID)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatS> Queue: %s, ignoring event: %s, error: %s",
sq.TenantID(), ev.TenantID(), err.Error()))
@@ -302,6 +313,9 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
lkID := utils.StatQueuePrefix + args.TenantID()
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID)
defer guardian.Guardian.UnguardIDs(lkID)
sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "")
if err != nil {
if err != utils.ErrNotFound {
@@ -322,6 +336,9 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
lkID := utils.StatQueuePrefix + args.TenantID()
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lkID)
defer guardian.Guardian.UnguardIDs(lkID)
sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "")
if err != nil {
if err != utils.ErrNotFound {