Mirror of https://github.com/cgrates/cgrates.git (synced 2026-02-14 12:49:54 +05:00)

Updated threshold caching

Commit 697ec8cf50, parent e6851695b1, committed by Dan Christian Bogos
@@ -165,10 +165,14 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids []
		guardian.Guardian.UnguardIDs(lkID)
	case utils.ThresholdProfilePrefix:
		tntID := utils.NewTenantID(dataID)
+		lkID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, thresholdProfileLockKey(tntID.Tenant, tntID.ID))
		_, err = dm.GetThresholdProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
+		guardian.Guardian.UnguardIDs(lkID)
	case utils.ThresholdPrefix:
		tntID := utils.NewTenantID(dataID)
+		lkID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, thresholdLockKey(tntID.Tenant, tntID.ID))
		_, err = dm.GetThreshold(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
+		guardian.Guardian.UnguardIDs(lkID)
	case utils.FilterPrefix:
		tntID := utils.NewTenantID(dataID)
		_, err = dm.GetFilter(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
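Aside on the hunk above: each cache refresh now takes a guardian lock keyed by cache partition plus tenant and ID, so two goroutines reloading the same threshold serialize instead of racing on the cache entry. A minimal stdlib sketch of that per-key locking idea — illustrative only, none of these names are the cgrates/guardian API, and the real guardian adds timeouts and reference counting:

    package main

    import "sync"

    // keyLocks approximates per-key guarding: one mutex per string key.
    type keyLocks struct {
        mu    sync.Mutex
        locks map[string]*sync.Mutex
    }

    // guard locks the mutex for key and returns the matching unlock func.
    func (kl *keyLocks) guard(key string) func() {
        kl.mu.Lock()
        l, ok := kl.locks[key]
        if !ok {
            l = new(sync.Mutex)
            kl.locks[key] = l // entries are never evicted in this sketch
        }
        kl.mu.Unlock()
        l.Lock()
        return l.Unlock // caller invokes this where UnguardIDs would be called
    }

    func main() {
        kl := &keyLocks{locks: make(map[string]*sync.Mutex)}
        unlock := kl.guard("*thresholds:cgrates.org:TH1")
        // refresh the item from the DB into the cache here
        unlock()
    }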
@@ -389,6 +389,19 @@ type ResourceService struct {
	connMgr *ConnManager
}

+// Reload stops the backupLoop and restarts it
+func (rS *ResourceService) Reload(ctx *context.Context) {
+	close(rS.stopBackup)
+	<-rS.loopStoped // wait until the loop is done
+	rS.stopBackup = make(chan struct{})
+	go rS.runBackup(ctx)
+}
+
+// StartLoop starts the goroutine with the backup loop
+func (rS *ResourceService) StartLoop(ctx *context.Context) {
+	go rS.runBackup(ctx)
+}
+
// Shutdown is called to shutdown the service
func (rS *ResourceService) Shutdown(ctx *context.Context) {
	utils.Logger.Info("<ResourceS> service shutdown initialized")
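Reload's close-then-wait choreography above is the usual Go recipe for restarting a channel-driven loop: closing stopBackup broadcasts the stop signal, receiving from loopStoped proves the goroutine exited, and fresh channels arm the next run. A self-contained sketch under those assumptions (type and field names here are illustrative, not the cgrates ones):

    package main

    import "time"

    type svc struct {
        stop chan struct{} // closed to ask the loop to exit
        done chan struct{} // closed by the loop when it has exited
    }

    func (s *svc) run() {
        defer close(s.done) // signal reload that the loop finished
        for {
            select {
            case <-s.stop:
                return
            case <-time.After(10 * time.Millisecond):
                // periodic backup work would go here
            }
        }
    }

    func (s *svc) reload() {
        close(s.stop) // broadcast stop
        <-s.done      // wait until the loop is done
        s.stop = make(chan struct{})
        s.done = make(chan struct{})
        go s.run() // restart with fresh channels
    }

    func main() {
        s := &svc{stop: make(chan struct{}), done: make(chan struct{})}
        go s.run()
        s.reload()
        close(s.stop)
        <-s.done
    }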
@@ -940,16 +953,3 @@ func (rS *ResourceService) V1GetResourceWithConfig(ctx *context.Context, arg *ut
	return
}

-// Reload stops the backupLoop and restarts it
-func (rS *ResourceService) Reload(ctx *context.Context) {
-	close(rS.stopBackup)
-	<-rS.loopStoped // wait until the loop is done
-	rS.stopBackup = make(chan struct{})
-	go rS.runBackup(ctx)
-}
-
-// StartLoop starts the goroutine with the backup loop
-func (rS *ResourceService) StartLoop(ctx *context.Context) {
-	go rS.runBackup(ctx)
-}
@@ -77,7 +77,7 @@ func (sS *StatService) Shutdown(ctx *context.Context) {
	utils.Logger.Info("<StatS> service shutdown complete")
}

-// runBackup will regularly store resources changed to dataDB
+// runBackup will regularly store statQueues changed to dataDB
func (sS *StatService) runBackup(ctx *context.Context) {
	storeInterval := sS.cgrcfg.StatSCfg().StoreInterval
	if storeInterval <= 0 {
@@ -95,7 +95,7 @@ func (sS *StatService) runBackup(ctx *context.Context) {
	}
}

-// storeResources represents one task of complete backup
+// storeStats represents one task of complete backup
func (sS *StatService) storeStats(ctx *context.Context) {
	var failedSqIDs []string
	for { // don't stop until we store all dirty statQueues
@@ -156,7 +156,7 @@ func (sS *StatService) StoreStatQueue(ctx *context.Context, sq *StatQueue) (err
	return
}

-// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call
+// matchingStatQueuesForEvent returns ordered list of matching statQueues which are active by the time of the call
func (sS *StatService) matchingStatQueuesForEvent(ctx *context.Context, tnt string, statsIDs []string, evNm utils.MapStorage) (sqs StatQueues, err error) {
	sqIDs := utils.NewStringSet(statsIDs)
	if len(sqIDs) == 0 {
@@ -430,7 +430,7 @@ func (sS *StatService) V1GetStatQueue(ctx *context.Context, args *utils.TenantID
	if tnt == utils.EmptyString {
		tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
	}
-	// make sure resource is locked at process level
+	// make sure statQueue is locked at process level
	lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
		config.CgrConfig().GeneralCfg().LockingTimeout,
		statQueueLockKey(tnt, args.ID))
@@ -452,7 +452,7 @@ func (sS *StatService) V1GetQueueStringMetrics(ctx *context.Context, args *utils
	if tnt == utils.EmptyString {
		tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
	}
-	// make sure resource is locked at process level
+	// make sure statQueue is locked at process level
	lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
		config.CgrConfig().GeneralCfg().LockingTimeout,
		statQueueLockKey(tnt, args.ID))
@@ -481,7 +481,7 @@ func (sS *StatService) V1GetQueueFloatMetrics(ctx *context.Context, args *utils.
	if tnt == utils.EmptyString {
		tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
	}
-	// make sure resource is locked at process level
+	// make sure statQueue is locked at process level
	lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
		config.CgrConfig().GeneralCfg().LockingTimeout,
		statQueueLockKey(tnt, args.ID))
@@ -528,7 +528,7 @@ func (sS *StatService) V1ResetStatQueue(ctx *context.Context, tntID *utils.Tenan
	if tnt == utils.EmptyString {
		tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
	}
-	// make sure resource is locked at process level
+	// make sure statQueue is locked at process level
	lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
		config.CgrConfig().GeneralCfg().LockingTimeout,
		statQueueLockKey(tnt, tntID.ID))
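All four stats hunks above add the same three-step shape: fall back to the default tenant, guard the statQueue key for the duration of the call, and release when done. Stripped to the stdlib it is just lock, defer unlock, read — a sketch with a plain mutex standing in for the guardian key lock (names are illustrative, not the cgrates API):

    package main

    import (
        "fmt"
        "sync"
    )

    var queueLock sync.Mutex // stand-in for guardian.GuardIDs on one queue key

    func v1GetStatQueue(tnt, id string) string {
        if tnt == "" {
            tnt = "cgrates.org" // default-tenant fallback, as in the hunks above
        }
        queueLock.Lock()
        defer queueLock.Unlock() // mirrors UnguardIDs(lkID) on every return path
        return tnt + ":" + id    // the guarded read would happen here
    }

    func main() {
        fmt.Println(v1GetStatQueue("", "SQ1"))
    }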
@@ -27,6 +27,7 @@ import (

	"github.com/cgrates/birpc/context"
	"github.com/cgrates/cgrates/config"
+	"github.com/cgrates/cgrates/guardian"
	"github.com/cgrates/cgrates/utils"
)

@@ -48,6 +49,8 @@ type ThresholdProfile struct {
	Weight           float64 // Weight to sort the thresholds
	ActionProfileIDs []string
	Async            bool
+
+	lkID string // holds the reference towards guardian lock key
}

// TenantID returns the concatenated key between tenant and ID
@@ -55,6 +58,36 @@ func (tp *ThresholdProfile) TenantID() string {
	return utils.ConcatenatedKey(tp.Tenant, tp.ID)
}

+// thresholdProfileLockKey returns the ID used to lock a ThresholdProfile with guardian
+func thresholdProfileLockKey(tnt, id string) string {
+	return utils.ConcatenatedKey(utils.CacheThresholdProfiles, tnt, id)
+}
+
+// lock will lock the ThresholdProfile using guardian and store the lock within tp.lkID;
+// if lkID is passed as argument, the lock is considered as already executed
+func (tp *ThresholdProfile) lock(lkID string) {
+	if lkID == utils.EmptyString {
+		lkID = guardian.Guardian.GuardIDs("",
+			config.CgrConfig().GeneralCfg().LockingTimeout,
+			thresholdProfileLockKey(tp.Tenant, tp.ID))
+	}
+	tp.lkID = lkID
+}
+
+// unlock will unlock the ThresholdProfile and clear tp.lkID
+func (tp *ThresholdProfile) unlock() {
+	if tp.lkID == utils.EmptyString {
+		return
+	}
+	guardian.Guardian.UnguardIDs(tp.lkID)
+	tp.lkID = utils.EmptyString
+}
+
+// isLocked returns the lock status of this ThresholdProfile
+func (tp *ThresholdProfile) isLocked() bool {
+	return tp.lkID != utils.EmptyString
+}
+
// ThresholdWithAPIOpts is used in replicatorV1 for dispatcher
type ThresholdWithAPIOpts struct {
	*Threshold
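Note on the contract above: lock(lkID) either adopts a lock reference the caller already holds or acquires a fresh one, and unlock() is a no-op when no reference is held — which is what makes the unlock-everywhere error paths later in this diff safe to call unconditionally. A stdlib-only sketch of that contract (acquire/release are hypothetical stand-ins for GuardIDs/UnguardIDs):

    package main

    import "fmt"

    type item struct {
        lkID string // empty means: not locked by us
    }

    func acquire(key string) string { return "ref-" + key } // stand-in for GuardIDs
    func release(ref string)        { fmt.Println("released", ref) }

    func (it *item) lock(lkID, key string) {
        if lkID == "" { // no reference handed in: take the lock ourselves
            lkID = acquire(key)
        }
        it.lkID = lkID // either way, we now own exactly one reference
    }

    func (it *item) unlock() {
        if it.lkID == "" {
            return // idempotent: safe to call when not locked
        }
        release(it.lkID)
        it.lkID = ""
    }

    func main() {
        it := new(item)
        it.lock("", "cgrates.org:TH1")
        it.unlock()
        it.unlock() // no-op
    }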
@@ -68,6 +101,7 @@ type Threshold struct {
	Hits   int       // number of hits for this threshold
	Snooze time.Time // prevent threshold to run too early

+	lkID  string // ID of the lock used when matching the threshold
	tPrfl *ThresholdProfile
	dirty *bool // needs save
}
@@ -77,6 +111,36 @@ func (t *Threshold) TenantID() string {
	return utils.ConcatenatedKey(t.Tenant, t.ID)
}

+// thresholdLockKey returns the ID used to lock a threshold with guardian
+func thresholdLockKey(tnt, id string) string {
+	return utils.ConcatenatedKey(utils.CacheThresholds, tnt, id)
+}
+
+// lock will lock the threshold using guardian and store the lock within t.lkID;
+// if lkID is passed as argument, the lock is considered as already executed
+func (t *Threshold) lock(lkID string) {
+	if lkID == utils.EmptyString {
+		lkID = guardian.Guardian.GuardIDs("",
+			config.CgrConfig().GeneralCfg().LockingTimeout,
+			thresholdLockKey(t.Tenant, t.ID))
+	}
+	t.lkID = lkID
+}
+
+// unlock will unlock the threshold and clear t.lkID
+func (t *Threshold) unlock() {
+	if t.lkID == utils.EmptyString {
+		return
+	}
+	guardian.Guardian.UnguardIDs(t.lkID)
+	t.lkID = utils.EmptyString
+}
+
+// isLocked returns the lock status of this threshold
+func (t *Threshold) isLocked() bool {
+	return t.lkID != utils.EmptyString
+}
+
// processEventWithThreshold processes a ThresholdEvent
func processEventWithThreshold(ctx *context.Context, connMgr *ConnManager, actionsConns []string, args *utils.CGREvent, t *Threshold) (err error) {
	if t.Snooze.After(time.Now()) || // snoozed, not executing actions
@@ -125,6 +189,16 @@ func (ts Thresholds) Sort() {
	sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight })
}

+// unlock will unlock thresholds part of this slice
+func (ts Thresholds) unlock() {
+	for _, t := range ts {
+		t.unlock()
+		if t.tPrfl != nil {
+			t.tPrfl.unlock()
+		}
+	}
+}
+
// NewThresholdService the constructor for ThresholdS service
func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS, connMgr *ConnManager) (tS *ThresholdService) {
	return &ThresholdService{
@@ -150,6 +224,19 @@ type ThresholdService struct {
	stMux sync.RWMutex // protects storedTdIDs
}

+// Reload stops the backupLoop and restarts it
+func (tS *ThresholdService) Reload(ctx *context.Context) {
+	close(tS.stopBackup)
+	<-tS.loopStoped // wait until the loop is done
+	tS.stopBackup = make(chan struct{})
+	go tS.runBackup(ctx)
+}
+
+// StartLoop starts the goroutine with the backup loop
+func (tS *ThresholdService) StartLoop(ctx *context.Context) {
+	go tS.runBackup(ctx)
+}
+
// Shutdown is called to shutdown the service
func (tS *ThresholdService) Shutdown(ctx *context.Context) {
	utils.Logger.Info("<ThresholdS> shutdown initialized")
@@ -158,7 +245,7 @@ func (tS *ThresholdService) Shutdown(ctx *context.Context) {
	utils.Logger.Info("<ThresholdS> shutdown complete")
}

-// backup will regularly store resources changed to dataDB
+// backup will regularly store thresholds changed to dataDB
func (tS *ThresholdService) runBackup(ctx *context.Context) {
	storeInterval := tS.cgrcfg.ThresholdSCfg().StoreInterval
	if storeInterval <= 0 {
@@ -179,7 +266,7 @@ func (tS *ThresholdService) runBackup(ctx *context.Context) {
// storeThresholds represents one task of complete backup
func (tS *ThresholdService) storeThresholds(ctx *context.Context) {
	var failedTdIDs []string
-	for { // don't stop until we store all dirty resources
+	for { // don't stop until we store all dirty thresholds
		tS.stMux.Lock()
		tID := tS.storedTdIDs.GetOne()
		if tID != "" {
@@ -189,11 +276,17 @@ func (tS *ThresholdService) storeThresholds(ctx *context.Context) {
		if tID == "" {
			break // no more keys, backup completed
		}
-		if tIf, ok := Cache.Get(utils.CacheThresholds, tID); !ok || tIf == nil {
+		tIf, ok := Cache.Get(utils.CacheThresholds, tID)
+		if !ok || tIf == nil {
			utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed retrieving from cache threshold with ID: %s", tID))
-		} else if err := tS.StoreThreshold(ctx, tIf.(*Threshold)); err != nil {
+			continue
+		}
+		t := tIf.(*Threshold)
+		t.lock(utils.EmptyString)
+		if err := tS.StoreThreshold(ctx, t); err != nil {
			failedTdIDs = append(failedTdIDs, tID) // record failure so we can schedule it for next backup
		}
+		t.unlock()
		// randomize the CPU load and give up thread control
		runtime.Gosched()
	}
@@ -215,6 +308,16 @@ func (tS *ThresholdService) StoreThreshold(ctx *context.Context, t *Threshold) (
			t.Tenant, t.ID, err.Error()))
		return
	}
+	// since we no longer handle caching in DataManager, do the caching manually here
+	if tntID := t.TenantID(); Cache.HasItem(utils.CacheThresholds, tntID) { // only cache if previously there
+		if err = Cache.Set(ctx, utils.CacheThresholds, tntID, t, nil,
+			true, utils.NonTransactional); err != nil {
+			utils.Logger.Warning(
+				fmt.Sprintf("<ThresholdService> failed caching Threshold with ID: %s, error: %s",
+					t.TenantID(), err.Error()))
+			return
+		}
+	}
	*t.dirty = false
	return
}
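StoreThreshold above persists first and then refreshes the cache only when the key is already cached (the Cache.HasItem guard), so a backup never populates the cache by itself. The same write-through-if-present idea in a few lines of plain Go (illustrative only, with a map standing in for the cache):

    package main

    import "fmt"

    var cache = map[string]int{"cgrates.org:TH1": 1}

    func store(key string, val int) {
        // persist to the DB here, then refresh the cached copy if present
        if _, ok := cache[key]; ok { // only cache if previously there
            cache[key] = val
        }
    }

    func main() {
        store("cgrates.org:TH1", 2) // refreshes the cached copy
        store("cgrates.org:TH2", 7) // persisted, but not added to the cache
        fmt.Println(cache)          // map[cgrates.org:TH1:2]
    }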
@@ -241,26 +344,45 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ctx *context.Context, tnt
	}
	ts = make(Thresholds, 0, len(tIDs))
	for tID := range tIDs {
-		tPrfl, err := tS.dm.GetThresholdProfile(ctx, tnt, tID, true, true, utils.NonTransactional)
-		if err != nil {
+		lkPrflID := guardian.Guardian.GuardIDs("",
+			config.CgrConfig().GeneralCfg().LockingTimeout,
+			thresholdProfileLockKey(tnt, tID))
+		var tPrfl *ThresholdProfile
+		if tPrfl, err = tS.dm.GetThresholdProfile(ctx, tnt, tID, true, true, utils.NonTransactional); err != nil {
+			guardian.Guardian.UnguardIDs(lkPrflID)
			if err == utils.ErrNotFound {
				err = nil
				continue
			}
+			ts.unlock()
			return nil, err
		}
-		if pass, err := tS.filterS.Pass(ctx, tnt, tPrfl.FilterIDs,
+		tPrfl.lock(lkPrflID)
+		var pass bool
+		if pass, err = tS.filterS.Pass(ctx, tnt, tPrfl.FilterIDs,
			evNm); err != nil {
+			tPrfl.unlock()
+			ts.unlock()
			return nil, err
		} else if !pass {
+			tPrfl.unlock()
			continue
		}
-		t, err := tS.dm.GetThreshold(ctx, tPrfl.Tenant, tPrfl.ID, true, true, "")
-		if err != nil {
+		lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
+			config.CgrConfig().GeneralCfg().LockingTimeout,
+			thresholdLockKey(tPrfl.Tenant, tPrfl.ID))
+		var t *Threshold
+		if t, err = tS.dm.GetThreshold(ctx, tPrfl.Tenant, tPrfl.ID, true, true, ""); err != nil {
+			guardian.Guardian.UnguardIDs(lkID)
+			tPrfl.unlock()
			if err == utils.ErrNotFound { // corner case where the threshold was removed due to MaxHits
				err = nil
				continue
			}
+			ts.unlock()
			return nil, err
		}
+		t.lock(lkID)
		if t.dirty == nil || tPrfl.MaxHits == -1 || t.Hits < tPrfl.MaxHits {
			t.dirty = utils.BoolPointer(false)
		}
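The matching loop above follows a strict unwind discipline: once the first lock is taken, every early return must release the item currently being processed and everything locked before it (ts.unlock()), otherwise an error path would leak guardian references. A self-contained sketch of that discipline (all names illustrative):

    package main

    import (
        "errors"
        "fmt"
    )

    type locked struct{ id string }

    func (l *locked) unlock() { fmt.Println("unlocked", l.id) }

    type lockedSet []*locked

    func (ls lockedSet) unlockAll() {
        for _, l := range ls {
            l.unlock()
        }
    }

    func match(ids []string) (ls lockedSet, err error) {
        for _, id := range ids {
            l := &locked{id: id} // lock taken here
            if id == "bad" {     // any failure after locking...
                l.unlock()     // ...releases the current item
                ls.unlockAll() // ...and everything locked before it
                return nil, errors.New("match failed")
            }
            ls = append(ls, l)
        }
        return ls, nil
    }

    func main() {
        if ls, err := match([]string{"TH0", "TH2"}); err == nil {
            ls.unlockAll() // on success, the caller releases
        }
        match([]string{"TH0", "bad"}) // error path unwinds internally
    }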
@@ -273,7 +395,8 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ctx *context.Context, tnt
	}
	ts.Sort()
	for i, t := range ts {
-		if t.tPrfl.Blocker { // blocker will stop processing
+		if t.tPrfl.Blocker && i != len(ts)-1 { // blocker will stop processing and we are not at last index
+			Thresholds(ts[i+1:]).unlock()
			ts = ts[:i+1]
			break
		}
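The blocker change above also releases the tail: when a blocker is hit before the last index, the thresholds after it are unlocked and sliced away rather than dropped while still holding locks. The truncation on its own (illustrative):

    package main

    import "fmt"

    func main() {
        ts := []string{"TH0", "TH1(blocker)", "TH2", "TH3"}
        for i, t := range ts {
            if t == "TH1(blocker)" && i != len(ts)-1 {
                for _, rest := range ts[i+1:] {
                    fmt.Println("unlock", rest) // tail is released, not processed
                }
                ts = ts[:i+1] // keep everything up to and including the blocker
                break
            }
        }
        fmt.Println(ts) // [TH0 TH1(blocker)]
    }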
@@ -305,10 +428,7 @@ func (attr *ThresholdsArgsProcessEvent) RPCClone() (interface{}, error) {
func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent {
	var thIDs []string
	if attr.ThresholdIDs != nil {
-		thIDs = make([]string, len(attr.ThresholdIDs))
-		for i, id := range attr.ThresholdIDs {
-			thIDs[i] = id
-		}
+		thIDs = utils.CloneStringSlice(attr.ThresholdIDs)
	}
	return &ThresholdsArgsProcessEvent{
		ThresholdIDs: thIDs,
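utils.CloneStringSlice replaces the hand-written copy loop above; the point of either form is that the clone gets its own backing array. A sketch of what such a helper boils down to (an assumption about its behavior, not the cgrates source):

    package main

    import "fmt"

    func cloneStringSlice(in []string) []string {
        if in == nil {
            return nil // preserve nil-ness, as the surrounding Clone does
        }
        out := make([]string, len(in))
        copy(out, in) // fresh backing array, no aliasing
        return out
    }

    func main() {
        a := []string{"TH1", "TH2"}
        b := cloneStringSlice(a)
        b[0] = "changed"
        fmt.Println(a, b) // [TH1 TH2] [changed TH2]
    }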
@@ -318,13 +438,13 @@ func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent {

// processEvent processes a new event, dispatching to matching thresholds
func (tS *ThresholdService) processEvent(ctx *context.Context, tnt string, args *ThresholdsArgsProcessEvent) (thresholdsIDs []string, err error) {
-	matchTS, err := tS.matchingThresholdsForEvent(ctx, tnt, args)
-	if err != nil {
+	var matchTs Thresholds
+	if matchTs, err = tS.matchingThresholdsForEvent(ctx, tnt, args); err != nil {
		return nil, err
	}
	var withErrors bool
-	thresholdsIDs = make([]string, 0, len(matchTS))
-	for _, t := range matchTS {
+	thresholdsIDs = make([]string, 0, len(matchTs))
+	for _, t := range matchTs {
		thresholdsIDs = append(thresholdsIDs, t.ID)
		t.Hits++
		if err = processEventWithThreshold(ctx, tS.connMgr,
@@ -343,11 +463,14 @@ func (tS *ThresholdService) processEvent(ctx *context.Context, tnt string, args
				withErrors = true
			}
			// since we don't handle caching in DataManager, we do a manual remove here
-			if err = tS.dm.CacheDataFromDB(ctx, utils.ThresholdPrefix, []string{t.TenantID()}, true); err != nil {
-				utils.Logger.Warning(
-					fmt.Sprintf("<ThresholdService> failed removing from cache non-recurrent threshold: %s, error: %s",
-						t.TenantID(), err.Error()))
-				withErrors = true
+			if tntID := t.TenantID(); Cache.HasItem(utils.CacheThresholds, tntID) { // only cache if previously there
+				if err = Cache.Set(ctx, utils.CacheThresholds, tntID, nil, nil,
+					true, utils.NonTransactional); err != nil {
+					utils.Logger.Warning(
+						fmt.Sprintf("<ThresholdService> failed removing from cache non-recurrent threshold: %s, error: %s",
+							t.TenantID(), err.Error()))
+					withErrors = true
+				}
			}
			continue
		}
@@ -362,6 +485,7 @@ func (tS *ThresholdService) processEvent(ctx *context.Context, tnt string, args
			tS.stMux.Unlock()
		}
	}
+	matchTs.unlock()
	if withErrors {
		err = utils.ErrPartiallyExecuted
	}
@@ -407,6 +531,7 @@ func (tS *ThresholdService) V1GetThresholdsForEvent(ctx *context.Context, args *
	var ts Thresholds
	if ts, err = tS.matchingThresholdsForEvent(ctx, tnt, args); err == nil {
		*reply = ts
+		ts.unlock()
	}
	return
}
@@ -436,6 +561,11 @@ func (tS *ThresholdService) V1GetThreshold(ctx *context.Context, tntID *utils.Te
	if tnt == utils.EmptyString {
		tnt = tS.cgrcfg.GeneralCfg().DefaultTenant
	}
+	// make sure threshold is locked at process level
+	lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
+		config.CgrConfig().GeneralCfg().LockingTimeout,
+		thresholdLockKey(tnt, tntID.ID))
+	defer guardian.Guardian.UnguardIDs(lkID)
	if thd, err = tS.dm.GetThreshold(ctx, tnt, tntID.ID, true, true, ""); err != nil {
		return
	}
@@ -443,19 +573,6 @@ func (tS *ThresholdService) V1GetThreshold(ctx *context.Context, tntID *utils.Te
	return
}

-// Reload stops the backupLoop and restarts it
-func (tS *ThresholdService) Reload(ctx *context.Context) {
-	close(tS.stopBackup)
-	<-tS.loopStoped // wait until the loop is done
-	tS.stopBackup = make(chan struct{})
-	go tS.runBackup(ctx)
-}
-
-// StartLoop starts the goroutine with the backup loop
-func (tS *ThresholdService) StartLoop(ctx *context.Context) {
-	go tS.runBackup(ctx)
-}
-
// V1ResetThreshold resets the threshold hits
func (tS *ThresholdService) V1ResetThreshold(ctx *context.Context, tntID *utils.TenantID, rply *string) (err error) {
	var thd *Threshold
@@ -463,6 +580,11 @@ func (tS *ThresholdService) V1ResetThreshold(ctx *context.Context, tntID *utils.
	if tnt == utils.EmptyString {
		tnt = tS.cgrcfg.GeneralCfg().DefaultTenant
	}
+	// make sure threshold is locked at process level
+	lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
+		config.CgrConfig().GeneralCfg().LockingTimeout,
+		thresholdLockKey(tnt, tntID.ID))
+	defer guardian.Guardian.UnguardIDs(lkID)
	if thd, err = tS.dm.GetThreshold(ctx, tnt, tntID.ID, true, true, ""); err != nil {
		return
	}
@@ -19,6 +19,7 @@ package engine

import (
	"bytes"
	"fmt"
	"log"
	"os"
	"reflect"
@@ -26,6 +27,7 @@ import (
	"testing"
	"time"

	"github.com/cgrates/birpc"
	"github.com/cgrates/birpc/context"
	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/utils"
@@ -358,29 +360,36 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) {
			t.Errorf("Expecting: %+v, received: %+v", th, temptTh)
		}
	}
-	if thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil {
+	thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[0].Tenant, argsGetThresholds[0])
+	if err != nil {
		t.Errorf("Error: %+v", err)
-	} else if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) {
+	}
+	thMatched.unlock()
+	if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) {
		t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant)
	} else if !reflect.DeepEqual(ths[0].ID, thMatched[0].ID) {
		t.Errorf("Expecting: %+v, received: %+v", ths[0].ID, thMatched[0].ID)
	} else if !reflect.DeepEqual(ths[0].Hits, thMatched[0].Hits) {
		t.Errorf("Expecting: %+v, received: %+v", ths[0].Hits, thMatched[0].Hits)
	}

-	if thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil {
+	thMatched, err = thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[1].Tenant, argsGetThresholds[1])
+	if err != nil {
		t.Errorf("Error: %+v", err)
-	} else if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) {
+	}
+	thMatched.unlock()
+	if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) {
		t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant)
	} else if !reflect.DeepEqual(ths[1].ID, thMatched[0].ID) {
		t.Errorf("Expecting: %+v, received: %+v", ths[1].ID, thMatched[0].ID)
	} else if !reflect.DeepEqual(ths[1].Hits, thMatched[0].Hits) {
		t.Errorf("Expecting: %+v, received: %+v", ths[1].Hits, thMatched[0].Hits)
	}

-	if thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil {
+	thMatched, err = thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[2].Tenant, argsGetThresholds[2])
+	if err != nil {
		t.Errorf("Error: %+v", err)
-	} else if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) {
+	}
+	thMatched.unlock()
+	if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) {
		t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant)
	} else if !reflect.DeepEqual(ths[2].ID, thMatched[0].ID) {
		t.Errorf("Expecting: %+v, received: %+v", ths[2].ID, thMatched[0].ID)
@@ -561,21 +570,21 @@ func TestThresholdsProcessEvent(t *testing.T) {
		}
	}
	thIDs := []string{"TH_1"}
-	if thMatched, err := thServ.processEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != utils.ErrPartiallyExecuted {
+	if thMatched, err := thServ.processEvent(context.Background(), argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != utils.ErrPartiallyExecuted {
		t.Errorf("Error: %+v", err)
	} else if !reflect.DeepEqual(thIDs, thMatched) {
		t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched)
	}

	thIDs = []string{"TH_2"}
-	if thMatched, err := thServ.processEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != utils.ErrPartiallyExecuted {
+	if thMatched, err := thServ.processEvent(context.Background(), argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != utils.ErrPartiallyExecuted {
		t.Errorf("Error: %+v", err)
	} else if !reflect.DeepEqual(thIDs, thMatched) {
		t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched)
	}

	thIDs = []string{"TH_3"}
-	if thMatched, err := thServ.processEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != utils.ErrPartiallyExecuted {
+	if thMatched, err := thServ.processEvent(context.Background(), argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != utils.ErrPartiallyExecuted {
		t.Errorf("Error: %+v", err)
	} else if !reflect.DeepEqual(thIDs, thMatched) {
		t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched)
@@ -744,28 +753,31 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) {
		}
	}
	thIDs := []string{"TH_1"}
-	if thMatched, err := thServ.processEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != utils.ErrPartiallyExecuted {
+	if thMatched, err := thServ.processEvent(context.Background(), argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != utils.ErrPartiallyExecuted {
		t.Errorf("Error: %+v", err)
	} else if !reflect.DeepEqual(thIDs, thMatched) {
		t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched)
	}

	thIDs = []string{"TH_2"}
-	if thMatched, err := thServ.processEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != utils.ErrPartiallyExecuted {
+	if thMatched, err := thServ.processEvent(context.Background(), argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != utils.ErrPartiallyExecuted {
		t.Errorf("Error: %+v", err)
	} else if !reflect.DeepEqual(thIDs, thMatched) {
		t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched)
	}

	thIDs = []string{"TH_3"}
-	if thMatched, err := thServ.processEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != utils.ErrPartiallyExecuted {
+	if thMatched, err := thServ.processEvent(context.Background(), argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != utils.ErrPartiallyExecuted {
		t.Errorf("Error: %+v", err)
	} else if !reflect.DeepEqual(thIDs, thMatched) {
		t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched)
	}
-	if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil {
+	thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0])
+	if err != nil {
		t.Errorf("Error: %+v", err)
-	} else if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) {
+	}
+	thMatched.unlock()
+	if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) {
		t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant)
	} else if !reflect.DeepEqual(ths[0].ID, thMatched[0].ID) {
		t.Errorf("Expecting: %+v, received: %+v", ths[0].ID, thMatched[0].ID)
@@ -773,9 +785,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) {
		t.Errorf("Expecting: 1, received: %+v", thMatched[0].Hits)
	}

-	if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil {
+	thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1])
+	if err != nil {
		t.Errorf("Error: %+v", err)
-	} else if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) {
+	}
+	thMatched.unlock()
+	if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) {
		t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant)
	} else if !reflect.DeepEqual(ths[1].ID, thMatched[0].ID) {
		t.Errorf("Expecting: %+v, received: %+v", ths[1].ID, thMatched[0].ID)
@@ -783,9 +798,12 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) {
		t.Errorf("Expecting: 1, received: %+v", thMatched[0].Hits)
	}

-	if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil {
+	thMatched, err = thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2])
+	if err != nil {
		t.Errorf("Error: %+v", err)
-	} else if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) {
+	}
+	thMatched.unlock()
+	if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) {
		t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant)
	} else if !reflect.DeepEqual(ths[2].ID, thMatched[0].ID) {
		t.Errorf("Expecting: %+v, received: %+v", ths[2].ID, thMatched[0].ID)
@@ -1010,18 +1028,18 @@ func TestThresholdsProcessEvent2(t *testing.T) {
	} else if !reflect.DeepEqual(thIDs, thMatched) && !reflect.DeepEqual(thIDsRev, thMatched) {
		t.Errorf("Expecting: %+v, received: %+v", thIDs, thMatched)
	}

-	if thMatched, err := thServ.matchingThresholdsForEvent(ev.Tenant, ev); err != nil {
-		t.Errorf("Error: %+v", err)
-	} else {
-		for _, thM := range thMatched {
-			if !reflect.DeepEqual(thPrf.Tenant, thM.Tenant) {
-				t.Errorf("Expecting: %+v, received: %+v", thPrf.Tenant, thM.Tenant)
-			} else if reflect.DeepEqual(thIDs[0], thM.ID) && thM.Hits != 1 {
-				t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits)
-			} else if reflect.DeepEqual(thIDs[1], thM.ID) && thM.Hits != 1 {
-				t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits)
-			}
+	thMatched, err := thServ.matchingThresholdsForEvent(ev.Tenant, ev)
+	if err != nil {
+		t.Fatalf("Error: %+v", err)
+	}
+	thMatched.unlock()
+	for _, thM := range thMatched {
+		if !reflect.DeepEqual(thPrf.Tenant, thM.Tenant) {
+			t.Errorf("Expecting: %+v, received: %+v", thPrf.Tenant, thM.Tenant)
+		} else if reflect.DeepEqual(thIDs[0], thM.ID) && thM.Hits != 1 {
+			t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits)
+		} else if reflect.DeepEqual(thIDs[1], thM.ID) && thM.Hits != 1 {
+			t.Errorf("Expecting: 1 for %+v, received: %+v", thM.ID, thM.Hits)
+		}
	}
}
@@ -1307,22 +1325,15 @@ func TestThresholdsStoreThresholdsOK(t *testing.T) {
	dm := NewDataManager(data, cfg.CacheCfg(), nil)
	tS := NewThresholdService(dm, cfg, nil, nil)

-	value := &Threshold{
-		dirty:  utils.BoolPointer(true),
-		Tenant: "cgrates.org",
-		ID:     "TH1",
-		Hits:   2,
-	}
-
-	Cache.SetWithoutReplicate(utils.CacheThresholds, "TH1", value, nil, true,
-		utils.NonTransactional)
-	tS.storedTdIDs.Add("TH1")
	exp := &Threshold{
		dirty:  utils.BoolPointer(false),
		Tenant: "cgrates.org",
		ID:     "TH1",
		Hits:   2,
	}
+	Cache.SetWithoutReplicate(utils.CacheThresholds, "cgrates.org:TH1", exp, nil, true,
+		utils.NonTransactional)
+	tS.storedTdIDs.Add("cgrates.org:TH1")
	tS.storeThresholds(context.Background())

	if rcv, err := tS.dm.GetThreshold(context.Background(), "cgrates.org", "TH1", true, false,
@@ -1333,7 +1344,7 @@ func TestThresholdsStoreThresholdsOK(t *testing.T) {
			utils.ToJSON(exp), utils.ToJSON(rcv))
	}

-	Cache.Remove(context.Background(), utils.CacheThresholds, "TH1", true, utils.NonTransactional)
+	Cache.Remove(context.Background(), utils.CacheThresholds, "cgrates.org:TH1", true, utils.NonTransactional)
}

func TestThresholdsStoreThresholdsStoreThErr(t *testing.T) {
@@ -1544,8 +1555,7 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) {
	} else {
		rcv.tPrfl = nil
		rcv.dirty = nil
-		var snooze time.Time
-		rcv.Snooze = snooze
+		rcv.Snooze = time.Time{}
		if !reflect.DeepEqual(rcv, exp) {
			t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
		}
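The replacement works because time.Time{} is exactly the zero value that `var snooze time.Time` produced, just written inline. A one-line check (illustrative):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        var snooze time.Time
        fmt.Println(snooze.Equal(time.Time{})) // true: both are the zero time
    }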
@@ -1553,73 +1563,86 @@ func TestThresholdsProcessEventStoreThOK(t *testing.T) {

}

-// func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) {
-// 	utils.Logger.SetLogLevel(4)
-// 	utils.Logger.SetSyslog(nil)
+func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) {
+	utils.Logger.SetLogLevel(4)
+	utils.Logger.SetSyslog(nil)

-// 	var buf bytes.Buffer
-// 	log.SetOutput(&buf)
-// 	defer func() {
-// 		log.SetOutput(os.Stderr)
-// 	}()
+	var buf bytes.Buffer
+	log.SetOutput(&buf)
+	tmp := config.CgrConfig()
+	tmpC := Cache
+	tmpCMgr := connMgr

-// 	cfg := config.NewDefaultCGRConfig()
-// 	data := NewInternalDB(nil, nil, true)
-// 	dm := NewDataManager(data, cfg.CacheCfg(), nil)
-// 	filterS := NewFilterS(cfg, nil, dm)
-// 	tS := NewThresholdService(nil, cfg, filterS, nil)
+	cfg := config.NewDefaultCGRConfig()
+	cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}}
+	cfg.CacheCfg().ReplicationConns = []string{"test"}
+	cfg.CacheCfg().Partitions[utils.CacheThresholds].Replicate = true
+	config.SetCgrConfig(cfg)
+	data := NewInternalDB(nil, nil, true)
+	connMgr = NewConnManager(cfg, make(map[string]chan birpc.ClientConnector))
+	dm := NewDataManager(data, cfg.CacheCfg(), connMgr)
+	filterS := NewFilterS(cfg, nil, dm)
+	tS := NewThresholdService(nil, cfg, filterS, connMgr)
+	Cache = NewCacheS(cfg, dm, nil)

-// 	thPrf := &ThresholdProfile{
-// 		Tenant:    "cgrates.org",
-// 		ID:        "TH3",
-// 		FilterIDs: []string{"*string:~*req.Account:1001"},
-// 		MinHits:   2,
-// 		MaxHits:   5,
-// 		Weight:    10,
-// 		Blocker:   true,
-// 	}
-// 	th := &Threshold{
-// 		Tenant: "cgrates.org",
-// 		ID:     "TH3",
-// 		Hits:   4,
-// 		tPrfl:  thPrf,
-// 	}
+	defer func() {
+		connMgr = tmpCMgr
+		Cache = tmpC
+		config.SetCgrConfig(tmp)
+		log.SetOutput(os.Stderr)
+	}()
+	thPrf := &ThresholdProfile{
+		Tenant:           "cgrates.org",
+		ID:               "TH3",
+		FilterIDs:        []string{"*string:~*req.Account:1001"},
+		MinHits:          2,
+		MaxHits:          5,
+		Weight:           10,
+		ActionProfileIDs: []string{utils.MetaNone},
+		Blocker:          true,
+	}
+	th := &Threshold{
+		Tenant: "cgrates.org",
+		ID:     "TH3",
+		Hits:   4,
+		tPrfl:  thPrf,
+	}

-// 	if err := dm.SetThresholdProfile(context.Background(), thPrf, true); err != nil {
-// 		t.Error(err)
-// 	}
-// 	if err := dm.SetThreshold(context.Background(), th); err != nil {
-// 		t.Error(err)
-// 	}
+	if err := dm.SetThresholdProfile(context.Background(), thPrf, true); err != nil {
+		t.Error(err)
+	}
+	if err := dm.SetThreshold(context.Background(), th); err != nil {
+		t.Error(err)
+	}

-// 	args := &ThresholdsArgsProcessEvent{
-// 		ThresholdIDs: []string{"TH3"},
-// 		CGREvent: &utils.CGREvent{
-// 			Tenant: "cgrates.org",
-// 			ID:     "ThdProcessEvent",
-// 			Event: map[string]interface{}{
-// 				utils.AccountField: "1001",
-// 			},
-// 		},
-// 	}
+	args := &ThresholdsArgsProcessEvent{
+		ThresholdIDs: []string{"TH3"},
+		CGREvent: &utils.CGREvent{
+			Tenant: "cgrates.org",
+			ID:     "ThdProcessEvent",
+			Event: map[string]interface{}{
+				utils.AccountField: "1001",
+			},
+		},
+	}

-// 	expLog1 := `[WARNING] <ThresholdService> failed removing from database non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION`
-// 	expLog2 := `[WARNING] <ThresholdService> failed removing from cache non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION`
+	expLog1 := `[WARNING] <ThresholdService> failed removing from database non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION`
+	expLog2 := `[WARNING] <ThresholdService> failed removing from cache non-recurrent threshold: cgrates.org:TH3, error: DISCONNECTED`

-// 	if _, err := tS.processEvent(context.Background(), args.Tenant, args); err == nil ||
-// 		err != utils.ErrPartiallyExecuted {
-// 		t.Errorf("expected: <%+v>, \nreceived: <%+v>",
-// 			utils.ErrPartiallyExecuted, err)
-// 	}
+	if _, err := tS.processEvent(context.Background(), args.Tenant, args); err == nil ||
+		err != utils.ErrPartiallyExecuted {
+		t.Errorf("expected: <%+v>, \nreceived: <%+v>",
+			utils.ErrPartiallyExecuted, err)
+	}

-// 	if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog1) ||
-// 		!strings.Contains(rcvLog, expLog2) {
-// 		t.Errorf("expected logs <%+v> and <%+v> to be included in: <%+v>",
-// 			expLog1, expLog2, rcvLog)
-// 	}
+	if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog1) ||
+		!strings.Contains(rcvLog, expLog2) {
+		t.Errorf("expected logs <%+v> and <%+v> to be included in: <%+v>",
+			expLog1, expLog2, rcvLog)
+	}

-// 	utils.Logger.SetLogLevel(0)
-// }
+	utils.Logger.SetLogLevel(0)
+}

func TestThresholdsProcessEventNotFound(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
@@ -1965,3 +1988,336 @@ func TestThresholdsV1GetThresholdNotFoundErr(t *testing.T) {
		t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err)
	}
}

func TestThresholdMatchingThresholdForEventLocks(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	tmp := Cache
	defer func() { Cache = tmp }()
	Cache = NewCacheS(cfg, nil, nil)
	db := NewInternalDB(nil, nil, true)
	dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
	cfg.ThresholdSCfg().StoreInterval = 1
	cfg.ThresholdSCfg().StringIndexedFields = nil
	cfg.ThresholdSCfg().PrefixIndexedFields = nil
	rS := NewThresholdService(dm, cfg,
		&FilterS{dm: dm, cfg: cfg}, nil)

	prfs := make([]*ThresholdProfile, 0)
	ids := utils.StringSet{}
	for i := 0; i < 10; i++ {
		rPrf := &ThresholdProfile{
			Tenant:  "cgrates.org",
			ID:      fmt.Sprintf("TH%d", i),
			Weight:  20.00,
			MaxHits: 5,
		}
		dm.SetThresholdProfile(context.Background(), rPrf, true)
		prfs = append(prfs, rPrf)
		ids.Add(rPrf.ID)
	}
	dm.RemoveThreshold(context.Background(), "cgrates.org", "TH1")
	mth, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
		ThresholdIDs: ids.AsSlice(),
		CGREvent:     new(utils.CGREvent),
	})
	if err != nil {
		t.Errorf("Error: %+v", err)
	}
	defer mth.unlock()
	for _, rPrf := range prfs {
		if rPrf.ID == "TH1" {
			if rPrf.isLocked() {
				t.Fatalf("Expected profile to not be locked %q", rPrf.ID)
			}
			continue
		}
		if !rPrf.isLocked() {
			t.Fatalf("Expected profile to be locked %q", rPrf.ID)
		}
		if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
			t.Errorf("error %s for <%s>", err, rPrf.ID)
		} else if !r.isLocked() {
			t.Fatalf("Expected Threshold to be locked %q", rPrf.ID)
		}
	}
}

func TestThresholdMatchingThresholdForEventLocks2(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	tmp := Cache
	defer func() { Cache = tmp }()
	Cache = NewCacheS(cfg, nil, nil)
	db := NewInternalDB(nil, nil, true)
	dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
	cfg.ThresholdSCfg().StoreInterval = 1
	cfg.ThresholdSCfg().StringIndexedFields = nil
	cfg.ThresholdSCfg().PrefixIndexedFields = nil
	rS := NewThresholdService(dm, cfg,
		&FilterS{dm: dm, cfg: cfg}, nil)

	prfs := make([]*ThresholdProfile, 0)
	ids := utils.StringSet{}
	for i := 0; i < 10; i++ {
		rPrf := &ThresholdProfile{
			Tenant:  "cgrates.org",
			ID:      fmt.Sprintf("TH%d", i),
			Weight:  20.00,
			MaxHits: 5,
		}
		dm.SetThresholdProfile(context.Background(), rPrf, true)
		prfs = append(prfs, rPrf)
		ids.Add(rPrf.ID)
	}
	rPrf := &ThresholdProfile{
		Tenant:    "cgrates.org",
		ID:        "TH20",
		FilterIDs: []string{"FLTR_RES_201"},
		Weight:    20.00,
		MaxHits:   5,
	}
	err := db.SetThresholdProfileDrv(context.Background(), rPrf)
	if err != nil {
		t.Fatal(err)
	}
	prfs = append(prfs, rPrf)
	ids.Add(rPrf.ID)
	_, err = rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
		ThresholdIDs: ids.AsSlice(),
		CGREvent:     new(utils.CGREvent),
	})
	expErr := utils.ErrPrefixNotFound(rPrf.FilterIDs[0])
	if err == nil || err.Error() != expErr.Error() {
		t.Errorf("Expected error: %s, received: %+v", expErr, err)
	}
	for _, rPrf := range prfs {
		if rPrf.isLocked() {
			t.Fatalf("Expected profile to not be locked %q", rPrf.ID)
		}
		if rPrf.ID == "TH20" {
			continue
		}
		if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
			t.Errorf("error %s for <%s>", err, rPrf.ID)
		} else if r.isLocked() {
			t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID)
		}
	}
}

func TestThresholdMatchingThresholdForEventLocksBlocker(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	tmp := Cache
	defer func() { Cache = tmp }()
	Cache = NewCacheS(cfg, nil, nil)
	db := NewInternalDB(nil, nil, true)
	dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
	cfg.ThresholdSCfg().StoreInterval = 1
	cfg.ThresholdSCfg().StringIndexedFields = nil
	cfg.ThresholdSCfg().PrefixIndexedFields = nil
	rS := NewThresholdService(dm, cfg,
		&FilterS{dm: dm, cfg: cfg}, nil)

	prfs := make([]*ThresholdProfile, 0)
	ids := utils.StringSet{}
	for i := 0; i < 10; i++ {
		rPrf := &ThresholdProfile{
			Tenant:  "cgrates.org",
			ID:      fmt.Sprintf("TH%d", i),
			Weight:  float64(10 - i),
			Blocker: i == 4,
			MaxHits: 5,
		}
		dm.SetThresholdProfile(context.Background(), rPrf, true)
		prfs = append(prfs, rPrf)
		ids.Add(rPrf.ID)
	}
	mres, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
		ThresholdIDs: ids.AsSlice(),
		CGREvent:     new(utils.CGREvent),
	})
	if err != nil {
		t.Errorf("Error: %+v", err)
	}
	defer mres.unlock()
	if len(mres) != 5 {
		t.Fatal("Expected 5 Thresholds")
	}
	for _, rPrf := range prfs[5:] {
		if rPrf.isLocked() {
			t.Errorf("Expected profile to not be locked %q", rPrf.ID)
		}
		if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
			t.Errorf("error %s for <%s>", err, rPrf.ID)
		} else if r.isLocked() {
			t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID)
		}
	}
	for _, rPrf := range prfs[:5] {
		if !rPrf.isLocked() {
			t.Errorf("Expected profile to be locked %q", rPrf.ID)
		}
		if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
			t.Errorf("error %s for <%s>", err, rPrf.ID)
		} else if !r.isLocked() {
			t.Fatalf("Expected Threshold to be locked %q", rPrf.ID)
		}
	}
}

func TestThresholdMatchingThresholdForEventLocks3(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	prfs := make([]*ThresholdProfile, 0)
	tmp := Cache
	defer func() { Cache = tmp }()
	Cache = NewCacheS(cfg, nil, nil)
	db := &DataDBMock{
		GetThresholdProfileDrvF: func(ctx *context.Context, tnt, id string) (*ThresholdProfile, error) {
			if id == "TH1" {
				return nil, utils.ErrNotImplemented
			}
			rPrf := &ThresholdProfile{
				Tenant:  "cgrates.org",
				ID:      id,
				MaxHits: 5,
				Weight:  20.00,
			}
			Cache.Set(ctx, utils.CacheThresholds, rPrf.TenantID(), &Threshold{
				Tenant: rPrf.Tenant,
				ID:     rPrf.ID,
			}, nil, true, utils.NonTransactional)
			prfs = append(prfs, rPrf)
			return rPrf, nil
		},
	}
	dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
	cfg.ThresholdSCfg().StoreInterval = 1
	cfg.ThresholdSCfg().StringIndexedFields = nil
	cfg.ThresholdSCfg().PrefixIndexedFields = nil
	rS := NewThresholdService(dm, cfg,
		&FilterS{dm: dm, cfg: cfg}, nil)

	ids := utils.StringSet{}
	for i := 0; i < 10; i++ {
		ids.Add(fmt.Sprintf("TH%d", i))
	}
	_, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
		ThresholdIDs: ids.AsSlice(),
		CGREvent:     new(utils.CGREvent),
	})
	if err != utils.ErrNotImplemented {
		t.Fatalf("Error: %+v", err)
	}
	for _, rPrf := range prfs {
		if rPrf.isLocked() {
			t.Fatalf("Expected profile to not be locked %q", rPrf.ID)
		}
	}
}

func TestThresholdMatchingThresholdForEventLocks4(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	tmp := Cache
	defer func() { Cache = tmp }()
	Cache = NewCacheS(cfg, nil, nil)
	db := NewInternalDB(nil, nil, true)
	dm := NewDataManager(db, config.CgrConfig().CacheCfg(), nil)
	cfg.ThresholdSCfg().StoreInterval = 1
	cfg.ThresholdSCfg().StringIndexedFields = nil
	cfg.ThresholdSCfg().PrefixIndexedFields = nil
	rS := NewThresholdService(dm, cfg,
		&FilterS{dm: dm, cfg: cfg}, nil)

	prfs := make([]*ThresholdProfile, 0)
	ids := utils.StringSet{}
	for i := 0; i < 10; i++ {
		rPrf := &ThresholdProfile{
			Tenant:  "cgrates.org",
			ID:      fmt.Sprintf("TH%d", i),
			Weight:  20.00,
			MaxHits: 5,
		}
		dm.SetThresholdProfile(context.Background(), rPrf, true)
		prfs = append(prfs, rPrf)
		ids.Add(rPrf.ID)
	}
	ids.Add("TH20")
	mres, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
		ThresholdIDs: ids.AsSlice(),
		CGREvent:     new(utils.CGREvent),
	})
	if err != nil {
		t.Errorf("Error: %+v", err)
	}
	defer mres.unlock()
	for _, rPrf := range prfs {
		if !rPrf.isLocked() {
			t.Errorf("Expected profile to be locked %q", rPrf.ID)
		}
		if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
			t.Errorf("error %s for <%s>", err, rPrf.ID)
		} else if !r.isLocked() {
			t.Fatalf("Expected Threshold to be locked %q", rPrf.ID)
		}
	}
}

func TestThresholdMatchingThresholdForEventLocks5(t *testing.T) {
	cfg := config.NewDefaultCGRConfig()
	tmp := Cache
	tmpC := config.CgrConfig()
	defer func() {
		Cache = tmp
		config.SetCgrConfig(tmpC)
	}()
	Cache = NewCacheS(cfg, nil, nil)
	db := NewInternalDB(nil, nil, true)
	dm := NewDataManager(db, config.CgrConfig().CacheCfg(), NewConnManager(cfg, make(map[string]chan birpc.ClientConnector)))
	cfg.ThresholdSCfg().StoreInterval = 1
	cfg.ThresholdSCfg().StringIndexedFields = nil
	cfg.ThresholdSCfg().PrefixIndexedFields = nil
	cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}}
	cfg.DataDbCfg().RmtConns = []string{"test"}
	cfg.DataDbCfg().Items[utils.CacheThresholds].Remote = true
	config.SetCgrConfig(cfg)
	rS := NewThresholdService(dm, cfg,
		&FilterS{dm: dm, cfg: cfg}, nil)

	prfs := make([]*ThresholdProfile, 0)
	ids := utils.StringSet{}
	for i := 0; i < 10; i++ {
		rPrf := &ThresholdProfile{
			Tenant:  "cgrates.org",
			ID:      fmt.Sprintf("TH%d", i),
			Weight:  20.00,
			MaxHits: 5,
		}
		dm.SetThresholdProfile(context.Background(), rPrf, true)
		prfs = append(prfs, rPrf)
		ids.Add(rPrf.ID)
	}
	dm.RemoveThreshold(context.Background(), "cgrates.org", "TH1")
	_, err := rS.matchingThresholdsForEvent(context.Background(), "cgrates.org", &ThresholdsArgsProcessEvent{
		ThresholdIDs: ids.AsSlice(),
		CGREvent:     new(utils.CGREvent),
	})
	if err != utils.ErrDisconnected {
		t.Errorf("Error: %+v", err)
	}
	for _, rPrf := range prfs {
		if rPrf.isLocked() {
			t.Fatalf("Expected profile to not be locked %q", rPrf.ID)
		}
		if rPrf.ID == "TH1" {
			continue
		}
		if r, err := dm.GetThreshold(context.Background(), rPrf.Tenant, rPrf.ID, true, false, utils.NonTransactional); err != nil {
			t.Errorf("error %s for <%s>", err, rPrf.ID)
		} else if r.isLocked() {
			t.Fatalf("Expected Threshold to not be locked %q", rPrf.ID)
		}
	}
}
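The new lock tests above all lean on one fixture idiom: snapshot a package-level global (Cache, connMgr, the active config), stub it, and restore it in a deferred closure so a failing test cannot poison the next one. The idiom on a plain variable (illustrative):

    package main

    import "fmt"

    var global = "real"

    func testWithStub() {
        tmp := global
        defer func() { global = tmp }() // restore no matter how the test exits
        global = "stub"
        fmt.Println("during test:", global)
    }

    func main() {
        testWithStub()
        fmt.Println("after test:", global) // back to "real"
    }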