Add rpc pool for remote connections

This commit is contained in:
TeoV
2019-11-14 19:45:19 +02:00
committed by Dan Christian Bogos
parent 5cc9f94cc0
commit ee37110cfe
20 changed files with 749 additions and 815 deletions

View File

@@ -89,23 +89,22 @@ var (
)
// NewDataManager returns a new DataManager
func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtDataDBs []*DataManager,
func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtConns,
rplConns *rpcclient.RpcClientPool) *DataManager {
return &DataManager{
dataDB: dataDB,
cacheCfg: cacheCfg,
rmtDataDBs: rmtDataDBs,
rplConns: rplConns,
dataDB: dataDB,
cacheCfg: cacheCfg,
rmtConns: rmtConns,
rplConns: rplConns,
}
}
// DataManager is the data storage manager for CGRateS
// transparently manages data retrieval, further serialization and caching
type DataManager struct {
dataDB DataDB
rmtDataDBs []*DataManager
cacheCfg config.CacheCfg
rplConns *rpcclient.RpcClientPool
dataDB DataDB
cacheCfg config.CacheCfg
rmtConns, rplConns *rpcclient.RpcClientPool
}
// DataDB exports access to dataDB
@@ -320,14 +319,8 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
func (dm *DataManager) GetAccount(id string) (acc *Account, err error) {
acc, err = dm.dataDB.GetAccountDrv(id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if acc, rmtErr = rmtDM.dataDB.GetAccountDrv(id); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetAccount, id, &acc)
}
if err != nil {
return nil, err
@@ -351,14 +344,9 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
}
ssq, err := dm.dataDB.GetStoredStatQueueDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if ssq, rmtErr = rmtDM.dataDB.GetStoredStatQueueDrv(tenant, id); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetStatQueue,
&utils.TenantID{Tenant: tenant, ID: id}, &ssq)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -417,15 +405,9 @@ func (dm *DataManager) GetFilter(tenant, id string, cacheRead, cacheWrite bool,
fltr, err = dm.DataDB().GetFilterDrv(tenant, id)
}
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if fltr, rmtErr = rmtDM.GetFilter(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetFilter,
&utils.TenantID{Tenant: tenant, ID: id}, &fltr)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -472,15 +454,9 @@ func (dm *DataManager) GetThreshold(tenant, id string,
}
th, err = dm.dataDB.GetThresholdDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if th, rmtErr = rmtDM.GetThreshold(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetThreshold,
&utils.TenantID{Tenant: tenant, ID: id}, &th)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -525,15 +501,9 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, cacheRead, cacheWr
}
th, err = dm.dataDB.GetThresholdProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if th, rmtErr = rmtDM.GetThresholdProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetThresholdProfile,
&utils.TenantID{Tenant: tenant, ID: id}, &th)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -630,15 +600,9 @@ func (dm *DataManager) GetStatQueueProfile(tenant, id string, cacheRead, cacheWr
}
sqp, err = dm.dataDB.GetStatQueueProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if sqp, rmtErr = rmtDM.GetStatQueueProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetStatQueueProfile,
&utils.TenantID{Tenant: tenant, ID: id}, &sqp)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -720,15 +684,9 @@ func (dm *DataManager) GetTiming(id string, skipCache bool,
}
t, err = dm.dataDB.GetTimingDrv(id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if t, rmtErr = rmtDM.GetTiming(id, false,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetTiming,
id, &t)
}
if err != nil {
if err == utils.ErrNotFound {
@@ -776,15 +734,9 @@ func (dm *DataManager) GetResource(tenant, id string, cacheRead, cacheWrite bool
}
rs, err = dm.dataDB.GetResourceDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if rs, rmtErr = rmtDM.GetResource(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetResource,
&utils.TenantID{Tenant: tenant, ID: id}, &rs)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -829,15 +781,9 @@ func (dm *DataManager) GetResourceProfile(tenant, id string, cacheRead, cacheWri
}
rp, err = dm.dataDB.GetResourceProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if rp, rmtErr = rmtDM.GetResourceProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetResourceProfile,
&utils.TenantID{Tenant: tenant, ID: id}, &rp)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -918,15 +864,9 @@ func (dm *DataManager) GetActionTriggers(id string, skipCache bool,
}
attrs, err = dm.dataDB.GetActionTriggersDrv(id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if attrs, rmtErr = rmtDM.GetActionTriggers(id, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetActionTriggers,
id, attrs)
}
if err != nil {
if err == utils.ErrNotFound {
@@ -973,15 +913,9 @@ func (dm *DataManager) GetSharedGroup(key string, skipCache bool,
}
sg, err = dm.DataDB().GetSharedGroupDrv(key)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if sg, rmtErr = rmtDM.GetSharedGroup(key, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetShareGroup,
key, &sg)
}
if err != nil {
if err == utils.ErrNotFound {
@@ -1031,15 +965,9 @@ func (dm *DataManager) GetActions(key string, skipCache bool, transactionID stri
}
as, err = dm.DataDB().GetActionsDrv(key)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if as, rmtErr = rmtDM.GetActions(key, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetActions,
key, &as)
}
if err != nil {
if err == utils.ErrNotFound {
@@ -1075,15 +1003,9 @@ func (dm *DataManager) RemoveActions(key, transactionID string) (err error) {
func (dm *DataManager) GetActionPlan(key string, skipCache bool, transactionID string) (ats *ActionPlan, err error) {
ats, err = dm.dataDB.GetActionPlanDrv(key, skipCache, transactionID)
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if ats, rmtErr = rmtDM.GetActionPlan(key, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetActionPlan,
key, &ats)
}
if err != nil {
return nil, err
@@ -1093,14 +1015,9 @@ func (dm *DataManager) GetActionPlan(key string, skipCache bool, transactionID s
func (dm *DataManager) GetAllActionPlans() (ats map[string]*ActionPlan, err error) {
ats, err = dm.dataDB.GetAllActionPlansDrv()
if ((err == nil && len(ats) == 0) || err == utils.ErrNotFound) && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if ats, rmtErr = rmtDM.GetAllActionPlans(); rmtErr == nil {
break
}
}
err = rmtErr
if ((err == nil && len(ats) == 0) || err == utils.ErrNotFound) && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetAllActionPlans,
utils.EmptyString, &ats)
}
if err != nil {
return nil, err
@@ -1111,14 +1028,9 @@ func (dm *DataManager) GetAllActionPlans() (ats map[string]*ActionPlan, err erro
func (dm *DataManager) GetAccountActionPlans(acntID string,
skipCache bool, transactionID string) (apIDs []string, err error) {
apIDs, err = dm.dataDB.GetAccountActionPlansDrv(acntID, skipCache, transactionID)
if ((err == nil && len(apIDs) == 0) || err == utils.ErrNotFound) && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if apIDs, rmtErr = rmtDM.dataDB.GetAccountActionPlansDrv(acntID, skipCache, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if ((err == nil && len(apIDs) == 0) || err == utils.ErrNotFound) && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetAccountActionPlans,
acntID, &apIDs)
}
if err != nil {
return nil, err
@@ -1138,15 +1050,9 @@ func (dm *DataManager) GetRatingPlan(key string, skipCache bool,
}
rp, err = dm.DataDB().GetRatingPlanDrv(key)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if rp, rmtErr = rmtDM.GetRatingPlan(key, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetRatingPlan,
key, &rp)
}
if err != nil {
if err == utils.ErrNotFound {
@@ -1192,15 +1098,9 @@ func (dm *DataManager) GetRatingProfile(key string, skipCache bool,
}
rpf, err = dm.DataDB().GetRatingProfileDrv(key)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if rpf, rmtErr = rmtDM.GetRatingProfile(key, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetRatingProfile,
key, &rpf)
}
if err != nil {
if err == utils.ErrNotFound {
@@ -1243,16 +1143,16 @@ func (dm *DataManager) HasData(category, subject, tenant string) (has bool, err
func (dm *DataManager) GetFilterIndexes(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
if indexes, err = dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal); err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if indexes, rmtErr = rmtDM.GetFilterIndexes(cacheID, itemIDPrefix,
filterType, fldNameVal); rmtErr == nil {
break
}
}
err = rmtErr
}
//if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
// var rmtErr error
// for _, rmtDM := range dm.rmtDataDBs {
// if indexes, rmtErr = rmtDM.GetFilterIndexes(cacheID, itemIDPrefix,
// filterType, fldNameVal); rmtErr == nil {
// break
// }
// }
// err = rmtErr
//}
if err != nil {
return nil, err
}
@@ -1302,16 +1202,16 @@ func (dm *DataManager) MatchFilterIndex(cacheID, itemIDPrefix,
// Not found in cache, check in DB
itemIDs, err = dm.DataDB().MatchFilterIndexDrv(cacheID, itemIDPrefix, filterType, fieldName, fieldVal)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if itemIDs, rmtErr = rmtDM.MatchFilterIndex(cacheID, itemIDPrefix,
filterType, fieldName, fieldVal); rmtErr == nil {
break
}
}
err = rmtErr
}
//if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
// var rmtErr error
// for _, rmtDM := range dm.rmtDataDBs {
// if itemIDs, rmtErr = rmtDM.MatchFilterIndex(cacheID, itemIDPrefix,
// filterType, fieldName, fieldVal); rmtErr == nil {
// break
// }
// }
// err = rmtErr
//}
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(cacheID, fieldValKey, nil, nil,
@@ -1339,15 +1239,9 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, cacheRead, cacheWri
}
supp, err = dm.dataDB.GetSupplierProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if supp, rmtErr = rmtDM.GetSupplierProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetSupplierProfile,
&utils.TenantID{Tenant: tenant, ID: id}, &supp)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -1433,15 +1327,9 @@ func (dm *DataManager) GetAttributeProfile(tenant, id string, cacheRead, cacheWr
}
attrPrfl, err = dm.dataDB.GetAttributeProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if attrPrfl, rmtErr = rmtDM.GetAttributeProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetAttributeProfile,
&utils.TenantID{Tenant: tenant, ID: id}, &attrPrfl)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -1536,15 +1424,9 @@ func (dm *DataManager) GetChargerProfile(tenant, id string, cacheRead, cacheWrit
}
cpp, err = dm.dataDB.GetChargerProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if cpp, rmtErr = rmtDM.GetChargerProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetChargerProfile,
&utils.TenantID{Tenant: tenant, ID: id}, &cpp)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -1627,15 +1509,9 @@ func (dm *DataManager) GetDispatcherProfile(tenant, id string, cacheRead, cacheW
}
dpp, err = dm.dataDB.GetDispatcherProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if dpp, rmtErr = rmtDM.GetDispatcherProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetDispatcherProfile,
&utils.TenantID{Tenant: tenant, ID: id}, &dpp)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -1727,15 +1603,9 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit
}
dH, err = dm.dataDB.GetDispatcherHostDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if dH, rmtErr = rmtDM.GetDispatcherHost(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetDispatcherHost,
&utils.TenantID{Tenant: tenant, ID: id}, &dH)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
@@ -1788,14 +1658,9 @@ func (dm *DataManager) RemoveDispatcherHost(tenant, id string,
func (dm *DataManager) GetItemLoadIDs(itemIDPrefix string, cacheWrite bool) (loadIDs map[string]int64, err error) {
loadIDs, err = dm.DataDB().GetItemLoadIDsDrv(itemIDPrefix)
if err != nil {
if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if loadIDs, rmtErr = rmtDM.GetItemLoadIDs(itemIDPrefix, false); rmtErr == nil {
break
}
}
err = rmtErr
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetItemLoadIDs,
itemIDPrefix, &loadIDs)
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {

View File

@@ -292,33 +292,7 @@ func InitDataDb(cfg *config.CGRConfig) error {
if err != nil {
return err
}
var rmtDBConns []*DataManager
var rplConns *rpcclient.RpcClientPool
if len(cfg.DataDbCfg().RmtDataDBCfgs) != 0 {
rmtDBConns = make([]*DataManager, len(cfg.DataDbCfg().RmtDataDBCfgs))
for i, dbCfg := range cfg.DataDbCfg().RmtDataDBCfgs {
dbConn, err := NewDataDBConn(dbCfg.DataDbType,
dbCfg.DataDbHost, dbCfg.DataDbPort,
dbCfg.DataDbName, dbCfg.DataDbUser,
dbCfg.DataDbPass, cfg.GeneralCfg().DBDataEncoding,
dbCfg.DataDbSentinelName)
if err != nil {
return err
}
rmtDBConns[i] = NewDataManager(dbConn, nil, nil, nil)
}
}
if len(cfg.DataDbCfg().RplConns) != 0 {
rplConns, err = NewRPCPool(rpcclient.POOL_BROADCAST, cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DataDbCfg().RplConns, nil, false)
if err != nil {
return err
}
}
dm := NewDataManager(d, cfg.CacheCfg(), rmtDBConns, rplConns)
dm := NewDataManager(d, cfg.CacheCfg(), nil, nil)
if err := dm.DataDB().Flush(""); err != nil {
return err

View File

@@ -68,20 +68,16 @@ type TpReader struct {
func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string,
cacheS rpcclient.RpcClientConnection, schedulerS rpcclient.RpcClientConnection) (*TpReader, error) {
var rmtDBConns []*DataManager
var rplConns *rpcclient.RpcClientPool
if len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs) != 0 {
rmtDBConns = make([]*DataManager, len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs))
for i, dbCfg := range config.CgrConfig().DataDbCfg().RmtDataDBCfgs {
dbConn, err := NewDataDBConn(dbCfg.DataDbType,
dbCfg.DataDbHost, dbCfg.DataDbPort,
dbCfg.DataDbName, dbCfg.DataDbUser,
dbCfg.DataDbPass, config.CgrConfig().GeneralCfg().DBDataEncoding,
dbCfg.DataDbSentinelName)
if err != nil {
return nil, err
}
rmtDBConns[i] = NewDataManager(dbConn, nil, nil, nil)
var rmtConns, rplConns *rpcclient.RpcClientPool
if len(config.CgrConfig().DataDbCfg().RmtConns) != 0 {
var err error
rmtConns, err = NewRPCPool(rpcclient.POOL_FIRST, config.CgrConfig().TlsCfg().ClientKey,
config.CgrConfig().TlsCfg().ClientCerificate, config.CgrConfig().TlsCfg().CaCertificate,
config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects,
config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout,
config.CgrConfig().DataDbCfg().RmtConns, nil, false)
if err != nil {
return nil, err
}
}
if len(config.CgrConfig().DataDbCfg().RplConns) != 0 {
@@ -98,7 +94,7 @@ func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string,
tpr := &TpReader{
tpid: tpid,
timezone: timezone,
dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtDBConns, rplConns), // ToDo: add ChacheCfg as parameter to the NewTpReader
dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtConns, rplConns), // ToDo: add ChacheCfg as parameter to the NewTpReader
lr: lr,
cacheS: cacheS,
schedulerS: schedulerS,