diff --git a/engine/stats_test.go b/engine/stats_test.go index ad0558060..77fa2f44a 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -2876,3 +2876,268 @@ func TestStatQueueV1ResetStatQueueUnsupportedMetricType(t *testing.T) { t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err) } } + +func TestStatQueueProcessThresholdsOKNoThIDs(t *testing.T) { + tmp := Cache + tmpC := config.CgrConfig() + tmpCM := connMgr + defer func() { + Cache = tmp + config.SetCgrConfig(tmpC) + connMgr = tmpCM + }() + + cfg := config.NewDefaultCGRConfig() + cfg.StatSCfg().ThresholdSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)} + data := NewInternalDB(nil, nil, true) + dm := NewDataManager(data, cfg.CacheCfg(), nil) + Cache = NewCacheS(cfg, dm, nil) + + filterS := NewFilterS(cfg, nil, dm) + sS := NewStatService(dm, cfg, filterS, nil) + + sqPrf := &StatQueueProfile{ + Tenant: "cgrates.org", + ID: "SQ1", + FilterIDs: []string{"*string:~*req.Account:1001"}, + ActivationInterval: &utils.ActivationInterval{ + ExpiryTime: time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC), + }, + Weight: 10, + Blocker: true, + QueueLength: 10, + ThresholdIDs: []string{"*none"}, + MinItems: 5, + Metrics: []*MetricWithFilters{ + { + MetricID: "testMetricType", + }, + }, + } + sq := &StatQueue{ + sqPrfl: sqPrf, + dirty: utils.BoolPointer(false), + Tenant: "cgrates.org", + ID: "SQ1", + SQItems: []SQItem{ + { + EventID: "SqProcessEvent", + }, + }, + SQMetrics: map[string]StatMetric{ + "testMetricType": &StatTCD{ + Sum: time.Minute, + val: utils.DurationPointer(time.Hour), + }, + }, + } + + if err := dm.SetStatQueue(sq); err != nil { + t.Error(err) + } + + sQs := StatQueues{ + sq, + } + + if err := sS.processThresholds(sQs, nil); err != nil { + t.Error(err) + } +} + +func TestStatQueueProcessThresholdsOK(t *testing.T) { + tmp := Cache + tmpC := config.CgrConfig() + tmpCM := connMgr + defer func() { + Cache = tmp + config.SetCgrConfig(tmpC) + connMgr = tmpCM + }() + + cfg := config.NewDefaultCGRConfig() + cfg.StatSCfg().ThresholdSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)} + data := NewInternalDB(nil, nil, true) + dm := NewDataManager(data, cfg.CacheCfg(), nil) + Cache = NewCacheS(cfg, dm, nil) + + ccM := &ccMock{ + calls: map[string]func(args interface{}, reply interface{}) error{ + utils.ThresholdSv1ProcessEvent: func(args, reply interface{}) error { + exp := &ThresholdsArgsProcessEvent{ + ThresholdIDs: []string{"TH1"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: args.(*ThresholdsArgsProcessEvent).CGREvent.ID, + Event: map[string]interface{}{ + utils.EventType: utils.StatUpdate, + utils.StatID: "SQ1", + "testMetricType": time.Duration(time.Hour), + }, + APIOpts: map[string]interface{}{ + utils.MetaEventType: utils.StatUpdate, + }, + }, + } + if !reflect.DeepEqual(exp, args) { + return fmt.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", + utils.ToJSON(exp), utils.ToJSON(args)) + } + return nil + }, + }, + } + rpcInternal := make(chan rpcclient.ClientConnector, 1) + rpcInternal <- ccM + connMgr = NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds): rpcInternal, + }) + + filterS := NewFilterS(cfg, nil, dm) + sS := NewStatService(dm, cfg, filterS, connMgr) + + sqPrf := &StatQueueProfile{ + Tenant: "cgrates.org", + ID: "SQ1", + FilterIDs: []string{"*string:~*req.Account:1001"}, + ActivationInterval: &utils.ActivationInterval{ + ExpiryTime: time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC), + }, + Weight: 10, + Blocker: true, + QueueLength: 10, + ThresholdIDs: []string{"TH1"}, + MinItems: 5, + Metrics: []*MetricWithFilters{ + { + MetricID: "testMetricType", + }, + }, + } + sq := &StatQueue{ + sqPrfl: sqPrf, + dirty: utils.BoolPointer(false), + Tenant: "cgrates.org", + ID: "SQ1", + SQItems: []SQItem{ + { + EventID: "SqProcessEvent", + }, + }, + SQMetrics: map[string]StatMetric{ + "testMetricType": &StatTCD{ + Sum: time.Minute, + val: utils.DurationPointer(time.Hour), + }, + }, + } + + if err := dm.SetStatQueue(sq); err != nil { + t.Error(err) + } + + sQs := StatQueues{ + sq, + } + + if err := sS.processThresholds(sQs, nil); err != nil { + t.Error(err) + } +} + +func TestStatQueueProcessThresholds2(t *testing.T) { + tmp := Cache + tmpC := config.CgrConfig() + tmpCM := connMgr + defer func() { + Cache = tmp + config.SetCgrConfig(tmpC) + connMgr = tmpCM + }() + + utils.Logger.SetLogLevel(4) + utils.Logger.SetSyslog(nil) + defer func() { + utils.Logger.SetLogLevel(0) + }() + + var buf bytes.Buffer + log.SetOutput(&buf) + defer func() { + log.SetOutput(os.Stderr) + }() + + cfg := config.NewDefaultCGRConfig() + cfg.StatSCfg().ThresholdSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)} + data := NewInternalDB(nil, nil, true) + dm := NewDataManager(data, cfg.CacheCfg(), nil) + Cache = NewCacheS(cfg, dm, nil) + + ccM := &ccMock{ + calls: map[string]func(args interface{}, reply interface{}) error{ + utils.ThresholdSv1ProcessEvent: func(args, reply interface{}) error { + return utils.ErrExists + }, + }, + } + rpcInternal := make(chan rpcclient.ClientConnector, 1) + rpcInternal <- ccM + connMgr = NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds): rpcInternal, + }) + + filterS := NewFilterS(cfg, nil, dm) + sS := NewStatService(dm, cfg, filterS, connMgr) + + sqPrf := &StatQueueProfile{ + Tenant: "cgrates.org", + ID: "SQ1", + FilterIDs: []string{"*string:~*req.Account:1001"}, + ActivationInterval: &utils.ActivationInterval{ + ExpiryTime: time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC), + }, + Weight: 10, + Blocker: true, + QueueLength: 10, + ThresholdIDs: []string{"TH1"}, + MinItems: 5, + Metrics: []*MetricWithFilters{ + { + MetricID: "testMetricType", + }, + }, + } + sq := &StatQueue{ + sqPrfl: sqPrf, + dirty: utils.BoolPointer(false), + Tenant: "cgrates.org", + ID: "SQ1", + SQItems: []SQItem{ + { + EventID: "SqProcessEvent", + }, + }, + SQMetrics: map[string]StatMetric{ + "testMetricType": &StatTCD{ + Sum: time.Minute, + val: utils.DurationPointer(time.Hour), + }, + }, + } + + if err := dm.SetStatQueue(sq); err != nil { + t.Error(err) + } + + sQs := StatQueues{ + sq, + } + + expLog := `[WARNING] error: EXISTS` + if err := sS.processThresholds(sQs, nil); err == nil || + err != utils.ErrPartiallyExecuted { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrPartiallyExecuted, err) + } else if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog) { + t.Errorf("expected log <%+v> to be included in <%+v>", expLog, rcvLog) + } +}