cfg: add stats/thresholds_conns to sip_agent

This commit is contained in:
ionutboangiu
2025-06-13 18:32:35 +03:00
committed by Dan Christian Bogos
parent 4635069702
commit 5a7f278dbd
6 changed files with 101 additions and 51 deletions

View File

@@ -1312,6 +1312,8 @@ const CGRATES_CFG_JSON = `
"listen": "127.0.0.1:5060", // address where to listen for SIP requests <x.y.z.y:1234>
"listen_net": "udp", // network to listen on <udp|tcp|tcp-tls>
"sessions_conns": ["*internal"],
"stats_conns": [], // connections to StatS, empty to disable: <""|*internal|$rpc_conns_id>
"thresholds_conns": [], // connections to ThresholdS, empty to disable: <""|*internal|$rpc_conns_id>
"timezone": "", // timezone of the events if not specified <UTC|Local|$IANA_TZ_DB>
"retransmission_timer": "1s", // the duration to wait to receive an ACK before resending the reply
"request_processors": [] // request processors to be applied to SIP messages

File diff suppressed because one or more lines are too long

View File

@@ -621,6 +621,22 @@ func (cfg *CGRConfig) checkConfigSanity() error {
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.SIPAgent, connID)
}
}
for _, connID := range cfg.sipAgentCfg.StatSConns {
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.statsCfg.Enabled {
return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.StatS, utils.SIPAgent)
}
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.SIPAgent, connID)
}
}
for _, connID := range cfg.sipAgentCfg.ThresholdSConns {
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.thresholdSCfg.Enabled {
return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.ThresholdS, utils.SIPAgent)
}
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.SIPAgent, connID)
}
}
for _, req := range cfg.sipAgentCfg.RequestProcessors {
for _, field := range req.RequestFields {
if field.Type != utils.MetaNone && field.Path == utils.EmptyString {

View File

@@ -937,13 +937,15 @@ type STIRJsonCfg struct {
// SIPAgentJsonCfg
type SIPAgentJsonCfg struct {
Enabled *bool
Listen *string
Listen_net *string
Sessions_conns *[]string
Timezone *string
Retransmission_timer *string
Request_processors *[]*ReqProcessorJsnCfg
Enabled *bool `json:"enabled"`
Listen *string `json:"listen"`
ListenNet *string `json:"listen_net"`
SessionSConns *[]string `json:"sessions_conns"`
StatSConns *[]string `json:"stats_conns"`
ThresholdSConns *[]string `json:"thresholds_conns"`
Timezone *string `json:"timezone"`
RetransmissionTimer *string `json:"retransmission_timer"`
RequestProcessors *[]*ReqProcessorJsnCfg `json:"request_processors"`
}
type JanusAgentJsonCfg struct {

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
import (
"slices"
"time"
"github.com/cgrates/cgrates/utils"
@@ -30,6 +31,8 @@ type SIPAgentCfg struct {
Listen string
ListenNet string // udp or tcp
SessionSConns []string
StatSConns []string
ThresholdSConns []string
Timezone string
RetransmissionTimer time.Duration // timeout replies if not reaching back
RequestProcessors []*RequestProcessor
@@ -42,8 +45,8 @@ func (sa *SIPAgentCfg) loadFromJSONCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err
if jsnCfg.Enabled != nil {
sa.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Listen_net != nil {
sa.ListenNet = *jsnCfg.Listen_net
if jsnCfg.ListenNet != nil {
sa.ListenNet = *jsnCfg.ListenNet
}
if jsnCfg.Listen != nil {
sa.Listen = *jsnCfg.Listen
@@ -51,9 +54,9 @@ func (sa *SIPAgentCfg) loadFromJSONCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err
if jsnCfg.Timezone != nil {
sa.Timezone = *jsnCfg.Timezone
}
if jsnCfg.Sessions_conns != nil {
sa.SessionSConns = make([]string, len(*jsnCfg.Sessions_conns))
for idx, connID := range *jsnCfg.Sessions_conns {
if jsnCfg.SessionSConns != nil {
sa.SessionSConns = make([]string, len(*jsnCfg.SessionSConns))
for idx, connID := range *jsnCfg.SessionSConns {
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
sa.SessionSConns[idx] = connID
if connID == utils.MetaInternal {
@@ -61,13 +64,19 @@ func (sa *SIPAgentCfg) loadFromJSONCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err
}
}
}
if jsnCfg.Retransmission_timer != nil {
if sa.RetransmissionTimer, err = utils.ParseDurationWithNanosecs(*jsnCfg.Retransmission_timer); err != nil {
if jsnCfg.StatSConns != nil {
sa.StatSConns = tagInternalConns(*jsnCfg.StatSConns, utils.MetaStats)
}
if jsnCfg.ThresholdSConns != nil {
sa.ThresholdSConns = tagInternalConns(*jsnCfg.ThresholdSConns, utils.MetaThresholds)
}
if jsnCfg.RetransmissionTimer != nil {
if sa.RetransmissionTimer, err = utils.ParseDurationWithNanosecs(*jsnCfg.RetransmissionTimer); err != nil {
return err
}
}
if jsnCfg.Request_processors != nil {
for _, reqProcJsn := range *jsnCfg.Request_processors {
if jsnCfg.RequestProcessors != nil {
for _, reqProcJsn := range *jsnCfg.RequestProcessors {
rp := new(RequestProcessor)
var haveID bool
for _, rpSet := range sa.RequestProcessors {
@@ -89,21 +98,21 @@ func (sa *SIPAgentCfg) loadFromJSONCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err
}
// AsMapInterface returns the config as a map[string]any
func (sa *SIPAgentCfg) AsMapInterface(separator string) (initialMP map[string]any) {
initialMP = map[string]any{
utils.EnabledCfg: sa.Enabled,
utils.ListenCfg: sa.Listen,
utils.ListenNetCfg: sa.ListenNet,
utils.TimezoneCfg: sa.Timezone,
utils.RetransmissionTimerCfg: sa.RetransmissionTimer,
}
func (sa *SIPAgentCfg) AsMapInterface(separator string) map[string]any {
requestProcessors := make([]map[string]any, len(sa.RequestProcessors))
for i, item := range sa.RequestProcessors {
requestProcessors[i] = item.AsMapInterface(separator)
}
initialMP[utils.RequestProcessorsCfg] = requestProcessors
m := map[string]any{
utils.EnabledCfg: sa.Enabled,
utils.ListenCfg: sa.Listen,
utils.ListenNetCfg: sa.ListenNet,
utils.StatSConnsCfg: stripInternalConns(sa.StatSConns),
utils.ThresholdSConnsCfg: stripInternalConns(sa.ThresholdSConns),
utils.TimezoneCfg: sa.Timezone,
utils.RetransmissionTimerCfg: sa.RetransmissionTimer,
utils.RequestProcessorsCfg: requestProcessors,
}
if sa.SessionSConns != nil {
sessionSConns := make([]string, len(sa.SessionSConns))
for i, item := range sa.SessionSConns {
@@ -112,29 +121,28 @@ func (sa *SIPAgentCfg) AsMapInterface(separator string) (initialMP map[string]an
sessionSConns[i] = utils.MetaInternal
}
}
initialMP[utils.SessionSConnsCfg] = sessionSConns
m[utils.SessionSConnsCfg] = sessionSConns
}
return
return m
}
// Clone returns a deep copy of SIPAgentCfg
func (sa SIPAgentCfg) Clone() (cln *SIPAgentCfg) {
cln = &SIPAgentCfg{
func (sa SIPAgentCfg) Clone() *SIPAgentCfg {
clone := &SIPAgentCfg{
Enabled: sa.Enabled,
Listen: sa.Listen,
ListenNet: sa.ListenNet,
SessionSConns: slices.Clone(sa.SessionSConns),
StatSConns: slices.Clone(sa.StatSConns),
ThresholdSConns: slices.Clone(sa.ThresholdSConns),
Timezone: sa.Timezone,
RetransmissionTimer: sa.RetransmissionTimer,
}
if sa.SessionSConns != nil {
cln.SessionSConns = make([]string, len(sa.SessionSConns))
copy(cln.SessionSConns, sa.SessionSConns)
}
if sa.RequestProcessors != nil {
cln.RequestProcessors = make([]*RequestProcessor, len(sa.RequestProcessors))
clone.RequestProcessors = make([]*RequestProcessor, len(sa.RequestProcessors))
for i, rp := range sa.RequestProcessors {
cln.RequestProcessors[i] = rp.Clone()
clone.RequestProcessors[i] = rp.Clone()
}
}
return
return clone
}

View File

@@ -27,13 +27,15 @@ import (
func TestSIPAgentCfgloadFromJsonCfgCase1(t *testing.T) {
cfgJSONS := &SIPAgentJsonCfg{
Enabled: utils.BoolPointer(true),
Listen: utils.StringPointer("127.0.0.1:5060"),
Listen_net: utils.StringPointer("udp"),
Sessions_conns: &[]string{utils.MetaInternal},
Timezone: utils.StringPointer("local"),
Retransmission_timer: utils.StringPointer("1"),
Request_processors: &[]*ReqProcessorJsnCfg{
Enabled: utils.BoolPointer(true),
Listen: utils.StringPointer("127.0.0.1:5060"),
ListenNet: utils.StringPointer("udp"),
SessionSConns: &[]string{utils.MetaInternal},
StatSConns: &[]string{utils.MetaInternal},
ThresholdSConns: &[]string{utils.MetaInternal},
Timezone: utils.StringPointer("local"),
RetransmissionTimer: utils.StringPointer("1"),
RequestProcessors: &[]*ReqProcessorJsnCfg{
{
ID: utils.StringPointer("OutboundAUTHDryRun"),
Filters: &[]string{"*string:~*req.request_type:OutboundAUTH", "*string:~*req.Msisdn:497700056231"},
@@ -56,6 +58,8 @@ func TestSIPAgentCfgloadFromJsonCfgCase1(t *testing.T) {
Listen: "127.0.0.1:5060",
ListenNet: "udp",
SessionSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)},
StatSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)},
ThresholdSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)},
Timezone: "local",
RetransmissionTimer: 1,
RequestProcessors: []*RequestProcessor{
@@ -90,7 +94,7 @@ func TestSIPAgentCfgloadFromJsonCfgCase1(t *testing.T) {
func TestSIPAgentCfgloadFromJsonCfgCase2(t *testing.T) {
cfgJSON := &SIPAgentJsonCfg{
Retransmission_timer: utils.StringPointer("1ss"),
RetransmissionTimer: utils.StringPointer("1ss"),
}
expected := "time: unknown unit \"ss\" in duration \"1ss\""
jsonCfg := NewDefaultCGRConfig()
@@ -110,7 +114,7 @@ func TestSIPAgentCfgloadFromJsonCfgCase4(t *testing.T) {
},
}`
sipAgent := &SIPAgentJsonCfg{
Request_processors: &[]*ReqProcessorJsnCfg{{
RequestProcessors: &[]*ReqProcessorJsnCfg{{
ID: utils.StringPointer("randomID"),
}},
}
@@ -132,7 +136,7 @@ func TestSIPAgentCfgloadFromJsonCfgCase4(t *testing.T) {
func TestSIPAgentCfgloadFromJsonCfgCase5(t *testing.T) {
sipAgent := &SIPAgentJsonCfg{
Request_processors: &[]*ReqProcessorJsnCfg{{
RequestProcessors: &[]*ReqProcessorJsnCfg{{
Tenant: utils.StringPointer("a{*"),
}},
}
@@ -150,6 +154,8 @@ func TestSIPAgentCfgAsMapInterface(t *testing.T) {
"listen": "127.0.0.1:5060",
"listen_net": "udp",
"sessions_conns": ["*internal"],
"stats_conns": ["*internal"],
"thresholds_conns": ["*internal"],
"timezone": "",
"retransmission_timer": "2s",
"request_processors": [
@@ -161,6 +167,8 @@ func TestSIPAgentCfgAsMapInterface(t *testing.T) {
utils.ListenCfg: "127.0.0.1:5060",
utils.ListenNetCfg: "udp",
utils.SessionSConnsCfg: []string{"*internal"},
utils.StatSConnsCfg: []string{"*internal"},
utils.ThresholdSConnsCfg: []string{"*internal"},
utils.TimezoneCfg: "",
utils.RetransmissionTimerCfg: 2 * time.Second,
utils.RequestProcessorsCfg: []map[string]any{},
@@ -179,6 +187,8 @@ func TestSIPAgentCfgAsMapInterface1(t *testing.T) {
"listen": "127.0.0.1:5060",
"listen_net": "udp",
"sessions_conns": ["*internal"],
"stats_conns": ["*internal"],
"thresholds_conns": ["*internal"],
"timezone": "UTC",
"retransmission_timer": "5s",
"request_processors": [
@@ -211,6 +221,8 @@ func TestSIPAgentCfgAsMapInterface1(t *testing.T) {
utils.ListenCfg: "127.0.0.1:5060",
utils.ListenNetCfg: "udp",
utils.SessionSConnsCfg: []string{"*internal"},
utils.StatSConnsCfg: []string{"*internal"},
utils.ThresholdSConnsCfg: []string{"*internal"},
utils.TimezoneCfg: "UTC",
utils.RetransmissionTimerCfg: 5 * time.Second,
utils.RequestProcessorsCfg: []map[string]any{
@@ -244,6 +256,8 @@ func TestSIPAgentCfgAsMapInterface2(t *testing.T) {
"enabled": true,
"listen": "",
"sessions_conns": ["*conn1", "*conn2"],
"stats_conns": ["*conn1", "*conn2"],
"thresholds_conns": ["*conn1", "*conn2"],
"request_processors": [
{
"id": "Register",
@@ -265,6 +279,8 @@ func TestSIPAgentCfgAsMapInterface2(t *testing.T) {
utils.ListenCfg: "",
utils.ListenNetCfg: "udp",
utils.SessionSConnsCfg: []string{"*conn1", "*conn2"},
utils.StatSConnsCfg: []string{"*conn1", "*conn2"},
utils.ThresholdSConnsCfg: []string{"*conn1", "*conn2"},
utils.TimezoneCfg: "",
utils.RetransmissionTimerCfg: time.Second,
utils.RequestProcessorsCfg: []map[string]any{
@@ -294,6 +310,8 @@ func TestSIPAgentCfgClone(t *testing.T) {
Listen: "127.0.0.1:5060",
ListenNet: "udp",
SessionSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)},
StatSConns: []string{},
ThresholdSConns: []string{},
Timezone: "UTC",
RetransmissionTimer: 1,
RequestProcessors: []*RequestProcessor{