Revise redundant error handling in StatQueue rem methods

This commit is contained in:
armirveliaj
2024-09-23 10:24:12 -04:00
committed by Dan Christian Bogos
parent 01dd4696cf
commit 085365e35d
6 changed files with 48 additions and 467 deletions

View File

@@ -233,17 +233,12 @@ func (sq *StatQueue) TenantID() string {
}
// ProcessEvent processes a utils.CGREvent, returns true if processed
func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) {
func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) error {
if oneEv := sq.isOneEvent(); oneEv {
return sq.addOneEvent(tnt, filterS, evNm)
}
if _, err = sq.remExpired(); err != nil {
return
}
if err = sq.remOnQueueLength(); err != nil {
return
}
sq.remExpired()
sq.remOnQueueLength()
return sq.addStatEvent(tnt, evID, filterS, evNm)
}
@@ -272,22 +267,14 @@ func (sq *StatQueue) addOneEvent(tnt string, filterS *FilterS, evNm utils.MapSto
}
// remStatEvent removes an event from metrics
func (sq *StatQueue) remEventWithID(evID string) (err error) {
for metricID, metric := range sq.SQMetrics {
if err = metric.RemEvent(evID); err != nil {
if err.Error() == utils.ErrNotFound.Error() {
err = nil
continue
}
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
return
}
func (sq *StatQueue) remEventWithID(evID string) {
for _, metric := range sq.SQMetrics {
metric.RemEvent(evID)
}
return
}
// remExpired expires items in queue
func (sq *StatQueue) remExpired() (removed int, err error) {
func (sq *StatQueue) remExpired() (removed int) {
var expIdx *int // index of last item to be expired
for i, item := range sq.SQItems {
if item.ExpiryTime == nil {
@@ -296,9 +283,7 @@ func (sq *StatQueue) remExpired() (removed int, err error) {
if item.ExpiryTime.After(time.Now()) {
break
}
if err = sq.remEventWithID(item.EventID); err != nil {
return
}
sq.remEventWithID(item.EventID)
expIdx = utils.IntPointer(i)
}
if expIdx == nil {
@@ -310,18 +295,15 @@ func (sq *StatQueue) remExpired() (removed int, err error) {
}
// remOnQueueLength removes elements based on QueueLength setting
func (sq *StatQueue) remOnQueueLength() (err error) {
func (sq *StatQueue) remOnQueueLength() {
if sq.sqPrfl.QueueLength <= 0 { // infinite length
return
}
if len(sq.SQItems) == sq.sqPrfl.QueueLength { // reached limit, rem first element
if len(sq.SQItems) == sq.sqPrfl.QueueLength { // reached limit, remove first element
item := sq.SQItems[0]
if err = sq.remEventWithID(item.EventID); err != nil {
return
}
sq.remEventWithID(item.EventID)
sq.SQItems = sq.SQItems[1:]
}
return
}
// addStatEvent computes metrics for an event

View File

@@ -734,12 +734,7 @@ func (sMM *statMetricMock) AddOneEvent(ev utils.DataProvider) error {
return nil
}
func (sMM *statMetricMock) RemEvent(evTenantID string) error {
switch sMM.testcase {
case "remExpired error":
return fmt.Errorf("remExpired mock error")
}
return nil
func (sMM *statMetricMock) RemEvent(evTenantID string) {
}
func (sMM *statMetricMock) Marshal(ms Marshaler) (marshaled []byte, err error) {
@@ -777,26 +772,6 @@ func (sMM *statMetricMock) GetCompressFactor(events map[string]int) map[string]i
return nil
}
func TestStatQueueNewStoredStatQueue(t *testing.T) {
sq := &StatQueue{
SQMetrics: map[string]StatMetric{
"key": &statMetricMock{},
},
}
var ms Marshaler
experr := "marshal mock error"
rcv, err := NewStoredStatQueue(sq, ms)
if err == nil || err.Error() != experr {
t.Fatalf("\nreceived: %q, \nexpected: %q", experr, err)
}
if rcv != nil {
t.Errorf("\nreceived: <%+v>, \nexpected: <%+v>", nil, rcv)
}
}
func TestStatQueueAsStatQueueNilStoredSq(t *testing.T) {
var ssq *StoredStatQueue
var ms Marshaler
@@ -968,105 +943,11 @@ func TestStatQueueNewStatQueue(t *testing.T) {
}
}
func TestStatQueueProcessEventremExpiredErr(t *testing.T) {
tnt, evID := "tenant", "eventID"
filters := &FilterS{}
expiry := time.Date(2021, 1, 1, 23, 59, 59, 10, time.UTC)
evNm := utils.MapStorage{
"key": nil,
}
sq := &StatQueue{
sqPrfl: &StatQueueProfile{
QueueLength: -1,
},
SQItems: []SQItem{
{
EventID: evID,
ExpiryTime: &expiry,
},
},
SQMetrics: map[string]StatMetric{
"key": &statMetricMock{
testcase: "remExpired error",
},
},
}
experr := "remExpired mock error"
err := sq.ProcessEvent(tnt, evID, filters, evNm)
if err == nil || err.Error() != experr {
t.Errorf("\nexpected: %q, \nreceived: %q", experr, err)
}
}
func TestStatQueueProcessEventremOnQueueLengthErr(t *testing.T) {
tnt, evID := "tenant", "eventID"
filters := &FilterS{}
evNm := utils.MapStorage{
"key": nil,
}
sq := &StatQueue{
sqPrfl: &StatQueueProfile{
QueueLength: 1,
},
SQItems: []SQItem{
{
EventID: evID,
},
},
SQMetrics: map[string]StatMetric{
"key": &statMetricMock{
testcase: "remExpired error",
},
},
}
experr := "remExpired mock error"
err := sq.ProcessEvent(tnt, evID, filters, evNm)
if err == nil || err.Error() != experr {
t.Errorf("\nexpected: %q, \nreceived: %q", experr, err)
}
}
func TestStatQueueProcessEventaddStatEvent(t *testing.T) {
tnt, evID := "tenant", "eventID"
filters := &FilterS{}
evNm := utils.MapStorage{
"key": nil,
}
sq := &StatQueue{
sqPrfl: &StatQueueProfile{
QueueLength: 1,
},
SQItems: []SQItem{
{
EventID: evID,
},
},
SQMetrics: map[string]StatMetric{
utils.MetaTCD: &StatTCD{},
},
}
experr := utils.ErrWrongPath
err := sq.ProcessEvent(tnt, evID, filters, evNm)
if err == nil || err != experr {
t.Errorf("\nexpected: %q, \nreceived: %q", experr, err)
}
}
func TestStatQueueCompress(t *testing.T) {
sm, err := NewStatMetric(utils.MetaTCD, 0, []string{"*string:~*req.Account:1001"})
if err != nil {
t.Fatal(err)
}
ttl := time.Millisecond
expiryTime1 := time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC)
expiryTime2 := time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC)
@@ -1138,9 +1019,6 @@ func TestStatQueueCompress(t *testing.T) {
if len(sq.SQItems) != len(exp) {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, sq.SQItems)
}
// if !reflect.DeepEqual(sq.SQItems, exp) {
// t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, sq.SQItems)
// }
}
func TestStatQueueaddStatEventPassErr(t *testing.T) {
@@ -1177,59 +1055,6 @@ func TestStatQueueaddStatEventPassErr(t *testing.T) {
}
}
func TestStatQueueaddStatEventNoPass(t *testing.T) {
sm, err := NewStatMetric(utils.MetaTCD, 0, []string{"*string:~*req.Account:1001"})
if err != nil {
t.Fatal(err)
}
sq := &StatQueue{
SQMetrics: map[string]StatMetric{
utils.MetaTCD: sm,
},
}
sq.lock(utils.EmptyString)
tnt, evID := "cgrates.org", "eventID"
filters := &FilterS{
cfg: config.CgrConfig(),
dm: &DataManager{
dataDB: NewInternalDB(nil, nil, true, config.CgrConfig().DataDbCfg().Items),
},
connMgr: &ConnManager{},
}
evNm := utils.MapStorage{
utils.MetaReq: utils.MapStorage{
utils.MetaReq: nil,
},
utils.MetaOpts: nil,
utils.MetaVars: utils.MapStorage{
utils.OptsAttributesProcessRuns: 0,
},
}
exp := &StatQueue{
SQMetrics: map[string]StatMetric{
utils.MetaTCD: sm,
},
SQItems: []SQItem{
{
EventID: "eventID",
},
},
}
err = sq.addStatEvent(tnt, evID, filters, evNm)
sq.unlock()
if err != nil {
t.Fatalf("\nexpected: <%+v>, \nreceived: <%+v>", nil, err)
}
if !reflect.DeepEqual(sq, exp) {
t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, sq)
}
}
func TestStatQueueJSONMarshall(t *testing.T) {
var rply *StatQueue
exp, err := NewStatQueue("cgrates.org", "STS", []*MetricWithFilters{

View File

@@ -75,7 +75,7 @@ type StatMetric interface {
GetFloat64Value(roundingDecimal int) (val float64)
AddEvent(evID string, ev utils.DataProvider) error
AddOneEvent(ev utils.DataProvider) error
RemEvent(evTenantID string) error
RemEvent(evTenantID string)
Marshal(ms Marshaler) (marshaled []byte, err error)
LoadMarshaled(ms Marshaler, marshaled []byte) (err error)
GetFilterIDs() (filterIDs []string)
@@ -181,10 +181,10 @@ func (asr *StatASR) AddOneEvent(ev utils.DataProvider) (err error) {
}
// RemEvent deletes a stored event and decrements statistics of the metric for recalculation
func (asr *StatASR) RemEvent(evID string) (err error) {
func (asr *StatASR) RemEvent(evID string) {
val, has := asr.Events[evID]
if !has {
return utils.ErrNotFound
return
}
ans := 0
if val.Stat > 0.5 {
@@ -199,7 +199,6 @@ func (asr *StatASR) RemEvent(evID string) (err error) {
val.CompressFactor = val.CompressFactor - 1
}
asr.val = nil
return
}
// Marshal is part of StatMetric interface
@@ -341,10 +340,10 @@ func (acd *StatACD) AddOneEvent(ev utils.DataProvider) (err error) {
return
}
func (acd *StatACD) RemEvent(evID string) (err error) {
func (acd *StatACD) RemEvent(evID string) {
val, has := acd.Events[evID]
if !has {
return utils.ErrNotFound
return
}
if val.Duration != 0 {
acd.Sum -= val.Duration
@@ -356,7 +355,6 @@ func (acd *StatACD) RemEvent(evID string) (err error) {
val.CompressFactor = val.CompressFactor - 1
}
acd.val = nil
return
}
func (acd *StatACD) Marshal(ms Marshaler) (marshaled []byte, err error) {
@@ -496,10 +494,10 @@ func (tcd *StatTCD) AddOneEvent(ev utils.DataProvider) (err error) {
return
}
func (tcd *StatTCD) RemEvent(evID string) (err error) {
func (tcd *StatTCD) RemEvent(evID string) {
val, has := tcd.Events[evID]
if !has {
return utils.ErrNotFound
return
}
if val.Duration != 0 {
tcd.Sum -= val.Duration
@@ -511,7 +509,6 @@ func (tcd *StatTCD) RemEvent(evID string) (err error) {
val.CompressFactor = val.CompressFactor - 1
}
tcd.val = nil
return
}
func (tcd *StatTCD) Marshal(ms Marshaler) (marshaled []byte, err error) {
@@ -649,10 +646,10 @@ func (acc *StatACC) AddOneEvent(ev utils.DataProvider) (err error) {
return
}
func (acc *StatACC) RemEvent(evID string) (err error) {
func (acc *StatACC) RemEvent(evID string) {
cost, has := acc.Events[evID]
if !has {
return utils.ErrNotFound
return
}
acc.Sum -= cost.Stat
acc.Count--
@@ -662,7 +659,6 @@ func (acc *StatACC) RemEvent(evID string) (err error) {
cost.CompressFactor = cost.CompressFactor - 1
}
acc.val = nil
return
}
func (acc *StatACC) Marshal(ms Marshaler) (marshaled []byte, err error) {
@@ -800,10 +796,10 @@ func (tcc *StatTCC) AddOneEvent(ev utils.DataProvider) (err error) {
return
}
func (tcc *StatTCC) RemEvent(evID string) (err error) {
func (tcc *StatTCC) RemEvent(evID string) {
cost, has := tcc.Events[evID]
if !has {
return utils.ErrNotFound
return
}
if cost.Stat != 0 {
tcc.Sum -= cost.Stat
@@ -815,7 +811,6 @@ func (tcc *StatTCC) RemEvent(evID string) (err error) {
cost.CompressFactor = cost.CompressFactor - 1
}
tcc.val = nil
return
}
func (tcc *StatTCC) Marshal(ms Marshaler) (marshaled []byte, err error) {
@@ -954,10 +949,10 @@ func (pdd *StatPDD) AddOneEvent(ev utils.DataProvider) (err error) {
return
}
func (pdd *StatPDD) RemEvent(evID string) (err error) {
func (pdd *StatPDD) RemEvent(evID string) {
val, has := pdd.Events[evID]
if !has {
return utils.ErrNotFound
return
}
if val.Duration != 0 {
pdd.Sum -= val.Duration
@@ -969,7 +964,6 @@ func (pdd *StatPDD) RemEvent(evID string) (err error) {
val.CompressFactor = val.CompressFactor - 1
}
pdd.val = nil
return
}
func (pdd *StatPDD) Marshal(ms Marshaler) (marshaled []byte, err error) {
@@ -1102,16 +1096,15 @@ func (ddc *StatDDC) AddOneEvent(ev utils.DataProvider) (err error) {
return
}
func (ddc *StatDDC) RemEvent(evID string) (err error) {
func (ddc *StatDDC) RemEvent(evID string) {
fieldValues, has := ddc.Events[evID]
if !has {
return utils.ErrNotFound
return
}
if len(fieldValues) == 0 {
delete(ddc.Events, evID)
return utils.ErrNotFound
return
}
// decrement events
var fieldValue string
for k := range fieldValues {
@@ -1124,7 +1117,6 @@ func (ddc *StatDDC) RemEvent(evID string) (err error) {
return // do not delete the reference until it reaches 0
}
delete(ddc.Events[evID], fieldValue)
// remove from fieldValues
if _, has := ddc.FieldValues[fieldValue]; !has {
return
@@ -1133,7 +1125,6 @@ func (ddc *StatDDC) RemEvent(evID string) (err error) {
if ddc.FieldValues[fieldValue].Size() <= 0 {
delete(ddc.FieldValues, fieldValue)
}
return
}
func (ddc *StatDDC) Marshal(ms Marshaler) (marshaled []byte, err error) {
@@ -1267,10 +1258,10 @@ func (sum *StatSum) AddOneEvent(ev utils.DataProvider) (err error) {
return
}
func (sum *StatSum) RemEvent(evID string) (err error) {
func (sum *StatSum) RemEvent(evID string) {
val, has := sum.Events[evID]
if !has {
return utils.ErrNotFound
return
}
if val.Stat != 0 {
sum.Sum -= val.Stat
@@ -1282,7 +1273,6 @@ func (sum *StatSum) RemEvent(evID string) (err error) {
val.CompressFactor = val.CompressFactor - 1
}
sum.val = nil
return
}
func (sum *StatSum) Marshal(ms Marshaler) (marshaled []byte, err error) {
@@ -1421,10 +1411,10 @@ func (avg *StatAverage) AddOneEvent(ev utils.DataProvider) (err error) {
return
}
func (avg *StatAverage) RemEvent(evID string) (err error) {
func (avg *StatAverage) RemEvent(evID string) {
val, has := avg.Events[evID]
if !has {
return utils.ErrNotFound
return
}
if val.Stat >= 0 {
avg.Sum -= val.Stat
@@ -1436,7 +1426,6 @@ func (avg *StatAverage) RemEvent(evID string) (err error) {
val.CompressFactor = val.CompressFactor - 1
}
avg.val = nil
return
}
func (avg *StatAverage) Marshal(ms Marshaler) (marshaled []byte, err error) {
@@ -1573,14 +1562,14 @@ func (dst *StatDistinct) AddOneEvent(ev utils.DataProvider) (err error) {
return
}
func (dst *StatDistinct) RemEvent(evID string) (err error) {
func (dst *StatDistinct) RemEvent(evID string) {
fieldValues, has := dst.Events[evID]
if !has {
return utils.ErrNotFound
return
}
if len(fieldValues) == 0 {
delete(dst.Events, evID)
return utils.ErrNotFound
return
}
// decrement events
@@ -1604,7 +1593,6 @@ func (dst *StatDistinct) RemEvent(evID string) (err error) {
if dst.FieldValues[fieldValue].Size() <= 0 {
delete(dst.FieldValues, fieldValue)
}
return
}
func (dst *StatDistinct) Marshal(ms Marshaler) (marshaled []byte, err error) {

View File

@@ -1871,15 +1871,11 @@ func TestPDDGetValue(t *testing.T) {
if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != 9*time.Second+500*time.Millisecond {
t.Errorf("wrong pdd value: %+v", v)
}
if err := pdd.RemEvent(ev.ID); err != nil {
t.Error(err)
}
pdd.RemEvent(ev.ID)
if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != -time.Nanosecond {
t.Errorf("wrong pdd value: %+v", v)
}
if err := pdd.RemEvent(ev2.ID); err != nil {
t.Error(err)
}
pdd.RemEvent(ev2.ID)
if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != -time.Nanosecond {
t.Errorf("wrong pdd value: %+v", v)
}
@@ -1905,12 +1901,8 @@ func TestPDDGetValue(t *testing.T) {
if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != -time.Nanosecond {
t.Errorf("wrong pdd value: %+v", v)
}
if err := pdd.RemEvent(ev5.ID); err == nil || err.Error() != "NOT_FOUND" {
t.Error(err)
}
if err := pdd.RemEvent(ev4.ID); err != nil {
t.Error(err)
}
pdd.RemEvent(ev5.ID)
pdd.RemEvent(ev4.ID)
if v := pdd.GetValue(config.CgrConfig().GeneralCfg().RoundingDecimals); v != -time.Nanosecond {
t.Errorf("wrong pdd value: %+v", v)
}
@@ -3384,26 +3376,6 @@ func TestStatMetricsStatDistinctGetFilterIDs(t *testing.T) {
}
}
func TestStatMetricsStatDistinctRemEventErr2(t *testing.T) {
dst := &StatDistinct{
FilterIDs: []string{"Test_Filter_ID"},
FieldValues: map[string]utils.StringSet{},
Events: map[string]map[string]int64{
"Event1": {
"FieldValue1": 1,
},
"Event2": {},
},
MinItems: 3,
FieldName: "Test_Field_Name",
Count: 3,
}
err := dst.RemEvent("Event2")
if err == nil || err != utils.ErrNotFound {
t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", utils.ErrNotFound, err)
}
}
func TestStatMetricsStatDistinctRemEvent(t *testing.T) {
dst := &StatDistinct{
FilterIDs: []string{"Test_Filter_ID"},
@@ -3429,10 +3401,8 @@ func TestStatMetricsStatDistinctRemEvent(t *testing.T) {
FieldName: "Test_Field_Name",
Count: 2,
}
err := dst.RemEvent("Event1")
if err != nil {
t.Errorf("\nExpecting <nil>,\n Recevied <%+v>", err)
}
dst.RemEvent("Event1")
if !reflect.DeepEqual(expected, dst) {
t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", expected, dst)
}
@@ -3469,10 +3439,7 @@ func TestStatMetricsStatDistinctRemEvent2(t *testing.T) {
FieldName: "Test_Field_Name",
Count: 2,
}
err := dst.RemEvent("Event1")
if err != nil {
t.Errorf("\nExpecting <nil>,\n Recevied <%+v>", err)
}
dst.RemEvent("Event1")
if !reflect.DeepEqual(expected, dst) {
t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", expected, dst)
}
@@ -3692,25 +3659,6 @@ func TestStatMetricsStatDDCGetMinItems(t *testing.T) {
}
}
func TestStatMetricsStatDDCRemEventErr2(t *testing.T) {
ddc := &StatDDC{
FilterIDs: []string{"Test_Filter_ID"},
FieldValues: map[string]utils.StringSet{},
Events: map[string]map[string]int64{
"Event1": {
"FieldValue1": 1,
},
"Event2": {},
},
MinItems: 3,
Count: 3,
}
err := ddc.RemEvent("Event2")
if err == nil || err != utils.ErrNotFound {
t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", utils.ErrNotFound, err)
}
}
func TestStatMetricsStatDDCRemEvent(t *testing.T) {
ddc := &StatDDC{
FilterIDs: []string{"Test_Filter_ID"},
@@ -3734,10 +3682,7 @@ func TestStatMetricsStatDDCRemEvent(t *testing.T) {
MinItems: 3,
Count: 2,
}
err := ddc.RemEvent("Event1")
if err != nil {
t.Errorf("\nExpecting <nil>,\n Recevied <%+v>", err)
}
ddc.RemEvent("Event1")
if !reflect.DeepEqual(expected, ddc) {
t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", expected, ddc)
}
@@ -3772,10 +3717,8 @@ func TestStatMetricsStatDDCRemEvent2(t *testing.T) {
MinItems: 3,
Count: 2,
}
err := ddc.RemEvent("Event1")
if err != nil {
t.Errorf("\nExpecting <nil>,\n Recevied <%+v>", err)
}
ddc.RemEvent("Event1")
if !reflect.DeepEqual(expected, ddc) {
t.Errorf("\nExpecting <%+v>,\n Recevied <%+v>", expected, ddc)
}

View File

@@ -248,8 +248,8 @@ func (sS *StatService) getStatQueue(tnt, id string) (sq *StatQueue, err error) {
if sq, err = sS.dm.GetStatQueue(tnt, id, true, true, utils.EmptyString); err != nil {
return
}
var removed int
if removed, err = sq.remExpired(); err != nil || removed == 0 {
removed := sq.remExpired()
if removed == 0 {
return
}
sS.storeStatQueue(sq)
@@ -383,18 +383,11 @@ func (sS *StatService) processEvent(tnt string, args *utils.CGREvent) (statQueue
if err != nil {
return nil, err
}
statQueueIDs = matchSQs.IDs()
var withErrors bool
for _, sq := range matchSQs {
if err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatS> Queue: %s, ignoring event: %s, error: %s",
sq.TenantID(), utils.ConcatenatedKey(tnt, args.ID), err.Error()))
withErrors = true
}
sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm)
sS.storeStatQueue(sq)
}
if sS.processThresholds(matchSQs, args.APIOpts) != nil || sS.processEEs(matchSQs, args.APIOpts) != nil ||
withErrors {

View File

@@ -1349,156 +1349,6 @@ func TestStatQueueProcessEventProcessThPartExec(t *testing.T) {
}
}
func TestStatQueueProcessEventProcessEventErr(t *testing.T) {
utils.Logger.SetLogLevel(4)
utils.Logger.SetSyslog(nil)
var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()
cfg := config.NewDefaultCGRConfig()
data := NewInternalDB(nil, nil, true, config.CgrConfig().DataDbCfg().Items)
dm := NewDataManager(data, cfg.CacheCfg(), nil)
filterS := NewFilterS(cfg, nil, dm)
sS := NewStatService(dm, cfg, filterS, nil)
sqPrf := &StatQueueProfile{
Tenant: "cgrates.org",
ID: "SQ1",
FilterIDs: []string{"*string:~*req.Account:1001"},
ActivationInterval: &utils.ActivationInterval{
ExpiryTime: time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC),
},
Weight: 10,
Blocker: true,
QueueLength: 10,
ThresholdIDs: []string{"*none"},
MinItems: 5,
Metrics: []*MetricWithFilters{
{
MetricID: utils.MetaTCD,
},
},
}
sq := &StatQueue{
sqPrfl: sqPrf,
Tenant: "cgrates.org",
ID: "SQ1",
SQItems: []SQItem{
{
EventID: "SqProcessEvent",
},
},
SQMetrics: map[string]StatMetric{
utils.MetaTCD: &StatTCD{},
},
}
if err := dm.SetStatQueueProfile(sqPrf, true); err != nil {
t.Error(err)
}
if err := dm.SetStatQueue(sq); err != nil {
t.Error(err)
}
args := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "SqProcessEvent",
Event: map[string]any{
utils.AccountField: "1001",
},
APIOpts: map[string]any{
utils.OptsStatsProfileIDs: []string{"SQ1"},
},
}
expLog := `[WARNING] <StatS> Queue: cgrates.org:SQ1, ignoring event: cgrates.org:SqProcessEvent, error: NOT_FOUND:Usage`
expIDs := []string{"SQ1"}
if rcvIDs, err := sS.processEvent(args.Tenant, args); err == nil ||
err.Error() != utils.ErrPartiallyExecuted.Error() {
t.Errorf("expected: <%+v>, received: <%+v>", utils.ErrPartiallyExecuted, err)
} else if !reflect.DeepEqual(rcvIDs, expIDs) {
t.Errorf("expected: <%+v>, received: <%+v>", expIDs, rcvIDs)
} else if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog) {
t.Errorf("expected log <%+v> to be included in: <%+v>",
expLog, rcvLog)
}
utils.Logger.SetLogLevel(0)
}
func TestStatQueueV1ProcessEventProcessEventErr(t *testing.T) {
tmpC := config.CgrConfig()
defer func() {
config.SetCgrConfig(tmpC)
}()
cfg := config.NewDefaultCGRConfig()
data := NewInternalDB(nil, nil, true, config.CgrConfig().DataDbCfg().Items)
dm := NewDataManager(data, cfg.CacheCfg(), nil)
Cache.Clear(nil)
filterS := NewFilterS(cfg, nil, dm)
sS := NewStatService(dm, cfg, filterS, nil)
sqPrf := &StatQueueProfile{
Tenant: "cgrates.org",
ID: "SQ1",
FilterIDs: []string{"*string:~*req.Account:1001"},
ActivationInterval: &utils.ActivationInterval{
ExpiryTime: time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC),
},
Weight: 10,
Blocker: true,
QueueLength: 10,
ThresholdIDs: []string{"*none"},
MinItems: 5,
Metrics: []*MetricWithFilters{
{
MetricID: utils.MetaTCD,
},
},
}
sq := &StatQueue{
sqPrfl: sqPrf,
Tenant: "cgrates.org",
ID: "SQ1",
SQItems: []SQItem{
{
EventID: "SqProcessEvent",
},
},
SQMetrics: map[string]StatMetric{
utils.MetaTCD: &StatTCD{},
},
}
if err := dm.SetStatQueueProfile(sqPrf, true); err != nil {
t.Error(err)
}
if err := dm.SetStatQueue(sq); err != nil {
t.Error(err)
}
args := &utils.CGREvent{
ID: "SqProcessEvent",
Event: map[string]any{
utils.AccountField: "1001",
},
APIOpts: map[string]any{
utils.OptsStatsProfileIDs: []string{"SQ1"},
},
}
var reply []string
if err := sS.V1ProcessEvent(context.Background(), args, &reply); err == nil ||
err.Error() != utils.ErrPartiallyExecuted.Error() {
t.Errorf("expected: <%+v>, received: <%+v>", utils.ErrPartiallyExecuted, err)
}
}
func TestStatQueueV1ProcessEventMissingArgs(t *testing.T) {
tmpC := config.CgrConfig()
defer func() {