From 671c7474b82986e5907fe828c56ded0023668c77 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 4 Jun 2025 19:55:50 +0300 Subject: [PATCH] send agent events to stats/thresholds w/ ProcessTime applies only to template-based agents --- agents/diamagent.go | 2 ++ agents/diamagent_test.go | 30 +++++++++++++++++---- agents/dnsagent.go | 2 ++ agents/httpagent.go | 37 ++++++++++++++----------- agents/httpagent_test.go | 14 ++++++++++ agents/libagents.go | 58 +++++++++++++++++++++++++++++++++++++++- agents/radagent.go | 47 +++++++++++++++++++++++++++++++- services/httpagent.go | 5 ++-- utils/consts.go | 10 +++++++ utils/decimal_test.go | 1 - 10 files changed, 180 insertions(+), 26 deletions(-) diff --git a/agents/diamagent.go b/agents/diamagent.go index 870f038a8..ee6a7189f 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -263,6 +263,8 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { da.filterS, nil), utils.DiameterAgent, da.connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, + da.cgrCfg.DiameterAgentCfg().StatSConns, + da.cgrCfg.DiameterAgentCfg().ThresholdSConns, da.filterS) if lclProcessed { processed = lclProcessed diff --git a/agents/diamagent_test.go b/agents/diamagent_test.go index a519a445f..cadf875e9 100644 --- a/agents/diamagent_test.go +++ b/agents/diamagent_test.go @@ -460,7 +460,11 @@ func TestProcessRequest(t *testing.T) { t.Fatal(err) } da.ctx = context.WithClient(context.TODO(), srv) - pr, err := processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS) + pr, err := processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, + da.cgrCfg.DiameterAgentCfg().SessionSConns, + da.cgrCfg.DiameterAgentCfg().StatSConns, + da.cgrCfg.DiameterAgentCfg().ThresholdSConns, + da.filterS) if err != nil { t.Error(err) } else if !pr { @@ -477,7 +481,11 @@ func TestProcessRequest(t *testing.T) { reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil) - pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS) + pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, + da.cgrCfg.DiameterAgentCfg().SessionSConns, + da.cgrCfg.DiameterAgentCfg().StatSConns, + da.cgrCfg.DiameterAgentCfg().ThresholdSConns, + da.filterS) if err != nil { t.Error(err) } else if !pr { @@ -494,7 +502,11 @@ func TestProcessRequest(t *testing.T) { reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil) - pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS) + pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, + da.cgrCfg.DiameterAgentCfg().SessionSConns, + da.cgrCfg.DiameterAgentCfg().StatSConns, + da.cgrCfg.DiameterAgentCfg().ThresholdSConns, + da.filterS) if err != nil { t.Error(err) } else if !pr { @@ -517,7 +529,11 @@ func TestProcessRequest(t *testing.T) { reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil) - pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS) + pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, + da.cgrCfg.DiameterAgentCfg().SessionSConns, + da.cgrCfg.DiameterAgentCfg().StatSConns, + da.cgrCfg.DiameterAgentCfg().ThresholdSConns, + da.filterS) if err != nil { t.Error(err) } else if !pr { @@ -534,7 +550,11 @@ func TestProcessRequest(t *testing.T) { reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil) - pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS) + pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, + da.cgrCfg.DiameterAgentCfg().SessionSConns, + da.cgrCfg.DiameterAgentCfg().StatSConns, + da.cgrCfg.DiameterAgentCfg().ThresholdSConns, + da.filterS) if err != nil { t.Error(err) } else if !pr { diff --git a/agents/dnsagent.go b/agents/dnsagent.go index f883a9f52..6896b8b6b 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -197,6 +197,8 @@ func (da *DNSAgent) handleQuestion(dnsDP utils.DataProvider, rply *dns.Msg, q *d da.fltrS, nil), utils.DNSAgent, da.connMgr, da.cgrCfg.DNSAgentCfg().SessionSConns, + da.cgrCfg.DNSAgentCfg().StatSConns, + da.cgrCfg.DNSAgentCfg().ThresholdSConns, da.fltrS); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing message: %s from %s", diff --git a/agents/httpagent.go b/agents/httpagent.go index 15e88de06..b9e250741 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -29,29 +29,34 @@ import ( ) // NewHttpAgent will construct a HTTPAgent -func NewHTTPAgent(connMgr *engine.ConnManager, sessionConns []string, +func NewHTTPAgent(connMgr *engine.ConnManager, + sessionConns, statsConns, thresholdsConns []string, filterS *engine.FilterS, dfltTenant, reqPayload, rplyPayload string, reqProcessors []*config.RequestProcessor) *HTTPAgent { return &HTTPAgent{ - connMgr: connMgr, - filterS: filterS, - dfltTenant: dfltTenant, - reqPayload: reqPayload, - rplyPayload: rplyPayload, - reqProcessors: reqProcessors, - sessionConns: sessionConns, + connMgr: connMgr, + filterS: filterS, + dfltTenant: dfltTenant, + reqPayload: reqPayload, + rplyPayload: rplyPayload, + reqProcessors: reqProcessors, + sessionConns: sessionConns, + statsConns: statsConns, + thresholdsConns: thresholdsConns, } } // HTTPAgent is a handler for HTTP requests type HTTPAgent struct { - connMgr *engine.ConnManager - filterS *engine.FilterS - dfltTenant string - reqPayload string - rplyPayload string - reqProcessors []*config.RequestProcessor - sessionConns []string + connMgr *engine.ConnManager + filterS *engine.FilterS + dfltTenant string + reqPayload string + rplyPayload string + reqProcessors []*config.RequestProcessor + sessionConns []string + statsConns []string + thresholdsConns []string } // ServeHTTP implements http.Handler interface @@ -74,7 +79,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) { config.CgrConfig().GeneralCfg().DefaultTimezone), ha.filterS, nil) lclProcessed, err := processRequest(context.TODO(), reqProcessor, agReq, - utils.HTTPAgent, ha.connMgr, ha.sessionConns, + utils.HTTPAgent, ha.connMgr, ha.sessionConns, ha.statsConns, ha.thresholdsConns, agReq.filterS) if err != nil { utils.Logger.Warning( diff --git a/agents/httpagent_test.go b/agents/httpagent_test.go index f24cb4b7b..d676ec08b 100644 --- a/agents/httpagent_test.go +++ b/agents/httpagent_test.go @@ -37,9 +37,13 @@ func TestNewHTTPAgent(t *testing.T) { {}, } sessionConns := []string{"conn1", "conn2"} + statsConns := []string{"conn1", "conn2"} + thresholdsConns := []string{"conn1", "conn2"} agent := NewHTTPAgent( connMgr, sessionConns, + statsConns, + thresholdsConns, filterS, dfltTenant, reqPayload, @@ -78,6 +82,16 @@ func TestNewHTTPAgent(t *testing.T) { t.Errorf("Expected sessionConns[%d] %s, got %s", i, conn, agent.sessionConns[i]) } } + for i, conn := range statsConns { + if agent.statsConns[i] != conn { + t.Errorf("Expected statsConns[%d] %s, got %s", i, conn, agent.statsConns[i]) + } + } + for i, conn := range thresholdsConns { + if agent.thresholdsConns[i] != conn { + t.Errorf("Expected thresholdsConns[%d] %s, got %s", i, conn, agent.thresholdsConns[i]) + } + } } func TestHTTPAgentServeHTTP(t *testing.T) { diff --git a/agents/libagents.go b/agents/libagents.go index b4ea0de58..6f189cd57 100644 --- a/agents/libagents.go +++ b/agents/libagents.go @@ -21,6 +21,7 @@ package agents import ( "fmt" "strings" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" @@ -31,7 +32,9 @@ import ( func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, agReq *AgentRequest, agentName string, connMgr *engine.ConnManager, - sessionsConns []string, filterS *engine.FilterS) (_ bool, err error) { + sessionsConns, statsConns, thConns []string, + filterS *engine.FilterS) (_ bool, err error) { + startTime := time.Now() if pass, err := filterS.Pass(agReq.Tenant, reqProcessor.Filters, agReq); err != nil || !pass { return pass, err @@ -195,6 +198,7 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, if err = agReq.SetFields(reqProcessor.ReplyFields); err != nil { return } + endTime := time.Now() if reqProcessor.Flags.Has(utils.MetaLog) { utils.Logger.Info( fmt.Sprintf("<%s> LOG, %s reply: %s", @@ -205,5 +209,57 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, fmt.Sprintf("<%s> DRY_RUN, %s reply: %s", agentName, agentName[:len(agentName)-5], agReq.Reply)) } + if reqProcessor.Flags.Has(utils.MetaDryRun) { + return true, nil + } + + var statIDs, thIDs []string + switch agentName { + case utils.DiameterAgent: + statIDs = reqProcessor.Flags.ParamsSlice(utils.MetaDAStats, utils.MetaIDs) + thIDs = reqProcessor.Flags.ParamsSlice(utils.MetaDAThresholds, utils.MetaIDs) + case utils.HTTPAgent: + statIDs = reqProcessor.Flags.ParamsSlice(utils.MetaHAStats, utils.MetaIDs) + thIDs = reqProcessor.Flags.ParamsSlice(utils.MetaHAThresholds, utils.MetaIDs) + case utils.DNSAgent: + statIDs = reqProcessor.Flags.ParamsSlice(utils.MetaDNSStats, utils.MetaIDs) + thIDs = reqProcessor.Flags.ParamsSlice(utils.MetaDNSThresholds, utils.MetaIDs) + } + + // Return early if nothing to process. + if len(statIDs) == 0 && len(thIDs) == 0 { + return true, nil + } + + // Clone is needed to prevent data races if requests are sent + // asynchronously. + ev := cgrEv.Clone() + + ev.Event[utils.StartTime] = startTime + ev.Event[utils.EndTime] = endTime + ev.Event[utils.ProcessingTime] = endTime.Sub(startTime) + ev.Event[utils.Source] = agentName + ev.APIOpts[utils.MetaEventType] = utils.ProcessTime + + if len(statIDs) > 0 { + ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs + var reply []string + if err := connMgr.Call(ctx, statsConns, utils.StatSv1ProcessEvent, + ev, &reply); err != nil { + return false, fmt.Errorf("failed to process %s event in %s: %v", + agentName, utils.StatS, err) + } + // NOTE: ProfileIDs APIOpts key persists for the ThresholdS request, + // although it would be ignored. Might want to delete it. + } + if len(thIDs) > 0 { + ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs + var reply []string + if err := connMgr.Call(ctx, thConns, utils.ThresholdSv1ProcessEvent, + ev, &reply); err != nil { + return false, fmt.Errorf("failed to process %s event in %s: %v", + agentName, utils.ThresholdS, err) + } + } return true, nil } diff --git a/agents/radagent.go b/agents/radagent.go index 53555663a..c942dd1da 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" @@ -296,6 +297,7 @@ func cacheRadiusPacket(packet *radigo.Packet, address string, cfg *config.Radius // processRequest represents one processor processing the request func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.RequestProcessor, agReq *AgentRequest, rpl *radigo.Packet) (processed bool, err error) { + startTime := time.Now() if pass, err := ra.filterS.Pass(agReq.Tenant, reqProcessor.Filters, agReq); err != nil || !pass { return pass, err @@ -452,6 +454,7 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R agReq.CGRReply.Map[utils.Error] = utils.NewLeafNode(utils.RadauthFailed) } } + // separate request so we can capture the Terminate/Event also here if reqProcessor.Flags.GetBool(utils.MetaCDRs) { var rplyCDRs string @@ -464,7 +467,7 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R if err := agReq.SetFields(reqProcessor.ReplyFields); err != nil { return false, err } - + endTime := time.Now() if reqProcessor.Flags.Has(utils.MetaLog) { utils.Logger.Info( fmt.Sprintf("<%s> LOG, Radius reply: %s", @@ -475,6 +478,48 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R fmt.Sprintf("<%s> DRY_RUN, Radius reply: %s", utils.RadiusAgent, agReq.Reply)) } + if reqProcessor.Flags.Has(utils.MetaDryRun) { + return true, nil + } + + statIDs := reqProcessor.Flags.ParamsSlice(utils.MetaRAStats, utils.MetaIDs) + thIDs := reqProcessor.Flags.ParamsSlice(utils.MetaRAThresholds, utils.MetaIDs) + + // Early return if nothing to process. + if len(statIDs) == 0 && len(thIDs) == 0 { + return true, nil + } + + // Clone is needed to prevent data races if requests are sent + // asynchronously. + ev := cgrEv.Clone() + + ev.Event[utils.StartTime] = startTime + ev.Event[utils.EndTime] = endTime + ev.Event[utils.ProcessingTime] = endTime.Sub(startTime) + ev.Event[utils.Source] = utils.RadiusAgent + ev.APIOpts[utils.MetaEventType] = utils.ProcessTime + + if len(statIDs) > 0 { + ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs + var reply []string + if err := ra.connMgr.Call(ra.ctx, ra.cgrCfg.RadiusAgentCfg().StatSConns, + utils.StatSv1ProcessEvent, ev, &reply); err != nil { + return false, fmt.Errorf("failed to process %s event in %s: %v", + utils.RadiusAgent, utils.StatS, err) + } + // NOTE: ProfileIDs APIOpts key persists for the ThresholdS request, + // although it would be ignored. Might want to delete it. + } + if len(thIDs) > 0 { + ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs + var reply []string + if err := ra.connMgr.Call(ra.ctx, ra.cgrCfg.RadiusAgentCfg().ThresholdSConns, + utils.ThresholdSv1ProcessEvent, ev, &reply); err != nil { + return false, fmt.Errorf("failed to process %s event in %s: %v", + utils.RadiusAgent, utils.ThresholdS, err) + } + } return true, nil } diff --git a/services/httpagent.go b/services/httpagent.go index b493758dd..9528acac3 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -71,8 +71,9 @@ func (ha *HTTPAgent) Start() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> successfully started HTTPAgent", utils.HTTPAgent)) for _, agntCfg := range ha.cfg.HTTPAgentCfg() { ha.server.RegisterHttpHandler(agntCfg.URL, - agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, filterS, - ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, + agents.NewHTTPAgent(ha.connMgr, + agntCfg.SessionSConns, agntCfg.StatSConns, agntCfg.ThresholdSConns, + filterS, ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, agntCfg.ReplyPayload, agntCfg.RequestProcessors)) } ha.Unlock() diff --git a/utils/consts.go b/utils/consts.go index a0fb1abca..96fde9b2c 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -558,6 +558,7 @@ const ( TrendUpdate = "TrendUpdate" RankingUpdate = "RankingUpdate" ResourceUpdate = "ResourceUpdate" + ProcessTime = "ProcessTime" CDR = "CDR" CDRs = "CDRs" ExpiryTime = "ExpiryTime" @@ -604,6 +605,7 @@ const ( RatingPlanID = "RatingPlanID" StartTime = "StartTime" EndTime = "EndTime" + ProcessingTime = "ProcessingTime" AccountSummary = "AccountSummary" RatingFilters = "RatingFilters" RatingFilter = "RatingFilter" @@ -721,6 +723,14 @@ const ( MetaTerminate = "*terminate" MetaEvent = "*event" MetaMessage = "*message" + MetaDAStats = "*daStats" + MetaDAThresholds = "*daThresholds" + MetaRAStats = "*raStats" + MetaRAThresholds = "*raThresholds" + MetaDNSStats = "*dnsStats" + MetaDNSThresholds = "*dnsThresholds" + MetaHAStats = "*haStats" + MetaHAThresholds = "*haThresholds" MetaDryRun = "*dryrun" MetaRALsDryRun = "*ralsDryRun" Event = "Event" diff --git a/utils/decimal_test.go b/utils/decimal_test.go index f0a836b54..1159b3630 100644 --- a/utils/decimal_test.go +++ b/utils/decimal_test.go @@ -192,7 +192,6 @@ func TestNewDecimalFromFloat64(t *testing.T) { x := 21.5 xExp, _ := new(decimal.Big).SetString(strconv.FormatFloat(x, 'f', -1, 64)) expected := &Decimal{xExp} - // fmt.Printf("%v of type %T", expected, expected) rcv := NewDecimalFromFloat64(x) if !reflect.DeepEqual(rcv, expected) { t.Errorf("Expected <+%v> but received <+%v>", xExp, rcv)