diff --git a/engine/libstats.go b/engine/libstats.go index c6d250f3e..44ebbda7e 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -233,17 +233,12 @@ func (sq *StatQueue) TenantID() string { } // ProcessEvent processes a utils.CGREvent, returns true if processed -func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) { +func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) error { if oneEv := sq.isOneEvent(); oneEv { return sq.addOneEvent(tnt, filterS, evNm) } - if _, err = sq.remExpired(); err != nil { - return - } - - if err = sq.remOnQueueLength(); err != nil { - return - } + sq.remExpired() + sq.remOnQueueLength() return sq.addStatEvent(tnt, evID, filterS, evNm) } @@ -272,22 +267,14 @@ func (sq *StatQueue) addOneEvent(tnt string, filterS *FilterS, evNm utils.MapSto } // remStatEvent removes an event from metrics -func (sq *StatQueue) remEventWithID(evID string) (err error) { - for metricID, metric := range sq.SQMetrics { - if err = metric.RemEvent(evID); err != nil { - if err.Error() == utils.ErrNotFound.Error() { - err = nil - continue - } - utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error())) - return - } +func (sq *StatQueue) remEventWithID(evID string) { + for _, metric := range sq.SQMetrics { + metric.RemEvent(evID) } - return } // remExpired expires items in queue -func (sq *StatQueue) remExpired() (removed int, err error) { +func (sq *StatQueue) remExpired() (removed int) { var expIdx *int // index of last item to be expired for i, item := range sq.SQItems { if item.ExpiryTime == nil { @@ -296,9 +283,7 @@ func (sq *StatQueue) remExpired() (removed int, err error) { if item.ExpiryTime.After(time.Now()) { break } - if err = sq.remEventWithID(item.EventID); err != nil { - return - } + sq.remEventWithID(item.EventID) expIdx = utils.IntPointer(i) } if expIdx == nil { @@ -310,18 +295,15 @@ func (sq *StatQueue) remExpired() (removed int, err error) { } // remOnQueueLength removes elements based on QueueLength setting -func (sq *StatQueue) remOnQueueLength() (err error) { +func (sq *StatQueue) remOnQueueLength() { if sq.sqPrfl.QueueLength <= 0 { // infinite length return } - if len(sq.SQItems) == sq.sqPrfl.QueueLength { // reached limit, rem first element + if len(sq.SQItems) == sq.sqPrfl.QueueLength { // reached limit, remove first element item := sq.SQItems[0] - if err = sq.remEventWithID(item.EventID); err != nil { - return - } + sq.remEventWithID(item.EventID) sq.SQItems = sq.SQItems[1:] } - return } // addStatEvent computes metrics for an event diff --git a/engine/libstats_test.go b/engine/libstats_test.go index b7608d916..7ac732499 100644 --- a/engine/libstats_test.go +++ b/engine/libstats_test.go @@ -734,12 +734,7 @@ func (sMM *statMetricMock) AddOneEvent(ev utils.DataProvider) error { return nil } -func (sMM *statMetricMock) RemEvent(evTenantID string) error { - switch sMM.testcase { - case "remExpired error": - return fmt.Errorf("remExpired mock error") - } - return nil +func (sMM *statMetricMock) RemEvent(evTenantID string) { } func (sMM *statMetricMock) Marshal(ms Marshaler) (marshaled []byte, err error) { @@ -777,26 +772,6 @@ func (sMM *statMetricMock) GetCompressFactor(events map[string]int) map[string]i return nil } -func TestStatQueueNewStoredStatQueue(t *testing.T) { - sq := &StatQueue{ - SQMetrics: map[string]StatMetric{ - "key": &statMetricMock{}, - }, - } - var ms Marshaler - - experr := "marshal mock error" - rcv, err := NewStoredStatQueue(sq, ms) - - if err == nil || err.Error() != experr { - t.Fatalf("\nreceived: %q, \nexpected: %q", experr, err) - } - - if rcv != nil { - t.Errorf("\nreceived: <%+v>, \nexpected: <%+v>", nil, rcv) - } -} - func TestStatQueueAsStatQueueNilStoredSq(t *testing.T) { var ssq *StoredStatQueue var ms Marshaler @@ -968,105 +943,11 @@ func TestStatQueueNewStatQueue(t *testing.T) { } } -func TestStatQueueProcessEventremExpiredErr(t *testing.T) { - tnt, evID := "tenant", "eventID" - filters := &FilterS{} - expiry := time.Date(2021, 1, 1, 23, 59, 59, 10, time.UTC) - evNm := utils.MapStorage{ - "key": nil, - } - - sq := &StatQueue{ - sqPrfl: &StatQueueProfile{ - QueueLength: -1, - }, - SQItems: []SQItem{ - { - EventID: evID, - ExpiryTime: &expiry, - }, - }, - SQMetrics: map[string]StatMetric{ - "key": &statMetricMock{ - testcase: "remExpired error", - }, - }, - } - - experr := "remExpired mock error" - err := sq.ProcessEvent(tnt, evID, filters, evNm) - - if err == nil || err.Error() != experr { - t.Errorf("\nexpected: %q, \nreceived: %q", experr, err) - } -} - -func TestStatQueueProcessEventremOnQueueLengthErr(t *testing.T) { - tnt, evID := "tenant", "eventID" - filters := &FilterS{} - evNm := utils.MapStorage{ - "key": nil, - } - - sq := &StatQueue{ - sqPrfl: &StatQueueProfile{ - QueueLength: 1, - }, - SQItems: []SQItem{ - { - EventID: evID, - }, - }, - SQMetrics: map[string]StatMetric{ - "key": &statMetricMock{ - testcase: "remExpired error", - }, - }, - } - - experr := "remExpired mock error" - err := sq.ProcessEvent(tnt, evID, filters, evNm) - - if err == nil || err.Error() != experr { - t.Errorf("\nexpected: %q, \nreceived: %q", experr, err) - } -} - -func TestStatQueueProcessEventaddStatEvent(t *testing.T) { - tnt, evID := "tenant", "eventID" - filters := &FilterS{} - evNm := utils.MapStorage{ - "key": nil, - } - - sq := &StatQueue{ - sqPrfl: &StatQueueProfile{ - QueueLength: 1, - }, - SQItems: []SQItem{ - { - EventID: evID, - }, - }, - SQMetrics: map[string]StatMetric{ - utils.MetaTCD: &StatTCD{}, - }, - } - - experr := utils.ErrWrongPath - err := sq.ProcessEvent(tnt, evID, filters, evNm) - - if err == nil || err != experr { - t.Errorf("\nexpected: %q, \nreceived: %q", experr, err) - } -} - func TestStatQueueCompress(t *testing.T) { sm, err := NewStatMetric(utils.MetaTCD, 0, []string{"*string:~*req.Account:1001"}) if err != nil { t.Fatal(err) } - ttl := time.Millisecond expiryTime1 := time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC) expiryTime2 := time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC) @@ -1138,9 +1019,6 @@ func TestStatQueueCompress(t *testing.T) { if len(sq.SQItems) != len(exp) { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, sq.SQItems) } - // if !reflect.DeepEqual(sq.SQItems, exp) { - // t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, sq.SQItems) - // } } func TestStatQueueaddStatEventPassErr(t *testing.T) { @@ -1177,59 +1055,6 @@ func TestStatQueueaddStatEventPassErr(t *testing.T) { } } -func TestStatQueueaddStatEventNoPass(t *testing.T) { - sm, err := NewStatMetric(utils.MetaTCD, 0, []string{"*string:~*req.Account:1001"}) - if err != nil { - t.Fatal(err) - } - - sq := &StatQueue{ - SQMetrics: map[string]StatMetric{ - utils.MetaTCD: sm, - }, - } - sq.lock(utils.EmptyString) - - tnt, evID := "cgrates.org", "eventID" - filters := &FilterS{ - cfg: config.CgrConfig(), - dm: &DataManager{ - dataDB: NewInternalDB(nil, nil, true, config.CgrConfig().DataDbCfg().Items), - }, - connMgr: &ConnManager{}, - } - evNm := utils.MapStorage{ - utils.MetaReq: utils.MapStorage{ - utils.MetaReq: nil, - }, - utils.MetaOpts: nil, - utils.MetaVars: utils.MapStorage{ - utils.OptsAttributesProcessRuns: 0, - }, - } - - exp := &StatQueue{ - SQMetrics: map[string]StatMetric{ - utils.MetaTCD: sm, - }, - SQItems: []SQItem{ - { - EventID: "eventID", - }, - }, - } - err = sq.addStatEvent(tnt, evID, filters, evNm) - sq.unlock() - - if err != nil { - t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err) - } - - if !reflect.DeepEqual(sq, exp) { - t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, sq) - } -} - func TestStatQueueJSONMarshall(t *testing.T) { var rply *StatQueue exp, err := NewStatQueue("cgrates.org", "STS", []*MetricWithFilters{ diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 21f58df28..db774d499 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -75,7 +75,7 @@ type StatMetric interface { GetFloat64Value(roundingDecimal int) (val float64) AddEvent(evID string, ev utils.DataProvider) error AddOneEvent(ev utils.DataProvider) error - RemEvent(evTenantID string) error + RemEvent(evTenantID string) Marshal(ms Marshaler) (marshaled []byte, err error) LoadMarshaled(ms Marshaler, marshaled []byte) (err error) GetFilterIDs() (filterIDs []string) @@ -181,10 +181,10 @@ func (asr *StatASR) AddOneEvent(ev utils.DataProvider) (err error) { } // RemEvent deletes a stored event and decrements statistics of the metric for recalculation -func (asr *StatASR) RemEvent(evID string) (err error) { +func (asr *StatASR) RemEvent(evID string) { val, has := asr.Events[evID] if !has { - return utils.ErrNotFound + return } ans := 0 if val.Stat > 0.5 { @@ -199,7 +199,6 @@ func (asr *StatASR) RemEvent(evID string) (err error) { val.CompressFactor = val.CompressFactor - 1 } asr.val = nil - return } // Marshal is part of StatMetric interface @@ -341,10 +340,10 @@ func (acd *StatACD) AddOneEvent(ev utils.DataProvider) (err error) { return } -func (acd *StatACD) RemEvent(evID string) (err error) { +func (acd *StatACD) RemEvent(evID string) { val, has := acd.Events[evID] if !has { - return utils.ErrNotFound + return } if val.Duration != 0 { acd.Sum -= val.Duration @@ -356,7 +355,6 @@ func (acd *StatACD) RemEvent(evID string) (err error) { val.CompressFactor = val.CompressFactor - 1 } acd.val = nil - return } func (acd *StatACD) Marshal(ms Marshaler) (marshaled []byte, err error) { @@ -496,10 +494,10 @@ func (tcd *StatTCD) AddOneEvent(ev utils.DataProvider) (err error) { return } -func (tcd *StatTCD) RemEvent(evID string) (err error) { +func (tcd *StatTCD) RemEvent(evID string) { val, has := tcd.Events[evID] if !has { - return utils.ErrNotFound + return } if val.Duration != 0 { tcd.Sum -= val.Duration @@ -511,7 +509,6 @@ func (tcd *StatTCD) RemEvent(evID string) (err error) { val.CompressFactor = val.CompressFactor - 1 } tcd.val = nil - return } func (tcd *StatTCD) Marshal(ms Marshaler) (marshaled []byte, err error) { @@ -649,10 +646,10 @@ func (acc *StatACC) AddOneEvent(ev utils.DataProvider) (err error) { return } -func (acc *StatACC) RemEvent(evID string) (err error) { +func (acc *StatACC) RemEvent(evID string) { cost, has := acc.Events[evID] if !has { - return utils.ErrNotFound + return } acc.Sum -= cost.Stat acc.Count-- @@ -662,7 +659,6 @@ func (acc *StatACC) RemEvent(evID string) (err error) { cost.CompressFactor = cost.CompressFactor - 1 } acc.val = nil - return } func (acc *StatACC) Marshal(ms Marshaler) (marshaled []byte, err error) { @@ -800,10 +796,10 @@ func (tcc *StatTCC) AddOneEvent(ev utils.DataProvider) (err error) { return } -func (tcc *StatTCC) RemEvent(evID string) (err error) { +func (tcc *StatTCC) RemEvent(evID string) { cost, has := tcc.Events[evID] if !has { - return utils.ErrNotFound + return } if cost.Stat != 0 { tcc.Sum -= cost.Stat @@ -815,7 +811,6 @@ func (tcc *StatTCC) RemEvent(evID string) (err error) { cost.CompressFactor = cost.CompressFactor - 1 } tcc.val = nil - return } func (tcc *StatTCC) Marshal(ms Marshaler) (marshaled []byte, err error) { @@ -954,10 +949,10 @@ func (pdd *StatPDD) AddOneEvent(ev utils.DataProvider) (err error) { return } -func (pdd *StatPDD) RemEvent(evID string) (err error) { +func (pdd *StatPDD) RemEvent(evID string) { val, has := pdd.Events[evID] if !has { - return utils.ErrNotFound + return } if val.Duration != 0 { pdd.Sum -= val.Duration @@ -969,7 +964,6 @@ func (pdd *StatPDD) RemEvent(evID string) (err error) { val.CompressFactor = val.CompressFactor - 1 } pdd.val = nil - return } func (pdd *StatPDD) Marshal(ms Marshaler) (marshaled []byte, err error) { @@ -1102,16 +1096,15 @@ func (ddc *StatDDC) AddOneEvent(ev utils.DataProvider) (err error) { return } -func (ddc *StatDDC) RemEvent(evID string) (err error) { +func (ddc *StatDDC) RemEvent(evID string) { fieldValues, has := ddc.Events[evID] if !has { - return utils.ErrNotFound + return } if len(fieldValues) == 0 { delete(ddc.Events, evID) - return utils.ErrNotFound + return } - // decrement events var fieldValue string for k := range fieldValues { @@ -1124,7 +1117,6 @@ func (ddc *StatDDC) RemEvent(evID string) (err error) { return // do not delete the reference until it reaches 0 } delete(ddc.Events[evID], fieldValue) - // remove from fieldValues if _, has := ddc.FieldValues[fieldValue]; !has { return @@ -1133,7 +1125,6 @@ func (ddc *StatDDC) RemEvent(evID string) (err error) { if ddc.FieldValues[fieldValue].Size() <= 0 { delete(ddc.FieldValues, fieldValue) } - return } func (ddc *StatDDC) Marshal(ms Marshaler) (marshaled []byte, err error) { @@ -1267,10 +1258,10 @@ func (sum *StatSum) AddOneEvent(ev utils.DataProvider) (err error) { return } -func (sum *StatSum) RemEvent(evID string) (err error) { +func (sum *StatSum) RemEvent(evID string) { val, has := sum.Events[evID] if !has { - return utils.ErrNotFound + return } if val.Stat != 0 { sum.Sum -= val.Stat @@ -1282,7 +1273,6 @@ func (sum *StatSum) RemEvent(evID string) (err error) { val.CompressFactor = val.CompressFactor - 1 } sum.val = nil - return } func (sum *StatSum) Marshal(ms Marshaler) (marshaled []byte, err error) { @@ -1421,10 +1411,10 @@ func (avg *StatAverage) AddOneEvent(ev utils.DataProvider) (err error) { return } -func (avg *StatAverage) RemEvent(evID string) (err error) { +func (avg *StatAverage) RemEvent(evID string) { val, has := avg.Events[evID] if !has { - return utils.ErrNotFound + return } if val.Stat >= 0 { avg.Sum -= val.Stat @@ -1436,7 +1426,6 @@ func (avg *StatAverage) RemEvent(evID string) (err error) { val.CompressFactor = val.CompressFactor - 1 } avg.val = nil - return } func (avg *StatAverage) Marshal(ms Marshaler) (marshaled []byte, err error) { @@ -1573,14 +1562,14 @@ func (dst *StatDistinct) AddOneEvent(ev utils.DataProvider) (err error) { return } -func (dst *StatDistinct) RemEvent(evID string) (err error) { +func (dst *StatDistinct) RemEvent(evID string) { fieldValues, has := dst.Events[evID] if !has { - return utils.ErrNotFound + return } if len(fieldValues) == 0 { delete(dst.Events, evID) - return utils.ErrNotFound + return } // decrement events @@ -1604,7 +1593,6 @@ func (dst *StatDistinct) RemEvent(evID string) (err error) { if dst.FieldValues[fieldValue].Size() <= 0 { delete(dst.FieldValues, fieldValue) } - return } func (dst *StatDistinct) Marshal(ms Marshaler) (marshaled []byte, err error) { diff --git a/engine/statmetrics_test.go b/engine/statmetrics_test.go index f70a200b5..81954a7cf 100644 --- a/engine/statmetrics_test.go +++ b/engine/statmetrics_test.go @@ -1871,15 +1871,11 @@ func TestPDDGetValue(t *testing.T) { if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != 9*time.Second+500*time.Millisecond { t.Errorf("wrong pdd value: %+v", v) } - if err := pdd.RemEvent(ev.ID); err != nil { - t.Error(err) - } + pdd.RemEvent(ev.ID) if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != -time.Nanosecond { t.Errorf("wrong pdd value: %+v", v) } - if err := pdd.RemEvent(ev2.ID); err != nil { - t.Error(err) - } + pdd.RemEvent(ev2.ID) if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != -time.Nanosecond { t.Errorf("wrong pdd value: %+v", v) } @@ -1905,12 +1901,8 @@ func TestPDDGetValue(t *testing.T) { if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != -time.Nanosecond { t.Errorf("wrong pdd value: %+v", v) } - if err := pdd.RemEvent(ev5.ID); err == nil || err.Error() != "NOT_FOUND" { - t.Error(err) - } - if err := pdd.RemEvent(ev4.ID); err != nil { - t.Error(err) - } + pdd.RemEvent(ev5.ID) + pdd.RemEvent(ev4.ID) if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != -time.Nanosecond { t.Errorf("wrong pdd value: %+v", v) } @@ -3384,26 +3376,6 @@ func TestStatMetricsStatDistinctGetFilterIDs(t *testing.T) { } } -func TestStatMetricsStatDistinctRemEventErr2(t *testing.T) { - dst := &StatDistinct{ - FilterIDs: []string{"Test_Filter_ID"}, - FieldValues: map[string]utils.StringSet{}, - Events: map[string]map[string]int64{ - "Event1": { - "FieldValue1": 1, - }, - "Event2": {}, - }, - MinItems: 3, - FieldName: "Test_Field_Name", - Count: 3, - } - err := dst.RemEvent("Event2") - if err == nil || err != utils.ErrNotFound { - t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", utils.ErrNotFound, err) - } -} - func TestStatMetricsStatDistinctRemEvent(t *testing.T) { dst := &StatDistinct{ FilterIDs: []string{"Test_Filter_ID"}, @@ -3429,10 +3401,8 @@ func TestStatMetricsStatDistinctRemEvent(t *testing.T) { FieldName: "Test_Field_Name", Count: 2, } - err := dst.RemEvent("Event1") - if err != nil { - t.Errorf("\nExpecting ,\n Recevied <%+v>", err) - } + dst.RemEvent("Event1") + if !reflect.DeepEqual(expected, dst) { t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", expected, dst) } @@ -3469,10 +3439,7 @@ func TestStatMetricsStatDistinctRemEvent2(t *testing.T) { FieldName: "Test_Field_Name", Count: 2, } - err := dst.RemEvent("Event1") - if err != nil { - t.Errorf("\nExpecting ,\n Recevied <%+v>", err) - } + dst.RemEvent("Event1") if !reflect.DeepEqual(expected, dst) { t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", expected, dst) } @@ -3692,25 +3659,6 @@ func TestStatMetricsStatDDCGetMinItems(t *testing.T) { } } -func TestStatMetricsStatDDCRemEventErr2(t *testing.T) { - ddc := &StatDDC{ - FilterIDs: []string{"Test_Filter_ID"}, - FieldValues: map[string]utils.StringSet{}, - Events: map[string]map[string]int64{ - "Event1": { - "FieldValue1": 1, - }, - "Event2": {}, - }, - MinItems: 3, - Count: 3, - } - err := ddc.RemEvent("Event2") - if err == nil || err != utils.ErrNotFound { - t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", utils.ErrNotFound, err) - } -} - func TestStatMetricsStatDDCRemEvent(t *testing.T) { ddc := &StatDDC{ FilterIDs: []string{"Test_Filter_ID"}, @@ -3734,10 +3682,7 @@ func TestStatMetricsStatDDCRemEvent(t *testing.T) { MinItems: 3, Count: 2, } - err := ddc.RemEvent("Event1") - if err != nil { - t.Errorf("\nExpecting ,\n Recevied <%+v>", err) - } + ddc.RemEvent("Event1") if !reflect.DeepEqual(expected, ddc) { t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", expected, ddc) } @@ -3772,10 +3717,8 @@ func TestStatMetricsStatDDCRemEvent2(t *testing.T) { MinItems: 3, Count: 2, } - err := ddc.RemEvent("Event1") - if err != nil { - t.Errorf("\nExpecting ,\n Recevied <%+v>", err) - } + ddc.RemEvent("Event1") + if !reflect.DeepEqual(expected, ddc) { t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", expected, ddc) } diff --git a/engine/stats.go b/engine/stats.go index 851f1dab3..49bdb6c89 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -248,8 +248,8 @@ func (sS *StatService) getStatQueue(tnt, id string) (sq *StatQueue, err error) { if sq, err = sS.dm.GetStatQueue(tnt, id, true, true, utils.EmptyString); err != nil { return } - var removed int - if removed, err = sq.remExpired(); err != nil || removed == 0 { + removed := sq.remExpired() + if removed == 0 { return } sS.storeStatQueue(sq) @@ -383,18 +383,11 @@ func (sS *StatService) processEvent(tnt string, args *utils.CGREvent) (statQueue if err != nil { return nil, err } - statQueueIDs = matchSQs.IDs() var withErrors bool for _, sq := range matchSQs { - if err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm); err != nil { - utils.Logger.Warning( - fmt.Sprintf(" Queue: %s, ignoring event: %s, error: %s", - sq.TenantID(), utils.ConcatenatedKey(tnt, args.ID), err.Error())) - withErrors = true - } + sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm) sS.storeStatQueue(sq) - } if sS.processThresholds(matchSQs, args.APIOpts) != nil || sS.processEEs(matchSQs, args.APIOpts) != nil || withErrors { diff --git a/engine/stats_test.go b/engine/stats_test.go index 90e6839ac..9a72ae5f7 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -1349,156 +1349,6 @@ func TestStatQueueProcessEventProcessThPartExec(t *testing.T) { } } -func TestStatQueueProcessEventProcessEventErr(t *testing.T) { - utils.Logger.SetLogLevel(4) - utils.Logger.SetSyslog(nil) - - var buf bytes.Buffer - log.SetOutput(&buf) - defer func() { - log.SetOutput(os.Stderr) - }() - - cfg := config.NewDefaultCGRConfig() - data := NewInternalDB(nil, nil, true, config.CgrConfig().DataDbCfg().Items) - dm := NewDataManager(data, cfg.CacheCfg(), nil) - filterS := NewFilterS(cfg, nil, dm) - sS := NewStatService(dm, cfg, filterS, nil) - - sqPrf := &StatQueueProfile{ - Tenant: "cgrates.org", - ID: "SQ1", - FilterIDs: []string{"*string:~*req.Account:1001"}, - ActivationInterval: &utils.ActivationInterval{ - ExpiryTime: time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC), - }, - Weight: 10, - Blocker: true, - QueueLength: 10, - ThresholdIDs: []string{"*none"}, - MinItems: 5, - Metrics: []*MetricWithFilters{ - { - MetricID: utils.MetaTCD, - }, - }, - } - sq := &StatQueue{ - sqPrfl: sqPrf, - Tenant: "cgrates.org", - ID: "SQ1", - SQItems: []SQItem{ - { - EventID: "SqProcessEvent", - }, - }, - SQMetrics: map[string]StatMetric{ - utils.MetaTCD: &StatTCD{}, - }, - } - - if err := dm.SetStatQueueProfile(sqPrf, true); err != nil { - t.Error(err) - } - if err := dm.SetStatQueue(sq); err != nil { - t.Error(err) - } - - args := &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "SqProcessEvent", - Event: map[string]any{ - utils.AccountField: "1001", - }, - APIOpts: map[string]any{ - utils.OptsStatsProfileIDs: []string{"SQ1"}, - }, - } - - expLog := `[WARNING] Queue: cgrates.org:SQ1, ignoring event: cgrates.org:SqProcessEvent, error: NOT_FOUND:Usage` - expIDs := []string{"SQ1"} - if rcvIDs, err := sS.processEvent(args.Tenant, args); err == nil || - err.Error() != utils.ErrPartiallyExecuted.Error() { - t.Errorf("expected: <%+v>, received: <%+v>", utils.ErrPartiallyExecuted, err) - } else if !reflect.DeepEqual(rcvIDs, expIDs) { - t.Errorf("expected: <%+v>, received: <%+v>", expIDs, rcvIDs) - } else if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog) { - t.Errorf("expected log <%+v> to be included in: <%+v>", - expLog, rcvLog) - } - - utils.Logger.SetLogLevel(0) -} - -func TestStatQueueV1ProcessEventProcessEventErr(t *testing.T) { - tmpC := config.CgrConfig() - defer func() { - config.SetCgrConfig(tmpC) - }() - - cfg := config.NewDefaultCGRConfig() - data := NewInternalDB(nil, nil, true, config.CgrConfig().DataDbCfg().Items) - dm := NewDataManager(data, cfg.CacheCfg(), nil) - Cache.Clear(nil) - filterS := NewFilterS(cfg, nil, dm) - sS := NewStatService(dm, cfg, filterS, nil) - - sqPrf := &StatQueueProfile{ - Tenant: "cgrates.org", - ID: "SQ1", - FilterIDs: []string{"*string:~*req.Account:1001"}, - ActivationInterval: &utils.ActivationInterval{ - ExpiryTime: time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC), - }, - Weight: 10, - Blocker: true, - QueueLength: 10, - ThresholdIDs: []string{"*none"}, - MinItems: 5, - Metrics: []*MetricWithFilters{ - { - MetricID: utils.MetaTCD, - }, - }, - } - sq := &StatQueue{ - sqPrfl: sqPrf, - Tenant: "cgrates.org", - ID: "SQ1", - SQItems: []SQItem{ - { - EventID: "SqProcessEvent", - }, - }, - SQMetrics: map[string]StatMetric{ - utils.MetaTCD: &StatTCD{}, - }, - } - - if err := dm.SetStatQueueProfile(sqPrf, true); err != nil { - t.Error(err) - } - if err := dm.SetStatQueue(sq); err != nil { - t.Error(err) - } - - args := &utils.CGREvent{ - ID: "SqProcessEvent", - Event: map[string]any{ - utils.AccountField: "1001", - }, - APIOpts: map[string]any{ - utils.OptsStatsProfileIDs: []string{"SQ1"}, - }, - } - - var reply []string - if err := sS.V1ProcessEvent(context.Background(), args, &reply); err == nil || - err.Error() != utils.ErrPartiallyExecuted.Error() { - t.Errorf("expected: <%+v>, received: <%+v>", utils.ErrPartiallyExecuted, err) - } -} - func TestStatQueueV1ProcessEventMissingArgs(t *testing.T) { tmpC := config.CgrConfig() defer func() {