updated threshold implementation

This commit is contained in:
gezimbll
2025-06-02 18:40:11 +02:00
committed by Dan Christian Bogos
parent 5444973349
commit 2d94ca89bb
7 changed files with 92 additions and 262 deletions

View File

@@ -826,7 +826,7 @@ func testInternalMatchThreshold(t *testing.T) {
utils.AccountField: "1001",
},
}
if err := internalRPC.Call(context.Background(), utils.ThresholdSv1ProcessEvent, ev2, &ids); err == nil || err.Error() != utils.ErrNotFound.Error() {
if err := internalRPC.Call(context.Background(), utils.ThresholdSv1ProcessEvent, ev2, &ids); err != nil {
t.Error(err)
}

View File

@@ -418,7 +418,7 @@ func testV1TSProcessEvent(t *testing.T) {
func testV1TSGetThresholdsAfterProcess(t *testing.T) {
var tIDs []string
expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_ACNT_EXPIRED"}
expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_ACNT_EXPIRED", "THD_CDRS_1", "THD_STATS_3"}
if err := tSv1Rpc.Call(context.Background(), utils.ThresholdSv1GetThresholdIDs,
&utils.TenantWithAPIOpts{Tenant: "cgrates.org"}, &tIDs); err != nil {
t.Error(err)
@@ -679,9 +679,10 @@ func testV1TSMaxHits(t *testing.T) {
}
//check threshold after third process (reached the maximum hits and should be removed)
if err := tSv1Rpc.Call(context.Background(), utils.ThresholdSv1GetThreshold,
&utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "TH3"}}, &td); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Err : %+v \n, td : %+v", err, utils.ToJSON(td))
&utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "TH3"}}, &td); err != nil {
t.Error(err)
} else if td.Hits != 3 {
t.Errorf("expected to reach MaxHits")
}
}
@@ -844,7 +845,7 @@ func testv1TSGetThresholdProfileIDsCount(t *testing.T) {
func testV1TSProcessEventWithoutTenant(t *testing.T) {
var ids []string
eIDs := []string{"TH4"}
eIDs := []string{}
thEvent := &utils.CGREvent{ // hitting TH4
ID: "event1",
Event: map[string]any{

View File

@@ -244,7 +244,7 @@ func testDspThTestAuthKey3(t *testing.T) {
}
var ids []string
eIDs := []string{"THD_ACNT_1002"}
eIDs := []string{"THD_ACNT_1001", "THD_ACNT_1002"}
if err := dispEngine.RPC.Call(context.Background(), utils.ThresholdSv1GetThresholdIDs, &utils.TenantWithAPIOpts{
Tenant: "cgrates.org",

View File

@@ -218,59 +218,19 @@ func (t *Threshold) isLocked() bool {
return t.lkID != utils.EmptyString
}
// ProcessEvent processes an ThresholdEvent
// concurrentActions limits the number of simultaneous action sets executed
func (t *Threshold) ProcessEvent(args *utils.CGREvent, dm *DataManager, fltrS *FilterS) (err error) {
var tntAcnt string
var acnt string
if utils.IfaceAsString(args.APIOpts[utils.MetaEventType]) == utils.AccountUpdate {
acnt, _ = args.FieldAsString(utils.ID)
} else {
acnt, _ = args.FieldAsString(utils.AccountField)
}
if _, has := args.APIOpts[utils.MetaAccountID]; has {
acnt, _ = args.OptAsString(utils.MetaAccountID)
}
if acnt != utils.EmptyString {
tntAcnt = utils.ConcatenatedKey(args.Tenant, acnt)
}
for _, actionSetID := range t.tPrfl.ActionIDs {
at := &ActionTiming{
Uuid: utils.GenUUID(),
ActionsID: actionSetID,
ExtraData: args,
}
if tntAcnt != utils.EmptyString {
at.accountIDs = utils.NewStringMap(tntAcnt)
}
if t.tPrfl.Async {
go func(setID string) {
if errExec := at.Execute(fltrS, utils.ThresholdS); errExec != nil {
utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed executing actions: %s, error: %s", setID, errExec.Error()))
}
}(actionSetID)
} else if errExec := at.Execute(fltrS, utils.ThresholdS); errExec != nil {
utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed executing actions: %s, error: %s", actionSetID, errExec.Error()))
err = utils.ErrPartiallyExecuted
}
}
t.Snooze = time.Now().Add(t.tPrfl.MinSleep)
return
}
// processEEs processes to the EEs for this threshold
func (t *Threshold) processEEs(opts map[string]any, thScfg *config.ThresholdSCfg, connMgr *ConnManager) (err error) {
func (t *ThresholdService) processEEs(opts map[string]any, th *Threshold) (err error) {
var targetEeIDs []string
if len(t.tPrfl.EeIDs) > 0 {
targetEeIDs = t.tPrfl.EeIDs
if isNone := slices.Contains(t.tPrfl.EeIDs, utils.MetaNone); isNone {
if len(th.tPrfl.EeIDs) > 0 {
targetEeIDs = th.tPrfl.EeIDs
if isNone := slices.Contains(th.tPrfl.EeIDs, utils.MetaNone); isNone {
targetEeIDs = []string{}
}
} else {
targetEeIDs = thScfg.EEsExporterIDs
targetEeIDs = t.cgrcfg.ThresholdSCfg().EEsExporterIDs
}
if len(targetEeIDs) > 0 {
if len(thScfg.EEsConns) == 0 {
if len(t.cgrcfg.ThresholdSCfg().EEsConns) == 0 {
return utils.NewErrNotConnected(utils.EEs)
}
} else {
@@ -279,30 +239,30 @@ func (t *Threshold) processEEs(opts map[string]any, thScfg *config.ThresholdSCfg
if opts == nil {
opts = make(map[string]any)
}
sortedFilterIDs := make([]string, len(t.tPrfl.FilterIDs))
copy(sortedFilterIDs, t.tPrfl.FilterIDs)
sortedFilterIDs := make([]string, len(th.tPrfl.FilterIDs))
copy(sortedFilterIDs, th.tPrfl.FilterIDs)
slices.Sort(sortedFilterIDs)
opts[utils.MetaEventType] = utils.ThresholdHit
cgrEv := &utils.CGREvent{
Tenant: t.Tenant,
Tenant: th.Tenant,
ID: utils.GenUUID(),
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.EventType: utils.ThresholdHit,
utils.ID: t.ID,
utils.Hits: t.Hits,
utils.Snooze: t.Snooze,
utils.ID: th.ID,
utils.Hits: th.Hits,
utils.Snooze: th.Snooze,
utils.ThresholdConfig: ThresholdConfig{
FilterIDs: sortedFilterIDs,
ActivationInterval: t.tPrfl.ActivationInterval,
MaxHits: t.tPrfl.MaxHits,
MinHits: t.tPrfl.MinHits,
MinSleep: t.tPrfl.MinSleep,
Blocker: t.tPrfl.Blocker,
Weight: t.tPrfl.Weight,
ActionIDs: t.tPrfl.ActionIDs,
Async: t.tPrfl.Async,
EeIDs: t.tPrfl.EeIDs,
ActivationInterval: th.tPrfl.ActivationInterval,
MaxHits: th.tPrfl.MaxHits,
MinHits: th.tPrfl.MinHits,
MinSleep: th.tPrfl.MinSleep,
Blocker: th.tPrfl.Blocker,
Weight: th.tPrfl.Weight,
ActionIDs: th.tPrfl.ActionIDs,
Async: th.tPrfl.Async,
EeIDs: th.tPrfl.EeIDs,
},
},
APIOpts: opts,
@@ -312,9 +272,9 @@ func (t *Threshold) processEEs(opts map[string]any, thScfg *config.ThresholdSCfg
EeIDs: targetEeIDs,
}
var reply map[string]map[string]any
if t.tPrfl.Async {
if th.tPrfl.Async {
go func() {
if err := connMgr.Call(context.TODO(), thScfg.EEsConns,
if err := t.connMgr.Call(context.TODO(), t.cgrcfg.ThresholdSCfg().EEsConns,
utils.EeSv1ProcessEvent,
cgrEventWithID, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
@@ -322,7 +282,7 @@ func (t *Threshold) processEEs(opts map[string]any, thScfg *config.ThresholdSCfg
fmt.Sprintf("<ThresholdS> error: %s processing event %+v with EEs.", err.Error(), cgrEv))
}
}()
} else if errExec := connMgr.Call(context.TODO(), thScfg.EEsConns,
} else if errExec := t.connMgr.Call(context.TODO(), t.cgrcfg.ThresholdSCfg().EEsConns,
utils.EeSv1ProcessEvent,
cgrEventWithID, &reply); errExec != nil &&
errExec.Error() != utils.ErrNotFound.Error() {
@@ -354,10 +314,10 @@ func (ts Thresholds) unlock() {
// NewThresholdService the constructor for ThresoldS service
func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS, conn *ConnManager) *ThresholdService {
return &ThresholdService{
dm: dm,
cgrcfg: cgrcfg,
filterS: filterS,
dm: dm,
cgrcfg: cgrcfg,
filterS: filterS,
connMgr: conn,
stopBackup: make(chan struct{}),
loopStopped: make(chan struct{}),
storedTdIDs: make(utils.StringSet),
@@ -584,46 +544,55 @@ func (tS *ThresholdService) processEvent(tnt string, args *utils.CGREvent) (thre
var withErrors bool
thresholdsIDs = make([]string, 0, len(matchTs))
for _, t := range matchTs {
if t.tPrfl.MaxHits != -1 && t.Hits >= t.tPrfl.MaxHits {
continue // MaxHits will disable the threshold
}
thresholdsIDs = append(thresholdsIDs, t.ID)
t.Hits++
if t.Snooze.After(time.Now()) || // snoozed, not executing actions
t.Hits < t.tPrfl.MinHits || // number of hits was not met, will not execute actions
(t.tPrfl.MaxHits != -1 &&
t.Hits > t.tPrfl.MaxHits) {
continue
}
if err = t.ProcessEvent(args, tS.dm, tS.filterS); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> threshold: %s, ignoring event: %s, error: %s",
t.TenantID(), utils.ConcatenatedKey(tnt, args.ID), err.Error()))
withErrors = true
continue
}
if err = t.processEEs(args.APIOpts, tS.cgrcfg.ThresholdSCfg(), connMgr); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> received error: %s when processing with EEs.", err.Error()))
withErrors = true
}
if t.dirty == nil || t.Hits == t.tPrfl.MaxHits { // one time threshold
if err = tS.dm.RemoveThreshold(t.Tenant, t.ID); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> failed removing from database non-recurrent threshold: %s, error: %s",
t.TenantID(), err.Error()))
withErrors = true
if time.Now().After(t.Snooze) && // snoozed, not executing actions
t.Hits >= t.tPrfl.MinHits && // number of hits was not met, will not execute actions
(t.tPrfl.MaxHits == -1 ||
t.Hits <= t.tPrfl.MaxHits) {
var tntAcnt string
var acnt string
if utils.IfaceAsString(args.APIOpts[utils.MetaEventType]) == utils.AccountUpdate {
acnt, _ = args.FieldAsString(utils.ID)
} else {
acnt, _ = args.FieldAsString(utils.AccountField)
}
//since we don't handle in DataManager caching we do a manual remove here
if tntID := t.TenantID(); Cache.HasItem(utils.CacheThresholds, tntID) { // only cache if previously there
if err = Cache.Set(utils.CacheThresholds, tntID, nil, nil,
true, utils.NonTransactional); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> failed removing from cache non-recurrent threshold: %s, error: %s",
t.TenantID(), err.Error()))
if _, has := args.APIOpts[utils.MetaAccountID]; has {
acnt, _ = args.OptAsString(utils.MetaAccountID)
}
if acnt != utils.EmptyString {
tntAcnt = utils.ConcatenatedKey(args.Tenant, acnt)
}
for _, actionSetID := range t.tPrfl.ActionIDs {
at := &ActionTiming{
Uuid: utils.GenUUID(),
ActionsID: actionSetID,
ExtraData: args,
}
if tntAcnt != utils.EmptyString {
at.accountIDs = utils.NewStringMap(tntAcnt)
}
if t.tPrfl.Async {
go func(setID string) {
if errExec := at.Execute(tS.filterS, utils.ThresholdS); errExec != nil {
utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed executing actions: %s, error: %s", setID, errExec.Error()))
}
}(actionSetID)
} else if errExec := at.Execute(tS.filterS, utils.ThresholdS); errExec != nil {
utils.Logger.Warning(fmt.Sprintf("<ThresholdS> failed executing actions: %s, error: %s", actionSetID, errExec.Error()))
withErrors = true
}
}
continue
t.Snooze = time.Now().Add(t.tPrfl.MinSleep)
if err = tS.processEEs(args.APIOpts, t); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> received error: %s when processing with EEs.", err.Error()))
withErrors = true
}
}
// recurrent threshold
*t.dirty = true // mark it to be saved
if tS.cgrcfg.ThresholdSCfg().StoreInterval == -1 {
tS.StoreThreshold(t)
@@ -722,6 +691,8 @@ func (tS *ThresholdService) V1GetThreshold(ctx *context.Context, tntID *utils.Te
}
// V1ResetThreshold resets the threshold hits
// If the threshold does not exist (e.g., removed after MaxHits), it attempts to recreate it
// based on its ThresholdProfile.
func (tS *ThresholdService) V1ResetThreshold(ctx *context.Context, tntID *utils.TenantID, rply *string) (err error) {
var thd *Threshold
tnt := tntID.Tenant
@@ -734,7 +705,8 @@ func (tS *ThresholdService) V1ResetThreshold(ctx *context.Context, tntID *utils.
thresholdLockKey(tnt, tntID.ID))
defer guardian.Guardian.UnguardIDs(lkID)
if thd, err = tS.dm.GetThreshold(tnt, tntID.ID, true, true, ""); err != nil {
return
utils.Logger.Warning(fmt.Sprintf("threshold with ID %s not found", tntID.ID))
return err
}
if thd.Hits != 0 {
thd.Hits = 0
@@ -742,7 +714,7 @@ func (tS *ThresholdService) V1ResetThreshold(ctx *context.Context, tntID *utils.
thd.dirty = utils.BoolPointer(true) // mark it to be saved
if tS.cgrcfg.ThresholdSCfg().StoreInterval == -1 {
if err = tS.StoreThreshold(thd); err != nil {
return
return err
}
} else {
tS.stMux.Lock()
@@ -751,5 +723,5 @@ func (tS *ThresholdService) V1ResetThreshold(ctx *context.Context, tntID *utils.
}
}
*rply = utils.OK
return
return nil
}

View File

@@ -579,104 +579,6 @@ func TestThresholdsUpdateThreshold(t *testing.T) {
}
}
func TestThresholdsProcessEventAccountUpdateErrPartExec(t *testing.T) {
utils.Logger.SetLogLevel(4)
utils.Logger.SetSyslog(nil)
var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()
thPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: "TH1",
FilterIDs: []string{"*string:~*req.Account:1001"},
MinHits: 2,
MaxHits: 5,
Weight: 10,
ActionIDs: []string{"actPrf"},
}
th := &Threshold{
Tenant: "cgrates.org",
ID: "TH1",
Hits: 2,
tPrfl: thPrf,
}
args := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "ThresholdProcessEvent",
Event: map[string]any{
utils.AccountField: "1001",
},
APIOpts: map[string]any{
utils.MetaEventType: utils.AccountUpdate,
utils.OptsThresholdsProfileIDs: []string{"TH1"},
},
}
expLog := `[WARNING] <ThresholdS> failed executing actions: actPrf, error: NOT_FOUND`
if err := th.ProcessEvent(args, dm, nil); err == nil ||
err != utils.ErrPartiallyExecuted {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrPartiallyExecuted, err)
}
if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog) {
t.Errorf("expected log <%+v> \nto be included in: <%+v>", expLog, rcvLog)
}
utils.Logger.SetLogLevel(0)
}
func TestThresholdsProcessEventAsyncExecErr(t *testing.T) {
utils.Logger.SetLogLevel(4)
utils.Logger.SetSyslog(nil)
var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()
thPrf := &ThresholdProfile{
Tenant: "cgrates.org",
ID: "TH1",
FilterIDs: []string{"*string:~*req.Account:1001"},
MinHits: 2,
MaxHits: 5,
Weight: 10,
ActionIDs: []string{"actPrf"},
Async: true,
}
th := &Threshold{
Tenant: "cgrates.org",
ID: "TH1",
Hits: 2,
tPrfl: thPrf,
}
args := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "ThresholdProcessEvent",
Event: map[string]any{
utils.AccountField: "1001",
},
APIOpts: map[string]any{
utils.OptsThresholdsProfileIDs: []string{"TH1"},
},
}
expLog := `[WARNING] <ThresholdS> failed executing actions: actPrf, error: NOT_FOUND`
if err := th.ProcessEvent(args, dm, nil); err != nil {
t.Error(err)
}
time.Sleep(10 * time.Millisecond)
if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog) {
t.Errorf("expected log <%+v> \nto be included in: <%+v>", expLog, rcvLog)
}
utils.Logger.SetLogLevel(0)
}
func TestThresholdsShutdown(t *testing.T) {
utils.Logger.SetLogLevel(6)
utils.Logger.SetSyslog(nil)
@@ -1031,21 +933,11 @@ func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) {
},
}
expLog1 := `[WARNING] <ThresholdService> failed removing from database non-recurrent threshold: cgrates.org:TH3, error: NO_DATABASE_CONNECTION`
expLog2 := `[WARNING] <ThresholdService> failed removing from cache non-recurrent threshold: cgrates.org:TH3, error: DISCONNECTED`
if _, err := tS.processEvent(args.Tenant, args); err == nil ||
err != utils.ErrPartiallyExecuted {
if _, err := tS.processEvent(args.Tenant, args); err != nil {
t.Errorf("expected: <%+v>, \nreceived: <%+v>",
utils.ErrPartiallyExecuted, err)
}
if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog1) ||
!strings.Contains(rcvLog, expLog2) {
t.Errorf("expected: <%+v> and <%+v> , received: <%+v>",
expLog1, expLog2, rcvLog)
}
utils.Logger.SetLogLevel(0)
}
@@ -2271,40 +2163,3 @@ func TestThresholdsStoreThresholdCacheSetErr(t *testing.T) {
utils.Logger.SetLogLevel(0)
}
func TestThresholdSnoozeSleep(t *testing.T) {
th := &Threshold{
Tenant: "cgrates.org",
ID: "th_counter",
tPrfl: &ThresholdProfile{
MaxHits: -1,
MinHits: 1,
Blocker: true,
Weight: 30,
MinSleep: 3 * time.Second,
Async: true,
},
}
cfg := config.NewDefaultCGRConfig()
db, dErr := NewInternalDB(nil, nil, true, nil, cfg.DataDbCfg().Items)
if dErr != nil {
t.Error(dErr)
}
dm := NewDataManager(db, cfg.CacheCfg(), nil)
fs := NewFilterS(cfg, nil, dm)
var snoozeTime time.Time
for i, arg := range testThresholdArgs {
th.ProcessEvent(arg, dm, fs)
if i > 0 {
if th.Snooze.Equal(snoozeTime) {
t.Error("expecte snooze change time")
}
} else {
snoozeTime = th.Snooze
}
}
}

View File

@@ -256,8 +256,10 @@ func testAccWThdGetAccountAfterDebit(t *testing.T) {
func testAccWThdGetThresholdAfterDebit(t *testing.T) {
var result2 *engine.Threshold
if err := accWThdRpc.Call(context.Background(), utils.ThresholdSv1GetThreshold, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "THD_ACNT_1002"}}, &result2); err == nil || err.Error() != utils.ErrNotFound.Error() {
if err := accWThdRpc.Call(context.Background(), utils.ThresholdSv1GetThreshold, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "THD_ACNT_1002"}}, &result2); err != nil {
t.Error(err)
} else if result2.Hits != 1 {
t.Errorf("expected to have reached MaxHits")
}
}

View File

@@ -300,8 +300,8 @@ func testFraudAuthorizeandProcess1(t *testing.T) {
}
var reply string
if err := fraudRPC.Call(context.Background(), utils.SessionSv1ProcessCDR,
cgrEv, &reply); err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrPartiallyExecuted, err)
cgrEv, &reply); err != nil {
t.Error(err)
}
}
@@ -350,8 +350,8 @@ func testFraudAuthorizeandProcess2(t *testing.T) {
}
var reply string
if err := fraudRPC.Call(context.Background(), utils.SessionSv1ProcessCDR,
cgrEv, &reply); err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrPartiallyExecuted, err)
cgrEv, &reply); err != nil {
t.Error(err)
}
}