Remove prefix when add items in internalDB

This commit is contained in:
TeoV
2019-12-04 07:29:14 -05:00
parent b7743036f2
commit 4a201c6f9e
24 changed files with 226 additions and 153 deletions

View File

@@ -95,11 +95,11 @@ func TestDiamItDispatcher(t *testing.T) {
t.SkipNow()
return
}
testDiamItResetAllDB(t)
isDispatcherActive = true
engine.StartEngine(path.Join(*dataDir, "conf", "samples", "dispatchers", "all"), 200)
engine.StartEngine(path.Join(*dataDir, "conf", "samples", "dispatchers", "all2"), 200)
diamConfigDIR = "dispatchers/diamagent"
testDiamItResetAllDB(t)
for _, stest := range sTestsDiam {
t.Run(diamConfigDIR, stest)
}

View File

@@ -147,6 +147,7 @@ func testV1CDRsProcessEventWithRefund(t *testing.T) {
utils.Destination: "+4986517174963",
utils.AnswerTime: time.Date(2019, 11, 27, 12, 21, 26, 0, time.UTC),
utils.Usage: time.Duration(3) * time.Minute,
utils.Subject: "ANY2CNT",
},
},
}

View File

@@ -555,6 +555,7 @@ func main() {
utils.CoreSv1: internalCoreSv1Chan,
utils.RALsV1: rals.GetIntenternalChan(),
})
fmt.Println("BEfore srvManager.AddServices")
srvManager.AddServices(connManager, attrS, chrS, tS, stS, reS, supS, schS, rals,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
services.NewEventReaderService(cfg, filterSChan, exitChan, connManager.GetConnMgr()),
@@ -567,9 +568,10 @@ func main() {
services.NewHTTPAgent(cfg, filterSChan, smg.GetIntenternalChan(), dspS.GetIntenternalChan(), server), // no reload
ldrs, anz, dspS, dmService, storDBService,
)
fmt.Println("After srvManager.AddServices")
fmt.Println("srvManager Start")
srvManager.StartServices()
fmt.Println("srvManager Start success")
// Start FilterS
go startFilterService(filterSChan, cacheS, stS.GetIntenternalChan(),
reS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(),

View File

@@ -81,47 +81,3 @@ func (cCfg CacheCfg) AsTransCacheConfig() (tcCfg map[string]*ltcache.CacheConfig
}
return
}
// CGRATES_CFG_JSON_DISABLED_CACHE is used to populate cache config when DataDB is internal
const CGRATES_CFG_JSON_DISABLED_CACHE = `
{
"cache":{
"*destinations": {"limit": 0},
"*reverse_destinations": {"limit": 0},
"*rating_plans": {"limit": 0},
"*rating_profiles": {"limit": 0},
"*actions": {"limit": 0},
"*action_plans": {"limit": 0},
"*account_action_plans": {"limit": 0},
"*action_triggers": {"limit": 0},
"*shared_groups": {"limit": 0},
"*timings": {"limit": 0},
"*resource_profiles": {"limit": 0},
"*resources": {"limit": 0},
"*event_resources": {"limit": 0},
"*statqueue_profiles": {"limit": 0},
"*statqueues": {"limit": 0},
"*threshold_profiles": {"limit": 0},
"*thresholds": {"limit": 0},
"*filters": {"limit": 0},
"*supplier_profiles": {"limit": 0},
"*attribute_profiles": {"limit": 0},
"*charger_profiles": {"limit": 0},
"*dispatcher_profiles": {"limit": 0},
"*dispatcher_hosts": {"limit": 0},
"*resource_filter_indexes" : {"limit": 0},
"*stat_filter_indexes" : {"limit": 0},
"*threshold_filter_indexes" : {"limit": 0},
"*supplier_filter_indexes" : {"limit": 0},
"*attribute_filter_indexes" : {"limit": 0},
"*charger_filter_indexes" : {"limit": 0},
"*dispatcher_filter_indexes" : {"limit": 0},
"*dispatcher_routes": {"limit": 0},
"*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false}, // diameter messages caching
"*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false}, // RPC responses caching
"*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false}, // closed sessions cached for CDRs
"*load_ids": {"limit": 0},
},
}`

View File

@@ -425,17 +425,50 @@ func (cfg *CGRConfig) loadDataDBCfg(jsnCfg *CgrJsonCfg) (err error) {
// in case of internalDB we need to disable the cache
// so we enforce it here
if cfg.dataDbCfg.DataDbType == utils.INTERNAL {
var customCfg *CgrJsonCfg
var cacheJsonCfg *CacheJsonCfg
if customCfg, err = NewCgrJsonCfgFromBytes([]byte(CGRATES_CFG_JSON_DISABLED_CACHE)); err != nil {
return
}
if cacheJsonCfg, err = customCfg.CacheJsonCfg(); err != nil {
return
}
if err = cfg.cacheCfg.loadFromJsonCfg(cacheJsonCfg); err != nil {
return
zeroLimit := &CacheParamCfg{Limit: 0,
TTL: time.Duration(0), StaticTTL: false, Precache: false}
disabledCache := CacheCfg{
utils.CacheDestinations: zeroLimit,
utils.CacheReverseDestinations: zeroLimit,
utils.CacheRatingPlans: zeroLimit,
utils.CacheRatingProfiles: zeroLimit,
utils.CacheActions: zeroLimit,
utils.CacheActionPlans: zeroLimit,
utils.CacheAccountActionPlans: zeroLimit,
utils.CacheActionTriggers: zeroLimit,
utils.CacheSharedGroups: zeroLimit,
utils.CacheTimings: zeroLimit,
utils.CacheResourceProfiles: zeroLimit,
utils.CacheResources: zeroLimit,
utils.CacheEventResources: zeroLimit,
utils.CacheStatQueueProfiles: zeroLimit,
utils.CacheStatQueues: zeroLimit,
utils.CacheThresholdProfiles: zeroLimit,
utils.CacheThresholds: zeroLimit,
utils.CacheFilters: zeroLimit,
utils.CacheSupplierProfiles: zeroLimit,
utils.CacheAttributeProfiles: zeroLimit,
utils.CacheChargerProfiles: zeroLimit,
utils.CacheDispatcherProfiles: zeroLimit,
utils.CacheDispatcherHosts: zeroLimit,
utils.CacheResourceFilterIndexes: zeroLimit,
utils.CacheStatFilterIndexes: zeroLimit,
utils.CacheThresholdFilterIndexes: zeroLimit,
utils.CacheSupplierFilterIndexes: zeroLimit,
utils.CacheAttributeFilterIndexes: zeroLimit,
utils.CacheChargerFilterIndexes: zeroLimit,
utils.CacheDispatcherFilterIndexes: zeroLimit,
utils.CacheDispatcherRoutes: zeroLimit,
utils.CacheRPCResponses: zeroLimit,
utils.CacheLoadIDs: zeroLimit,
utils.CacheDiameterMessages: &CacheParamCfg{Limit: -1,
TTL: time.Duration(3 * time.Hour), StaticTTL: false},
utils.CacheClosedSessions: &CacheParamCfg{Limit: -1,
TTL: time.Duration(10 * time.Second), StaticTTL: false},
utils.CacheRPCConnections: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false},
}
cfg.cacheCfg = disabledCache
}
return
@@ -1328,6 +1361,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
}
cfg.rldChans[ATTRIBUTE_JSN] <- struct{}{}
if !fall {
fmt.Println("Exit here from V1Reload")
break
}
fallthrough
@@ -1415,6 +1449,12 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
if !fall {
break
}
fallthrough
case RPCConnsJsonName:
cfg.rldChans[RPCConnsJsonName] <- struct{}{}
if !fall {
break
}
}
return
}

