add StatHighest and StatLowest metric implementations

This commit is contained in:
ionutboangiu
2025-06-12 19:32:51 +03:00
committed by Dan Christian Bogos
parent fbb625962c
commit 4635069702
4 changed files with 401 additions and 8 deletions

View File

@@ -240,6 +240,15 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`,
{
MetricID: "*sum#~*req.ProcessingTime",
},
{
MetricID: "*highest#~*req.ProcessingTime",
},
{
MetricID: "*lowest#~*req.ProcessingTime",
},
{
MetricID: "*distinct#~*req.ProcessingTime",
},
},
Stored: true,
MinItems: 1,
@@ -263,6 +272,15 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`,
{
MetricID: "*sum#~*req.ProcessingTime",
},
{
MetricID: "*highest#~*req.ProcessingTime",
},
{
MetricID: "*lowest#~*req.ProcessingTime",
},
{
MetricID: "*distinct#~*req.ProcessingTime",
},
},
Stored: true,
MinItems: 1,

View File

@@ -206,13 +206,14 @@ func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error)
copy(sq.SQItems, ssq.SQItems)
for metricID, marshaled := range ssq.SQMetrics {
if metric, err := NewStatMetric(metricID, 0, []string{}); err != nil {
metric, err := NewStatMetric(metricID, 0, []string{})
if err != nil {
return nil, err
} else if err := metric.LoadMarshaled(ms, marshaled); err != nil {
return nil, err
} else {
sq.SQMetrics[metricID] = metric
}
if err := metric.LoadMarshaled(ms, marshaled); err != nil {
return nil, err
}
sq.SQMetrics[metricID] = metric
}
if ssq.Compressed {
sq.Expand()
@@ -565,6 +566,10 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) {
metric = new(StatAverage)
case utils.MetaDistinct:
metric = new(StatDistinct)
case utils.MetaHighest:
metric = new(StatHighest)
case utils.MetaLowest:
metric = new(StatLowest)
default:
return fmt.Errorf("unsupported metric type <%s>", metricSplit[0])
}

View File

@@ -19,7 +19,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"errors"
"fmt"
"math"
"slices"
"strconv"
"strings"
"time"
@@ -78,6 +81,8 @@ func NewStatMetric(metricID string, minItems int, filterIDs []string) (sm StatMe
utils.MetaSum: NewStatSum,
utils.MetaAverage: NewStatAverage,
utils.MetaDistinct: NewStatDistinct,
utils.MetaHighest: NewStatHighest,
utils.MetaLowest: NewStatLowest,
}
// split the metricID
// in case of *sum we have *sum#~*req.FieldName
@@ -1974,3 +1979,362 @@ func (dst *StatDistinct) GetCompressFactor(events map[string]int) map[string]int
}
return events
}
// NewStatHighest creates a StatHighest metric for tracking maximum field values.
func NewStatHighest(minItems int, fieldName string, filterIDs []string) (StatMetric, error) {
return &StatHighest{
FilterIDs: filterIDs,
MinItems: minItems,
FieldName: fieldName,
Events: make(map[string]float64),
}, nil
}
// StatHighest tracks the maximum value for a specific field across events.
type StatHighest struct {
FilterIDs []string // event filters to apply before processing
FieldName string // field path to extract from events
MinItems int // minimum events required for valid results
Highest float64 // current maximum value tracked
Count int64 // number of events currently tracked
Events map[string]float64 // event values indexed by ID for deletion
// cachedVal caches the result to avoid recalculation.
// Always invalidated on any state change to eliminate edge
// cases (i.e. when crossing MinItems threshold).
cachedVal *float64
}
// Clone creates a deep copy of StatHighest.
func (s *StatHighest) Clone() StatMetric {
if s == nil {
return nil
}
clone := &StatHighest{
FilterIDs: slices.Clone(s.FilterIDs),
Highest: s.Highest,
Count: s.Count,
MinItems: s.MinItems,
FieldName: s.FieldName,
Events: maps.Clone(s.Events),
}
if s.cachedVal != nil {
val := *s.cachedVal
clone.cachedVal = &val
}
return clone
}
func (s *StatHighest) GetStringValue(decimals int) string {
v := s.getValue(decimals)
if v == utils.StatsNA {
return utils.NotAvailable
}
return strconv.FormatFloat(v, 'f', -1, 64)
}
func (s *StatHighest) GetValue(decimals int) any {
return s.getValue(decimals)
}
func (s *StatHighest) GetFloat64Value(decimals int) float64 {
return s.getValue(decimals)
}
// getValue returns current highest value, calculating if cache is invalid.
func (s *StatHighest) getValue(decimals int) float64 {
if s.cachedVal != nil {
return *s.cachedVal
}
if s.Count == 0 || s.Count < int64(s.MinItems) {
s.cachedVal = utils.Float64Pointer(utils.StatsNA)
return *s.cachedVal
}
v := utils.Round(s.Highest, decimals, utils.MetaRoundingMiddle)
s.cachedVal = &v
return v
}
// AddEvent processes a new event, updating highest value if necessary
func (s *StatHighest) AddEvent(evID string, ev utils.DataProvider) error {
val, err := s.getFieldValue(ev)
if err != nil {
return err
}
if val > s.Highest {
s.Highest = val
}
// Only increment count for new events.
if _, exists := s.Events[evID]; !exists {
s.Count++
}
s.Events[evID] = val
s.cachedVal = nil
return nil
}
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatHighest) AddOneEvent(ev utils.DataProvider) error {
val, err := s.getFieldValue(ev)
if err != nil {
return err
}
if val > s.Highest {
s.Highest = val
}
s.Count++
s.cachedVal = nil
return nil
}
// getFieldValue gets the numeric value from the DataProvider.
func (s *StatHighest) getFieldValue(ev utils.DataProvider) (float64, error) {
ival, err := utils.DPDynamicInterface(s.FieldName, ev)
if err != nil {
if errors.Is(err, utils.ErrNotFound) {
return 0, utils.ErrPrefix(err, s.FieldName)
// NOTE: return below might be clearer
// return 0, fmt.Errorf("field %s: %v", s.FieldName, err)
}
return 0, err
}
return utils.IfaceAsFloat64(ival)
}
func (s *StatHighest) RemEvent(evID string) {
v, exists := s.Events[evID]
if !exists {
return
}
delete(s.Events, evID)
s.Count--
if v == s.Highest {
s.Highest = 0 // reset highest
// Find new highest among remaining events.
for _, val := range s.Events {
if val > s.Highest {
s.Highest = val
}
}
}
s.cachedVal = nil
}
func (s *StatHighest) Marshal(ms Marshaler) ([]byte, error) {
return ms.Marshal(s)
}
func (s *StatHighest) LoadMarshaled(ms Marshaler, marshaled []byte) error {
return ms.Unmarshal(marshaled, &s)
}
// GetFilterIDs is part of StatMetric interface.
func (s *StatHighest) GetFilterIDs() []string {
return s.FilterIDs
}
// GetMinItems returns the minimum items for the metric.
func (s *StatHighest) GetMinItems() int { return s.MinItems }
// Compress is part of StatMetric interface.
func (s *StatHighest) Compress(queueLen int64, defaultID string, decimals int) []string {
eventIDs := make([]string, 0, len(s.Events))
for id := range s.Events {
eventIDs = append(eventIDs, id)
}
return eventIDs
}
func (s *StatHighest) GetCompressFactor(events map[string]int) map[string]int {
for id := range s.Events {
if _, exists := events[id]; !exists {
events[id] = 1
}
}
return events
}
// NewStatLowest creates a StatLowest metric for tracking minimum field values.
func NewStatLowest(minItems int, fieldName string, filterIDs []string) (StatMetric, error) {
return &StatLowest{
FilterIDs: filterIDs,
MinItems: minItems,
FieldName: fieldName,
Lowest: math.MaxFloat64,
Events: make(map[string]float64),
}, nil
}
// StatLowest tracks the minimum value for a specific field across events.
type StatLowest struct {
FilterIDs []string // event filters to apply before processing
FieldName string // field path to extract from events
MinItems int // minimum events required for valid results
Lowest float64 // current minimum value tracked
Count int64 // number of events currently tracked
Events map[string]float64 // event values indexed by ID for deletion
// cachedVal caches the result to avoid recalculation.
// Always invalidated on any state change to eliminate edge
// cases (i.e. when crossing MinItems threshold).
cachedVal *float64
}
// Clone creates a deep copy of StatLowest.
func (s *StatLowest) Clone() StatMetric {
if s == nil {
return nil
}
clone := &StatLowest{
FilterIDs: slices.Clone(s.FilterIDs),
Lowest: s.Lowest,
Count: s.Count,
MinItems: s.MinItems,
FieldName: s.FieldName,
Events: maps.Clone(s.Events),
}
if s.cachedVal != nil {
val := *s.cachedVal
clone.cachedVal = &val
}
return clone
}
func (s *StatLowest) GetStringValue(decimals int) string {
v := s.getValue(decimals)
if v == utils.StatsNA {
return utils.NotAvailable
}
return strconv.FormatFloat(v, 'f', -1, 64)
}
func (s *StatLowest) GetValue(decimals int) any {
return s.getValue(decimals)
}
func (s *StatLowest) GetFloat64Value(decimals int) float64 {
return s.getValue(decimals)
}
// getValue returns current lowest value, calculating if cache is invalid.
func (s *StatLowest) getValue(decimals int) float64 {
if s.cachedVal != nil {
return *s.cachedVal
}
if s.Count == 0 || s.Count < int64(s.MinItems) {
s.cachedVal = utils.Float64Pointer(utils.StatsNA)
return *s.cachedVal
}
v := utils.Round(s.Lowest, decimals, utils.MetaRoundingMiddle)
s.cachedVal = &v
return v
}
// AddEvent processes a new event, updating lowest value if necessary.
func (s *StatLowest) AddEvent(evID string, ev utils.DataProvider) error {
val, err := s.getFieldValue(ev)
if err != nil {
return err
}
if val < s.Lowest {
s.Lowest = val
}
// Only increment count for new events.
if _, exists := s.Events[evID]; !exists {
s.Count++
}
s.Events[evID] = val
s.cachedVal = nil
return nil
}
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatLowest) AddOneEvent(ev utils.DataProvider) error {
val, err := s.getFieldValue(ev)
if err != nil {
return err
}
if val < s.Lowest {
s.Lowest = val
}
s.Count++
s.cachedVal = nil
return nil
}
// getFieldValue gets the numeric value from the DataProvider.
func (s *StatLowest) getFieldValue(ev utils.DataProvider) (float64, error) {
ival, err := utils.DPDynamicInterface(s.FieldName, ev)
if err != nil {
if errors.Is(err, utils.ErrNotFound) {
return 0, utils.ErrPrefix(err, s.FieldName)
// NOTE: return below might be clearer
// return 0, fmt.Errorf("field %s: %v", s.FieldName, err)
}
return 0, err
}
return utils.IfaceAsFloat64(ival)
}
func (s *StatLowest) RemEvent(evID string) {
v, exists := s.Events[evID]
if !exists {
return
}
delete(s.Events, evID)
s.Count--
if v == s.Lowest {
s.Lowest = math.MaxFloat64 // reset lowest
// Find new lowest among remaining events.
for _, val := range s.Events {
if val < s.Lowest {
s.Lowest = val
}
}
}
s.cachedVal = nil
}
func (s *StatLowest) Marshal(ms Marshaler) ([]byte, error) {
return ms.Marshal(s)
}
func (s *StatLowest) LoadMarshaled(ms Marshaler, marshaled []byte) error {
return ms.Unmarshal(marshaled, &s)
}
// GetFilterIDs is part of StatMetric interface.
func (s *StatLowest) GetFilterIDs() []string {
return s.FilterIDs
}
// GetMinItems returns the minimum items for the metric.
func (s *StatLowest) GetMinItems() int { return s.MinItems }
// Compress is part of StatMetric interface.
func (s *StatLowest) Compress(queueLen int64, defaultID string, decimals int) []string {
eventIDs := make([]string, 0, len(s.Events))
for id := range s.Events {
eventIDs = append(eventIDs, id)
}
return eventIDs
}
func (s *StatLowest) GetCompressFactor(events map[string]int) map[string]int {
for id := range s.Events {
if _, exists := events[id]; !exists {
events[id] = 1
}
}
return events
}

View File

@@ -1137,9 +1137,15 @@ const (
MetaSum = "*sum"
MetaAverage = "*average"
MetaDistinct = "*distinct"
MetaRAR = "*rar"
MetaDMR = "*dmr"
MetaCoA = "*coa"
MetaHighest = "*highest"
MetaLowest = "*lowest"
)
// Diameter/Radius request types
const (
MetaRAR = "*rar"
MetaDMR = "*dmr"
MetaCoA = "*coa"
)
// Services