diff --git a/apis/stats_it_test.go b/apis/stats_it_test.go index ed8f5c351..e04cae2d8 100644 --- a/apis/stats_it_test.go +++ b/apis/stats_it_test.go @@ -27,6 +27,7 @@ import ( "net/http/httptest" "path" "reflect" + "slices" "sort" "testing" "time" @@ -67,6 +68,7 @@ var ( testStatsProcessEventWithBlockersOnMetricsSecond, testStatsProcessEventNoBlockers, testStatsGetStatQueueForEventWithBlockers, + testStatsStatOneEvent, // check if stats, thresholds and actions subsystems function properly together testStatsStartServer, testStatsSetActionProfileBeforeProcessEv, @@ -947,6 +949,231 @@ func testStatsGetStatQueueForEventWithBlockers(t *testing.T) { } } +func testStatsStatOneEvent(t *testing.T) { + sqPrf := &engine.StatQueueProfileWithAPIOpts{ + StatQueueProfile: &engine.StatQueueProfile{ + Tenant: "cgrates.org", + ID: "SQ_OneEv", + //FilterIDs: []string{"*string:~*req.StatsMetrics:*exist"}, + QueueLength: -1, + TTL: -1, + MinItems: 2, + Blockers: utils.DynamicBlockers{ + { + Blocker: false, + }, + }, + Weights: utils.DynamicWeights{ + &utils.DynamicWeight{ + Weight: 100, + }, + }, + Stored: true, + Metrics: []*engine.MetricWithFilters{ + { + MetricID: utils.MetaASR, + }, + { + MetricID: utils.MetaDDC, + }, + { + MetricID: utils.MetaPDD, + }, + { + MetricID: utils.MetaTCC, + }, + { + MetricID: utils.MetaACD, + }, + { + MetricID: utils.MetaTCD, + }, + { + MetricID: utils.MetaACC, + }, + { + MetricID: utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField, + }, + { + MetricID: utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits, + }, + { + MetricID: utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal", + }, + }, + ThresholdIDs: []string{"*none"}, + }, + } + + var reply string + if err := sqRPC.Call(context.Background(), utils.AdminSv1SetStatQueueProfile, sqPrf, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Unexpected reply returned:", reply) + } + + args := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "SQ_Event1", + Event: map[string]any{ + utils.AccountField: "1001", + "RandomVal": "11", + }, + APIOpts: map[string]any{ + utils.MetaUsage: 20 * time.Second, + utils.MetaCost: 102.1, + utils.MetaDestination: "332214", + utils.MetaStartTime: time.Date(2021, 7, 14, 14, 25, 0, 0, time.UTC), + utils.MetaPDD: 5 * time.Second, + utils.OptsStatsProfileIDs: []string{"SQ_OneEv"}, + utils.OptsResourcesUnits: 6, + }, + } + + expected := []string{"SQ_OneEv"} + var replyStats []string + + if err := sqRPC.Call(context.Background(), utils.StatSv1ProcessEvent, args, &replyStats); err != nil { + t.Error(err) + } else if !slices.Equal(expected, replyStats) { + t.Errorf("Expected %v, Received %v", expected, replyStats) + } + + expFloat := map[string]float64{ + utils.MetaPDD: -1, + utils.MetaACD: -1, + utils.MetaDDC: -1, + utils.MetaTCD: -1, + utils.MetaTCC: -1, + utils.MetaASR: -1, + utils.MetaACC: -1, + utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField: -1, + utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits: -1, + utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal": -1, + } + expString := map[string]string{ + utils.MetaPDD: utils.NotAvailable, + utils.MetaACD: utils.NotAvailable, + utils.MetaDDC: utils.NotAvailable, + utils.MetaTCD: utils.NotAvailable, + utils.MetaTCC: utils.NotAvailable, + utils.MetaASR: utils.NotAvailable, + utils.MetaACC: utils.NotAvailable, + utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField: utils.NotAvailable, + utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits: utils.NotAvailable, + utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal": utils.NotAvailable, + } + + var replFlts map[string]float64 + if err := sqRPC.Call(context.Background(), utils.StatSv1GetQueueFloatMetrics, &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "SQ_OneEv", + }, + }, &replFlts); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(replFlts, expFloat) { + t.Errorf("Expected %v,Received %v", utils.ToJSON(expFloat), utils.ToJSON(replFlts)) + } + + var rplString map[string]string + if err := sqRPC.Call(context.Background(), utils.StatSv1GetQueueStringMetrics, &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "SQ_OneEv", + }, + }, &rplString); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(rplString, expString) { + t.Errorf("Expected %v,Received %v", expString, rplString) + } + + //processing second event to stats + args = &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "SQ_Event1", + Event: map[string]any{ + utils.AccountField: "1002", + "RandomVal": 25, + }, + APIOpts: map[string]any{ + utils.MetaUsage: 13 * time.Second, + utils.MetaCost: 57.8, + utils.MetaDestination: "332214", + utils.MetaPDD: 2 * time.Second, + utils.OptsStatsProfileIDs: []string{"SQ_OneEv"}, + utils.OptsResourcesUnits: 7, + }, + } + + if err := sqRPC.Call(context.Background(), utils.StatSv1ProcessEvent, args, &replyStats); err != nil { + t.Error(err) + } else if !slices.Equal(expected, replyStats) { + t.Errorf("Expected %v, Received %v", expected, replyStats) + } + + expFloat = map[string]float64{ + utils.MetaACC: 79.95, + utils.MetaTCC: 159.9, + utils.MetaACD: 16500000000, + utils.MetaTCD: 33000000000, + utils.MetaASR: 50, + utils.MetaDDC: 1, + utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits: 6.5, + utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal": 36, + utils.MetaPDD: 3500000000, + utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField: 2, + } + + expString = map[string]string{ + utils.MetaACC: "79.95", + utils.MetaTCC: "159.9", + utils.MetaACD: "16.5s", + utils.MetaTCD: "33s", + utils.MetaASR: "50%", + utils.MetaDDC: "1", + utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaOpts + utils.NestingSep + utils.OptsResourcesUnits: "6.5", + utils.MetaSum + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + "RandomVal": "36", + utils.MetaPDD: "3.5s", + utils.MetaDistinct + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AccountField: "2", + } + + if err := sqRPC.Call(context.Background(), utils.StatSv1GetQueueFloatMetrics, &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "SQ_OneEv", + }, + }, &replFlts); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(replFlts, expFloat) { + t.Errorf("Expected %v,Received %v", utils.ToJSON(expFloat), utils.ToJSON(replFlts)) + } + + if err := sqRPC.Call(context.Background(), utils.StatSv1GetQueueStringMetrics, &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "SQ_OneEv", + }, + }, &rplString); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(rplString, expString) { + t.Errorf("Expected %v,Received %v", expString, rplString) + } + + var sQ *engine.StatQueue + if err := sqRPC.Call(context.Background(), utils.StatSv1GetStatQueue, &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "SQ_OneEv", + }, + }, &sQ); err != nil { + t.Error(err) + } else if len(sQ.SQItems) != 0 { + t.Errorf("Expected to be stored 0 events on queue,got %v ", len(sQ.SQItems)) + } + +} + func testStatsStartServer(t *testing.T) { sqSrv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var err error diff --git a/engine/libstats.go b/engine/libstats.go index 7e0cabf3e..5cc018126 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -233,6 +233,11 @@ func (sq *StatQueue) TenantID() string { // ProcessEvent processes a utils.CGREvent, returns true if processed func (sq *StatQueue) ProcessEvent(ctx *context.Context, tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) { + + //processing metrics without storing in the queue + if oneEv := sq.isOneEvent(); oneEv { + return sq.addStatOneEvent(ctx, tnt, filterS, evNm) + } if _, err = sq.remExpired(); err != nil { return } @@ -331,6 +336,40 @@ func (sq *StatQueue) addStatEvent(ctx *context.Context, tnt, evID string, filter return } +func (sq *StatQueue) isOneEvent() bool { + return sq.ttl != nil && *sq.ttl == -1 +} + +func (sq *StatQueue) addStatOneEvent(ctx *context.Context, tnt string, filterS *FilterS, evNm utils.MapStorage) (err error) { + var pass bool + + dDP := newDynamicDP(ctx, config.CgrConfig().FilterSCfg().ResourceSConns, config.CgrConfig().FilterSCfg().StatSConns, + config.CgrConfig().FilterSCfg().AccountSConns, tnt, utils.MapStorage{utils.MetaReq: evNm[utils.MetaReq], utils.MetaOpts: evNm[utils.MetaOpts]}) + + for idx, metricCfg := range sq.sqPrfl.Metrics { + if pass, err = filterS.Pass(ctx, tnt, metricCfg.FilterIDs, + evNm); err != nil { + return + } else if !pass { + continue + } + + if err = sq.SQMetrics[metricCfg.MetricID].AddOneEvent(dDP); err != nil { + utils.Logger.Warning(fmt.Sprintf(": metric: %s, error: %s", metricCfg.MetricID, err.Error())) + return + } + + var blocker bool + if blocker, err = BlockerFromDynamics(ctx, metricCfg.Blockers, filterS, tnt, evNm); err != nil { + return + } + if blocker && idx != len(sq.sqPrfl.Metrics)-1 { + break + } + } + return +} + func (sq *StatQueue) Compress(maxQL uint64) bool { if uint64(len(sq.SQItems)) < maxQL || maxQL == 0 { return false diff --git a/engine/libstats_test.go b/engine/libstats_test.go index 9c272f7dd..b73611087 100644 --- a/engine/libstats_test.go +++ b/engine/libstats_test.go @@ -795,6 +795,10 @@ func (statMetricMock) AddEvent(string, utils.DataProvider) error { return nil } +func (statMetricMock) AddOneEvent(utils.DataProvider) error { + return nil +} + func (sMM statMetricMock) RemEvent(string) error { if sMM == "remExpired error" { return fmt.Errorf("remExpired mock error") diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 4fc7d2833..db6ae3907 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -62,6 +62,7 @@ func NewStatMetric(metricID string, minItems uint64, filterIDs []string) (sm Sta type StatMetric interface { GetValue() *utils.Decimal GetStringValue(rounding int) string + AddOneEvent(ev utils.DataProvider) error AddEvent(evID string, ev utils.DataProvider) error RemEvent(evID string) error GetMinItems() (minIts uint64) @@ -96,6 +97,24 @@ func (asr *StatASR) GetValue() (val *utils.Decimal) { return } +func (asr *StatASR) AddOneEvent(ev utils.DataProvider) (err error) { + var ( + answered int64 + val any + ) + if val, err = ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaStartTime}); err != nil { + if err != utils.ErrNotFound { + return + } + } else if at, err := utils.IfaceAsTime(val, config.CgrConfig().GeneralCfg().DefaultTimezone); err != nil { + return err + } else if !at.IsZero() { + answered = 1 + } + + return asr.addOneEvent(answered) +} + // AddEvent is part of StatMetric interface func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) (err error) { var answered int @@ -153,7 +172,7 @@ type StatACD struct { } func (acd *StatACD) GetStringValue(rounding int) string { - if len(acd.Events) == 0 || acd.Count < acd.MinItems { + if acd.Count == 0 || acd.Count < acd.MinItems { return utils.NotAvailable } v, _ := acd.getAvgValue().Round(rounding).Duration() @@ -175,6 +194,19 @@ func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) { return acd.addEvent(evID, ival) } +func (acd *StatACD) AddOneEvent(ev utils.DataProvider) (err error) { + ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaUsage}) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, utils.MetaUsage) + } + return err + } + + return acd.addOneEvent(ival) + +} + func (acd *StatACD) Clone() StatMetric { return &StatACD{ Metric: acd.Metric.Clone(), @@ -191,7 +223,7 @@ type StatTCD struct { } func (sum *StatTCD) GetStringValue(rounding int) string { - if len(sum.Events) == 0 || sum.Count < sum.MinItems { + if sum.Count == 0 || sum.Count < sum.MinItems { return utils.NotAvailable } v, _ := sum.Value.Round(rounding).Duration() @@ -209,6 +241,18 @@ func (sum *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) { return sum.addEvent(evID, ival) } +func (sum *StatTCD) AddOneEvent(ev utils.DataProvider) (err error) { + ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaUsage}) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, utils.MetaUsage) + } + return err + } + return sum.addOneEvent(ival) + +} + func (sum *StatTCD) Clone() StatMetric { return &StatTCD{ Metric: sum.Metric.Clone(), @@ -250,6 +294,24 @@ func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) error { return acc.addEvent(evID, val) } +func (acc *StatACC) AddOneEvent(ev utils.DataProvider) error { + ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaCost}) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, utils.MetaCost) + } + return err + } + val, err := utils.IfaceAsBig(ival) + if err != nil { + return err + } + if val.Cmp(decimal.New(0, 0)) < 0 { + return utils.ErrPrefix(utils.ErrNegative, utils.MetaCost) + } + return acc.addOneEvent(val) +} + func (acc *StatACC) Clone() StatMetric { return &StatACC{ Metric: acc.Metric.Clone(), @@ -283,6 +345,24 @@ func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) error { return tcc.addEvent(evID, val) } +func (tcc *StatTCC) AddOneEvent(ev utils.DataProvider) error { + ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaCost}) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, utils.MetaCost) + } + return err + } + val, err := utils.IfaceAsBig(ival) + if err != nil { + return err + } + if val.Cmp(decimal.New(0, 0)) < 0 { + return utils.ErrPrefix(utils.ErrNegative, utils.MetaCost) + } + return tcc.addOneEvent(ival) +} + func (tcc *StatTCC) Clone() StatMetric { return &StatTCC{ Metric: tcc.Metric.Clone(), @@ -299,7 +379,7 @@ type StatPDD struct { } func (pdd *StatPDD) GetStringValue(rounding int) string { - if len(pdd.Events) == 0 || pdd.Count < pdd.MinItems { + if pdd.Count == 0 || pdd.Count < pdd.MinItems { return utils.NotAvailable } v, _ := pdd.getAvgValue().Round(rounding).Duration() @@ -321,6 +401,17 @@ func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) error { return pdd.addEvent(evID, ival) } +func (pdd *StatPDD) AddOneEvent(ev utils.DataProvider) error { + ival, err := ev.FieldAsInterface([]string{utils.MetaOpts, utils.MetaPDD}) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, utils.MetaPDD) + } + return err + } + return pdd.addOneEvent(ival) +} + func (pdd *StatPDD) Clone() StatMetric { return &StatPDD{ Metric: pdd.Metric.Clone(), @@ -336,6 +427,7 @@ func NewDDC(minItems uint64, _ string, filterIDs []string) StatMetric { } } +// StatDDC count values occurring in destination field type StatDDC struct { FieldValues map[string]utils.StringSet // map[fieldValue]map[eventID] Events map[string]map[string]uint64 // map[EventTenantID]map[fieldValue]compressfactor @@ -390,6 +482,21 @@ func (ddc *StatDDC) AddEvent(evID string, ev utils.DataProvider) (err error) { return } +func (ddc *StatDDC) AddOneEvent(ev utils.DataProvider) (err error) { + var fieldValue string + if fieldValue, err = ev.FieldAsString([]string{utils.MetaOpts, utils.MetaDestination}); err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, utils.MetaDestination) + } + return + } + if _, has := ddc.FieldValues[fieldValue]; !has { + ddc.FieldValues[fieldValue] = make(utils.StringSet) + } + ddc.Count++ + return +} + func (ddc *StatDDC) RemEvent(evID string) (err error) { fieldValues, has := ddc.Events[evID] if !has { @@ -498,21 +605,21 @@ type Metric struct { func (sum *Metric) GetFilterIDs() []string { return sum.FilterIDs } func (sum *Metric) getTotalValue() *utils.Decimal { - if len(sum.Events) == 0 || sum.Count < sum.MinItems { + if sum.Count == 0 || sum.Count < sum.MinItems { return utils.DecimalNaN } return sum.Value } func (sum *Metric) getAvgValue() *utils.Decimal { - if len(sum.Events) == 0 || sum.Count < sum.MinItems { + if sum.Count == 0 || sum.Count < sum.MinItems { return utils.DecimalNaN } return utils.DivideDecimal(sum.Value, utils.NewDecimal(int64(sum.Count), 0)) } func (sum *Metric) getAvgStringValue(rounding int) string { - if len(sum.Events) == 0 || sum.Count < sum.MinItems { + if sum.Count == 0 || sum.Count < sum.MinItems { return utils.NotAvailable } v, _ := utils.DivideDecimal(sum.Value, utils.NewDecimal(int64(sum.Count), 0)).Round(rounding).Float64() @@ -520,7 +627,7 @@ func (sum *Metric) getAvgStringValue(rounding int) string { } func (sum *Metric) GetStringValue(rounding int) string { - if len(sum.Events) == 0 || sum.Count < sum.MinItems { + if sum.Count == 0 || sum.Count < sum.MinItems { return utils.NotAvailable } v, _ := sum.Value.Round(rounding).Float64() @@ -552,6 +659,19 @@ func (sum *Metric) addEvent(evID string, ival any) (err error) { return } +// Adding aggregated metrics without events +func (sum *Metric) addOneEvent(ival any) (err error) { + var val *decimal.Big + if val, err = utils.IfaceAsBig(ival); err != nil { + return + } + dVal := &utils.Decimal{Big: val} + sum.Value = utils.SumDecimal(sum.Value, dVal) + sum.Count++ + return +} + +// Deleting a specific event and updating metrics func (sum *Metric) RemEvent(evID string) (err error) { val, has := sum.Events[evID] if !has { @@ -610,7 +730,7 @@ func (sum *Metric) Clone() (cln *Metric) { FilterIDs: slices.Clone(sum.FilterIDs), } for k, v := range sum.Events { - cln.Events[k] = &(*v) + cln.Events[k] = v } return } @@ -653,6 +773,17 @@ func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) error { } return sum.addEvent(evID, ival) } +func (sum *StatSum) AddOneEvent(ev utils.DataProvider) error { + ival, err := utils.DPDynamicInterface(sum.FieldName, ev) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, sum.FieldName) + } + return err + } + + return sum.addOneEvent(ival) +} func (sum *StatSum) Clone() StatMetric { return &StatSum{ @@ -691,6 +822,17 @@ func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) error { return avg.addEvent(evID, ival) } +func (avg *StatAverage) AddOneEvent(ev utils.DataProvider) error { + ival, err := utils.DPDynamicInterface(avg.FieldName, ev) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, avg.FieldName) + } + return err + } + return avg.addOneEvent(ival) +} + func (avg *StatAverage) Clone() StatMetric { return &StatAverage{ Metric: avg.Metric.Clone(), @@ -698,6 +840,7 @@ func (avg *StatAverage) Clone() StatMetric { } } +// StatDistinct counts the different values occurring in a specific event field func NewStatDistinct(minItems uint64, fieldName string, filterIDs []string) StatMetric { return &StatDistinct{ Events: make(map[string]map[string]uint64), @@ -739,7 +882,7 @@ func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error var fieldValue string // simply remove the ~*req./~*opts. prefix and do normal process if !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) && !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaOpts+utils.NestingSep) { - return fmt.Errorf("Invalid format for field <%s>", dst.FieldName) + return fmt.Errorf("invalid format for field <%s>", dst.FieldName) } if fieldValue, err = utils.DPDynamicString(dst.FieldName, ev); err != nil { @@ -768,6 +911,27 @@ func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error return } +func (dst *StatDistinct) AddOneEvent(ev utils.DataProvider) (err error) { + var fieldValue string + // simply remove the ~*req./~*opts. prefix and do normal process + if !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) && !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaOpts+utils.NestingSep) { + return fmt.Errorf("invalid format for field <%s>", dst.FieldName) + } + if fieldValue, err = utils.DPDynamicString(dst.FieldName, ev); err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, dst.FieldName) + } + return + } + // add to fieldValues + if _, has := dst.FieldValues[fieldValue]; !has { + dst.FieldValues[fieldValue] = make(utils.StringSet) + } + + dst.Count++ + return +} + func (dst *StatDistinct) RemEvent(evID string) (err error) { fieldValues, has := dst.Events[evID] if !has { diff --git a/engine/statmetrics_test.go b/engine/statmetrics_test.go index 4e7d61bb4..5bfd04cf4 100644 --- a/engine/statmetrics_test.go +++ b/engine/statmetrics_test.go @@ -3242,8 +3242,8 @@ func TestStatMetricsStatDistinctAddEventErr(t *testing.T) { Count: 3, } err := dst.AddEvent("Event1", utils.MapStorage{utils.MetaOpts: ev.APIOpts}) - if err == nil || err.Error() != "Invalid format for field " { - t.Errorf("\nExpecting >,\n Recevied <%+v>", err) + if err == nil || err.Error() != "invalid format for field " { + t.Errorf("\nExpecting >,\n Recevied <%+v>", err) } } diff --git a/engine/stats.go b/engine/stats.go index 8802d9e76..378c65c80 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -220,6 +220,11 @@ func (sS *StatS) matchingStatQueuesForEvent(ctx *context.Context, tnt string, st if sqPrfl.TTL > 0 { sq.ttl = utils.DurationPointer(sqPrfl.TTL) } + + if sqPrfl.TTL == -1 && sqPrfl.QueueLength == -1 { + sq.ttl = utils.DurationPointer(sqPrfl.TTL) + } + sq.sqPrfl = sqPrfl if sq.weight, err = WeightFromDynamics(ctx, sqPrfl.Weights, sS.fltrS, tnt, evNm); err != nil {