From daf91dc3fa58184b378a3dbe3b8cb0d2503dcf51 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 4 Nov 2025 13:52:41 +0200 Subject: [PATCH] Add ReplyState field to track successful/failed requests --- agents/libagents.go | 27 +++++++++++++++++ agents/radagent.go | 32 +++++++++++++++++++- agents/sipagent.go | 9 ++++++ ers/ers.go | 68 +++++++++++++++++++++++++++++++++++++++++ utils/consts.go | 73 ++++++++++++++++++++++++++------------------- 5 files changed, 178 insertions(+), 31 deletions(-) diff --git a/agents/libagents.go b/agents/libagents.go index b4155bace..4f991874d 100644 --- a/agents/libagents.go +++ b/agents/libagents.go @@ -59,6 +59,8 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, fmt.Sprintf("<%s> LOG, processorID: %s, %s message: %s", agentName, reqProcessor.ID, strings.ToLower(agentName[:len(agentName)-5]), agReq.Request.String())) } + + replyState := utils.OK switch reqType { default: return false, fmt.Errorf("unknown request type: <%s>", reqType) @@ -72,6 +74,9 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, sessions.ApplyFlags(reqType, reqProcessor.Flags, cgrEv.APIOpts) err = connMgr.Call(ctx, sessionsConns, utils.SessionSv1AuthorizeEvent, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateAuthorize + } rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.MetaAccounts)) agReq.setCGRReply(rply, err) case utils.MetaInitiate: @@ -79,6 +84,9 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, sessions.ApplyFlags(reqType, reqProcessor.Flags, cgrEv.APIOpts) err = connMgr.Call(ctx, sessionsConns, utils.SessionSv1InitiateSession, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateInitiate + } rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.MetaInitiate)) agReq.setCGRReply(rply, err) case utils.MetaUpdate: @@ -86,6 +94,9 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, sessions.ApplyFlags(reqType, reqProcessor.Flags, cgrEv.APIOpts) err = connMgr.Call(ctx, sessionsConns, utils.SessionSv1UpdateSession, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateUpdate + } rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.MetaUpdate)) agReq.setCGRReply(rply, err) case utils.MetaTerminate: @@ -93,11 +104,17 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, sessions.ApplyFlags(reqType, reqProcessor.Flags, cgrEv.APIOpts) err = connMgr.Call(ctx, sessionsConns, utils.SessionSv1TerminateSession, cgrEv, &rply) + if err != nil { + replyState = utils.ErrReplyStateTerminate + } agReq.setCGRReply(nil, err) case utils.MetaMessage: rply := new(sessions.V1ProcessMessageReply) err = connMgr.Call(ctx, sessionsConns, utils.SessionSv1ProcessMessage, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateMessage + } // if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { // cgrEv.Event[utils.Usage] = 0 // avoid further debits messageS := utils.OptAsBool(cgrEv.APIOpts, utils.OptsSesMessage) @@ -110,6 +127,9 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, rply := new(sessions.V1ProcessEventReply) err = connMgr.Call(ctx, sessionsConns, utils.SessionSv1ProcessEvent, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateEvent + } // if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { // cgrEv.Event[utils.Usage] = 0 // avoid further debits // } else if needsMaxUsage(reqProcessor.Flags[utils.MetaRALs]) { @@ -118,6 +138,7 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, agReq.setCGRReply(rply, err) case utils.MetaCDRs: // allow CDR processing } + // separate request so we can capture the Terminate/Event also here if reqProcessor.Flags.GetBool(utils.MetaCDRs) && !reqProcessor.Flags.Has(utils.MetaDryRun) { @@ -125,6 +146,11 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, if err = connMgr.Call(ctx, sessionsConns, utils.SessionSv1ProcessCDR, cgrEv, &rplyCDRs); err != nil { agReq.CGRReply.Map[utils.Error] = utils.NewLeafNode(err.Error()) + if replyState == utils.OK { + replyState = utils.ErrReplyStateCDRs + } else { + replyState += ";" + utils.ErrReplyStateCDRs + } } } if err = agReq.SetFields(reqProcessor.ReplyFields); err != nil { @@ -167,6 +193,7 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, // asynchronously. ev := cgrEv.Clone() + ev.Event[utils.ReplyState] = replyState ev.Event[utils.StartTime] = startTime ev.Event[utils.EndTime] = endTime ev.Event[utils.ProcessingTime] = endTime.Sub(startTime) diff --git a/agents/radagent.go b/agents/radagent.go index 6b31dc419..66c343f74 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -336,6 +336,8 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R fmt.Sprintf("<%s> LOG, processorID: %s, radius message: %s", utils.RadiusAgent, reqProcessor.ID, agReq.Request.String())) } + + replyState := utils.OK switch reqType { default: return false, fmt.Errorf("unknown request type: <%s>", reqType) @@ -349,6 +351,9 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R sessions.ApplyFlags(reqType, reqProcessor.Flags, cgrEv.APIOpts) err = ra.cm.Call(ra.ctx, ra.cfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1AuthorizeEvent, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateAuthorize + } rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.MetaAccounts)) agReq.setCGRReply(rply, err) case utils.MetaInitiate: @@ -356,6 +361,9 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R sessions.ApplyFlags(reqType, reqProcessor.Flags, cgrEv.APIOpts) err = ra.cm.Call(ra.ctx, ra.cfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1InitiateSession, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateInitiate + } rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.MetaInitiate)) agReq.setCGRReply(rply, err) case utils.MetaUpdate: @@ -363,6 +371,9 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R sessions.ApplyFlags(reqType, reqProcessor.Flags, cgrEv.APIOpts) err = ra.cm.Call(ra.ctx, ra.cfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1UpdateSession, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateUpdate + } rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.MetaUpdate)) agReq.setCGRReply(rply, err) case utils.MetaTerminate: @@ -370,10 +381,16 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R sessions.ApplyFlags(reqType, reqProcessor.Flags, cgrEv.APIOpts) err = ra.cm.Call(ra.ctx, ra.cfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1TerminateSession, cgrEv, &rply) + if err != nil { + replyState = utils.ErrReplyStateTerminate + } agReq.setCGRReply(nil, err) case utils.MetaMessage: rply := new(sessions.V1ProcessMessageReply) err = ra.cm.Call(ra.ctx, ra.cfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1ProcessMessage, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateMessage + } // if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { // cgrEv.Event[utils.Usage] = 0 // avoid further debits // } else @@ -387,6 +404,9 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R rply := new(sessions.V1ProcessEventReply) err = ra.cm.Call(ra.ctx, ra.cfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1ProcessEvent, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateEvent + } // if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { // cgrEv.Event[utils.Usage] = 0 // avoid further debits // } else @@ -396,9 +416,13 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R agReq.setCGRReply(rply, err) case utils.MetaCDRs: // allow this method case utils.MetaRadauth: - if pass, err := radauthReq(reqProcessor.Flags, req, agReq, rpl); err != nil { + var pass bool + if pass, err = radauthReq(reqProcessor.Flags, req, agReq, rpl); err != nil { + replyState = utils.ErrReplyStateRadauth agReq.CGRReply.Map[utils.Error] = utils.NewLeafNode(err.Error()) } else if !pass { + // Assume failed auth counts as a failed request. + replyState = utils.ErrReplyStateRadauth agReq.CGRReply.Map[utils.Error] = utils.NewLeafNode(utils.RadauthFailed) } } @@ -409,6 +433,11 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R if err = ra.cm.Call(ra.ctx, ra.cfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1ProcessCDR, cgrEv, &rplyCDRs); err != nil { agReq.CGRReply.Map[utils.Error] = utils.NewLeafNode(err.Error()) + if replyState == utils.OK { + replyState = utils.ErrReplyStateCDRs + } else { + replyState += ";" + utils.ErrReplyStateCDRs + } } } @@ -442,6 +471,7 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R // asynchronously. ev := cgrEv.Clone() + ev.Event[utils.ReplyState] = replyState ev.Event[utils.StartTime] = startTime ev.Event[utils.EndTime] = endTime ev.Event[utils.ProcessingTime] = endTime.Sub(startTime) diff --git a/agents/sipagent.go b/agents/sipagent.go index d06b07142..e7c73b232 100644 --- a/agents/sipagent.go +++ b/agents/sipagent.go @@ -379,6 +379,7 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor, agReq *AgentRequest) (processed bool, err error) { startTime := time.Now() + replyState := utils.OK if pass, err := sa.filterS.Pass(context.TODO(), agReq.Tenant, reqProcessor.Filters, agReq); err != nil || !pass { return pass, err @@ -417,12 +418,18 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor, sessions.ApplyFlags(reqType, reqProcessor.Flags, cgrEv.APIOpts) err = sa.connMgr.Call(context.TODO(), sa.cfg.SIPAgentCfg().SessionSConns, utils.SessionSv1AuthorizeEvent, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateAuthorize + } rply.SetMaxUsageNeeded(utils.OptAsBool(cgrEv.APIOpts, utils.MetaAccounts)) agReq.setCGRReply(rply, err) case utils.MetaEvent: rply := new(sessions.V1ProcessEventReply) err = sa.connMgr.Call(context.TODO(), sa.cfg.SIPAgentCfg().SessionSConns, utils.SessionSv1ProcessEvent, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateEvent + } // if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { // cgrEv.Event[utils.Usage] = 0 // avoid further debits // } else @@ -431,6 +438,7 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor, // } agReq.setCGRReply(rply, err) } + if err := agReq.SetFields(reqProcessor.ReplyFields); err != nil { return false, err } @@ -461,6 +469,7 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor, // asynchronously. ev := cgrEv.Clone() + ev.Event[utils.ReplyState] = replyState ev.Event[utils.StartTime] = startTime ev.Event[utils.EndTime] = endTime ev.Event[utils.ProcessingTime] = endTime.Sub(startTime) diff --git a/ers/ers.go b/ers/ers.go index 1e5964c51..93b647407 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -191,6 +191,54 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) { func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventReaderCfg) (err error) { startTime := time.Now() + replyState := utils.OK + + // Defer stats and thresholds processing to ensure it happens even with early returns. + defer func() { + endTime := time.Now() + if rdrCfg.Flags.Has(utils.MetaDryRun) { + return + } + rawStatIDs := rdrCfg.Flags.ParamValue(utils.MetaERsStats) + rawThIDs := rdrCfg.Flags.ParamValue(utils.MetaERsThresholds) + + // Early return if nothing to process. + if rawStatIDs == "" && rawThIDs == "" { + return + } + + // Clone is needed to prevent data races if requests are sent + // asynchronously. + ev := cgrEv.Clone() + + ev.Event[utils.ReplyState] = replyState + ev.Event[utils.StartTime] = startTime + ev.Event[utils.EndTime] = endTime + ev.Event[utils.ProcessingTime] = endTime.Sub(startTime) + ev.Event[utils.Source] = utils.ERs + ev.APIOpts[utils.MetaEventType] = utils.ProcessTime + + if rawStatIDs != "" { + statIDs := strings.Split(rawStatIDs, utils.ANDSep) + ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs + var reply []string + if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().StatSConns, + utils.StatSv1ProcessEvent, ev, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed to process event in %s: %v", + utils.ERs, utils.StatS, err)) + } + } + if rawThIDs != "" { + thIDs := strings.Split(rawThIDs, utils.ANDSep) + ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs + var reply []string + if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().ThresholdSConns, + utils.ThresholdSv1ProcessEvent, ev, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed to process event in %s: %v", + utils.ERs, utils.ThresholdS, err)) + } + } + }() // log the event created if requested by flags if rdrCfg.Flags.Has(utils.MetaLog) { utils.Logger.Info( @@ -223,21 +271,33 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, sessions.ApplyFlags(reqType, rdrCfg.Flags, cgrEv.APIOpts) err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1AuthorizeEvent, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateAuthorize + } case utils.MetaInitiate: rply := new(sessions.V1InitSessionReply) sessions.ApplyFlags(reqType, rdrCfg.Flags, cgrEv.APIOpts) err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1InitiateSession, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateInitiate + } case utils.MetaUpdate: rply := new(sessions.V1UpdateSessionReply) sessions.ApplyFlags(reqType, rdrCfg.Flags, cgrEv.APIOpts) err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1UpdateSession, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateUpdate + } case utils.MetaTerminate: rply := utils.StringPointer("") sessions.ApplyFlags(reqType, rdrCfg.Flags, cgrEv.APIOpts) err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1TerminateSession, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateTerminate + } case utils.MetaMessage: rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessMessage, @@ -246,12 +306,18 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, // cgrEv.Event[utils.Usage] = 0 // avoid further debits // } else if utils.OptAsBool(cgrEv.APIOpts, utils.OptsSesMessage) { + if err != nil { + replyState = utils.ErrReplyStateMessage + } cgrEv.Event[utils.Usage] = rply.MaxUsage // make sure the CDR reflects the debit } case utils.MetaEvent: rply := new(sessions.V1ProcessEventReply) err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessEvent, cgrEv, rply) + if err != nil { + replyState = utils.ErrReplyStateEvent + } case utils.MetaCDRs: // allow CDR processing case utils.MetaExport: // allow event exporting } @@ -265,6 +331,7 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, EeIDs: rdrCfg.EEsIDs, CGREvent: cgrEv, }, &reply); err != nil { + replyState = utils.ErrReplyStateExport return err } } @@ -277,6 +344,7 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, var replyCDRs string if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessCDR, cgrEv, &replyCDRs); err != nil { + replyState = utils.ErrReplyStateCDRs return err } } diff --git a/utils/consts.go b/utils/consts.go index b1d6aba5c..c3efc2ad8 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -520,36 +520,49 @@ const ( User = "User" Subscribers = "Subscribers" //Destinations = "Destinations" - MetaSubscribers = "*subscribers" - MetaDataDB = "*datadb" - MetaStorDB = "*stordb" - MetaWeight = "*weight" - MetaLC = "*lc" - MetaHC = "*hc" - MetaQOS = "*qos" - MetaReas = "*reas" - MetaReds = "*reds" - Weight = "Weight" - Limit = "Limit" - UsageTTL = "UsageTTL" - Usages = "Usages" - TTLIdx = "TTLIdx" - AllocationMessage = "AllocationMessage" - AddressPool = "AddressPool" - Pools = "Pools" - Allocations = "Allocations" - TTLIndex = "TTLIndex" - Allocation = "Allocation" - Range = "Range" - Stored = "Stored" - RatingSubject = "RatingSubject" - Categories = "Categories" - Blocker = "Blocker" - Blockers = "Blockers" - Params = "Params" - StartTime = "StartTime" - EndTime = "EndTime" - ProcessingTime = "ProcessingTime" + MetaSubscribers = "*subscribers" + MetaDataDB = "*datadb" + MetaStorDB = "*stordb" + MetaWeight = "*weight" + MetaLC = "*lc" + MetaHC = "*hc" + MetaQOS = "*qos" + MetaReas = "*reas" + MetaReds = "*reds" + Weight = "Weight" + Limit = "Limit" + UsageTTL = "UsageTTL" + Usages = "Usages" + TTLIdx = "TTLIdx" + AllocationMessage = "AllocationMessage" + AddressPool = "AddressPool" + Pools = "Pools" + Allocations = "Allocations" + TTLIndex = "TTLIndex" + Allocation = "Allocation" + Range = "Range" + Stored = "Stored" + RatingSubject = "RatingSubject" + Categories = "Categories" + Blocker = "Blocker" + Blockers = "Blockers" + Params = "Params" + StartTime = "StartTime" + EndTime = "EndTime" + ProcessingTime = "ProcessingTime" + ReplyState = "ReplyState" + + // ReplyState error constants + ErrReplyStateAuthorize = "ERR_AUTHORIZE" + ErrReplyStateInitiate = "ERR_INITIATE" + ErrReplyStateUpdate = "ERR_UPDATE" + ErrReplyStateTerminate = "ERR_TERMINATE" + ErrReplyStateMessage = "ERR_MESSAGE" + ErrReplyStateEvent = "ERR_EVENT" + ErrReplyStateCDRs = "ERR_CDRS" + ErrReplyStateExport = "ERR_EXPORT" + ErrReplyStateRadauth = "ERR_RADAUTH" + AccountSummary = "AccountSummary" RatingFilters = "RatingFilters" RatingFilter = "RatingFilter"