stats: implement new REPSC/REPFC metrics

used to track successful/failed requests. REPFC supports error filtering.
This commit is contained in:
ionutboangiu
2025-11-04 13:52:44 +02:00
committed by Dan Christian Bogos
parent 344b14709a
commit 42b0ec3abe
3 changed files with 302 additions and 20 deletions

View File

@@ -84,21 +84,11 @@ func TestDiamPrometheus(t *testing.T) {
"enabled": true,
"store_interval": "-1"
},
"rpc_conns": {
"async": {
"strategy": "*async",
"conns": [
{
"address": "*internal"
}
]
}
},
"diameter_agent": {
"enabled": true,
"sessions_conns": ["*birpc_internal"],
"stats_conns": ["*localhost"],
"thresholds_conns": ["*localhost"],
"stats_conns": ["*internal"],
"thresholds_conns": ["*internal"],
"request_processors": [{
"id": "message",
"filters": [
@@ -204,7 +194,7 @@ func TestDiamPrometheus(t *testing.T) {
StatQueueProfile: &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "SQ_1",
FilterIDs: []string{"*string:~*req.Category:sms"},
FilterIDs: []string{"*string:~*opts.*eventType:ProcessTime"},
QueueLength: -1,
TTL: 5 * time.Second,
Metrics: []*engine.MetricWithFilters{
@@ -223,6 +213,15 @@ func TestDiamPrometheus(t *testing.T) {
{
MetricID: "*distinct#~*req.ProcessingTime",
},
{
MetricID: utils.MetaREPSC,
},
{
MetricID: utils.MetaREPFC,
},
{
MetricID: utils.MetaREPFC + "#ERR_MESSAGE",
},
},
Stored: true,
MinItems: 1,
@@ -236,7 +235,7 @@ func TestDiamPrometheus(t *testing.T) {
StatQueueProfile: &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "SQ_2",
FilterIDs: []string{"*string:~*req.Category:sms"},
FilterIDs: []string{"*string:~*opts.*eventType:ProcessTime"},
QueueLength: -1,
TTL: 10 * time.Second,
Metrics: []*engine.MetricWithFilters{
@@ -255,6 +254,15 @@ func TestDiamPrometheus(t *testing.T) {
{
MetricID: "*distinct#~*req.ProcessingTime",
},
{
MetricID: utils.MetaREPSC,
},
{
MetricID: utils.MetaREPFC,
},
{
MetricID: utils.MetaREPFC + "#ERR_MESSAGE",
},
},
Stored: true,
MinItems: 1,

View File

@@ -49,6 +49,8 @@ func NewStatMetric(metricID string, minItems uint64, filterIDs []string) (sm Sta
utils.MetaDistinct: NewStatDistinct,
utils.MetaHighest: NewStatHighest,
utils.MetaLowest: NewStatLowest,
utils.MetaREPSC: NewStatREPSC,
utils.MetaREPFC: NewStatREPFC,
}
// split the metricID
// in case of *sum we have *sum#~*req.FieldName
@@ -1089,7 +1091,7 @@ func (s *StatHighest) GetValue() *utils.Decimal {
// AddEvent processes a new event, updating highest value if necessary
func (s *StatHighest) AddEvent(evID string, ev utils.DataProvider) error {
val, err := fieldValue(s.FieldName, ev)
val, err := fieldValueFromDP(s.FieldName, ev)
if err != nil {
return err
}
@@ -1109,7 +1111,7 @@ func (s *StatHighest) AddEvent(evID string, ev utils.DataProvider) error {
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatHighest) AddOneEvent(ev utils.DataProvider) error {
val, err := fieldValue(s.FieldName, ev)
val, err := fieldValueFromDP(s.FieldName, ev)
if err != nil {
return err
}
@@ -1221,7 +1223,7 @@ func (s *StatLowest) GetValue() *utils.Decimal {
// AddEvent processes a new event, updating lowest value if necessary.
func (s *StatLowest) AddEvent(evID string, ev utils.DataProvider) error {
val, err := fieldValue(s.FieldName, ev)
val, err := fieldValueFromDP(s.FieldName, ev)
if err != nil {
return err
}
@@ -1241,7 +1243,7 @@ func (s *StatLowest) AddEvent(evID string, ev utils.DataProvider) error {
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatLowest) AddOneEvent(ev utils.DataProvider) error {
val, err := fieldValue(s.FieldName, ev)
val, err := fieldValueFromDP(s.FieldName, ev)
if err != nil {
return err
}
@@ -1298,8 +1300,264 @@ func (s *StatLowest) GetCompressFactor(events map[string]uint64) map[string]uint
return events
}
// fieldValue gets the numeric value from the DataProvider.
func fieldValue(fldName string, dp utils.DataProvider) (*utils.Decimal, error) {
// NewStatREPSC creates a StatREPSC metric for counting successful requests.
func NewStatREPSC(minItems uint64, _ string, filterIDs []string) StatMetric {
return &StatREPSC{
FilterIDs: filterIDs,
MinItems: minItems,
Events: make(map[string]struct{}),
}
}
// StatREPSC counts requests where ReplyState equals "OK"
type StatREPSC struct {
FilterIDs []string // event filters to apply before processing
MinItems uint64 // minimum events required for valid results
Count uint64 // number of successful events tracked
Events map[string]struct{} // event IDs indexed for deletion
}
// Clone creates a deep copy of StatREPSC.
func (s *StatREPSC) Clone() StatMetric {
if s == nil {
return nil
}
clone := &StatREPSC{
FilterIDs: slices.Clone(s.FilterIDs),
MinItems: s.MinItems,
Count: s.Count,
Events: maps.Clone(s.Events),
}
return clone
}
func (s *StatREPSC) GetStringValue(_ int) string {
if s.Count == 0 || s.Count < s.MinItems {
return utils.NotAvailable
}
return strconv.Itoa(int(s.Count))
}
func (s *StatREPSC) GetValue() *utils.Decimal {
if s.Count == 0 || s.Count < s.MinItems {
return utils.DecimalNaN
}
return utils.NewDecimal(int64(s.Count), 0)
}
// AddEvent processes a new event, incrementing count if ReplyState is "OK".
func (s *StatREPSC) AddEvent(evID string, ev utils.DataProvider) error {
replyState, err := replyStateFromDP(ev)
if err != nil {
return err
}
if replyState != utils.OK {
return nil
}
// Only increment count for new events.
if _, exists := s.Events[evID]; !exists {
s.Events[evID] = struct{}{}
s.Count++
}
return nil
}
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatREPSC) AddOneEvent(ev utils.DataProvider) error {
replyState, err := replyStateFromDP(ev)
if err != nil {
return err
}
if replyState != utils.OK {
return nil
}
s.Count++
return nil
}
func (s *StatREPSC) RemEvent(evID string) error {
if _, exists := s.Events[evID]; !exists {
return utils.ErrNotFound
}
delete(s.Events, evID)
s.Count--
return nil
}
// GetFilterIDs is part of StatMetric interface.
func (s *StatREPSC) GetFilterIDs() []string {
return s.FilterIDs
}
// GetMinItems returns the minimum items for the metric.
func (s *StatREPSC) GetMinItems() uint64 {
return s.MinItems
}
// Compress is part of StatMetric interface.
func (s *StatREPSC) Compress(_ uint64, _ string) []string {
eventIDs := make([]string, 0, len(s.Events))
for id := range s.Events {
eventIDs = append(eventIDs, id)
}
return eventIDs
}
func (s *StatREPSC) GetCompressFactor(events map[string]uint64) map[string]uint64 {
for id := range s.Events {
if _, exists := events[id]; !exists {
events[id] = 1
}
}
return events
}
// NewStatREPFC creates a StatREPFC metric for counting failed requests.
func NewStatREPFC(minItems uint64, errorType string, filterIDs []string) StatMetric {
return &StatREPFC{
FilterIDs: filterIDs,
MinItems: minItems,
ErrorType: errorType,
Events: make(map[string]struct{}),
}
}
// StatREPFC counts requests where ReplyState is not "OK".
type StatREPFC struct {
FilterIDs []string // event filters to apply before processing
MinItems uint64 // minimum events required for valid results
ErrorType string // specific error type to filter for (empty = all errors)
Count uint64 // number of failed events tracked
Events map[string]struct{} // event IDs indexed for deletion
}
// Clone creates a deep copy of StatREPFC.
func (s *StatREPFC) Clone() StatMetric {
if s == nil {
return nil
}
clone := &StatREPFC{
FilterIDs: slices.Clone(s.FilterIDs),
MinItems: s.MinItems,
ErrorType: s.ErrorType,
Count: s.Count,
Events: maps.Clone(s.Events),
}
return clone
}
func (s *StatREPFC) GetStringValue(_ int) string {
if s.Count == 0 || s.Count < s.MinItems {
return utils.NotAvailable
}
return strconv.Itoa(int(s.Count))
}
func (s *StatREPFC) GetValue() *utils.Decimal {
if s.Count == 0 || s.Count < s.MinItems {
return utils.DecimalNaN
}
return utils.NewDecimal(int64(s.Count), 0)
}
// AddEvent processes a new event, incrementing count if ReplyState is not "OK".
func (s *StatREPFC) AddEvent(evID string, ev utils.DataProvider) error {
replyState, err := replyStateFromDP(ev)
if err != nil {
return err
}
// Skip if success when counting all failures, or if not matching specific
// error type.
if s.ErrorType == "" && replyState == utils.OK {
return nil
}
// Handle multiple errors separated by ";" (e.g., "ERR_TERMINATE;ERR_CDRS")
// Use split + exact match instead of strings.Contains to avoid false positives.
if s.ErrorType != "" {
errors := strings.Split(replyState, utils.InfieldSep)
if !slices.Contains(errors, s.ErrorType) {
return nil
}
}
// Only increment count for new events.
if _, exists := s.Events[evID]; !exists {
s.Events[evID] = struct{}{}
s.Count++
}
return nil
}
// AddOneEvent processes event without storing for removal (used when events
// never expire).
func (s *StatREPFC) AddOneEvent(ev utils.DataProvider) error {
replyState, err := replyStateFromDP(ev)
if err != nil {
return err
}
// Skip if success when counting all failures, or if not matching specific
// error type.
if s.ErrorType == "" && replyState == utils.OK {
return nil
}
// Handle multiple errors separated by ";" (e.g., "ERR_TERMINATE;ERR_CDRS")
// Use split + exact match instead of strings.Contains to avoid false positives
if s.ErrorType != "" {
errors := strings.Split(replyState, utils.InfieldSep)
if !slices.Contains(errors, s.ErrorType) {
return nil
}
}
s.Count++
return nil
}
func (s *StatREPFC) RemEvent(evID string) error {
if _, exists := s.Events[evID]; !exists {
return utils.ErrNotFound
}
delete(s.Events, evID)
s.Count--
return nil
}
// GetFilterIDs is part of StatMetric interface.
func (s *StatREPFC) GetFilterIDs() []string {
return s.FilterIDs
}
// GetMinItems returns the minimum items for the metric.
func (s *StatREPFC) GetMinItems() uint64 {
return s.MinItems
}
// Compress is part of StatMetric interface.
func (s *StatREPFC) Compress(_ uint64, _ string) []string {
eventIDs := make([]string, 0, len(s.Events))
for id := range s.Events {
eventIDs = append(eventIDs, id)
}
return eventIDs
}
func (s *StatREPFC) GetCompressFactor(events map[string]uint64) map[string]uint64 {
for id := range s.Events {
if _, exists := events[id]; !exists {
events[id] = 1
}
}
return events
}
// fieldValueFromDP gets the numeric value from the DataProvider.
func fieldValueFromDP(fldName string, dp utils.DataProvider) (*utils.Decimal, error) {
ival, err := utils.DPDynamicInterface(fldName, dp)
if err != nil {
if errors.Is(err, utils.ErrNotFound) {
@@ -1315,3 +1573,17 @@ func fieldValue(fldName string, dp utils.DataProvider) (*utils.Decimal, error) {
}
return &utils.Decimal{Big: v}, nil
}
// replyStateFromDP gets the numeric value from the DataProvider.
func replyStateFromDP(dp utils.DataProvider) (string, error) {
ival, err := dp.FieldAsInterface([]string{utils.MetaReq, utils.ReplyState})
if err != nil {
if errors.Is(err, utils.ErrNotFound) {
return "", utils.ErrPrefix(err, utils.ReplyState)
// NOTE: return below might be clearer
// return 0, fmt.Errorf("field %s: %v", utils.ReplyState, err)
}
return "", err
}
return utils.IfaceAsString(ival), nil
}

View File

@@ -1093,6 +1093,8 @@ const (
MetaPDD = "*pdd"
MetaDDC = "*ddc"
MetaSum = "*sum"
MetaREPSC = "*repsc"
MetaREPFC = "*repfc"
MetaAverage = "*average"
MetaDistinct = "*distinct"
MetaHighest = "*highest"