From 240a93b512bc4f84618cce7bcc57d51ce8027cf8 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 4 Nov 2025 13:52:36 +0200 Subject: [PATCH] add StatHighest and StatLowest metric implementations --- agents/diam_prom_it_test.go | 18 +++ engine/libstats.go | 15 +- engine/statmetrics.go | 286 ++++++++++++++++++++++++++++++++++++ utils/consts.go | 2 + 4 files changed, 316 insertions(+), 5 deletions(-) diff --git a/agents/diam_prom_it_test.go b/agents/diam_prom_it_test.go index 77172075f..421b76b8a 100644 --- a/agents/diam_prom_it_test.go +++ b/agents/diam_prom_it_test.go @@ -214,6 +214,15 @@ func TestDiamPrometheus(t *testing.T) { { MetricID: "*sum#~*req.ProcessingTime", }, + { + MetricID: "*highest#~*req.ProcessingTime", + }, + { + MetricID: "*lowest#~*req.ProcessingTime", + }, + { + MetricID: "*distinct#~*req.ProcessingTime", + }, }, Stored: true, MinItems: 1, @@ -237,6 +246,15 @@ func TestDiamPrometheus(t *testing.T) { { MetricID: "*sum#~*req.ProcessingTime", }, + { + MetricID: "*highest#~*req.ProcessingTime", + }, + { + MetricID: "*lowest#~*req.ProcessingTime", + }, + { + MetricID: "*distinct#~*req.ProcessingTime", + }, }, Stored: true, MinItems: 1, diff --git a/engine/libstats.go b/engine/libstats.go index 8a549bf9c..5ebef77a6 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -209,13 +209,14 @@ func (ssq *StoredStatQueue) AsStatQueue(ms utils.Marshaler) (sq *StatQueue, err } copy(sq.SQItems, ssq.SQItems) for metricID, marshaled := range ssq.SQMetrics { - if metric, err := NewStatMetric(metricID, 0, []string{}); err != nil { + metric, err := NewStatMetric(metricID, 0, []string{}) + if err != nil { return nil, err - } else if err := ms.Unmarshal(marshaled, metric); err != nil { - return nil, err - } else { - sq.SQMetrics[metricID] = metric } + if err := ms.Unmarshal(marshaled, metric); err != nil { + return nil, err + } + sq.SQMetrics[metricID] = metric } if ssq.Compressed { sq.Expand() @@ -540,6 +541,10 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) { metric = new(StatAverage) case utils.MetaDistinct: metric = new(StatDistinct) + case utils.MetaHighest: + metric = new(StatHighest) + case utils.MetaLowest: + metric = new(StatLowest) default: return fmt.Errorf("unsupported metric type <%s>", metricSplit[0]) } diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 165f9d908..3f5b88196 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -19,7 +19,9 @@ along with this program. If not, see package engine import ( + "errors" "fmt" + "math" "slices" "strconv" "strings" @@ -45,6 +47,8 @@ func NewStatMetric(metricID string, minItems uint64, filterIDs []string) (sm Sta utils.MetaSum: NewStatSum, utils.MetaAverage: NewStatAverage, utils.MetaDistinct: NewStatDistinct, + utils.MetaHighest: NewStatHighest, + utils.MetaLowest: NewStatLowest, } // split the metricID // in case of *sum we have *sum#~*req.FieldName @@ -1029,3 +1033,285 @@ func (dst *StatDistinct) Clone() StatMetric { } return cln } + +// NewStatHighest creates a StatHighest metric for tracking maximum field values. +func NewStatHighest(minItems uint64, fieldName string, filterIDs []string) StatMetric { + return &StatHighest{ + FilterIDs: filterIDs, + MinItems: minItems, + FieldName: fieldName, + Highest: utils.NewDecimal(0, 0), + Events: make(map[string]*utils.Decimal), + } +} + +// StatHighest tracks the maximum value for a specific field across events. +type StatHighest struct { + FilterIDs []string // event filters to apply before processing + FieldName string // field path to extract from events + MinItems uint64 // minimum events required for valid results + + Highest *utils.Decimal // current maximum value tracked + Count uint64 // number of events currently tracked + Events map[string]*utils.Decimal // event values indexed by ID for deletion +} + +// Clone creates a deep copy of StatHighest. +func (s *StatHighest) Clone() StatMetric { + if s == nil { + return nil + } + clone := &StatHighest{ + FilterIDs: slices.Clone(s.FilterIDs), + Highest: s.Highest, + Count: s.Count, + MinItems: s.MinItems, + FieldName: s.FieldName, + Events: maps.Clone(s.Events), + } + return clone +} + +func (s *StatHighest) GetStringValue(decimals int) string { + if s.Count == 0 || s.Count < s.MinItems { + return utils.NotAvailable + } + v, _ := s.Highest.Round(decimals).Float64() + return strconv.FormatFloat(v, 'f', -1, 64) +} + +func (s *StatHighest) GetValue() *utils.Decimal { + if s.Count == 0 || s.Count < s.MinItems { + return utils.DecimalNaN + } + return s.Highest +} + +// AddEvent processes a new event, updating highest value if necessary +func (s *StatHighest) AddEvent(evID string, ev utils.DataProvider) error { + val, err := fieldValue(s.FieldName, ev) + if err != nil { + return err + } + if val.Compare(s.Highest) == 1 { + s.Highest = val + } + + // Only increment count for new events. + if _, exists := s.Events[evID]; !exists { + s.Count++ + } + + s.Events[evID] = val + return nil +} + +// AddOneEvent processes event without storing for removal (used when events +// never expire). +func (s *StatHighest) AddOneEvent(ev utils.DataProvider) error { + val, err := fieldValue(s.FieldName, ev) + if err != nil { + return err + } + if val.Compare(s.Highest) == 1 { + s.Highest = val + } + s.Count++ + return nil +} + +func (s *StatHighest) RemEvent(evID string) error { + v, exists := s.Events[evID] + if !exists { + return utils.ErrNotFound + } + delete(s.Events, evID) + s.Count-- + if v.Compare(s.Highest) == 0 { + s.Highest = utils.NewDecimal(0, 0) // reset highest + + // Find new highest among remaining events. + for _, val := range s.Events { + if val.Compare(s.Highest) == 1 { + s.Highest = val + } + } + } + return nil +} + +// GetFilterIDs is part of StatMetric interface. +func (s *StatHighest) GetFilterIDs() []string { + return s.FilterIDs +} + +// GetMinItems returns the minimum items for the metric. +func (s *StatHighest) GetMinItems() uint64 { return s.MinItems } + +// Compress is part of StatMetric interface. +func (s *StatHighest) Compress(_ uint64, _ string) []string { + eventIDs := make([]string, 0, len(s.Events)) + for id := range s.Events { + eventIDs = append(eventIDs, id) + } + return eventIDs +} + +func (s *StatHighest) GetCompressFactor(events map[string]uint64) map[string]uint64 { + for id := range s.Events { + if _, exists := events[id]; !exists { + events[id] = 1 + } + } + return events +} + +// NewStatLowest creates a StatLowest metric for tracking minimum field values. +func NewStatLowest(minItems uint64, fieldName string, filterIDs []string) StatMetric { + return &StatLowest{ + FilterIDs: filterIDs, + MinItems: minItems, + FieldName: fieldName, + Lowest: utils.NewDecimalFromFloat64(math.MaxFloat64), + Events: make(map[string]*utils.Decimal), + } +} + +// StatLowest tracks the minimum value for a specific field across events. +type StatLowest struct { + FilterIDs []string // event filters to apply before processing + FieldName string // field path to extract from events + MinItems uint64 // minimum events required for valid results + + Lowest *utils.Decimal // current minimum value tracked + Count uint64 // number of events currently tracked + Events map[string]*utils.Decimal // event values indexed by ID for deletion +} + +// Clone creates a deep copy of StatLowest. +func (s *StatLowest) Clone() StatMetric { + if s == nil { + return nil + } + clone := &StatLowest{ + FilterIDs: slices.Clone(s.FilterIDs), + Lowest: s.Lowest, + Count: s.Count, + MinItems: s.MinItems, + FieldName: s.FieldName, + Events: maps.Clone(s.Events), + } + return clone +} + +func (s *StatLowest) GetStringValue(decimals int) string { + if s.Count == 0 || s.Count < s.MinItems { + return utils.NotAvailable + } + v, _ := s.Lowest.Round(decimals).Float64() + return strconv.FormatFloat(v, 'f', -1, 64) +} + +func (s *StatLowest) GetValue() *utils.Decimal { + if s.Count == 0 || s.Count < s.MinItems { + return utils.DecimalNaN + } + return s.Lowest +} + +// AddEvent processes a new event, updating lowest value if necessary. +func (s *StatLowest) AddEvent(evID string, ev utils.DataProvider) error { + val, err := fieldValue(s.FieldName, ev) + if err != nil { + return err + } + if val.Compare(s.Lowest) == -1 { + s.Lowest = val + } + + // Only increment count for new events. + if _, exists := s.Events[evID]; !exists { + s.Count++ + } + + s.Events[evID] = val + return nil +} + +// AddOneEvent processes event without storing for removal (used when events +// never expire). +func (s *StatLowest) AddOneEvent(ev utils.DataProvider) error { + val, err := fieldValue(s.FieldName, ev) + if err != nil { + return err + } + if val.Compare(s.Lowest) == -1 { + s.Lowest = val + } + s.Count++ + return nil +} + +func (s *StatLowest) RemEvent(evID string) error { + v, exists := s.Events[evID] + if !exists { + return utils.ErrNotFound + } + delete(s.Events, evID) + s.Count-- + if v.Compare(s.Lowest) == 0 { + s.Lowest = utils.NewDecimalFromFloat64(math.MaxFloat64) // reset lowest + + // Find new lowest among remaining events. + for _, val := range s.Events { + if val.Compare(s.Lowest) == -1 { + s.Lowest = val + } + } + } + return nil +} + +// GetFilterIDs is part of StatMetric interface. +func (s *StatLowest) GetFilterIDs() []string { + return s.FilterIDs +} + +// GetMinItems returns the minimum items for the metric. +func (s *StatLowest) GetMinItems() uint64 { return s.MinItems } + +// Compress is part of StatMetric interface. +func (s *StatLowest) Compress(_ uint64, _ string) []string { + eventIDs := make([]string, 0, len(s.Events)) + for id := range s.Events { + eventIDs = append(eventIDs, id) + } + return eventIDs +} + +func (s *StatLowest) GetCompressFactor(events map[string]uint64) map[string]uint64 { + for id := range s.Events { + if _, exists := events[id]; !exists { + events[id] = 1 + } + } + return events +} + +// fieldValue gets the numeric value from the DataProvider. +func fieldValue(fldName string, dp utils.DataProvider) (*utils.Decimal, error) { + ival, err := utils.DPDynamicInterface(fldName, dp) + if err != nil { + if errors.Is(err, utils.ErrNotFound) { + return nil, utils.ErrPrefix(err, fldName) + // NOTE: return below might be clearer + // return nil, fmt.Errorf("field %s: %v", field, err) + } + return nil, err + } + v, err := utils.IfaceAsBig(ival) + if err != nil { + return nil, err + } + return &utils.Decimal{Big: v}, nil +} diff --git a/utils/consts.go b/utils/consts.go index 772ed631d..b1d6aba5c 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1082,6 +1082,8 @@ const ( MetaSum = "*sum" MetaAverage = "*average" MetaDistinct = "*distinct" + MetaHighest = "*highest" + MetaLowest = "*lowest" ) // Diameter/Radius request types