From 4a201c6f9edf6d50e922cf87e9651bfcd41592d0 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 4 Dec 2019 07:29:14 -0500 Subject: [PATCH] Remove prefix when add items in internalDB --- agents/diam_it_test.go | 2 +- apier/v1/cdrs_it_test.go | 1 + cmd/cgr-engine/cgr-engine.go | 6 +- config/cachecfg.go | 44 --------- config/config.go | 60 ++++++++++-- config/config_it_test.go | 9 +- data/conf/samples/cdrsv1internal/cgrates.json | 18 +++- engine/storage_internal_datadb.go | 77 +++++++++++---- engine/storage_internal_stordb.go | 94 +++++++++---------- services/attributes_it_test.go | 9 +- services/cdrs_it_test.go | 2 +- services/chargers_it_test.go | 6 +- services/datadb_it_test.go | 7 +- services/dispatchers_it_test.go | 8 +- services/dnsagent_it_test.go | 5 +- services/ers_it_test.go | 9 +- services/rals_it_test.go | 2 +- services/resources_it_test.go | 2 +- services/schedulers_it_test.go | 2 +- services/sessions_it_test.go | 5 +- services/stats_it_test.go | 2 +- services/suppliers_it_test.go | 2 +- services/thresholds_it_test.go | 2 +- servmanager/servmanager.go | 5 +- 24 files changed, 226 insertions(+), 153 deletions(-) diff --git a/agents/diam_it_test.go b/agents/diam_it_test.go index 517ddbbcb..1a4bcd199 100644 --- a/agents/diam_it_test.go +++ b/agents/diam_it_test.go @@ -95,11 +95,11 @@ func TestDiamItDispatcher(t *testing.T) { t.SkipNow() return } + testDiamItResetAllDB(t) isDispatcherActive = true engine.StartEngine(path.Join(*dataDir, "conf", "samples", "dispatchers", "all"), 200) engine.StartEngine(path.Join(*dataDir, "conf", "samples", "dispatchers", "all2"), 200) diamConfigDIR = "dispatchers/diamagent" - testDiamItResetAllDB(t) for _, stest := range sTestsDiam { t.Run(diamConfigDIR, stest) } diff --git a/apier/v1/cdrs_it_test.go b/apier/v1/cdrs_it_test.go index 9e67cc73f..293371563 100644 --- a/apier/v1/cdrs_it_test.go +++ b/apier/v1/cdrs_it_test.go @@ -147,6 +147,7 @@ func testV1CDRsProcessEventWithRefund(t *testing.T) { utils.Destination: "+4986517174963", utils.AnswerTime: time.Date(2019, 11, 27, 12, 21, 26, 0, time.UTC), utils.Usage: time.Duration(3) * time.Minute, + utils.Subject: "ANY2CNT", }, }, } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index bda2ddbe3..660bb7519 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -555,6 +555,7 @@ func main() { utils.CoreSv1: internalCoreSv1Chan, utils.RALsV1: rals.GetIntenternalChan(), }) + fmt.Println("BEfore srvManager.AddServices") srvManager.AddServices(connManager, attrS, chrS, tS, stS, reS, supS, schS, rals, rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg, services.NewEventReaderService(cfg, filterSChan, exitChan, connManager.GetConnMgr()), @@ -567,9 +568,10 @@ func main() { services.NewHTTPAgent(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), server), // no reload ldrs, anz, dspS, dmService, storDBService, ) - + fmt.Println("After srvManager.AddServices") + fmt.Println("srvManager Start") srvManager.StartServices() - + fmt.Println("srvManager Start success") // Start FilterS go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(), reS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), diff --git a/config/cachecfg.go b/config/cachecfg.go index 10605911a..6681559d1 100755 --- a/config/cachecfg.go +++ b/config/cachecfg.go @@ -81,47 +81,3 @@ func (cCfg CacheCfg) AsTransCacheConfig() (tcCfg map[string]*ltcache.CacheConfig } return } - -// CGRATES_CFG_JSON_DISABLED_CACHE is used to populate cache config when DataDB is internal -const CGRATES_CFG_JSON_DISABLED_CACHE = ` -{ - -"cache":{ - "*destinations": {"limit": 0}, - "*reverse_destinations": {"limit": 0}, - "*rating_plans": {"limit": 0}, - "*rating_profiles": {"limit": 0}, - "*actions": {"limit": 0}, - "*action_plans": {"limit": 0}, - "*account_action_plans": {"limit": 0}, - "*action_triggers": {"limit": 0}, - "*shared_groups": {"limit": 0}, - "*timings": {"limit": 0}, - "*resource_profiles": {"limit": 0}, - "*resources": {"limit": 0}, - "*event_resources": {"limit": 0}, - "*statqueue_profiles": {"limit": 0}, - "*statqueues": {"limit": 0}, - "*threshold_profiles": {"limit": 0}, - "*thresholds": {"limit": 0}, - "*filters": {"limit": 0}, - "*supplier_profiles": {"limit": 0}, - "*attribute_profiles": {"limit": 0}, - "*charger_profiles": {"limit": 0}, - "*dispatcher_profiles": {"limit": 0}, - "*dispatcher_hosts": {"limit": 0}, - "*resource_filter_indexes" : {"limit": 0}, - "*stat_filter_indexes" : {"limit": 0}, - "*threshold_filter_indexes" : {"limit": 0}, - "*supplier_filter_indexes" : {"limit": 0}, - "*attribute_filter_indexes" : {"limit": 0}, - "*charger_filter_indexes" : {"limit": 0}, - "*dispatcher_filter_indexes" : {"limit": 0}, - "*dispatcher_routes": {"limit": 0}, - "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false}, // diameter messages caching - "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false}, // RPC responses caching - "*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false}, // closed sessions cached for CDRs - "*load_ids": {"limit": 0}, -}, - -}` diff --git a/config/config.go b/config/config.go index 54f8616c4..706b19334 100755 --- a/config/config.go +++ b/config/config.go @@ -425,17 +425,50 @@ func (cfg *CGRConfig) loadDataDBCfg(jsnCfg *CgrJsonCfg) (err error) { // in case of internalDB we need to disable the cache // so we enforce it here if cfg.dataDbCfg.DataDbType == utils.INTERNAL { - var customCfg *CgrJsonCfg - var cacheJsonCfg *CacheJsonCfg - if customCfg, err = NewCgrJsonCfgFromBytes([]byte(CGRATES_CFG_JSON_DISABLED_CACHE)); err != nil { - return - } - if cacheJsonCfg, err = customCfg.CacheJsonCfg(); err != nil { - return - } - if err = cfg.cacheCfg.loadFromJsonCfg(cacheJsonCfg); err != nil { - return + zeroLimit := &CacheParamCfg{Limit: 0, + TTL: time.Duration(0), StaticTTL: false, Precache: false} + disabledCache := CacheCfg{ + utils.CacheDestinations: zeroLimit, + utils.CacheReverseDestinations: zeroLimit, + utils.CacheRatingPlans: zeroLimit, + utils.CacheRatingProfiles: zeroLimit, + utils.CacheActions: zeroLimit, + utils.CacheActionPlans: zeroLimit, + utils.CacheAccountActionPlans: zeroLimit, + utils.CacheActionTriggers: zeroLimit, + utils.CacheSharedGroups: zeroLimit, + utils.CacheTimings: zeroLimit, + utils.CacheResourceProfiles: zeroLimit, + utils.CacheResources: zeroLimit, + utils.CacheEventResources: zeroLimit, + utils.CacheStatQueueProfiles: zeroLimit, + utils.CacheStatQueues: zeroLimit, + utils.CacheThresholdProfiles: zeroLimit, + utils.CacheThresholds: zeroLimit, + utils.CacheFilters: zeroLimit, + utils.CacheSupplierProfiles: zeroLimit, + utils.CacheAttributeProfiles: zeroLimit, + utils.CacheChargerProfiles: zeroLimit, + utils.CacheDispatcherProfiles: zeroLimit, + utils.CacheDispatcherHosts: zeroLimit, + utils.CacheResourceFilterIndexes: zeroLimit, + utils.CacheStatFilterIndexes: zeroLimit, + utils.CacheThresholdFilterIndexes: zeroLimit, + utils.CacheSupplierFilterIndexes: zeroLimit, + utils.CacheAttributeFilterIndexes: zeroLimit, + utils.CacheChargerFilterIndexes: zeroLimit, + utils.CacheDispatcherFilterIndexes: zeroLimit, + utils.CacheDispatcherRoutes: zeroLimit, + utils.CacheRPCResponses: zeroLimit, + utils.CacheLoadIDs: zeroLimit, + utils.CacheDiameterMessages: &CacheParamCfg{Limit: -1, + TTL: time.Duration(3 * time.Hour), StaticTTL: false}, + utils.CacheClosedSessions: &CacheParamCfg{Limit: -1, + TTL: time.Duration(10 * time.Second), StaticTTL: false}, + utils.CacheRPCConnections: &CacheParamCfg{Limit: -1, + TTL: time.Duration(0), StaticTTL: false}, } + cfg.cacheCfg = disabledCache } return @@ -1328,6 +1361,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } cfg.rldChans[ATTRIBUTE_JSN] <- struct{}{} if !fall { + fmt.Println("Exit here from V1Reload") break } fallthrough @@ -1415,6 +1449,12 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { if !fall { break } + fallthrough + case RPCConnsJsonName: + cfg.rldChans[RPCConnsJsonName] <- struct{}{} + if !fall { + break + } } return } diff --git a/config/config_it_test.go b/config/config_it_test.go index 576a11a55..c58fd205e 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -910,13 +910,8 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) { "ContentFields": content, }, }, - "SessionSConns": []interface{}{ - map[string]interface{}{ - "Address": "127.0.0.1:2012", - "Synchronous": false, - "TLS": false, - "Transport": "*json", - }, + "SessionSConns": []string{ + utils.MetaLocalHost, }, } diff --git a/data/conf/samples/cdrsv1internal/cgrates.json b/data/conf/samples/cdrsv1internal/cgrates.json index a123adfd7..fa1049a7c 100644 --- a/data/conf/samples/cdrsv1internal/cgrates.json +++ b/data/conf/samples/cdrsv1internal/cgrates.json @@ -24,8 +24,16 @@ }, -"chargers":{ - "enabled": true +"chargers": { + "enabled": true, + "attributes_conns": [ + {"address": "*internal"}, + ], +}, + + +"attributes": { + "enabled": true, }, @@ -33,6 +41,12 @@ "enabled": true, "rals_conns": [ {"address": "127.0.0.1:2012", "transport":"*json"}, + ], + "attributes_conns":[ + {"address": "127.0.0.1:2012", "transport":"*json"} + ], + "chargers_conns":[ + {"address": "127.0.0.1:2012", "transport":"*json"}, ] }, diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 6bdc64b89..a0e393d9d 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -29,23 +29,69 @@ import ( "github.com/cgrates/ltcache" ) -// InternalDataDBParts indexes the internal DataDB partitions -var InternalDataDBParts []string = []string{ - ColDst, ColRds, ColAct, ColApl, ColAAp, ColTsk, ColAtr, ColRpl, - ColRpf, ColAcc, ColShg, ColLht, ColVer, ColRsP, ColRFI, ColTmg, ColRes, - ColSqs, ColSqp, ColTps, ColThs, ColFlt, ColSpp, ColAttr, ColCDRs, ColCpp, - ColDpp, ColDph, ColLID, +var unlimitedCfg = <cache.CacheConfig{ + MaxItems: -1, +} + +// internalDBCacheCfg indexes the internal DataDB partitions +var internalDBCacheCfg = map[string]*ltcache.CacheConfig{ + utils.CacheDestinations: unlimitedCfg, + utils.CacheReverseDestinations: unlimitedCfg, + utils.CacheActions: unlimitedCfg, + utils.CacheActionPlans: unlimitedCfg, + utils.CacheAccountActionPlans: unlimitedCfg, + utils.CacheActionTriggers: unlimitedCfg, + utils.CacheRatingPlans: unlimitedCfg, + utils.CacheRatingProfiles: unlimitedCfg, + utils.CacheAccounts: unlimitedCfg, + utils.CacheSharedGroups: unlimitedCfg, + utils.TBLVersions: unlimitedCfg, + utils.CacheTimings: unlimitedCfg, + utils.CacheFilters: unlimitedCfg, + utils.CacheResourceProfiles: unlimitedCfg, + utils.CacheResourceFilterIndexes: unlimitedCfg, + utils.CacheResources: unlimitedCfg, + utils.CacheStatFilterIndexes: unlimitedCfg, + utils.CacheStatQueueProfiles: unlimitedCfg, + utils.CacheStatQueues: unlimitedCfg, + utils.CacheThresholdFilterIndexes: unlimitedCfg, + utils.CacheThresholdProfiles: unlimitedCfg, + utils.CacheThresholds: unlimitedCfg, + utils.CacheSupplierFilterIndexes: unlimitedCfg, + utils.CacheSupplierProfiles: unlimitedCfg, + utils.CacheChargerFilterIndexes: unlimitedCfg, + utils.CacheChargerProfiles: unlimitedCfg, + utils.CacheAttributeFilterIndexes: unlimitedCfg, + utils.CacheAttributeProfiles: unlimitedCfg, + utils.CacheDispatcherFilterIndexes: unlimitedCfg, + utils.CacheDispatcherProfiles: unlimitedCfg, + utils.CacheDispatcherHosts: unlimitedCfg, + utils.CacheDiameterMessages: unlimitedCfg, + utils.CacheEventResources: unlimitedCfg, + utils.CacheLoadIDs: unlimitedCfg, + utils.TBLTPTimings: unlimitedCfg, + utils.TBLTPDestinations: unlimitedCfg, + utils.TBLTPRates: unlimitedCfg, + utils.TBLTPDestinationRates: unlimitedCfg, + utils.TBLTPRatingPlans: unlimitedCfg, + utils.TBLTPRateProfiles: unlimitedCfg, + utils.TBLTPSharedGroups: unlimitedCfg, + utils.TBLTPActions: unlimitedCfg, + utils.TBLTPActionTriggers: unlimitedCfg, + utils.TBLTPAccountActions: unlimitedCfg, + utils.TBLTPResources: unlimitedCfg, + utils.TBLTPStats: unlimitedCfg, + utils.TBLTPThresholds: unlimitedCfg, + utils.TBLTPFilters: unlimitedCfg, + utils.SessionCostsTBL: unlimitedCfg, + utils.TBLTPActionPlans: unlimitedCfg, + utils.TBLTPSuppliers: unlimitedCfg, + utils.TBLTPAttributes: unlimitedCfg, + utils.TBLTPChargers: unlimitedCfg, } // InternalStorDBParts indexes the internal StorDB partitions -var InternalStorDBParts []string = []string{ - utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPRates, - utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, utils.TBLTPRateProfiles, - utils.TBLTPSharedGroups, utils.TBLTPActions, utils.TBLTPActionTriggers, - utils.TBLTPAccountActions, utils.TBLTPResources, utils.TBLTPStats, utils.TBLTPThresholds, - utils.TBLTPFilters, utils.SessionCostsTBL, utils.CDRsTBL, utils.TBLTPActionPlans, - utils.TBLVersions, utils.TBLTPSuppliers, utils.TBLTPAttributes, utils.TBLTPChargers, -} +var InternalStorDBParts []string = []string{} type InternalDB struct { tasks []*Task @@ -58,9 +104,8 @@ type InternalDB struct { // NewInternalDB constructs an InternalDB func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) (iDB *InternalDB) { - dfltCfg, _ := config.NewDefaultCGRConfig() iDB = &InternalDB{ - db: ltcache.NewTransCache(dfltCfg.CacheCfg().AsTransCacheConfig()), + db: ltcache.NewTransCache(internalDBCacheCfg), stringIndexedFields: stringIndexedFields, prefixIndexedFields: prefixIndexedFields, cnter: utils.NewCounter(time.Now().UnixNano(), 0), diff --git a/engine/storage_internal_stordb.go b/engine/storage_internal_stordb.go index fda132403..9f1a93056 100644 --- a/engine/storage_internal_stordb.go +++ b/engine/storage_internal_stordb.go @@ -33,7 +33,7 @@ func (iDB *InternalDB) GetTpIds(colName string) (ids []string, err error) { func (iDB *InternalDB) GetTpTableIds(tpid, table string, distinct utils.TPDistinctIds, filters map[string]string, paginator *utils.PaginatorWithSearch) (ids []string, err error) { - key := table + utils.CONCATENATED_KEY_SEP + tpid + key := tpid fullIDs := iDB.db.GetItemIDs(table, key) switch table { // in case of account action we have the id the following form : loadid:tenant:account @@ -103,7 +103,7 @@ func (iDB *InternalDB) GetTpTableIds(tpid, table string, distinct utils.TPDistin } func (iDB *InternalDB) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTiming, err error) { - key := utils.TBLTPTimings + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if id != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + id } @@ -123,7 +123,7 @@ func (iDB *InternalDB) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTi } func (iDB *InternalDB) GetTPDestinations(tpid, id string) (dsts []*utils.TPDestination, err error) { - key := utils.TBLTPDestinations + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if id != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + id } @@ -143,7 +143,7 @@ func (iDB *InternalDB) GetTPDestinations(tpid, id string) (dsts []*utils.TPDesti } func (iDB *InternalDB) GetTPRates(tpid, id string) (rates []*utils.TPRate, err error) { - key := utils.TBLTPRates + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if id != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + id } @@ -168,7 +168,7 @@ func (iDB *InternalDB) GetTPRates(tpid, id string) (rates []*utils.TPRate, err e func (iDB *InternalDB) GetTPDestinationRates(tpid, id string, paginator *utils.Paginator) (dRates []*utils.TPDestinationRate, err error) { - key := utils.TBLTPDestinationRates + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if id != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + id } @@ -213,7 +213,7 @@ func (iDB *InternalDB) GetTPDestinationRates(tpid, id string, } func (iDB *InternalDB) GetTPRatingPlans(tpid, id string, paginator *utils.Paginator) (rPlans []*utils.TPRatingPlan, err error) { - key := utils.TBLTPRatingPlans + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if id != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + id } @@ -257,7 +257,7 @@ func (iDB *InternalDB) GetTPRatingPlans(tpid, id string, paginator *utils.Pagina } func (iDB *InternalDB) GetTPRatingProfiles(filter *utils.TPRatingProfile) (rProfiles []*utils.TPRatingProfile, err error) { - key := utils.TBLTPRateProfiles + utils.CONCATENATED_KEY_SEP + filter.TPid + key := filter.TPid if filter.LoadId != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + filter.LoadId @@ -287,7 +287,7 @@ func (iDB *InternalDB) GetTPRatingProfiles(filter *utils.TPRatingProfile) (rProf } func (iDB *InternalDB) GetTPSharedGroups(tpid, id string) (sGroups []*utils.TPSharedGroups, err error) { - key := utils.TBLTPSharedGroups + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if id != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + id } @@ -307,7 +307,7 @@ func (iDB *InternalDB) GetTPSharedGroups(tpid, id string) (sGroups []*utils.TPSh } func (iDB *InternalDB) GetTPActions(tpid, id string) (actions []*utils.TPActions, err error) { - key := utils.TBLTPActions + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if id != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + id } @@ -327,7 +327,7 @@ func (iDB *InternalDB) GetTPActions(tpid, id string) (actions []*utils.TPActions } func (iDB *InternalDB) GetTPActionPlans(tpid, id string) (aPlans []*utils.TPActionPlan, err error) { - key := utils.TBLTPActionPlans + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if id != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + id } @@ -347,7 +347,7 @@ func (iDB *InternalDB) GetTPActionPlans(tpid, id string) (aPlans []*utils.TPActi } func (iDB *InternalDB) GetTPActionTriggers(tpid, id string) (aTriggers []*utils.TPActionTriggers, err error) { - key := utils.TBLTPActionTriggers + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if id != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + id } @@ -365,7 +365,7 @@ func (iDB *InternalDB) GetTPActionTriggers(tpid, id string) (aTriggers []*utils. return } func (iDB *InternalDB) GetTPAccountActions(filter *utils.TPAccountActions) (accounts []*utils.TPAccountActions, err error) { - key := utils.TBLTPAccountActions + utils.CONCATENATED_KEY_SEP + filter.TPid + key := filter.TPid if filter.LoadId != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + filter.LoadId @@ -392,7 +392,7 @@ func (iDB *InternalDB) GetTPAccountActions(filter *utils.TPAccountActions) (acco } func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*utils.TPResourceProfile, err error) { - key := utils.TBLTPResources + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if tenant != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + tenant } @@ -415,7 +415,7 @@ func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*uti } func (iDB *InternalDB) GetTPStats(tpid, tenant, id string) (stats []*utils.TPStatProfile, err error) { - key := utils.TBLTPStats + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if tenant != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + tenant } @@ -438,7 +438,7 @@ func (iDB *InternalDB) GetTPStats(tpid, tenant, id string) (stats []*utils.TPSta } func (iDB *InternalDB) GetTPThresholds(tpid, tenant, id string) (ths []*utils.TPThresholdProfile, err error) { - key := utils.TBLTPThresholds + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if tenant != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + tenant } @@ -461,7 +461,7 @@ func (iDB *InternalDB) GetTPThresholds(tpid, tenant, id string) (ths []*utils.TP } func (iDB *InternalDB) GetTPFilters(tpid, tenant, id string) (fltrs []*utils.TPFilterProfile, err error) { - key := utils.TBLTPFilters + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if tenant != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + tenant } @@ -484,7 +484,7 @@ func (iDB *InternalDB) GetTPFilters(tpid, tenant, id string) (fltrs []*utils.TPF } func (iDB *InternalDB) GetTPSuppliers(tpid, tenant, id string) (supps []*utils.TPSupplierProfile, err error) { - key := utils.TBLTPSuppliers + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if tenant != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + tenant } @@ -507,7 +507,7 @@ func (iDB *InternalDB) GetTPSuppliers(tpid, tenant, id string) (supps []*utils.T } func (iDB *InternalDB) GetTPAttributes(tpid, tenant, id string) (attrs []*utils.TPAttributeProfile, err error) { - key := utils.TBLTPAttributes + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if tenant != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + tenant } @@ -530,7 +530,7 @@ func (iDB *InternalDB) GetTPAttributes(tpid, tenant, id string) (attrs []*utils. } func (iDB *InternalDB) GetTPChargers(tpid, tenant, id string) (cpps []*utils.TPChargerProfile, err error) { - key := utils.TBLTPChargers + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if tenant != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + tenant } @@ -553,7 +553,7 @@ func (iDB *InternalDB) GetTPChargers(tpid, tenant, id string) (cpps []*utils.TPC } func (iDB *InternalDB) GetTPDispatcherProfiles(tpid, tenant, id string) (dpps []*utils.TPDispatcherProfile, err error) { - key := utils.TBLTPDispatchers + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if tenant != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + tenant } @@ -576,7 +576,7 @@ func (iDB *InternalDB) GetTPDispatcherProfiles(tpid, tenant, id string) (dpps [] } func (iDB *InternalDB) GetTPDispatcherHosts(tpid, tenant, id string) (dpps []*utils.TPDispatcherHost, err error) { - key := utils.TBLTPDispatcherHosts + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if tenant != utils.EmptyString { key += utils.CONCATENATED_KEY_SEP + tenant } @@ -603,7 +603,7 @@ func (iDB *InternalDB) RemTpData(table, tpid string, args map[string]string) (er if table == utils.EmptyString { return iDB.Flush(utils.EmptyString) } - key := table + utils.CONCATENATED_KEY_SEP + tpid + key := tpid if args != nil { for _, val := range args { key += utils.CONCATENATED_KEY_SEP + val @@ -622,7 +622,7 @@ func (iDB *InternalDB) SetTPTimings(timings []*utils.ApierTPTiming) (err error) return nil } for _, timing := range timings { - iDB.db.Set(utils.TBLTPTimings, utils.ConcatenatedKey(utils.TBLTPTimings, timing.TPid, timing.ID), timing, nil, + iDB.db.Set(utils.TBLTPTimings, utils.ConcatenatedKey(timing.TPid, timing.ID), timing, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -632,7 +632,7 @@ func (iDB *InternalDB) SetTPDestinations(dests []*utils.TPDestination) (err erro return nil } for _, destination := range dests { - iDB.db.Set(utils.TBLTPDestinations, utils.ConcatenatedKey(utils.TBLTPDestinations, destination.TPid, destination.ID), destination, nil, + iDB.db.Set(utils.TBLTPDestinations, utils.ConcatenatedKey(destination.TPid, destination.ID), destination, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -643,7 +643,7 @@ func (iDB *InternalDB) SetTPRates(rates []*utils.TPRate) (err error) { return nil } for _, rate := range rates { - iDB.db.Set(utils.TBLTPRates, utils.ConcatenatedKey(utils.TBLTPRates, rate.TPid, rate.ID), rate, nil, + iDB.db.Set(utils.TBLTPRates, utils.ConcatenatedKey(rate.TPid, rate.ID), rate, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -654,7 +654,7 @@ func (iDB *InternalDB) SetTPDestinationRates(dRates []*utils.TPDestinationRate) return nil } for _, dRate := range dRates { - iDB.db.Set(utils.TBLTPDestinationRates, utils.ConcatenatedKey(utils.TBLTPDestinationRates, dRate.TPid, dRate.ID), dRate, nil, + iDB.db.Set(utils.TBLTPDestinationRates, utils.ConcatenatedKey(dRate.TPid, dRate.ID), dRate, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -665,7 +665,7 @@ func (iDB *InternalDB) SetTPRatingPlans(ratingPlans []*utils.TPRatingPlan) (err return nil } for _, rPlan := range ratingPlans { - iDB.db.Set(utils.TBLTPRatingPlans, utils.ConcatenatedKey(utils.TBLTPRatingPlans, rPlan.TPid, rPlan.ID), rPlan, nil, + iDB.db.Set(utils.TBLTPRatingPlans, utils.ConcatenatedKey(rPlan.TPid, rPlan.ID), rPlan, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -676,7 +676,7 @@ func (iDB *InternalDB) SetTPRatingProfiles(ratingProfiles []*utils.TPRatingProfi return nil } for _, rProfile := range ratingProfiles { - iDB.db.Set(utils.TBLTPRateProfiles, utils.ConcatenatedKey(utils.TBLTPRateProfiles, rProfile.TPid, + iDB.db.Set(utils.TBLTPRateProfiles, utils.ConcatenatedKey(rProfile.TPid, rProfile.LoadId, rProfile.Tenant, rProfile.Category, rProfile.Subject), rProfile, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } @@ -688,7 +688,7 @@ func (iDB *InternalDB) SetTPSharedGroups(groups []*utils.TPSharedGroups) (err er return nil } for _, group := range groups { - iDB.db.Set(utils.TBLTPSharedGroups, utils.ConcatenatedKey(utils.TBLTPSharedGroups, group.TPid, group.ID), group, nil, + iDB.db.Set(utils.TBLTPSharedGroups, utils.ConcatenatedKey(group.TPid, group.ID), group, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -699,7 +699,7 @@ func (iDB *InternalDB) SetTPActions(acts []*utils.TPActions) (err error) { return nil } for _, action := range acts { - iDB.db.Set(utils.TBLTPActions, utils.ConcatenatedKey(utils.TBLTPActions, action.TPid, action.ID), action, nil, + iDB.db.Set(utils.TBLTPActions, utils.ConcatenatedKey(action.TPid, action.ID), action, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -710,7 +710,7 @@ func (iDB *InternalDB) SetTPActionPlans(aPlans []*utils.TPActionPlan) (err error return nil } for _, aPlan := range aPlans { - iDB.db.Set(utils.TBLTPActionPlans, utils.ConcatenatedKey(utils.TBLTPActionPlans, aPlan.TPid, aPlan.ID), aPlan, nil, + iDB.db.Set(utils.TBLTPActionPlans, utils.ConcatenatedKey(aPlan.TPid, aPlan.ID), aPlan, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -721,7 +721,7 @@ func (iDB *InternalDB) SetTPActionTriggers(aTriggers []*utils.TPActionTriggers) return nil } for _, aTrigger := range aTriggers { - iDB.db.Set(utils.TBLTPActionTriggers, utils.ConcatenatedKey(utils.TBLTPActionTriggers, aTrigger.TPid, aTrigger.ID), aTrigger, nil, + iDB.db.Set(utils.TBLTPActionTriggers, utils.ConcatenatedKey(aTrigger.TPid, aTrigger.ID), aTrigger, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -732,7 +732,7 @@ func (iDB *InternalDB) SetTPAccountActions(accActions []*utils.TPAccountActions) return nil } for _, accAction := range accActions { - iDB.db.Set(utils.TBLTPAccountActions, utils.ConcatenatedKey(utils.TBLTPAccountActions, accAction.TPid, + iDB.db.Set(utils.TBLTPAccountActions, utils.ConcatenatedKey(accAction.TPid, accAction.LoadId, accAction.Tenant, accAction.Account), accAction, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } @@ -744,7 +744,7 @@ func (iDB *InternalDB) SetTPResources(resources []*utils.TPResourceProfile) (err return nil } for _, resource := range resources { - iDB.db.Set(utils.TBLTPResources, utils.ConcatenatedKey(utils.TBLTPResources, resource.TPid, resource.Tenant, resource.ID), resource, nil, + iDB.db.Set(utils.TBLTPResources, utils.ConcatenatedKey(resource.TPid, resource.Tenant, resource.ID), resource, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -754,7 +754,7 @@ func (iDB *InternalDB) SetTPStats(stats []*utils.TPStatProfile) (err error) { return nil } for _, stat := range stats { - iDB.db.Set(utils.TBLTPStats, utils.ConcatenatedKey(utils.TBLTPStats, stat.TPid, stat.Tenant, stat.ID), stat, nil, + iDB.db.Set(utils.TBLTPStats, utils.ConcatenatedKey(stat.TPid, stat.Tenant, stat.ID), stat, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -765,7 +765,7 @@ func (iDB *InternalDB) SetTPThresholds(thresholds []*utils.TPThresholdProfile) ( } for _, threshold := range thresholds { - iDB.db.Set(utils.TBLTPThresholds, utils.ConcatenatedKey(utils.TBLTPThresholds, threshold.TPid, threshold.Tenant, threshold.ID), threshold, nil, + iDB.db.Set(utils.TBLTPThresholds, utils.ConcatenatedKey(threshold.TPid, threshold.Tenant, threshold.ID), threshold, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -776,7 +776,7 @@ func (iDB *InternalDB) SetTPFilters(filters []*utils.TPFilterProfile) (err error } for _, filter := range filters { - iDB.db.Set(utils.TBLTPFilters, utils.ConcatenatedKey(utils.TBLTPFilters, filter.TPid, filter.Tenant, filter.ID), filter, nil, + iDB.db.Set(utils.TBLTPFilters, utils.ConcatenatedKey(filter.TPid, filter.Tenant, filter.ID), filter, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -787,7 +787,7 @@ func (iDB *InternalDB) SetTPSuppliers(suppliers []*utils.TPSupplierProfile) (err return nil } for _, supplier := range suppliers { - iDB.db.Set(utils.TBLTPSuppliers, utils.ConcatenatedKey(utils.TBLTPSuppliers, supplier.TPid, supplier.Tenant, supplier.ID), supplier, nil, + iDB.db.Set(utils.TBLTPSuppliers, utils.ConcatenatedKey(supplier.TPid, supplier.Tenant, supplier.ID), supplier, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -799,7 +799,7 @@ func (iDB *InternalDB) SetTPAttributes(attributes []*utils.TPAttributeProfile) ( } for _, attribute := range attributes { - iDB.db.Set(utils.TBLTPAttributes, utils.ConcatenatedKey(utils.TBLTPAttributes, attribute.TPid, attribute.Tenant, attribute.ID), attribute, nil, + iDB.db.Set(utils.TBLTPAttributes, utils.ConcatenatedKey(attribute.TPid, attribute.Tenant, attribute.ID), attribute, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -810,7 +810,7 @@ func (iDB *InternalDB) SetTPChargers(cpps []*utils.TPChargerProfile) (err error) } for _, cpp := range cpps { - iDB.db.Set(utils.TBLTPChargers, utils.ConcatenatedKey(utils.TBLTPChargers, cpp.TPid, cpp.Tenant, cpp.ID), cpp, nil, + iDB.db.Set(utils.TBLTPChargers, utils.ConcatenatedKey(cpp.TPid, cpp.Tenant, cpp.ID), cpp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -821,7 +821,7 @@ func (iDB *InternalDB) SetTPDispatcherProfiles(dpps []*utils.TPDispatcherProfile } for _, dpp := range dpps { - iDB.db.Set(utils.TBLTPDispatchers, utils.ConcatenatedKey(utils.TBLTPDispatchers, dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil, + iDB.db.Set(utils.TBLTPDispatchers, utils.ConcatenatedKey(dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -831,7 +831,7 @@ func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err return nil } for _, dpp := range dpps { - iDB.db.Set(utils.TBLTPDispatcherHosts, utils.ConcatenatedKey(utils.TBLTPDispatcherHosts, dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil, + iDB.db.Set(utils.TBLTPDispatcherHosts, utils.ConcatenatedKey(dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -843,7 +843,7 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) { cdr.OrderID = iDB.cnter.Next() } if !allowUpdate { - x, ok := iDB.db.Get(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID)) + x, ok := iDB.db.Get(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID)) if ok && x != nil { return utils.ErrExists } @@ -879,14 +879,14 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) { } } - iDB.db.Set(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID), cdr, idxs.AsSlice(), + iDB.db.Set(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID), cdr, idxs.AsSlice(), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveSMCost(smc *SMCost) (err error) { - iDB.db.Remove(utils.SessionCostsTBL, utils.ConcatenatedKey(utils.SessionCostsTBL, smc.CGRID, smc.RunID, smc.OriginHost, smc.OriginID), + iDB.db.Remove(utils.SessionCostsTBL, utils.ConcatenatedKey(smc.CGRID, smc.RunID, smc.OriginHost, smc.OriginID), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -1225,7 +1225,7 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C } if remove { for _, cdr := range cdrs { - iDB.db.Remove(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID), + iDB.db.Remove(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID), cacheCommit(utils.NonTransactional), utils.NonTransactional) } return nil, 0, nil @@ -1366,7 +1366,7 @@ func (iDB *InternalDB) SetSMCost(smCost *SMCost) (err error) { idxs.Add(utils.ConcatenatedKey(utils.OriginHost, smCost.OriginHost)) idxs.Add(utils.ConcatenatedKey(utils.OriginID, smCost.OriginID)) idxs.Add(utils.ConcatenatedKey(utils.CostSource, smCost.CostSource)) - iDB.db.Set(utils.SessionCostsTBL, utils.ConcatenatedKey(utils.SessionCostsTBL, smCost.CGRID, smCost.RunID, smCost.OriginHost, smCost.OriginID), smCost, idxs.AsSlice(), + iDB.db.Set(utils.SessionCostsTBL, utils.ConcatenatedKey(smCost.CGRID, smCost.RunID, smCost.OriginHost, smCost.OriginID), smCost, idxs.AsSlice(), cacheCommit(utils.NonTransactional), utils.NonTransactional) return err } diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index ad6520755..e8e4bdd0b 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/cgrates/rpcclient" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -48,8 +50,10 @@ func TestAttributeSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) attrS := NewAttributeService(cfg, db, - chS, filterSChan, server) - srvMngr.AddServices(attrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), + ) + srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS, + NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -59,6 +63,7 @@ func TestAttributeSReload(t *testing.T) { if db.IsRunning() { t.Errorf("Expected service to be down") } + var reply string if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index 27ebfd12f..b56118475 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -78,7 +78,7 @@ func TestCdrsReload(t *testing.T) { make(chan rpcclient.RpcClientConnection, 1), chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil) - srvMngr.AddServices(cdrS, ralS, schS, chrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb) + srvMngr.AddServices(NewConnManagerService(cfg, nil), cdrS, ralS, schS, chrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index 12b87359d..9dc9c0e88 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/cgrates/rpcclient" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -49,9 +51,9 @@ func TestChargerSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) - attrS := NewAttributeService(cfg, db, chS, filterSChan, server) + attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)) chrS := NewChargerService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan(), nil) - srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS, chrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go index 08cac78ce..839004497 100644 --- a/services/datadb_it_test.go +++ b/services/datadb_it_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/cgrates/rpcclient" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -48,8 +50,9 @@ func TestDataDBReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) - srvMngr.AddServices(NewAttributeService(cfg, db, - chS, filterSChan, server), NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(NewConnManagerService(cfg, nil), NewAttributeService(cfg, db, + chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)), + NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index 86e8c679f..9d6ae8f84 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/cgrates/rpcclient" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -50,9 +52,9 @@ func TestDispatcherSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) - attrS := NewAttributeService(cfg, db, chS, filterSChan, server) - srv := NewDispatcherService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan()) - srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)) + srv := NewDispatcherService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan(), make(chan rpcclient.RpcClientConnection, 1)) + srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS, srv, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index 4e1e81486..30d91323d 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -51,9 +51,10 @@ func TestDNSAgentReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) sS := NewSessionService(cfg, db, server, nil, - nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown) + nil, nil, nil, nil, + nil, nil, nil, nil, make(chan rpcclient.RpcClientConnection, 1), engineShutdown) srv := NewDNSAgent(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown) - srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db) + srvMngr.AddServices(NewConnManagerService(cfg, nil), srv, sS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 237c0fb33..0b0736951 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/cgrates/rpcclient" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -54,9 +56,10 @@ func TestEventReaderSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) sS := NewSessionService(cfg, db, server, nil, - nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown) - attrS := NewEventReaderService(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown) - srvMngr.AddServices(attrS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + nil, nil, nil, nil, + nil, nil, nil, nil, make(chan rpcclient.RpcClientConnection, 1), engineShutdown) + attrS := NewEventReaderService(cfg, filterSChan, engineShutdown, nil) + srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/rals_it_test.go b/services/rals_it_test.go index a10ca20aa..f900b5c14 100644 --- a/services/rals_it_test.go +++ b/services/rals_it_test.go @@ -72,7 +72,7 @@ func TestRalsReload(t *testing.T) { ralS := NewRalService(cfg, db, stordb, chS, filterSChan, server, tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan, internalChan, schS, engineShutdown) - srvMngr.AddServices(ralS, schS, tS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb) + srvMngr.AddServices(NewConnManagerService(cfg, nil), ralS, schS, tS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/resources_it_test.go b/services/resources_it_test.go index 50119d9dd..791dff249 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -54,7 +54,7 @@ func TestResourceSReload(t *testing.T) { db := NewDataDBService(cfg) tS := NewThresholdService(cfg, db, chS, filterSChan, server) reS := NewResourceService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil) - srvMngr.AddServices(tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go index 7ea358f46..ef101bb17 100644 --- a/services/schedulers_it_test.go +++ b/services/schedulers_it_test.go @@ -49,7 +49,7 @@ func TestSchedulerSReload(t *testing.T) { internalCdrSChan <- nil db := NewDataDBService(cfg) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, internalCdrSChan, nil) - srvMngr.AddServices(schS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(NewConnManagerService(cfg, nil), schS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index e86486e9f..0b91020dc 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -80,8 +80,9 @@ func TestSessionSReload(t *testing.T) { chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil) srv := NewSessionService(cfg, db, server, chrS.GetIntenternalChan(), - ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil, nil, cdrS.GetIntenternalChan(), nil, engineShutdown) - srvMngr.AddServices(srv, chrS, schS, ralS, cdrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb) + ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, + nil, nil, cdrS.GetIntenternalChan(), nil, make(chan rpcclient.RpcClientConnection, 1), engineShutdown) + srvMngr.AddServices(NewConnManagerService(cfg, nil), srv, chrS, schS, ralS, cdrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/stats_it_test.go b/services/stats_it_test.go index 02f5e8f4d..764edce2d 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -54,7 +54,7 @@ func TestStatSReload(t *testing.T) { db := NewDataDBService(cfg) tS := NewThresholdService(cfg, db, chS, filterSChan, server) sS := NewStatService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil) - srvMngr.AddServices(tS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/suppliers_it_test.go b/services/suppliers_it_test.go index a5dcddd3f..22ea4b6e2 100644 --- a/services/suppliers_it_test.go +++ b/services/suppliers_it_test.go @@ -52,7 +52,7 @@ func TestSupplierSReload(t *testing.T) { db := NewDataDBService(cfg) sts := NewStatService(cfg, db, chS, filterSChan, server, nil, nil) supS := NewSupplierService(cfg, db, chS, filterSChan, server, nil, sts.GetIntenternalChan(), nil, nil) - srvMngr.AddServices(supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(NewConnManagerService(cfg, nil), supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index ca9e008c3..a9f3fa476 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -49,7 +49,7 @@ func TestThresholdSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) tS := NewThresholdService(cfg, db, chS, filterSChan, server) - srvMngr.AddServices(tS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) + srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 6b028e3d4..803d50fa8 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -305,6 +305,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.StorDB); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName): + if err = srvMngr.reloadService(utils.RPCConnS); err != nil { + return + } } // handle RPC server } @@ -312,7 +316,6 @@ func (srvMngr *ServiceManager) handleReload() { func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) { srv := srvMngr.GetService(srviceName) - if srv.ShouldRun() { if srv.IsRunning() { if err = srv.Reload(); err != nil {