send agent events to stats/thresholds w/ ProcessTime

applies only to template-based agents
This commit is contained in:
ionutboangiu
2025-06-04 19:55:50 +03:00
committed by Dan Christian Bogos
parent 9b706f57ac
commit 671c7474b8
10 changed files with 180 additions and 26 deletions

View File

@@ -263,6 +263,8 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
da.filterS, nil),
utils.DiameterAgent, da.connMgr,
da.cgrCfg.DiameterAgentCfg().SessionSConns,
da.cgrCfg.DiameterAgentCfg().StatSConns,
da.cgrCfg.DiameterAgentCfg().ThresholdSConns,
da.filterS)
if lclProcessed {
processed = lclProcessed

View File

@@ -460,7 +460,11 @@ func TestProcessRequest(t *testing.T) {
t.Fatal(err)
}
da.ctx = context.WithClient(context.TODO(), srv)
pr, err := processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS)
pr, err := processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr,
da.cgrCfg.DiameterAgentCfg().SessionSConns,
da.cgrCfg.DiameterAgentCfg().StatSConns,
da.cgrCfg.DiameterAgentCfg().ThresholdSConns,
da.filterS)
if err != nil {
t.Error(err)
} else if !pr {
@@ -477,7 +481,11 @@ func TestProcessRequest(t *testing.T) {
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil)
pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS)
pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr,
da.cgrCfg.DiameterAgentCfg().SessionSConns,
da.cgrCfg.DiameterAgentCfg().StatSConns,
da.cgrCfg.DiameterAgentCfg().ThresholdSConns,
da.filterS)
if err != nil {
t.Error(err)
} else if !pr {
@@ -494,7 +502,11 @@ func TestProcessRequest(t *testing.T) {
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil)
pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS)
pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr,
da.cgrCfg.DiameterAgentCfg().SessionSConns,
da.cgrCfg.DiameterAgentCfg().StatSConns,
da.cgrCfg.DiameterAgentCfg().ThresholdSConns,
da.filterS)
if err != nil {
t.Error(err)
} else if !pr {
@@ -517,7 +529,11 @@ func TestProcessRequest(t *testing.T) {
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil)
pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS)
pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr,
da.cgrCfg.DiameterAgentCfg().SessionSConns,
da.cgrCfg.DiameterAgentCfg().StatSConns,
da.cgrCfg.DiameterAgentCfg().ThresholdSConns,
da.filterS)
if err != nil {
t.Error(err)
} else if !pr {
@@ -534,7 +550,11 @@ func TestProcessRequest(t *testing.T) {
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil)
pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr, da.cgrCfg.DiameterAgentCfg().SessionSConns, da.filterS)
pr, err = processRequest(da.ctx, reqProcessor, agReq, utils.DiameterAgent, connMgr,
da.cgrCfg.DiameterAgentCfg().SessionSConns,
da.cgrCfg.DiameterAgentCfg().StatSConns,
da.cgrCfg.DiameterAgentCfg().ThresholdSConns,
da.filterS)
if err != nil {
t.Error(err)
} else if !pr {

View File

@@ -197,6 +197,8 @@ func (da *DNSAgent) handleQuestion(dnsDP utils.DataProvider, rply *dns.Msg, q *d
da.fltrS, nil),
utils.DNSAgent, da.connMgr,
da.cgrCfg.DNSAgentCfg().SessionSConns,
da.cgrCfg.DNSAgentCfg().StatSConns,
da.cgrCfg.DNSAgentCfg().ThresholdSConns,
da.fltrS); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing message: %s from %s",

View File

