Add ReplicatorSv1 for remote/replication functionality

This commit is contained in:
TeoV
2019-11-13 18:20:35 +02:00
committed by Dan Christian Bogos
parent a3a59bbb64
commit 5cc9f94cc0
20 changed files with 217 additions and 434 deletions

View File

@@ -89,12 +89,13 @@ var (
)
// NewDataManager returns a new DataManager
func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtDataDBs, rplDataDBs []*DataManager) *DataManager {
func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtDataDBs []*DataManager,
rplConns *rpcclient.RpcClientPool) *DataManager {
return &DataManager{
dataDB: dataDB,
cacheCfg: cacheCfg,
rmtDataDBs: rmtDataDBs,
rplDataDBs: rplDataDBs,
rplConns: rplConns,
}
}
@@ -103,8 +104,8 @@ func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtDataDBs, rplData
type DataManager struct {
dataDB DataDB
rmtDataDBs []*DataManager
rplDataDBs []*DataManager
cacheCfg config.CacheCfg
rplConns *rpcclient.RpcClientPool
}
// DataDB exports access to dataDB
@@ -387,13 +388,6 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) {
if err = dm.dataDB.SetStoredStatQueueDrv(ssq); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.dataDB.SetStoredStatQueueDrv(ssq); err != nil {
return
}
}
}
return
}
@@ -402,14 +396,6 @@ func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string)
if err = dm.dataDB.RemStoredStatQueueDrv(tenant, id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveStatQueue(tenant, id,
utils.NonTransactional); err != nil {
return
}
}
}
return
}
@@ -462,13 +448,6 @@ func (dm *DataManager) SetFilter(fltr *Filter) (err error) {
if err = dm.DataDB().SetFilterDrv(fltr); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetFilter(fltr); err != nil {
return
}
}
}
return
}
@@ -477,14 +456,6 @@ func (dm *DataManager) RemoveFilter(tenant, id, transactionID string) (err error
if err = dm.DataDB().RemoveFilterDrv(tenant, id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveFilter(tenant, id,
utils.NonTransactional); err != nil {
return
}
}
}
return
}
@@ -531,13 +502,6 @@ func (dm *DataManager) SetThreshold(th *Threshold) (err error) {
if err = dm.DataDB().SetThresholdDrv(th); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetThreshold(th); err != nil {
return
}
}
}
return
}
@@ -545,14 +509,6 @@ func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err er
if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveThreshold(tenant, id,
utils.NonTransactional); err != nil {
return
}
}
}
return
}
@@ -623,11 +579,17 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
return err
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetThresholdProfile(th, withIndex); err != nil {
return
}
if dm.rplConns != nil {
//call set threshold from replicator
var reply string
if err = dm.rplConns.Call("ReplicatorSv1.SetThresholdProfile", th, &reply); err != nil {
return
}
if err = dm.rplConns.Call("ReplicatorSv1.SetIndexes", th, &reply); err != nil {
return
}
if err = dm.rplConns.Call("ReplicatorSv1.SetThreshold", th, &reply); err != nil {
return
}
}
return
@@ -651,14 +613,7 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id,
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveThresholdProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}
@@ -729,13 +684,6 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetStatQueueProfile(sqp, withIndex); err != nil {
return
}
}
}
return
}
@@ -757,14 +705,6 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id,
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveStatQueueProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}
@@ -811,13 +751,6 @@ func (dm *DataManager) SetTiming(t *utils.TPTiming) (err error) {
if err = dm.CacheDataFromDB(utils.TimingsPrefix, []string{t.ID}, true); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.DataDB().SetTimingDrv(t); err != nil {
return
}
}
}
return
}
@@ -825,14 +758,6 @@ func (dm *DataManager) RemoveTiming(id, transactionID string) (err error) {
if err = dm.DataDB().RemoveTimingDrv(id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveTiming(id,
utils.NonTransactional); err != nil {
return
}
}
}
Cache.Remove(utils.CacheTimings, id,
cacheCommit(transactionID), transactionID)
return
@@ -881,13 +806,6 @@ func (dm *DataManager) SetResource(rs *Resource) (err error) {
if err = dm.DataDB().SetResourceDrv(rs); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetResource(rs); err != nil {
return
}
}
}
return
}
@@ -895,14 +813,6 @@ func (dm *DataManager) RemoveResource(tenant, id, transactionID string) (err err
if err = dm.DataDB().RemoveResourceDrv(tenant, id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveResource(tenant, id,
utils.NonTransactional); err != nil {
return
}
}
}
return
}
@@ -973,13 +883,6 @@ func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) (
}
Cache.Clear([]string{utils.CacheEventResources})
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetResourceProfile(rp, withIndex); err != nil {
return
}
}
}
return
}
@@ -1000,14 +903,6 @@ func (dm *DataManager) RemoveResourceProfile(tenant, id, transactionID string, w
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveResourceProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}
@@ -1050,14 +945,6 @@ func (dm *DataManager) RemoveActionTriggers(id, transactionID string) (err error
if err = dm.DataDB().RemoveActionTriggersDrv(id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveActionTriggers(id,
utils.NonTransactional); err != nil {
return
}
}
}
Cache.Remove(utils.CacheActionTriggers, id,
cacheCommit(transactionID), transactionID)
return
@@ -1071,13 +958,6 @@ func (dm *DataManager) SetActionTriggers(key string, attr ActionTriggers,
if err = dm.CacheDataFromDB(utils.ACTION_TRIGGER_PREFIX, []string{key}, true); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.DataDB().SetActionTriggersDrv(key, attr); err != nil {
return
}
}
}
return
}
@@ -1125,13 +1005,6 @@ func (dm *DataManager) SetSharedGroup(sg *SharedGroup,
[]string{sg.Id}, true); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.DataDB().SetSharedGroupDrv(sg); err != nil {
return
}
}
}
return
}
@@ -1139,14 +1012,6 @@ func (dm *DataManager) RemoveSharedGroup(id, transactionID string) (err error) {
if err = dm.DataDB().RemoveSharedGroupDrv(id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveSharedGroup(id,
utils.NonTransactional); err != nil {
return
}
}
}
Cache.Remove(utils.CacheSharedGroups, id,
cacheCommit(transactionID), transactionID)
return
@@ -1196,13 +1061,6 @@ func (dm *DataManager) SetActions(key string, as Actions, transactionID string)
if err = dm.CacheDataFromDB(utils.ACTION_PREFIX, []string{key}, true); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.DataDB().SetActionsDrv(key, as); err != nil {
return
}
}
}
return
}
@@ -1210,14 +1068,6 @@ func (dm *DataManager) RemoveActions(key, transactionID string) (err error) {
if err = dm.DataDB().RemoveActionsDrv(key); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveActions(key,
utils.NonTransactional); err != nil {
return
}
}
}
Cache.Remove(utils.CacheActions, key,
cacheCommit(transactionID), transactionID)
return
@@ -1318,13 +1168,6 @@ func (dm *DataManager) SetRatingPlan(rp *RatingPlan, transactionID string) (err
if err = dm.CacheDataFromDB(utils.RATING_PLAN_PREFIX, []string{rp.Id}, true); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.DataDB().SetRatingPlanDrv(rp); err != nil {
return
}
}
}
return
}
@@ -1332,14 +1175,6 @@ func (dm *DataManager) RemoveRatingPlan(key string, transactionID string) (err e
if err = dm.DataDB().RemoveRatingPlanDrv(key); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveRatingPlan(key,
utils.NonTransactional); err != nil {
return
}
}
}
Cache.Remove(utils.CacheRatingPlans, key,
cacheCommit(transactionID), transactionID)
return
@@ -1388,13 +1223,6 @@ func (dm *DataManager) SetRatingProfile(rpf *RatingProfile,
if err = dm.CacheDataFromDB(utils.RATING_PROFILE_PREFIX, []string{rpf.Id}, true); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.DataDB().SetRatingProfileDrv(rpf); err != nil {
return
}
}
}
return
}
@@ -1403,14 +1231,6 @@ func (dm *DataManager) RemoveRatingProfile(key string,
if err = dm.DataDB().RemoveRatingProfileDrv(key); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveRatingProfile(key,
utils.NonTransactional); err != nil {
return
}
}
}
Cache.Remove(utils.CacheRatingProfiles, key,
cacheCommit(transactionID), transactionID)
return
@@ -1446,14 +1266,6 @@ func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string,
indexes, commit, transactionID); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.DataDB().SetFilterIndexesDrv(cacheID, itemIDPrefix,
indexes, commit, transactionID); err != nil {
return
}
}
}
return
}
@@ -1461,14 +1273,6 @@ func (dm *DataManager) RemoveFilterIndexes(cacheID, itemIDPrefix string) (err er
if err = dm.DataDB().RemoveFilterIndexesDrv(cacheID, itemIDPrefix); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveFilterIndexes(cacheID,
itemIDPrefix); err != nil {
return
}
}
}
return
}
@@ -1593,13 +1397,6 @@ func (dm *DataManager) SetSupplierProfile(supp *SupplierProfile, withIndex bool)
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetSupplierProfile(supp, withIndex); err != nil {
return
}
}
}
return
}
@@ -1620,14 +1417,6 @@ func (dm *DataManager) RemoveSupplierProfile(tenant, id, transactionID string, w
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveSupplierProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}
@@ -1709,13 +1498,6 @@ func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool)
}
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetAttributeProfile(ap, withIndex); err != nil {
return
}
}
}
return
}
@@ -1738,14 +1520,6 @@ func (dm *DataManager) RemoveAttributeProfile(tenant, id string, transactionID s
}
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveAttributeProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}
@@ -1816,13 +1590,6 @@ func (dm *DataManager) SetChargerProfile(cpp *ChargerProfile, withIndex bool) (e
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetChargerProfile(cpp, withIndex); err != nil {
return
}
}
}
return
}
@@ -1844,14 +1611,6 @@ func (dm *DataManager) RemoveChargerProfile(tenant, id string,
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveChargerProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}
@@ -1929,13 +1688,6 @@ func (dm *DataManager) SetDispatcherProfile(dpp *DispatcherProfile, withIndex bo
}
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetDispatcherProfile(dpp, withIndex); err != nil {
return
}
}
}
return
}
@@ -1959,14 +1711,6 @@ func (dm *DataManager) RemoveDispatcherProfile(tenant, id string,
}
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveDispatcherProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}
@@ -2021,13 +1765,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit
func (dm *DataManager) SetDispatcherHost(dpp *DispatcherHost) (err error) {
if err = dm.DataDB().SetDispatcherHostDrv(dpp); err != nil {
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetDispatcherHost(dpp); err != nil {
return
}
}
}
return
}
return
}
@@ -2044,14 +1782,6 @@ func (dm *DataManager) RemoveDispatcherHost(tenant, id string,
if oldDpp == nil {
return utils.ErrNotFound
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveDispatcherHost(tenant, id,
utils.NonTransactional); err != nil {
return
}
}
}
return
}
@@ -2091,13 +1821,6 @@ func (dm *DataManager) SetLoadIDs(loadIDs map[string]int64) (err error) {
if err = dm.DataDB().SetLoadIDsDrv(loadIDs); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.SetLoadIDs(loadIDs); err != nil {
return
}
}
}
return
}

