Replacing replicate in datamanager

This commit is contained in:
andronache
2021-07-13 09:49:53 +03:00
committed by Dan Christian Bogos
parent 5c564c471d
commit 07111d1e4d
9 changed files with 440 additions and 234 deletions

View File

@@ -310,6 +310,7 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) {
Items: 0,
Groups: 0,
},
utils.CacheReplicationHosts: {},
}
if err := precacheRPC.Call(utils.CacheSv1GetCacheStats, args, &reply); err != nil {
t.Error(err.Error())

View File

@@ -39,6 +39,7 @@ type DataDbCfg struct {
QueryTimeout time.Duration
RmtConns []string // Remote DataDB connIDs
RplConns []string // Replication connIDs
RplFiltered bool
Items map[string]*ItemOpt
}
@@ -77,6 +78,9 @@ func (dbcfg *DataDbCfg) loadFromJsonCfg(jsnDbCfg *DbJsonCfg) (err error) {
return err
}
}
if jsnDbCfg.Replication_filtered != nil {
dbcfg.RplFiltered = *jsnDbCfg.Replication_filtered
}
if jsnDbCfg.Remote_conns != nil {
dbcfg.RmtConns = make([]string, len(*jsnDbCfg.Remote_conns))
for idx, rmtConn := range *jsnDbCfg.Remote_conns {
@@ -125,6 +129,7 @@ func (dbcfg *DataDbCfg) Clone() *DataDbCfg {
DataDbSentinelName: dbcfg.DataDbSentinelName,
QueryTimeout: dbcfg.QueryTimeout,
Items: dbcfg.Items,
RplFiltered: dbcfg.RplFiltered,
}
}
@@ -140,17 +145,18 @@ func (dbcfg *DataDbCfg) AsMapInterface() map[string]interface{} {
dbPort, _ := strconv.Atoi(dbcfg.DataDbPort)
return map[string]interface{}{
utils.DataDbTypeCfg: utils.Meta + dbcfg.DataDbType,
utils.DataDbHostCfg: dbcfg.DataDbHost,
utils.DataDbPortCfg: dbPort,
utils.DataDbNameCfg: dbcfg.DataDbName,
utils.DataDbUserCfg: dbcfg.DataDbUser,
utils.DataDbPassCfg: dbcfg.DataDbPass,
utils.DataDbSentinelNameCfg: dbcfg.DataDbSentinelName,
utils.QueryTimeoutCfg: queryTimeout,
utils.RmtConnsCfg: dbcfg.RmtConns,
utils.RplConnsCfg: dbcfg.RplConns,
utils.ItemsCfg: items,
utils.DataDbTypeCfg: utils.Meta + dbcfg.DataDbType,
utils.DataDbHostCfg: dbcfg.DataDbHost,
utils.DataDbPortCfg: dbPort,
utils.DataDbNameCfg: dbcfg.DataDbName,
utils.DataDbUserCfg: dbcfg.DataDbUser,
utils.DataDbPassCfg: dbcfg.DataDbPass,
utils.DataDbSentinelNameCfg: dbcfg.DataDbSentinelName,
utils.QueryTimeoutCfg: queryTimeout,
utils.RmtConnsCfg: dbcfg.RmtConns,
utils.RplConnsCfg: dbcfg.RplConns,
utils.ItemsCfg: items,
utils.ReplicationFilteredCfg: dbcfg.RplFiltered,
}
}

View File

@@ -418,16 +418,17 @@ func TestDataDbCfgAsMapInterface(t *testing.T) {
},
}`
eMap := map[string]interface{}{
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates",
"db_password": "",
"redis_sentinel": "",
"query_timeout": "10s",
"remote_conns": []string{},
"replication_conns": []string{},
"db_type": "*redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates",
"db_password": "",
"redis_sentinel": "",
"query_timeout": "10s",
"remote_conns": []string{},
"replication_conns": []string{},
"replication_filtered": false,
"items": map[string]interface{}{
"*accounts": map[string]interface{}{"remote": true, "replicate": false, "limit": -1, "ttl": "", "static_ttl": false},
"*reverse_destinations": map[string]interface{}{"remote": false, "replicate": false, "limit": 7, "ttl": "", "static_ttl": true},

View File

@@ -97,6 +97,7 @@ type DbJsonCfg struct {
Remote_conns *[]string
Replication_conns *[]string
Items *map[string]*ItemOptJson
Replication_filtered *bool
}
type ItemOptJson struct {
@@ -249,6 +250,7 @@ type RPCConnsJson struct {
// Represents one connection instance towards a rater/cdrs server
type RemoteHostJson struct {
Id *string
Address *string
Transport *string
Synchronous *bool

View File

@@ -77,6 +77,7 @@ func (rC *RPCConn) AsMapInterface() map[string]interface{} {
// One connection to Rater
type RemoteHost struct {
ID string
Address string
Transport string
Synchronous bool
@@ -87,6 +88,12 @@ func (self *RemoteHost) loadFromJsonCfg(jsnCfg *RemoteHostJson) error {
if jsnCfg == nil {
return nil
}
if jsnCfg.Id != nil {
self.ID = *jsnCfg.Id
// ignore defaults if we have ID
self.Address = utils.EmptyString
self.Transport = utils.EmptyString
}
if jsnCfg.Address != nil {
self.Address = *jsnCfg.Address
}

View File

@@ -23,12 +23,17 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
"github.com/cgrates/rpcclient"
)
// NewConnManager returns the Connection Manager
func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan rpcclient.ClientConnector) (cM *ConnManager) {
cM = &ConnManager{cfg: cfg, rpcInternal: rpcInternal}
cM = &ConnManager{
cfg: cfg,
rpcInternal: rpcInternal,
connCache: ltcache.NewCache(-1, 0, true, nil),
}
SetConnManager(cM)
return
}
@@ -37,6 +42,7 @@ func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan rpcclient
type ConnManager struct {
cfg *config.CGRConfig
rpcInternal map[string]chan rpcclient.ClientConnector
connCache *ltcache.Cache
}
// getConn is used to retrieve a connection from cache
@@ -142,3 +148,100 @@ func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.ClientConnec
}
return
}
func (cM *ConnManager) Reload() {
Cache.Clear([]string{utils.CacheRPCConnections})
Cache.Clear([]string{utils.CacheReplicationHosts})
cM.connCache.Clear()
}
func (cM *ConnManager) CallWithConnIDs(connIDs []string, subsHostIDs utils.StringSet, method string, arg, reply interface{}) (err error) {
if len(connIDs) == 0 {
return utils.NewErrMandatoryIeMissing("connIDs")
}
// no connection for this id exit here
if subsHostIDs.Size() == 0 {
return
}
var conn rpcclient.ClientConnector
for _, connID := range connIDs {
// recreate the config with only conns that are needed
connCfg := cM.cfg.RPCConns()[connID]
newCfg := &config.RPCConn{
Strategy: connCfg.Strategy,
PoolSize: connCfg.PoolSize,
// alloc for all connection in order to not increase the size later
Conns: make([]*config.RemoteHost, 0, len(connCfg.Conns)),
}
for _, conn := range connCfg.Conns {
if conn.ID != utils.EmptyString &&
subsHostIDs.Has(conn.ID) {
newCfg.Conns = append(newCfg.Conns, conn) // the slice will never grow
}
}
if len(newCfg.Conns) == 0 {
// skip this pool if no connection matches
continue
}
if conn, err = cM.getConnWithConfig(connID, newCfg, nil, nil, false); err != nil {
continue
}
if err = conn.Call(method, arg, reply); !rpcclient.IsNetworkError(err) {
return
}
}
return
}
func (cM *ConnManager) getConnWithConfig(connID string, connCfg *config.RPCConn,
biRPCClient rpcclient.ClientConnector, intChan chan rpcclient.ClientConnector,
isInternalRPC bool) (conn rpcclient.ClientConnector, err error) {
switch {
case biRPCClient != nil && isInternalRPC:
var rply string
sSIntConn := <-intChan
intChan <- sSIntConn
conn = utils.NewBiRPCInternalClient(sSIntConn.(utils.BiRPCServer))
conn.(*utils.BiRPCInternalClient).SetClientConn(biRPCClient)
if err = conn.Call(utils.SessionSv1RegisterInternalBiJSONConn,
utils.EmptyString, &rply); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not register biRPCClient, error: <%s>",
utils.SessionS, err.Error()))
return
}
case connCfg.Strategy == rpcclient.PoolParallel:
rpcConnCfg := connCfg.Conns[0] // for parrallel we need only the first connection
codec := rpcclient.GOBrpc
switch {
case rpcConnCfg.Address == rpcclient.InternalRPC:
codec = rpcclient.InternalRPC
case rpcConnCfg.Transport == utils.EmptyString:
intChan = nil
case rpcConnCfg.Transport == rpcclient.GOBrpc,
rpcConnCfg.Transport == rpcclient.JSONrpc:
codec = rpcConnCfg.Transport
intChan = nil
default:
err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
return
}
if conn, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS,
cM.cfg.TlsCfg().ClientKey, cM.cfg.TlsCfg().ClientCerificate,
cM.cfg.TlsCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts,
cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout,
cM.cfg.GeneralCfg().ReplyTimeout, codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false); err != nil {
return
}
default:
if conn, err = NewRPCPool(connCfg.Strategy,
cM.cfg.TlsCfg().ClientKey,
cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate,
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
connCfg.Conns, intChan, false); err != nil {
return
}
}
return
}

View File

@@ -388,13 +388,12 @@ func (dm *DataManager) SetDestination(dest *Destination, transactionID string) (
if err = dm.dataDB.SetDestinationDrv(dest, transactionID); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaDestinations].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetDestination, dest, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDestinations]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.DestinationPrefix, destID, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveDestination,
destID)
}
return
}
@@ -416,12 +415,11 @@ func (dm *DataManager) SetReverseDestination(dest *Destination, transactionID st
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaReverseDestinations].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetReverseDestination, dest, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.DestinationPrefix, dest.Id, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetReverseDestination,
dest)
}
return
}
@@ -471,13 +469,12 @@ func (dm *DataManager) SetAccount(acc *Account) (err error) {
if err = dm.dataDB.SetAccountDrv(acc); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1Account, acc, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACCOUNT_PREFIX, acc.ID, // this are used to get the host IDs from cache
utils.ReplicatorSv1Account,
acc) // the account doesn't have cache
}
return
}
@@ -486,10 +483,12 @@ func (dm *DataManager) RemoveAccount(id string) (err error) {
if err = dm.dataDB.RemoveAccountDrv(id); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveAccount, id, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACCOUNT_PREFIX, id, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveAccount,
id)
}
return
}
@@ -552,13 +551,12 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) {
if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetStatQueue, ssq, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetStatQueue,
sq)
}
return
}
@@ -568,10 +566,12 @@ func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string)
if err = dm.dataDB.RemStatQueueDrv(tenant, id); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveStatQueue, &utils.TenantID{Tenant: tenant, ID: id}, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveStatQueue,
&utils.TenantID{Tenant: tenant, ID: id})
}
return
}
@@ -628,13 +628,12 @@ func (dm *DataManager) SetFilter(fltr *Filter) (err error) {
if err = dm.DataDB().SetFilterDrv(fltr); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaFilters].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetFilter, fltr, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.FilterPrefix, fltr.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetFilter,
fltr)
}
return
@@ -644,10 +643,12 @@ func (dm *DataManager) RemoveFilter(tenant, id, transactionID string) (err error
if err = dm.DataDB().RemoveFilterDrv(tenant, id); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaFilters].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveFilter, &utils.TenantID{Tenant: tenant, ID: id}, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.FilterPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveFilter,
&utils.TenantID{Tenant: tenant, ID: id})
}
return
}
@@ -692,13 +693,12 @@ func (dm *DataManager) SetThreshold(th *Threshold) (err error) {
if err = dm.DataDB().SetThresholdDrv(th); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetThreshold, th, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ThresholdPrefix, th.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetThreshold,
th)
}
return
}
@@ -707,10 +707,12 @@ func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err er
if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveThreshold, &utils.TenantID{Tenant: tenant, ID: id}, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ThresholdPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveThreshold,
&utils.TenantID{Tenant: tenant, ID: id})
}
return
}
@@ -781,11 +783,12 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
return err
}
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetThresholdProfile, th, &reply); err != nil {
err = utils.CastRPCErr(err)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles]; itm.Replicate {
if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ThresholdProfilePrefix, th.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetThresholdProfile,
th); err != nil {
return
}
}
@@ -810,10 +813,12 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id,
return
}
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, utils.ReplicatorSv1RemoveThresholdProfile,
&utils.TenantID{Tenant: tenant, ID: id}, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ThresholdProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveThresholdProfile,
&utils.TenantID{Tenant: tenant, ID: id})
}
return
}
@@ -884,14 +889,13 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool
return
}
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetStatQueueProfile, sqp, &reply); err != nil {
err = utils.CastRPCErr(err)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; itm.Replicate {
if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueueProfilePrefix, sqp.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetStatQueueProfile,
sqp); err != nil {
return
}
}
return
}
@@ -913,10 +917,12 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id,
return
}
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveStatQueueProfile, &utils.TenantID{Tenant: tenant, ID: id}, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueueProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveStatQueueProfile,
&utils.TenantID{Tenant: tenant, ID: id})
}
return
}
@@ -962,13 +968,11 @@ func (dm *DataManager) SetTiming(t *utils.TPTiming) (err error) {
if err = dm.CacheDataFromDB(utils.TimingsPrefix, []string{t.ID}, true); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaTimings].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetTiming, t, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTimings]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TimingsPrefix, t.ID, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetTiming, t)
}
return
}
@@ -980,9 +984,11 @@ func (dm *DataManager) RemoveTiming(id, transactionID string) (err error) {
Cache.Remove(utils.CacheTimings, id,
cacheCommit(transactionID), transactionID)
if config.CgrConfig().DataDbCfg().Items[utils.MetaTimings].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveTiming, id, &reply)
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TimingsPrefix, id, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveTiming,
id)
}
return
}
@@ -1029,13 +1035,12 @@ func (dm *DataManager) SetResource(rs *Resource) (err error) {
if err = dm.DataDB().SetResourceDrv(rs); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaResources].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetResource, rs, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResources]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ResourcesPrefix, rs.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetResource,
rs)
}
return
}
@@ -1044,10 +1049,12 @@ func (dm *DataManager) RemoveResource(tenant, id, transactionID string) (err err
if err = dm.DataDB().RemoveResourceDrv(tenant, id); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaResources].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveResource, &utils.TenantID{Tenant: tenant, ID: id}, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResources]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ResourcesPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveResource,
&utils.TenantID{Tenant: tenant, ID: id}
}
return
}
@@ -1117,11 +1124,12 @@ func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) (
}
Cache.Clear([]string{utils.CacheEventResources})
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetResourceProfile, rp, &reply); err != nil {
err = utils.CastRPCErr(err)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile]; itm.Replicate {
if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ResourceProfilesPrefix, rp.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetResourceProfile,
rp); err != nil {
return
}
}
@@ -1145,10 +1153,12 @@ func (dm *DataManager) RemoveResourceProfile(tenant, id, transactionID string, w
return
}
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveResourceProfile, &utils.TenantID{Tenant: tenant, ID: id}, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ResourceProfilesPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveResourceProfile,
&utils.TenantID{Tenant: tenant, ID: id})
}
return
}
@@ -1192,11 +1202,13 @@ func (dm *DataManager) RemoveActionTriggers(id, transactionID string) (err error
}
Cache.Remove(utils.CacheActionTriggers, id,
cacheCommit(transactionID), transactionID)
if config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveActionTriggers, id, &reply)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACTION_TRIGGER_PREFIX, id, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveActionTriggers,
id)
}
return
}
@@ -1214,13 +1226,15 @@ func (dm *DataManager) SetActionTriggers(key string, attr ActionTriggers,
if err = dm.CacheDataFromDB(utils.ACTION_TRIGGER_PREFIX, []string{key}, true); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, utils.ReplicatorSv1SetActionTriggers,
&SetActionTriggersArg{Attrs: attr, Key: key}, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACTION_TRIGGER_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetActionTriggers,
&SetActionTriggersArg{
Attrs: attr,
Key: key,
})
}
return
}
@@ -1267,13 +1281,12 @@ func (dm *DataManager) SetSharedGroup(sg *SharedGroup,
[]string{sg.Id}, true); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetSharedGroup, sg, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.SHARED_GROUP_PREFIX, sg.Id, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetSharedGroup,
sg)
}
return
}
@@ -1284,11 +1297,13 @@ func (dm *DataManager) RemoveSharedGroup(id, transactionID string) (err error) {
}
Cache.Remove(utils.CacheSharedGroups, id,
cacheCommit(transactionID), transactionID)
if config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveSharedGroup, id, &reply)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.SHARED_GROUP_PREFIX, id, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveSharedGroup,
id)
}
return
}
@@ -1340,13 +1355,15 @@ func (dm *DataManager) SetActions(key string, as Actions, transactionID string)
if err = dm.CacheDataFromDB(utils.ACTION_PREFIX, []string{key}, true); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaActions].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetActions, &SetActionsArgs{Key: key, Acs: as}, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACTION_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetActions,
&SetActionsArgs{
Key: key,
Acs: as,
})
}
return
}
@@ -1357,11 +1374,13 @@ func (dm *DataManager) RemoveActions(key, transactionID string) (err error) {
}
Cache.Remove(utils.CacheActions, key,
cacheCommit(transactionID), transactionID)
if config.CgrConfig().DataDbCfg().Items[utils.MetaActions].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveActions, key, &reply)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACTION_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveActions,
key)
}
return
}
@@ -1427,15 +1446,14 @@ func (dm *DataManager) SetActionPlan(key string, ats *ActionPlan,
if err = dm.dataDB.SetActionPlanDrv(key, ats); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetActionPlan, &SetActionPlanArg{
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.AccountActionPlansPrefix, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetActionPlan,
&SetActionPlanArg{
Key: key,
Ats: ats,
}, &reply); err != nil {
err = utils.CastRPCErr(err)
return
Ats: ats})
}
}
return
@@ -1460,10 +1478,12 @@ func (dm *DataManager) RemoveActionPlan(key string, transactionID string) (err e
if err = dm.dataDB.RemoveActionPlanDrv(key); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveActionPlan, key, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACTION_PLAN_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveActionPlan,
key)
}
return
}
@@ -1521,16 +1541,15 @@ func (dm *DataManager) SetAccountActionPlans(acntID string, aPlIDs []string, ove
if err = dm.dataDB.SetAccountActionPlansDrv(acntID, aPlIDs); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetAccountActionPlans, &SetAccountActionPlansArg{
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.AccountActionPlansPrefix, acntID, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetAccountActionPlans,
&SetAccountActionPlansArg{
AcntID: acntID,
AplIDs: aPlIDs,
}, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
})
}
return
}
@@ -1559,11 +1578,15 @@ func (dm *DataManager) RemAccountActionPlans(acntID string, apIDs []string) (err
if err = dm.dataDB.RemAccountActionPlansDrv(acntID); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.AccountActionPlansPrefix, acntID, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemAccountActionPlans,
&RemAccountActionPlansArgs{AcntID: acntID, ApIDs: apIDs}, &reply)
&RemAccountActionPlansArgs{
AcntID: acntID,
ApIDs: apIDs,
})
}
return
}
@@ -1608,13 +1631,12 @@ func (dm *DataManager) SetRatingPlan(rp *RatingPlan, transactionID string) (err
if err = dm.CacheDataFromDB(utils.RATING_PLAN_PREFIX, []string{rp.Id}, true); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetRatingPlan, rp, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RATING_PLAN_PREFIX, rp.Id, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetRatingPlan,
rp)
}
return
}
@@ -1625,11 +1647,13 @@ func (dm *DataManager) RemoveRatingPlan(key string, transactionID string) (err e
}
Cache.Remove(utils.CacheRatingPlans, key,
cacheCommit(transactionID), transactionID)
if config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveRatingPlan, key, &reply)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RATING_PLAN_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveRatingPlan,
key)
}
return
}
@@ -1678,13 +1702,12 @@ func (dm *DataManager) SetRatingProfile(rpf *RatingProfile,
if err = dm.CacheDataFromDB(utils.RATING_PROFILE_PREFIX, []string{rpf.Id}, true); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetRatingProfile, rpf, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RATING_PROFILE_PREFIX, rpf.Id, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetRatingProfile,
rpf)
}
return
}
@@ -1696,11 +1719,13 @@ func (dm *DataManager) RemoveRatingProfile(key string,
}
Cache.Remove(utils.CacheRatingProfiles, key,
cacheCommit(transactionID), transactionID)
if config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveRatingProfile, key, &reply)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RATING_PROFILE_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveRatingProfile,
key)
}
return
}
@@ -1738,7 +1763,7 @@ func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string,
indexes, commit, transactionID); err != nil {
return
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes].Replicate {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetFilterIndexes,
@@ -2013,13 +2038,12 @@ func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool)
}
}
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles].Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetAttributeProfile, ap, &reply); err != nil {
err = utils.CastRPCErr(err)
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.AttributeProfilePrefix, ap.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetAttributeProfile,
ap)
}
return
}
@@ -2043,10 +2067,16 @@ func (dm *DataManager) RemoveAttributeProfile(tenant, id string, transactionID s
}
}
}
if config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles].Replicate {
var reply string
dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveAttributeProfile, &utils.TenantID{Tenant: tenant, ID: id}, &reply)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveAttributeProfile,
&utils.TenantID{
Tenant: tenant,
ID: id,
},
)
}
return
}
@@ -2398,3 +2428,56 @@ func (dm *DataManager) Reconnect(marshaller string, newcfg *config.DataDbCfg) (e
dm.dataDB = d
return
}
func replicate(connMgr *ConnManager, connIDs []string, filtered bool, objType, objID, method string, args interface{}) (err error) {
// the reply is string for Set/Remove APIs
// ignored in favor of the error
var reply string
if !filtered {
// is not partial so send to all defined connections
return utils.CastRPCErr(connMgr.Call(connIDs, nil, method, args, &reply))
}
// is partial so get all the replicationHosts from cache based on object Type and ID
// alp_cgrates.org:ATTR1
rplcHostIDsIfaces := Cache.GetGroupItems(utils.CacheReplicationHosts, objType+objID)
rplcHostIDs := make(utils.StringSet)
for _, hostID := range rplcHostIDsIfaces {
rplcHostIDs.Add(hostID.(string))
}
// using the replication hosts call the method
return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs,
method, args, &reply))
}
func UpdateReplicationFilters(objType, objID, connID string) {
if connID == utils.EmptyString {
return
}
Cache.Set(utils.CacheReplicationHosts, objType+objID+utils.CONCATENATED_KEY_SEP+connID, connID, []string{objType + objID},
true, utils.NonTransactional)
}
// replicateMultipleIDs will do the same thing as replicate but uses multiple objectIDs
// used when setting the LoadIDs
func replicateMultipleIDs(connMgr *ConnManager, connIDs []string, filtered bool, objType string, objIDs []string, method string, args interface{}) (err error) {
// the reply is string for Set/Remove APIs
// ignored in favor of the error
var reply string
if !filtered {
// is not partial so send to all defined connections
return utils.CastRPCErr(connMgr.Call(connIDs, nil, method, args, &reply))
}
// is partial so get all the replicationHosts from cache based on object Type and ID
// combine all hosts in a single set so if we receive a get with one ID in list
// send all list to that hos
rplcHostIDs := make(utils.StringSet)
for _, objID := range objIDs {
rplcHostIDsIfaces := Cache.GetGroupItems(utils.CacheReplicationHosts, objType+objID)
for _, hostID := range rplcHostIDsIfaces {
rplcHostIDs.Add(hostID.(string))
}
}
// using the replication hosts call the method
return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs,
method, args, &reply))
}

View File

@@ -621,5 +621,6 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
Items: 0,
Groups: 0,
},
utils.CacheReplicationHosts: {},
}
}

View File

@@ -1517,6 +1517,7 @@ const (
CacheRPCConnections = "*rpc_connections"
CacheCDRIDs = "*cdr_ids"
CacheRatingProfilesTmp = "*tmp_rating_profiles"
CacheReplicationHosts = "*replication_hosts"
)
// Prefix for indexing
@@ -1616,15 +1617,16 @@ const (
// DataDbCfg
const (
DataDbTypeCfg = "db_type"
DataDbHostCfg = "db_host"
DataDbPortCfg = "db_port"
DataDbNameCfg = "db_name"
DataDbUserCfg = "db_user"
DataDbPassCfg = "db_password"
DataDbSentinelNameCfg = "redis_sentinel"
RmtConnsCfg = "remote_conns"
RplConnsCfg = "replication_conns"
DataDbTypeCfg = "db_type"
DataDbHostCfg = "db_host"
DataDbPortCfg = "db_port"
DataDbNameCfg = "db_name"
DataDbUserCfg = "db_user"
DataDbPassCfg = "db_password"
DataDbSentinelNameCfg = "redis_sentinel"
RmtConnsCfg = "remote_conns"
RplConnsCfg = "replication_conns"
ReplicationFilteredCfg = "replication_filtered"
)
// ItemOpt