diff --git a/engine/datamanager.go b/engine/datamanager.go index e0a87d42d..1a96d0055 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -165,10 +165,14 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids [] guardian.Guardian.UnguardIDs(lkID) case utils.ThresholdProfilePrefix: tntID := utils.NewTenantID(dataID) + lkID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, thresholdProfileLockKey(tntID.Tenant, tntID.ID)) _, err = dm.GetThresholdProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + guardian.Guardian.UnguardIDs(lkID) case utils.ThresholdPrefix: tntID := utils.NewTenantID(dataID) + lkID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, thresholdLockKey(tntID.Tenant, tntID.ID)) _, err = dm.GetThreshold(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + guardian.Guardian.UnguardIDs(lkID) case utils.FilterPrefix: tntID := utils.NewTenantID(dataID) _, err = dm.GetFilter(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) diff --git a/engine/resources.go b/engine/resources.go index 13c32dd1d..308702139 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -389,6 +389,19 @@ type ResourceService struct { connMgr *ConnManager } +// Reload stops the backupLoop and restarts it +func (rS *ResourceService) Reload(ctx *context.Context) { + close(rS.stopBackup) + <-rS.loopStoped // wait until the loop is done + rS.stopBackup = make(chan struct{}) + go rS.runBackup(ctx) +} + +// StartLoop starts the gorutine with the backup loop +func (rS *ResourceService) StartLoop(ctx *context.Context) { + go rS.runBackup(ctx) +} + // Shutdown is called to shutdown the service func (rS *ResourceService) Shutdown(ctx *context.Context) { utils.Logger.Info(" service shutdown initialized") @@ -940,16 +953,3 @@ func (rS *ResourceService) V1GetResourceWithConfig(ctx *context.Context, arg *ut return } - -// Reload stops the backupLoop and restarts it -func (rS *ResourceService) Reload(ctx *context.Context) { - close(rS.stopBackup) - <-rS.loopStoped // wait until the loop is done - rS.stopBackup = make(chan struct{}) - go rS.runBackup(ctx) -} - -// StartLoop starts the gorutine with the backup loop -func (rS *ResourceService) StartLoop(ctx *context.Context) { - go rS.runBackup(ctx) -} diff --git a/engine/stats.go b/engine/stats.go index f3eda45fd..fe993d272 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -77,7 +77,7 @@ func (sS *StatService) Shutdown(ctx *context.Context) { utils.Logger.Info(" service shutdown complete") } -// runBackup will regularly store resources changed to dataDB +// runBackup will regularly store statQueues changed to dataDB func (sS *StatService) runBackup(ctx *context.Context) { storeInterval := sS.cgrcfg.StatSCfg().StoreInterval if storeInterval <= 0 { @@ -95,7 +95,7 @@ func (sS *StatService) runBackup(ctx *context.Context) { } } -// storeResources represents one task of complete backup +// storeStats represents one task of complete backup func (sS *StatService) storeStats(ctx *context.Context) { var failedSqIDs []string for { // don't stop untill we store all dirty statQueues @@ -156,7 +156,7 @@ func (sS *StatService) StoreStatQueue(ctx *context.Context, sq *StatQueue) (err return } -// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call +// matchingStatQueuesForEvent returns ordered list of matching statQueues which are active by the time of the call func (sS *StatService) matchingStatQueuesForEvent(ctx *context.Context, tnt string, statsIDs []string, evNm utils.MapStorage) (sqs StatQueues, err error) { sqIDs := utils.NewStringSet(statsIDs) if len(sqIDs) == 0 { @@ -430,7 +430,7 @@ func (sS *StatService) V1GetStatQueue(ctx *context.Context, args *utils.TenantID if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - // make sure resource is locked at process level + // make sure statQueue is locked at process level lkID := guardian.Guardian.GuardIDs(utils.EmptyString, config.CgrConfig().GeneralCfg().LockingTimeout, statQueueLockKey(tnt, args.ID)) @@ -452,7 +452,7 @@ func (sS *StatService) V1GetQueueStringMetrics(ctx *context.Context, args *utils if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - // make sure resource is locked at process level + // make sure statQueue is locked at process level lkID := guardian.Guardian.GuardIDs(utils.EmptyString, config.CgrConfig().GeneralCfg().LockingTimeout, statQueueLockKey(tnt, args.ID)) @@ -481,7 +481,7 @@ func (sS *StatService) V1GetQueueFloatMetrics(ctx *context.Context, args *utils. if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - // make sure resource is locked at process level + // make sure statQueue is locked at process level lkID := guardian.Guardian.GuardIDs(utils.EmptyString, config.CgrConfig().GeneralCfg().LockingTimeout, statQueueLockKey(tnt, args.ID)) @@ -528,7 +528,7 @@ func (sS *StatService) V1ResetStatQueue(ctx *context.Context, tntID *utils.Tenan if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - // make sure resource is locked at process level + // make sure statQueue is locked at process level lkID := guardian.Guardian.GuardIDs(utils.EmptyString, config.CgrConfig().GeneralCfg().LockingTimeout, statQueueLockKey(tnt, tntID.ID)) diff --git a/engine/thresholds.go b/engine/thresholds.go index bd0db22db..de5ab5cd5 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -27,6 +27,7 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" ) @@ -48,6 +49,8 @@ type ThresholdProfile struct { Weight float64 // Weight to sort the thresholds ActionProfileIDs []string Async bool + + lkID string // holds the reference towards guardian lock key } // TenantID returns the concatenated key beteen tenant and ID @@ -55,6 +58,36 @@ func (tp *ThresholdProfile) TenantID() string { return utils.ConcatenatedKey(tp.Tenant, tp.ID) } +// thresholdProfileLockKey returns the ID used to lock a ThresholdProfile with guardian +func thresholdProfileLockKey(tnt, id string) string { + return utils.ConcatenatedKey(utils.CacheThresholdProfiles, tnt, id) +} + +// lock will lock the ThresholdProfile using guardian and store the lock within r.lkID +// if lkID is passed as argument, the lock is considered as executed +func (tp *ThresholdProfile) lock(lkID string) { + if lkID == utils.EmptyString { + lkID = guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdProfileLockKey(tp.Tenant, tp.ID)) + } + tp.lkID = lkID +} + +// unlock will unlock the ThresholdProfile and clear rp.lkID +func (tp *ThresholdProfile) unlock() { + if tp.lkID == utils.EmptyString { + return + } + guardian.Guardian.UnguardIDs(tp.lkID) + tp.lkID = utils.EmptyString +} + +// isLocked returns the locks status of this ThresholdProfile +func (tp *ThresholdProfile) isLocked() bool { + return tp.lkID != utils.EmptyString +} + // ThresholdWithAPIOpts is used in replicatorV1 for dispatcher type ThresholdWithAPIOpts struct { *Threshold @@ -68,6 +101,7 @@ type Threshold struct { Hits int // number of hits for this threshold Snooze time.Time // prevent threshold to run too early + lkID string // ID of the lock used when matching the threshold tPrfl *ThresholdProfile dirty *bool // needs save } @@ -77,6 +111,36 @@ func (t *Threshold) TenantID() string { return utils.ConcatenatedKey(t.Tenant, t.ID) } +// thresholdLockKey returns the ID used to lock a threshold with guardian +func thresholdLockKey(tnt, id string) string { + return utils.ConcatenatedKey(utils.CacheThresholds, tnt, id) +} + +// lock will lock the threshold using guardian and store the lock within r.lkID +// if lkID is passed as argument, the lock is considered as executed +func (t *Threshold) lock(lkID string) { + if lkID == utils.EmptyString { + lkID = guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdLockKey(t.Tenant, t.ID)) + } + t.lkID = lkID +} + +// unlock will unlock the threshold and clear r.lkID +func (t *Threshold) unlock() { + if t.lkID == utils.EmptyString { + return + } + guardian.Guardian.UnguardIDs(t.lkID) + t.lkID = utils.EmptyString +} + +// isLocked returns the locks status of this threshold +func (t *Threshold) isLocked() bool { + return t.lkID != utils.EmptyString +} + // processEventWithThreshold processes an ThresholdEvent func processEventWithThreshold(ctx *context.Context, connMgr *ConnManager, actionsConns []string, args *utils.CGREvent, t *Threshold) (err error) { if t.Snooze.After(time.Now()) || // snoozed, not executing actions @@ -125,6 +189,16 @@ func (ts Thresholds) Sort() { sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight }) } +// unlock will unlock thresholds part of this slice +func (ts Thresholds) unlock() { + for _, t := range ts { + t.unlock() + if t.tPrfl != nil { + t.tPrfl.unlock() + } + } +} + // NewThresholdService the constructor for ThresoldS service func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS, connMgr *ConnManager) (tS *ThresholdService) { return &ThresholdService{ @@ -150,6 +224,19 @@ type ThresholdService struct { stMux sync.RWMutex // protects storedTdIDs } +// Reload stops the backupLoop and restarts it +func (tS *ThresholdService) Reload(ctx *context.Context) { + close(tS.stopBackup) + <-tS.loopStoped // wait until the loop is done + tS.stopBackup = make(chan struct{}) + go tS.runBackup(ctx) +} + +// StartLoop starts the gorutine with the backup loop +func (tS *ThresholdService) StartLoop(ctx *context.Context) { + go tS.runBackup(ctx) +} + // Shutdown is called to shutdown the service func (tS *ThresholdService) Shutdown(ctx *context.Context) { utils.Logger.Info(" shutdown initialized") @@ -158,7 +245,7 @@ func (tS *ThresholdService) Shutdown(ctx *context.Context) { utils.Logger.Info(" shutdown complete") } -// backup will regularly store resources changed to dataDB +// backup will regularly store thresholds changed to dataDB func (tS *ThresholdService) runBackup(ctx *context.Context) { storeInterval := tS.cgrcfg.ThresholdSCfg().StoreInterval if storeInterval <= 0 { @@ -179,7 +266,7 @@ func (tS *ThresholdService) runBackup(ctx *context.Context) { // storeThresholds represents one task of complete backup func (tS *ThresholdService) storeThresholds(ctx *context.Context) { var failedTdIDs []string - for { // don't stop until we store all dirty resources + for { // don't stop until we store all dirty thresholds tS.stMux.Lock() tID := tS.storedTdIDs.GetOne() if tID != "" { @@ -189,11 +276,17 @@ func (tS *ThresholdService) storeThresholds(ctx *context.Context) { if tID == "" { break // no more keys, backup completed } - if tIf, ok := Cache.Get(utils.CacheThresholds, tID); !ok || tIf == nil { + tIf, ok := Cache.Get(utils.CacheThresholds, tID) + if !ok || tIf == nil { utils.Logger.Warning(fmt.Sprintf(" failed retrieving from cache treshold with ID: %s", tID)) - } else if err := tS.StoreThreshold(ctx, tIf.(*Threshold)); err != nil { + continue + } + t := tIf.(*Threshold) + t.lock(utils.EmptyString) + if err := tS.StoreThreshold(ctx, t); err != nil { failedTdIDs = append(failedTdIDs, tID) // record failure so we can schedule it for next backup } + t.unlock() // randomize the CPU load and give up thread control runtime.Gosched() } @@ -215,6 +308,16 @@ func (tS *ThresholdService) StoreThreshold(ctx *context.Context, t *Threshold) ( t.Tenant, t.ID, err.Error())) return } + //since we no longer handle cache in DataManager do here a manual caching + if tntID := t.TenantID(); Cache.HasItem(utils.CacheThresholds, tntID) { // only cache if previously there + if err = Cache.Set(ctx, utils.CacheThresholds, tntID, t, nil, + true, utils.NonTransactional); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed caching Threshold with ID: %s, error: %s", + t.TenantID(), err.Error())) + return + } + } *t.dirty = false return } @@ -241,26 +344,45 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ctx *context.Context, tnt } ts = make(Thresholds, 0, len(tIDs)) for tID := range tIDs { - tPrfl, err := tS.dm.GetThresholdProfile(ctx, tnt, tID, true, true, utils.NonTransactional) - if err != nil { + lkPrflID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdProfileLockKey(tnt, tID)) + var tPrfl *ThresholdProfile + if tPrfl, err = tS.dm.GetThresholdProfile(ctx, tnt, tID, true, true, utils.NonTransactional); err != nil { + guardian.Guardian.UnguardIDs(lkPrflID) if err == utils.ErrNotFound { + err = nil continue } + ts.unlock() return nil, err } - if pass, err := tS.filterS.Pass(ctx, tnt, tPrfl.FilterIDs, + tPrfl.lock(lkPrflID) + var pass bool + if pass, err = tS.filterS.Pass(ctx, tnt, tPrfl.FilterIDs, evNm); err != nil { + tPrfl.unlock() + ts.unlock() return nil, err } else if !pass { + tPrfl.unlock() continue } - t, err := tS.dm.GetThreshold(ctx, tPrfl.Tenant, tPrfl.ID, true, true, "") - if err != nil { + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdLockKey(tPrfl.Tenant, tPrfl.ID)) + var t *Threshold + if t, err = tS.dm.GetThreshold(ctx, tPrfl.Tenant, tPrfl.ID, true, true, ""); err != nil { + guardian.Guardian.UnguardIDs(lkID) + tPrfl.unlock() if err == utils.ErrNotFound { // corner case where the threshold was removed due to MaxHits + err = nil continue } + ts.unlock() return nil, err } + t.lock(lkID) if t.dirty == nil || tPrfl.MaxHits == -1 || t.Hits < tPrfl.MaxHits { t.dirty = utils.BoolPointer(false) } @@ -273,7 +395,8 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ctx *context.Context, tnt } ts.Sort() for i, t := range ts { - if t.tPrfl.Blocker { // blocker will stop processing + if t.tPrfl.Blocker && i != len(ts)-1 { // blocker will stop processing and we are not at last index + Thresholds(ts[i+1:]).unlock() ts = ts[:i+1] break } @@ -305,10 +428,7 @@ func (attr *ThresholdsArgsProcessEvent) RPCClone() (interface{}, error) { func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent { var thIDs []string if attr.ThresholdIDs != nil { - thIDs = make([]string, len(attr.ThresholdIDs)) - for i, id := range attr.ThresholdIDs { - thIDs[i] = id - } + thIDs = utils.CloneStringSlice(attr.ThresholdIDs) } return &ThresholdsArgsProcessEvent{ ThresholdIDs: thIDs, @@ -318,13 +438,13 @@ func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent { // processEvent processes a new event, dispatching to matching thresholds func (tS *ThresholdService) processEvent(ctx *context.Context, tnt string, args *ThresholdsArgsProcessEvent) (thresholdsIDs []string, err error) { - matchTS, err := tS.matchingThresholdsForEvent(ctx, tnt, args) - if err != nil { + var matchTs Thresholds + if matchTs, err = tS.matchingThresholdsForEvent(ctx, tnt, args); err != nil { return nil, err } var withErrors bool - thresholdsIDs = make([]string, 0, len(matchTS)) - for _, t := range matchTS { + thresholdsIDs = make([]string, 0, len(matchTs)) + for _, t := range matchTs { thresholdsIDs = append(thresholdsIDs, t.ID) t.Hits++ if err = processEventWithThreshold(ctx, tS.connMgr, @@ -343,11 +463,14 @@ func (tS *ThresholdService) processEvent(ctx *context.Context, tnt string, args withErrors = true } //since we don't handle in DataManager caching we do a manual remove here - if err = tS.dm.CacheDataFromDB(ctx, utils.ThresholdPrefix, []string{t.TenantID()}, true); err != nil { - utils.Logger.Warning( - fmt.Sprintf(" failed removing from cache non-recurrent threshold: %s, error: %s", - t.TenantID(), err.Error())) - withErrors = true + if tntID := t.TenantID(); Cache.HasItem(utils.CacheThresholds, tntID) { // only cache if previously there + if err = Cache.Set(ctx, utils.CacheThresholds, tntID, nil, nil, + true, utils.NonTransactional); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed removing from cache non-recurrent threshold: %s, error: %s", + t.TenantID(), err.Error())) + withErrors = true + } } continue } @@ -362,6 +485,7 @@ func (tS *ThresholdService) processEvent(ctx *context.Context, tnt string, args tS.stMux.Unlock() } } + matchTs.unlock() if withErrors { err = utils.ErrPartiallyExecuted } @@ -407,6 +531,7 @@ func (tS *ThresholdService) V1GetThresholdsForEvent(ctx *context.Context, args * var ts Thresholds if ts, err = tS.matchingThresholdsForEvent(ctx, tnt, args); err == nil { *reply = ts + ts.unlock() } return } @@ -436,6 +561,11 @@ func (tS *ThresholdService) V1GetThreshold(ctx *context.Context, tntID *utils.Te if tnt == utils.EmptyString { tnt = tS.cgrcfg.GeneralCfg().DefaultTenant } + // make sure threshold is locked at process level + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdLockKey(tnt, tntID.ID)) + defer guardian.Guardian.UnguardIDs(lkID) if thd, err = tS.dm.GetThreshold(ctx, tnt, tntID.ID, true, true, ""); err != nil { return } @@ -443,19 +573,6 @@ func (tS *ThresholdService) V1GetThreshold(ctx *context.Context, tntID *utils.Te return } -// Reload stops the backupLoop and restarts it -func (tS *ThresholdService) Reload(ctx *context.Context) { - close(tS.stopBackup) - <-tS.loopStoped // wait until the loop is done - tS.stopBackup = make(chan struct{}) - go tS.runBackup(ctx) -} - -// StartLoop starts the gorutine with the backup loop -func (tS *ThresholdService) StartLoop(ctx *context.Context) { - go tS.runBackup(ctx) -} - // V1ResetThreshold resets the threshold hits func (tS *ThresholdService) V1ResetThreshold(ctx *context.Context, tntID *utils.TenantID, rply *string) (err error) { var thd *Threshold @@ -463,6 +580,11 @@ func (tS *ThresholdService) V1ResetThreshold(ctx *context.Context, tntID *utils. if tnt == utils.EmptyString { tnt = tS.cgrcfg.GeneralCfg().DefaultTenant } + // make sure threshold is locked at process level + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdLockKey(tnt, tntID.ID)) + defer guardian.Guardian.UnguardIDs(lkID) if thd, err = tS.dm.GetThreshold(ctx, tnt, tntID.ID, true, true, ""); err != nil { return } diff --git a/engine/thresholds_test.go b/engine/thresholds_test.go index 872b2a1c7..61bf3c606 100644 --- a/engine/thresholds_test.go +++ b/engine/thresholds_test.go @@ -19,6 +19,7 @@ package engine import ( "bytes" + "fmt" "log" "os" "reflect" @@ -26,6 +27,7 @@ import ( "testing" "time" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -358,29 +360,36 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", th, temptTh) } } - if thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil { + thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[0].Tenant, argsGetThresholds[0]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[0].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[0].ID, thMatched[0].ID) } else if !reflect.DeepEqual(ths[0].Hits, thMatched[0].Hits) { t.Errorf("Expecting: %+v, received: %+v", ths[0].Hits, thMatched[0].Hits) } - - if thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil { + thMatched, err = thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[1].Tenant, argsGetThresholds[1]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[1].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[1].ID, thMatched[0].ID) } else if !reflect.DeepEqual(ths[1].Hits, thMatched[0].Hits) { t.Errorf("Expecting: %+v, received: %+v", ths[1].Hits, thMatched[0].Hits) } - - if thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil { + thMatched, err = thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[2].Tenant, argsGetThresholds[2]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[2].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[2].ID, thMatched[0].ID) @@ -561,21 +570,21 @@ func TestThresholdsProcessEvent(t *testing.T) { } } thIDs := []string{"TH_1"} - if thMatched, err := thServ.processEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != utils.ErrPartiallyExecuted { + if thMatched, err := thServ.processEvent(context.Background(),argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != utils.ErrPartiallyExecuted { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(thIDs, thMatched) { t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched) } thIDs = []string{"TH_2"} - if thMatched, err := thServ.processEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != utils.ErrPartiallyExecuted { + if thMatched, err := thServ.processEvent(context.Background(),argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != utils.ErrPartiallyExecuted { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(thIDs, thMatched) { t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched) } thIDs = []string{"TH_3"} - if thMatched, err := thServ.processEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != utils.ErrPartiallyExecuted { + if thMatched, err := thServ.processEvent(context.Background(),argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != utils.ErrPartiallyExecuted { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(thIDs, thMatched) { t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched) @@ -744,28 +753,31 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) { } } thIDs := []string{"TH_1"} - if thMatched, err := thServ.processEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != utils.ErrPartiallyExecuted { + if thMatched, err := thServ.processEvent(context.Background(),argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != utils.ErrPartiallyExecuted { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(thIDs, thMatched) { t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched) } thIDs = []string{"TH_2"} - if thMatched, err := thServ.processEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != utils.ErrPartiallyExecuted { + if thMatched, err := thServ.processEvent(context.Background(),argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != utils.ErrPartiallyExecuted { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(thIDs, thMatched) { t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched) } thIDs = []string{"TH_3"} - if thMatched, err := thServ.processEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != utils.ErrPartiallyExecuted { + if thMatched, err := thServ.processEvent(context.Background(),argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != utils.ErrPartiallyExecuted { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(thIDs, thMatched) { t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched) } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil { + thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[0].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[0].ID, thMatched[0].ID) @@ -773,9 +785,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) { t.Errorf("Expecting: 1, received: %+v", thMatched[0].Hits) } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil { + thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[1].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[1].ID, thMatched[0].ID) @@ -783,9 +798,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) { t.Errorf("Expecting: 1, received: %+v", thMatched[0].Hits) } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil { + thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[2].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[2].ID, thMatched[0].ID) @@ -1010,18 +1028,18 @@ func TestThresholdsProcessEvent2(t *testing.T) { } else if !reflect.DeepEqual(thIDs, thMatched) && !reflect.DeepEqual(thIDsRev, thMatched) { t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched) } - - if thMatched, err := thServ.matchingThresholdsForEvent(ev.Tenant, ev); err != nil { - t.Errorf("Error: %+v", err) - } else { - for _, thM := range thMatched { - if !reflect.DeepEqual(thPrf.Tenant, thM.Tenant) { - t.Errorf("Expecting: %+v, received: %+v", thPrf.Tenant, thM.Tenant) - } else if reflect.DeepEqual(thIDs[0], thM.ID) && thM.Hits != 1 { - t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits) - } else if reflect.DeepEqual(thIDs[1], thM.ID) && thM.Hits != 1 { - t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits) - } + thMatched, err := thServ.matchingThresholdsForEvent(ev.Tenant, ev) + if err != nil { + t.Fatalf("Error: %+v", err) + } + thMatched.unlock() + for _, thM := range thMatched { + if !reflect.DeepEqual(thPrf.Tenant, thM.Tenant) { + t.Errorf("Expecting: %+v, received: %+v", thPrf.Tenant, thM.Tenant) + } else if reflect.DeepEqual(thIDs[0], thM.ID) && thM.Hits != 1 { + t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits) + } else if reflect.DeepEqual(thIDs[1], thM.ID) && thM.Hits != 1 { + t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits) } } } @@ -1307,22 +1325,15 @@ func TestThresholdsStoreThresholdsOK(t *testing.T) { dm := NewDataManager(data, cfg.CacheCfg(), nil) tS := NewThresholdService(dm, cfg, nil, nil) - value := &Threshold{ - dirty: utils.BoolPointer(true), - Tenant: "cgrates.org", - ID: "TH1", - Hits: 2, - } - - Cache.SetWithoutReplicate(utils.CacheThresholds, "TH1", value, nil, true, - utils.NonTransactional) - tS.storedTdIDs.Add("TH1") exp := &Threshold{ dirty: utils.BoolPointer(false), Tenant: "cgrates.org", ID: "TH1", Hits: 2, } + Cache.SetWithoutReplicate(utils.CacheThresholds, "cgrates.org:TH1", exp, nil, true, + utils.NonTransactional) + tS.storedTdIDs.Add("cgrates.org:TH1") tS.storeThresholds(context.Background()) if rcv, err := tS.dm.GetThreshold(context.Background(), "cgrates.org", "TH1", true, false, @@ -1333,7 +1344,7 @@ func TestThresholdsStoreThresholdsOK(t *testing.T) { utils.ToJSON(exp), utils.ToJSON(rcv)) } - Cache.Remove(context.Background(), utils.CacheThresholds, "TH1", true, utils.NonTransactional) + Cache.Remove(context.Background(), utils.CacheThresholds, "cgrates.org:TH1", true, utils.NonTransactional) } func TestThresholdsStoreThresholdsStoreThErr(t *testing.T) { @@ -1544,8 +1555,7 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) { } else { rcv.tPrfl = nil rcv.dirty = nil - var snooze time.Time - rcv.Snooze = snooze + rcv.Snooze = time.Time{} if !reflect.DeepEqual(rcv, exp) { t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) } @@ -1553,73 +1563,86 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) { } -// func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) { -// utils.Logger.SetLogLevel(4) -// utils.Logger.SetSyslog(nil) +func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) { + utils.Logger.SetLogLevel(4) + utils.Logger.SetSyslog(nil) -// var buf bytes.Buffer -// log.SetOutput(&buf) -// defer func() { -// log.SetOutput(os.Stderr) -// }() + var buf bytes.Buffer + log.SetOutput(&buf) + tmp := config.CgrConfig() + tmpC := Cache + tmpCMgr := connMgr -// cfg := config.NewDefaultCGRConfig() -// data := NewInternalDB(nil, nil, true) -// dm := NewDataManager(data, cfg.CacheCfg(), nil) -// filterS := NewFilterS(cfg, nil, dm) -// tS := NewThresholdService(nil, cfg, filterS, nil) + cfg := config.NewDefaultCGRConfig() + cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}} + cfg.CacheCfg().ReplicationConns = []string{"test"} + cfg.CacheCfg().Partitions[utils.CacheThresholds].Replicate = true + config.SetCgrConfig(cfg) + data := NewInternalDB(nil, nil, true) + connMgr = NewConnManager(cfg, make(map[string]chan birpc.ClientConnector)) + dm := NewDataManager(data, cfg.CacheCfg(), connMgr) + filterS := NewFilterS(cfg, nil, dm) + tS := NewThresholdService(nil, cfg, filterS, connMgr) + Cache = NewCacheS(cfg, dm, nil) -// thPrf := &ThresholdProfile{ -// Tenant: "cgrates.org", -// ID: "TH3", -// FilterIDs: []string{"*string:~*req.Account:1001"}, -// MinHits: 2, -// MaxHits: 5, -// Weight: 10, -// Blocker: true, -// } -// th := &Threshold{ -// Tenant: "cgrates.org", -// ID: "TH3", -// Hits: 4, -// tPrfl: thPrf, -// } + defer func() { + connMgr = tmpCMgr + Cache = tmpC + config.SetCgrConfig(tmp) + log.SetOutput(os.Stderr) + }() + thPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: "TH3", + FilterIDs: []string{"*string:~*req.Account:1001"}, + MinHits: 2, + MaxHits: 5, + Weight: 10, + ActionProfileIDs: []string{utils.MetaNone}, + Blocker: true, + } + th := &Threshold{ + Tenant: "cgrates.org", + ID: "TH3", + Hits: 4, + tPrfl: thPrf, + } -// if err := dm.SetThresholdProfile(context.Background(), thPrf, true); err != nil { -// t.Error(err) -// } -// if err := dm.SetThreshold(context.Background(), th); err != nil { -// t.Error(err) -// } + if err := dm.SetThresholdProfile(context.Background(), thPrf, true); err != nil { + t.Error(err) + } + if err := dm.SetThreshold(context.Background(), th); err != nil { + t.Error(err) + } -// args := &ThresholdsArgsProcessEvent{ -// ThresholdIDs: []string{"TH3"}, -// CGREvent: &utils.CGREvent{ -// Tenant: "cgrates.org", -// ID: "ThdProcessEvent", -// Event: map[string]interface{}{ -// utils.AccountField: "1001", -// }, -// }, -// } + args := &ThresholdsArgsProcessEvent{ + ThresholdIDs: []string{"TH3"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "ThdProcessEvent", + Event: map[string]interface{}{ + utils.AccountField: "1001", + }, + }, + } -// expLog1 := `[WARNING] failed removing from database non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION` -// expLog2 := `[WARNING] failed removing from cache non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION` + expLog1 := `[WARNING] failed removing from database non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION` + expLog2 := `[WARNING] failed removing from cache non-recurrent threshold: cgrates.org:TH3, error: DISCONNECTED` -// if _, err := tS.processEvent(context.Background(), args.Tenant, args); err == nil || -// err != utils.ErrPartiallyExecuted { -// t.Errorf("expected: <%+v>, \nreceived: <%+v>", -// utils.ErrPartiallyExecuted, err) -// } + if _, err := tS.processEvent(context.Background(), args.Tenant, args); err == nil || + err != utils.ErrPartiallyExecuted { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", + utils.ErrPartiallyExecuted, err) + } -// if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog1) || -// !strings.Contains(rcvLog, expLog2) { -// t.Errorf("expected logs <%+v> and <%+v> to be included in: <%+v>", -// expLog1, expLog2, rcvLog) -// } + if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog1) || + !strings.Contains(rcvLog, expLog2) { + t.Errorf("expected logs <%+v> and <%+v> to be included in: <%+v>", + expLog1, expLog2, rcvLog) + } -// utils.Logger.SetLogLevel(0) -// } + utils.Logger.SetLogLevel(0) +} func TestThresholdsProcessEventNotFound(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -1965,3 +1988,336 @@ func TestThresholdsV1GetThresholdNotFoundErr(t *testing.T) { t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err) } } + +func TestThresholdMatchingThresholdForEventLocks(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}, nil) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: 20.00, + MaxHits: 5, + } + dm.SetThresholdProfile(context.Background(), rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + dm.RemoveThreshold(context.Background(), "cgrates.org", "TH1") + mth, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != nil { + t.Errorf("Error: %+v", err) + } + defer mth.unlock() + for _, rPrf := range prfs { + if rPrf.ID == "TH1" { + if rPrf.isLocked() { + t.Fatalf("Expected profile to not be locked %q", rPrf.ID) + } + continue + } + if !rPrf.isLocked() { + t.Fatalf("Expected profile to be locked %q", rPrf.ID) + } + if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if !r.isLocked() { + t.Fatalf("Expected Threshold to be locked %q", rPrf.ID) + } + } + +} + +func TestThresholdMatchingThresholdForEventLocks2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}, nil) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: 20.00, + MaxHits: 5, + } + dm.SetThresholdProfile(context.Background(), rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: "TH20", + FilterIDs: []string{"FLTR_RES_201"}, + Weight: 20.00, + MaxHits: 5, + } + err = db.SetThresholdProfileDrv(context.Background(), rPrf) + if err != nil { + t.Fatal(err) + } + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + _, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + expErr := utils.ErrPrefixNotFound(rPrf.FilterIDs[0]) + if err == nil || err.Error() != expErr.Error() { + t.Errorf("Expected error: %s ,received: %+v", expErr, err) + } + for _, rPrf := range prfs { + if rPrf.isLocked() { + t.Fatalf("Expected profile to not be locked %q", rPrf.ID) + } + if rPrf.ID == "TH20" { + continue + } + if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if r.isLocked() { + t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID) + } + } +} + +func TestThresholdMatchingThresholdForEventLocksBlocker(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}, nil) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: float64(10 - i), + Blocker: i == 4, + MaxHits: 5, + } + dm.SetThresholdProfile(context.Background(), rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + mres, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != nil { + t.Errorf("Error: %+v", err) + } + defer mres.unlock() + if len(mres) != 5 { + t.Fatal("Expected 6 Thresholds") + } + for _, rPrf := range prfs[5:] { + if rPrf.isLocked() { + t.Errorf("Expected profile to not be locked %q", rPrf.ID) + } + if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if r.isLocked() { + t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID) + } + } + for _, rPrf := range prfs[:5] { + if !rPrf.isLocked() { + t.Errorf("Expected profile to be locked %q", rPrf.ID) + } + if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if !r.isLocked() { + t.Fatalf("Expected Threshold to be locked %q", rPrf.ID) + } + } +} + +func TestThresholdMatchingThresholdForEventLocks3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + prfs := make([]*ThresholdProfile, 0) + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := &DataDBMock{ + GetThresholdProfileDrvF: func(ctx *context.Context, tnt, id string) (*ThresholdProfile, error) { + if id == "TH1" { + return nil, utils.ErrNotImplemented + } + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: id, + MaxHits: 5, + Weight: 20.00, + } + Cache.Set(ctx, utils.CacheThresholds, rPrf.TenantID(), &Threshold{ + Tenant: rPrf.Tenant, + ID: rPrf.ID, + }, nil, true, utils.NonTransactional) + prfs = append(prfs, rPrf) + return rPrf, nil + }, + } + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}, nil) + + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + ids.Add(fmt.Sprintf("TH%d", i)) + } + _, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != utils.ErrNotImplemented { + t.Fatalf("Error: %+v", err) + } + for _, rPrf := range prfs { + if rPrf.isLocked() { + t.Fatalf("Expected profile to not be locked %q", rPrf.ID) + } + } + +} + +func TestThresholdMatchingThresholdForEventLocks4(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}, nil) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: 20.00, + MaxHits: 5, + } + dm.SetThresholdProfile(context.Background(), rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + ids.Add("TH20") + mres, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != nil { + t.Errorf("Error: %+v", err) + } + defer mres.unlock() + for _, rPrf := range prfs { + if !rPrf.isLocked() { + t.Errorf("Expected profile to be locked %q", rPrf.ID) + } + if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if !r.isLocked() { + t.Fatalf("Expected Threshold to be locked %q", rPrf.ID) + } + } + +} + +func TestThresholdMatchingThresholdForEventLocks5(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + tmpC := config.CgrConfig() + defer func() { + Cache = tmp + config.SetCgrConfig(tmpC) + }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), NewConnManager(cfg, make(map[string]chan birpc.ClientConnector))) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}} + cfg.DataDbCfg().RmtConns = []string{"test"} + cfg.DataDbCfg().Items[utils.CacheThresholds].Remote = true + config.SetCgrConfig(cfg) + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}, nil) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: 20.00, + MaxHits: 5, + } + dm.SetThresholdProfile(context.Background(), rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + dm.RemoveThreshold(context.Background(), "cgrates.org", "TH1") + _, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != utils.ErrDisconnected { + t.Errorf("Error: %+v", err) + } + for _, rPrf := range prfs { + if rPrf.isLocked() { + t.Fatalf("Expected profile to not be locked %q", rPrf.ID) + } + if rPrf.ID == "TH1" { + continue + } + if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if r.isLocked() { + t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID) + } + } + +}