diff --git a/agents/diam_conn_stats_test.go b/agents/diam_conn_stats_test.go index 128f36813..fdbe8c059 100644 --- a/agents/diam_conn_stats_test.go +++ b/agents/diam_conn_stats_test.go @@ -60,7 +60,7 @@ func TestDiamConnStats(t *testing.T) { "stats": { "enabled": true, "store_interval": "-1", - "string_indexed_fields": ["*req.OriginHost"] + "string_indexed_fields": ["*opts.*eventType"] }, "thresholds": { "enabled": true, @@ -83,27 +83,32 @@ func TestDiamConnStats(t *testing.T) { // LogBuffer: &bytes.Buffer{}, GracefulShutdown: true, } - // defer fmt.Println(ng.LogBuffer) + // t.Cleanup(func() { + // fmt.Println(ng.LogBuffer) + // }) client, cfg := ng.Run(t) setSQProfile := func(id, originHost, originRealm string, ttl time.Duration) { t.Helper() + fltrIDs := []string{"*string:~*opts.*eventType:ConnectionStatusReport"} + if originHost != "" { + fltrIDs = append(fltrIDs, fmt.Sprintf("*string:~*req.OriginHost:%s", originHost)) + } + if originRealm != "" { + fltrIDs = append(fltrIDs, fmt.Sprintf("*string:~*req.OriginRealm:%s", originRealm)) + } var reply string if err := client.Call(context.Background(), utils.AdminSv1SetStatQueueProfile, engine.StatQueueProfileWithAPIOpts{ StatQueueProfile: &engine.StatQueueProfile{ - Tenant: "cgrates.org", - ID: id, - FilterIDs: []string{ - "*string:~*opts.*eventType:ConnectionStatusReport", - fmt.Sprintf("*string:~*req.OriginHost:%s", originHost), - fmt.Sprintf("*string:~*req.OriginRealm:%s", originRealm), - }, + Tenant: "cgrates.org", + ID: id, + FilterIDs: fltrIDs, QueueLength: -1, TTL: ttl, Metrics: []*engine.MetricWithFilters{ { - MetricID: "*sum#~*req.ConnectionStatus", + MetricID: "*sum#~*req.ConnectionStatus{*conn_status}", }, }, Stored: true, @@ -162,7 +167,7 @@ func TestDiamConnStats(t *testing.T) { if err != nil { t.Error(err) } - metricID := "*sum#~*req.ConnectionStatus" + metricID := "*sum#~*req.ConnectionStatus{*conn_status}" got, ok := metrics[metricID] if !ok { t.Errorf("could not find metric %q", metricID) @@ -172,11 +177,13 @@ func TestDiamConnStats(t *testing.T) { } } + setSQProfile("SQ_CONN_ALL", "", "", -1) setSQProfile("SQ_CONN_1", "host1", "realm1", -1) setSQProfile("SQ_CONN_2", "host2", "realm1", -1) setSQProfile("SQ_CONN_3", "host3", "realm2", -1) // no connections have been established yet, expect -1 + checkConnStatusMetric("SQ_CONN_ALL", -1) checkConnStatusMetric("SQ_CONN_1", -1) checkConnStatusMetric("SQ_CONN_2", -1) checkConnStatusMetric("SQ_CONN_3", -1) @@ -186,6 +193,8 @@ func TestDiamConnStats(t *testing.T) { connHost1 := initDiamConn("host1", "realm1") connHost2 := initDiamConn("host2", "realm1") connHost3 := initDiamConn("host3", "realm2") + time.Sleep(10 * time.Millisecond) // wait for stats to process + checkConnStatusMetric("SQ_CONN_ALL", 3) checkConnStatusMetric("SQ_CONN_1", 1) checkConnStatusMetric("SQ_CONN_2", 1) checkConnStatusMetric("SQ_CONN_3", 1) @@ -199,6 +208,7 @@ func TestDiamConnStats(t *testing.T) { // Ensure periodic health check happens. time.Sleep(100 * time.Millisecond) + checkConnStatusMetric("SQ_CONN_ALL", 0) checkConnStatusMetric("SQ_CONN_1", 0) checkConnStatusMetric("SQ_CONN_2", 0) checkConnStatusMetric("SQ_CONN_3", 0) @@ -206,9 +216,11 @@ func TestDiamConnStats(t *testing.T) { // restart connection from host1 connHost1 = initDiamConn("host1", "realm1") + t.Cleanup(func() { connHost1.Close() }) + time.Sleep(10 * time.Millisecond) // wait for stats to process + checkConnStatusMetric("SQ_CONN_ALL", 1) checkConnStatusMetric("SQ_CONN_1", 1) checkConnStatusMetric("SQ_CONN_2", 0) checkConnStatusMetric("SQ_CONN_3", 0) - t.Cleanup(func() { connHost1.Close() }) // scrapePromURL(t) } diff --git a/agents/diamagent.go b/agents/diamagent.go index b71dd77e2..4adbaa7d7 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -449,18 +449,8 @@ func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) { ch <- m } -const ( - // Connection status values for ConnectionStatus event field: - // -1 : connection closed/down - // 0 : duplicate connection (no metric change) - // 1 : new connection established/up - diamConnStatusDown = -1 - diamConnStatusDuplicate = 0 - diamConnStatusUp = 1 -) - // sendConnStatusReport reports connection status changes to StatS and ThresholdS. -func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status int, localAddr, remoteAddr net.Addr) { +func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status string, localAddr, remoteAddr net.Addr) { daCfg := da.cgrCfg.DiameterAgentCfg() if len(daCfg.StatSConns) == 0 && len(daCfg.ThresholdSConns) == 0 { return // nothing to do @@ -515,11 +505,11 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) { } key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm) da.peersLck.Lock() - diamConnStatus := diamConnStatusUp + connStatus := utils.ConnStatusUp if _, exists := da.peers[key]; exists { - // Connection already exists for this peer. Set status to 0 (duplicate) + // Connection already exists for this peer. Set status to DUPLICATE // to prevent incrementing StatS metrics. - diamConnStatus = diamConnStatusDuplicate + connStatus = utils.ConnStatusDuplicate utils.Logger.Warning(fmt.Sprintf( "<%s> a connection from a peer with the same ID (%q) is already registered, overwriting...", @@ -528,7 +518,7 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) { da.peers[key] = c da.peersLck.Unlock() localAddr, remoteAddr := c.LocalAddr(), c.RemoteAddr() - da.sendConnStatusReport(meta, diamConnStatus, localAddr, remoteAddr) + da.sendConnStatusReport(meta, connStatus, localAddr, remoteAddr) go func() { // Use hybrid approach to detect connection closure. CloseNotify() may not // fire if the serve() goroutine is blocked in Read(), so we also perform @@ -538,7 +528,7 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) { da.peersLck.Lock() delete(da.peers, key) da.peersLck.Unlock() - da.sendConnStatusReport(meta, diamConnStatusDown, localAddr, remoteAddr) + da.sendConnStatusReport(meta, utils.ConnStatusDown, localAddr, remoteAddr) }() closeChan := c.(diam.CloseNotifier).CloseNotify() diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 1466e4aa4..ee91570a2 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -772,41 +772,37 @@ func (m *Metric) Equal(v *Metric) bool { } func NewStatSum(minItems uint64, fieldName string, filterIDs []string) StatMetric { - return &StatSum{Metric: NewMetric(minItems, filterIDs), - FieldName: fieldName} + flds, _ := utils.NewRSRParsers(fieldName, utils.InfieldSep) + return &StatSum{ + Metric: NewMetric(minItems, filterIDs), + Fields: flds, + } } type StatSum struct { *Metric - FieldName string + Fields utils.RSRParsers } func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) error { - ival, err := utils.DPDynamicInterface(sum.FieldName, ev) + ival, err := sum.Fields.ParseDataProvider(ev) if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, sum.FieldName) - } return err } return sum.addEvent(evID, ival) } func (sum *StatSum) AddOneEvent(ev utils.DataProvider) error { - ival, err := utils.DPDynamicInterface(sum.FieldName, ev) + ival, err := sum.Fields.ParseDataProvider(ev) if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, sum.FieldName) - } return err } - return sum.addOneEvent(ival) } func (sum *StatSum) Clone() StatMetric { return &StatSum{ - Metric: sum.Metric.Clone(), - FieldName: sum.FieldName, + Metric: sum.Metric.Clone(), + Fields: sum.Fields, } } diff --git a/engine/statmetrics_test.go b/engine/statmetrics_test.go index 0b72c3f40..f157615ef 100644 --- a/engine/statmetrics_test.go +++ b/engine/statmetrics_test.go @@ -2114,7 +2114,8 @@ func TestStatSumGetFloat64Value(t *testing.T) { t.Errorf("wrong statSum value: %v", v) } ev2 := &utils.CGREvent{ID: "EVENT_2"} - if err := statSum.AddEvent(ev2.ID, utils.MapStorage{utils.MetaOpts: ev2.APIOpts}); err == nil || err.Error() != "NOT_FOUND:~*opts.*cost" { + if err := statSum.AddEvent(ev2.ID, utils.MapStorage{utils.MetaOpts: ev2.APIOpts}); err == nil || + err != utils.ErrNotFound { t.Error(err) } if v := statSum.GetValue(); v != utils.DecimalNaN { @@ -2233,26 +2234,41 @@ func TestStatSumGetStringValue2(t *testing.T) { } func TestStatSumGetStringValue3(t *testing.T) { - statSum := &StatSum{Metric: NewMetric(2, nil), FieldName: "~*opts.*cost"} + statSum := &StatSum{ + Metric: NewMetric(2, nil), + Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep), + } expected := &StatSum{ Metric: &Metric{ Events: map[string]*DecimalWithCompress{ - "EVENT_1": {Stat: utils.NewDecimalFromStringIgnoreError("12.20000000000000"), CompressFactor: 2}, - "EVENT_3": {Stat: utils.NewDecimalFromStringIgnoreError("18.300000000000000710542735760100185871124267578125"), CompressFactor: 1}, + "EVENT_1": {Stat: utils.NewDecimalFromStringIgnoreError("12.2"), CompressFactor: 2}, + "EVENT_3": {Stat: utils.NewDecimalFromStringIgnoreError("18.3"), CompressFactor: 1}, }, MinItems: 2, Count: 3, - Value: utils.NewDecimalFromStringIgnoreError("42.700"), + Value: utils.NewDecimalFromStringIgnoreError("42.7"), }, - FieldName: "~*opts.*cost", + Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep), } expected.GetStringValue(config.CgrConfig().GeneralCfg().RoundingDecimals) - ev1 := &utils.CGREvent{ID: "EVENT_1", - APIOpts: map[string]any{utils.MetaCost: 18.2}} - ev2 := &utils.CGREvent{ID: "EVENT_1", - APIOpts: map[string]any{utils.MetaCost: 6.2}} - ev3 := &utils.CGREvent{ID: "EVENT_3", - APIOpts: map[string]any{utils.MetaCost: 18.3}} + ev1 := &utils.CGREvent{ + ID: "EVENT_1", + APIOpts: map[string]any{ + utils.MetaCost: utils.NewDecimal(182, 1), + }, + } + ev2 := &utils.CGREvent{ + ID: "EVENT_1", + APIOpts: map[string]any{ + utils.MetaCost: utils.NewDecimal(62, 1), + }, + } + ev3 := &utils.CGREvent{ + ID: "EVENT_3", + APIOpts: map[string]any{ + utils.MetaCost: utils.NewDecimal(183, 1), + }, + } if err := statSum.AddEvent(ev1.ID, utils.MapStorage{utils.MetaOpts: ev1.APIOpts}); err != nil { t.Error(err) } @@ -2278,26 +2294,41 @@ func TestStatSumGetStringValue3(t *testing.T) { } func TestStatSumCompress(t *testing.T) { - sum := &StatSum{Metric: NewMetric(2, nil), FieldName: "~*opts.*cost"} + sum := &StatSum{ + Metric: NewMetric(2, nil), + Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep), + } expected := &StatSum{ Metric: &Metric{ Events: map[string]*DecimalWithCompress{ - "EVENT_1": {Stat: utils.NewDecimalFromStringIgnoreError("18.199999999999999289457264239899814128875732421875"), CompressFactor: 1}, - "EVENT_2": {Stat: utils.NewDecimalFromStringIgnoreError("6.20000000000000017763568394002504646778106689453125"), CompressFactor: 1}, + "EVENT_1": {Stat: utils.NewDecimalFromStringIgnoreError("18.2"), CompressFactor: 1}, + "EVENT_2": {Stat: utils.NewDecimalFromStringIgnoreError("6.2"), CompressFactor: 1}, }, MinItems: 2, - Value: utils.NewDecimalFromStringIgnoreError("24.400"), + Value: utils.NewDecimalFromStringIgnoreError("24.4"), Count: 2, }, - FieldName: "~*opts.*cost", + Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep), } expected.GetStringValue(config.CgrConfig().GeneralCfg().RoundingDecimals) - ev := &utils.CGREvent{ID: "EVENT_1", - APIOpts: map[string]any{utils.MetaCost: 18.2}} - ev2 := &utils.CGREvent{ID: "EVENT_2", - APIOpts: map[string]any{utils.MetaCost: 6.2}} - ev4 := &utils.CGREvent{ID: "EVENT_1", - APIOpts: map[string]any{utils.MetaCost: 18.3}} + ev := &utils.CGREvent{ + ID: "EVENT_1", + APIOpts: map[string]any{ + utils.MetaCost: utils.NewDecimal(182, 1), + }, + } + ev2 := &utils.CGREvent{ + ID: "EVENT_2", + APIOpts: map[string]any{ + utils.MetaCost: utils.NewDecimal(62, 1), + }, + } + ev4 := &utils.CGREvent{ + ID: "EVENT_1", + APIOpts: map[string]any{ + utils.MetaCost: utils.NewDecimal(183, 1), + }, + } sum.AddEvent(ev.ID, utils.MapStorage{utils.MetaOpts: ev.APIOpts}) sum.AddEvent(ev2.ID, utils.MapStorage{utils.MetaOpts: ev2.APIOpts}) expIDs := []string{"EVENT_1", "EVENT_2"} @@ -2319,7 +2350,7 @@ func TestStatSumCompress(t *testing.T) { Value: utils.NewDecimalFromFloat64(24.4), Count: 2, }, - FieldName: "~*opts.*cost", + Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep), } expected.GetStringValue(config.CgrConfig().GeneralCfg().RoundingDecimals) expIDs = []string{"EVENT_3"} @@ -3013,7 +3044,7 @@ func TestStatSumMarshal(t *testing.T) { utils.MetaUsage: 10 * time.Second}} statSum.AddEvent(ev.ID, utils.MapStorage{utils.MetaOpts: ev.APIOpts}) var nstatSum StatSum - expected := []byte(`{"Value":20,"Count":1,"Events":{"EVENT_1":{"Stat":20,"CompressFactor":1}},"MinItems":2,"FilterIDs":null,"FieldName":"~*opts.*cost"}`) + expected := []byte(`{"Value":20,"Count":1,"Events":{"EVENT_1":{"Stat":20,"CompressFactor":1}},"MinItems":2,"FilterIDs":null,"Fields":[{"Rules":"~*opts.*cost","Path":"~*opts.*cost"}]}`) if b, err := jMarshaler.Marshal(statSum); err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, b) { @@ -3704,9 +3735,10 @@ func TestStatPDDClone(t *testing.T) { } func TestStatSumClone(t *testing.T) { - - sum := &StatSum{Metric: NewMetric(2, nil), FieldName: "~*opts.*cost"} - + sum := &StatSum{ + Metric: NewMetric(2, nil), + Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep), + } if rcv := sum.Clone(); !reflect.DeepEqual(rcv, sum) { t.Errorf("Expecting <%+v>,\n Recevied <%+v>", sum, rcv) } diff --git a/utils/consts.go b/utils/consts.go index bbf088ee8..57731ff70 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -560,9 +560,12 @@ const ( EventConnectionStatusReport = "ConnectionStatusReport" // Connection status event fields. - ConnLocalAddr = "LocalAddr" - ConnRemoteAddr = "RemoteAddr" - ConnStatus = "ConnectionStatus" // -1=down, 0=duplicate, 1=up + ConnLocalAddr = "LocalAddr" + ConnRemoteAddr = "RemoteAddr" + ConnStatus = "ConnectionStatus" // sum metric: UP=1, DOWN=-1, DUPLICATE=0 + ConnStatusUp = "UP" + ConnStatusDown = "DOWN" + ConnStatusDuplicate = "DUPLICATE" // ReplyState error constants ErrReplyStateAuthorize = "ERR_AUTHORIZE" @@ -757,6 +760,7 @@ const ( MetaSIPURIMethod = "*sipuri_method" MetaSIPURIHost = "*sipuri_host" MetaSIPURIUser = "*sipuri_user" + MetaConnStatus = "*conn_status" E164DomainConverter = "*e164Domain" E164Converter = "*e164" MetaJoin = "*join" diff --git a/utils/dataconverter.go b/utils/dataconverter.go index b46ad63f2..05a9f7e36 100644 --- a/utils/dataconverter.go +++ b/utils/dataconverter.go @@ -142,6 +142,8 @@ func NewDataConverter(params string) (conv DataConverter, err error) { return splitConverter(params[len(MetaSplit)+1:]), nil case strings.HasPrefix(params, MetaStrip): return NewStripConverter(params) + case params == MetaConnStatus: + return ConnStatusConverter{}, nil case strings.HasPrefix(params, MetaGigawords): return new(GigawordsConverter), nil default: @@ -838,3 +840,21 @@ func (GigawordsConverter) Convert(in any) (any, error) { totalOctects := (gigawordsValue * int64(math.Pow(2, 32))) // 2^32 return totalOctects, nil } + +// ConnStatusConverter converts connection status strings to numeric values. +// Returns 1 for UP, -1 for DOWN, and 0 for DUPLICATE. +type ConnStatusConverter struct{} + +// Convert implements DataConverter interface +func (c ConnStatusConverter) Convert(in any) (any, error) { + status := IfaceAsString(in) + switch status { + case ConnStatusUp: + return 1, nil + case ConnStatusDown: + return -1, nil + case ConnStatusDuplicate: + return 0, nil + } + return 0, fmt.Errorf("unsupported connection status: %q", status) +}