Guardian - better remote locking through reference IDs

DanB
2019-03-17 20:25:45 +01:00
parent f19ef9a2dc
commit fa75764203
11 changed files with 402 additions and 257 deletions

View File

@@ -1331,20 +1331,21 @@ func (self *ApierV1) RemoveActions(attr AttrRemoveActions, reply *string) error
}
type AttrRemoteLock struct {
LockIDs []string // List of IDs to obtain lock for
Timeout time.Duration // Automatically unlock on timeout
ReferenceID string // reference ID for this lock if available
LockIDs []string // List of IDs to obtain lock for
Timeout time.Duration // Automatically unlock on timeout
}
func (self *ApierV1) RemoteLock(attr AttrRemoteLock, reply *string) error {
guardian.Guardian.GuardIDs(attr.Timeout, attr.LockIDs...)
*reply = utils.OK
return nil
// RemoteLock will lock a key from remote
func (self *ApierV1) RemoteLock(attr AttrRemoteLock, reply *string) (err error) {
*reply = guardian.Guardian.GuardIDs(attr.ReferenceID, attr.Timeout, attr.LockIDs...)
return
}
func (self *ApierV1) RemoteUnlock(lockIDs []string, reply *string) error {
guardian.Guardian.UnguardIDs(lockIDs...)
*reply = utils.OK
return nil
// RemoteUnlock will unlock a key from remote based on reference ID
func (self *ApierV1) RemoteUnlock(refID string, reply *[]string) (err error) {
*reply = guardian.Guardian.UnguardIDs(refID)
return
}
func (v1 *ApierV1) StartService(args servmanager.ArgStartService, reply *string) (err error) {
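For reference, a minimal sketch of how a remote caller might drive the reworked API over Go's net/rpc/jsonrpc; the listen address, the client-side copy of AttrRemoteLock and the key names are illustrative assumptions, not part of this commit:

package main

import (
	"fmt"
	"net/rpc/jsonrpc"
	"time"
)

// Client-side mirror of the AttrRemoteLock struct shown in the diff above.
type AttrRemoteLock struct {
	ReferenceID string        // reference ID for this lock if available
	LockIDs     []string      // list of IDs to obtain lock for
	Timeout     time.Duration // automatically unlock on timeout
}

func main() {
	client, err := jsonrpc.Dial("tcp", "localhost:2012") // assumed JSON-RPC address
	if err != nil {
		panic(err)
	}
	var refID string
	// An empty ReferenceID asks Guardian to generate one; it comes back in the reply.
	if err := client.Call("ApierV1.RemoteLock", AttrRemoteLock{
		LockIDs: []string{"cgrates.org:1001", "cgrates.org:1002"},
		Timeout: 5 * time.Second, // safety net: auto-unlock if RemoteUnlock never arrives
	}, &refID); err != nil {
		panic(err)
	}
	fmt.Println("locked under reference:", refID)

	// ... work against the locked keys ...

	var unlocked []string
	// Unlock the whole group with nothing but the reference ID.
	if err := client.Call("ApierV1.RemoteUnlock", refID, &unlocked); err != nil {
		panic(err)
	}
	fmt.Println("released:", unlocked)
}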

View File

@@ -413,9 +413,9 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessCDR, cdr.CGRID, cdr.RunID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
@@ -503,8 +503,9 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV2ProcessCDR, arg.CGREvent.ID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
@@ -588,9 +589,9 @@ func (cdrS *CDRServer) V1StoreSessionCost(attr *AttrCDRSStoreSMCost, reply *stri
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, attr.Cost.CGRID, attr.Cost.RunID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
@@ -621,8 +622,9 @@ func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *st
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, args.Cost.CGRID, args.Cost.RunID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
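The same two-line idiom recurs in every RPC-caching handler this commit touches (CDRs, Responder, SessionS): GuardIDs with an empty reference ID generates one, and the deferred UnguardIDs releases by that reference instead of re-listing the keys. Condensed into one hedged sketch, where the handler name and cacheKey are placeholders:

package engine // illustrative placement; any package importing guardian and config works

import (
	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/guardian"
)

func cachedHandler(cacheKey string) {
	// Empty refID => Guardian generates and returns a fresh reference.
	refID := guardian.Guardian.GuardIDs("",
		config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey)
	// One deferred call releases every key acquired under the reference.
	defer guardian.Guardian.UnguardIDs(refID)
	// ... check Cache for a cached response, otherwise run the request and cache it ...
}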

View File

@@ -32,79 +32,83 @@ import (
// helper on top of dataDB.MatchFilterIndex, adding utils.ANY to list of fields queried
func MatchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string,
dm *DataManager, cacheID, itemIDPrefix string, indexedSelects bool) (itemIDs utils.StringMap, err error) {
lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockID)
defer guardian.Guardian.UnguardIDs(lockID)
itemIDs = make(utils.StringMap)
if !indexedSelects {
keysWithID, err := dm.DataDB().GetKeysForPrefix(utils.CacheIndexesToPrefix[cacheID])
if err != nil {
return nil, err
}
var sliceIDs []string
for _, id := range keysWithID {
sliceIDs = append(sliceIDs, strings.Split(id, ":")[1])
}
itemIDs = utils.StringMapFromSlice(sliceIDs)
return itemIDs, nil
}
allFieldIDs := make([]string, len(ev))
i := 0
for fldID := range ev {
allFieldIDs[i] = fldID
i += 1
}
stringFieldVals := map[string]string{utils.ANY: utils.ANY} // cache here field string values, start with default one
filterIndexTypes := []string{MetaString, MetaPrefix, utils.META_NONE}
for i, fieldIDs := range []*[]string{stringFldIDs, prefixFldIDs, nil} { // same routine for both string and prefix filter types
if filterIndexTypes[i] == utils.META_NONE {
fieldIDs = &[]string{utils.ANY} // so we can query DB for unindexed filters
}
if fieldIDs == nil {
fieldIDs = &allFieldIDs
}
for _, fldName := range *fieldIDs {
fieldValIf, has := ev[fldName]
if !has && filterIndexTypes[i] != utils.META_NONE {
continue
// Guard will protect the function with automatic locking
lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
if !indexedSelects {
var keysWithID []string
if keysWithID, err = dm.DataDB().GetKeysForPrefix(utils.CacheIndexesToPrefix[cacheID]); err != nil {
return
}
if _, cached := stringFieldVals[fldName]; !cached {
strVal, err := utils.IfaceAsString(fieldValIf)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> cannot cast field: %s into string", utils.FilterS, fldName))
var sliceIDs []string
for _, id := range keysWithID {
sliceIDs = append(sliceIDs, strings.Split(id, ":")[1])
}
itemIDs = utils.StringMapFromSlice(sliceIDs)
return
}
allFieldIDs := make([]string, len(ev))
i := 0
for fldID := range ev {
allFieldIDs[i] = fldID
i += 1
}
stringFieldVals := map[string]string{utils.ANY: utils.ANY} // cache here field string values, start with default one
filterIndexTypes := []string{MetaString, MetaPrefix, utils.META_NONE}
for i, fieldIDs := range []*[]string{stringFldIDs, prefixFldIDs, nil} { // same routine for both string and prefix filter types
if filterIndexTypes[i] == utils.META_NONE {
fieldIDs = &[]string{utils.ANY} // so we can query DB for unindexed filters
}
if fieldIDs == nil {
fieldIDs = &allFieldIDs
}
for _, fldName := range *fieldIDs {
fieldValIf, has := ev[fldName]
if !has && filterIndexTypes[i] != utils.META_NONE {
continue
}
stringFieldVals[fldName] = strVal
}
fldVal := stringFieldVals[fldName]
fldVals := []string{fldVal}
// default is only one fieldValue checked
if filterIndexTypes[i] == MetaPrefix {
fldVals = utils.SplitPrefix(fldVal, 1) // all prefixes till last digit
}
if fldName != utils.META_ANY {
fldName = utils.DynamicDataPrefix + fldName
}
var dbItemIDs utils.StringMap // list of items matched in DB
for _, val := range fldVals {
dbItemIDs, err = dm.MatchFilterIndex(cacheID, itemIDPrefix, filterIndexTypes[i], fldName, val)
if err != nil {
if err == utils.ErrNotFound {
err = nil
if _, cached := stringFieldVals[fldName]; !cached {
strVal, err := utils.IfaceAsString(fieldValIf)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> cannot cast field: %s into string", utils.FilterS, fldName))
continue
}
return nil, err
stringFieldVals[fldName] = strVal
}
break // we got at least one answer back, longest prefix wins
}
for itemID := range dbItemIDs {
if _, hasIt := itemIDs[itemID]; !hasIt { // Add it to list if not already there
itemIDs[itemID] = dbItemIDs[itemID]
fldVal := stringFieldVals[fldName]
fldVals := []string{fldVal}
// default is only one fieldValue checked
if filterIndexTypes[i] == MetaPrefix {
fldVals = utils.SplitPrefix(fldVal, 1) // all prefixes till last digit
}
if fldName != utils.META_ANY {
fldName = utils.DynamicDataPrefix + fldName
}
var dbItemIDs utils.StringMap // list of items matched in DB
for _, val := range fldVals {
if dbItemIDs, err = dm.MatchFilterIndex(cacheID, itemIDPrefix, filterIndexTypes[i], fldName, val); err != nil {
if err == utils.ErrNotFound {
err = nil
continue
}
return
}
break // we got at least one answer back, longest prefix wins
}
for itemID := range dbItemIDs {
if _, hasIt := itemIDs[itemID]; !hasIt { // Add it to list if not already there
itemIDs[itemID] = dbItemIDs[itemID]
}
}
}
}
}
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockID)
if len(itemIDs) == 0 {
return nil, utils.ErrNotFound
}
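MatchingItemIDsForEvent now does its work inside Guardian.Guard, so the lock and its unlock can no longer drift apart across early returns; the closure simply assigns the outer function's named returns (itemIDs, err). A stripped-down sketch of that shape, with the real DB lookup replaced by a placeholder:

package engine // illustrative; mirrors the Guard-closure shape used above

import (
	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/guardian"
	"github.com/cgrates/cgrates/utils"
)

func guardedLookup(lockID string) (itemIDs utils.StringMap, err error) {
	itemIDs = make(utils.StringMap)
	guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
		// The body runs between lock and unlock; results and errors are
		// written straight into the outer named returns.
		itemIDs["cgrates.org:ITEM1"] = true // placeholder for the real index lookup
		return
	}, config.CgrConfig().GeneralCfg().LockingTimeout, lockID)
	return
}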

View File

@@ -106,8 +106,9 @@ func (rfi *FilterIndexer) cacheRemItemType() { // ToDo: tune here by removing pe
// StoreIndexes handles storing the indexes to dataDB
func (rfi *FilterIndexer) StoreIndexes(commit bool, transactionID string) (err error) {
lockID := utils.CacheInstanceToPrefix[utils.PrefixToIndexCache[rfi.itemType]] + rfi.dbKeySuffix
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockID)
defer guardian.Guardian.UnguardIDs(lockID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, lockID)
defer guardian.Guardian.UnguardIDs(refID)
if err = rfi.dm.SetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
rfi.indexes, commit, transactionID); err != nil {

View File

@@ -21,6 +21,7 @@ package engine
import (
"fmt"
"sort"
"sync"
"time"
"github.com/cgrates/cgrates/config"
@@ -123,9 +124,10 @@ func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error)
// StatQueue represents an individual stats instance
type StatQueue struct {
Tenant string
ID string
SQItems []struct {
sync.RWMutex // protect the elements from within
Tenant string
ID string
SQItems []struct {
EventID string // Bounded to the original utils.CGREvent
ExpiryTime *time.Time // Used to auto-expire events
}
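The embedded sync.RWMutex is what lets the metric getters further down in stats.go drop their Guardian locks and read SQMetrics under sq.RLock() instead. A minimal sketch of that read path, with the StatQueue fields and GetStringValue signature taken from the diffs in this commit:

package engine // illustrative helper; not part of this commit

func queueStringMetrics(sq *StatQueue) map[string]string {
	sq.RLock() // the embedded RWMutex protects the queue's elements
	defer sq.RUnlock()
	metrics := make(map[string]string, len(sq.SQMetrics))
	for metricID, metric := range sq.SQMetrics {
		metrics[metricID] = metric.GetStringValue("")
	}
	return metrics
}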

View File

@@ -248,37 +248,37 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
return "", utils.ErrResourceUnavailable
}
lockIDs := utils.PrefixSliceItems(rs.tenatIDsStr(), utils.ResourcesPrefix)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
// Simulate resource usage
for _, r := range rs {
r.removeExpiredUnits()
if _, hasID := r.Usages[ru.ID]; hasID { // update
r.clearUsage(ru.ID)
}
if r.rPrf == nil {
return "", fmt.Errorf("empty configuration for resourceID: %s", r.TenantID())
}
if r.rPrf.Limit >= r.totalUsage()+ru.Units {
if alcMessage == "" {
if r.rPrf.AllocationMessage != "" {
alcMessage = r.rPrf.AllocationMessage
} else {
alcMessage = r.rPrf.ID
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
// Simulate resource usage
for _, r := range rs {
r.removeExpiredUnits()
if _, hasID := r.Usages[ru.ID]; hasID { // update
r.clearUsage(ru.ID)
}
if r.rPrf == nil {
err = fmt.Errorf("empty configuration for resourceID: %s", r.TenantID())
return
}
if r.rPrf.Limit >= r.totalUsage()+ru.Units {
if alcMessage == "" {
if r.rPrf.AllocationMessage != "" {
alcMessage = r.rPrf.AllocationMessage
} else {
alcMessage = r.rPrf.ID
}
}
}
}
}
if alcMessage == "" {
return "", utils.ErrResourceUnavailable
}
if dryRun {
if alcMessage == "" {
err = utils.ErrResourceUnavailable
return
}
if dryRun {
return
}
err = rs.recordUsage(ru)
return
}
err = rs.recordUsage(ru)
if err != nil {
return
}
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...)
return
}
@@ -417,26 +417,27 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
for i, rTid := range rIDs {
lockIDs[i] = utils.ResourcesPrefix + rTid.TenantID()
}
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for i, rTid := range rIDs {
if r, err := rS.dm.GetResource(rTid.Tenant, rTid.ID, true, true, ""); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ResourceS> force-uncaching resources for evUUID: <%s>, error: <%s>",
evUUID, err.Error()))
// on errors, cleanup cache so we recache
if shortCached {
Cache.Remove(utils.CacheEventResources, evUUID, true, "")
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
for i, rTid := range rIDs {
if r, err := rS.dm.GetResource(rTid.Tenant, rTid.ID, true, true, ""); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ResourceS> force-uncaching resources for evUUID: <%s>, error: <%s>",
evUUID, err.Error()))
// on errors, cleanup cache so we recache
if shortCached {
Cache.Remove(utils.CacheEventResources, evUUID, true, "")
} else {
rS.lcERMux.Lock()
delete(rS.lcEventResources, evUUID)
rS.lcERMux.Unlock()
}
return
} else {
rS.lcERMux.Lock()
delete(rS.lcEventResources, evUUID)
rS.lcERMux.Unlock()
rs[i] = r
}
return nil
} else {
rs[i] = r
}
}
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...)
return
}

View File

@@ -89,8 +89,9 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) {
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.ResponderDebit, arg.CgrID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
@@ -126,8 +127,9 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error)
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.ResponderMaxDebit, arg.CgrID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
@@ -163,8 +165,9 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *Account) (err
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.ResponderRefundIncrements, arg.CgrID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
@@ -200,8 +203,9 @@ func (rs *Responder) RefundRounding(arg *CallDescriptor, reply *float64) (err er
// RPC caching
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.ResponderRefundRounding, arg.CgrID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)

View File

@@ -108,15 +108,16 @@ func (sS *StatService) storeStats() {
break // no more keys, backup completed
}
lkID := utils.StatQueuePrefix + sID
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
if sqIf, ok := Cache.Get(utils.CacheStatQueues, sID); !ok || sqIf == nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed retrieving from cache stat queue with ID: %s",
utils.StatService, sID))
} else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil {
failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup
}
guardian.Guardian.UnguardIDs(lkID)
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
if sqIf, ok := Cache.Get(utils.CacheStatQueues, sID); !ok || sqIf == nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed retrieving from cache stat queue with ID: %s",
utils.StatService, sID))
} else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil {
failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup
}
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
// randomize the CPU load and give up thread control
time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond)
}
@@ -176,21 +177,23 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) (
} else if !pass {
continue
}
var sq *StatQueue
lkID := utils.StatQueuePrefix + utils.ConcatenatedKey(sqPrfl.Tenant, sqPrfl.ID)
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
s, err := sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, true, true, "")
guardian.Guardian.UnguardIDs(lkID)
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
sq, err = sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, true, true, "")
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
if err != nil {
return nil, err
}
if sqPrfl.Stored && s.dirty == nil {
s.dirty = utils.BoolPointer(false)
if sqPrfl.Stored && sq.dirty == nil {
sq.dirty = utils.BoolPointer(false)
}
if sqPrfl.TTL >= 0 {
s.ttl = utils.DurationPointer(sqPrfl.TTL)
sq.ttl = utils.DurationPointer(sqPrfl.TTL)
}
s.sqPrfl = sqPrfl
matchingSQs[sqPrfl.ID] = s
sq.sqPrfl = sqPrfl
matchingSQs[sqPrfl.ID] = sq
}
// All good, convert from Map to Slice so we can sort
sqs = make(StatQueues, len(matchingSQs))
@@ -235,9 +238,10 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [
for _, sq := range matchSQs {
stsIDs = append(stsIDs, sq.ID)
lkID := utils.StatQueuePrefix + sq.TenantID()
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
err = sq.ProcessEvent(&args.CGREvent, sS.filterS)
guardian.Guardian.UnguardIDs(lkID)
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
err = sq.ProcessEvent(&args.CGREvent, sS.filterS)
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatS> Queue: %s, ignoring event: %s, error: %s",
@@ -333,9 +337,6 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
lkID := utils.StatQueuePrefix + args.TenantID()
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
defer guardian.Guardian.UnguardIDs(lkID)
sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, "")
if err != nil {
if err != utils.ErrNotFound {
@@ -343,10 +344,12 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[
}
return err
}
sq.RLock()
metrics := make(map[string]string, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetStringValue("")
}
sq.RUnlock()
*reply = metrics
return
}
@@ -356,9 +359,6 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
lkID := utils.StatQueuePrefix + args.TenantID()
guardian.Guardian.GuardIDs(config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
defer guardian.Guardian.UnguardIDs(lkID)
sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, "")
if err != nil {
if err != utils.ErrNotFound {
@@ -366,10 +366,12 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
}
return err
}
sq.RLock()
metrics := make(map[string]float64, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetFloat64Value()
}
sq.RUnlock()
*reply = metrics
return
}

View File

@@ -27,7 +27,9 @@ import (
)
// global package variable
var Guardian = &GuardianLocker{locksMap: make(map[string]*itemLock)}
var Guardian = &GuardianLocker{
locks: make(map[string]*itemLock),
refs: make(map[string][]string)}
type itemLock struct {
lk chan struct{}
@@ -36,44 +38,94 @@ type itemLock struct {
// GuardianLocker is an optimized locking system per locking key
type GuardianLocker struct {
locksMap map[string]*itemLock
sync.Mutex // protects the map
locks map[string]*itemLock
lkMux sync.Mutex // protects the locks
refs map[string][]string // used in case of remote locks
refsMux sync.RWMutex // protects the map
}
func (gl *GuardianLocker) lockItem(itmID string) {
gl.Lock()
itmLock, exists := gl.locksMap[itmID]
if itmID == "" {
return
}
gl.lkMux.Lock()
itmLock, exists := gl.locks[itmID]
if !exists {
itmLock = &itemLock{lk: make(chan struct{}, 1)}
gl.locksMap[itmID] = itmLock
gl.locks[itmID] = itmLock
itmLock.lk <- struct{}{}
}
itmLock.cnt++
select {
case <-itmLock.lk:
gl.Unlock()
gl.lkMux.Unlock()
return
default: // move further so we can unlock
}
gl.Unlock()
gl.lkMux.Unlock()
<-itmLock.lk
}
func (gl *GuardianLocker) unlockItem(itmID string) {
gl.Lock()
itmLock, exists := gl.locksMap[itmID]
gl.lkMux.Lock()
itmLock, exists := gl.locks[itmID]
if !exists {
gl.Unlock()
gl.lkMux.Unlock()
return
}
itmLock.cnt--
if itmLock.cnt == 0 {
delete(gl.locksMap, itmID)
delete(gl.locks, itmID)
}
gl.Unlock()
gl.lkMux.Unlock()
itmLock.lk <- struct{}{}
}
// lockWithReference will perform locks and also generate a lock reference for it (so it can be used when remotely locking)
func (gl *GuardianLocker) lockWithReference(refID string, lkIDs []string) string {
var refEmpty bool
if refID == "" {
refEmpty = true
refID = utils.GenUUID()
}
gl.lockItem(refID) // make sure we only process one simultaneous refID at the time, otherwise checking already used refID is not reliable
gl.refsMux.Lock()
if !refEmpty {
if _, has := gl.refs[refID]; has {
gl.refsMux.Unlock()
gl.unlockItem(refID)
return "" // no locking was done
}
}
gl.refs[refID] = lkIDs
gl.refsMux.Unlock()
// execute the real locks
for _, lk := range lkIDs {
gl.lockItem(lk)
}
gl.unlockItem(refID)
return refID
}
// unlockWithReference will unlock based on the reference ID
func (gl *GuardianLocker) unlockWithReference(refID string) (lkIDs []string) {
gl.lockItem(refID)
gl.refsMux.Lock()
lkIDs, has := gl.refs[refID]
if !has {
gl.refsMux.Unlock()
gl.unlockItem(refID)
return
}
delete(gl.refs, refID)
gl.refsMux.Unlock()
for _, lk := range lkIDs {
gl.unlockItem(lk)
}
gl.unlockItem(refID)
return
}
// Guard executes the handler between locks
func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) {
for _, lockID := range lockIDs {
@@ -107,25 +159,26 @@ func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout tim
return
}
// GuardTimed aquires a lock for duration
func (gl *GuardianLocker) GuardIDs(timeout time.Duration, lockIDs ...string) {
for _, lockID := range lockIDs {
gl.lockItem(lockID)
}
if timeout != 0 {
go func(timeout time.Duration, lockIDs ...string) {
// GuardIDs acquires a lock for duration
// returns the reference ID for the lock group acquired
func (gl *GuardianLocker) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) (retRefID string) {
retRefID = gl.lockWithReference(refID, lkIDs)
if timeout != 0 && retRefID != "" {
go func() {
time.Sleep(timeout)
utils.Logger.Warning(fmt.Sprintf("<Guardian> WARNING: force timing-out locks: %+v", lockIDs))
gl.UnguardIDs(lockIDs...)
}(timeout, lockIDs...)
lkIDs := gl.unlockWithReference(retRefID)
if len(lkIDs) != 0 {
utils.Logger.Warning(fmt.Sprintf("<Guardian> WARNING: force timing-out locks: %+v", lkIDs))
}
}()
}
return
}
// UnguardTimed attempts to unlock a set of locks based on their locksUUID
func (gl *GuardianLocker) UnguardIDs(lockIDs ...string) {
for _, lockID := range lockIDs {
gl.unlockItem(lockID)
// UnguardIDs attempts to unlock a set of locks based on their reference ID received on lock
func (gl *GuardianLocker) UnguardIDs(refID string) (lkIDs []string) {
if refID == "" {
return
}
return
return gl.unlockWithReference(refID)
}
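Taken together, lockWithReference and unlockWithReference give GuardIDs a simple contract: an explicit reference ID that is already registered returns "" and acquires nothing, while UnguardIDs releases the whole group recorded under that reference. A small sketch of the contract (keys and the reference name are arbitrary):

package main

import (
	"fmt"

	"github.com/cgrates/cgrates/guardian"
)

func main() {
	keys := []string{"k1", "k2"}

	// First acquisition registers the reference and locks both keys.
	refID := guardian.Guardian.GuardIDs("my-ref", 0, keys...)
	fmt.Println(refID) // "my-ref"

	// Re-using a registered reference acquires nothing and returns "".
	if dup := guardian.Guardian.GuardIDs("my-ref", 0, keys...); dup == "" {
		fmt.Println("reference already in use, no locking done")
	}

	// Unlocking by reference releases the whole group at once.
	fmt.Println(guardian.Guardian.UnguardIDs(refID)) // [k1 k2]
}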

View File

@@ -18,9 +18,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package guardian
import (
"reflect"
"strconv"
"sync"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
func delayHandler() (interface{}, error) {
@@ -45,16 +49,17 @@ func TestGuardianMultipleKeys(t *testing.T) {
}
sg.Wait()
mustExecDur := time.Duration(maxIter*100) * time.Millisecond
if execTime := time.Now().Sub(tStart); execTime < mustExecDur || execTime > mustExecDur+time.Duration(20*time.Millisecond) {
if execTime := time.Now().Sub(tStart); execTime < mustExecDur ||
execTime > mustExecDur+time.Duration(100*time.Millisecond) {
t.Errorf("Execution took: %v", execTime)
}
Guardian.Lock()
Guardian.lkMux.Lock()
for _, key := range keys {
if _, hasKey := Guardian.locksMap[key]; hasKey {
if _, hasKey := Guardian.locks[key]; hasKey {
t.Errorf("Possible memleak for key: %s", key)
}
}
Guardian.Unlock()
Guardian.lkMux.Unlock()
}
func TestGuardianTimeout(t *testing.T) {
@@ -73,16 +78,17 @@ func TestGuardianTimeout(t *testing.T) {
}
sg.Wait()
mustExecDur := time.Duration(maxIter*10) * time.Millisecond
if execTime := time.Now().Sub(tStart); execTime < mustExecDur || execTime > mustExecDur+time.Duration(20*time.Millisecond) {
if execTime := time.Now().Sub(tStart); execTime < mustExecDur ||
execTime > mustExecDur+time.Duration(100*time.Millisecond) {
t.Errorf("Execution took: %v", execTime)
}
Guardian.Lock()
Guardian.lkMux.Lock()
for _, key := range keys {
if _, hasKey := Guardian.locksMap[key]; hasKey {
if _, hasKey := Guardian.locks[key]; hasKey {
t.Error("Possible memleak")
}
}
Guardian.Unlock()
Guardian.lkMux.Unlock()
}
func TestGuardianGuardIDs(t *testing.T) {
@@ -90,122 +96,185 @@ func TestGuardianGuardIDs(t *testing.T) {
//lock with 3 keys
lockIDs := []string{"test1", "test2", "test3"}
// make sure the keys are not in guardian before lock
Guardian.Lock()
Guardian.lkMux.Lock()
for _, lockID := range lockIDs {
if _, hasKey := Guardian.locksMap[lockID]; hasKey {
if _, hasKey := Guardian.locks[lockID]; hasKey {
t.Errorf("Unexpected lockID found: %s", lockID)
}
}
Guardian.Unlock()
Guardian.lkMux.Unlock()
// lock 3 items
tStart := time.Now()
lockDur := 2 * time.Millisecond
Guardian.GuardIDs(lockDur, lockIDs...)
Guardian.Lock()
Guardian.GuardIDs("", lockDur, lockIDs...)
Guardian.lkMux.Lock()
for _, lockID := range lockIDs {
if itmLock, hasKey := Guardian.locksMap[lockID]; !hasKey {
if itmLock, hasKey := Guardian.locks[lockID]; !hasKey {
t.Errorf("Cannot find lock for lockID: %s", lockID)
} else if itmLock.cnt != 1 {
t.Errorf("Unexpected itmLock found: %+v", itmLock)
}
}
Guardian.Unlock()
Guardian.lkMux.Unlock()
secLockDur := time.Duration(1 * time.Millisecond)
// second lock to test counter
go Guardian.GuardIDs(secLockDur, lockIDs[1:]...)
time.Sleep(20 * time.Microsecond) // give time for goroutine to lock
go Guardian.GuardIDs("", secLockDur, lockIDs[1:]...)
time.Sleep(30 * time.Microsecond) // give time for goroutine to lock
// check if counters were properly increased
Guardian.Lock()
Guardian.lkMux.Lock()
lkID := lockIDs[0]
eCnt := int64(1)
if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey {
if itmLock, hasKey := Guardian.locks[lkID]; !hasKey {
t.Errorf("Cannot find lock for lockID: %s", lkID)
} else if itmLock.cnt != eCnt {
t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID)
}
lkID = lockIDs[1]
eCnt = int64(2)
if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey {
if itmLock, hasKey := Guardian.locks[lkID]; !hasKey {
t.Errorf("Cannot find lock for lockID: %s", lkID)
} else if itmLock.cnt != eCnt {
t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID)
}
lkID = lockIDs[2]
eCnt = int64(1) // we did not manage to increase it yet since it did not pass first lock
if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey {
if itmLock, hasKey := Guardian.locks[lkID]; !hasKey {
t.Errorf("Cannot find lock for lockID: %s", lkID)
} else if itmLock.cnt != eCnt {
t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID)
}
Guardian.Unlock()
time.Sleep(lockDur + secLockDur + 10*time.Millisecond) // give time to unlock before proceeding
Guardian.lkMux.Unlock()
time.Sleep(lockDur + secLockDur + 50*time.Millisecond) // give time to unlock before proceeding
// make sure all counters were removed
for _, lockID := range lockIDs {
if _, hasKey := Guardian.locksMap[lockID]; hasKey {
if _, hasKey := Guardian.locks[lockID]; hasKey {
t.Errorf("Unexpected lockID found: %s", lockID)
}
}
// test lock without timer
Guardian.GuardIDs(0, lockIDs...)
refID := Guardian.GuardIDs("", 0, lockIDs...)
if totalLockDur := time.Now().Sub(tStart); totalLockDur < lockDur {
t.Errorf("Lock duration too small")
}
time.Sleep(time.Duration(30) * time.Millisecond)
// making sure the items stay locked
Guardian.Lock()
if len(Guardian.locksMap) != 3 {
t.Errorf("locksMap should be have 3 elements, have: %+v", Guardian.locksMap)
Guardian.lkMux.Lock()
if len(Guardian.locks) != 3 {
t.Errorf("locks should have 3 elements, have: %+v", Guardian.locks)
}
for _, lkID := range lockIDs {
if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey {
if itmLock, hasKey := Guardian.locks[lkID]; !hasKey {
t.Errorf("Cannot find lock for lockID: %s", lkID)
} else if itmLock.cnt != 1 {
t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID)
}
}
Guardian.Unlock()
Guardian.UnguardIDs(lockIDs...)
time.Sleep(time.Duration(50) * time.Millisecond)
Guardian.lkMux.Unlock()
Guardian.UnguardIDs(refID)
// make sure items were unlocked
Guardian.Lock()
if len(Guardian.locksMap) != 0 {
t.Errorf("locksMap should have 0 elements, has: %+v", Guardian.locksMap)
Guardian.lkMux.Lock()
if len(Guardian.locks) != 0 {
t.Errorf("locks should have 0 elements, has: %+v", Guardian.locks)
}
Guardian.Unlock()
Guardian.lkMux.Unlock()
}
// TestGuardianGuardIDsConcurrent executes GuardIDs concurrently
func TestGuardianGuardIDsConcurrent(t *testing.T) {
maxIter := 500
sg := new(sync.WaitGroup)
keys := []string{"test1", "test2", "test3"}
refID := utils.GenUUID()
for i := 0; i < maxIter; i++ {
sg.Add(1)
go func() {
if retRefID := Guardian.GuardIDs(refID, 0, keys...); retRefID != "" {
if lkIDs := Guardian.UnguardIDs(refID); !reflect.DeepEqual(keys, lkIDs) {
t.Errorf("expecting: %+v, received: %+v", keys, lkIDs)
}
}
sg.Done()
}()
}
sg.Wait()
Guardian.lkMux.Lock()
if len(Guardian.locks) != 0 {
t.Errorf("Possible memleak for locks: %+v", Guardian.locks)
}
Guardian.lkMux.Unlock()
Guardian.refsMux.Lock()
if len(Guardian.refs) != 0 {
t.Errorf("Possible memleak for refs: %+v", Guardian.refs)
}
Guardian.refsMux.Unlock()
}
func TestGuardianGuardIDsTimeoutConcurrent(t *testing.T) {
maxIter := 50
sg := new(sync.WaitGroup)
keys := []string{"test1", "test2", "test3"}
refID := utils.GenUUID()
for i := 0; i < maxIter; i++ {
sg.Add(1)
go func() {
Guardian.GuardIDs(refID, time.Duration(time.Microsecond), keys...)
sg.Done()
}()
}
sg.Wait()
time.Sleep(10 * time.Millisecond)
Guardian.lkMux.Lock()
if len(Guardian.locks) != 0 {
t.Errorf("Possible memleak for locks: %+v", Guardian.locks)
}
Guardian.lkMux.Unlock()
Guardian.refsMux.Lock()
if len(Guardian.refs) != 0 {
t.Errorf("Possible memleak for refs: %+v", Guardian.refs)
}
Guardian.refsMux.Unlock()
}
// BenchmarkGuard-8 200000 13759 ns/op
func BenchmarkGuard(b *testing.B) {
for i := 0; i < 100; i++ {
for n := 0; n < b.N; n++ {
go Guardian.Guard(func() (interface{}, error) {
time.Sleep(1 * time.Millisecond)
time.Sleep(time.Microsecond)
return 0, nil
}, 0, "1")
go Guardian.Guard(func() (interface{}, error) {
time.Sleep(1 * time.Millisecond)
time.Sleep(time.Microsecond)
return 0, nil
}, 0, "2")
go Guardian.Guard(func() (interface{}, error) {
time.Sleep(1 * time.Millisecond)
time.Sleep(time.Microsecond)
return 0, nil
}, 0, "1")
}
}
// BenchmarkGuardian-8 1000000 5794 ns/op
func BenchmarkGuardian(b *testing.B) {
for i := 0; i < 100; i++ {
for n := 0; n < b.N; n++ {
go Guardian.Guard(func() (interface{}, error) {
time.Sleep(1 * time.Millisecond)
time.Sleep(time.Microsecond)
return 0, nil
}, 0, "1")
}, 0, strconv.Itoa(n))
}
}
// BenchmarkGuardIDs-8 1000000 8732 ns/op
func BenchmarkGuardIDs(b *testing.B) {
for n := 0; n < b.N; n++ {
go func() {
if refID := Guardian.GuardIDs("", 0, strconv.Itoa(n)); refID != "" {
time.Sleep(time.Microsecond)
Guardian.UnguardIDs(refID)
}
}()
}
}

View File

@@ -1585,8 +1585,9 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
// RPC caching
if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, args.CGREvent.ID)
guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
@@ -1830,8 +1831,9 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
// RPC caching
if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1InitiateSession, args.CGREvent.ID)
guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
@@ -2046,8 +2048,9 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection,
// RPC caching
if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1UpdateSession, args.CGREvent.ID)
guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
@@ -2151,8 +2154,9 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
// RPC caching
if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1TerminateSession, args.CGREvent.ID)
guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
@@ -2267,8 +2271,9 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
// RPC caching
if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEv.ID)
guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
@@ -2351,8 +2356,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection,
// RPC caching
if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessEvent, args.CGREvent.ID)
guardian.Guardian.GuardIDs(sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(cacheKey)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)