mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-25 09:08:45 +05:00
remove connMgr global
This commit is contained in:
committed by
Dan Christian Bogos
parent
f3677f1042
commit
f626c2d2cb
@@ -274,7 +274,7 @@ func TestV1GetItemExpiryTimeFromCacheErr(t *testing.T) {
|
||||
cfg.CacheCfg().RemoteConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.RemoteConnsCfg)}
|
||||
cfg.CacheCfg().Partitions = map[string]*config.CacheParamCfg{}
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
var reply time.Time
|
||||
if err := cacheS.V1GetItemExpiryTime(context.Background(), args, &reply); err == nil || err != utils.ErrNotFound {
|
||||
@@ -302,7 +302,7 @@ func TestV1GetItemErr(t *testing.T) {
|
||||
cfg.CacheCfg().RemoteConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.RemoteConnsCfg)}
|
||||
cfg.CacheCfg().Partitions = map[string]*config.CacheParamCfg{}
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
var reply any
|
||||
if err := cacheS.V1GetItem(context.Background(), args, &reply); err == nil || err != utils.ErrNotFound {
|
||||
@@ -329,7 +329,7 @@ func TestV1GetItemIDsErr(t *testing.T) {
|
||||
cfg.CacheCfg().RemoteConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.RemoteConnsCfg)}
|
||||
cfg.CacheCfg().Partitions = map[string]*config.CacheParamCfg{}
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
var reply []string
|
||||
if err := cacheS.V1GetItemIDs(context.Background(), args, &reply); err == nil || err != utils.ErrNotFound {
|
||||
@@ -523,7 +523,7 @@ func TestCacheSV1ReplicateSet(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
args := &utils.ArgCacheReplicateSet{
|
||||
Tenant: utils.CGRateSorg,
|
||||
@@ -566,7 +566,7 @@ func TestCacheSV1ReplicateSetErr(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
fltr := &Filter{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "fltr1",
|
||||
@@ -607,7 +607,7 @@ func TestCacheSCacheDataFromDB(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
attrs := &utils.AttrReloadCacheWithAPIOpts{
|
||||
Tenant: utils.CGRateSorg,
|
||||
@@ -660,7 +660,7 @@ func TestCacheScacheDataFromDBErrCacheDataFromDB(t *testing.T) {
|
||||
Cache.Clear(nil)
|
||||
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cacheS := NewCacheS(cfg, nil, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, nil, nil, nil)
|
||||
|
||||
attrs := &utils.AttrReloadCacheWithAPIOpts{
|
||||
Tenant: utils.CGRateSorg,
|
||||
@@ -688,7 +688,7 @@ func TestCacheScacheDataFromDBErrGetItemLoadIDs(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
attrs := &utils.AttrReloadCacheWithAPIOpts{
|
||||
Tenant: utils.CGRateSorg,
|
||||
@@ -722,7 +722,7 @@ func TestCacheSV1LoadCache(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
attrs := &utils.AttrReloadCacheWithAPIOpts{
|
||||
Tenant: utils.CGRateSorg,
|
||||
@@ -775,7 +775,7 @@ func TestCacheSV1ReloadCache(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
attrs := &utils.AttrReloadCacheWithAPIOpts{
|
||||
Tenant: utils.CGRateSorg,
|
||||
@@ -832,7 +832,7 @@ func TestCacheSV1RemoveGroup(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
args := &utils.ArgsGetGroupWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
@@ -875,7 +875,7 @@ func TestV1GetCacheStats(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
args := &utils.AttrCacheIDsWithAPIOpts{
|
||||
APIOpts: map[string]any{
|
||||
@@ -911,7 +911,7 @@ func TestCacheSV1Clear(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
args := &utils.AttrCacheIDsWithAPIOpts{
|
||||
APIOpts: map[string]any{
|
||||
@@ -952,7 +952,7 @@ func TestCacheSV1RemoveItems(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
args := &utils.AttrReloadCacheWithAPIOpts{
|
||||
Tenant: utils.CGRateSorg,
|
||||
@@ -995,7 +995,7 @@ func TestCacheSV1RemoveSingular(t *testing.T) {
|
||||
db, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
args := &utils.ArgsGetCacheItemWithAPIOpts{
|
||||
ArgsGetCacheItem: utils.ArgsGetCacheItem{
|
||||
CacheID: utils.CacheAccounts,
|
||||
@@ -1052,7 +1052,7 @@ func TestCacheSV1GetItemExpiryTime(t *testing.T) {
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
if err := cacheS.Set(context.Background(), utils.CacheAccounts, "itemId", "valinterface", []string{}, true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
@@ -1086,7 +1086,7 @@ func TestV1GetItemSingular(t *testing.T) {
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
if err := cacheS.Set(context.Background(), utils.CacheAccounts, "itemId", "valinterface", []string{}, true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
@@ -1120,7 +1120,7 @@ func TestCacheSV1HasItem(t *testing.T) {
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
if err := cacheS.Set(context.Background(), utils.CacheAccounts, "itemId", "valinterface", []string{}, true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
@@ -1154,7 +1154,7 @@ func TestCacheSV1GetItemIDs(t *testing.T) {
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
if err := cacheS.Set(context.Background(), utils.CacheAccounts, "itemId", "valinterface", []string{}, true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
@@ -1182,7 +1182,7 @@ func TestCacheSGetPrecacheChannel(t *testing.T) {
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
if err := cacheS.Set(context.Background(), utils.MetaAccounts, "itemId", "valinterface", nil, true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
@@ -1216,7 +1216,7 @@ func TestCacheSV1PrecacheStatusDefault(t *testing.T) {
|
||||
Tenant: "cgrates.org",
|
||||
}
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
expArgs := &utils.AttrCacheIDsWithAPIOpts{
|
||||
APIOpts: map[string]any{
|
||||
@@ -1262,7 +1262,7 @@ func TestCacheSV1PrecacheStatusErrUnknownCacheID(t *testing.T) {
|
||||
Tenant: "cgrates.org",
|
||||
}
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
expErr := "unknown cacheID: Inproper ID"
|
||||
var reply map[string]string
|
||||
@@ -1292,7 +1292,7 @@ func TestCacheSV1PrecacheStatusMetaReady(t *testing.T) {
|
||||
Tenant: "cgrates.org",
|
||||
}
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
if err := Cache.Set(context.Background(), utils.MetaAccounts, "itemId", "valinterface", nil, true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
@@ -1337,7 +1337,7 @@ func TestCacheSPrecachePartitions(t *testing.T) {
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
atrPrfl := &utils.AttributeProfile{
|
||||
Tenant: utils.CGRateSorg,
|
||||
@@ -1396,7 +1396,7 @@ func TestCacheSPrecacheErr(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
cacheS := NewCacheS(cfg, nil, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, nil, nil, nil)
|
||||
|
||||
cacheS.Precache(utils.NewSyncedChan())
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
@@ -1423,7 +1423,7 @@ func TestCacheSBeginTransaction(t *testing.T) {
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
expFormat := `........-....-....-....-............`
|
||||
rcv := cacheS.BeginTransaction()
|
||||
@@ -1448,7 +1448,7 @@ func TestCacheSRollbackTransaction(t *testing.T) {
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
expFormat := `........-....-....-....-............`
|
||||
tranId := cacheS.BeginTransaction()
|
||||
@@ -1486,7 +1486,7 @@ func TestCacheSCommitTransaction(t *testing.T) {
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: db}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, nil)
|
||||
|
||||
cacheS := NewCacheS(cfg, dm, connMgr, nil)
|
||||
cacheS := NewCacheS(cfg, dm, nil, nil)
|
||||
|
||||
expFormat := `........-....-....-....-............`
|
||||
tranId := cacheS.BeginTransaction()
|
||||
|
||||
@@ -31,15 +31,13 @@ import (
|
||||
)
|
||||
|
||||
// NewConnManager returns the Connection Manager
|
||||
func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) {
|
||||
cM = &ConnManager{
|
||||
func NewConnManager(cfg *config.CGRConfig) *ConnManager {
|
||||
return &ConnManager{
|
||||
cfg: cfg,
|
||||
rpcInternal: make(map[string]chan context.ClientConnector),
|
||||
dynIntCh: make(RPCClientSet),
|
||||
connCache: ltcache.NewCache(-1, 0, true, false, nil),
|
||||
}
|
||||
SetConnManager(cM)
|
||||
return
|
||||
}
|
||||
|
||||
// ConnManager handle the RPC connections
|
||||
|
||||
@@ -2574,10 +2574,6 @@ func TestHttpInlineFilter(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterTrends(t *testing.T) {
|
||||
tmpConn := connMgr
|
||||
defer func() {
|
||||
connMgr = tmpConn
|
||||
}()
|
||||
clientConn := make(chan context.ClientConnector, 1)
|
||||
clientConn <- &ccMock{
|
||||
calls: map[string]func(ctx *context.Context, args any, reply any) error{
|
||||
@@ -2613,9 +2609,9 @@ func TestFilterTrends(t *testing.T) {
|
||||
},
|
||||
}
|
||||
now3 := time.Now().Add(-time.Second * 3).Format(time.RFC3339)
|
||||
connMgr = NewConnManager(config.NewDefaultCGRConfig())
|
||||
cM := NewConnManager(config.NewDefaultCGRConfig())
|
||||
|
||||
connMgr.rpcInternal = map[string]chan context.ClientConnector{
|
||||
cM.rpcInternal = map[string]chan context.ClientConnector{
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends): clientConn,
|
||||
}
|
||||
testCases := []struct {
|
||||
@@ -2656,7 +2652,7 @@ func TestFilterTrends(t *testing.T) {
|
||||
},
|
||||
}
|
||||
initDP := utils.MapStorage{}
|
||||
dp := NewDynamicDP(context.Background(), connMgr, nil, nil, nil, []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends)}, nil, "cgrates.org", initDP)
|
||||
dp := NewDynamicDP(context.Background(), cM, nil, nil, nil, []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends)}, nil, "cgrates.org", initDP)
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fl, err := NewFilterFromInline("cgrates.org", tc.filter)
|
||||
@@ -2676,10 +2672,6 @@ func TestFilterTrends(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFilterRanking(t *testing.T) {
|
||||
tmpConn := connMgr
|
||||
defer func() {
|
||||
connMgr = tmpConn
|
||||
}()
|
||||
clientConn := make(chan context.ClientConnector, 1)
|
||||
clientConn <- &ccMock{
|
||||
calls: map[string]func(ctx *context.Context, args any, reply any) error{
|
||||
@@ -2703,8 +2695,8 @@ func TestFilterRanking(t *testing.T) {
|
||||
},
|
||||
}}
|
||||
|
||||
connMgr = NewConnManager(config.NewDefaultCGRConfig())
|
||||
connMgr.rpcInternal = map[string]chan context.ClientConnector{
|
||||
cM := NewConnManager(config.NewDefaultCGRConfig())
|
||||
cM.rpcInternal = map[string]chan context.ClientConnector{
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings): clientConn,
|
||||
}
|
||||
now := time.Now().Add(-2 * time.Second).Format(time.RFC3339)
|
||||
@@ -2721,7 +2713,7 @@ func TestFilterRanking(t *testing.T) {
|
||||
}
|
||||
|
||||
initDP := utils.MapStorage{}
|
||||
dp := NewDynamicDP(context.Background(), connMgr, nil, nil, nil, nil, []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings)}, "cgrates.org", initDP)
|
||||
dp := NewDynamicDP(context.Background(), cM, nil, nil, nil, nil, []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings)}, "cgrates.org", initDP)
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fl, err := NewFilterFromInline(dp.tenant, tc.filter)
|
||||
|
||||
@@ -26,15 +26,7 @@ import (
|
||||
|
||||
// this file will contain all the global variable that are used by other subsystems
|
||||
|
||||
var (
|
||||
connMgr *ConnManager
|
||||
httpPstrTransport = config.CgrConfig().HTTPCfg().ClientOpts
|
||||
)
|
||||
|
||||
// SetConnManager is the exported method to set the connectionManager used when operate on an account.
|
||||
func SetConnManager(cm *ConnManager) {
|
||||
connMgr = cm
|
||||
}
|
||||
var httpPstrTransport = config.CgrConfig().HTTPCfg().ClientOpts
|
||||
|
||||
// SetHTTPPstrTransport sets the http transport to be used by the HTTP Poster
|
||||
func SetHTTPPstrTransport(pstrTransport *http.Transport) {
|
||||
|
||||
@@ -1123,11 +1123,9 @@ func TestStatQueueStoreStatQueueCacheSetErr(t *testing.T) {
|
||||
|
||||
tmp := Cache
|
||||
tmpC := config.CgrConfig()
|
||||
tmpCM := connMgr
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
config.SetCgrConfig(tmpC)
|
||||
connMgr = tmpCM
|
||||
}()
|
||||
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
@@ -1137,11 +1135,11 @@ func TestStatQueueStoreStatQueueCacheSetErr(t *testing.T) {
|
||||
config.SetCgrConfig(cfg)
|
||||
data, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: data}, cfg.DbCfg())
|
||||
connMgr = NewConnManager(cfg)
|
||||
dm := NewDataManager(dbCM, cfg, connMgr)
|
||||
Cache = NewCacheS(cfg, dm, connMgr, nil)
|
||||
cM := NewConnManager(cfg)
|
||||
dm := NewDataManager(dbCM, cfg, cM)
|
||||
Cache = NewCacheS(cfg, dm, cM, nil)
|
||||
filterS := NewFilterS(cfg, nil, dm)
|
||||
sS := NewStatService(dm, cfg, filterS, connMgr)
|
||||
sS := NewStatService(dm, cfg, filterS, cM)
|
||||
|
||||
sq := &StatQueue{
|
||||
Tenant: "cgrates.org",
|
||||
@@ -2195,11 +2193,9 @@ func TestStatQueueV1ResetStatQueueUnsupportedMetricType(t *testing.T) {
|
||||
func TestStatQueueProcessThresholdsOKNoThIDs(t *testing.T) {
|
||||
tmp := Cache
|
||||
tmpC := config.CgrConfig()
|
||||
tmpCM := connMgr
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
config.SetCgrConfig(tmpC)
|
||||
connMgr = tmpCM
|
||||
}()
|
||||
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
@@ -2266,11 +2262,9 @@ func TestStatQueueProcessThresholdsOKNoThIDs(t *testing.T) {
|
||||
func TestStatQueueProcessThresholdsOK(t *testing.T) {
|
||||
tmp := Cache
|
||||
tmpC := config.CgrConfig()
|
||||
tmpCM := connMgr
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
config.SetCgrConfig(tmpC)
|
||||
connMgr = tmpCM
|
||||
}()
|
||||
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
@@ -2308,11 +2302,11 @@ func TestStatQueueProcessThresholdsOK(t *testing.T) {
|
||||
}
|
||||
rpcInternal := make(chan birpc.ClientConnector, 1)
|
||||
rpcInternal <- ccM
|
||||
connMgr = NewConnManager(cfg)
|
||||
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), utils.ThresholdSv1, rpcInternal)
|
||||
cM := NewConnManager(cfg)
|
||||
cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), utils.ThresholdSv1, rpcInternal)
|
||||
|
||||
filterS := NewFilterS(cfg, nil, dm)
|
||||
sS := NewStatService(dm, cfg, filterS, connMgr)
|
||||
sS := NewStatService(dm, cfg, filterS, cM)
|
||||
|
||||
sqPrf := &StatQueueProfile{
|
||||
Tenant: "cgrates.org",
|
||||
@@ -2368,12 +2362,10 @@ func TestStatQueueProcessThresholdsOK(t *testing.T) {
|
||||
func TestStatQueueProcessThresholdsErrPartExec(t *testing.T) {
|
||||
tmp := Cache
|
||||
tmpC := config.CgrConfig()
|
||||
tmpCM := connMgr
|
||||
tmpLogger := utils.Logger
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
config.SetCgrConfig(tmpC)
|
||||
connMgr = tmpCM
|
||||
utils.Logger = tmpLogger
|
||||
}()
|
||||
var buf bytes.Buffer
|
||||
@@ -2395,11 +2387,11 @@ func TestStatQueueProcessThresholdsErrPartExec(t *testing.T) {
|
||||
}
|
||||
rpcInternal := make(chan birpc.ClientConnector, 1)
|
||||
rpcInternal <- ccM
|
||||
connMgr = NewConnManager(cfg)
|
||||
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), utils.ThresholdSv1, rpcInternal)
|
||||
cM := NewConnManager(cfg)
|
||||
cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), utils.ThresholdSv1, rpcInternal)
|
||||
|
||||
filterS := NewFilterS(cfg, nil, dm)
|
||||
sS := NewStatService(dm, cfg, filterS, connMgr)
|
||||
sS := NewStatService(dm, cfg, filterS, cM)
|
||||
|
||||
sqPrf := &StatQueueProfile{
|
||||
Tenant: "cgrates.org",
|
||||
@@ -2923,11 +2915,9 @@ func TestStatQueueV1GetQueueStringMetricsErrGetStats(t *testing.T) {
|
||||
func TestStatQueueStoreStatQueueStoreIntervalDisabled(t *testing.T) {
|
||||
tmp := Cache
|
||||
tmpC := config.CgrConfig()
|
||||
tmpCM := connMgr
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
config.SetCgrConfig(tmpC)
|
||||
connMgr = tmpCM
|
||||
}()
|
||||
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
@@ -2935,11 +2925,11 @@ func TestStatQueueStoreStatQueueStoreIntervalDisabled(t *testing.T) {
|
||||
config.SetCgrConfig(cfg)
|
||||
data, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: data}, cfg.DbCfg())
|
||||
connMgr = NewConnManager(cfg)
|
||||
dm := NewDataManager(dbCM, cfg, connMgr)
|
||||
Cache = NewCacheS(cfg, dm, connMgr, nil)
|
||||
cM := NewConnManager(cfg)
|
||||
dm := NewDataManager(dbCM, cfg, cM)
|
||||
Cache = NewCacheS(cfg, dm, cM, nil)
|
||||
filterS := NewFilterS(cfg, nil, dm)
|
||||
sS := NewStatService(dm, cfg, filterS, connMgr)
|
||||
sS := NewStatService(dm, cfg, filterS, cM)
|
||||
|
||||
sq := &StatQueue{
|
||||
Tenant: "cgrates.org",
|
||||
|
||||
@@ -1406,7 +1406,6 @@ func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) {
|
||||
utils.Logger = utils.NewStdLoggerWithWriter(&buf, "", 4)
|
||||
tmp := config.CgrConfig()
|
||||
tmpC := Cache
|
||||
tmpCMgr := connMgr
|
||||
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.RPCConns()["test"] = &config.RPCConn{Conns: []*config.RemoteHost{{}}}
|
||||
@@ -1414,15 +1413,14 @@ func TestThresholdsProcessEventMaxHitsDMErr(t *testing.T) {
|
||||
cfg.CacheCfg().Partitions[utils.CacheThresholds].Replicate = true
|
||||
config.SetCgrConfig(cfg)
|
||||
data, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
connMgr = NewConnManager(cfg)
|
||||
cM := NewConnManager(cfg)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: data}, cfg.DbCfg())
|
||||
dm := NewDataManager(dbCM, cfg, connMgr)
|
||||
dm := NewDataManager(dbCM, cfg, cM)
|
||||
filterS := NewFilterS(cfg, nil, dm)
|
||||
tS := NewThresholdService(nil, cfg, filterS, connMgr)
|
||||
Cache = NewCacheS(cfg, dm, connMgr, nil)
|
||||
tS := NewThresholdService(nil, cfg, filterS, cM)
|
||||
Cache = NewCacheS(cfg, dm, cM, nil)
|
||||
|
||||
defer func() {
|
||||
connMgr = tmpCMgr
|
||||
Cache = tmpC
|
||||
config.SetCgrConfig(tmp)
|
||||
utils.Logger = tmpLogger
|
||||
@@ -2724,11 +2722,9 @@ func TestThresholdsStoreThresholdCacheSetErr(t *testing.T) {
|
||||
|
||||
tmp := Cache
|
||||
tmpC := config.CgrConfig()
|
||||
tmpCM := connMgr
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
config.SetCgrConfig(tmpC)
|
||||
connMgr = tmpCM
|
||||
utils.Logger = tmpLogger
|
||||
}()
|
||||
|
||||
@@ -2739,11 +2735,11 @@ func TestThresholdsStoreThresholdCacheSetErr(t *testing.T) {
|
||||
config.SetCgrConfig(cfg)
|
||||
data, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: data}, cfg.DbCfg())
|
||||
connMgr = NewConnManager(cfg)
|
||||
dm := NewDataManager(dbCM, cfg, connMgr)
|
||||
Cache = NewCacheS(cfg, dm, connMgr, nil)
|
||||
cM := NewConnManager(cfg)
|
||||
dm := NewDataManager(dbCM, cfg, cM)
|
||||
Cache = NewCacheS(cfg, dm, cM, nil)
|
||||
filterS := NewFilterS(cfg, nil, dm)
|
||||
tS := NewThresholdService(dm, cfg, filterS, connMgr)
|
||||
tS := NewThresholdService(dm, cfg, filterS, cM)
|
||||
|
||||
th := &Threshold{
|
||||
Tenant: "cgrates.org",
|
||||
@@ -3236,12 +3232,9 @@ func TestThresholdsV1ResetThresholdStoreErr(t *testing.T) {
|
||||
|
||||
tmp := Cache
|
||||
tmpC := config.CgrConfig()
|
||||
tmpCM := connMgr
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
config.SetCgrConfig(tmpC)
|
||||
connMgr = tmpCM
|
||||
|
||||
}()
|
||||
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
@@ -3252,11 +3245,11 @@ func TestThresholdsV1ResetThresholdStoreErr(t *testing.T) {
|
||||
config.SetCgrConfig(cfg)
|
||||
data, _ := NewInternalDB(nil, nil, nil, cfg.DbCfg().Items)
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: data}, cfg.DbCfg())
|
||||
connMgr = NewConnManager(cfg)
|
||||
dm := NewDataManager(dbCM, cfg, connMgr)
|
||||
Cache = NewCacheS(cfg, dm, connMgr, nil)
|
||||
cM := NewConnManager(cfg)
|
||||
dm := NewDataManager(dbCM, cfg, cM)
|
||||
Cache = NewCacheS(cfg, dm, cM, nil)
|
||||
filterS := NewFilterS(cfg, nil, dm)
|
||||
tS := NewThresholdService(dm, cfg, filterS, connMgr)
|
||||
tS := NewThresholdService(dm, cfg, filterS, cM)
|
||||
th := &Threshold{
|
||||
Hits: 2,
|
||||
Tenant: "cgrates.org",
|
||||
|
||||
@@ -33,10 +33,9 @@ import (
|
||||
)
|
||||
|
||||
func TestTPReaderCallCacheNoCaching(t *testing.T) {
|
||||
tmp1, tmp2 := connMgr, Cache
|
||||
tmpCache := Cache
|
||||
defer func() {
|
||||
connMgr = tmp1
|
||||
Cache = tmp2
|
||||
Cache = tmpCache
|
||||
}()
|
||||
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
@@ -60,10 +59,9 @@ func TestTPReaderCallCacheNoCaching(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTPReaderCallCacheReloadCacheFirstCallErr(t *testing.T) {
|
||||
tmp1, tmp2 := connMgr, Cache
|
||||
tmpCache := Cache
|
||||
defer func() {
|
||||
connMgr = tmp1
|
||||
Cache = tmp2
|
||||
Cache = tmpCache
|
||||
}()
|
||||
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
@@ -126,10 +124,9 @@ func TestTPReaderCallCacheReloadCacheFirstCallErr(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTPReaderCallCacheReloadCacheSecondCallErr(t *testing.T) {
|
||||
tmp1, tmp2 := connMgr, Cache
|
||||
tmpCache := Cache
|
||||
defer func() {
|
||||
connMgr = tmp1
|
||||
Cache = tmp2
|
||||
Cache = tmpCache
|
||||
}()
|
||||
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
@@ -207,10 +204,9 @@ func TestTPReaderCallCacheReloadCacheSecondCallErr(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTPReaderCallCacheLoadCache(t *testing.T) {
|
||||
tmp1, tmp2 := connMgr, Cache
|
||||
tmpCache := Cache
|
||||
defer func() {
|
||||
connMgr = tmp1
|
||||
Cache = tmp2
|
||||
Cache = tmpCache
|
||||
}()
|
||||
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
@@ -277,10 +273,9 @@ func TestTPReaderCallCacheLoadCache(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTPReaderCallCacheRemoveItems(t *testing.T) {
|
||||
tmp1, tmp2 := connMgr, Cache
|
||||
tmpCache := Cache
|
||||
defer func() {
|
||||
connMgr = tmp1
|
||||
Cache = tmp2
|
||||
Cache = tmpCache
|
||||
}()
|
||||
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
@@ -347,10 +342,9 @@ func TestTPReaderCallCacheRemoveItems(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTPReaderCallCacheClear(t *testing.T) {
|
||||
tmp1, tmp2 := connMgr, Cache
|
||||
tmpCache := Cache
|
||||
defer func() {
|
||||
connMgr = tmp1
|
||||
Cache = tmp2
|
||||
Cache = tmpCache
|
||||
}()
|
||||
|
||||
defaultCfg := config.NewDefaultCGRConfig()
|
||||
|
||||
@@ -39,6 +39,7 @@ var (
|
||||
// Globals used
|
||||
dataDbCsv *DataManager // Each dataDb will have it's own sources to collect data
|
||||
lCfg *config.CGRConfig
|
||||
loaderConnMgr *ConnManager
|
||||
loader *TpReader
|
||||
loaderConfigDIR string
|
||||
loaderCfgPath string
|
||||
@@ -113,8 +114,8 @@ func testLoaderITInitDataDB(t *testing.T) {
|
||||
return strings.TrimPrefix(key, "V1")
|
||||
})
|
||||
cacheChan <- srv
|
||||
connMgr = NewConnManager(lCfg)
|
||||
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches), utils.CacheSv1, cacheChan)
|
||||
loaderConnMgr = NewConnManager(lCfg)
|
||||
loaderConnMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches), utils.CacheSv1, cacheChan)
|
||||
}
|
||||
|
||||
// Loads data from csv files in tp scenario to dataDbCsv
|
||||
@@ -131,7 +132,7 @@ func testLoaderITRemoveLoad(t *testing.T) {
|
||||
}
|
||||
dbCM := NewDBConnManager(dataDbCsv.DataDB(), lCfg.DbCfg())
|
||||
loader, err = NewTpReader(dbCM, csvStorage, "", "",
|
||||
[]string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}, nil, connMgr)
|
||||
[]string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}, nil, loaderConnMgr)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
@@ -178,7 +179,7 @@ func testLoaderITLoadFromCSV(t *testing.T) {
|
||||
}
|
||||
dbCM := NewDBConnManager(dataDbCsv.DataDB(), lCfg.DbCfg())
|
||||
loader, err = NewTpReader(dbCM, csvStorage, "", "",
|
||||
[]string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}, nil, connMgr)
|
||||
[]string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}, nil, loaderConnMgr)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -68,7 +68,6 @@ func (s *ConnManagerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceR
|
||||
// Shutdown stops the service.
|
||||
func (s *ConnManagerService) Shutdown(_ *servmanager.ServiceRegistry) error {
|
||||
s.connMgr = nil
|
||||
engine.SetConnManager(nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1485,7 +1485,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), connMgr)
|
||||
engine.Cache = cacheS
|
||||
connMgr = engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): sMock})
|
||||
engine.SetConnManager(connMgr)
|
||||
|
||||
if _, err := sessions.processChargerS(cgrEv); err == nil || err != utils.ErrNotImplemented {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNotImplemented, err)
|
||||
@@ -1632,7 +1631,6 @@ func TestLibsessionsSetMockErrors(t *testing.T) {
|
||||
engine.Cache = cacheS
|
||||
connMgr := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): chanInternal})
|
||||
engine.SetConnManager(connMgr)
|
||||
|
||||
procIndt, err := NewProcessedIdentity("eyJhbGciOiJFUzI1NiIsInBwdCI6InNoYWtlbiIsInR5cCI6InBhc3Nwb3J0IiwieDV1IjoiaHR0cHM6Ly93d3cuZXhhbXBsZS5vcmcvY2VydC5jZXIifQ.eyJhdHRlc3QiOiJBIiwiZGVzdCI6eyJ0biI6WyIxMDAyIl19LCJpYXQiOjE1ODcwMTk4MjIsIm9yaWciOnsidG4iOiIxMDAxIn0sIm9yaWdpZCI6IjEyMzQ1NiJ9.4ybtWmgqdkNyJLS9Iv3PuJV8ZxR7yZ_NEBhCpKCEu2WBiTchqwoqoWpI17Q_ALm38tbnpay32t95ZY_LhSgwJg;info=<https://www.example.org/cert.cer>;ppt=shaken")
|
||||
if err != nil {
|
||||
@@ -1723,7 +1721,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), connMgr)
|
||||
engine.Cache = cacheS
|
||||
connMgr = engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): chanInternal})
|
||||
engine.SetConnManager(connMgr)
|
||||
sessions.aSessions = map[string]*Session{
|
||||
"ORIGIN_ID": {},
|
||||
}
|
||||
@@ -1951,7 +1948,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), connMgr)
|
||||
engine.Cache = cacheS
|
||||
connMgr = engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): chanInternal})
|
||||
engine.SetConnManager(connMgr)
|
||||
|
||||
sessions.cgrCfg.SessionSCfg().ChargerSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers)}
|
||||
|
||||
@@ -4283,7 +4279,6 @@ dm := engine.NewDataManager(dbCM, cfg.CacheCfg(), connMgr)
|
||||
}
|
||||
cacheS := engine.NewCacheS(cfg, nil, nil)
|
||||
engine.Cache = cacheS
|
||||
engine.SetConnManager(connMgr)
|
||||
expected = "RALS_ERROR:NOT_IMPLEMENTED"
|
||||
if err := sessions.BiRPCv1ProcessEvent(nil, args, &reply); err == nil || err.Error() != expected {
|
||||
t.Errorf("Exepected %+v, received %+v", expected, err)
|
||||
@@ -4673,7 +4668,6 @@ func TestSyncSessionsSync(t *testing.T) {
|
||||
sessions.cfg.GeneralCfg().ReplyTimeout = 1
|
||||
cacheS := engine.NewCacheS(cfg, nil, nil, nil)
|
||||
engine.Cache = cacheS
|
||||
engine.SetConnManager(connMgr)
|
||||
sessions.aSessions = map[string]*Session{}
|
||||
var reply string
|
||||
if err := sessions.BiRPCv1SyncSessions(nil, nil, &reply); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user