@@ -29,29 +29,34 @@ import (
)
// NewHttpAgent will construct a HTTPAgent
func NewHTTPAgent(connMgr *engine.ConnManager, sessionConns []string,
func NewHTTPAgent(connMgr *engine.ConnManager,
sessionConns, statsConns, thresholdsConns []string,
filterS *engine.FilterS, dfltTenant, reqPayload, rplyPayload string,
reqProcessors []*config.RequestProcessor) *HTTPAgent {
return &HTTPAgent{
connMgr: connMgr,
filterS: filterS,
dfltTenant: dfltTenant,
reqPayload: reqPayload,
rplyPayload: rplyPayload,
reqProcessors: reqProcessors,
sessionConns: sessionConns,
connMgr: connMgr,
filterS: filterS,
dfltTenant: dfltTenant,
reqPayload: reqPayload,
rplyPayload: rplyPayload,
reqProcessors: reqProcessors,
sessionConns: sessionConns,
statsConns: statsConns,
thresholdsConns: thresholdsConns,
}
}
// HTTPAgent is a handler for HTTP requests
type HTTPAgent struct {
connMgr *engine.ConnManager
filterS *engine.FilterS
dfltTenant string
reqPayload string
rplyPayload string
reqProcessors []*config.RequestProcessor
sessionConns []string
connMgr *engine.ConnManager
filterS *engine.FilterS
dfltTenant string
reqPayload string
rplyPayload string
reqProcessors []*config.RequestProcessor
sessionConns []string
statsConns []string
thresholdsConns []string
}
// ServeHTTP implements http.Handler interface
@@ -74,7 +79,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) {
config.CgrConfig().GeneralCfg().DefaultTimezone),
ha.filterS, nil)
lclProcessed, err := processRequest(context.TODO(), reqProcessor, agReq,
utils.HTTPAgent, ha.connMgr, ha.sessionConns,
utils.HTTPAgent, ha.connMgr, ha.sessionConns, ha.statsConns, ha.thresholdsConns,
agReq.filterS)
if err != nil {
utils.Logger.Warning(

View File

@@ -37,9 +37,13 @@ func TestNewHTTPAgent(t *testing.T) {
{},
}
sessionConns := []string{"conn1", "conn2"}
statsConns := []string{"conn1", "conn2"}
thresholdsConns := []string{"conn1", "conn2"}
agent := NewHTTPAgent(
connMgr,
sessionConns,
statsConns,
thresholdsConns,
filterS,
dfltTenant,
reqPayload,
@@ -78,6 +82,16 @@ func TestNewHTTPAgent(t *testing.T) {
t.Errorf("Expected sessionConns[%d] %s, got %s", i, conn, agent.sessionConns[i])
}
}
for i, conn := range statsConns {
if agent.statsConns[i] != conn {
t.Errorf("Expected statsConns[%d] %s, got %s", i, conn, agent.statsConns[i])
}
}
for i, conn := range thresholdsConns {
if agent.thresholdsConns[i] != conn {
t.Errorf("Expected thresholdsConns[%d] %s, got %s", i, conn, agent.thresholdsConns[i])
}
}
}
func TestHTTPAgentServeHTTP(t *testing.T) {

View File

@@ -21,6 +21,7 @@ package agents
import (
"fmt"
"strings"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -31,7 +32,9 @@ import (
func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor,
agReq *AgentRequest, agentName string, connMgr *engine.ConnManager,
sessionsConns []string, filterS *engine.FilterS) (_ bool, err error) {
sessionsConns, statsConns, thConns []string,
filterS *engine.FilterS) (_ bool, err error) {
startTime := time.Now()
if pass, err := filterS.Pass(agReq.Tenant,
reqProcessor.Filters, agReq); err != nil || !pass {
return pass, err
@@ -195,6 +198,7 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor,
if err = agReq.SetFields(reqProcessor.ReplyFields); err != nil {
return
}
endTime := time.Now()
if reqProcessor.Flags.Has(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, %s reply: %s",
@@ -205,5 +209,57 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor,
fmt.Sprintf("<%s> DRY_RUN, %s reply: %s",
agentName, agentName[:len(agentName)-5], agReq.Reply))
}
if reqProcessor.Flags.Has(utils.MetaDryRun) {
return true, nil
}
var statIDs, thIDs []string
switch agentName {
case utils.DiameterAgent:
statIDs = reqProcessor.Flags.ParamsSlice(utils.MetaDAStats, utils.MetaIDs)
thIDs = reqProcessor.Flags.ParamsSlice(utils.MetaDAThresholds, utils.MetaIDs)
case utils.HTTPAgent:
statIDs = reqProcessor.Flags.ParamsSlice(utils.MetaHAStats, utils.MetaIDs)
thIDs = reqProcessor.Flags.ParamsSlice(utils.MetaHAThresholds, utils.MetaIDs)
case utils.DNSAgent:
statIDs = reqProcessor.Flags.ParamsSlice(utils.MetaDNSStats, utils.MetaIDs)
thIDs = reqProcessor.Flags.ParamsSlice(utils.MetaDNSThresholds, utils.MetaIDs)
}
// Return early if nothing to process.
if len(statIDs) == 0 && len(thIDs) == 0 {
return true, nil
}
// Clone is needed to prevent data races if requests are sent
// asynchronously.
ev := cgrEv.Clone()
ev.Event[utils.StartTime] = startTime
ev.Event[utils.EndTime] = endTime
ev.Event[utils.ProcessingTime] = endTime.Sub(startTime)
ev.Event[utils.Source] = agentName
ev.APIOpts[utils.MetaEventType] = utils.ProcessTime
if len(statIDs) > 0 {
ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs
var reply []string
if err := connMgr.Call(ctx, statsConns, utils.StatSv1ProcessEvent,
ev, &reply); err != nil {
return false, fmt.Errorf("failed to process %s event in %s: %v",
agentName, utils.StatS, err)
}
// NOTE: ProfileIDs APIOpts key persists for the ThresholdS request,
// although it would be ignored. Might want to delete it.
}
if len(thIDs) > 0 {
ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs
var reply []string
if err := connMgr.Call(ctx, thConns, utils.ThresholdSv1ProcessEvent,
ev, &reply); err != nil {
return false, fmt.Errorf("failed to process %s event in %s: %v",
agentName, utils.ThresholdS, err)
}
}
return true, nil
}

View File

@@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
@@ -296,6 +297,7 @@ func cacheRadiusPacket(packet *radigo.Packet, address string, cfg *config.Radius
// processRequest represents one processor processing the request
func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.RequestProcessor,
agReq *AgentRequest, rpl *radigo.Packet) (processed bool, err error) {
startTime := time.Now()
if pass, err := ra.filterS.Pass(agReq.Tenant,
reqProcessor.Filters, agReq); err != nil || !pass {
return pass, err
@@ -452,6 +454,7 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R
agReq.CGRReply.Map[utils.Error] = utils.NewLeafNode(utils.RadauthFailed)
}
}
// separate request so we can capture the Terminate/Event also here
if reqProcessor.Flags.GetBool(utils.MetaCDRs) {
var rplyCDRs string
@@ -464,7 +467,7 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R
if err := agReq.SetFields(reqProcessor.ReplyFields); err != nil {
return false, err
}
endTime := time.Now()
if reqProcessor.Flags.Has(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, Radius reply: %s",
@@ -475,6 +478,48 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R
fmt.Sprintf("<%s> DRY_RUN, Radius reply: %s",
utils.RadiusAgent, agReq.Reply))
}
if reqProcessor.Flags.Has(utils.MetaDryRun) {
return true, nil
}
statIDs := reqProcessor.Flags.ParamsSlice(utils.MetaRAStats, utils.MetaIDs)
thIDs := reqProcessor.Flags.ParamsSlice(utils.MetaRAThresholds, utils.MetaIDs)
// Early return if nothing to process.
if len(statIDs) == 0 && len(thIDs) == 0 {
return true, nil
}
// Clone is needed to prevent data races if requests are sent
// asynchronously.
ev := cgrEv.Clone()
ev.Event[utils.StartTime] = startTime
ev.Event[utils.EndTime] = endTime
ev.Event[utils.ProcessingTime] = endTime.Sub(startTime)
ev.Event[utils.Source] = utils.RadiusAgent
ev.APIOpts[utils.MetaEventType] = utils.ProcessTime
if len(statIDs) > 0 {
ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs
var reply []string
if err := ra.connMgr.Call(ra.ctx, ra.cgrCfg.RadiusAgentCfg().StatSConns,
utils.StatSv1ProcessEvent, ev, &reply); err != nil {
return false, fmt.Errorf("failed to process %s event in %s: %v",
utils.RadiusAgent, utils.StatS, err)
}
// NOTE: ProfileIDs APIOpts key persists for the ThresholdS request,
// although it would be ignored. Might want to delete it.
}
if len(thIDs) > 0 {
ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs
var reply []string
if err := ra.connMgr.Call(ra.ctx, ra.cgrCfg.RadiusAgentCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, ev, &reply); err != nil {
return false, fmt.Errorf("failed to process %s event in %s: %v",
utils.RadiusAgent, utils.ThresholdS, err)
}
}
return true, nil
}