From fa7576420385088be029879c367c2c2c78bc2e52 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 17 Mar 2019 20:25:45 +0100 Subject: [PATCH] Guardian - better remote locking through reference IDs --- apier/v1/apier.go | 21 ++--- engine/cdrs.go | 22 ++--- engine/filterhelpers.go | 130 +++++++++++++++-------------- engine/filterindexer.go | 5 +- engine/libstats.go | 8 +- engine/resources.go | 89 ++++++++++---------- engine/responder.go | 20 +++-- engine/stats.go | 54 ++++++------ guardian/guardian.go | 109 +++++++++++++++++------- guardian/guardian_test.go | 171 ++++++++++++++++++++++++++------------ sessions/sessions.go | 30 ++++--- 11 files changed, 402 insertions(+), 257 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 98669e987..21cc87919 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1331,20 +1331,21 @@ func (self *ApierV1) RemoveActions(attr AttrRemoveActions, reply *string) error } type AttrRemoteLock struct { - LockIDs []string // List of IDs to obtain lock for - Timeout time.Duration // Automatically unlock on timeout + ReferenceID string // reference ID for this lock if available + LockIDs []string // List of IDs to obtain lock for + Timeout time.Duration // Automatically unlock on timeout } -func (self *ApierV1) RemoteLock(attr AttrRemoteLock, reply *string) error { - guardian.Guardian.GuardIDs(attr.Timeout, attr.LockIDs...) - *reply = utils.OK - return nil +// RemoteLock will lock a key from remote +func (self *ApierV1) RemoteLock(attr AttrRemoteLock, reply *string) (err error) { + *reply = guardian.Guardian.GuardIDs(attr.ReferenceID, attr.Timeout, attr.LockIDs...) + return } -func (self *ApierV1) RemoteUnlock(lockIDs []string, reply *string) error { - guardian.Guardian.UnguardIDs(lockIDs...) - *reply = utils.OK - return nil +// RemoteUnlock will unlock a key from remote based on reference ID +func (self *ApierV1) RemoteUnlock(refID string, reply *[]string) (err error) { + *reply = guardian.Guardian.UnguardIDs(refID) + return } func (v1 *ApierV1) StartService(args servmanager.ArgStartService, reply *string) (err error) { diff --git a/engine/cdrs.go b/engine/cdrs.go index 67e0b9403..d07476731 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -413,9 +413,9 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { // RPC caching if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessCDR, cdr.CGRID, cdr.RunID) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) - + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) if cachedResp.Error == nil { @@ -503,8 +503,9 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er // RPC caching if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.CDRsV2ProcessCDR, arg.CGREvent.ID) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) @@ -588,9 +589,9 @@ func (cdrS *CDRServer) V1StoreSessionCost(attr *AttrCDRSStoreSMCost, reply *stri // RPC caching if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, attr.Cost.CGRID, attr.Cost.RunID) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) - + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) if cachedResp.Error == nil { @@ -621,8 +622,9 @@ func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *st // RPC caching if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, args.Cost.CGRID, args.Cost.RunID) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) diff --git a/engine/filterhelpers.go b/engine/filterhelpers.go index 72a23c0f2..5ef4c6a87 100644 --- a/engine/filterhelpers.go +++ b/engine/filterhelpers.go @@ -32,79 +32,83 @@ import ( // helper on top of dataDB.MatchFilterIndex, adding utils.ANY to list of fields queried func MatchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string, dm *DataManager, cacheID, itemIDPrefix string, indexedSelects bool) (itemIDs utils.StringMap, err error) { - lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockID) - defer guardian.Guardian.UnguardIDs(lockID) itemIDs = make(utils.StringMap) - if !indexedSelects { - keysWithID, err := dm.DataDB().GetKeysForPrefix(utils.CacheIndexesToPrefix[cacheID]) - if err != nil { - return nil, err - } - var sliceIDs []string - for _, id := range keysWithID { - sliceIDs = append(sliceIDs, strings.Split(id, ":")[1]) - } - itemIDs = utils.StringMapFromSlice(sliceIDs) - return itemIDs, nil - } - allFieldIDs := make([]string, len(ev)) - i := 0 - for fldID := range ev { - allFieldIDs[i] = fldID - i += 1 - } - stringFieldVals := map[string]string{utils.ANY: utils.ANY} // cache here field string values, start with default one - filterIndexTypes := []string{MetaString, MetaPrefix, utils.META_NONE} - for i, fieldIDs := range []*[]string{stringFldIDs, prefixFldIDs, nil} { // same routine for both string and prefix filter types - if filterIndexTypes[i] == utils.META_NONE { - fieldIDs = &[]string{utils.ANY} // so we can query DB for unindexed filters - } - if fieldIDs == nil { - fieldIDs = &allFieldIDs - } - for _, fldName := range *fieldIDs { - fieldValIf, has := ev[fldName] - if !has && filterIndexTypes[i] != utils.META_NONE { - continue + + // Guard will protect the function with automatic locking + lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix + guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { + + if !indexedSelects { + var keysWithID []string + if keysWithID, err = dm.DataDB().GetKeysForPrefix(utils.CacheIndexesToPrefix[cacheID]); err != nil { + return } - if _, cached := stringFieldVals[fldName]; !cached { - strVal, err := utils.IfaceAsString(fieldValIf) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> cannot cast field: %s into string", utils.FilterS, fldName)) + var sliceIDs []string + for _, id := range keysWithID { + sliceIDs = append(sliceIDs, strings.Split(id, ":")[1]) + } + itemIDs = utils.StringMapFromSlice(sliceIDs) + return + } + allFieldIDs := make([]string, len(ev)) + i := 0 + for fldID := range ev { + allFieldIDs[i] = fldID + i += 1 + } + stringFieldVals := map[string]string{utils.ANY: utils.ANY} // cache here field string values, start with default one + filterIndexTypes := []string{MetaString, MetaPrefix, utils.META_NONE} + for i, fieldIDs := range []*[]string{stringFldIDs, prefixFldIDs, nil} { // same routine for both string and prefix filter types + if filterIndexTypes[i] == utils.META_NONE { + fieldIDs = &[]string{utils.ANY} // so we can query DB for unindexed filters + } + if fieldIDs == nil { + fieldIDs = &allFieldIDs + } + for _, fldName := range *fieldIDs { + fieldValIf, has := ev[fldName] + if !has && filterIndexTypes[i] != utils.META_NONE { continue } - stringFieldVals[fldName] = strVal - } - fldVal := stringFieldVals[fldName] - fldVals := []string{fldVal} - // default is only one fieldValue checked - if filterIndexTypes[i] == MetaPrefix { - fldVals = utils.SplitPrefix(fldVal, 1) // all prefixes till last digit - } - if fldName != utils.META_ANY { - fldName = utils.DynamicDataPrefix + fldName - } - var dbItemIDs utils.StringMap // list of items matched in DB - for _, val := range fldVals { - dbItemIDs, err = dm.MatchFilterIndex(cacheID, itemIDPrefix, filterIndexTypes[i], fldName, val) - if err != nil { - if err == utils.ErrNotFound { - err = nil + if _, cached := stringFieldVals[fldName]; !cached { + strVal, err := utils.IfaceAsString(fieldValIf) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> cannot cast field: %s into string", utils.FilterS, fldName)) continue } - return nil, err + stringFieldVals[fldName] = strVal } - break // we got at least one answer back, longest prefix wins - } - for itemID := range dbItemIDs { - if _, hasIt := itemIDs[itemID]; !hasIt { // Add it to list if not already there - itemIDs[itemID] = dbItemIDs[itemID] + fldVal := stringFieldVals[fldName] + fldVals := []string{fldVal} + // default is only one fieldValue checked + if filterIndexTypes[i] == MetaPrefix { + fldVals = utils.SplitPrefix(fldVal, 1) // all prefixes till last digit + } + if fldName != utils.META_ANY { + fldName = utils.DynamicDataPrefix + fldName + } + var dbItemIDs utils.StringMap // list of items matched in DB + for _, val := range fldVals { + if dbItemIDs, err = dm.MatchFilterIndex(cacheID, itemIDPrefix, filterIndexTypes[i], fldName, val); err != nil { + if err == utils.ErrNotFound { + err = nil + continue + } + return + } + break // we got at least one answer back, longest prefix wins + } + for itemID := range dbItemIDs { + if _, hasIt := itemIDs[itemID]; !hasIt { // Add it to list if not already there + itemIDs[itemID] = dbItemIDs[itemID] + } } } } - } + return + }, config.CgrConfig().GeneralCfg().LockingTimeout, lockID) + if len(itemIDs) == 0 { return nil, utils.ErrNotFound } diff --git a/engine/filterindexer.go b/engine/filterindexer.go index 88664349a..790000f50 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -106,8 +106,9 @@ func (rfi *FilterIndexer) cacheRemItemType() { // ToDo: tune here by removing pe // StoreIndexes handles storing the indexes to dataDB func (rfi *FilterIndexer) StoreIndexes(commit bool, transactionID string) (err error) { lockID := utils.CacheInstanceToPrefix[utils.PrefixToIndexCache[rfi.itemType]] + rfi.dbKeySuffix - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockID) - defer guardian.Guardian.UnguardIDs(lockID) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, lockID) + defer guardian.Guardian.UnguardIDs(refID) if err = rfi.dm.SetFilterIndexes( utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, rfi.indexes, commit, transactionID); err != nil { diff --git a/engine/libstats.go b/engine/libstats.go index e7ac1a037..e6b1b6755 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -21,6 +21,7 @@ package engine import ( "fmt" "sort" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -123,9 +124,10 @@ func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error) // StatQueue represents an individual stats instance type StatQueue struct { - Tenant string - ID string - SQItems []struct { + sync.RWMutex // protect the elements from within + Tenant string + ID string + SQItems []struct { EventID string // Bounded to the original utils.CGREvent ExpiryTime *time.Time // Used to auto-expire events } diff --git a/engine/resources.go b/engine/resources.go index befbc0f7d..2cf1453c5 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -248,37 +248,37 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage return "", utils.ErrResourceUnavailable } lockIDs := utils.PrefixSliceItems(rs.tenatIDsStr(), utils.ResourcesPrefix) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...) - defer guardian.Guardian.UnguardIDs(lockIDs...) - // Simulate resource usage - for _, r := range rs { - r.removeExpiredUnits() - if _, hasID := r.Usages[ru.ID]; hasID { // update - r.clearUsage(ru.ID) - } - if r.rPrf == nil { - return "", fmt.Errorf("empty configuration for resourceID: %s", r.TenantID()) - } - if r.rPrf.Limit >= r.totalUsage()+ru.Units { - if alcMessage == "" { - if r.rPrf.AllocationMessage != "" { - alcMessage = r.rPrf.AllocationMessage - } else { - alcMessage = r.rPrf.ID + guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { + // Simulate resource usage + for _, r := range rs { + r.removeExpiredUnits() + if _, hasID := r.Usages[ru.ID]; hasID { // update + r.clearUsage(ru.ID) + } + if r.rPrf == nil { + err = fmt.Errorf("empty configuration for resourceID: %s", r.TenantID()) + return + } + if r.rPrf.Limit >= r.totalUsage()+ru.Units { + if alcMessage == "" { + if r.rPrf.AllocationMessage != "" { + alcMessage = r.rPrf.AllocationMessage + } else { + alcMessage = r.rPrf.ID + } } } } - } - if alcMessage == "" { - return "", utils.ErrResourceUnavailable - } - if dryRun { + if alcMessage == "" { + err = utils.ErrResourceUnavailable + return + } + if dryRun { + return + } + err = rs.recordUsage(ru) return - } - err = rs.recordUsage(ru) - if err != nil { - return - } + }, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...) return } @@ -417,26 +417,27 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) for i, rTid := range rIDs { lockIDs[i] = utils.ResourcesPrefix + rTid.TenantID() } - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...) - defer guardian.Guardian.UnguardIDs(lockIDs...) - for i, rTid := range rIDs { - if r, err := rS.dm.GetResource(rTid.Tenant, rTid.ID, true, true, ""); err != nil { - utils.Logger.Warning( - fmt.Sprintf(" force-uncaching resources for evUUID: <%s>, error: <%s>", - evUUID, err.Error())) - // on errors, cleanup cache so we recache - if shortCached { - Cache.Remove(utils.CacheEventResources, evUUID, true, "") + guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { + for i, rTid := range rIDs { + if r, err := rS.dm.GetResource(rTid.Tenant, rTid.ID, true, true, ""); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" force-uncaching resources for evUUID: <%s>, error: <%s>", + evUUID, err.Error())) + // on errors, cleanup cache so we recache + if shortCached { + Cache.Remove(utils.CacheEventResources, evUUID, true, "") + } else { + rS.lcERMux.Lock() + delete(rS.lcEventResources, evUUID) + rS.lcERMux.Unlock() + } + return } else { - rS.lcERMux.Lock() - delete(rS.lcEventResources, evUUID) - rS.lcERMux.Unlock() + rs[i] = r } - return nil - } else { - rs[i] = r } - } + return + }, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...) return } diff --git a/engine/responder.go b/engine/responder.go index e4e589393..78b954e9f 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -89,8 +89,9 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) { // RPC caching if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.ResponderDebit, arg.CgrID) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) @@ -126,8 +127,9 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error) // RPC caching if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.ResponderMaxDebit, arg.CgrID) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) @@ -163,8 +165,9 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *Account) (err // RPC caching if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.ResponderRefundIncrements, arg.CgrID) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) @@ -200,8 +203,9 @@ func (rs *Responder) RefundRounding(arg *CallDescriptor, reply *float64) (err er // RPC caching if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.ResponderRefundRounding, arg.CgrID) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) diff --git a/engine/stats.go b/engine/stats.go index 84793eaa1..796238818 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -108,15 +108,16 @@ func (sS *StatService) storeStats() { break // no more keys, backup completed } lkID := utils.StatQueuePrefix + sID - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID) - if sqIf, ok := Cache.Get(utils.CacheStatQueues, sID); !ok || sqIf == nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> failed retrieving from cache stat queue with ID: %s", - utils.StatService, sID)) - } else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil { - failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup - } - guardian.Guardian.UnguardIDs(lkID) + guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { + if sqIf, ok := Cache.Get(utils.CacheStatQueues, sID); !ok || sqIf == nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed retrieving from cache stat queue with ID: %s", + utils.StatService, sID)) + } else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil { + failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup + } + return + }, config.CgrConfig().GeneralCfg().LockingTimeout, lkID) // randomize the CPU load and give up thread control time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) } @@ -176,21 +177,23 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) ( } else if !pass { continue } + var sq *StatQueue lkID := utils.StatQueuePrefix + utils.ConcatenatedKey(sqPrfl.Tenant, sqPrfl.ID) - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID) - s, err := sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, true, true, "") - guardian.Guardian.UnguardIDs(lkID) + guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { + sq, err = sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, true, true, "") + return + }, config.CgrConfig().GeneralCfg().LockingTimeout, lkID) if err != nil { return nil, err } - if sqPrfl.Stored && s.dirty == nil { - s.dirty = utils.BoolPointer(false) + if sqPrfl.Stored && sq.dirty == nil { + sq.dirty = utils.BoolPointer(false) } if sqPrfl.TTL >= 0 { - s.ttl = utils.DurationPointer(sqPrfl.TTL) + sq.ttl = utils.DurationPointer(sqPrfl.TTL) } - s.sqPrfl = sqPrfl - matchingSQs[sqPrfl.ID] = s + sq.sqPrfl = sqPrfl + matchingSQs[sqPrfl.ID] = sq } // All good, convert from Map to Slice so we can sort sqs = make(StatQueues, len(matchingSQs)) @@ -235,9 +238,10 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [ for _, sq := range matchSQs { stsIDs = append(stsIDs, sq.ID) lkID := utils.StatQueuePrefix + sq.TenantID() - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID) - err = sq.ProcessEvent(&args.CGREvent, sS.filterS) - guardian.Guardian.UnguardIDs(lkID) + guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { + err = sq.ProcessEvent(&args.CGREvent, sS.filterS) + return + }, config.CgrConfig().GeneralCfg().LockingTimeout, lkID) if err != nil { utils.Logger.Warning( fmt.Sprintf(" Queue: %s, ignoring event: %s, error: %s", @@ -333,9 +337,6 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[ if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } - lkID := utils.StatQueuePrefix + args.TenantID() - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID) - defer guardian.Guardian.UnguardIDs(lkID) sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, "") if err != nil { if err != utils.ErrNotFound { @@ -343,10 +344,12 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[ } return err } + sq.RLock() metrics := make(map[string]string, len(sq.SQMetrics)) for metricID, metric := range sq.SQMetrics { metrics[metricID] = metric.GetStringValue("") } + sq.RUnlock() *reply = metrics return } @@ -356,9 +359,6 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } - lkID := utils.StatQueuePrefix + args.TenantID() - guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID) - defer guardian.Guardian.UnguardIDs(lkID) sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, "") if err != nil { if err != utils.ErrNotFound { @@ -366,10 +366,12 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s } return err } + sq.RLock() metrics := make(map[string]float64, len(sq.SQMetrics)) for metricID, metric := range sq.SQMetrics { metrics[metricID] = metric.GetFloat64Value() } + sq.RUnlock() *reply = metrics return } diff --git a/guardian/guardian.go b/guardian/guardian.go index 4855dbb20..f8f26af30 100644 --- a/guardian/guardian.go +++ b/guardian/guardian.go @@ -27,7 +27,9 @@ import ( ) // global package variable -var Guardian = &GuardianLocker{locksMap: make(map[string]*itemLock)} +var Guardian = &GuardianLocker{ + locks: make(map[string]*itemLock), + refs: make(map[string][]string)} type itemLock struct { lk chan struct{} @@ -36,44 +38,94 @@ type itemLock struct { // GuardianLocker is an optimized locking system per locking key type GuardianLocker struct { - locksMap map[string]*itemLock - sync.Mutex // protects the map + locks map[string]*itemLock + lkMux sync.Mutex // protects the locks + refs map[string][]string // used in case of remote locks + refsMux sync.RWMutex // protects the map } func (gl *GuardianLocker) lockItem(itmID string) { - gl.Lock() - itmLock, exists := gl.locksMap[itmID] + if itmID == "" { + return + } + gl.lkMux.Lock() + itmLock, exists := gl.locks[itmID] if !exists { itmLock = &itemLock{lk: make(chan struct{}, 1)} - gl.locksMap[itmID] = itmLock + gl.locks[itmID] = itmLock itmLock.lk <- struct{}{} } itmLock.cnt++ select { case <-itmLock.lk: - gl.Unlock() + gl.lkMux.Unlock() return default: // move further so we can unlock } - gl.Unlock() + gl.lkMux.Unlock() <-itmLock.lk } func (gl *GuardianLocker) unlockItem(itmID string) { - gl.Lock() - itmLock, exists := gl.locksMap[itmID] + gl.lkMux.Lock() + itmLock, exists := gl.locks[itmID] if !exists { - gl.Unlock() + gl.lkMux.Unlock() return } itmLock.cnt-- if itmLock.cnt == 0 { - delete(gl.locksMap, itmID) + delete(gl.locks, itmID) } - gl.Unlock() + gl.lkMux.Unlock() itmLock.lk <- struct{}{} } +// lockWithReference will perform locks and also generate a lock reference for it (so it can be used when remotely locking) +func (gl *GuardianLocker) lockWithReference(refID string, lkIDs []string) string { + var refEmpty bool + if refID == "" { + refEmpty = true + refID = utils.GenUUID() + } + gl.lockItem(refID) // make sure we only process one simultaneous refID at the time, otherwise checking already used refID is not reliable + gl.refsMux.Lock() + if !refEmpty { + if _, has := gl.refs[refID]; has { + gl.refsMux.Unlock() + gl.unlockItem(refID) + return "" // no locking was done + } + } + gl.refs[refID] = lkIDs + gl.refsMux.Unlock() + // execute the real locks + for _, lk := range lkIDs { + gl.lockItem(lk) + } + gl.unlockItem(refID) + return refID +} + +// unlockWithReference will unlock based on the reference ID +func (gl *GuardianLocker) unlockWithReference(refID string) (lkIDs []string) { + gl.lockItem(refID) + gl.refsMux.Lock() + lkIDs, has := gl.refs[refID] + if !has { + gl.refsMux.Unlock() + gl.unlockItem(refID) + return + } + delete(gl.refs, refID) + gl.refsMux.Unlock() + for _, lk := range lkIDs { + gl.unlockItem(lk) + } + gl.unlockItem(refID) + return +} + // Guard executes the handler between locks func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) { for _, lockID := range lockIDs { @@ -107,25 +159,26 @@ func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout tim return } -// GuardTimed aquires a lock for duration -func (gl *GuardianLocker) GuardIDs(timeout time.Duration, lockIDs ...string) { - for _, lockID := range lockIDs { - gl.lockItem(lockID) - } - if timeout != 0 { - go func(timeout time.Duration, lockIDs ...string) { +// GuardIDs aquires a lock for duration +// returns the reference ID for the lock group aquired +func (gl *GuardianLocker) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) (retRefID string) { + retRefID = gl.lockWithReference(refID, lkIDs) + if timeout != 0 && retRefID != "" { + go func() { time.Sleep(timeout) - utils.Logger.Warning(fmt.Sprintf(" WARNING: force timing-out locks: %+v", lockIDs)) - gl.UnguardIDs(lockIDs...) - }(timeout, lockIDs...) + lkIDs := gl.unlockWithReference(retRefID) + if len(lkIDs) != 0 { + utils.Logger.Warning(fmt.Sprintf(" WARNING: force timing-out locks: %+v", lkIDs)) + } + }() } return } -// UnguardTimed attempts to unlock a set of locks based on their locksUUID -func (gl *GuardianLocker) UnguardIDs(lockIDs ...string) { - for _, lockID := range lockIDs { - gl.unlockItem(lockID) +// UnguardIDs attempts to unlock a set of locks based on their reference ID received on lock +func (gl *GuardianLocker) UnguardIDs(refID string) (lkIDs []string) { + if refID == "" { + return } - return + return gl.unlockWithReference(refID) } diff --git a/guardian/guardian_test.go b/guardian/guardian_test.go index 1fc5edc0a..1de2c9564 100644 --- a/guardian/guardian_test.go +++ b/guardian/guardian_test.go @@ -18,9 +18,13 @@ along with this program. If not, see package guardian import ( + "reflect" + "strconv" "sync" "testing" "time" + + "github.com/cgrates/cgrates/utils" ) func delayHandler() (interface{}, error) { @@ -45,16 +49,17 @@ func TestGuardianMultipleKeys(t *testing.T) { } sg.Wait() mustExecDur := time.Duration(maxIter*100) * time.Millisecond - if execTime := time.Now().Sub(tStart); execTime < mustExecDur || execTime > mustExecDur+time.Duration(20*time.Millisecond) { + if execTime := time.Now().Sub(tStart); execTime < mustExecDur || + execTime > mustExecDur+time.Duration(100*time.Millisecond) { t.Errorf("Execution took: %v", execTime) } - Guardian.Lock() + Guardian.lkMux.Lock() for _, key := range keys { - if _, hasKey := Guardian.locksMap[key]; hasKey { + if _, hasKey := Guardian.locks[key]; hasKey { t.Errorf("Possible memleak for key: %s", key) } } - Guardian.Unlock() + Guardian.lkMux.Unlock() } func TestGuardianTimeout(t *testing.T) { @@ -73,16 +78,17 @@ func TestGuardianTimeout(t *testing.T) { } sg.Wait() mustExecDur := time.Duration(maxIter*10) * time.Millisecond - if execTime := time.Now().Sub(tStart); execTime < mustExecDur || execTime > mustExecDur+time.Duration(20*time.Millisecond) { + if execTime := time.Now().Sub(tStart); execTime < mustExecDur || + execTime > mustExecDur+time.Duration(100*time.Millisecond) { t.Errorf("Execution took: %v", execTime) } - Guardian.Lock() + Guardian.lkMux.Lock() for _, key := range keys { - if _, hasKey := Guardian.locksMap[key]; hasKey { + if _, hasKey := Guardian.locks[key]; hasKey { t.Error("Possible memleak") } } - Guardian.Unlock() + Guardian.lkMux.Unlock() } func TestGuardianGuardIDs(t *testing.T) { @@ -90,122 +96,185 @@ func TestGuardianGuardIDs(t *testing.T) { //lock with 3 keys lockIDs := []string{"test1", "test2", "test3"} // make sure the keys are not in guardian before lock - Guardian.Lock() + Guardian.lkMux.Lock() for _, lockID := range lockIDs { - if _, hasKey := Guardian.locksMap[lockID]; hasKey { + if _, hasKey := Guardian.locks[lockID]; hasKey { t.Errorf("Unexpected lockID found: %s", lockID) } } - Guardian.Unlock() - + Guardian.lkMux.Unlock() // lock 3 items tStart := time.Now() lockDur := 2 * time.Millisecond - Guardian.GuardIDs(lockDur, lockIDs...) - Guardian.Lock() + Guardian.GuardIDs("", lockDur, lockIDs...) + Guardian.lkMux.Lock() for _, lockID := range lockIDs { - if itmLock, hasKey := Guardian.locksMap[lockID]; !hasKey { + if itmLock, hasKey := Guardian.locks[lockID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lockID) } else if itmLock.cnt != 1 { t.Errorf("Unexpected itmLock found: %+v", itmLock) } } - Guardian.Unlock() + Guardian.lkMux.Unlock() secLockDur := time.Duration(1 * time.Millisecond) - // second lock to test counter - go Guardian.GuardIDs(secLockDur, lockIDs[1:]...) - time.Sleep(20 * time.Microsecond) // give time for goroutine to lock - + go Guardian.GuardIDs("", secLockDur, lockIDs[1:]...) + time.Sleep(30 * time.Microsecond) // give time for goroutine to lock // check if counters were properly increased - Guardian.Lock() + Guardian.lkMux.Lock() lkID := lockIDs[0] eCnt := int64(1) - if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey { + if itmLock, hasKey := Guardian.locks[lkID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lkID) } else if itmLock.cnt != eCnt { t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) } lkID = lockIDs[1] eCnt = int64(2) - if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey { + if itmLock, hasKey := Guardian.locks[lkID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lkID) } else if itmLock.cnt != eCnt { t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) } lkID = lockIDs[2] eCnt = int64(1) // we did not manage to increase it yet since it did not pass first lock - if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey { + if itmLock, hasKey := Guardian.locks[lkID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lkID) } else if itmLock.cnt != eCnt { t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) } - Guardian.Unlock() - - time.Sleep(lockDur + secLockDur + 10*time.Millisecond) // give time to unlock before proceeding + Guardian.lkMux.Unlock() + time.Sleep(lockDur + secLockDur + 50*time.Millisecond) // give time to unlock before proceeding // make sure all counters were removed for _, lockID := range lockIDs { - if _, hasKey := Guardian.locksMap[lockID]; hasKey { + if _, hasKey := Guardian.locks[lockID]; hasKey { t.Errorf("Unexpected lockID found: %s", lockID) } } - // test lock without timer - Guardian.GuardIDs(0, lockIDs...) + refID := Guardian.GuardIDs("", 0, lockIDs...) + if totalLockDur := time.Now().Sub(tStart); totalLockDur < lockDur { t.Errorf("Lock duration too small") } time.Sleep(time.Duration(30) * time.Millisecond) - // making sure the items stay locked - Guardian.Lock() - if len(Guardian.locksMap) != 3 { - t.Errorf("locksMap should be have 3 elements, have: %+v", Guardian.locksMap) + Guardian.lkMux.Lock() + if len(Guardian.locks) != 3 { + t.Errorf("locks should have 3 elements, have: %+v", Guardian.locks) } for _, lkID := range lockIDs { - if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey { + if itmLock, hasKey := Guardian.locks[lkID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lkID) } else if itmLock.cnt != 1 { t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) } } - Guardian.Unlock() - - Guardian.UnguardIDs(lockIDs...) - time.Sleep(time.Duration(50) * time.Millisecond) - + Guardian.lkMux.Unlock() + Guardian.UnguardIDs(refID) // make sure items were unlocked - Guardian.Lock() - if len(Guardian.locksMap) != 0 { - t.Errorf("locksMap should have 0 elements, has: %+v", Guardian.locksMap) + Guardian.lkMux.Lock() + if len(Guardian.locks) != 0 { + t.Errorf("locks should have 0 elements, has: %+v", Guardian.locks) } - Guardian.Unlock() + Guardian.lkMux.Unlock() } +// TestGuardianGuardIDsConcurrent executes GuardIDs concurrently +func TestGuardianGuardIDsConcurrent(t *testing.T) { + maxIter := 500 + sg := new(sync.WaitGroup) + keys := []string{"test1", "test2", "test3"} + refID := utils.GenUUID() + for i := 0; i < maxIter; i++ { + sg.Add(1) + go func() { + if retRefID := Guardian.GuardIDs(refID, 0, keys...); retRefID != "" { + if lkIDs := Guardian.UnguardIDs(refID); !reflect.DeepEqual(keys, lkIDs) { + t.Errorf("expecting: %+v, received: %+v", keys, lkIDs) + } + } + sg.Done() + }() + } + sg.Wait() + + Guardian.lkMux.Lock() + if len(Guardian.locks) != 0 { + t.Errorf("Possible memleak for locks: %+v", Guardian.locks) + } + Guardian.lkMux.Unlock() + Guardian.refsMux.Lock() + if len(Guardian.refs) != 0 { + t.Errorf("Possible memleak for refs: %+v", Guardian.refs) + } + Guardian.refsMux.Unlock() +} + +func TestGuardianGuardIDsTimeoutConcurrent(t *testing.T) { + maxIter := 50 + sg := new(sync.WaitGroup) + keys := []string{"test1", "test2", "test3"} + refID := utils.GenUUID() + for i := 0; i < maxIter; i++ { + sg.Add(1) + go func() { + Guardian.GuardIDs(refID, time.Duration(time.Microsecond), keys...) + sg.Done() + }() + } + sg.Wait() + time.Sleep(10 * time.Millisecond) + Guardian.lkMux.Lock() + if len(Guardian.locks) != 0 { + t.Errorf("Possible memleak for locks: %+v", Guardian.locks) + } + Guardian.lkMux.Unlock() + Guardian.refsMux.Lock() + if len(Guardian.refs) != 0 { + t.Errorf("Possible memleak for refs: %+v", Guardian.refs) + } + Guardian.refsMux.Unlock() +} + +// BenchmarkGuard-8 200000 13759 ns/op func BenchmarkGuard(b *testing.B) { - for i := 0; i < 100; i++ { + for n := 0; n < b.N; n++ { go Guardian.Guard(func() (interface{}, error) { - time.Sleep(1 * time.Millisecond) + time.Sleep(time.Microsecond) return 0, nil }, 0, "1") go Guardian.Guard(func() (interface{}, error) { - time.Sleep(1 * time.Millisecond) + time.Sleep(time.Microsecond) return 0, nil }, 0, "2") go Guardian.Guard(func() (interface{}, error) { - time.Sleep(1 * time.Millisecond) + time.Sleep(time.Microsecond) return 0, nil }, 0, "1") } } +// BenchmarkGuardian-8 1000000 5794 ns/op func BenchmarkGuardian(b *testing.B) { - for i := 0; i < 100; i++ { + for n := 0; n < b.N; n++ { go Guardian.Guard(func() (interface{}, error) { - time.Sleep(1 * time.Millisecond) + time.Sleep(time.Microsecond) return 0, nil - }, 0, "1") + }, 0, strconv.Itoa(n)) + } +} + +// BenchmarkGuardIDs-8 1000000 8732 ns/op +func BenchmarkGuardIDs(b *testing.B) { + for n := 0; n < b.N; n++ { + go func() { + if refID := Guardian.GuardIDs("", 0, strconv.Itoa(n)); refID != "" { + time.Sleep(time.Microsecond) + Guardian.UnguardIDs(refID) + } + }() } } diff --git a/sessions/sessions.go b/sessions/sessions.go index b4656f262..37d0dca4a 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1585,8 +1585,9 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, // RPC caching if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, args.CGREvent.ID) - guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) @@ -1830,8 +1831,9 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, // RPC caching if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.SessionSv1InitiateSession, args.CGREvent.ID) - guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) @@ -2046,8 +2048,9 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, // RPC caching if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.SessionSv1UpdateSession, args.CGREvent.ID) - guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) @@ -2151,8 +2154,9 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, // RPC caching if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.SessionSv1TerminateSession, args.CGREvent.ID) - guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) @@ -2267,8 +2271,9 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, // RPC caching if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEv.ID) - guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) @@ -2351,8 +2356,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, // RPC caching if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessEvent, args.CGREvent.ID) - guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(cacheKey) + refID := guardian.Guardian.GuardIDs("", + sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse)