added passing events from thresholds to ees

This commit is contained in:
gezimbll
2025-05-26 08:49:42 +02:00
committed by Dan Christian Bogos
parent 90de059801
commit ccdf3ef1f1
24 changed files with 288 additions and 121 deletions

View File

@@ -25,6 +25,8 @@ import (
"sync"
"time"
"slices"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
@@ -50,6 +52,7 @@ type ThresholdProfile struct {
Weight float64 // Weight to sort the thresholds
ActionIDs []string
Async bool
EeIDs []string
lkID string // holds the reference towards guardian lock key
}
@@ -172,6 +175,19 @@ func (t *Threshold) TenantID() string {
return utils.ConcatenatedKey(t.Tenant, t.ID)
}
type ThresholdConfig struct {
FilterIDs []string
ActivationInterval *utils.ActivationInterval
MaxHits int
MinHits int
MinSleep time.Duration
Blocker bool
Weight float64
ActionIDs []string
Async bool
EeIDs []string
}
// thresholdLockKey returns the ID used to lock a threshold with guardian
func thresholdLockKey(tnt, id string) string {
return utils.ConcatenatedKey(utils.CacheThresholds, tnt, id)
@@ -205,12 +221,6 @@ func (t *Threshold) isLocked() bool {
// ProcessEvent processes an ThresholdEvent
// concurrentActions limits the number of simultaneous action sets executed
func (t *Threshold) ProcessEvent(args *utils.CGREvent, dm *DataManager, fltrS *FilterS) (err error) {
if t.Snooze.After(time.Now()) || // snoozed, not executing actions
t.Hits < t.tPrfl.MinHits || // number of hits was not met, will not execute actions
(t.tPrfl.MaxHits != -1 &&
t.Hits > t.tPrfl.MaxHits) {
return
}
var tntAcnt string
var acnt string
if utils.IfaceAsString(args.APIOpts[utils.MetaEventType]) == utils.AccountUpdate {
@@ -224,7 +234,6 @@ func (t *Threshold) ProcessEvent(args *utils.CGREvent, dm *DataManager, fltrS *F
if acnt != utils.EmptyString {
tntAcnt = utils.ConcatenatedKey(args.Tenant, acnt)
}
for _, actionSetID := range t.tPrfl.ActionIDs {
at := &ActionTiming{
Uuid: utils.GenUUID(),
@@ -249,6 +258,79 @@ func (t *Threshold) ProcessEvent(args *utils.CGREvent, dm *DataManager, fltrS *F
return
}
// processEEs processes to the EEs for this threshold
func (t *Threshold) processEEs(opts map[string]any, thScfg *config.ThresholdSCfg, connMgr *ConnManager) (err error) {
if len(thScfg.EEsConns) == 0 {
return nil
}
var targetEeIDs []string
if len(t.tPrfl.EeIDs) > 0 {
targetEeIDs = t.tPrfl.EeIDs
} else {
isNone := slices.Contains(thScfg.EEsExporterIDs, utils.MetaNone)
if isNone {
targetEeIDs = []string{}
} else if len(thScfg.EEsExporterIDs) > 0 {
targetEeIDs = thScfg.EEsExporterIDs
}
}
if targetEeIDs == nil {
return nil // Nothing to do.
}
if opts == nil {
opts = make(map[string]any)
}
opts[utils.MetaEventType] = utils.ThresholdHit
cgrEv := &utils.CGREvent{
Tenant: t.Tenant,
ID: utils.GenUUID(),
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.EventType: utils.ThresholdHit,
utils.ID: t.ID,
utils.Hits: t.Hits,
utils.Snooze: t.Snooze,
utils.ThresholdConfig: ThresholdConfig{
FilterIDs: t.tPrfl.FilterIDs,
ActivationInterval: t.tPrfl.ActivationInterval,
MaxHits: t.tPrfl.MaxHits,
MinHits: t.tPrfl.MinHits,
MinSleep: t.tPrfl.MinSleep,
Blocker: t.tPrfl.Blocker,
Weight: t.tPrfl.Weight,
ActionIDs: t.tPrfl.ActionIDs,
Async: t.tPrfl.Async,
EeIDs: t.tPrfl.EeIDs,
},
},
APIOpts: opts,
}
cgrEventWithID := &CGREventWithEeIDs{
CGREvent: cgrEv,
EeIDs: targetEeIDs,
}
var reply map[string]map[string]any
if t.tPrfl.Async {
go func() {
if err := connMgr.Call(context.TODO(), thScfg.EEsConns,
utils.EeSv1ProcessEvent,
cgrEventWithID, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdS> error: %s processing event %+v with EEs.", err.Error(), cgrEv))
}
}()
} else if errExec := connMgr.Call(context.TODO(), thScfg.EEsConns,
utils.EeSv1ProcessEvent,
cgrEventWithID, &reply); errExec != nil &&
errExec.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdS> error: %s processing event %+v with EEs.", err.Error(), cgrEv))
err = utils.ErrPartiallyExecuted
}
return
}
// Thresholds is a sortable slice of Threshold
type Thresholds []*Threshold
@@ -268,11 +350,12 @@ func (ts Thresholds) unlock() {
}
// NewThresholdService the constructor for ThresoldS service
func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) *ThresholdService {
func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS, conn *ConnManager) *ThresholdService {
return &ThresholdService{
dm: dm,
cgrcfg: cgrcfg,
filterS: filterS,
dm: dm,
cgrcfg: cgrcfg,
filterS: filterS,
stopBackup: make(chan struct{}),
loopStopped: make(chan struct{}),
storedTdIDs: make(utils.StringSet),
@@ -288,6 +371,7 @@ type ThresholdService struct {
loopStopped chan struct{}
storedTdIDs utils.StringSet // keep a record of stats which need saving, map[statsTenantID]bool
stMux sync.RWMutex // protects storedTdIDs
connMgr *ConnManager
}
// Reload stops the backupLoop and restarts it
@@ -500,6 +584,12 @@ func (tS *ThresholdService) processEvent(tnt string, args *utils.CGREvent) (thre
for _, t := range matchTs {
thresholdsIDs = append(thresholdsIDs, t.ID)
t.Hits++
if t.Snooze.After(time.Now()) || // snoozed, not executing actions
t.Hits < t.tPrfl.MinHits || // number of hits was not met, will not execute actions
(t.tPrfl.MaxHits != -1 &&
t.Hits > t.tPrfl.MaxHits) {
continue
}
if err = t.ProcessEvent(args, tS.dm, tS.filterS); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> threshold: %s, ignoring event: %s, error: %s",
@@ -507,6 +597,11 @@ func (tS *ThresholdService) processEvent(tnt string, args *utils.CGREvent) (thre
withErrors = true
continue
}
if err = t.processEEs(args.APIOpts, tS.cgrcfg.ThresholdSCfg(), connMgr); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> threshold: %s processing with EEs.", err.Error()))
withErrors = true
}
if t.dirty == nil || t.Hits == t.tPrfl.MaxHits { // one time threshold
if err = tS.dm.RemoveThreshold(t.Tenant, t.ID); err != nil {
utils.Logger.Warning(