diff --git a/agents/diam_prom_it_test.go b/agents/diam_prom_it_test.go index d9d81a715..0089c42a1 100644 --- a/agents/diam_prom_it_test.go +++ b/agents/diam_prom_it_test.go @@ -79,21 +79,11 @@ func TestDiamPrometheus(t *testing.T) { "enabled": true, "store_interval": "-1" }, -"rpc_conns": { - "async": { - "strategy": "*async", - "conns": [ - { - "address": "*internal" - } - ] - } -}, "diameter_agent": { "enabled": true, "sessions_conns": ["*birpc_internal"], - "stats_conns": ["async"], - "thresholds_conns": ["async"], + "stats_conns": ["*internal"], + "thresholds_conns": ["*internal"], "request_processors": [{ "id": "message", "filters": [ @@ -230,7 +220,7 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`, StatQueueProfile: &engine.StatQueueProfile{ Tenant: "cgrates.org", ID: "SQ_1", - FilterIDs: []string{"*string:~*req.Category:sms"}, + FilterIDs: []string{"*string:~*opts.*eventType:ProcessTime"}, QueueLength: -1, TTL: 5 * time.Second, Metrics: []*engine.MetricWithFilters{ @@ -249,6 +239,15 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`, { MetricID: "*distinct#~*req.ProcessingTime", }, + { + MetricID: utils.MetaREPSC, + }, + { + MetricID: utils.MetaREPFC, + }, + { + MetricID: utils.MetaREPFC + "#ERR_MESSAGE", + }, }, Stored: true, MinItems: 1, @@ -262,7 +261,7 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`, StatQueueProfile: &engine.StatQueueProfile{ Tenant: "cgrates.org", ID: "SQ_2", - FilterIDs: []string{"*string:~*req.Category:sms"}, + FilterIDs: []string{"*string:~*opts.*eventType:ProcessTime"}, QueueLength: -1, TTL: 10 * time.Second, Metrics: []*engine.MetricWithFilters{ @@ -281,6 +280,15 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`, { MetricID: "*distinct#~*req.ProcessingTime", }, + { + MetricID: utils.MetaREPSC, + }, + { + MetricID: utils.MetaREPFC, + }, + { + MetricID: utils.MetaREPFC + "#ERR_MESSAGE", + }, }, Stored: true, MinItems: 1, @@ -294,7 +302,7 @@ 
cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`, ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "TH_1", - FilterIDs: []string{"*string:~*req.Category:sms"}, + FilterIDs: []string{"*string:~*opts.*eventType:ProcessTime"}, MaxHits: -1, MinHits: 8, MinSleep: time.Second, @@ -309,7 +317,7 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`, ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "TH_2", - FilterIDs: []string{"*string:~*req.Category:sms"}, + FilterIDs: []string{"*string:~*opts.*eventType:ProcessTime"}, MaxHits: -1, MinHits: 10, MinSleep: time.Second, diff --git a/docs/stats.rst b/docs/stats.rst index 3e06dcb4b..1cfa13cb3 100644 --- a/docs/stats.rst +++ b/docs/stats.rst @@ -134,6 +134,18 @@ Following metrics are implemented: \*distinct Generic metric to return the distinct number of appearance of a field name within *Events*. Format: <*\*distinct#FieldName*>. +\*highest + Generic metric to return the highest value of a specific field within *Events*. Format: <*\*highest#FieldName*>. + +\*lowest + Generic metric to return the lowest value of a specific field within *Events*. Format: <*\*lowest#FieldName*>. + +\*repsc + Reply success count. Counts requests where ReplyState equals "OK". Uses *ReplyState* field in the *Event*. + +\*repfc + Reply fail count. Counts requests where ReplyState is not "OK". Uses *ReplyState* field in the *Event*. Format: <*\*repfc*> for all failed requests or <*\*repfc#ErrorType*> for specific error types (e.g., \*repfc#ERR_INITIATE). 
+ Use cases --------- diff --git a/engine/statmetrics.go b/engine/statmetrics.go index edb343f6c..9bfdf002e 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -83,6 +83,8 @@ func NewStatMetric(metricID string, minItems int, filterIDs []string) (sm StatMe utils.MetaDistinct: NewStatDistinct, utils.MetaHighest: NewStatHighest, utils.MetaLowest: NewStatLowest, + utils.MetaREPSC: NewStatREPSC, + utils.MetaREPFC: NewStatREPFC, } // split the metricID // in case of *sum we have *sum#~*req.FieldName @@ -1419,8 +1421,12 @@ func (ddc *StatDDC) GetCompressFactor(events map[string]int) map[string]int { } func NewStatSum(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { - return &StatSum{Events: make(map[string]*StatWithCompress), - MinItems: minItems, FieldName: extraParams, FilterIDs: filterIDs}, nil + return &StatSum{ + Events: make(map[string]*StatWithCompress), + MinItems: minItems, + FieldName: extraParams, + FilterIDs: filterIDs, + }, nil } type StatSum struct { @@ -2338,3 +2344,352 @@ func (s *StatLowest) GetCompressFactor(events map[string]int) map[string]int { } return events } + +// NewStatREPSC creates a StatREPSC metric for counting successful requests. +func NewStatREPSC(minItems int, _ string, filterIDs []string) (StatMetric, error) { + return &StatREPSC{ + FilterIDs: filterIDs, + MinItems: minItems, + Events: make(map[string]struct{}), + }, nil +} + +// StatREPSC counts requests where ReplyState equals "OK" +type StatREPSC struct { + FilterIDs []string // event filters to apply before processing + MinItems int // minimum events required for valid results + Count int64 // number of successful events tracked + Events map[string]struct{} // event IDs indexed for deletion + cachedVal *float64 // cached result to avoid recalculation +} + +// Clone creates a deep copy of StatREPSC. 
+func (s *StatREPSC) Clone() StatMetric { + if s == nil { + return nil + } + clone := &StatREPSC{ + FilterIDs: slices.Clone(s.FilterIDs), + MinItems: s.MinItems, + Count: s.Count, + Events: maps.Clone(s.Events), + } + if s.cachedVal != nil { + clone.cachedVal = utils.Float64Pointer(*s.cachedVal) + } + return clone +} + +func (s *StatREPSC) GetStringValue(decimals int) string { + v := s.getValue(decimals) + if v == utils.StatsNA { + return utils.NotAvailable + } + return strconv.FormatFloat(v, 'f', -1, 64) +} + +func (s *StatREPSC) GetValue(decimals int) any { + return s.getValue(decimals) +} + +func (s *StatREPSC) GetFloat64Value(decimals int) float64 { + return s.getValue(decimals) +} + +// getValue returns current count value, calculating if cache is invalid. +func (s *StatREPSC) getValue(_ int) float64 { + if s.cachedVal != nil { + return *s.cachedVal + } + + if s.Count == 0 || s.Count < int64(s.MinItems) { + s.cachedVal = utils.Float64Pointer(utils.StatsNA) + return *s.cachedVal + } + + v := float64(s.Count) + s.cachedVal = &v + return *s.cachedVal +} + +// getFieldValue gets the value of the ReplyState field from the DataProvider. +func (s *StatREPSC) getFieldValue(ev utils.DataProvider) (string, error) { + ival, err := ev.FieldAsInterface([]string{utils.MetaReq, utils.ReplyState}) + if err != nil { + if errors.Is(err, utils.ErrNotFound) { + return "", utils.ErrPrefix(err, utils.ReplyState) + // NOTE: return below might be clearer + // return 0, fmt.Errorf("field %s: %v", utils.ReplyState, err) + } + return "", err + } + return utils.IfaceAsString(ival), nil +} + +// AddEvent processes a new event, incrementing count if ReplyState is "OK". +func (s *StatREPSC) AddEvent(evID string, ev utils.DataProvider) error { + replyState, err := s.getFieldValue(ev) + if err != nil { + return err + } + if replyState != utils.OK { + return nil + } + + // Only increment count for new events. 
+ if _, exists := s.Events[evID]; !exists { + s.Events[evID] = struct{}{} + s.Count++ + s.cachedVal = nil + } + + return nil +} + +// AddOneEvent processes event without storing for removal (used when events +// never expire). +func (s *StatREPSC) AddOneEvent(ev utils.DataProvider) error { + replyState, err := s.getFieldValue(ev) + if err != nil { + return err + } + if replyState != utils.OK { + return nil + } + + s.Count++ + s.cachedVal = nil + return nil +} + +func (s *StatREPSC) RemEvent(evID string) { + if _, exists := s.Events[evID]; !exists { + return + } + delete(s.Events, evID) + s.Count-- + s.cachedVal = nil +} + +func (s *StatREPSC) Marshal(ms Marshaler) ([]byte, error) { + return ms.Marshal(s) +} + +func (s *StatREPSC) LoadMarshaled(ms Marshaler, marshaled []byte) error { + return ms.Unmarshal(marshaled, &s) +} + +// GetFilterIDs is part of StatMetric interface. +func (s *StatREPSC) GetFilterIDs() []string { + return s.FilterIDs +} + +// GetMinItems returns the minimum items for the metric. +func (s *StatREPSC) GetMinItems() int { + return s.MinItems +} + +// Compress is part of StatMetric interface. +func (s *StatREPSC) Compress(queueLen int64, defaultID string, decimals int) []string { + eventIDs := make([]string, 0, len(s.Events)) + for id := range s.Events { + eventIDs = append(eventIDs, id) + } + return eventIDs +} + +func (s *StatREPSC) GetCompressFactor(events map[string]int) map[string]int { + for id := range s.Events { + if _, exists := events[id]; !exists { + events[id] = 1 + } + } + return events +} + +// NewStatREPFC creates a StatREPFC metric for counting failed requests. +func NewStatREPFC(minItems int, errorType string, filterIDs []string) (StatMetric, error) { + return &StatREPFC{ + FilterIDs: filterIDs, + MinItems: minItems, + ErrorType: errorType, + Events: make(map[string]struct{}), + }, nil +} + +// StatREPFC counts requests where ReplyState is not "OK". 
+type StatREPFC struct { + FilterIDs []string // event filters to apply before processing + MinItems int // minimum events required for valid results + ErrorType string // specific error type to filter for (empty = all errors) + Count int64 // number of failed events tracked + Events map[string]struct{} // event IDs indexed for deletion + cachedVal *float64 // cached result to avoid recalculation +} + +// Clone creates a deep copy of StatREPFC. +func (s *StatREPFC) Clone() StatMetric { + if s == nil { + return nil + } + clone := &StatREPFC{ + FilterIDs: slices.Clone(s.FilterIDs), + MinItems: s.MinItems, + ErrorType: s.ErrorType, + Count: s.Count, + Events: maps.Clone(s.Events), + } + if s.cachedVal != nil { + clone.cachedVal = utils.Float64Pointer(*s.cachedVal) + } + return clone +} + +func (s *StatREPFC) GetStringValue(decimals int) string { + v := s.getValue(decimals) + if v == utils.StatsNA { + return utils.NotAvailable + } + return strconv.FormatFloat(v, 'f', -1, 64) +} + +func (s *StatREPFC) GetValue(decimals int) any { + return s.getValue(decimals) +} + +func (s *StatREPFC) GetFloat64Value(decimals int) float64 { + return s.getValue(decimals) +} + +// getValue returns current count value, calculating if cache is invalid. +func (s *StatREPFC) getValue(_ int) float64 { + if s.cachedVal != nil { + return *s.cachedVal + } + + if s.Count == 0 || s.Count < int64(s.MinItems) { + s.cachedVal = utils.Float64Pointer(utils.StatsNA) + return *s.cachedVal + } + + v := float64(s.Count) + s.cachedVal = &v + return *s.cachedVal +} + +// getFieldValue gets the value of the ReplyState field from the DataProvider. 
+func (s *StatREPFC) getFieldValue(ev utils.DataProvider) (string, error) { + ival, err := ev.FieldAsInterface([]string{utils.MetaReq, utils.ReplyState}) + if err != nil { + if errors.Is(err, utils.ErrNotFound) { + return "", utils.ErrPrefix(err, utils.ReplyState) + // NOTE: return below might be clearer + // return 0, fmt.Errorf("field %s: %v", utils.ReplyState, err) + } + return "", err + } + return utils.IfaceAsString(ival), nil +} + +// AddEvent processes a new event, incrementing count if ReplyState is not "OK". +func (s *StatREPFC) AddEvent(evID string, ev utils.DataProvider) error { + replyState, err := s.getFieldValue(ev) + if err != nil { + return err + } + + // Skip if success when counting all failures, or if not matching specific + // error type. + if s.ErrorType == "" && replyState == utils.OK { + return nil + } + // Handle multiple errors separated by ";" (e.g., "ERR_TERMINATE;ERR_CDRS") + // Use split + exact match instead of strings.Contains to avoid false positives. + if s.ErrorType != "" { + errors := strings.Split(replyState, utils.InfieldSep) + if !slices.Contains(errors, s.ErrorType) { + return nil + } + } + + // Only increment count for new events. + if _, exists := s.Events[evID]; !exists { + s.Events[evID] = struct{}{} + s.Count++ + s.cachedVal = nil + } + + return nil +} + +// AddOneEvent processes event without storing for removal (used when events +// never expire). +func (s *StatREPFC) AddOneEvent(ev utils.DataProvider) error { + replyState, err := s.getFieldValue(ev) + if err != nil { + return err + } + + // Skip if success when counting all failures, or if not matching specific + // error type. 
+ if s.ErrorType == "" && replyState == utils.OK { + return nil + } + // Handle multiple errors separated by ";" (e.g., "ERR_TERMINATE;ERR_CDRS") + // Use split + exact match instead of strings.Contains to avoid false positives + if s.ErrorType != "" { + errors := strings.Split(replyState, utils.InfieldSep) + if !slices.Contains(errors, s.ErrorType) { + return nil + } + } + + s.Count++ + s.cachedVal = nil + return nil +} + +func (s *StatREPFC) RemEvent(evID string) { + if _, exists := s.Events[evID]; !exists { + return + } + delete(s.Events, evID) + s.Count-- + s.cachedVal = nil +} + +func (s *StatREPFC) Marshal(ms Marshaler) ([]byte, error) { + return ms.Marshal(s) +} + +func (s *StatREPFC) LoadMarshaled(ms Marshaler, marshaled []byte) error { + return ms.Unmarshal(marshaled, &s) +} + +// GetFilterIDs is part of StatMetric interface. +func (s *StatREPFC) GetFilterIDs() []string { + return s.FilterIDs +} + +// GetMinItems returns the minimum items for the metric. +func (s *StatREPFC) GetMinItems() int { + return s.MinItems +} + +// Compress is part of StatMetric interface. +func (s *StatREPFC) Compress(queueLen int64, defaultID string, decimals int) []string { + eventIDs := make([]string, 0, len(s.Events)) + for id := range s.Events { + eventIDs = append(eventIDs, id) + } + return eventIDs +} + +func (s *StatREPFC) GetCompressFactor(events map[string]int) map[string]int { + for id := range s.Events { + if _, exists := events[id]; !exists { + events[id] = 1 + } + } + return events +} diff --git a/utils/consts.go b/utils/consts.go index 4cb14e62c..de640cc29 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1153,6 +1153,8 @@ const ( MetaPDD = "*pdd" MetaDDC = "*ddc" MetaSum = "*sum" + MetaREPSC = "*repsc" + MetaREPFC = "*repfc" MetaAverage = "*average" MetaDistinct = "*distinct" MetaHighest = "*highest"