add StatHighest and StatLowest metric implementations

This commit is contained in:
ionutboangiu
2025-11-04 13:52:36 +02:00
committed by Dan Christian Bogos
parent 37ba187658
commit 240a93b512
4 changed files with 316 additions and 5 deletions

View File

@@ -214,6 +214,15 @@ func TestDiamPrometheus(t *testing.T) {
{
MetricID: "*sum#~*req.ProcessingTime",
},
{
MetricID: "*highest#~*req.ProcessingTime",
},
{
MetricID: "*lowest#~*req.ProcessingTime",
},
{
MetricID: "*distinct#~*req.ProcessingTime",
},
},
Stored: true,
MinItems: 1,
@@ -237,6 +246,15 @@ func TestDiamPrometheus(t *testing.T) {
{
MetricID: "*sum#~*req.ProcessingTime",
},
{
MetricID: "*highest#~*req.ProcessingTime",
},
{
MetricID: "*lowest#~*req.ProcessingTime",
},
{
MetricID: "*distinct#~*req.ProcessingTime",
},
},
Stored: true,
MinItems: 1,

View File

@@ -209,13 +209,14 @@ func (ssq *StoredStatQueue) AsStatQueue(ms utils.Marshaler) (sq *StatQueue, err
}
copy(sq.SQItems, ssq.SQItems)
for metricID, marshaled := range ssq.SQMetrics {
if metric, err := NewStatMetric(metricID, 0, []string{}); err != nil {
metric, err := NewStatMetric(metricID, 0, []string{})
if err != nil {
return nil, err
} else if err := ms.Unmarshal(marshaled, metric); err != nil {
return nil, err
} else {
sq.SQMetrics[metricID] = metric
}
if err := ms.Unmarshal(marshaled, metric); err != nil {
return nil, err
}
sq.SQMetrics[metricID] = metric
}
if ssq.Compressed {
sq.Expand()
@@ -540,6 +541,10 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) {
metric = new(StatAverage)
case utils.MetaDistinct:
metric = new(StatDistinct)
case utils.MetaHighest:
metric = new(StatHighest)
case utils.MetaLowest:
metric = new(StatLowest)
default:
return fmt.Errorf("unsupported metric type <%s>", metricSplit[0])
}

View File

@@ -19,7 +19,9 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
package engine
import (
"errors"
"fmt"
"math"
"slices"
"strconv"
"strings"
@@ -45,6 +47,8 @@ func NewStatMetric(metricID string, minItems uint64, filterIDs []string) (sm Sta
utils.MetaSum: NewStatSum,
utils.MetaAverage: NewStatAverage,
utils.MetaDistinct: NewStatDistinct,
utils.MetaHighest: NewStatHighest,
utils.MetaLowest: NewStatLowest,
}
// split the metricID
// in case of *sum we have *sum#~*req.FieldName
@@ -1029,3 +1033,285 @@ func (dst *StatDistinct) Clone() StatMetric {
}
return cln
}
// NewStatHighest creates a StatHighest metric for tracking maximum field values.
func NewStatHighest(minItems uint64, fieldName string, filterIDs []string) StatMetric {
return &StatHighest{
FilterIDs: filterIDs,
MinItems: minItems,
FieldName: fieldName,
Highest: utils.NewDecimal(0, 0),
Events: make(map[string]*utils.Decimal),
}
}
// StatHighest tracks the maximum value for a specific field across events.
type StatHighest struct {
FilterIDs []string // event filters to apply before processing
FieldName string // field path to extract from events
MinItems uint64 // minimum events required for valid results
Highest *utils.Decimal // current maximum value tracked
Count uint64 // number of events currently tracked
Events map[string]*utils.Decimal // event values indexed by ID for deletion
}
// Clone creates a deep copy of StatHighest.
func (s *StatHighest) Clone() StatMetric {
if s == nil {
return nil
}
clone := &StatHighest{
FilterIDs: slices.Clone(s.FilterIDs),
Highest: s.Highest,
Count: s.Count,
MinItems: s.MinItems,
FieldName: s.FieldName,
Events: maps.Clone(s.Events),
}
return clone
}
func (s *StatHighest) GetStringValue(decimals int) string {
if s.Count == 0 || s.Count < s.MinItems {
return utils.NotAvailable
}
v, _ := s.Highest.Round(decimals).Float64()
return strconv.FormatFloat(v, 'f', -1, 64)
}
func (s *StatHighest) GetValue() *utils.Decimal {
if s.Count == 0 || s.Count < s.MinItems {
return utils.DecimalNaN
}
return s.Highest
}
// AddEvent processes a new event, updating highest value if necessary
func (s *StatHighest) AddEvent(evID string, ev utils.DataProvider) error {
val, err := fieldValue(s.FieldName, ev)
if err != nil {
return err
}
if val.Compare(s.Highest) == 1 {
s.Highest = val
}
// Only increment count for new events.
if _, exists := s.Events[evID]; !exists {
s.Count++
}
s.Events[evID] = val
return nil
}
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatHighest) AddOneEvent(ev utils.DataProvider) error {
val, err := fieldValue(s.FieldName, ev)
if err != nil {
return err
}
if val.Compare(s.Highest) == 1 {
s.Highest = val
}
s.Count++
return nil
}
func (s *StatHighest) RemEvent(evID string) error {
v, exists := s.Events[evID]
if !exists {
return utils.ErrNotFound
}
delete(s.Events, evID)
s.Count--
if v.Compare(s.Highest) == 0 {
s.Highest = utils.NewDecimal(0, 0) // reset highest
// Find new highest among remaining events.
for _, val := range s.Events {
if val.Compare(s.Highest) == 1 {
s.Highest = val
}
}
}
return nil
}
// GetFilterIDs is part of StatMetric interface.
func (s *StatHighest) GetFilterIDs() []string {
return s.FilterIDs
}
// GetMinItems returns the minimum items for the metric.
func (s *StatHighest) GetMinItems() uint64 { return s.MinItems }
// Compress is part of StatMetric interface.
func (s *StatHighest) Compress(_ uint64, _ string) []string {
eventIDs := make([]string, 0, len(s.Events))
for id := range s.Events {
eventIDs = append(eventIDs, id)
}
return eventIDs
}
func (s *StatHighest) GetCompressFactor(events map[string]uint64) map[string]uint64 {
for id := range s.Events {
if _, exists := events[id]; !exists {
events[id] = 1
}
}
return events
}
// NewStatLowest creates a StatLowest metric for tracking minimum field values.
func NewStatLowest(minItems uint64, fieldName string, filterIDs []string) StatMetric {
return &StatLowest{
FilterIDs: filterIDs,
MinItems: minItems,
FieldName: fieldName,
Lowest: utils.NewDecimalFromFloat64(math.MaxFloat64),
Events: make(map[string]*utils.Decimal),
}
}
// StatLowest tracks the minimum value for a specific field across events.
type StatLowest struct {
FilterIDs []string // event filters to apply before processing
FieldName string // field path to extract from events
MinItems uint64 // minimum events required for valid results
Lowest *utils.Decimal // current minimum value tracked
Count uint64 // number of events currently tracked
Events map[string]*utils.Decimal // event values indexed by ID for deletion
}
// Clone creates a deep copy of StatLowest.
func (s *StatLowest) Clone() StatMetric {
if s == nil {
return nil
}
clone := &StatLowest{
FilterIDs: slices.Clone(s.FilterIDs),
Lowest: s.Lowest,
Count: s.Count,
MinItems: s.MinItems,
FieldName: s.FieldName,
Events: maps.Clone(s.Events),
}
return clone
}
func (s *StatLowest) GetStringValue(decimals int) string {
if s.Count == 0 || s.Count < s.MinItems {
return utils.NotAvailable
}
v, _ := s.Lowest.Round(decimals).Float64()
return strconv.FormatFloat(v, 'f', -1, 64)
}
func (s *StatLowest) GetValue() *utils.Decimal {
if s.Count == 0 || s.Count < s.MinItems {
return utils.DecimalNaN
}
return s.Lowest
}
// AddEvent processes a new event, updating lowest value if necessary.
func (s *StatLowest) AddEvent(evID string, ev utils.DataProvider) error {
val, err := fieldValue(s.FieldName, ev)
if err != nil {
return err
}
if val.Compare(s.Lowest) == -1 {
s.Lowest = val
}
// Only increment count for new events.
if _, exists := s.Events[evID]; !exists {
s.Count++
}
s.Events[evID] = val
return nil
}
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatLowest) AddOneEvent(ev utils.DataProvider) error {
val, err := fieldValue(s.FieldName, ev)
if err != nil {
return err
}
if val.Compare(s.Lowest) == -1 {
s.Lowest = val
}
s.Count++
return nil
}
func (s *StatLowest) RemEvent(evID string) error {
v, exists := s.Events[evID]
if !exists {
return utils.ErrNotFound
}
delete(s.Events, evID)
s.Count--
if v.Compare(s.Lowest) == 0 {
s.Lowest = utils.NewDecimalFromFloat64(math.MaxFloat64) // reset lowest
// Find new lowest among remaining events.
for _, val := range s.Events {
if val.Compare(s.Lowest) == -1 {
s.Lowest = val
}
}
}
return nil
}
// GetFilterIDs is part of StatMetric interface.
func (s *StatLowest) GetFilterIDs() []string {
return s.FilterIDs
}
// GetMinItems returns the minimum items for the metric.
func (s *StatLowest) GetMinItems() uint64 { return s.MinItems }
// Compress is part of StatMetric interface.
func (s *StatLowest) Compress(_ uint64, _ string) []string {
eventIDs := make([]string, 0, len(s.Events))
for id := range s.Events {
eventIDs = append(eventIDs, id)
}
return eventIDs
}
func (s *StatLowest) GetCompressFactor(events map[string]uint64) map[string]uint64 {
for id := range s.Events {
if _, exists := events[id]; !exists {
events[id] = 1
}
}
return events
}
// fieldValue gets the numeric value from the DataProvider.
func fieldValue(fldName string, dp utils.DataProvider) (*utils.Decimal, error) {
ival, err := utils.DPDynamicInterface(fldName, dp)
if err != nil {
if errors.Is(err, utils.ErrNotFound) {
return nil, utils.ErrPrefix(err, fldName)
// NOTE: return below might be clearer
// return nil, fmt.Errorf("field %s: %v", field, err)
}
return nil, err
}
v, err := utils.IfaceAsBig(ival)
if err != nil {
return nil, err
}
return &utils.Decimal{Big: v}, nil
}

View File

@@ -1082,6 +1082,8 @@ const (
MetaSum = "*sum"
MetaAverage = "*average"
MetaDistinct = "*distinct"
MetaHighest = "*highest"
MetaLowest = "*lowest"
)
// Diameter/Radius request types