diff --git a/engine/datamanager.go b/engine/datamanager.go index 4699c7022..74e1fb642 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -184,10 +184,14 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b _, err = dm.GetTiming(dataID, true, utils.NonTransactional) case utils.ThresholdProfilePrefix: tntID := utils.NewTenantID(dataID) + lkID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, thresholdProfileLockKey(tntID.Tenant, tntID.ID)) _, err = dm.GetThresholdProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + guardian.Guardian.UnguardIDs(lkID) case utils.ThresholdPrefix: tntID := utils.NewTenantID(dataID) + lkID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, thresholdLockKey(tntID.Tenant, tntID.ID)) _, err = dm.GetThreshold(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + guardian.Guardian.UnguardIDs(lkID) case utils.FilterPrefix: tntID := utils.NewTenantID(dataID) _, err = dm.GetFilter(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) diff --git a/engine/resources.go b/engine/resources.go index 63842b8f0..558a3e514 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -389,6 +389,19 @@ type ResourceService struct { connMgr *ConnManager } +// Reload stops the backupLoop and restarts it +func (rS *ResourceService) Reload() { + close(rS.stopBackup) + <-rS.loopStoped // wait until the loop is done + rS.stopBackup = make(chan struct{}) + go rS.runBackup() +} + +// StartLoop starts the gorutine with the backup loop +func (rS *ResourceService) StartLoop() { + go rS.runBackup() +} + // Shutdown is called to shutdown the service func (rS *ResourceService) Shutdown() { utils.Logger.Info(" service shutdown initialized") @@ -947,16 +960,3 @@ func (rS *ResourceService) V1GetResourceWithConfig(arg *utils.TenantIDWithAPIOpt return } - -// Reload stops the backupLoop and restarts it -func (rS *ResourceService) Reload() { - close(rS.stopBackup) - <-rS.loopStoped // wait until the loop is done - rS.stopBackup = make(chan struct{}) - go rS.runBackup() -} - -// StartLoop starts the gorutine with the backup loop -func (rS *ResourceService) StartLoop() { - go rS.runBackup() -} diff --git a/engine/stats.go b/engine/stats.go index 02f90c99d..21755a4ed 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -76,7 +76,7 @@ func (sS *StatService) Shutdown() { utils.Logger.Info(" service shutdown complete") } -// runBackup will regularly store resources changed to dataDB +// runBackup will regularly store statQueues changed to dataDB func (sS *StatService) runBackup() { storeInterval := sS.cgrcfg.StatSCfg().StoreInterval if storeInterval <= 0 { @@ -94,7 +94,7 @@ func (sS *StatService) runBackup() { } } -// storeResources represents one task of complete backup +// storestatQueues represents one task of complete backup func (sS *StatService) storeStats() { var failedSqIDs []string for { // don't stop untill we store all dirty statQueues @@ -155,7 +155,7 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { return } -// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call +// matchingStatQueuesForEvent returns ordered list of matching statQueues which are active by the time of the call func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, actTime *time.Time, evNm utils.MapStorage) (sqs StatQueues, err error) { sqIDs := utils.NewStringSet(statsIDs) if len(sqIDs) == 0 { @@ -440,7 +440,7 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithAPIOpts, reply *St if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - // make sure resource is locked at process level + // make sure statQueue is locked at process level lkID := guardian.Guardian.GuardIDs(utils.EmptyString, config.CgrConfig().GeneralCfg().LockingTimeout, statQueueLockKey(tnt, args.ID)) @@ -462,7 +462,7 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[ if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - // make sure resource is locked at process level + // make sure statQueue is locked at process level lkID := guardian.Guardian.GuardIDs(utils.EmptyString, config.CgrConfig().GeneralCfg().LockingTimeout, statQueueLockKey(tnt, args.ID)) @@ -491,7 +491,7 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - // make sure resource is locked at process level + // make sure statQueue is locked at process level lkID := guardian.Guardian.GuardIDs(utils.EmptyString, config.CgrConfig().GeneralCfg().LockingTimeout, statQueueLockKey(tnt, args.ID)) @@ -538,7 +538,7 @@ func (sS *StatService) V1ResetStatQueue(tntID *utils.TenantID, rply *string) (er if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - // make sure resource is locked at process level + // make sure statQueue is locked at process level lkID := guardian.Guardian.GuardIDs(utils.EmptyString, config.CgrConfig().GeneralCfg().LockingTimeout, statQueueLockKey(tnt, tntID.ID)) diff --git a/engine/thresholds.go b/engine/thresholds.go index 7d9a735be..5c075d545 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -26,6 +26,7 @@ import ( "time" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" ) @@ -48,6 +49,8 @@ type ThresholdProfile struct { Weight float64 // Weight to sort the thresholds ActionIDs []string Async bool + + lkID string // holds the reference towards guardian lock key } // TenantID returns the concatenated key beteen tenant and ID @@ -55,6 +58,36 @@ func (tp *ThresholdProfile) TenantID() string { return utils.ConcatenatedKey(tp.Tenant, tp.ID) } +// thresholdProfileLockKey returns the ID used to lock a ThresholdProfile with guardian +func thresholdProfileLockKey(tnt, id string) string { + return utils.ConcatenatedKey(utils.CacheThresholdProfiles, tnt, id) +} + +// lock will lock the ThresholdProfile using guardian and store the lock within r.lkID +// if lkID is passed as argument, the lock is considered as executed +func (tp *ThresholdProfile) lock(lkID string) { + if lkID == utils.EmptyString { + lkID = guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdProfileLockKey(tp.Tenant, tp.ID)) + } + tp.lkID = lkID +} + +// unlock will unlock the ThresholdProfile and clear rp.lkID +func (tp *ThresholdProfile) unlock() { + if tp.lkID == utils.EmptyString { + return + } + guardian.Guardian.UnguardIDs(tp.lkID) + tp.lkID = utils.EmptyString +} + +// isLocked returns the locks status of this ThresholdProfile +func (tp *ThresholdProfile) isLocked() bool { + return tp.lkID != utils.EmptyString +} + // ThresholdWithAPIOpts is used in replicatorV1 for dispatcher type ThresholdWithAPIOpts struct { *Threshold @@ -68,6 +101,7 @@ type Threshold struct { Hits int // number of hits for this threshold Snooze time.Time // prevent threshold to run too early + lkID string // ID of the lock used when matching the threshold tPrfl *ThresholdProfile dirty *bool // needs save } @@ -77,6 +111,36 @@ func (t *Threshold) TenantID() string { return utils.ConcatenatedKey(t.Tenant, t.ID) } +// thresholdLockKey returns the ID used to lock a threshold with guardian +func thresholdLockKey(tnt, id string) string { + return utils.ConcatenatedKey(utils.CacheThresholds, tnt, id) +} + +// lock will lock the threshold using guardian and store the lock within r.lkID +// if lkID is passed as argument, the lock is considered as executed +func (t *Threshold) lock(lkID string) { + if lkID == utils.EmptyString { + lkID = guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdLockKey(t.Tenant, t.ID)) + } + t.lkID = lkID +} + +// unlock will unlock the threshold and clear r.lkID +func (t *Threshold) unlock() { + if t.lkID == utils.EmptyString { + return + } + guardian.Guardian.UnguardIDs(t.lkID) + t.lkID = utils.EmptyString +} + +// isLocked returns the locks status of this threshold +func (t *Threshold) isLocked() bool { + return t.lkID != utils.EmptyString +} + // ProcessEvent processes an ThresholdEvent // concurrentActions limits the number of simultaneous action sets executed func (t *Threshold) ProcessEvent(args *ThresholdsArgsProcessEvent, dm *DataManager) (err error) { @@ -128,9 +192,20 @@ func (ts Thresholds) Sort() { sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight }) } +// unlock will unlock thresholds part of this slice +func (ts Thresholds) unlock() { + for _, t := range ts { + t.unlock() + if t.tPrfl != nil { + t.tPrfl.unlock() + } + } +} + // NewThresholdService the constructor for ThresoldS service -func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) (tS *ThresholdService) { - return &ThresholdService{dm: dm, +func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) *ThresholdService { + return &ThresholdService{ + dm: dm, cgrcfg: cgrcfg, filterS: filterS, stopBackup: make(chan struct{}), @@ -150,6 +225,19 @@ type ThresholdService struct { stMux sync.RWMutex // protects storedTdIDs } +// Reload stops the backupLoop and restarts it +func (tS *ThresholdService) Reload() { + close(tS.stopBackup) + <-tS.loopStoped // wait until the loop is done + tS.stopBackup = make(chan struct{}) + go tS.runBackup() +} + +// StartLoop starts the gorutine with the backup loop +func (tS *ThresholdService) StartLoop() { + go tS.runBackup() +} + // Shutdown is called to shutdown the service func (tS *ThresholdService) Shutdown() { utils.Logger.Info(" shutdown initialized") @@ -158,7 +246,7 @@ func (tS *ThresholdService) Shutdown() { utils.Logger.Info(" shutdown complete") } -// backup will regularly store resources changed to dataDB +// backup will regularly store thresholds changed to dataDB func (tS *ThresholdService) runBackup() { storeInterval := tS.cgrcfg.ThresholdSCfg().StoreInterval if storeInterval <= 0 { @@ -179,7 +267,7 @@ func (tS *ThresholdService) runBackup() { // storeThresholds represents one task of complete backup func (tS *ThresholdService) storeThresholds() { var failedTdIDs []string - for { // don't stop until we store all dirty resources + for { // don't stop until we store all dirty thresholds tS.stMux.Lock() tID := tS.storedTdIDs.GetOne() if tID != "" { @@ -189,11 +277,17 @@ func (tS *ThresholdService) storeThresholds() { if tID == "" { break // no more keys, backup completed } - if tIf, ok := Cache.Get(utils.CacheThresholds, tID); !ok || tIf == nil { + tIf, ok := Cache.Get(utils.CacheThresholds, tID) + if !ok || tIf == nil { utils.Logger.Warning(fmt.Sprintf(" failed retrieving from cache treshold with ID: %s", tID)) - } else if err := tS.StoreThreshold(tIf.(*Threshold)); err != nil { + continue + } + t := tIf.(*Threshold) + t.lock(utils.EmptyString) + if err := tS.StoreThreshold(t); err != nil { failedTdIDs = append(failedTdIDs, tID) // record failure so we can schedule it for next backup } + t.unlock() // randomize the CPU load and give up thread control runtime.Gosched() } @@ -215,6 +309,16 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { t.Tenant, t.ID, err.Error())) return } + //since we no longer handle cache in DataManager do here a manual caching + if tntID := t.TenantID(); Cache.HasItem(utils.CacheThresholds, tntID) { // only cache if previously there + if err = Cache.Set(utils.CacheThresholds, tntID, t, nil, + true, utils.NonTransactional); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed caching Threshold with ID: %s, error: %s", + t.TenantID(), err.Error())) + return + } + } *t.dirty = false return } @@ -241,30 +345,50 @@ func (tS *ThresholdService) matchingThresholdsForEvent(tnt string, args *Thresho } ts = make(Thresholds, 0, len(tIDs)) for tID := range tIDs { - tPrfl, err := tS.dm.GetThresholdProfile(tnt, tID, true, true, utils.NonTransactional) - if err != nil { + lkPrflID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdProfileLockKey(tnt, tID)) + var tPrfl *ThresholdProfile + if tPrfl, err = tS.dm.GetThresholdProfile(tnt, tID, true, true, utils.NonTransactional); err != nil { + guardian.Guardian.UnguardIDs(lkPrflID) if err == utils.ErrNotFound { + err = nil continue } + ts.unlock() return nil, err } + tPrfl.lock(lkPrflID) if tPrfl.ActivationInterval != nil && args.Time != nil && !tPrfl.ActivationInterval.IsActiveAtTime(*args.Time) { // not active + tPrfl.unlock() continue } - if pass, err := tS.filterS.Pass(tnt, tPrfl.FilterIDs, + var pass bool + if pass, err = tS.filterS.Pass(tnt, tPrfl.FilterIDs, evNm); err != nil { + tPrfl.unlock() + ts.unlock() return nil, err } else if !pass { + tPrfl.unlock() continue } - t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, true, true, "") - if err != nil { + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdLockKey(tPrfl.Tenant, tPrfl.ID)) + var t *Threshold + if t, err = tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, true, true, ""); err != nil { + guardian.Guardian.UnguardIDs(lkID) + tPrfl.unlock() if err == utils.ErrNotFound { // corner case where the threshold was removed due to MaxHits + err = nil continue } + ts.unlock() return nil, err } + t.lock(lkID) if t.dirty == nil || tPrfl.MaxHits == -1 || t.Hits < tPrfl.MaxHits { t.dirty = utils.BoolPointer(false) } @@ -277,7 +401,8 @@ func (tS *ThresholdService) matchingThresholdsForEvent(tnt string, args *Thresho } ts.Sort() for i, t := range ts { - if t.tPrfl.Blocker { // blocker will stop processing + if t.tPrfl.Blocker && i != len(ts)-1 { // blocker will stop processing and we are not at last index + Thresholds(ts[i+1:]).unlock() ts = ts[:i+1] break } @@ -309,10 +434,7 @@ func (attr *ThresholdsArgsProcessEvent) RPCClone() (interface{}, error) { func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent { var thIDs []string if attr.ThresholdIDs != nil { - thIDs = make([]string, len(attr.ThresholdIDs)) - for i, id := range attr.ThresholdIDs { - thIDs[i] = id - } + thIDs = utils.CloneStringSlice(attr.ThresholdIDs) } return &ThresholdsArgsProcessEvent{ ThresholdIDs: thIDs, @@ -322,8 +444,8 @@ func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent { // processEvent processes a new event, dispatching to matching thresholds func (tS *ThresholdService) processEvent(tnt string, args *ThresholdsArgsProcessEvent) (thresholdsIDs []string, err error) { - matchTs, err := tS.matchingThresholdsForEvent(tnt, args) - if err != nil { + var matchTs Thresholds + if matchTs, err = tS.matchingThresholdsForEvent(tnt, args); err != nil { return nil, err } var withErrors bool @@ -331,8 +453,7 @@ func (tS *ThresholdService) processEvent(tnt string, args *ThresholdsArgsProcess for _, t := range matchTs { thresholdsIDs = append(thresholdsIDs, t.ID) t.Hits++ - err = t.ProcessEvent(args, tS.dm) - if err != nil { + if err = t.ProcessEvent(args, tS.dm); err != nil { utils.Logger.Warning( fmt.Sprintf(" threshold: %s, ignoring event: %s, error: %s", t.TenantID(), utils.ConcatenatedKey(tnt, args.CGREvent.ID), err.Error())) @@ -347,11 +468,14 @@ func (tS *ThresholdService) processEvent(tnt string, args *ThresholdsArgsProcess withErrors = true } //since we don't handle in DataManager caching we do a manual remove here - if err = tS.dm.CacheDataFromDB(utils.ThresholdPrefix, []string{t.TenantID()}, true); err != nil { - utils.Logger.Warning( - fmt.Sprintf(" failed removing from cache non-recurrent threshold: %s, error: %s", - t.TenantID(), err.Error())) - withErrors = true + if tntID := t.TenantID(); Cache.HasItem(utils.CacheThresholds, tntID) { // only cache if previously there + if err = Cache.Set(utils.CacheThresholds, tntID, nil, nil, + true, utils.NonTransactional); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed removing from cache non-recurrent threshold: %s, error: %s", + t.TenantID(), err.Error())) + withErrors = true + } } continue } @@ -366,6 +490,7 @@ func (tS *ThresholdService) processEvent(tnt string, args *ThresholdsArgsProcess tS.stMux.Unlock() } } + matchTs.unlock() if withErrors { err = utils.ErrPartiallyExecuted } @@ -411,6 +536,7 @@ func (tS *ThresholdService) V1GetThresholdsForEvent(args *ThresholdsArgsProcessE var ts Thresholds if ts, err = tS.matchingThresholdsForEvent(tnt, args); err == nil { *reply = ts + ts.unlock() } return } @@ -440,6 +566,11 @@ func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold) if tnt == utils.EmptyString { tnt = tS.cgrcfg.GeneralCfg().DefaultTenant } + // make sure threshold is locked at process level + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdLockKey(tnt, tntID.ID)) + defer guardian.Guardian.UnguardIDs(lkID) if thd, err = tS.dm.GetThreshold(tnt, tntID.ID, true, true, ""); err != nil { return } @@ -447,19 +578,6 @@ func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold) return } -// Reload stops the backupLoop and restarts it -func (tS *ThresholdService) Reload() { - close(tS.stopBackup) - <-tS.loopStoped // wait until the loop is done - tS.stopBackup = make(chan struct{}) - go tS.runBackup() -} - -// StartLoop starts the gorutine with the backup loop -func (tS *ThresholdService) StartLoop() { - go tS.runBackup() -} - // V1ResetThreshold resets the threshold hits func (tS *ThresholdService) V1ResetThreshold(tntID *utils.TenantID, rply *string) (err error) { var thd *Threshold @@ -467,6 +585,11 @@ func (tS *ThresholdService) V1ResetThreshold(tntID *utils.TenantID, rply *string if tnt == utils.EmptyString { tnt = tS.cgrcfg.GeneralCfg().DefaultTenant } + // make sure threshold is locked at process level + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + thresholdLockKey(tnt, tntID.ID)) + defer guardian.Guardian.UnguardIDs(lkID) if thd, err = tS.dm.GetThreshold(tnt, tntID.ID, true, true, ""); err != nil { return } diff --git a/engine/thresholds_test.go b/engine/thresholds_test.go index a9ad1e77e..1dd1bedb6 100644 --- a/engine/thresholds_test.go +++ b/engine/thresholds_test.go @@ -19,6 +19,7 @@ package engine import ( "bytes" + "fmt" "log" "os" "reflect" @@ -29,6 +30,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func TestThresholdsSort(t *testing.T) { @@ -376,29 +378,36 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", th, temptTh) } } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil { + thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[0].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[0].ID, thMatched[0].ID) } else if !reflect.DeepEqual(ths[0].Hits, thMatched[0].Hits) { t.Errorf("Expecting: %+v, received: %+v", ths[0].Hits, thMatched[0].Hits) } - - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil { + thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[1].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[1].ID, thMatched[0].ID) } else if !reflect.DeepEqual(ths[1].Hits, thMatched[0].Hits) { t.Errorf("Expecting: %+v, received: %+v", ths[1].Hits, thMatched[0].Hits) } - - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil { + thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[2].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[2].ID, thMatched[0].ID) @@ -798,9 +807,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) { } else if !reflect.DeepEqual(thIDs, thMatched) { t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched) } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil { + thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[0].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[0].ID, thMatched[0].ID) @@ -808,9 +820,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) { t.Errorf("Expecting: 1, received: %+v", thMatched[0].Hits) } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil { + thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[1].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[1].ID, thMatched[0].ID) @@ -818,9 +833,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) { t.Errorf("Expecting: 1, received: %+v", thMatched[0].Hits) } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil { + thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]) + if err != nil { t.Errorf("Error: %+v", err) - } else if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) { + } + thMatched.unlock() + if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant) } else if !reflect.DeepEqual(ths[2].ID, thMatched[0].ID) { t.Errorf("Expecting: %+v, received: %+v", ths[2].ID, thMatched[0].ID) @@ -1058,18 +1076,18 @@ func TestThresholdsProcessEvent2(t *testing.T) { } else if !reflect.DeepEqual(thIDs, thMatched) && !reflect.DeepEqual(thIDsRev, thMatched) { t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched) } - - if thMatched, err := thServ.matchingThresholdsForEvent(ev.Tenant, ev); err != nil { - t.Errorf("Error: %+v", err) - } else { - for _, thM := range thMatched { - if !reflect.DeepEqual(thPrf.Tenant, thM.Tenant) { - t.Errorf("Expecting: %+v, received: %+v", thPrf.Tenant, thM.Tenant) - } else if reflect.DeepEqual(thIDs[0], thM.ID) && thM.Hits != 1 { - t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits) - } else if reflect.DeepEqual(thIDs[1], thM.ID) && thM.Hits != 1 { - t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits) - } + thMatched, err := thServ.matchingThresholdsForEvent(ev.Tenant, ev) + if err != nil { + t.Fatalf("Error: %+v", err) + } + thMatched.unlock() + for _, thM := range thMatched { + if !reflect.DeepEqual(thPrf.Tenant, thM.Tenant) { + t.Errorf("Expecting: %+v, received: %+v", thPrf.Tenant, thM.Tenant) + } else if reflect.DeepEqual(thIDs[0], thM.ID) && thM.Hits != 1 { + t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits) + } else if reflect.DeepEqual(thIDs[1], thM.ID) && thM.Hits != 1 { + t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits) } } } @@ -1355,22 +1373,15 @@ func TestThresholdsStoreThresholdsOK(t *testing.T) { dm := NewDataManager(data, cfg.CacheCfg(), nil) tS := NewThresholdService(dm, cfg, nil) - value := &Threshold{ - dirty: utils.BoolPointer(true), - Tenant: "cgrates.org", - ID: "TH1", - Hits: 2, - } - - Cache.SetWithoutReplicate(utils.CacheThresholds, "TH1", value, nil, true, - utils.NonTransactional) - tS.storedTdIDs.Add("TH1") exp := &Threshold{ dirty: utils.BoolPointer(false), Tenant: "cgrates.org", ID: "TH1", Hits: 2, } + Cache.SetWithoutReplicate(utils.CacheThresholds, "cgrates.org:TH1", exp, nil, true, + utils.NonTransactional) + tS.storedTdIDs.Add("cgrates.org:TH1") tS.storeThresholds() if rcv, err := tS.dm.GetThreshold("cgrates.org", "TH1", true, false, @@ -1381,7 +1392,7 @@ func TestThresholdsStoreThresholdsOK(t *testing.T) { utils.ToJSON(exp), utils.ToJSON(rcv)) } - Cache.Remove(utils.CacheThresholds, "TH1", true, utils.NonTransactional) + Cache.Remove(utils.CacheThresholds, "cgrates.org:TH1", true, utils.NonTransactional) } func TestThresholdsStoreThresholdsStoreThErr(t *testing.T) { @@ -1614,13 +1625,10 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) { Tenant: "cgrates.org", ID: "TH2", FilterIDs: []string{"*string:~*req.Account:1001"}, - ActivationInterval: &utils.ActivationInterval{ - ExpiryTime: time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC), - }, - MinHits: 2, - MaxHits: 5, - Weight: 10, - Blocker: true, + MinHits: 2, + MaxHits: 5, + Weight: 10, + Blocker: true, } if err := dm.SetThresholdProfile(thPrf, true); err != nil { @@ -1653,8 +1661,7 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) { } else { rcv.tPrfl = nil rcv.dirty = nil - var snooze time.Time - rcv.Snooze = snooze + rcv.Snooze = time.Time{} if !reflect.DeepEqual(rcv, exp) { t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) } @@ -1668,16 +1675,28 @@ func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) { var buf bytes.Buffer log.SetOutput(&buf) - defer func() { - log.SetOutput(os.Stderr) - }() + tmp := config.CgrConfig() + tmpC := Cache + tmpCMgr := connMgr cfg := config.NewDefaultCGRConfig() + cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}} + cfg.CacheCfg().ReplicationConns = []string{"test"} + cfg.CacheCfg().Partitions[utils.CacheThresholds].Replicate = true + config.SetCgrConfig(cfg) data := NewInternalDB(nil, nil, true) - dm := NewDataManager(data, cfg.CacheCfg(), nil) + connMgr = NewConnManager(cfg, make(map[string]chan rpcclient.ClientConnector)) + dm := NewDataManager(data, cfg.CacheCfg(), connMgr) filterS := NewFilterS(cfg, nil, dm) tS := NewThresholdService(nil, cfg, filterS) + Cache = NewCacheS(cfg, dm, nil) + defer func() { + connMgr = tmpCMgr + Cache = tmpC + config.SetCgrConfig(tmp) + log.SetOutput(os.Stderr) + }() thPrf := &ThresholdProfile{ Tenant: "cgrates.org", ID: "TH3", @@ -1716,7 +1735,7 @@ func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) { } expLog1 := `[WARNING] failed removing from database non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION` - expLog2 := `[WARNING] failed removing from cache non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION` + expLog2 := `[WARNING] failed removing from cache non-recurrent threshold: cgrates.org:TH3, error: DISCONNECTED` if _, err := tS.processEvent(args.Tenant, args); err == nil || err != utils.ErrPartiallyExecuted { @@ -2081,3 +2100,389 @@ func TestThresholdsV1GetThresholdNotFoundErr(t *testing.T) { t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err) } } + +func TestThresholdMatchingThresholdForEventLocks(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: 20.00, + MaxHits: 5, + } + dm.SetThresholdProfile(rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + dm.RemoveThreshold("cgrates.org", "TH1") + mth, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != nil { + t.Errorf("Error: %+v", err) + } + defer mth.unlock() + for _, rPrf := range prfs { + if rPrf.ID == "TH1" { + if rPrf.isLocked() { + t.Fatalf("Expected profile to not be locked %q", rPrf.ID) + } + continue + } + if !rPrf.isLocked() { + t.Fatalf("Expected profile to be locked %q", rPrf.ID) + } + if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if !r.isLocked() { + t.Fatalf("Expected Threshold to be locked %q", rPrf.ID) + } + } + +} + +func TestThresholdMatchingThresholdForEventLocks2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: 20.00, + MaxHits: 5, + } + dm.SetThresholdProfile(rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: "TH20", + FilterIDs: []string{"FLTR_RES_201"}, + Weight: 20.00, + MaxHits: 5, + } + err = db.SetThresholdProfileDrv(rPrf) + if err != nil { + t.Fatal(err) + } + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + _, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + expErr := utils.ErrPrefixNotFound(rPrf.FilterIDs[0]) + if err == nil || err.Error() != expErr.Error() { + t.Errorf("Expected error: %s ,received: %+v", expErr, err) + } + for _, rPrf := range prfs { + if rPrf.isLocked() { + t.Fatalf("Expected profile to not be locked %q", rPrf.ID) + } + if rPrf.ID == "TH20" { + continue + } + if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if r.isLocked() { + t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID) + } + } +} + +func TestThresholdMatchingThresholdForEventLocksBlocker(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: float64(10 - i), + Blocker: i == 4, + MaxHits: 5, + } + dm.SetThresholdProfile(rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + mres, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != nil { + t.Errorf("Error: %+v", err) + } + defer mres.unlock() + if len(mres) != 5 { + t.Fatal("Expected 6 Thresholds") + } + for _, rPrf := range prfs[5:] { + if rPrf.isLocked() { + t.Errorf("Expected profile to not be locked %q", rPrf.ID) + } + if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if r.isLocked() { + t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID) + } + } + for _, rPrf := range prfs[:5] { + if !rPrf.isLocked() { + t.Errorf("Expected profile to be locked %q", rPrf.ID) + } + if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if !r.isLocked() { + t.Fatalf("Expected Threshold to be locked %q", rPrf.ID) + } + } +} + +func TestThresholdMatchingThresholdForEventLocksActivationInterval(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}) + + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + MaxHits: 5, + Weight: 20.00, + } + dm.SetThresholdProfile(rPrf, true) + ids.Add(rPrf.ID) + } + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: "TH21", + MaxHits: 5, + Weight: 20.00, + ActivationInterval: &utils.ActivationInterval{ + ExpiryTime: time.Now().Add(-5 * time.Second), + }, + } + dm.SetThresholdProfile(rPrf, true) + ids.Add(rPrf.ID) + mres, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: &utils.CGREvent{Time: utils.TimePointer(time.Now())}, + }) + if err != nil { + t.Errorf("Error: %+v", err) + } + defer mres.unlock() + if rPrf.isLocked() { + t.Fatalf("Expected profile to not be locked %q", rPrf.ID) + } + if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if r.isLocked() { + t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID) + } +} + +func TestThresholdMatchingThresholdForEventLocks3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + prfs := make([]*ThresholdProfile, 0) + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := &DataDBMock{ + GetThresholdProfileDrvF: func(tnt, id string) (*ThresholdProfile, error) { + if id == "TH1" { + return nil, utils.ErrNotImplemented + } + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: id, + MaxHits: 5, + Weight: 20.00, + } + Cache.Set(utils.CacheThresholds, rPrf.TenantID(), &Threshold{ + Tenant: rPrf.Tenant, + ID: rPrf.ID, + }, nil, true, utils.NonTransactional) + prfs = append(prfs, rPrf) + return rPrf, nil + }, + } + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}) + + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + ids.Add(fmt.Sprintf("TH%d", i)) + } + _, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != utils.ErrNotImplemented { + t.Fatalf("Error: %+v", err) + } + for _, rPrf := range prfs { + if rPrf.isLocked() { + t.Fatalf("Expected profile to not be locked %q", rPrf.ID) + } + } + +} + +func TestThresholdMatchingThresholdForEventLocks4(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + defer func() { Cache = tmp }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: 20.00, + MaxHits: 5, + } + dm.SetThresholdProfile(rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + ids.Add("TH20") + mres, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != nil { + t.Errorf("Error: %+v", err) + } + defer mres.unlock() + for _, rPrf := range prfs { + if !rPrf.isLocked() { + t.Errorf("Expected profile to be locked %q", rPrf.ID) + } + if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if !r.isLocked() { + t.Fatalf("Expected Threshold to be locked %q", rPrf.ID) + } + } + +} + +func TestThresholdMatchingThresholdForEventLocks5(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmp := Cache + tmpC := config.CgrConfig() + defer func() { + Cache = tmp + config.SetCgrConfig(tmpC) + }() + Cache = NewCacheS(cfg, nil, nil) + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, config.CgrConfig().CacheCfg(), NewConnManager(cfg, make(map[string]chan rpcclient.ClientConnector))) + cfg.ThresholdSCfg().StoreInterval = 1 + cfg.ThresholdSCfg().StringIndexedFields = nil + cfg.ThresholdSCfg().PrefixIndexedFields = nil + cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}} + cfg.DataDbCfg().RmtConns = []string{"test"} + cfg.DataDbCfg().Items[utils.CacheThresholds].Remote = true + config.SetCgrConfig(cfg) + rS := NewThresholdService(dm, cfg, + &FilterS{dm: dm, cfg: cfg}) + + prfs := make([]*ThresholdProfile, 0) + ids := utils.StringSet{} + for i := 0; i < 10; i++ { + rPrf := &ThresholdProfile{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TH%d", i), + Weight: 20.00, + MaxHits: 5, + } + dm.SetThresholdProfile(rPrf, true) + prfs = append(prfs, rPrf) + ids.Add(rPrf.ID) + } + dm.RemoveThreshold("cgrates.org", "TH1") + _, err := rS.matchingThresholdsForEvent("cgrates.org", &ThresholdsArgsProcessEvent{ + ThresholdIDs: ids.AsSlice(), + CGREvent: new(utils.CGREvent), + }) + if err != utils.ErrDisconnected { + t.Errorf("Error: %+v", err) + } + for _, rPrf := range prfs { + if rPrf.isLocked() { + t.Fatalf("Expected profile to not be locked %q", rPrf.ID) + } + if rPrf.ID == "TH1" { + continue + } + if r, err := dm.GetThreshold(rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil { + t.Errorf("error %s for <%s>", err, rPrf.ID) + } else if r.isLocked() { + t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID) + } + } + +}