diff --git a/agents/diam_prom_it_test.go b/agents/diam_prom_it_test.go index b53612b21..d9d81a715 100644 --- a/agents/diam_prom_it_test.go +++ b/agents/diam_prom_it_test.go @@ -240,6 +240,15 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`, { MetricID: "*sum#~*req.ProcessingTime", }, + { + MetricID: "*highest#~*req.ProcessingTime", + }, + { + MetricID: "*lowest#~*req.ProcessingTime", + }, + { + MetricID: "*distinct#~*req.ProcessingTime", + }, }, Stored: true, MinItems: 1, @@ -263,6 +272,15 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`, { MetricID: "*sum#~*req.ProcessingTime", }, + { + MetricID: "*highest#~*req.ProcessingTime", + }, + { + MetricID: "*lowest#~*req.ProcessingTime", + }, + { + MetricID: "*distinct#~*req.ProcessingTime", + }, }, Stored: true, MinItems: 1, diff --git a/engine/libstats.go b/engine/libstats.go index 5b6c14d82..9a197568d 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -206,13 +206,14 @@ func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error) copy(sq.SQItems, ssq.SQItems) for metricID, marshaled := range ssq.SQMetrics { - if metric, err := NewStatMetric(metricID, 0, []string{}); err != nil { + metric, err := NewStatMetric(metricID, 0, []string{}) + if err != nil { return nil, err - } else if err := metric.LoadMarshaled(ms, marshaled); err != nil { - return nil, err - } else { - sq.SQMetrics[metricID] = metric } + if err := metric.LoadMarshaled(ms, marshaled); err != nil { + return nil, err + } + sq.SQMetrics[metricID] = metric } if ssq.Compressed { sq.Expand() @@ -565,6 +566,10 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) { metric = new(StatAverage) case utils.MetaDistinct: metric = new(StatDistinct) + case utils.MetaHighest: + metric = new(StatHighest) + case utils.MetaLowest: + metric = new(StatLowest) default: return fmt.Errorf("unsupported metric type <%s>", metricSplit[0]) } diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 508603791..edb343f6c 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -19,7 +19,10 @@ along with this program. If not, see package engine import ( + "errors" "fmt" + "math" + "slices" "strconv" "strings" "time" @@ -78,6 +81,8 @@ func NewStatMetric(metricID string, minItems int, filterIDs []string) (sm StatMe utils.MetaSum: NewStatSum, utils.MetaAverage: NewStatAverage, utils.MetaDistinct: NewStatDistinct, + utils.MetaHighest: NewStatHighest, + utils.MetaLowest: NewStatLowest, } // split the metricID // in case of *sum we have *sum#~*req.FieldName @@ -1974,3 +1979,362 @@ func (dst *StatDistinct) GetCompressFactor(events map[string]int) map[string]int } return events } + +// NewStatHighest creates a StatHighest metric for tracking maximum field values. +func NewStatHighest(minItems int, fieldName string, filterIDs []string) (StatMetric, error) { + return &StatHighest{ + FilterIDs: filterIDs, + MinItems: minItems, + FieldName: fieldName, + Events: make(map[string]float64), + }, nil +} + +// StatHighest tracks the maximum value for a specific field across events. +type StatHighest struct { + FilterIDs []string // event filters to apply before processing + FieldName string // field path to extract from events + MinItems int // minimum events required for valid results + + Highest float64 // current maximum value tracked + Count int64 // number of events currently tracked + Events map[string]float64 // event values indexed by ID for deletion + + // cachedVal caches the result to avoid recalculation. + // Always invalidated on any state change to eliminate edge + // cases (i.e. when crossing MinItems threshold). + cachedVal *float64 +} + +// Clone creates a deep copy of StatHighest. +func (s *StatHighest) Clone() StatMetric { + if s == nil { + return nil + } + clone := &StatHighest{ + FilterIDs: slices.Clone(s.FilterIDs), + Highest: s.Highest, + Count: s.Count, + MinItems: s.MinItems, + FieldName: s.FieldName, + Events: maps.Clone(s.Events), + } + if s.cachedVal != nil { + val := *s.cachedVal + clone.cachedVal = &val + } + return clone +} + +func (s *StatHighest) GetStringValue(decimals int) string { + v := s.getValue(decimals) + if v == utils.StatsNA { + return utils.NotAvailable + } + return strconv.FormatFloat(v, 'f', -1, 64) +} + +func (s *StatHighest) GetValue(decimals int) any { + return s.getValue(decimals) +} + +func (s *StatHighest) GetFloat64Value(decimals int) float64 { + return s.getValue(decimals) +} + +// getValue returns current highest value, calculating if cache is invalid. +func (s *StatHighest) getValue(decimals int) float64 { + if s.cachedVal != nil { + return *s.cachedVal + } + if s.Count == 0 || s.Count < int64(s.MinItems) { + s.cachedVal = utils.Float64Pointer(utils.StatsNA) + return *s.cachedVal + } + v := utils.Round(s.Highest, decimals, utils.MetaRoundingMiddle) + s.cachedVal = &v + return v +} + +// AddEvent processes a new event, updating highest value if necessary +func (s *StatHighest) AddEvent(evID string, ev utils.DataProvider) error { + val, err := s.getFieldValue(ev) + if err != nil { + return err + } + if val > s.Highest { + s.Highest = val + } + + // Only increment count for new events. + if _, exists := s.Events[evID]; !exists { + s.Count++ + } + + s.Events[evID] = val + s.cachedVal = nil + return nil +} + +// AddOneEvent processes event without storing for removal (used when events +// never expire). +func (s *StatHighest) AddOneEvent(ev utils.DataProvider) error { + val, err := s.getFieldValue(ev) + if err != nil { + return err + } + if val > s.Highest { + s.Highest = val + } + s.Count++ + s.cachedVal = nil + return nil +} + +// getFieldValue gets the numeric value from the DataProvider. +func (s *StatHighest) getFieldValue(ev utils.DataProvider) (float64, error) { + ival, err := utils.DPDynamicInterface(s.FieldName, ev) + if err != nil { + if errors.Is(err, utils.ErrNotFound) { + return 0, utils.ErrPrefix(err, s.FieldName) + // NOTE: return below might be clearer + // return 0, fmt.Errorf("field %s: %v", s.FieldName, err) + } + return 0, err + } + return utils.IfaceAsFloat64(ival) +} + +func (s *StatHighest) RemEvent(evID string) { + v, exists := s.Events[evID] + if !exists { + return + } + delete(s.Events, evID) + s.Count-- + if v == s.Highest { + s.Highest = 0 // reset highest + + // Find new highest among remaining events. + for _, val := range s.Events { + if val > s.Highest { + s.Highest = val + } + } + } + s.cachedVal = nil +} + +func (s *StatHighest) Marshal(ms Marshaler) ([]byte, error) { + return ms.Marshal(s) +} + +func (s *StatHighest) LoadMarshaled(ms Marshaler, marshaled []byte) error { + return ms.Unmarshal(marshaled, &s) +} + +// GetFilterIDs is part of StatMetric interface. +func (s *StatHighest) GetFilterIDs() []string { + return s.FilterIDs +} + +// GetMinItems returns the minimum items for the metric. +func (s *StatHighest) GetMinItems() int { return s.MinItems } + +// Compress is part of StatMetric interface. +func (s *StatHighest) Compress(queueLen int64, defaultID string, decimals int) []string { + eventIDs := make([]string, 0, len(s.Events)) + for id := range s.Events { + eventIDs = append(eventIDs, id) + } + return eventIDs +} + +func (s *StatHighest) GetCompressFactor(events map[string]int) map[string]int { + for id := range s.Events { + if _, exists := events[id]; !exists { + events[id] = 1 + } + } + return events +} + +// NewStatLowest creates a StatLowest metric for tracking minimum field values. +func NewStatLowest(minItems int, fieldName string, filterIDs []string) (StatMetric, error) { + return &StatLowest{ + FilterIDs: filterIDs, + MinItems: minItems, + FieldName: fieldName, + Lowest: math.MaxFloat64, + Events: make(map[string]float64), + }, nil +} + +// StatLowest tracks the minimum value for a specific field across events. +type StatLowest struct { + FilterIDs []string // event filters to apply before processing + FieldName string // field path to extract from events + MinItems int // minimum events required for valid results + + Lowest float64 // current minimum value tracked + Count int64 // number of events currently tracked + Events map[string]float64 // event values indexed by ID for deletion + + // cachedVal caches the result to avoid recalculation. + // Always invalidated on any state change to eliminate edge + // cases (i.e. when crossing MinItems threshold). + cachedVal *float64 +} + +// Clone creates a deep copy of StatLowest. +func (s *StatLowest) Clone() StatMetric { + if s == nil { + return nil + } + clone := &StatLowest{ + FilterIDs: slices.Clone(s.FilterIDs), + Lowest: s.Lowest, + Count: s.Count, + MinItems: s.MinItems, + FieldName: s.FieldName, + Events: maps.Clone(s.Events), + } + if s.cachedVal != nil { + val := *s.cachedVal + clone.cachedVal = &val + } + return clone +} + +func (s *StatLowest) GetStringValue(decimals int) string { + v := s.getValue(decimals) + if v == utils.StatsNA { + return utils.NotAvailable + } + return strconv.FormatFloat(v, 'f', -1, 64) +} + +func (s *StatLowest) GetValue(decimals int) any { + return s.getValue(decimals) +} + +func (s *StatLowest) GetFloat64Value(decimals int) float64 { + return s.getValue(decimals) +} + +// getValue returns current lowest value, calculating if cache is invalid. +func (s *StatLowest) getValue(decimals int) float64 { + if s.cachedVal != nil { + return *s.cachedVal + } + if s.Count == 0 || s.Count < int64(s.MinItems) { + s.cachedVal = utils.Float64Pointer(utils.StatsNA) + return *s.cachedVal + } + v := utils.Round(s.Lowest, decimals, utils.MetaRoundingMiddle) + s.cachedVal = &v + return v +} + +// AddEvent processes a new event, updating lowest value if necessary. +func (s *StatLowest) AddEvent(evID string, ev utils.DataProvider) error { + val, err := s.getFieldValue(ev) + if err != nil { + return err + } + if val < s.Lowest { + s.Lowest = val + } + + // Only increment count for new events. + if _, exists := s.Events[evID]; !exists { + s.Count++ + } + + s.Events[evID] = val + s.cachedVal = nil + return nil +} + +// AddOneEvent processes event without storing for removal (used when events +// never expire). +func (s *StatLowest) AddOneEvent(ev utils.DataProvider) error { + val, err := s.getFieldValue(ev) + if err != nil { + return err + } + if val < s.Lowest { + s.Lowest = val + } + s.Count++ + s.cachedVal = nil + return nil +} + +// getFieldValue gets the numeric value from the DataProvider. +func (s *StatLowest) getFieldValue(ev utils.DataProvider) (float64, error) { + ival, err := utils.DPDynamicInterface(s.FieldName, ev) + if err != nil { + if errors.Is(err, utils.ErrNotFound) { + return 0, utils.ErrPrefix(err, s.FieldName) + // NOTE: return below might be clearer + // return 0, fmt.Errorf("field %s: %v", s.FieldName, err) + } + return 0, err + } + return utils.IfaceAsFloat64(ival) +} + +func (s *StatLowest) RemEvent(evID string) { + v, exists := s.Events[evID] + if !exists { + return + } + delete(s.Events, evID) + s.Count-- + if v == s.Lowest { + s.Lowest = math.MaxFloat64 // reset lowest + + // Find new lowest among remaining events. + for _, val := range s.Events { + if val < s.Lowest { + s.Lowest = val + } + } + } + s.cachedVal = nil +} + +func (s *StatLowest) Marshal(ms Marshaler) ([]byte, error) { + return ms.Marshal(s) +} + +func (s *StatLowest) LoadMarshaled(ms Marshaler, marshaled []byte) error { + return ms.Unmarshal(marshaled, &s) +} + +// GetFilterIDs is part of StatMetric interface. +func (s *StatLowest) GetFilterIDs() []string { + return s.FilterIDs +} + +// GetMinItems returns the minimum items for the metric. +func (s *StatLowest) GetMinItems() int { return s.MinItems } + +// Compress is part of StatMetric interface. +func (s *StatLowest) Compress(queueLen int64, defaultID string, decimals int) []string { + eventIDs := make([]string, 0, len(s.Events)) + for id := range s.Events { + eventIDs = append(eventIDs, id) + } + return eventIDs +} + +func (s *StatLowest) GetCompressFactor(events map[string]int) map[string]int { + for id := range s.Events { + if _, exists := events[id]; !exists { + events[id] = 1 + } + } + return events +} diff --git a/utils/consts.go b/utils/consts.go index aefcecba3..303ab99ec 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1137,9 +1137,15 @@ const ( MetaSum = "*sum" MetaAverage = "*average" MetaDistinct = "*distinct" - MetaRAR = "*rar" - MetaDMR = "*dmr" - MetaCoA = "*coa" + MetaHighest = "*highest" + MetaLowest = "*lowest" +) + +// Diameter/Radius request types +const ( + MetaRAR = "*rar" + MetaDMR = "*dmr" + MetaCoA = "*coa" ) // Services