diff --git a/engine/libstats_test.go b/engine/libstats_test.go
index 53a70c9ab..242791817 100644
--- a/engine/libstats_test.go
+++ b/engine/libstats_test.go
@@ -697,7 +697,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) {
}
}
-func TestLibstatsSqID(t *testing.T) {
+func TestStatQueueSqID(t *testing.T) {
ssq := &StoredStatQueue{
ID: "testID",
Tenant: "testTenant",
@@ -777,7 +777,7 @@ func (sMM *statMetricMock) Clone() StatMetric {
return &statMetricMock{testcase: sMM.testcase}
}
-func TestLibstatsNewStoredStatQueue(t *testing.T) {
+func TestStatQueueNewStoredStatQueue(t *testing.T) {
sq := &StatQueue{
SQMetrics: map[string]StatMetric{
"key": &statMetricMock{},
@@ -797,7 +797,7 @@ func TestLibstatsNewStoredStatQueue(t *testing.T) {
}
}
-func TestLibstatsAsStatQueueNilStoredSq(t *testing.T) {
+func TestStatQueueAsStatQueueNilStoredSq(t *testing.T) {
var ssq *StoredStatQueue
var ms Marshaler
@@ -812,7 +812,7 @@ func TestLibstatsAsStatQueueNilStoredSq(t *testing.T) {
}
}
-func TestLibstatsAsStatQueueSuccess(t *testing.T) {
+func TestStatQueueAsStatQueueSuccess(t *testing.T) {
ssq := &StoredStatQueue{
SQItems: []SQItem{
{
@@ -841,7 +841,7 @@ func TestLibstatsAsStatQueueSuccess(t *testing.T) {
}
}
-func TestLibstatsAsStatQueueUnsupportedMetric(t *testing.T) {
+func TestStatQueueAsStatQueueUnsupportedMetric(t *testing.T) {
ssq := &StoredStatQueue{
SQItems: []SQItem{
{
@@ -866,7 +866,7 @@ func TestLibstatsAsStatQueueUnsupportedMetric(t *testing.T) {
}
}
-func TestLibstatsAsStatQueueErrLoadMarshaled(t *testing.T) {
+func TestStatQueueAsStatQueueErrLoadMarshaled(t *testing.T) {
ssq := &StoredStatQueue{
SQItems: []SQItem{
{
@@ -895,7 +895,7 @@ func TestLibstatsAsStatQueueErrLoadMarshaled(t *testing.T) {
}
}
-func TestLibstatsAsStatQueueOK(t *testing.T) {
+func TestStatQueueAsStatQueueOK(t *testing.T) {
ms, err := NewMarshaler(utils.JSON)
if err != nil {
t.Fatal(err)
@@ -939,7 +939,7 @@ func TestLibstatsAsStatQueueOK(t *testing.T) {
}
}
-func TestLibstatsNewStatQueue(t *testing.T) {
+func TestStatQueueNewStatQueue(t *testing.T) {
tnt := "tenant"
id := "id"
metrics := []*MetricWithFilters{
@@ -968,7 +968,7 @@ func TestLibstatsNewStatQueue(t *testing.T) {
}
}
-func TestLibstatsProcessEventremExpiredErr(t *testing.T) {
+func TestStatQueueProcessEventremExpiredErr(t *testing.T) {
tnt, evID := "tenant", "eventID"
filters := &FilterS{}
expiry := time.Date(2021, 1, 1, 23, 59, 59, 10, time.UTC)
@@ -1001,7 +1001,7 @@ func TestLibstatsProcessEventremExpiredErr(t *testing.T) {
}
}
-func TestLibstatsProcessEventremOnQueueLengthErr(t *testing.T) {
+func TestStatQueueProcessEventremOnQueueLengthErr(t *testing.T) {
tnt, evID := "tenant", "eventID"
filters := &FilterS{}
evNm := utils.MapStorage{
@@ -1032,7 +1032,7 @@ func TestLibstatsProcessEventremOnQueueLengthErr(t *testing.T) {
}
}
-func TestLibstatsProcessEventaddStatEvent(t *testing.T) {
+func TestStatQueueProcessEventaddStatEvent(t *testing.T) {
tnt, evID := "tenant", "eventID"
filters := &FilterS{}
evNm := utils.MapStorage{
@@ -1061,7 +1061,7 @@ func TestLibstatsProcessEventaddStatEvent(t *testing.T) {
}
}
-func TestLibstatsCompress(t *testing.T) {
+func TestStatQueueCompress(t *testing.T) {
sm, err := NewStatMetric(utils.MetaTCD, 0, []string{"*string:~*req.Account:1001"})
if err != nil {
t.Fatal(err)
@@ -1143,7 +1143,7 @@ func TestLibstatsCompress(t *testing.T) {
// }
}
-func TestLibstatsaddStatEventPassErr(t *testing.T) {
+func TestStatQueueaddStatEventPassErr(t *testing.T) {
sq := &StatQueue{
SQMetrics: map[string]StatMetric{
utils.MetaTCD: &statMetricMock{
@@ -1177,7 +1177,7 @@ func TestLibstatsaddStatEventPassErr(t *testing.T) {
}
}
-func TestLibstatsaddStatEventNoPass(t *testing.T) {
+func TestStatQueueaddStatEventNoPass(t *testing.T) {
sm, err := NewStatMetric(utils.MetaTCD, 0, []string{"*string:~*req.Account:1001"})
if err != nil {
t.Fatal(err)
@@ -1267,3 +1267,73 @@ func TestStatQueueWithAPIOptsJSONMarshall(t *testing.T) {
}
}
+
+func TestStatQueueLockUnlockStatQueueProfiles(t *testing.T) {
+ sqPrf := &StatQueueProfile{
+ Tenant: "cgrates.org",
+ ID: "SQ1",
+ Weight: 10,
+ QueueLength: 10,
+ }
+
+ //lock profile with empty lkID parameter
+ sqPrf.lock(utils.EmptyString)
+
+ if !sqPrf.isLocked() {
+ t.Fatal("expected profile to be locked")
+ } else if sqPrf.lkID == utils.EmptyString {
+ t.Fatal("expected struct field \"lkID\" to be non-empty")
+ }
+
+ //unlock previously locked profile
+ sqPrf.unlock()
+
+ if sqPrf.isLocked() {
+ t.Fatal("expected profile to be unlocked")
+ } else if sqPrf.lkID != utils.EmptyString {
+ t.Fatal("expected struct field \"lkID\" to be empty")
+ }
+
+ //unlock an already unlocked profile - nothing happens
+ sqPrf.unlock()
+
+ if sqPrf.isLocked() {
+ t.Fatal("expected profile to be unlocked")
+ } else if sqPrf.lkID != utils.EmptyString {
+ t.Fatal("expected struct field \"lkID\" to be empty")
+ }
+}
+
+func TestStatQueueLockUnlockStatQueues(t *testing.T) {
+ sq := &StatQueue{
+ Tenant: "cgrates.org",
+ ID: "SQ1",
+ }
+
+ //lock resource with empty lkID parameter
+ sq.lock(utils.EmptyString)
+
+ if !sq.isLocked() {
+ t.Fatal("expected resource to be locked")
+ } else if sq.lkID == utils.EmptyString {
+ t.Fatal("expected struct field \"lkID\" to be non-empty")
+ }
+
+ //unlock previously locked resource
+ sq.unlock()
+
+ if sq.isLocked() {
+ t.Fatal("expected resource to be unlocked")
+ } else if sq.lkID != utils.EmptyString {
+ t.Fatal("expected struct field \"lkID\" to be empty")
+ }
+
+ //unlock an already unlocked resource - nothing happens
+ sq.unlock()
+
+ if sq.isLocked() {
+ t.Fatal("expected resource to be unlocked")
+ } else if sq.lkID != utils.EmptyString {
+ t.Fatal("expected struct field \"lkID\" to be empty")
+ }
+}
diff --git a/engine/stats_test.go b/engine/stats_test.go
index b983b6f35..3f36be356 100644
--- a/engine/stats_test.go
+++ b/engine/stats_test.go
@@ -18,14 +18,19 @@ along with this program. If not, see
package engine
import (
+ "bytes"
"fmt"
+ "log"
+ "os"
"reflect"
+ "strings"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/rpcclient"
)
func TestNewStatService(t *testing.T) {
@@ -1462,3 +1467,307 @@ func TestStatQueueMatchingStatQueuesForEventLocks4(t *testing.T) {
}
}
+
+func TestStatQueueReload(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ cfg.StatSCfg().StoreInterval = 5 * time.Millisecond
+ data := NewInternalDB(nil, nil, true)
+ dm := NewDataManager(data, cfg.CacheCfg(), nil)
+ filterS := NewFilterS(cfg, nil, dm)
+ sS := &StatService{
+ dm: dm,
+ filterS: filterS,
+ stopBackup: make(chan struct{}),
+ loopStopped: make(chan struct{}, 1),
+ cgrcfg: cfg,
+ }
+ sS.loopStopped <- struct{}{}
+ sS.Reload()
+ close(sS.stopBackup)
+}
+
+func TestStatQueueStartLoop(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ cfg.StatSCfg().StoreInterval = -1
+ data := NewInternalDB(nil, nil, true)
+ dm := NewDataManager(data, cfg.CacheCfg(), nil)
+ filterS := NewFilterS(cfg, nil, dm)
+ sS := &StatService{
+ dm: dm,
+ filterS: filterS,
+ stopBackup: make(chan struct{}),
+ loopStopped: make(chan struct{}, 1),
+ cgrcfg: cfg,
+ }
+
+ sS.StartLoop()
+ time.Sleep(10 * time.Millisecond)
+
+ if len(sS.loopStopped) != 1 {
+ t.Errorf("expected loopStopped field to have only one element, received: <%+v>", len(sS.loopStopped))
+ }
+}
+
+func TestStatQueueShutdown(t *testing.T) {
+ utils.Logger.SetLogLevel(6)
+ utils.Logger.SetSyslog(nil)
+
+ var buf bytes.Buffer
+ log.SetOutput(&buf)
+ defer func() {
+ log.SetOutput(os.Stderr)
+ }()
+
+ cfg := config.NewDefaultCGRConfig()
+ data := NewInternalDB(nil, nil, true)
+ dm := NewDataManager(data, cfg.CacheCfg(), nil)
+ sS := NewStatService(dm, cfg, nil, nil)
+
+ expLog1 := `[INFO] service shutdown initialized`
+ expLog2 := `[INFO] service shutdown complete`
+ sS.Shutdown()
+
+ if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog1) ||
+ !strings.Contains(rcvLog, expLog2) {
+ t.Errorf("expected logs <%+v> and <%+v> \n to be included in <%+v>",
+ expLog1, expLog2, rcvLog)
+ }
+ utils.Logger.SetLogLevel(0)
+}
+
+func TestStatQueueStoreStatsOK(t *testing.T) {
+ tmp := Cache
+ defer func() {
+ Cache = tmp
+ }()
+
+ cfg := config.NewDefaultCGRConfig()
+ data := NewInternalDB(nil, nil, true)
+ dm := NewDataManager(data, cfg.CacheCfg(), nil)
+ sS := NewStatService(dm, cfg, nil, nil)
+
+ exp := &StatQueue{
+ dirty: utils.BoolPointer(true),
+ Tenant: "cgrates.org",
+ ID: "SQ1",
+ }
+ Cache.SetWithoutReplicate(utils.CacheStatQueues, "cgrates.org:SQ1", exp, nil, true,
+ utils.NonTransactional)
+ sS.storedStatQueues.Add("cgrates.org:SQ1")
+ sS.storeStats()
+
+ if rcv, err := sS.dm.GetStatQueue("cgrates.org", "SQ1", true, false,
+ utils.NonTransactional); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(rcv, exp) {
+ t.Errorf("expected: <%+v>, \nreceived: <%+v>",
+ utils.ToJSON(exp), utils.ToJSON(rcv))
+ }
+
+ Cache.Remove(utils.CacheStatQueues, "cgrates.org:SQ1", true, utils.NonTransactional)
+}
+
+func TestStatQueueStoreStatsStoreSQErr(t *testing.T) {
+ tmp := Cache
+ defer func() {
+ Cache = tmp
+ }()
+
+ utils.Logger.SetLogLevel(4)
+ utils.Logger.SetSyslog(nil)
+
+ var buf bytes.Buffer
+ log.SetOutput(&buf)
+ defer func() {
+ log.SetOutput(os.Stderr)
+ }()
+
+ cfg := config.NewDefaultCGRConfig()
+ sS := NewStatService(nil, cfg, nil, nil)
+
+ value := &StatQueue{
+ dirty: utils.BoolPointer(true),
+ Tenant: "cgrates.org",
+ ID: "SQ1",
+ }
+
+ Cache.SetWithoutReplicate(utils.CacheStatQueues, "SQ1", value, nil, true,
+ utils.NonTransactional)
+ sS.storedStatQueues.Add("SQ1")
+ exp := utils.StringSet{
+ "SQ1": struct{}{},
+ }
+ expLog := `[WARNING] failed saving StatQueue with ID: cgrates.org:SQ1, error: NO_DATABASE_CONNECTION`
+ sS.storeStats()
+
+ if !reflect.DeepEqual(sS.storedStatQueues, exp) {
+ t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, sS.storedStatQueues)
+ }
+ if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog) {
+ t.Errorf("expected log <%+v>\n to be in included in: <%+v>", expLog, rcvLog)
+ }
+
+ utils.Logger.SetLogLevel(0)
+ Cache.Remove(utils.CacheStatQueues, "SQ1", true, utils.NonTransactional)
+}
+
+func TestStatQueueStoreStatsCacheGetErr(t *testing.T) {
+ tmp := Cache
+ defer func() {
+ Cache = tmp
+ }()
+
+ utils.Logger.SetLogLevel(4)
+ utils.Logger.SetSyslog(nil)
+
+ var buf bytes.Buffer
+ log.SetOutput(&buf)
+ defer func() {
+ log.SetOutput(os.Stderr)
+ }()
+
+ cfg := config.NewDefaultCGRConfig()
+ data := NewInternalDB(nil, nil, true)
+ dm := NewDataManager(data, cfg.CacheCfg(), nil)
+ sS := NewStatService(dm, cfg, nil, nil)
+
+ value := &StatQueue{
+ dirty: utils.BoolPointer(true),
+ Tenant: "cgrates.org",
+ ID: "SQ1",
+ }
+
+ Cache.SetWithoutReplicate(utils.CacheStatQueues, "SQ2", value, nil, true,
+ utils.NonTransactional)
+ sS.storedStatQueues.Add("SQ1")
+ expLog := `[WARNING] failed retrieving from cache stat queue with ID: SQ1`
+ sS.storeStats()
+
+ if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog) {
+ t.Errorf("expected <%+v> \nto be included in: <%+v>", expLog, rcvLog)
+ }
+
+ utils.Logger.SetLogLevel(0)
+ Cache.Remove(utils.CacheStatQueues, "SQ2", true, utils.NonTransactional)
+}
+
+func TestStatQueueStoreStatQueueCacheSetErr(t *testing.T) {
+ utils.Logger.SetLogLevel(4)
+ utils.Logger.SetSyslog(nil)
+
+ var buf bytes.Buffer
+ log.SetOutput(&buf)
+ defer func() {
+ log.SetOutput(os.Stderr)
+ }()
+
+ tmp := Cache
+ tmpC := config.CgrConfig()
+ tmpCM := connMgr
+ defer func() {
+ Cache = tmp
+ config.SetCgrConfig(tmpC)
+ connMgr = tmpCM
+ }()
+
+ cfg := config.NewDefaultCGRConfig()
+ cfg.CacheCfg().ReplicationConns = []string{"test"}
+ cfg.CacheCfg().Partitions[utils.CacheStatQueues].Replicate = true
+ cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}}
+ config.SetCgrConfig(cfg)
+ data := NewInternalDB(nil, nil, true)
+ dm := NewDataManager(data, cfg.CacheCfg(), nil)
+ connMgr = NewConnManager(cfg, make(map[string]chan rpcclient.ClientConnector))
+ Cache = NewCacheS(cfg, dm, nil)
+ filterS := NewFilterS(cfg, nil, dm)
+ sS := NewStatService(dm, cfg, filterS, connMgr)
+
+ sq := &StatQueue{
+ Tenant: "cgrates.org",
+ ID: "SQ1",
+ dirty: utils.BoolPointer(true),
+ }
+
+ expLog := `[WARNING] failed caching StatQueue with ID: cgrates.org:SQ1, error: DISCONNECTED`
+ if err := sS.StoreStatQueue(sq); err == nil ||
+ err.Error() != utils.ErrDisconnected.Error() {
+ t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrDisconnected, err)
+ } else if rcv := buf.String(); !strings.Contains(rcv, expLog) {
+ t.Errorf("expected log <%+v> to be included in <%+v>", expLog, rcv)
+ }
+
+ utils.Logger.SetLogLevel(0)
+}
+
+func TestStatQueueStoreThresholdNilDirtyField(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ data := NewInternalDB(nil, nil, true)
+ dm := NewDataManager(data, cfg.CacheCfg(), nil)
+ sS := NewStatService(dm, cfg, nil, nil)
+
+ sq := &StatQueue{
+ Tenant: "cgrates.org",
+ ID: "SQ1",
+ }
+
+ if err := sS.StoreStatQueue(sq); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestStatQueueSetCloneable(t *testing.T) {
+ args := &StatsArgsProcessEvent{
+ StatIDs: []string{"SQ_ID"},
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "EventTest",
+ Event: map[string]interface{}{},
+ },
+ clnb: false,
+ }
+
+ exp := &StatsArgsProcessEvent{
+ StatIDs: []string{"SQ_ID"},
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "EventTest",
+ Event: map[string]interface{}{},
+ },
+ clnb: true,
+ }
+ args.SetCloneable(true)
+
+ if !reflect.DeepEqual(args, exp) {
+ t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, args)
+ }
+}
+
+func TestStatQueueRPCClone(t *testing.T) {
+ args := &StatsArgsProcessEvent{
+ StatIDs: []string{"SQ_ID"},
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "EventTest",
+ Event: make(map[string]interface{}),
+ APIOpts: make(map[string]interface{}),
+ },
+ clnb: true,
+ }
+
+ exp := &StatsArgsProcessEvent{
+ StatIDs: []string{"SQ_ID"},
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "EventTest",
+ Event: make(map[string]interface{}),
+ APIOpts: make(map[string]interface{}),
+ },
+ }
+
+ if out, err := args.RPCClone(); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(out.(*StatsArgsProcessEvent), exp) {
+ t.Errorf("expected: <%T>, \nreceived: <%T>",
+ args, exp)
+ }
+}
diff --git a/engine/thresholds_test.go b/engine/thresholds_test.go
index a4c4594d3..36e63630f 100644
--- a/engine/thresholds_test.go
+++ b/engine/thresholds_test.go
@@ -2840,7 +2840,7 @@ func TestThresholdsV1ResetThresholdNegativeStoreIntervalErr(t *testing.T) {
func TestThresholdsLockUnlockThresholdProfiles(t *testing.T) {
thPrf := &ThresholdProfile{
Tenant: "cgrates.org",
- ID: "thPrf",
+ ID: "TH1",
Weight: 10,
MaxHits: 5,
MinHits: 2,
@@ -2877,7 +2877,7 @@ func TestThresholdsLockUnlockThresholdProfiles(t *testing.T) {
func TestThresholdsLockUnlockThresholds(t *testing.T) {
th := &Threshold{
Tenant: "cgrates.org",
- ID: "thPrf",
+ ID: "TH1",
}
//lock resource with empty lkID parameter
diff --git a/engine/z_resources_test.go b/engine/z_resources_test.go
index c74c40b96..55ffd915a 100644
--- a/engine/z_resources_test.go
+++ b/engine/z_resources_test.go
@@ -6095,7 +6095,7 @@ func TestResourceMatchingResourcesForEventLocks3(t *testing.T) {
func TestResourcesLockUnlockResourceProfiles(t *testing.T) {
rp := &ResourceProfile{
Tenant: "cgrates.org",
- ID: "rsPrf",
+ ID: "RES1",
Limit: 10,
AllocationMessage: "Approved",
Weight: 10,
@@ -6133,7 +6133,7 @@ func TestResourcesLockUnlockResourceProfiles(t *testing.T) {
func TestResourcesLockUnlockResources(t *testing.T) {
rs := &Resource{
Tenant: "cgrates.org",
- ID: "rsPrf",
+ ID: "RES1",
}
//lock resource with empty lkID parameter