From 7e1b4a50241612c505eeec3ee830e3dcb6349832 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 13 Jun 2025 19:20:49 +0300 Subject: [PATCH] send ERs ProcessTime events to stats/thresholds --- ers/ers.go | 58 +++++++++++++++++++++++++++++++++++++++++++++---- utils/consts.go | 2 ++ 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/ers/ers.go b/ers/ers.go index 68a3f9e08..369bbd53c 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -233,6 +233,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) { // processEvent will be called each time a new event is received from readers func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventReaderCfg) (err error) { + startTime := time.Now() // log the event created if requested by flags if rdrCfg.Flags.Has(utils.MetaLog) { utils.Logger.Info( @@ -373,16 +374,65 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, if rdrCfg.Flags.Has(utils.MetaCDRs) && !rdrCfg.Flags.Has(utils.MetaDryRun) { rplyCDRs := utils.StringPointer("") - err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessCDR, - cgrEv, rplyCDRs) + if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, + utils.SessionSv1ProcessCDR, cgrEv, rplyCDRs); err != nil { + return err + } } if rdrCfg.Flags.Has(utils.MetaExport) { var reply map[string]map[string]any - return erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().EEsConns, utils.EeSv1ProcessEvent, + if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().EEsConns, + utils.EeSv1ProcessEvent, &engine.CGREventWithEeIDs{ EeIDs: rdrCfg.EEsIDs, CGREvent: cgrEv, - }, &reply) + }, &reply); err != nil { + return err + } + } + endTime := time.Now() + if rdrCfg.Flags.Has(utils.MetaDryRun) { + return nil + } + rawStatIDs := rdrCfg.Flags.ParamValue(utils.MetaERsStats) + rawThIDs := rdrCfg.Flags.ParamValue(utils.MetaERsThresholds) + + // Early return if nothing to process. + if rawStatIDs == "" && rawThIDs == "" { + return nil + } + + // Clone is needed to prevent data races if requests are sent + // asynchronously. + ev := cgrEv.Clone() + + ev.Event[utils.StartTime] = startTime + ev.Event[utils.EndTime] = endTime + ev.Event[utils.ProcessingTime] = endTime.Sub(startTime) + ev.Event[utils.Source] = utils.ERs + ev.APIOpts[utils.MetaEventType] = utils.ProcessTime + + if rawStatIDs != "" { + statIDs := strings.Split(rawStatIDs, utils.ANDSep) + ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs + var reply []string + if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().StatSConns, + utils.StatSv1ProcessEvent, ev, &reply); err != nil { + return fmt.Errorf("failed to process event in %s: %v", + utils.StatS, err) + } + // NOTE: ProfileIDs APIOpts key persists for the ThresholdS request, + // although it would be ignored. Might want to delete it. + } + if rawThIDs != "" { + thIDs := strings.Split(rawThIDs, utils.ANDSep) + ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs + var reply []string + if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().ThresholdSConns, + utils.ThresholdSv1ProcessEvent, ev, &reply); err != nil { + return fmt.Errorf("failed to process event in %s: %v", + utils.ThresholdS, err) + } } return diff --git a/utils/consts.go b/utils/consts.go index 7aa94ff36..51492ad85 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -745,6 +745,8 @@ const ( MetaHAThresholds = "*haThresholds" MetaSAStats = "*saStats" MetaSAThresholds = "*saThresholds" + MetaERsStats = "*ersStats" + MetaERsThresholds = "*ersThresholds" MetaDryRun = "*dryrun" MetaRALsDryRun = "*ralsDryRun" Event = "Event"