View File

@@ -292,7 +292,8 @@ func InitDataDb(cfg *config.CGRConfig) error {
if err != nil {
return err
}
var rmtDBConns, rplDBConns []*DataManager
var rmtDBConns []*DataManager
var rplConns *rpcclient.RpcClientPool
if len(cfg.DataDbCfg().RmtDataDBCfgs) != 0 {
rmtDBConns = make([]*DataManager, len(cfg.DataDbCfg().RmtDataDBCfgs))
for i, dbCfg := range cfg.DataDbCfg().RmtDataDBCfgs {
@@ -307,21 +308,17 @@ func InitDataDb(cfg *config.CGRConfig) error {
rmtDBConns[i] = NewDataManager(dbConn, nil, nil, nil)
}
}
if len(cfg.DataDbCfg().RplDataDBCfgs) != 0 {
rplDBConns = make([]*DataManager, len(cfg.DataDbCfg().RplDataDBCfgs))
for i, dbCfg := range cfg.DataDbCfg().RplDataDBCfgs {
dbConn, err := NewDataDBConn(dbCfg.DataDbType,
dbCfg.DataDbHost, dbCfg.DataDbPort,
dbCfg.DataDbName, dbCfg.DataDbUser,
dbCfg.DataDbPass, cfg.GeneralCfg().DBDataEncoding,
dbCfg.DataDbSentinelName)
if err != nil {
return err
}
rplDBConns[i] = NewDataManager(dbConn, nil, nil, nil)
if len(cfg.DataDbCfg().RplConns) != 0 {
rplConns, err = NewRPCPool(rpcclient.POOL_BROADCAST, cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DataDbCfg().RplConns, nil, false)
if err != nil {
return err
}
}
dm := NewDataManager(d, cfg.CacheCfg(), rmtDBConns, rplDBConns)
dm := NewDataManager(d, cfg.CacheCfg(), rmtDBConns, rplConns)
if err := dm.DataDB().Flush(""); err != nil {
return err

View File

@@ -68,7 +68,8 @@ type TpReader struct {
func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string,
cacheS rpcclient.RpcClientConnection, schedulerS rpcclient.RpcClientConnection) (*TpReader, error) {
var rmtDBConns, rplDBConns []*DataManager
var rmtDBConns []*DataManager
var rplConns *rpcclient.RpcClientPool
if len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs) != 0 {
rmtDBConns = make([]*DataManager, len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs))
for i, dbCfg := range config.CgrConfig().DataDbCfg().RmtDataDBCfgs {
@@ -83,24 +84,21 @@ func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string,
rmtDBConns[i] = NewDataManager(dbConn, nil, nil, nil)
}
}
if len(config.CgrConfig().DataDbCfg().RplDataDBCfgs) != 0 {
rplDBConns = make([]*DataManager, len(config.CgrConfig().DataDbCfg().RplDataDBCfgs))
for i, dbCfg := range config.CgrConfig().DataDbCfg().RplDataDBCfgs {
dbConn, err := NewDataDBConn(dbCfg.DataDbType,
dbCfg.DataDbHost, dbCfg.DataDbPort,
dbCfg.DataDbName, dbCfg.DataDbUser,
dbCfg.DataDbPass, config.CgrConfig().GeneralCfg().DBDataEncoding,
dbCfg.DataDbSentinelName)
if err != nil {
return nil, err
}
rplDBConns[i] = NewDataManager(dbConn, nil, nil, nil)
if len(config.CgrConfig().DataDbCfg().RplConns) != 0 {
var err error
rplConns, err = NewRPCPool(rpcclient.POOL_BROADCAST, config.CgrConfig().TlsCfg().ClientKey,
config.CgrConfig().TlsCfg().ClientCerificate, config.CgrConfig().TlsCfg().CaCertificate,
config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects,
config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout,
config.CgrConfig().DataDbCfg().RplConns, nil, false)
if err != nil {
return nil, err
}
}
tpr := &TpReader{
tpid: tpid,
timezone: timezone,
dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtDBConns, rplDBConns), // ToDo: add ChacheCfg as parameter to the NewTpReader
dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtDBConns, rplConns), // ToDo: add ChacheCfg as parameter to the NewTpReader
lr: lr,
cacheS: cacheS,
schedulerS: schedulerS,