From 42b0ec3abe9115053646817fdd554b474900e70b Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 4 Nov 2025 13:52:44 +0200 Subject: [PATCH] stats: implement new REPSC/REPFC metrics used to track successful/failed requests. REPFC supports error filtering. --- agents/diam_prom_it_test.go | 36 +++-- engine/statmetrics.go | 284 +++++++++++++++++++++++++++++++++++- utils/consts.go | 2 + 3 files changed, 302 insertions(+), 20 deletions(-) diff --git a/agents/diam_prom_it_test.go b/agents/diam_prom_it_test.go index 421b76b8a..be8992bd7 100644 --- a/agents/diam_prom_it_test.go +++ b/agents/diam_prom_it_test.go @@ -84,21 +84,11 @@ func TestDiamPrometheus(t *testing.T) { "enabled": true, "store_interval": "-1" }, -"rpc_conns": { - "async": { - "strategy": "*async", - "conns": [ - { - "address": "*internal" - } - ] - } -}, "diameter_agent": { "enabled": true, "sessions_conns": ["*birpc_internal"], - "stats_conns": ["*localhost"], - "thresholds_conns": ["*localhost"], + "stats_conns": ["*internal"], + "thresholds_conns": ["*internal"], "request_processors": [{ "id": "message", "filters": [ @@ -204,7 +194,7 @@ func TestDiamPrometheus(t *testing.T) { StatQueueProfile: &engine.StatQueueProfile{ Tenant: "cgrates.org", ID: "SQ_1", - FilterIDs: []string{"*string:~*req.Category:sms"}, + FilterIDs: []string{"*string:~*opts.*eventType:ProcessTime"}, QueueLength: -1, TTL: 5 * time.Second, Metrics: []*engine.MetricWithFilters{ @@ -223,6 +213,15 @@ func TestDiamPrometheus(t *testing.T) { { MetricID: "*distinct#~*req.ProcessingTime", }, + { + MetricID: utils.MetaREPSC, + }, + { + MetricID: utils.MetaREPFC, + }, + { + MetricID: utils.MetaREPFC + "#ERR_MESSAGE", + }, }, Stored: true, MinItems: 1, @@ -236,7 +235,7 @@ func TestDiamPrometheus(t *testing.T) { StatQueueProfile: &engine.StatQueueProfile{ Tenant: "cgrates.org", ID: "SQ_2", - FilterIDs: []string{"*string:~*req.Category:sms"}, + FilterIDs: []string{"*string:~*opts.*eventType:ProcessTime"}, QueueLength: -1, TTL: 10 * time.Second, Metrics: []*engine.MetricWithFilters{ @@ -255,6 +254,15 @@ func TestDiamPrometheus(t *testing.T) { { MetricID: "*distinct#~*req.ProcessingTime", }, + { + MetricID: utils.MetaREPSC, + }, + { + MetricID: utils.MetaREPFC, + }, + { + MetricID: utils.MetaREPFC + "#ERR_MESSAGE", + }, }, Stored: true, MinItems: 1, diff --git a/engine/statmetrics.go b/engine/statmetrics.go index e5a07fb6f..1466e4aa4 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -49,6 +49,8 @@ func NewStatMetric(metricID string, minItems uint64, filterIDs []string) (sm Sta utils.MetaDistinct: NewStatDistinct, utils.MetaHighest: NewStatHighest, utils.MetaLowest: NewStatLowest, + utils.MetaREPSC: NewStatREPSC, + utils.MetaREPFC: NewStatREPFC, } // split the metricID // in case of *sum we have *sum#~*req.FieldName @@ -1089,7 +1091,7 @@ func (s *StatHighest) GetValue() *utils.Decimal { // AddEvent processes a new event, updating highest value if necessary func (s *StatHighest) AddEvent(evID string, ev utils.DataProvider) error { - val, err := fieldValue(s.FieldName, ev) + val, err := fieldValueFromDP(s.FieldName, ev) if err != nil { return err } @@ -1109,7 +1111,7 @@ func (s *StatHighest) AddEvent(evID string, ev utils.DataProvider) error { // AddOneEvent processes event without storing for removal (used when events // never expire). func (s *StatHighest) AddOneEvent(ev utils.DataProvider) error { - val, err := fieldValue(s.FieldName, ev) + val, err := fieldValueFromDP(s.FieldName, ev) if err != nil { return err } @@ -1221,7 +1223,7 @@ func (s *StatLowest) GetValue() *utils.Decimal { // AddEvent processes a new event, updating lowest value if necessary. func (s *StatLowest) AddEvent(evID string, ev utils.DataProvider) error { - val, err := fieldValue(s.FieldName, ev) + val, err := fieldValueFromDP(s.FieldName, ev) if err != nil { return err } @@ -1241,7 +1243,7 @@ func (s *StatLowest) AddEvent(evID string, ev utils.DataProvider) error { // AddOneEvent processes event without storing for removal (used when events // never expire). func (s *StatLowest) AddOneEvent(ev utils.DataProvider) error { - val, err := fieldValue(s.FieldName, ev) + val, err := fieldValueFromDP(s.FieldName, ev) if err != nil { return err } @@ -1298,8 +1300,264 @@ func (s *StatLowest) GetCompressFactor(events map[string]uint64) map[string]uint return events } -// fieldValue gets the numeric value from the DataProvider. -func fieldValue(fldName string, dp utils.DataProvider) (*utils.Decimal, error) { +// NewStatREPSC creates a StatREPSC metric for counting successful requests. +func NewStatREPSC(minItems uint64, _ string, filterIDs []string) StatMetric { + return &StatREPSC{ + FilterIDs: filterIDs, + MinItems: minItems, + Events: make(map[string]struct{}), + } +} + +// StatREPSC counts requests where ReplyState equals "OK" +type StatREPSC struct { + FilterIDs []string // event filters to apply before processing + MinItems uint64 // minimum events required for valid results + Count uint64 // number of successful events tracked + Events map[string]struct{} // event IDs indexed for deletion +} + +// Clone creates a deep copy of StatREPSC. +func (s *StatREPSC) Clone() StatMetric { + if s == nil { + return nil + } + clone := &StatREPSC{ + FilterIDs: slices.Clone(s.FilterIDs), + MinItems: s.MinItems, + Count: s.Count, + Events: maps.Clone(s.Events), + } + return clone +} + +func (s *StatREPSC) GetStringValue(_ int) string { + if s.Count == 0 || s.Count < s.MinItems { + return utils.NotAvailable + } + return strconv.Itoa(int(s.Count)) +} + +func (s *StatREPSC) GetValue() *utils.Decimal { + if s.Count == 0 || s.Count < s.MinItems { + return utils.DecimalNaN + } + return utils.NewDecimal(int64(s.Count), 0) +} + +// AddEvent processes a new event, incrementing count if ReplyState is "OK". +func (s *StatREPSC) AddEvent(evID string, ev utils.DataProvider) error { + replyState, err := replyStateFromDP(ev) + if err != nil { + return err + } + if replyState != utils.OK { + return nil + } + + // Only increment count for new events. + if _, exists := s.Events[evID]; !exists { + s.Events[evID] = struct{}{} + s.Count++ + } + + return nil +} + +// AddOneEvent processes event without storing for removal (used when events +// never expire). +func (s *StatREPSC) AddOneEvent(ev utils.DataProvider) error { + replyState, err := replyStateFromDP(ev) + if err != nil { + return err + } + if replyState != utils.OK { + return nil + } + s.Count++ + return nil +} + +func (s *StatREPSC) RemEvent(evID string) error { + if _, exists := s.Events[evID]; !exists { + return utils.ErrNotFound + } + delete(s.Events, evID) + s.Count-- + return nil +} + +// GetFilterIDs is part of StatMetric interface. +func (s *StatREPSC) GetFilterIDs() []string { + return s.FilterIDs +} + +// GetMinItems returns the minimum items for the metric. +func (s *StatREPSC) GetMinItems() uint64 { + return s.MinItems +} + +// Compress is part of StatMetric interface. +func (s *StatREPSC) Compress(_ uint64, _ string) []string { + eventIDs := make([]string, 0, len(s.Events)) + for id := range s.Events { + eventIDs = append(eventIDs, id) + } + return eventIDs +} + +func (s *StatREPSC) GetCompressFactor(events map[string]uint64) map[string]uint64 { + for id := range s.Events { + if _, exists := events[id]; !exists { + events[id] = 1 + } + } + return events +} + +// NewStatREPFC creates a StatREPFC metric for counting failed requests. +func NewStatREPFC(minItems uint64, errorType string, filterIDs []string) StatMetric { + return &StatREPFC{ + FilterIDs: filterIDs, + MinItems: minItems, + ErrorType: errorType, + Events: make(map[string]struct{}), + } +} + +// StatREPFC counts requests where ReplyState is not "OK". +type StatREPFC struct { + FilterIDs []string // event filters to apply before processing + MinItems uint64 // minimum events required for valid results + ErrorType string // specific error type to filter for (empty = all errors) + Count uint64 // number of failed events tracked + Events map[string]struct{} // event IDs indexed for deletion +} + +// Clone creates a deep copy of StatREPFC. +func (s *StatREPFC) Clone() StatMetric { + if s == nil { + return nil + } + clone := &StatREPFC{ + FilterIDs: slices.Clone(s.FilterIDs), + MinItems: s.MinItems, + ErrorType: s.ErrorType, + Count: s.Count, + Events: maps.Clone(s.Events), + } + return clone +} + +func (s *StatREPFC) GetStringValue(_ int) string { + if s.Count == 0 || s.Count < s.MinItems { + return utils.NotAvailable + } + return strconv.Itoa(int(s.Count)) +} + +func (s *StatREPFC) GetValue() *utils.Decimal { + if s.Count == 0 || s.Count < s.MinItems { + return utils.DecimalNaN + } + return utils.NewDecimal(int64(s.Count), 0) +} + +// AddEvent processes a new event, incrementing count if ReplyState is not "OK". +func (s *StatREPFC) AddEvent(evID string, ev utils.DataProvider) error { + replyState, err := replyStateFromDP(ev) + if err != nil { + return err + } + + // Skip if success when counting all failures, or if not matching specific + // error type. + if s.ErrorType == "" && replyState == utils.OK { + return nil + } + // Handle multiple errors separated by ";" (e.g., "ERR_TERMINATE;ERR_CDRS") + // Use split + exact match instead of strings.Contains to avoid false positives. + if s.ErrorType != "" { + errors := strings.Split(replyState, utils.InfieldSep) + if !slices.Contains(errors, s.ErrorType) { + return nil + } + } + + // Only increment count for new events. + if _, exists := s.Events[evID]; !exists { + s.Events[evID] = struct{}{} + s.Count++ + } + + return nil +} + +// AddOneEvent processes event without storing for removal (used when events +// never expire). +func (s *StatREPFC) AddOneEvent(ev utils.DataProvider) error { + replyState, err := replyStateFromDP(ev) + if err != nil { + return err + } + + // Skip if success when counting all failures, or if not matching specific + // error type. + if s.ErrorType == "" && replyState == utils.OK { + return nil + } + // Handle multiple errors separated by ";" (e.g., "ERR_TERMINATE;ERR_CDRS") + // Use split + exact match instead of strings.Contains to avoid false positives + if s.ErrorType != "" { + errors := strings.Split(replyState, utils.InfieldSep) + if !slices.Contains(errors, s.ErrorType) { + return nil + } + } + + s.Count++ + return nil +} + +func (s *StatREPFC) RemEvent(evID string) error { + if _, exists := s.Events[evID]; !exists { + return utils.ErrNotFound + } + delete(s.Events, evID) + s.Count-- + return nil +} + +// GetFilterIDs is part of StatMetric interface. +func (s *StatREPFC) GetFilterIDs() []string { + return s.FilterIDs +} + +// GetMinItems returns the minimum items for the metric. +func (s *StatREPFC) GetMinItems() uint64 { + return s.MinItems +} + +// Compress is part of StatMetric interface. +func (s *StatREPFC) Compress(_ uint64, _ string) []string { + eventIDs := make([]string, 0, len(s.Events)) + for id := range s.Events { + eventIDs = append(eventIDs, id) + } + return eventIDs +} + +func (s *StatREPFC) GetCompressFactor(events map[string]uint64) map[string]uint64 { + for id := range s.Events { + if _, exists := events[id]; !exists { + events[id] = 1 + } + } + return events +} + +// fieldValueFromDP gets the numeric value from the DataProvider. +func fieldValueFromDP(fldName string, dp utils.DataProvider) (*utils.Decimal, error) { ival, err := utils.DPDynamicInterface(fldName, dp) if err != nil { if errors.Is(err, utils.ErrNotFound) { @@ -1315,3 +1573,17 @@ func fieldValue(fldName string, dp utils.DataProvider) (*utils.Decimal, error) { } return &utils.Decimal{Big: v}, nil } + +// replyStateFromDP gets the numeric value from the DataProvider. +func replyStateFromDP(dp utils.DataProvider) (string, error) { + ival, err := dp.FieldAsInterface([]string{utils.MetaReq, utils.ReplyState}) + if err != nil { + if errors.Is(err, utils.ErrNotFound) { + return "", utils.ErrPrefix(err, utils.ReplyState) + // NOTE: return below might be clearer + // return 0, fmt.Errorf("field %s: %v", utils.ReplyState, err) + } + return "", err + } + return utils.IfaceAsString(ival), nil +} diff --git a/utils/consts.go b/utils/consts.go index c3efc2ad8..b2f5d68c0 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1093,6 +1093,8 @@ const ( MetaPDD = "*pdd" MetaDDC = "*ddc" MetaSum = "*sum" + MetaREPSC = "*repsc" + MetaREPFC = "*repfc" MetaAverage = "*average" MetaDistinct = "*distinct" MetaHighest = "*highest"