From b54e3b54ac674de5cf9183bf7d5c353a2287ae66 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 15 Oct 2025 10:36:50 +0300 Subject: [PATCH] refactor StatSum to use RSRParsers and add conn_status converter --- agents/diam_conn_stats_test.go | 36 ++++++++++----- engine/statmetrics.go | 46 +++++-------------- engine/statmetrics_test.go | 46 +++++++++++-------- .../offline_internal_apis_it_test.go | 2 +- utils/consts.go | 1 + utils/dataconverter.go | 20 ++++++++ 6 files changed, 85 insertions(+), 66 deletions(-) diff --git a/agents/diam_conn_stats_test.go b/agents/diam_conn_stats_test.go index e28c7611d..eaa463466 100644 --- a/agents/diam_conn_stats_test.go +++ b/agents/diam_conn_stats_test.go @@ -60,7 +60,7 @@ func TestDiamConnStats(t *testing.T) { "stats": { "enabled": true, "store_interval": "-1", - "string_indexed_fields": ["*req.OriginHost"] + "string_indexed_fields": ["*opts.*eventType"] }, "thresholds": { "enabled": true, @@ -82,27 +82,32 @@ func TestDiamConnStats(t *testing.T) { // LogBuffer: &bytes.Buffer{}, GracefulShutdown: true, } - // defer fmt.Println(ng.LogBuffer) + // t.Cleanup(func() { + // fmt.Println(ng.LogBuffer) + // }) client, cfg := ng.Run(t) setSQProfile := func(id, originHost, originRealm string, ttl time.Duration) { t.Helper() + fltrIDs := []string{"*string:~*opts.*eventType:ConnectionStatusReport"} + if originHost != "" { + fltrIDs = append(fltrIDs, fmt.Sprintf("*string:~*req.OriginHost:%s", originHost)) + } + if originRealm != "" { + fltrIDs = append(fltrIDs, fmt.Sprintf("*string:~*req.OriginRealm:%s", originRealm)) + } var reply string if err := client.Call(context.Background(), utils.APIerSv1SetStatQueueProfile, engine.StatQueueProfileWithAPIOpts{ StatQueueProfile: &engine.StatQueueProfile{ - Tenant: "cgrates.org", - ID: id, - FilterIDs: []string{ - "*string:~*opts.*eventType:ConnectionStatusReport", - fmt.Sprintf("*string:~*req.OriginHost:%s", originHost), - fmt.Sprintf("*string:~*req.OriginRealm:%s", originRealm), - }, + Tenant: "cgrates.org", + ID: id, + FilterIDs: fltrIDs, QueueLength: -1, TTL: ttl, Metrics: []*engine.MetricWithFilters{ { - MetricID: "*sum#~*req.ConnectionStatus", + MetricID: "*sum#~*req.ConnectionStatus{*conn_status}", }, }, Stored: true, @@ -157,7 +162,7 @@ func TestDiamConnStats(t *testing.T) { if err != nil { t.Error(err) } - metricID := "*sum#~*req.ConnectionStatus" + metricID := "*sum#~*req.ConnectionStatus{*conn_status}" got, ok := metrics[metricID] if !ok { t.Errorf("could not find metric %q", metricID) @@ -167,11 +172,13 @@ func TestDiamConnStats(t *testing.T) { } } + setSQProfile("SQ_CONN_ALL", "", "", -1) setSQProfile("SQ_CONN_1", "host1", "realm1", -1) setSQProfile("SQ_CONN_2", "host2", "realm1", -1) setSQProfile("SQ_CONN_3", "host3", "realm2", -1) // no connections have been established yet, expect -1 + checkConnStatusMetric("SQ_CONN_ALL", -1) checkConnStatusMetric("SQ_CONN_1", -1) checkConnStatusMetric("SQ_CONN_2", -1) checkConnStatusMetric("SQ_CONN_3", -1) @@ -181,6 +188,8 @@ func TestDiamConnStats(t *testing.T) { connHost1 := initDiamConn("host1", "realm1") connHost2 := initDiamConn("host2", "realm1") connHost3 := initDiamConn("host3", "realm2") + time.Sleep(10 * time.Millisecond) // wait for stats to process + checkConnStatusMetric("SQ_CONN_ALL", 3) checkConnStatusMetric("SQ_CONN_1", 1) checkConnStatusMetric("SQ_CONN_2", 1) checkConnStatusMetric("SQ_CONN_3", 1) @@ -194,6 +203,7 @@ func TestDiamConnStats(t *testing.T) { // Ensure periodic health check happens. time.Sleep(100 * time.Millisecond) + checkConnStatusMetric("SQ_CONN_ALL", 0) checkConnStatusMetric("SQ_CONN_1", 0) checkConnStatusMetric("SQ_CONN_2", 0) checkConnStatusMetric("SQ_CONN_3", 0) @@ -201,9 +211,11 @@ func TestDiamConnStats(t *testing.T) { // restart connection from host1 connHost1 = initDiamConn("host1", "realm1") + t.Cleanup(func() { connHost1.Close() }) + time.Sleep(10 * time.Millisecond) // wait for stats to process + checkConnStatusMetric("SQ_CONN_ALL", 1) checkConnStatusMetric("SQ_CONN_1", 1) checkConnStatusMetric("SQ_CONN_2", 0) checkConnStatusMetric("SQ_CONN_3", 0) - t.Cleanup(func() { connHost1.Close() }) // scrapePromURL(t) } diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 905edd1c6..6852e83ee 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -1421,10 +1421,14 @@ func (ddc *StatDDC) GetCompressFactor(events map[string]int) map[string]int { } func NewStatSum(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { + flds, err := config.NewRSRParsers(extraParams, utils.InfieldSep) + if err != nil { + return nil, err + } return &StatSum{ Events: make(map[string]*StatWithCompress), MinItems: minItems, - FieldName: extraParams, + Fields: flds, FilterIDs: filterIDs, }, nil } @@ -1435,7 +1439,7 @@ type StatSum struct { Count int64 Events map[string]*StatWithCompress // map[EventTenantID]Cost MinItems int - FieldName string + Fields config.RSRParsers val *float64 // cached sum value } @@ -1445,10 +1449,10 @@ func (s *StatSum) Clone() StatMetric { return nil } clone := &StatSum{ - Sum: s.Sum, - Count: s.Count, - MinItems: s.MinItems, - FieldName: s.FieldName, + Sum: s.Sum, + Count: s.Count, + MinItems: s.MinItems, + Fields: s.Fields, } if s.FilterIDs != nil { clone.FilterIDs = make([]string, len(s.FilterIDs)) @@ -1500,39 +1504,11 @@ func (sum *StatSum) GetFloat64Value(roundingDecimal int) (v float64) { return sum.getValue(roundingDecimal) } -// connStatusToFloat converts connection status strings to numeric values for *sum metrics. -// This allows sending status as strings in events while computing numeric sums: -// - "UP" returns 1 (connection established) -// - "DOWN" returns -1 (connection closed) -// - "DUPLICATE" returns 0 (connection already exists, no change) -func connStatusToFloat(v string) (float64, bool) { - switch v { - case utils.ConnStatusUp: - return 1, true - case utils.ConnStatusDown: - return -1, true - case utils.ConnStatusDuplicate: - return 0, true - } - return 0, false -} - func (sum *StatSum) getFieldVal(ev utils.DataProvider) (float64, error) { - ival, err := utils.DPDynamicInterface(sum.FieldName, ev) + ival, err := sum.Fields.ParseDataProvider(ev) if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, sum.FieldName) - } return 0, err } - - // Check for connection status strings before numeric conversion. - if str, ok := ival.(string); ok { - if v, isConnStatus := connStatusToFloat(str); isConnStatus { - return v, nil - } - } - return utils.IfaceAsFloat64(ival) } diff --git a/engine/statmetrics_test.go b/engine/statmetrics_test.go index 85aaef681..c9dec8088 100644 --- a/engine/statmetrics_test.go +++ b/engine/statmetrics_test.go @@ -2267,7 +2267,8 @@ func TestStatSumGetFloat64Value(t *testing.T) { t.Errorf("wrong statSum value: %v", v) } ev2 := &utils.CGREvent{Tenant: "cgrates.org", ID: "EVENT_2"} - if err := statSum.AddEvent(ev2.ID, utils.MapStorage{utils.MetaReq: ev2.Event}); err == nil || err.Error() != "NOT_FOUND:~*req.Cost" { + if err := statSum.AddEvent(ev2.ID, utils.MapStorage{utils.MetaReq: ev2.Event}); err == nil || + err.Error() != utils.ErrNotFound.Error() { t.Error(err) } if v := statSum.GetFloat64Value(config.CgrConfig().GeneralCfg().RoundingDecimals); v != -1.0 { @@ -2386,7 +2387,12 @@ func TestStatSumGetStringValue2(t *testing.T) { } func TestStatSumGetStringValue3(t *testing.T) { - statSum := &StatSum{Events: make(map[string]*StatWithCompress), MinItems: 2, FilterIDs: []string{}, FieldName: "~*req.Cost"} + statSum := &StatSum{ + Events: make(map[string]*StatWithCompress), + MinItems: 2, + FilterIDs: []string{}, + Fields: config.NewRSRParsersMustCompile("~*req.Cost", utils.InfieldSep), + } expected := &StatSum{ Events: map[string]*StatWithCompress{ "EVENT_1": {Stat: 12.2, CompressFactor: 2}, @@ -2394,7 +2400,7 @@ func TestStatSumGetStringValue3(t *testing.T) { }, MinItems: 2, FilterIDs: []string{}, - FieldName: "~*req.Cost", + Fields: config.NewRSRParsersMustCompile("~*req.Cost", utils.InfieldSep), Count: 3, Sum: 42.7, } @@ -2432,8 +2438,12 @@ func TestStatSumGetStringValue3(t *testing.T) { } func TestStatSumCompress(t *testing.T) { - sum := &StatSum{Events: make(map[string]*StatWithCompress), FieldName: "~*req.Cost", - MinItems: 2, FilterIDs: []string{}} + sum := &StatSum{ + Events: make(map[string]*StatWithCompress), + Fields: config.NewRSRParsersMustCompile("~*req.Cost", utils.InfieldSep), + MinItems: 2, + FilterIDs: []string{}, + } expected := &StatSum{ Events: map[string]*StatWithCompress{ "EVENT_1": {Stat: 18.2, CompressFactor: 1}, @@ -2442,7 +2452,7 @@ func TestStatSumCompress(t *testing.T) { MinItems: 2, FilterIDs: []string{}, Sum: 24.4, - FieldName: "~*req.Cost", + Fields: config.NewRSRParsersMustCompile("~*req.Cost", utils.InfieldSep), Count: 2, } expected.GetStringValue(config.CgrConfig().GeneralCfg().RoundingDecimals) @@ -2470,7 +2480,7 @@ func TestStatSumCompress(t *testing.T) { }, MinItems: 2, FilterIDs: []string{}, - FieldName: "~*req.Cost", + Fields: config.NewRSRParsersMustCompile("~*req.Cost", utils.InfieldSep), Sum: 24.4, Count: 2, } @@ -3231,7 +3241,7 @@ func TestStatSumMarshal(t *testing.T) { utils.Destination: "1002"}} statSum.AddEvent(ev.ID, utils.MapStorage{utils.MetaReq: ev.Event}) var nstatSum StatSum - expected := []byte(`{"FilterIDs":[],"Sum":20,"Count":1,"Events":{"EVENT_1":{"Stat":20,"CompressFactor":1}},"MinItems":2,"FieldName":"~*req.Cost"}`) + expected := []byte(`{"FilterIDs":[],"Sum":20,"Count":1,"Events":{"EVENT_1":{"Stat":20,"CompressFactor":1}},"MinItems":2,"Fields":[{"Rules":"~*req.Cost"}]}`) if b, err := statSum.Marshal(&jMarshaler); err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, b) { @@ -3574,9 +3584,9 @@ func TestStatMetricsStatSumGetMinItems(t *testing.T) { CompressFactor: 6, }, }, - MinItems: 20, - FieldName: "Field_Name", - val: nil, + MinItems: 20, + Fields: config.NewRSRParsersMustCompile("Field Name", utils.InfieldSep), + val: nil, } result := sum.GetMinItems() if !reflect.DeepEqual(result, 20) { @@ -3595,9 +3605,9 @@ func TestStatMetricsStatSumGetFilterIDs(t *testing.T) { CompressFactor: 6, }, }, - MinItems: 20, - FieldName: "Field_Name", - val: nil, + MinItems: 20, + Fields: config.NewRSRParsersMustCompile("Field Name", utils.InfieldSep), + val: nil, } result := sum.GetFilterIDs() if !reflect.DeepEqual(result, sum.FilterIDs) { @@ -3616,9 +3626,9 @@ func TestStatMetricsStatSumGetValue(t *testing.T) { CompressFactor: 6, }, }, - MinItems: 20, - FieldName: "Field_Name", - val: nil, + MinItems: 20, + Fields: config.NewRSRParsersMustCompile("Field Name", utils.InfieldSep), + val: nil, } result := sum.GetValue(50) if !reflect.DeepEqual(result, float64(-1)) { @@ -5099,7 +5109,7 @@ func TestStatSumClone(t *testing.T) { Sum: 123.45, Count: 5, MinItems: 3, - FieldName: "*cost", + Fields: config.NewRSRParsersMustCompile("*cost", utils.InfieldSep), val: &val, Events: map[string]*StatWithCompress{ "event1": {Stat: 55.5, CompressFactor: 2}, diff --git a/general_tests/offline_internal_apis_it_test.go b/general_tests/offline_internal_apis_it_test.go index 83b33b379..068962131 100644 --- a/general_tests/offline_internal_apis_it_test.go +++ b/general_tests/offline_internal_apis_it_test.go @@ -112,7 +112,7 @@ func TestOfflineInternalAPIsDumpDataDB(t *testing.T) { } else if files != 42 { t.Errorf("expected 42 files, received <%d>", files) } - if totalSize < 35500 || totalSize > 35700 { + if totalSize < 35600 || totalSize > 35800 { t.Errorf("expected folder size to be within range 35500KB to 35700KB, received <%v>KB", totalSize) } }) diff --git a/utils/consts.go b/utils/consts.go index 5ee66d543..a29fbb2da 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -828,6 +828,7 @@ const ( MetaSIPURIMethod = "*sipuri_method" MetaSIPURIHost = "*sipuri_host" MetaSIPURIUser = "*sipuri_user" + MetaConnStatus = "*conn_status" E164DomainConverter = "*e164Domain" E164Converter = "*e164" URLDecConverter = "*urldecode" diff --git a/utils/dataconverter.go b/utils/dataconverter.go index 5015e458f..3ecadc23e 100644 --- a/utils/dataconverter.go +++ b/utils/dataconverter.go @@ -132,6 +132,8 @@ func NewDataConverter(params string) (conv DataConverter, err error) { return NewRandomConverter(params[len(MetaRandom)+1:]) case strings.HasPrefix(params, MetaStrip): return NewStripConverter(params) + case params == MetaConnStatus: + return ConnStatusConverter{}, nil case strings.HasPrefix(params, MetaGigawords): return new(GigawordsConverter), nil default: @@ -841,3 +843,21 @@ func (ts TimeStringConverter) Convert(in any) (out any, err error) { tm = tm.In(ts.loc) return tm.Format(ts.layout), nil } + +// ConnStatusConverter converts connection status strings to numeric values. +// Returns 1 for UP, -1 for DOWN, and 0 for DUPLICATE. +type ConnStatusConverter struct{} + +// Convert implements DataConverter interface +func (c ConnStatusConverter) Convert(in any) (any, error) { + status := IfaceAsString(in) + switch status { + case ConnStatusUp: + return 1, nil + case ConnStatusDown: + return -1, nil + case ConnStatusDuplicate: + return 0, nil + } + return 0, fmt.Errorf("unsupported connection status: %q", status) +}