stats: implement new REPSC/REPFC metrics

used to track successful/failed requests. REPFC supports error filtering.
This commit is contained in:
ionutboangiu
2025-07-30 23:21:52 +03:00
committed by Ionut Boangiu
parent 9833152e38
commit 415d027761
4 changed files with 395 additions and 18 deletions

View File

@@ -83,6 +83,8 @@ func NewStatMetric(metricID string, minItems int, filterIDs []string) (sm StatMe
utils.MetaDistinct: NewStatDistinct,
utils.MetaHighest: NewStatHighest,
utils.MetaLowest: NewStatLowest,
utils.MetaREPSC: NewStatREPSC,
utils.MetaREPFC: NewStatREPFC,
}
// split the metricID
// in case of *sum we have *sum#~*req.FieldName
@@ -1419,8 +1421,12 @@ func (ddc *StatDDC) GetCompressFactor(events map[string]int) map[string]int {
}
func NewStatSum(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
return &StatSum{Events: make(map[string]*StatWithCompress),
MinItems: minItems, FieldName: extraParams, FilterIDs: filterIDs}, nil
return &StatSum{
Events: make(map[string]*StatWithCompress),
MinItems: minItems,
FieldName: extraParams,
FilterIDs: filterIDs,
}, nil
}
type StatSum struct {
@@ -2338,3 +2344,352 @@ func (s *StatLowest) GetCompressFactor(events map[string]int) map[string]int {
}
return events
}
// NewStatREPSC creates a StatREPSC metric for counting successful requests.
func NewStatREPSC(minItems int, _ string, filterIDs []string) (StatMetric, error) {
return &StatREPSC{
FilterIDs: filterIDs,
MinItems: minItems,
Events: make(map[string]struct{}),
}, nil
}
// StatREPSC counts requests where ReplyState equals "OK"
type StatREPSC struct {
FilterIDs []string // event filters to apply before processing
MinItems int // minimum events required for valid results
Count int64 // number of successful events tracked
Events map[string]struct{} // event IDs indexed for deletion
cachedVal *float64 // cached result to avoid recalculation
}
// Clone creates a deep copy of StatREPSC.
func (s *StatREPSC) Clone() StatMetric {
if s == nil {
return nil
}
clone := &StatREPSC{
FilterIDs: slices.Clone(s.FilterIDs),
MinItems: s.MinItems,
Count: s.Count,
Events: maps.Clone(s.Events),
}
if s.cachedVal != nil {
clone.cachedVal = utils.Float64Pointer(*s.cachedVal)
}
return clone
}
func (s *StatREPSC) GetStringValue(decimals int) string {
v := s.getValue(decimals)
if v == utils.StatsNA {
return utils.NotAvailable
}
return strconv.FormatFloat(v, 'f', -1, 64)
}
func (s *StatREPSC) GetValue(decimals int) any {
return s.getValue(decimals)
}
func (s *StatREPSC) GetFloat64Value(decimals int) float64 {
return s.getValue(decimals)
}
// getValue returns current count value, calculating if cache is invalid.
func (s *StatREPSC) getValue(_ int) float64 {
if s.cachedVal != nil {
return *s.cachedVal
}
if s.Count == 0 || s.Count < int64(s.MinItems) {
s.cachedVal = utils.Float64Pointer(utils.StatsNA)
return *s.cachedVal
}
v := float64(s.Count)
s.cachedVal = &v
return *s.cachedVal
}
// getFieldValue gets the value of the ReplyState field from the DataProvider.
func (s *StatREPSC) getFieldValue(ev utils.DataProvider) (string, error) {
ival, err := ev.FieldAsInterface([]string{utils.MetaReq, utils.ReplyState})
if err != nil {
if errors.Is(err, utils.ErrNotFound) {
return "", utils.ErrPrefix(err, utils.ReplyState)
// NOTE: return below might be clearer
// return 0, fmt.Errorf("field %s: %v", utils.ReplyState, err)
}
return "", err
}
return utils.IfaceAsString(ival), nil
}
// AddEvent processes a new event, incrementing count if ReplyState is "OK".
func (s *StatREPSC) AddEvent(evID string, ev utils.DataProvider) error {
replyState, err := s.getFieldValue(ev)
if err != nil {
return err
}
if replyState != utils.OK {
return nil
}
// Only increment count for new events.
if _, exists := s.Events[evID]; !exists {
s.Events[evID] = struct{}{}
s.Count++
s.cachedVal = nil
}
return nil
}
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatREPSC) AddOneEvent(ev utils.DataProvider) error {
replyState, err := s.getFieldValue(ev)
if err != nil {
return err
}
if replyState != utils.OK {
return nil
}
s.Count++
s.cachedVal = nil
return nil
}
func (s *StatREPSC) RemEvent(evID string) {
if _, exists := s.Events[evID]; !exists {
return
}
delete(s.Events, evID)
s.Count--
s.cachedVal = nil
}
func (s *StatREPSC) Marshal(ms Marshaler) ([]byte, error) {
return ms.Marshal(s)
}
func (s *StatREPSC) LoadMarshaled(ms Marshaler, marshaled []byte) error {
return ms.Unmarshal(marshaled, &s)
}
// GetFilterIDs is part of StatMetric interface.
func (s *StatREPSC) GetFilterIDs() []string {
return s.FilterIDs
}
// GetMinItems returns the minimum items for the metric.
func (s *StatREPSC) GetMinItems() int {
return s.MinItems
}
// Compress is part of StatMetric interface.
func (s *StatREPSC) Compress(queueLen int64, defaultID string, decimals int) []string {
eventIDs := make([]string, 0, len(s.Events))
for id := range s.Events {
eventIDs = append(eventIDs, id)
}
return eventIDs
}
func (s *StatREPSC) GetCompressFactor(events map[string]int) map[string]int {
for id := range s.Events {
if _, exists := events[id]; !exists {
events[id] = 1
}
}
return events
}
// NewStatREPFC creates a StatREPFC metric for counting failed requests.
func NewStatREPFC(minItems int, errorType string, filterIDs []string) (StatMetric, error) {
return &StatREPFC{
FilterIDs: filterIDs,
MinItems: minItems,
ErrorType: errorType,
Events: make(map[string]struct{}),
}, nil
}
// StatREPFC counts requests where ReplyState is not "OK".
type StatREPFC struct {
FilterIDs []string // event filters to apply before processing
MinItems int // minimum events required for valid results
ErrorType string // specific error type to filter for (empty = all errors)
Count int64 // number of failed events tracked
Events map[string]struct{} // event IDs indexed for deletion
cachedVal *float64 // cached result to avoid recalculation
}
// Clone creates a deep copy of StatREPFC.
func (s *StatREPFC) Clone() StatMetric {
if s == nil {
return nil
}
clone := &StatREPFC{
FilterIDs: slices.Clone(s.FilterIDs),
MinItems: s.MinItems,
ErrorType: s.ErrorType,
Count: s.Count,
Events: maps.Clone(s.Events),
}
if s.cachedVal != nil {
clone.cachedVal = utils.Float64Pointer(*s.cachedVal)
}
return clone
}
func (s *StatREPFC) GetStringValue(decimals int) string {
v := s.getValue(decimals)
if v == utils.StatsNA {
return utils.NotAvailable
}
return strconv.FormatFloat(v, 'f', -1, 64)
}
func (s *StatREPFC) GetValue(decimals int) any {
return s.getValue(decimals)
}
func (s *StatREPFC) GetFloat64Value(decimals int) float64 {
return s.getValue(decimals)
}
// getValue returns current count value, calculating if cache is invalid.
func (s *StatREPFC) getValue(_ int) float64 {
if s.cachedVal != nil {
return *s.cachedVal
}
if s.Count == 0 || s.Count < int64(s.MinItems) {
s.cachedVal = utils.Float64Pointer(utils.StatsNA)
return *s.cachedVal
}
v := float64(s.Count)
s.cachedVal = &v
return *s.cachedVal
}
// getFieldValue gets the value of the ReplyState field from the DataProvider.
func (s *StatREPFC) getFieldValue(ev utils.DataProvider) (string, error) {
ival, err := ev.FieldAsInterface([]string{utils.MetaReq, utils.ReplyState})
if err != nil {
if errors.Is(err, utils.ErrNotFound) {
return "", utils.ErrPrefix(err, utils.ReplyState)
// NOTE: return below might be clearer
// return 0, fmt.Errorf("field %s: %v", utils.ReplyState, err)
}
return "", err
}
return utils.IfaceAsString(ival), nil
}
// AddEvent processes a new event, incrementing count if ReplyState is not "OK".
func (s *StatREPFC) AddEvent(evID string, ev utils.DataProvider) error {
replyState, err := s.getFieldValue(ev)
if err != nil {
return err
}
// Skip if success when counting all failures, or if not matching specific
// error type.
if s.ErrorType == "" && replyState == utils.OK {
return nil
}
// Handle multiple errors separated by ";" (e.g., "ERR_TERMINATE;ERR_CDRS")
// Use split + exact match instead of strings.Contains to avoid false positives.
if s.ErrorType != "" {
errors := strings.Split(replyState, utils.InfieldSep)
if !slices.Contains(errors, s.ErrorType) {
return nil
}
}
// Only increment count for new events.
if _, exists := s.Events[evID]; !exists {
s.Events[evID] = struct{}{}
s.Count++
s.cachedVal = nil
}
return nil
}
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatREPFC) AddOneEvent(ev utils.DataProvider) error {
replyState, err := s.getFieldValue(ev)
if err != nil {
return err
}
// Skip if success when counting all failures, or if not matching specific
// error type.
if s.ErrorType == "" && replyState == utils.OK {
return nil
}
// Handle multiple errors separated by ";" (e.g., "ERR_TERMINATE;ERR_CDRS")
// Use split + exact match instead of strings.Contains to avoid false positives
if s.ErrorType != "" {
errors := strings.Split(replyState, utils.InfieldSep)
if !slices.Contains(errors, s.ErrorType) {
return nil
}
}
s.Count++
s.cachedVal = nil
return nil
}
func (s *StatREPFC) RemEvent(evID string) {
if _, exists := s.Events[evID]; !exists {
return
}
delete(s.Events, evID)
s.Count--
s.cachedVal = nil
}
func (s *StatREPFC) Marshal(ms Marshaler) ([]byte, error) {
return ms.Marshal(s)
}
func (s *StatREPFC) LoadMarshaled(ms Marshaler, marshaled []byte) error {
return ms.Unmarshal(marshaled, &s)
}
// GetFilterIDs is part of StatMetric interface.
func (s *StatREPFC) GetFilterIDs() []string {
return s.FilterIDs
}
// GetMinItems returns the minimum items for the metric.
func (s *StatREPFC) GetMinItems() int {
return s.MinItems
}
// Compress is part of StatMetric interface.
func (s *StatREPFC) Compress(queueLen int64, defaultID string, decimals int) []string {
eventIDs := make([]string, 0, len(s.Events))
for id := range s.Events {
eventIDs = append(eventIDs, id)
}
return eventIDs
}
func (s *StatREPFC) GetCompressFactor(events map[string]int) map[string]int {
for id := range s.Events {
if _, exists := events[id]; !exists {
events[id] = 1
}
}
return events
}