send SIPAgent ProcessTime events to stats/thresholds

This commit is contained in:
ionutboangiu
2025-06-13 18:42:55 +03:00
committed by Dan Christian Bogos
parent 5a7f278dbd
commit 64eea80f3b
2 changed files with 49 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"net"
"regexp"
"strings"
"sync"
"time"
@@ -377,6 +378,7 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string)
// processRequest represents one processor processing the request
func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor,
agReq *AgentRequest) (processed bool, err error) {
startTime := time.Now()
if pass, err := sa.filterS.Pass(agReq.Tenant,
reqProcessor.Filters, agReq); err != nil || !pass {
return pass, err
@@ -460,6 +462,7 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor,
if err := agReq.SetFields(reqProcessor.ReplyFields); err != nil {
return false, err
}
endTime := time.Now()
if reqProcessor.Flags.Has(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, SIP reply: %s",
@@ -470,5 +473,49 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor,
fmt.Sprintf("<%s> DRY_RUN, SIP reply: %s",
utils.SIPAgent, agReq.Reply))
}
if reqProcessor.Flags.Has(utils.MetaDryRun) {
return true, nil
}
rawStatIDs := reqProcessor.Flags.ParamValue(utils.MetaRAStats)
rawThIDs := reqProcessor.Flags.ParamValue(utils.MetaRAThresholds)
// Early return if nothing to process.
if rawStatIDs == "" && rawThIDs == "" {
return true, nil
}
// Clone is needed to prevent data races if requests are sent
// asynchronously.
ev := cgrEv.Clone()
ev.Event[utils.StartTime] = startTime
ev.Event[utils.EndTime] = endTime
ev.Event[utils.ProcessingTime] = endTime.Sub(startTime)
ev.Event[utils.Source] = utils.SIPAgent
ev.APIOpts[utils.MetaEventType] = utils.ProcessTime
if rawStatIDs != "" {
statIDs := strings.Split(rawStatIDs, utils.ANDSep)
ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs
var reply []string
if err := sa.connMgr.Call(context.TODO(), sa.cfg.SIPAgentCfg().StatSConns,
utils.StatSv1ProcessEvent, ev, &reply); err != nil {
return false, fmt.Errorf("failed to process %s event in %s: %v",
utils.SIPAgent, utils.StatS, err)
}
// NOTE: ProfileIDs APIOpts key persists for the ThresholdS request,
// although it would be ignored. Might want to delete it.
}
if rawThIDs != "" {
thIDs := strings.Split(rawThIDs, utils.ANDSep)
ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs
var reply []string
if err := sa.connMgr.Call(context.TODO(), sa.cfg.SIPAgentCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, ev, &reply); err != nil {
return false, fmt.Errorf("failed to process %s event in %s: %v",
utils.SIPAgent, utils.ThresholdS, err)
}
}
return true, nil
}