Adding OneEvent functionality for stats

This commit is contained in:
gezimbll
2023-12-06 11:25:45 -05:00
committed by Dan Christian Bogos
parent f0852b645b
commit 3fe5f70dc9
6 changed files with 450 additions and 11 deletions

View File

@@ -27,6 +27,7 @@ import (
"net/http/httptest"
"path"
"reflect"
"slices"
"sort"
"testing"
"time"
@@ -67,6 +68,7 @@ var (
testStatsProcessEventWithBlockersOnMetricsSecond,
testStatsProcessEventNoBlockers,
testStatsGetStatQueueForEventWithBlockers,
testStatsStatOneEvent,
// check if stats, thresholds and actions subsystems function properly together
testStatsStartServer,
testStatsSetActionProfileBeforeProcessEv,
@@ -947,6 +949,231 @@ func testStatsGetStatQueueForEventWithBlockers(t *testing.T) {
}
}
func testStatsStatOneEvent(t *testing.T) {
sqPrf := &engine.StatQueueProfileWithAPIOpts{
StatQueueProfile: &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "SQ_OneEv",
//FilterIDs: []string{"*string:~*req.StatsMetrics:*exist"},
QueueLength: -1,
TTL: -1,
MinItems: 2,
Blockers: utils.DynamicBlockers{
{
Blocker: false,
},
},
Weights: utils.DynamicWeights{
&utils.DynamicWeight{
Weight: 100,
},
},
Stored: true,
Metrics: []*engine.MetricWithFilters{
{
MetricID: utils.MetaASR,
},
{
MetricID: utils.MetaDDC,
},
{
MetricID: utils.MetaPDD,
},
{
MetricID: utils.MetaTCC,
},
{
MetricID: utils.MetaACD,
},
{
MetricID: utils.MetaTCD,
},
{
MetricID: utils.MetaACC,
},
{
MetricID: utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField,
},
{
MetricID: utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits,
},
{
MetricID: utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal",
},
},
ThresholdIDs: []string{"*none"},
},
}
var reply string
if err := sqRPC.Call(context.Background(), utils.AdminSv1SetStatQueueProfile, sqPrf, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Unexpected reply returned:", reply)
}
args := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "SQ_Event1",
Event: map[string]any{
utils.AccountField: "1001",
"RandomVal": "11",
},
APIOpts: map[string]any{
utils.MetaUsage: 20 * time.Second,
utils.MetaCost: 102.1,
utils.MetaDestination: "332214",
utils.MetaStartTime: time.Date(2021, 7, 14, 14, 25, 0, 0, time.UTC),
utils.MetaPDD: 5 * time.Second,
utils.OptsStatsProfileIDs: []string{"SQ_OneEv"},
utils.OptsResourcesUnits: 6,
},
}
expected := []string{"SQ_OneEv"}
var replyStats []string
if err := sqRPC.Call(context.Background(), utils.StatSv1ProcessEvent, args, &replyStats); err != nil {
t.Error(err)
} else if !slices.Equal(expected, replyStats) {
t.Errorf("Expected %v, Received %v", expected, replyStats)
}
expFloat := map[string]float64{
utils.MetaPDD: -1,
utils.MetaACD: -1,
utils.MetaDDC: -1,
utils.MetaTCD: -1,
utils.MetaTCC: -1,
utils.MetaASR: -1,
utils.MetaACC: -1,
utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField: -1,
utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits: -1,
utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal": -1,
}
expString := map[string]string{
utils.MetaPDD: utils.NotAvailable,
utils.MetaACD: utils.NotAvailable,
utils.MetaDDC: utils.NotAvailable,
utils.MetaTCD: utils.NotAvailable,
utils.MetaTCC: utils.NotAvailable,
utils.MetaASR: utils.NotAvailable,
utils.MetaACC: utils.NotAvailable,
utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField: utils.NotAvailable,
utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits: utils.NotAvailable,
utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal": utils.NotAvailable,
}
var replFlts map[string]float64
if err := sqRPC.Call(context.Background(), utils.StatSv1GetQueueFloatMetrics, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
Tenant: "cgrates.org",
ID: "SQ_OneEv",
},
}, &replFlts); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(replFlts, expFloat) {
t.Errorf("Expected %v,Received %v", utils.ToJSON(expFloat), utils.ToJSON(replFlts))
}
var rplString map[string]string
if err := sqRPC.Call(context.Background(), utils.StatSv1GetQueueStringMetrics, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
Tenant: "cgrates.org",
ID: "SQ_OneEv",
},
}, &rplString); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rplString, expString) {
t.Errorf("Expected %v,Received %v", expString, rplString)
}
//processing second event to stats
args = &utils.CGREvent{
Tenant: "cgrates.org",
ID: "SQ_Event1",
Event: map[string]any{
utils.AccountField: "1002",
"RandomVal": 25,
},
APIOpts: map[string]any{
utils.MetaUsage: 13 * time.Second,
utils.MetaCost: 57.8,
utils.MetaDestination: "332214",
utils.MetaPDD: 2 * time.Second,
utils.OptsStatsProfileIDs: []string{"SQ_OneEv"},
utils.OptsResourcesUnits: 7,
},
}
if err := sqRPC.Call(context.Background(), utils.StatSv1ProcessEvent, args, &replyStats); err != nil {
t.Error(err)
} else if !slices.Equal(expected, replyStats) {
t.Errorf("Expected %v, Received %v", expected, replyStats)
}
expFloat = map[string]float64{
utils.MetaACC: 79.95,
utils.MetaTCC: 159.9,
utils.MetaACD: 16500000000,
utils.MetaTCD: 33000000000,
utils.MetaASR: 50,
utils.MetaDDC: 1,
utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits: 6.5,
utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal": 36,
utils.MetaPDD: 3500000000,
utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField: 2,
}
expString = map[string]string{
utils.MetaACC: "79.95",
utils.MetaTCC: "159.9",
utils.MetaACD: "16.5s",
utils.MetaTCD: "33s",
utils.MetaASR: "50%",
utils.MetaDDC: "1",
utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits: "6.5",
utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal": "36",
utils.MetaPDD: "3.5s",
utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField: "2",
}
if err := sqRPC.Call(context.Background(), utils.StatSv1GetQueueFloatMetrics, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
Tenant: "cgrates.org",
ID: "SQ_OneEv",
},
}, &replFlts); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(replFlts, expFloat) {
t.Errorf("Expected %v,Received %v", utils.ToJSON(expFloat), utils.ToJSON(replFlts))
}
if err := sqRPC.Call(context.Background(), utils.StatSv1GetQueueStringMetrics, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
Tenant: "cgrates.org",
ID: "SQ_OneEv",
},
}, &rplString); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rplString, expString) {
t.Errorf("Expected %v,Received %v", expString, rplString)
}
var sQ *engine.StatQueue
if err := sqRPC.Call(context.Background(), utils.StatSv1GetStatQueue, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
Tenant: "cgrates.org",
ID: "SQ_OneEv",
},
}, &sQ); err != nil {
t.Error(err)
} else if len(sQ.SQItems) != 0 {
t.Errorf("Expected to be stored 0 events on queue,got %v ", len(sQ.SQItems))
}
}
func testStatsStartServer(t *testing.T) {
sqSrv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var err error

View File

@@ -233,6 +233,11 @@ func (sq *StatQueue) TenantID() string {
// ProcessEvent processes a utils.CGREvent, returns true if processed
func (sq *StatQueue) ProcessEvent(ctx *context.Context, tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) {
//processing metrics without storing in the queue
if oneEv := sq.isOneEvent(); oneEv {
return sq.addStatOneEvent(ctx, tnt, filterS, evNm)
}
if _, err = sq.remExpired(); err != nil {
return
}
@@ -331,6 +336,40 @@ func (sq *StatQueue) addStatEvent(ctx *context.Context, tnt, evID string, filter
return
}
func (sq *StatQueue) isOneEvent() bool {
return sq.ttl != nil && *sq.ttl == -1
}
func (sq *StatQueue) addStatOneEvent(ctx *context.Context, tnt string, filterS *FilterS, evNm utils.MapStorage) (err error) {
var pass bool
dDP := newDynamicDP(ctx, config.CgrConfig().FilterSCfg().ResourceSConns, config.CgrConfig().FilterSCfg().StatSConns,
config.CgrConfig().FilterSCfg().AccountSConns, tnt, utils.MapStorage{utils.MetaReq: evNm[utils.MetaReq], utils.MetaOpts: evNm[utils.MetaOpts]})
for idx, metricCfg := range sq.sqPrfl.Metrics {
if pass, err = filterS.Pass(ctx, tnt, metricCfg.FilterIDs,
evNm); err != nil {
return
} else if !pass {
continue
}
if err = sq.SQMetrics[metricCfg.MetricID].AddOneEvent(dDP); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue>: metric: %s, error: %s", metricCfg.MetricID, err.Error()))
return
}
var blocker bool
if blocker, err = BlockerFromDynamics(ctx, metricCfg.Blockers, filterS, tnt, evNm); err != nil {
return
}
if blocker && idx != len(sq.sqPrfl.Metrics)-1 {
break
}
}
return
}
func (sq *StatQueue) Compress(maxQL uint64) bool {
if uint64(len(sq.SQItems)) < maxQL || maxQL == 0 {
return false

View File

@@ -795,6 +795,10 @@ func (statMetricMock) AddEvent(string, utils.DataProvider) error {
return nil
}
func (statMetricMock) AddOneEvent(utils.DataProvider) error {
return nil
}
func (sMM statMetricMock) RemEvent(string) error {
if sMM == "remExpired error" {
return fmt.Errorf("remExpired mock error")

View File

@@ -62,6 +62,7 @@ func NewStatMetric(metricID string, minItems uint64, filterIDs []string) (sm Sta
type StatMetric interface {
GetValue() *utils.Decimal
GetStringValue(rounding int) string
AddOneEvent(ev utils.DataProvider) error
AddEvent(evID string, ev utils.DataProvider) error
RemEvent(evID string) error
GetMinItems() (minIts uint64)
@@ -96,6 +97,24 @@ func (asr *StatASR) GetValue() (val *utils.Decimal) {
return
}
func (asr *StatASR) AddOneEvent(ev utils.DataProvider) (err error) {
var (
answered int64
val any
)
if val, err = ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaStartTime}); err != nil {
if err != utils.ErrNotFound {
return
}
} else if at, err := utils.IfaceAsTime(val, config.CgrConfig().GeneralCfg().DefaultTimezone); err != nil {
return err
} else if !at.IsZero() {
answered = 1
}
return asr.addOneEvent(answered)
}
// AddEvent is part of StatMetric interface
func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) (err error) {
var answered int
@@ -153,7 +172,7 @@ type StatACD struct {
}
func (acd *StatACD) GetStringValue(rounding int) string {
if len(acd.Events) == 0 || acd.Count < acd.MinItems {
if acd.Count == 0 || acd.Count < acd.MinItems {
return utils.NotAvailable
}
v, _ := acd.getAvgValue().Round(rounding).Duration()
@@ -175,6 +194,19 @@ func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) {
return acd.addEvent(evID, ival)
}
func (acd *StatACD) AddOneEvent(ev utils.DataProvider) (err error) {
ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaUsage})
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.MetaUsage)
}
return err
}
return acd.addOneEvent(ival)
}
func (acd *StatACD) Clone() StatMetric {
return &StatACD{
Metric: acd.Metric.Clone(),
@@ -191,7 +223,7 @@ type StatTCD struct {
}
func (sum *StatTCD) GetStringValue(rounding int) string {
if len(sum.Events) == 0 || sum.Count < sum.MinItems {
if sum.Count == 0 || sum.Count < sum.MinItems {
return utils.NotAvailable
}
v, _ := sum.Value.Round(rounding).Duration()
@@ -209,6 +241,18 @@ func (sum *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) {
return sum.addEvent(evID, ival)
}
func (sum *StatTCD) AddOneEvent(ev utils.DataProvider) (err error) {
ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaUsage})
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.MetaUsage)
}
return err
}
return sum.addOneEvent(ival)
}
func (sum *StatTCD) Clone() StatMetric {
return &StatTCD{
Metric: sum.Metric.Clone(),
@@ -250,6 +294,24 @@ func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) error {
return acc.addEvent(evID, val)
}
func (acc *StatACC) AddOneEvent(ev utils.DataProvider) error {
ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaCost})
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.MetaCost)
}
return err
}
val, err := utils.IfaceAsBig(ival)
if err != nil {
return err
}
if val.Cmp(decimal.New(0, 0)) < 0 {
return utils.ErrPrefix(utils.ErrNegative, utils.MetaCost)
}
return acc.addOneEvent(val)
}
func (acc *StatACC) Clone() StatMetric {
return &StatACC{
Metric: acc.Metric.Clone(),
@@ -283,6 +345,24 @@ func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) error {
return tcc.addEvent(evID, val)
}
func (tcc *StatTCC) AddOneEvent(ev utils.DataProvider) error {
ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaCost})
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.MetaCost)
}
return err
}
val, err := utils.IfaceAsBig(ival)
if err != nil {
return err
}
if val.Cmp(decimal.New(0, 0)) < 0 {
return utils.ErrPrefix(utils.ErrNegative, utils.MetaCost)
}
return tcc.addOneEvent(ival)
}
func (tcc *StatTCC) Clone() StatMetric {
return &StatTCC{
Metric: tcc.Metric.Clone(),
@@ -299,7 +379,7 @@ type StatPDD struct {
}
func (pdd *StatPDD) GetStringValue(rounding int) string {
if len(pdd.Events) == 0 || pdd.Count < pdd.MinItems {
if pdd.Count == 0 || pdd.Count < pdd.MinItems {
return utils.NotAvailable
}
v, _ := pdd.getAvgValue().Round(rounding).Duration()
@@ -321,6 +401,17 @@ func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) error {
return pdd.addEvent(evID, ival)
}
func (pdd *StatPDD) AddOneEvent(ev utils.DataProvider) error {
ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaPDD})
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.MetaPDD)
}
return err
}
return pdd.addOneEvent(ival)
}
func (pdd *StatPDD) Clone() StatMetric {
return &StatPDD{
Metric: pdd.Metric.Clone(),
@@ -336,6 +427,7 @@ func NewDDC(minItems uint64, _ string, filterIDs []string) StatMetric {
}
}
// StatDDC count values occurring in destination field
type StatDDC struct {
FieldValues map[string]utils.StringSet // map[fieldValue]map[eventID]
Events map[string]map[string]uint64 // map[EventTenantID]map[fieldValue]compressfactor
@@ -390,6 +482,21 @@ func (ddc *StatDDC) AddEvent(evID string, ev utils.DataProvider) (err error) {
return
}
func (ddc *StatDDC) AddOneEvent(ev utils.DataProvider) (err error) {
var fieldValue string
if fieldValue, err = ev.FieldAsString([]string{utils.MetaOpts, utils.MetaDestination}); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.MetaDestination)
}
return
}
if _, has := ddc.FieldValues[fieldValue]; !has {
ddc.FieldValues[fieldValue] = make(utils.StringSet)
}
ddc.Count++
return
}
func (ddc *StatDDC) RemEvent(evID string) (err error) {
fieldValues, has := ddc.Events[evID]
if !has {
@@ -498,21 +605,21 @@ type Metric struct {
func (sum *Metric) GetFilterIDs() []string { return sum.FilterIDs }
func (sum *Metric) getTotalValue() *utils.Decimal {
if len(sum.Events) == 0 || sum.Count < sum.MinItems {
if sum.Count == 0 || sum.Count < sum.MinItems {
return utils.DecimalNaN
}
return sum.Value
}
func (sum *Metric) getAvgValue() *utils.Decimal {
if len(sum.Events) == 0 || sum.Count < sum.MinItems {
if sum.Count == 0 || sum.Count < sum.MinItems {
return utils.DecimalNaN
}
return utils.DivideDecimal(sum.Value, utils.NewDecimal(int64(sum.Count), 0))
}
func (sum *Metric) getAvgStringValue(rounding int) string {
if len(sum.Events) == 0 || sum.Count < sum.MinItems {
if sum.Count == 0 || sum.Count < sum.MinItems {
return utils.NotAvailable
}
v, _ := utils.DivideDecimal(sum.Value, utils.NewDecimal(int64(sum.Count), 0)).Round(rounding).Float64()
@@ -520,7 +627,7 @@ func (sum *Metric) getAvgStringValue(rounding int) string {
}
func (sum *Metric) GetStringValue(rounding int) string {
if len(sum.Events) == 0 || sum.Count < sum.MinItems {
if sum.Count == 0 || sum.Count < sum.MinItems {
return utils.NotAvailable
}
v, _ := sum.Value.Round(rounding).Float64()
@@ -552,6 +659,19 @@ func (sum *Metric) addEvent(evID string, ival any) (err error) {
return
}
// Adding aggregated metrics without events
func (sum *Metric) addOneEvent(ival any) (err error) {
var val *decimal.Big
if val, err = utils.IfaceAsBig(ival); err != nil {
return
}
dVal := &utils.Decimal{Big: val}
sum.Value = utils.SumDecimal(sum.Value, dVal)
sum.Count++
return
}
// Deleting a specific event and updating metrics
func (sum *Metric) RemEvent(evID string) (err error) {
val, has := sum.Events[evID]
if !has {
@@ -610,7 +730,7 @@ func (sum *Metric) Clone() (cln *Metric) {
FilterIDs: slices.Clone(sum.FilterIDs),
}
for k, v := range sum.Events {
cln.Events[k] = &(*v)
cln.Events[k] = v
}
return
}
@@ -653,6 +773,17 @@ func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) error {
}
return sum.addEvent(evID, ival)
}
func (sum *StatSum) AddOneEvent(ev utils.DataProvider) error {
ival, err := utils.DPDynamicInterface(sum.FieldName, ev)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, sum.FieldName)
}
return err
}
return sum.addOneEvent(ival)
}
func (sum *StatSum) Clone() StatMetric {
return &StatSum{
@@ -691,6 +822,17 @@ func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) error {
return avg.addEvent(evID, ival)
}
func (avg *StatAverage) AddOneEvent(ev utils.DataProvider) error {
ival, err := utils.DPDynamicInterface(avg.FieldName, ev)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, avg.FieldName)
}
return err
}
return avg.addOneEvent(ival)
}
func (avg *StatAverage) Clone() StatMetric {
return &StatAverage{
Metric: avg.Metric.Clone(),
@@ -698,6 +840,7 @@ func (avg *StatAverage) Clone() StatMetric {
}
}
// StatDistinct counts the different values occurring in a specific event field
func NewStatDistinct(minItems uint64, fieldName string, filterIDs []string) StatMetric {
return &StatDistinct{
Events: make(map[string]map[string]uint64),
@@ -739,7 +882,7 @@ func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error
var fieldValue string
// simply remove the ~*req./~*opts. prefix and do normal process
if !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) && !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaOpts+utils.NestingSep) {
return fmt.Errorf("Invalid format for field <%s>", dst.FieldName)
return fmt.Errorf("invalid format for field <%s>", dst.FieldName)
}
if fieldValue, err = utils.DPDynamicString(dst.FieldName, ev); err != nil {
@@ -768,6 +911,27 @@ func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error
return
}
func (dst *StatDistinct) AddOneEvent(ev utils.DataProvider) (err error) {
var fieldValue string
// simply remove the ~*req./~*opts. prefix and do normal process
if !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) && !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaOpts+utils.NestingSep) {
return fmt.Errorf("invalid format for field <%s>", dst.FieldName)
}
if fieldValue, err = utils.DPDynamicString(dst.FieldName, ev); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, dst.FieldName)
}
return
}
// add to fieldValues
if _, has := dst.FieldValues[fieldValue]; !has {
dst.FieldValues[fieldValue] = make(utils.StringSet)
}
dst.Count++
return
}
func (dst *StatDistinct) RemEvent(evID string) (err error) {
fieldValues, has := dst.Events[evID]
if !has {

View File

@@ -3242,8 +3242,8 @@ func TestStatMetricsStatDistinctAddEventErr(t *testing.T) {
Count: 3,
}
err := dst.AddEvent("Event1", utils.MapStorage{utils.MetaOpts: ev.APIOpts})
if err == nil || err.Error() != "Invalid format for field <Test_Field_Name>" {
t.Errorf("\nExpecting <Invalid format for field <Test_Field_Name>>,\n Recevied <%+v>", err)
if err == nil || err.Error() != "invalid format for field <Test_Field_Name>" {
t.Errorf("\nExpecting <invalid format for field <Test_Field_Name>>,\n Recevied <%+v>", err)
}
}

View File

@@ -220,6 +220,11 @@ func (sS *StatS) matchingStatQueuesForEvent(ctx *context.Context, tnt string, st
if sqPrfl.TTL > 0 {
sq.ttl = utils.DurationPointer(sqPrfl.TTL)
}
if sqPrfl.TTL == -1 && sqPrfl.QueueLength == -1 {
sq.ttl = utils.DurationPointer(sqPrfl.TTL)
}
sq.sqPrfl = sqPrfl
if sq.weight, err = WeightFromDynamics(ctx, sqPrfl.Weights,
sS.fltrS, tnt, evNm); err != nil {