View File

@@ -910,13 +910,8 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
"ContentFields": content,
},
},
"SessionSConns": []interface{}{
map[string]interface{}{
"Address": "127.0.0.1:2012",
"Synchronous": false,
"TLS": false,
"Transport": "*json",
},
"SessionSConns": []string{
utils.MetaLocalHost,
},
}

View File

@@ -24,8 +24,16 @@
},
"chargers":{
"enabled": true
"chargers": {
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
},
"attributes": {
"enabled": true,
},
@@ -33,6 +41,12 @@
"enabled": true,
"rals_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
"attributes_conns":[
{"address": "127.0.0.1:2012", "transport":"*json"}
],
"chargers_conns":[
{"address": "127.0.0.1:2012", "transport":"*json"},
]
},

View File

@@ -29,23 +29,69 @@ import (
"github.com/cgrates/ltcache"
)
// InternalDataDBParts indexes the internal DataDB partitions
var InternalDataDBParts []string = []string{
ColDst, ColRds, ColAct, ColApl, ColAAp, ColTsk, ColAtr, ColRpl,
ColRpf, ColAcc, ColShg, ColLht, ColVer, ColRsP, ColRFI, ColTmg, ColRes,
ColSqs, ColSqp, ColTps, ColThs, ColFlt, ColSpp, ColAttr, ColCDRs, ColCpp,
ColDpp, ColDph, ColLID,
var unlimitedCfg = &ltcache.CacheConfig{
MaxItems: -1,
}
// internalDBCacheCfg indexes the internal DataDB partitions
var internalDBCacheCfg = map[string]*ltcache.CacheConfig{
utils.CacheDestinations: unlimitedCfg,
utils.CacheReverseDestinations: unlimitedCfg,
utils.CacheActions: unlimitedCfg,
utils.CacheActionPlans: unlimitedCfg,
utils.CacheAccountActionPlans: unlimitedCfg,
utils.CacheActionTriggers: unlimitedCfg,
utils.CacheRatingPlans: unlimitedCfg,
utils.CacheRatingProfiles: unlimitedCfg,
utils.CacheAccounts: unlimitedCfg,
utils.CacheSharedGroups: unlimitedCfg,
utils.TBLVersions: unlimitedCfg,
utils.CacheTimings: unlimitedCfg,
utils.CacheFilters: unlimitedCfg,
utils.CacheResourceProfiles: unlimitedCfg,
utils.CacheResourceFilterIndexes: unlimitedCfg,
utils.CacheResources: unlimitedCfg,
utils.CacheStatFilterIndexes: unlimitedCfg,
utils.CacheStatQueueProfiles: unlimitedCfg,
utils.CacheStatQueues: unlimitedCfg,
utils.CacheThresholdFilterIndexes: unlimitedCfg,
utils.CacheThresholdProfiles: unlimitedCfg,
utils.CacheThresholds: unlimitedCfg,
utils.CacheSupplierFilterIndexes: unlimitedCfg,
utils.CacheSupplierProfiles: unlimitedCfg,
utils.CacheChargerFilterIndexes: unlimitedCfg,
utils.CacheChargerProfiles: unlimitedCfg,
utils.CacheAttributeFilterIndexes: unlimitedCfg,
utils.CacheAttributeProfiles: unlimitedCfg,
utils.CacheDispatcherFilterIndexes: unlimitedCfg,
utils.CacheDispatcherProfiles: unlimitedCfg,
utils.CacheDispatcherHosts: unlimitedCfg,
utils.CacheDiameterMessages: unlimitedCfg,
utils.CacheEventResources: unlimitedCfg,
utils.CacheLoadIDs: unlimitedCfg,
utils.TBLTPTimings: unlimitedCfg,
utils.TBLTPDestinations: unlimitedCfg,
utils.TBLTPRates: unlimitedCfg,
utils.TBLTPDestinationRates: unlimitedCfg,
utils.TBLTPRatingPlans: unlimitedCfg,
utils.TBLTPRateProfiles: unlimitedCfg,
utils.TBLTPSharedGroups: unlimitedCfg,
utils.TBLTPActions: unlimitedCfg,
utils.TBLTPActionTriggers: unlimitedCfg,
utils.TBLTPAccountActions: unlimitedCfg,
utils.TBLTPResources: unlimitedCfg,
utils.TBLTPStats: unlimitedCfg,
utils.TBLTPThresholds: unlimitedCfg,
utils.TBLTPFilters: unlimitedCfg,
utils.SessionCostsTBL: unlimitedCfg,
utils.TBLTPActionPlans: unlimitedCfg,
utils.TBLTPSuppliers: unlimitedCfg,
utils.TBLTPAttributes: unlimitedCfg,
utils.TBLTPChargers: unlimitedCfg,
}
// InternalStorDBParts indexes the internal StorDB partitions
var InternalStorDBParts []string = []string{
utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPRates,
utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, utils.TBLTPRateProfiles,
utils.TBLTPSharedGroups, utils.TBLTPActions, utils.TBLTPActionTriggers,
utils.TBLTPAccountActions, utils.TBLTPResources, utils.TBLTPStats, utils.TBLTPThresholds,
utils.TBLTPFilters, utils.SessionCostsTBL, utils.CDRsTBL, utils.TBLTPActionPlans,
utils.TBLVersions, utils.TBLTPSuppliers, utils.TBLTPAttributes, utils.TBLTPChargers,
}
var InternalStorDBParts []string = []string{}
type InternalDB struct {
tasks []*Task
@@ -58,9 +104,8 @@ type InternalDB struct {
// NewInternalDB constructs an InternalDB
func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) (iDB *InternalDB) {
dfltCfg, _ := config.NewDefaultCGRConfig()
iDB = &InternalDB{
db: ltcache.NewTransCache(dfltCfg.CacheCfg().AsTransCacheConfig()),
db: ltcache.NewTransCache(internalDBCacheCfg),
stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields,
cnter: utils.NewCounter(time.Now().UnixNano(), 0),

View File

@@ -33,7 +33,7 @@ func (iDB *InternalDB) GetTpIds(colName string) (ids []string, err error) {
func (iDB *InternalDB) GetTpTableIds(tpid, table string, distinct utils.TPDistinctIds,
filters map[string]string, paginator *utils.PaginatorWithSearch) (ids []string, err error) {
key := table + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
fullIDs := iDB.db.GetItemIDs(table, key)
switch table {
// in case of account action we have the id the following form : loadid:tenant:account
@@ -103,7 +103,7 @@ func (iDB *InternalDB) GetTpTableIds(tpid, table string, distinct utils.TPDistin
}
func (iDB *InternalDB) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTiming, err error) {
key := utils.TBLTPTimings + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if id != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + id
}
@@ -123,7 +123,7 @@ func (iDB *InternalDB) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTi
}
func (iDB *InternalDB) GetTPDestinations(tpid, id string) (dsts []*utils.TPDestination, err error) {
key := utils.TBLTPDestinations + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if id != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + id
}
@@ -143,7 +143,7 @@ func (iDB *InternalDB) GetTPDestinations(tpid, id string) (dsts []*utils.TPDesti
}
func (iDB *InternalDB) GetTPRates(tpid, id string) (rates []*utils.TPRate, err error) {
key := utils.TBLTPRates + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if id != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + id
}
@@ -168,7 +168,7 @@ func (iDB *InternalDB) GetTPRates(tpid, id string) (rates []*utils.TPRate, err e
func (iDB *InternalDB) GetTPDestinationRates(tpid, id string,
paginator *utils.Paginator) (dRates []*utils.TPDestinationRate, err error) {
key := utils.TBLTPDestinationRates + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if id != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + id
}
@@ -213,7 +213,7 @@ func (iDB *InternalDB) GetTPDestinationRates(tpid, id string,
}
func (iDB *InternalDB) GetTPRatingPlans(tpid, id string, paginator *utils.Paginator) (rPlans []*utils.TPRatingPlan, err error) {
key := utils.TBLTPRatingPlans + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if id != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + id
}
@@ -257,7 +257,7 @@ func (iDB *InternalDB) GetTPRatingPlans(tpid, id string, paginator *utils.Pagina
}
func (iDB *InternalDB) GetTPRatingProfiles(filter *utils.TPRatingProfile) (rProfiles []*utils.TPRatingProfile, err error) {
key := utils.TBLTPRateProfiles + utils.CONCATENATED_KEY_SEP + filter.TPid
key := filter.TPid
if filter.LoadId != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + filter.LoadId
@@ -287,7 +287,7 @@ func (iDB *InternalDB) GetTPRatingProfiles(filter *utils.TPRatingProfile) (rProf
}
func (iDB *InternalDB) GetTPSharedGroups(tpid, id string) (sGroups []*utils.TPSharedGroups, err error) {
key := utils.TBLTPSharedGroups + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if id != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + id
}
@@ -307,7 +307,7 @@ func (iDB *InternalDB) GetTPSharedGroups(tpid, id string) (sGroups []*utils.TPSh
}
func (iDB *InternalDB) GetTPActions(tpid, id string) (actions []*utils.TPActions, err error) {
key := utils.TBLTPActions + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if id != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + id
}
@@ -327,7 +327,7 @@ func (iDB *InternalDB) GetTPActions(tpid, id string) (actions []*utils.TPActions
}
func (iDB *InternalDB) GetTPActionPlans(tpid, id string) (aPlans []*utils.TPActionPlan, err error) {
key := utils.TBLTPActionPlans + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if id != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + id
}
@@ -347,7 +347,7 @@ func (iDB *InternalDB) GetTPActionPlans(tpid, id string) (aPlans []*utils.TPActi
}
func (iDB *InternalDB) GetTPActionTriggers(tpid, id string) (aTriggers []*utils.TPActionTriggers, err error) {
key := utils.TBLTPActionTriggers + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if id != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + id
}
@@ -365,7 +365,7 @@ func (iDB *InternalDB) GetTPActionTriggers(tpid, id string) (aTriggers []*utils.
return
}
func (iDB *InternalDB) GetTPAccountActions(filter *utils.TPAccountActions) (accounts []*utils.TPAccountActions, err error) {
key := utils.TBLTPAccountActions + utils.CONCATENATED_KEY_SEP + filter.TPid
key := filter.TPid
if filter.LoadId != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + filter.LoadId
@@ -392,7 +392,7 @@ func (iDB *InternalDB) GetTPAccountActions(filter *utils.TPAccountActions) (acco
}
func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*utils.TPResourceProfile, err error) {
key := utils.TBLTPResources + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if tenant != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + tenant
}
@@ -415,7 +415,7 @@ func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*uti
}
func (iDB *InternalDB) GetTPStats(tpid, tenant, id string) (stats []*utils.TPStatProfile, err error) {
key := utils.TBLTPStats + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if tenant != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + tenant
}
@@ -438,7 +438,7 @@ func (iDB *InternalDB) GetTPStats(tpid, tenant, id string) (stats []*utils.TPSta
}
func (iDB *InternalDB) GetTPThresholds(tpid, tenant, id string) (ths []*utils.TPThresholdProfile, err error) {
key := utils.TBLTPThresholds + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if tenant != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + tenant
}
@@ -461,7 +461,7 @@ func (iDB *InternalDB) GetTPThresholds(tpid, tenant, id string) (ths []*utils.TP
}
func (iDB *InternalDB) GetTPFilters(tpid, tenant, id string) (fltrs []*utils.TPFilterProfile, err error) {
key := utils.TBLTPFilters + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if tenant != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + tenant
}
@@ -484,7 +484,7 @@ func (iDB *InternalDB) GetTPFilters(tpid, tenant, id string) (fltrs []*utils.TPF
}
func (iDB *InternalDB) GetTPSuppliers(tpid, tenant, id string) (supps []*utils.TPSupplierProfile, err error) {
key := utils.TBLTPSuppliers + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if tenant != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + tenant
}
@@ -507,7 +507,7 @@ func (iDB *InternalDB) GetTPSuppliers(tpid, tenant, id string) (supps []*utils.T
}
func (iDB *InternalDB) GetTPAttributes(tpid, tenant, id string) (attrs []*utils.TPAttributeProfile, err error) {
key := utils.TBLTPAttributes + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if tenant != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + tenant
}
@@ -530,7 +530,7 @@ func (iDB *InternalDB) GetTPAttributes(tpid, tenant, id string) (attrs []*utils.
}
func (iDB *InternalDB) GetTPChargers(tpid, tenant, id string) (cpps []*utils.TPChargerProfile, err error) {
key := utils.TBLTPChargers + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if tenant != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + tenant
}
@@ -553,7 +553,7 @@ func (iDB *InternalDB) GetTPChargers(tpid, tenant, id string) (cpps []*utils.TPC
}
func (iDB *InternalDB) GetTPDispatcherProfiles(tpid, tenant, id string) (dpps []*utils.TPDispatcherProfile, err error) {
key := utils.TBLTPDispatchers + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if tenant != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + tenant
}
@@ -576,7 +576,7 @@ func (iDB *InternalDB) GetTPDispatcherProfiles(tpid, tenant, id string) (dpps []
}
func (iDB *InternalDB) GetTPDispatcherHosts(tpid, tenant, id string) (dpps []*utils.TPDispatcherHost, err error) {
key := utils.TBLTPDispatcherHosts + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if tenant != utils.EmptyString {
key += utils.CONCATENATED_KEY_SEP + tenant
}
@@ -603,7 +603,7 @@ func (iDB *InternalDB) RemTpData(table, tpid string, args map[string]string) (er
if table == utils.EmptyString {
return iDB.Flush(utils.EmptyString)
}
key := table + utils.CONCATENATED_KEY_SEP + tpid
key := tpid
if args != nil {
for _, val := range args {
key += utils.CONCATENATED_KEY_SEP + val
@@ -622,7 +622,7 @@ func (iDB *InternalDB) SetTPTimings(timings []*utils.ApierTPTiming) (err error)
return nil
}
for _, timing := range timings {
iDB.db.Set(utils.TBLTPTimings, utils.ConcatenatedKey(utils.TBLTPTimings, timing.TPid, timing.ID), timing, nil,
iDB.db.Set(utils.TBLTPTimings, utils.ConcatenatedKey(timing.TPid, timing.ID), timing, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -632,7 +632,7 @@ func (iDB *InternalDB) SetTPDestinations(dests []*utils.TPDestination) (err erro
return nil
}
for _, destination := range dests {
iDB.db.Set(utils.TBLTPDestinations, utils.ConcatenatedKey(utils.TBLTPDestinations, destination.TPid, destination.ID), destination, nil,
iDB.db.Set(utils.TBLTPDestinations, utils.ConcatenatedKey(destination.TPid, destination.ID), destination, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -643,7 +643,7 @@ func (iDB *InternalDB) SetTPRates(rates []*utils.TPRate) (err error) {
return nil
}
for _, rate := range rates {
iDB.db.Set(utils.TBLTPRates, utils.ConcatenatedKey(utils.TBLTPRates, rate.TPid, rate.ID), rate, nil,
iDB.db.Set(utils.TBLTPRates, utils.ConcatenatedKey(rate.TPid, rate.ID), rate, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -654,7 +654,7 @@ func (iDB *InternalDB) SetTPDestinationRates(dRates []*utils.TPDestinationRate)
return nil
}
for _, dRate := range dRates {
iDB.db.Set(utils.TBLTPDestinationRates, utils.ConcatenatedKey(utils.TBLTPDestinationRates, dRate.TPid, dRate.ID), dRate, nil,
iDB.db.Set(utils.TBLTPDestinationRates, utils.ConcatenatedKey(dRate.TPid, dRate.ID), dRate, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -665,7 +665,7 @@ func (iDB *InternalDB) SetTPRatingPlans(ratingPlans []*utils.TPRatingPlan) (err
return nil
}
for _, rPlan := range ratingPlans {
iDB.db.Set(utils.TBLTPRatingPlans, utils.ConcatenatedKey(utils.TBLTPRatingPlans, rPlan.TPid, rPlan.ID), rPlan, nil,
iDB.db.Set(utils.TBLTPRatingPlans, utils.ConcatenatedKey(rPlan.TPid, rPlan.ID), rPlan, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -676,7 +676,7 @@ func (iDB *InternalDB) SetTPRatingProfiles(ratingProfiles []*utils.TPRatingProfi
return nil
}
for _, rProfile := range ratingProfiles {
iDB.db.Set(utils.TBLTPRateProfiles, utils.ConcatenatedKey(utils.TBLTPRateProfiles, rProfile.TPid,
iDB.db.Set(utils.TBLTPRateProfiles, utils.ConcatenatedKey(rProfile.TPid,
rProfile.LoadId, rProfile.Tenant, rProfile.Category, rProfile.Subject), rProfile, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
@@ -688,7 +688,7 @@ func (iDB *InternalDB) SetTPSharedGroups(groups []*utils.TPSharedGroups) (err er
return nil
}
for _, group := range groups {
iDB.db.Set(utils.TBLTPSharedGroups, utils.ConcatenatedKey(utils.TBLTPSharedGroups, group.TPid, group.ID), group, nil,
iDB.db.Set(utils.TBLTPSharedGroups, utils.ConcatenatedKey(group.TPid, group.ID), group, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -699,7 +699,7 @@ func (iDB *InternalDB) SetTPActions(acts []*utils.TPActions) (err error) {
return nil
}
for _, action := range acts {
iDB.db.Set(utils.TBLTPActions, utils.ConcatenatedKey(utils.TBLTPActions, action.TPid, action.ID), action, nil,
iDB.db.Set(utils.TBLTPActions, utils.ConcatenatedKey(action.TPid, action.ID), action, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -710,7 +710,7 @@ func (iDB *InternalDB) SetTPActionPlans(aPlans []*utils.TPActionPlan) (err error
return nil
}
for _, aPlan := range aPlans {
iDB.db.Set(utils.TBLTPActionPlans, utils.ConcatenatedKey(utils.TBLTPActionPlans, aPlan.TPid, aPlan.ID), aPlan, nil,
iDB.db.Set(utils.TBLTPActionPlans, utils.ConcatenatedKey(aPlan.TPid, aPlan.ID), aPlan, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -721,7 +721,7 @@ func (iDB *InternalDB) SetTPActionTriggers(aTriggers []*utils.TPActionTriggers)
return nil
}
for _, aTrigger := range aTriggers {
iDB.db.Set(utils.TBLTPActionTriggers, utils.ConcatenatedKey(utils.TBLTPActionTriggers, aTrigger.TPid, aTrigger.ID), aTrigger, nil,
iDB.db.Set(utils.TBLTPActionTriggers, utils.ConcatenatedKey(aTrigger.TPid, aTrigger.ID), aTrigger, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -732,7 +732,7 @@ func (iDB *InternalDB) SetTPAccountActions(accActions []*utils.TPAccountActions)
return nil
}
for _, accAction := range accActions {
iDB.db.Set(utils.TBLTPAccountActions, utils.ConcatenatedKey(utils.TBLTPAccountActions, accAction.TPid,
iDB.db.Set(utils.TBLTPAccountActions, utils.ConcatenatedKey(accAction.TPid,
accAction.LoadId, accAction.Tenant, accAction.Account), accAction, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
@@ -744,7 +744,7 @@ func (iDB *InternalDB) SetTPResources(resources []*utils.TPResourceProfile) (err
return nil
}
for _, resource := range resources {
iDB.db.Set(utils.TBLTPResources, utils.ConcatenatedKey(utils.TBLTPResources, resource.TPid, resource.Tenant, resource.ID), resource, nil,
iDB.db.Set(utils.TBLTPResources, utils.ConcatenatedKey(resource.TPid, resource.Tenant, resource.ID), resource, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -754,7 +754,7 @@ func (iDB *InternalDB) SetTPStats(stats []*utils.TPStatProfile) (err error) {
return nil
}
for _, stat := range stats {
iDB.db.Set(utils.TBLTPStats, utils.ConcatenatedKey(utils.TBLTPStats, stat.TPid, stat.Tenant, stat.ID), stat, nil,
iDB.db.Set(utils.TBLTPStats, utils.ConcatenatedKey(stat.TPid, stat.Tenant, stat.ID), stat, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -765,7 +765,7 @@ func (iDB *InternalDB) SetTPThresholds(thresholds []*utils.TPThresholdProfile) (
}
for _, threshold := range thresholds {
iDB.db.Set(utils.TBLTPThresholds, utils.ConcatenatedKey(utils.TBLTPThresholds, threshold.TPid, threshold.Tenant, threshold.ID), threshold, nil,
iDB.db.Set(utils.TBLTPThresholds, utils.ConcatenatedKey(threshold.TPid, threshold.Tenant, threshold.ID), threshold, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -776,7 +776,7 @@ func (iDB *InternalDB) SetTPFilters(filters []*utils.TPFilterProfile) (err error
}
for _, filter := range filters {
iDB.db.Set(utils.TBLTPFilters, utils.ConcatenatedKey(utils.TBLTPFilters, filter.TPid, filter.Tenant, filter.ID), filter, nil,
iDB.db.Set(utils.TBLTPFilters, utils.ConcatenatedKey(filter.TPid, filter.Tenant, filter.ID), filter, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -787,7 +787,7 @@ func (iDB *InternalDB) SetTPSuppliers(suppliers []*utils.TPSupplierProfile) (err
return nil
}
for _, supplier := range suppliers {
iDB.db.Set(utils.TBLTPSuppliers, utils.ConcatenatedKey(utils.TBLTPSuppliers, supplier.TPid, supplier.Tenant, supplier.ID), supplier, nil,
iDB.db.Set(utils.TBLTPSuppliers, utils.ConcatenatedKey(supplier.TPid, supplier.Tenant, supplier.ID), supplier, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -799,7 +799,7 @@ func (iDB *InternalDB) SetTPAttributes(attributes []*utils.TPAttributeProfile) (
}
for _, attribute := range attributes {
iDB.db.Set(utils.TBLTPAttributes, utils.ConcatenatedKey(utils.TBLTPAttributes, attribute.TPid, attribute.Tenant, attribute.ID), attribute, nil,
iDB.db.Set(utils.TBLTPAttributes, utils.ConcatenatedKey(attribute.TPid, attribute.Tenant, attribute.ID), attribute, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -810,7 +810,7 @@ func (iDB *InternalDB) SetTPChargers(cpps []*utils.TPChargerProfile) (err error)
}
for _, cpp := range cpps {
iDB.db.Set(utils.TBLTPChargers, utils.ConcatenatedKey(utils.TBLTPChargers, cpp.TPid, cpp.Tenant, cpp.ID), cpp, nil,
iDB.db.Set(utils.TBLTPChargers, utils.ConcatenatedKey(cpp.TPid, cpp.Tenant, cpp.ID), cpp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -821,7 +821,7 @@ func (iDB *InternalDB) SetTPDispatcherProfiles(dpps []*utils.TPDispatcherProfile
}
for _, dpp := range dpps {
iDB.db.Set(utils.TBLTPDispatchers, utils.ConcatenatedKey(utils.TBLTPDispatchers, dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil,
iDB.db.Set(utils.TBLTPDispatchers, utils.ConcatenatedKey(dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -831,7 +831,7 @@ func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err
return nil
}
for _, dpp := range dpps {
iDB.db.Set(utils.TBLTPDispatcherHosts, utils.ConcatenatedKey(utils.TBLTPDispatcherHosts, dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil,
iDB.db.Set(utils.TBLTPDispatcherHosts, utils.ConcatenatedKey(dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -843,7 +843,7 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
cdr.OrderID = iDB.cnter.Next()
}
if !allowUpdate {
x, ok := iDB.db.Get(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID))
x, ok := iDB.db.Get(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID))
if ok && x != nil {
return utils.ErrExists
}
@@ -879,14 +879,14 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
}
}
iDB.db.Set(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID), cdr, idxs.AsSlice(),
iDB.db.Set(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID), cdr, idxs.AsSlice(),
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (iDB *InternalDB) RemoveSMCost(smc *SMCost) (err error) {
iDB.db.Remove(utils.SessionCostsTBL, utils.ConcatenatedKey(utils.SessionCostsTBL, smc.CGRID, smc.RunID, smc.OriginHost, smc.OriginID),
iDB.db.Remove(utils.SessionCostsTBL, utils.ConcatenatedKey(smc.CGRID, smc.RunID, smc.OriginHost, smc.OriginID),
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -1225,7 +1225,7 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C
}
if remove {
for _, cdr := range cdrs {
iDB.db.Remove(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID),
iDB.db.Remove(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID),
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return nil, 0, nil
@@ -1366,7 +1366,7 @@ func (iDB *InternalDB) SetSMCost(smCost *SMCost) (err error) {
idxs.Add(utils.ConcatenatedKey(utils.OriginHost, smCost.OriginHost))
idxs.Add(utils.ConcatenatedKey(utils.OriginID, smCost.OriginID))
idxs.Add(utils.ConcatenatedKey(utils.CostSource, smCost.CostSource))
iDB.db.Set(utils.SessionCostsTBL, utils.ConcatenatedKey(utils.SessionCostsTBL, smCost.CGRID, smCost.RunID, smCost.OriginHost, smCost.OriginID), smCost, idxs.AsSlice(),
iDB.db.Set(utils.SessionCostsTBL, utils.ConcatenatedKey(smCost.CGRID, smCost.RunID, smCost.OriginHost, smCost.OriginID), smCost, idxs.AsSlice(),
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return err
}

View File

@@ -24,6 +24,8 @@ import (
"testing"
"time"
"github.com/cgrates/rpcclient"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -48,8 +50,10 @@ func TestAttributeSReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
attrS := NewAttributeService(cfg, db,
chS, filterSChan, server)
srvMngr.AddServices(attrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1),
)
srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS,
NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}
@@ -59,6 +63,7 @@ func TestAttributeSReload(t *testing.T) {
if db.IsRunning() {
t.Errorf("Expected service to be down")
}
var reply string
if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"),

View File

@@ -78,7 +78,7 @@ func TestCdrsReload(t *testing.T) {
make(chan rpcclient.RpcClientConnection, 1),
chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(),
nil, nil, nil, nil)
srvMngr.AddServices(cdrS, ralS, schS, chrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb)
srvMngr.AddServices(NewConnManagerService(cfg, nil), cdrS, ralS, schS, chrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -24,6 +24,8 @@ import (
"testing"
"time"
"github.com/cgrates/rpcclient"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -49,9 +51,9 @@ func TestChargerSReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
attrS := NewAttributeService(cfg, db, chS, filterSChan, server)
attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1))
chrS := NewChargerService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan(), nil)
srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS, chrS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -25,6 +25,8 @@ import (
"testing"
"time"
"github.com/cgrates/rpcclient"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -48,8 +50,9 @@ func TestDataDBReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
srvMngr.AddServices(NewAttributeService(cfg, db,
chS, filterSChan, server), NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
srvMngr.AddServices(NewConnManagerService(cfg, nil), NewAttributeService(cfg, db,
chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)),
NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -24,6 +24,8 @@ import (
"testing"
"time"
"github.com/cgrates/rpcclient"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -50,9 +52,9 @@ func TestDispatcherSReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
attrS := NewAttributeService(cfg, db, chS, filterSChan, server)
srv := NewDispatcherService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan())
srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1))
srv := NewDispatcherService(cfg, db, chS, filterSChan, server, attrS.GetIntenternalChan(), make(chan rpcclient.RpcClientConnection, 1))
srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS, srv, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -51,9 +51,10 @@ func TestDNSAgentReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
sS := NewSessionService(cfg, db, server, nil,
nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown)
nil, nil, nil, nil,
nil, nil, nil, nil, make(chan rpcclient.RpcClientConnection, 1), engineShutdown)
srv := NewDNSAgent(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown)
srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db)
srvMngr.AddServices(NewConnManagerService(cfg, nil), srv, sS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -25,6 +25,8 @@ import (
"testing"
"time"
"github.com/cgrates/rpcclient"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -54,9 +56,10 @@ func TestEventReaderSReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
sS := NewSessionService(cfg, db, server, nil,
nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown)
attrS := NewEventReaderService(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown)
srvMngr.AddServices(attrS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
nil, nil, nil, nil,
nil, nil, nil, nil, make(chan rpcclient.RpcClientConnection, 1), engineShutdown)
attrS := NewEventReaderService(cfg, filterSChan, engineShutdown, nil)
srvMngr.AddServices(NewConnManagerService(cfg, nil), attrS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -72,7 +72,7 @@ func TestRalsReload(t *testing.T) {
ralS := NewRalService(cfg, db, stordb, chS, filterSChan, server,
tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan,
internalChan, schS, engineShutdown)
srvMngr.AddServices(ralS, schS, tS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb)
srvMngr.AddServices(NewConnManagerService(cfg, nil), ralS, schS, tS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -54,7 +54,7 @@ func TestResourceSReload(t *testing.T) {
db := NewDataDBService(cfg)
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
reS := NewResourceService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil)
srvMngr.AddServices(tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -49,7 +49,7 @@ func TestSchedulerSReload(t *testing.T) {
internalCdrSChan <- nil
db := NewDataDBService(cfg)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, internalCdrSChan, nil)
srvMngr.AddServices(schS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
srvMngr.AddServices(NewConnManagerService(cfg, nil), schS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -80,8 +80,9 @@ func TestSessionSReload(t *testing.T) {
chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(),
nil, nil, nil, nil)
srv := NewSessionService(cfg, db, server, chrS.GetIntenternalChan(),
ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil, nil, cdrS.GetIntenternalChan(), nil, engineShutdown)
srvMngr.AddServices(srv, chrS, schS, ralS, cdrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb)
ralS.GetResponder().GetIntenternalChan(), nil, nil, nil,
nil, nil, cdrS.GetIntenternalChan(), nil, make(chan rpcclient.RpcClientConnection, 1), engineShutdown)
srvMngr.AddServices(NewConnManagerService(cfg, nil), srv, chrS, schS, ralS, cdrS, NewLoaderService(cfg, db, filterSChan, server, cacheSChan, nil, engineShutdown), db, stordb)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -54,7 +54,7 @@ func TestStatSReload(t *testing.T) {
db := NewDataDBService(cfg)
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
sS := NewStatService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil)
srvMngr.AddServices(tS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -52,7 +52,7 @@ func TestSupplierSReload(t *testing.T) {
db := NewDataDBService(cfg)
sts := NewStatService(cfg, db, chS, filterSChan, server, nil, nil)
supS := NewSupplierService(cfg, db, chS, filterSChan, server, nil, sts.GetIntenternalChan(), nil, nil)
srvMngr.AddServices(supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
srvMngr.AddServices(NewConnManagerService(cfg, nil), supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -49,7 +49,7 @@ func TestThresholdSReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
srvMngr.AddServices(tS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -305,6 +305,10 @@ func (srvMngr *ServiceManager) handleReload() {
if err = srvMngr.reloadService(utils.StorDB); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName):
if err = srvMngr.reloadService(utils.RPCConnS); err != nil {
return
}
}
// handle RPC server
}
@@ -312,7 +316,6 @@ func (srvMngr *ServiceManager) handleReload() {
func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) {
srv := srvMngr.GetService(srviceName)
if srv.ShouldRun() {
if srv.IsRunning() {
if err = srv.Reload(); err != nil {