send ERs ProcessTime events to stats/thresholds

This commit is contained in:
ionutboangiu
2025-06-13 19:20:49 +03:00
committed by Dan Christian Bogos
parent 3acb1e94db
commit 7e1b4a5024
2 changed files with 56 additions and 4 deletions

View File

@@ -233,6 +233,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) {
// processEvent will be called each time a new event is received from readers
func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
rdrCfg *config.EventReaderCfg) (err error) {
startTime := time.Now()
// log the event created if requested by flags
if rdrCfg.Flags.Has(utils.MetaLog) {
utils.Logger.Info(
@@ -373,16 +374,65 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
if rdrCfg.Flags.Has(utils.MetaCDRs) &&
!rdrCfg.Flags.Has(utils.MetaDryRun) {
rplyCDRs := utils.StringPointer("")
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessCDR,
cgrEv, rplyCDRs)
if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns,
utils.SessionSv1ProcessCDR, cgrEv, rplyCDRs); err != nil {
return err
}
}
if rdrCfg.Flags.Has(utils.MetaExport) {
var reply map[string]map[string]any
return erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().EEsConns, utils.EeSv1ProcessEvent,
if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().EEsConns,
utils.EeSv1ProcessEvent,
&engine.CGREventWithEeIDs{
EeIDs: rdrCfg.EEsIDs,
CGREvent: cgrEv,
}, &reply)
}, &reply); err != nil {
return err
}
}
endTime := time.Now()
if rdrCfg.Flags.Has(utils.MetaDryRun) {
return nil
}
rawStatIDs := rdrCfg.Flags.ParamValue(utils.MetaERsStats)
rawThIDs := rdrCfg.Flags.ParamValue(utils.MetaERsThresholds)
// Early return if nothing to process.
if rawStatIDs == "" && rawThIDs == "" {
return nil
}
// Clone is needed to prevent data races if requests are sent
// asynchronously.
ev := cgrEv.Clone()
ev.Event[utils.StartTime] = startTime
ev.Event[utils.EndTime] = endTime
ev.Event[utils.ProcessingTime] = endTime.Sub(startTime)
ev.Event[utils.Source] = utils.ERs
ev.APIOpts[utils.MetaEventType] = utils.ProcessTime
if rawStatIDs != "" {
statIDs := strings.Split(rawStatIDs, utils.ANDSep)
ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs
var reply []string
if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().StatSConns,
utils.StatSv1ProcessEvent, ev, &reply); err != nil {
return fmt.Errorf("failed to process event in %s: %v",
utils.StatS, err)
}
// NOTE: ProfileIDs APIOpts key persists for the ThresholdS request,
// although it would be ignored. Might want to delete it.
}
if rawThIDs != "" {
thIDs := strings.Split(rawThIDs, utils.ANDSep)
ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs
var reply []string
if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, ev, &reply); err != nil {
return fmt.Errorf("failed to process event in %s: %v",
utils.ThresholdS, err)
}
}
return

View File

@@ -745,6 +745,8 @@ const (
MetaHAThresholds = "*haThresholds"
MetaSAStats = "*saStats"
MetaSAThresholds = "*saThresholds"
MetaERsStats = "*ersStats"
MetaERsThresholds = "*ersThresholds"
MetaDryRun = "*dryrun"
MetaRALsDryRun = "*ralsDryRun"
Event = "Event"