From 07111d1e4d95c1979212159e19e5e092d327d290 Mon Sep 17 00:00:00 2001 From: andronache Date: Tue, 13 Jul 2021 09:49:53 +0300 Subject: [PATCH] Replacing replicate in datamanager --- apier/v1/precache_it_test.go | 1 + config/datadbcfg.go | 28 +- config/datadbcfg_test.go | 21 +- config/libconfig_json.go | 2 + config/rpcconn.go | 7 + engine/connmanager.go | 105 +++++++- engine/datamanager.go | 489 ++++++++++++++++++++--------------- engine/libtest.go | 1 + utils/consts.go | 20 +- 9 files changed, 440 insertions(+), 234 deletions(-) diff --git a/apier/v1/precache_it_test.go b/apier/v1/precache_it_test.go index 7a14b7e8f..13769ed3e 100644 --- a/apier/v1/precache_it_test.go +++ b/apier/v1/precache_it_test.go @@ -310,6 +310,7 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) { Items: 0, Groups: 0, }, + utils.CacheReplicationHosts: {}, } if err := precacheRPC.Call(utils.CacheSv1GetCacheStats, args, &reply); err != nil { t.Error(err.Error()) diff --git a/config/datadbcfg.go b/config/datadbcfg.go index 632cb0947..aef92a5cc 100644 --- a/config/datadbcfg.go +++ b/config/datadbcfg.go @@ -39,6 +39,7 @@ type DataDbCfg struct { QueryTimeout time.Duration RmtConns []string // Remote DataDB connIDs RplConns []string // Replication connIDs + RplFiltered bool Items map[string]*ItemOpt } @@ -77,6 +78,9 @@ func (dbcfg *DataDbCfg) loadFromJsonCfg(jsnDbCfg *DbJsonCfg) (err error) { return err } } + if jsnDbCfg.Replication_filtered != nil { + dbcfg.RplFiltered = *jsnDbCfg.Replication_filtered + } if jsnDbCfg.Remote_conns != nil { dbcfg.RmtConns = make([]string, len(*jsnDbCfg.Remote_conns)) for idx, rmtConn := range *jsnDbCfg.Remote_conns { @@ -125,6 +129,7 @@ func (dbcfg *DataDbCfg) Clone() *DataDbCfg { DataDbSentinelName: dbcfg.DataDbSentinelName, QueryTimeout: dbcfg.QueryTimeout, Items: dbcfg.Items, + RplFiltered: dbcfg.RplFiltered, } } @@ -140,17 +145,18 @@ func (dbcfg *DataDbCfg) AsMapInterface() map[string]interface{} { dbPort, _ := strconv.Atoi(dbcfg.DataDbPort) return map[string]interface{}{ - utils.DataDbTypeCfg: utils.Meta + dbcfg.DataDbType, - utils.DataDbHostCfg: dbcfg.DataDbHost, - utils.DataDbPortCfg: dbPort, - utils.DataDbNameCfg: dbcfg.DataDbName, - utils.DataDbUserCfg: dbcfg.DataDbUser, - utils.DataDbPassCfg: dbcfg.DataDbPass, - utils.DataDbSentinelNameCfg: dbcfg.DataDbSentinelName, - utils.QueryTimeoutCfg: queryTimeout, - utils.RmtConnsCfg: dbcfg.RmtConns, - utils.RplConnsCfg: dbcfg.RplConns, - utils.ItemsCfg: items, + utils.DataDbTypeCfg: utils.Meta + dbcfg.DataDbType, + utils.DataDbHostCfg: dbcfg.DataDbHost, + utils.DataDbPortCfg: dbPort, + utils.DataDbNameCfg: dbcfg.DataDbName, + utils.DataDbUserCfg: dbcfg.DataDbUser, + utils.DataDbPassCfg: dbcfg.DataDbPass, + utils.DataDbSentinelNameCfg: dbcfg.DataDbSentinelName, + utils.QueryTimeoutCfg: queryTimeout, + utils.RmtConnsCfg: dbcfg.RmtConns, + utils.RplConnsCfg: dbcfg.RplConns, + utils.ItemsCfg: items, + utils.ReplicationFilteredCfg: dbcfg.RplFiltered, } } diff --git a/config/datadbcfg_test.go b/config/datadbcfg_test.go index 2c2084c32..9c4450acd 100644 --- a/config/datadbcfg_test.go +++ b/config/datadbcfg_test.go @@ -418,16 +418,17 @@ func TestDataDbCfgAsMapInterface(t *testing.T) { }, }` eMap := map[string]interface{}{ - "db_type": "*redis", - "db_host": "127.0.0.1", - "db_port": 6379, - "db_name": "10", - "db_user": "cgrates", - "db_password": "", - "redis_sentinel": "", - "query_timeout": "10s", - "remote_conns": []string{}, - "replication_conns": []string{}, + "db_type": "*redis", + "db_host": "127.0.0.1", + "db_port": 6379, + "db_name": "10", + "db_user": "cgrates", + "db_password": "", + "redis_sentinel": "", + "query_timeout": "10s", + "remote_conns": []string{}, + "replication_conns": []string{}, + "replication_filtered": false, "items": map[string]interface{}{ "*accounts": map[string]interface{}{"remote": true, "replicate": false, "limit": -1, "ttl": "", "static_ttl": false}, "*reverse_destinations": map[string]interface{}{"remote": false, "replicate": false, "limit": 7, "ttl": "", "static_ttl": true}, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 3555c76c7..d07f2a979 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -97,6 +97,7 @@ type DbJsonCfg struct { Remote_conns *[]string Replication_conns *[]string Items *map[string]*ItemOptJson + Replication_filtered *bool } type ItemOptJson struct { @@ -249,6 +250,7 @@ type RPCConnsJson struct { // Represents one connection instance towards a rater/cdrs server type RemoteHostJson struct { + Id *string Address *string Transport *string Synchronous *bool diff --git a/config/rpcconn.go b/config/rpcconn.go index f397ea1d1..c5f80df68 100644 --- a/config/rpcconn.go +++ b/config/rpcconn.go @@ -77,6 +77,7 @@ func (rC *RPCConn) AsMapInterface() map[string]interface{} { // One connection to Rater type RemoteHost struct { + ID string Address string Transport string Synchronous bool @@ -87,6 +88,12 @@ func (self *RemoteHost) loadFromJsonCfg(jsnCfg *RemoteHostJson) error { if jsnCfg == nil { return nil } + if jsnCfg.Id != nil { + self.ID = *jsnCfg.Id + // ignore defaults if we have ID + self.Address = utils.EmptyString + self.Transport = utils.EmptyString + } if jsnCfg.Address != nil { self.Address = *jsnCfg.Address } diff --git a/engine/connmanager.go b/engine/connmanager.go index d847f9997..4e554df72 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -23,12 +23,17 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" "github.com/cgrates/rpcclient" ) // NewConnManager returns the Connection Manager func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan rpcclient.ClientConnector) (cM *ConnManager) { - cM = &ConnManager{cfg: cfg, rpcInternal: rpcInternal} + cM = &ConnManager{ + cfg: cfg, + rpcInternal: rpcInternal, + connCache: ltcache.NewCache(-1, 0, true, nil), + } SetConnManager(cM) return } @@ -37,6 +42,7 @@ func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan rpcclient type ConnManager struct { cfg *config.CGRConfig rpcInternal map[string]chan rpcclient.ClientConnector + connCache *ltcache.Cache } // getConn is used to retrieve a connection from cache @@ -142,3 +148,100 @@ func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.ClientConnec } return } + +func (cM *ConnManager) Reload() { + Cache.Clear([]string{utils.CacheRPCConnections}) + Cache.Clear([]string{utils.CacheReplicationHosts}) + cM.connCache.Clear() +} + +func (cM *ConnManager) CallWithConnIDs(connIDs []string, subsHostIDs utils.StringSet, method string, arg, reply interface{}) (err error) { + if len(connIDs) == 0 { + return utils.NewErrMandatoryIeMissing("connIDs") + } + // no connection for this id exit here + if subsHostIDs.Size() == 0 { + return + } + var conn rpcclient.ClientConnector + for _, connID := range connIDs { + // recreate the config with only conns that are needed + connCfg := cM.cfg.RPCConns()[connID] + newCfg := &config.RPCConn{ + Strategy: connCfg.Strategy, + PoolSize: connCfg.PoolSize, + // alloc for all connection in order to not increase the size later + Conns: make([]*config.RemoteHost, 0, len(connCfg.Conns)), + } + for _, conn := range connCfg.Conns { + if conn.ID != utils.EmptyString && + subsHostIDs.Has(conn.ID) { + newCfg.Conns = append(newCfg.Conns, conn) // the slice will never grow + } + } + if len(newCfg.Conns) == 0 { + // skip this pool if no connection matches + continue + } + + if conn, err = cM.getConnWithConfig(connID, newCfg, nil, nil, false); err != nil { + continue + } + if err = conn.Call(method, arg, reply); !rpcclient.IsNetworkError(err) { + return + } + } + return +} + +func (cM *ConnManager) getConnWithConfig(connID string, connCfg *config.RPCConn, + biRPCClient rpcclient.ClientConnector, intChan chan rpcclient.ClientConnector, + isInternalRPC bool) (conn rpcclient.ClientConnector, err error) { + switch { + case biRPCClient != nil && isInternalRPC: + var rply string + sSIntConn := <-intChan + intChan <- sSIntConn + conn = utils.NewBiRPCInternalClient(sSIntConn.(utils.BiRPCServer)) + conn.(*utils.BiRPCInternalClient).SetClientConn(biRPCClient) + if err = conn.Call(utils.SessionSv1RegisterInternalBiJSONConn, + utils.EmptyString, &rply); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not register biRPCClient, error: <%s>", + utils.SessionS, err.Error())) + return + } + case connCfg.Strategy == rpcclient.PoolParallel: + rpcConnCfg := connCfg.Conns[0] // for parrallel we need only the first connection + codec := rpcclient.GOBrpc + switch { + case rpcConnCfg.Address == rpcclient.InternalRPC: + codec = rpcclient.InternalRPC + case rpcConnCfg.Transport == utils.EmptyString: + intChan = nil + case rpcConnCfg.Transport == rpcclient.GOBrpc, + rpcConnCfg.Transport == rpcclient.JSONrpc: + codec = rpcConnCfg.Transport + intChan = nil + default: + err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) + return + } + if conn, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, + cM.cfg.TlsCfg().ClientKey, cM.cfg.TlsCfg().ClientCerificate, + cM.cfg.TlsCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, + cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, + cM.cfg.GeneralCfg().ReplyTimeout, codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false); err != nil { + return + } + default: + if conn, err = NewRPCPool(connCfg.Strategy, + cM.cfg.TlsCfg().ClientKey, + cM.cfg.TlsCfg().ClientCerificate, cM.cfg.TlsCfg().CaCertificate, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, + cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + connCfg.Conns, intChan, false); err != nil { + return + } + } + return +} diff --git a/engine/datamanager.go b/engine/datamanager.go index 9a0b99fc6..3b0ba89c7 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -388,13 +388,12 @@ func (dm *DataManager) SetDestination(dest *Destination, transactionID string) ( if err = dm.dataDB.SetDestinationDrv(dest, transactionID); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaDestinations].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetDestination, dest, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDestinations]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.DestinationPrefix, destID, // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveDestination, + destID) } return } @@ -416,12 +415,11 @@ func (dm *DataManager) SetReverseDestination(dest *Destination, transactionID st return } if config.CgrConfig().DataDbCfg().Items[utils.MetaReverseDestinations].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetReverseDestination, dest, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.DestinationPrefix, dest.Id, // this are used to get the host IDs from cache + utils.ReplicatorSv1SetReverseDestination, + dest) } return } @@ -471,13 +469,12 @@ func (dm *DataManager) SetAccount(acc *Account) (err error) { if err = dm.dataDB.SetAccountDrv(acc); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1Account, acc, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ACCOUNT_PREFIX, acc.ID, // this are used to get the host IDs from cache + utils.ReplicatorSv1Account, + acc) // the account doesn't have cache } return } @@ -486,10 +483,12 @@ func (dm *DataManager) RemoveAccount(id string) (err error) { if err = dm.dataDB.RemoveAccountDrv(id); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveAccount, id, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ACCOUNT_PREFIX, id, // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveAccount, + id) } return } @@ -552,13 +551,12 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) { if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetStatQueue, ssq, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetStatQueue, + sq) } return } @@ -568,10 +566,12 @@ func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string) if err = dm.dataDB.RemStatQueueDrv(tenant, id); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveStatQueue, &utils.TenantID{Tenant: tenant, ID: id}, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveStatQueue, + &utils.TenantID{Tenant: tenant, ID: id}) } return } @@ -628,13 +628,12 @@ func (dm *DataManager) SetFilter(fltr *Filter) (err error) { if err = dm.DataDB().SetFilterDrv(fltr); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaFilters].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetFilter, fltr, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.FilterPrefix, fltr.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetFilter, + fltr) } return @@ -644,10 +643,12 @@ func (dm *DataManager) RemoveFilter(tenant, id, transactionID string) (err error if err = dm.DataDB().RemoveFilterDrv(tenant, id); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaFilters].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveFilter, &utils.TenantID{Tenant: tenant, ID: id}, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.FilterPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveFilter, + &utils.TenantID{Tenant: tenant, ID: id}) } return } @@ -692,13 +693,12 @@ func (dm *DataManager) SetThreshold(th *Threshold) (err error) { if err = dm.DataDB().SetThresholdDrv(th); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetThreshold, th, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ThresholdPrefix, th.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetThreshold, + th) } return } @@ -707,10 +707,12 @@ func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err er if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveThreshold, &utils.TenantID{Tenant: tenant, ID: id}, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ThresholdPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveThreshold, + &utils.TenantID{Tenant: tenant, ID: id}) } return } @@ -781,11 +783,12 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) return err } } - if config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetThresholdProfile, th, &reply); err != nil { - err = utils.CastRPCErr(err) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles]; itm.Replicate { + if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ThresholdProfilePrefix, th.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetThresholdProfile, + th); err != nil { return } } @@ -810,10 +813,12 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id, return } } - if config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, utils.ReplicatorSv1RemoveThresholdProfile, - &utils.TenantID{Tenant: tenant, ID: id}, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ThresholdProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveThresholdProfile, + &utils.TenantID{Tenant: tenant, ID: id}) } return } @@ -884,14 +889,13 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool return } } - if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetStatQueueProfile, sqp, &reply); err != nil { - err = utils.CastRPCErr(err) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; itm.Replicate { + if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.StatQueueProfilePrefix, sqp.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetStatQueueProfile, + sqp); err != nil { return - } - } return } @@ -913,10 +917,12 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id, return } } - if config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveStatQueueProfile, &utils.TenantID{Tenant: tenant, ID: id}, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.StatQueueProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveStatQueueProfile, + &utils.TenantID{Tenant: tenant, ID: id}) } return } @@ -962,13 +968,11 @@ func (dm *DataManager) SetTiming(t *utils.TPTiming) (err error) { if err = dm.CacheDataFromDB(utils.TimingsPrefix, []string{t.ID}, true); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaTimings].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetTiming, t, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTimings]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.TimingsPrefix, t.ID, // this are used to get the host IDs from cache + utils.ReplicatorSv1SetTiming, t) } return } @@ -980,9 +984,11 @@ func (dm *DataManager) RemoveTiming(id, transactionID string) (err error) { Cache.Remove(utils.CacheTimings, id, cacheCommit(transactionID), transactionID) if config.CgrConfig().DataDbCfg().Items[utils.MetaTimings].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveTiming, id, &reply) + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.TimingsPrefix, id, // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveTiming, + id) } return } @@ -1029,13 +1035,12 @@ func (dm *DataManager) SetResource(rs *Resource) (err error) { if err = dm.DataDB().SetResourceDrv(rs); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaResources].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetResource, rs, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResources]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ResourcesPrefix, rs.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetResource, + rs) } return } @@ -1044,10 +1049,12 @@ func (dm *DataManager) RemoveResource(tenant, id, transactionID string) (err err if err = dm.DataDB().RemoveResourceDrv(tenant, id); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaResources].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveResource, &utils.TenantID{Tenant: tenant, ID: id}, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResources]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ResourcesPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveResource, + &utils.TenantID{Tenant: tenant, ID: id} } return } @@ -1117,11 +1124,12 @@ func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) ( } Cache.Clear([]string{utils.CacheEventResources}) } - if config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetResourceProfile, rp, &reply); err != nil { - err = utils.CastRPCErr(err) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile]; itm.Replicate { + if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ResourceProfilesPrefix, rp.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetResourceProfile, + rp); err != nil { return } } @@ -1145,10 +1153,12 @@ func (dm *DataManager) RemoveResourceProfile(tenant, id, transactionID string, w return } } - if config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveResourceProfile, &utils.TenantID{Tenant: tenant, ID: id}, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ResourceProfilesPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveResourceProfile, + &utils.TenantID{Tenant: tenant, ID: id}) } return } @@ -1192,11 +1202,13 @@ func (dm *DataManager) RemoveActionTriggers(id, transactionID string) (err error } Cache.Remove(utils.CacheActionTriggers, id, cacheCommit(transactionID), transactionID) - if config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveActionTriggers, id, &reply) - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ACTION_TRIGGER_PREFIX, id, // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveActionTriggers, + id) + } return } @@ -1214,13 +1226,15 @@ func (dm *DataManager) SetActionTriggers(key string, attr ActionTriggers, if err = dm.CacheDataFromDB(utils.ACTION_TRIGGER_PREFIX, []string{key}, true); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, utils.ReplicatorSv1SetActionTriggers, - &SetActionTriggersArg{Attrs: attr, Key: key}, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ACTION_TRIGGER_PREFIX, key, // this are used to get the host IDs from cache + utils.ReplicatorSv1SetActionTriggers, + &SetActionTriggersArg{ + Attrs: attr, + Key: key, + }) } return } @@ -1267,13 +1281,12 @@ func (dm *DataManager) SetSharedGroup(sg *SharedGroup, []string{sg.Id}, true); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetSharedGroup, sg, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.SHARED_GROUP_PREFIX, sg.Id, // this are used to get the host IDs from cache + utils.ReplicatorSv1SetSharedGroup, + sg) } return } @@ -1284,11 +1297,13 @@ func (dm *DataManager) RemoveSharedGroup(id, transactionID string) (err error) { } Cache.Remove(utils.CacheSharedGroups, id, cacheCommit(transactionID), transactionID) - if config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveSharedGroup, id, &reply) - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.SHARED_GROUP_PREFIX, id, // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveSharedGroup, + id) + } return } @@ -1340,13 +1355,15 @@ func (dm *DataManager) SetActions(key string, as Actions, transactionID string) if err = dm.CacheDataFromDB(utils.ACTION_PREFIX, []string{key}, true); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaActions].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetActions, &SetActionsArgs{Key: key, Acs: as}, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ACTION_PREFIX, key, // this are used to get the host IDs from cache + utils.ReplicatorSv1SetActions, + &SetActionsArgs{ + Key: key, + Acs: as, + }) } return } @@ -1357,11 +1374,13 @@ func (dm *DataManager) RemoveActions(key, transactionID string) (err error) { } Cache.Remove(utils.CacheActions, key, cacheCommit(transactionID), transactionID) - if config.CgrConfig().DataDbCfg().Items[utils.MetaActions].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveActions, key, &reply) - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ACTION_PREFIX, key, // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveActions, + key) + } return } @@ -1427,15 +1446,14 @@ func (dm *DataManager) SetActionPlan(key string, ats *ActionPlan, if err = dm.dataDB.SetActionPlanDrv(key, ats); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetActionPlan, &SetActionPlanArg{ + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.AccountActionPlansPrefix, key, // this are used to get the host IDs from cache + utils.ReplicatorSv1SetActionPlan, + &SetActionPlanArg{ Key: key, - Ats: ats, - }, &reply); err != nil { - err = utils.CastRPCErr(err) - return + Ats: ats}) } } return @@ -1460,10 +1478,12 @@ func (dm *DataManager) RemoveActionPlan(key string, transactionID string) (err e if err = dm.dataDB.RemoveActionPlanDrv(key); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveActionPlan, key, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.ACTION_PLAN_PREFIX, key, // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveActionPlan, + key) } return } @@ -1521,16 +1541,15 @@ func (dm *DataManager) SetAccountActionPlans(acntID string, aPlIDs []string, ove if err = dm.dataDB.SetAccountActionPlansDrv(acntID, aPlIDs); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetAccountActionPlans, &SetAccountActionPlansArg{ + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.AccountActionPlansPrefix, acntID, // this are used to get the host IDs from cache + utils.ReplicatorSv1SetAccountActionPlans, + &SetAccountActionPlansArg{ AcntID: acntID, AplIDs: aPlIDs, - }, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + }) } return } @@ -1559,11 +1578,15 @@ func (dm *DataManager) RemAccountActionPlans(acntID string, apIDs []string) (err if err = dm.dataDB.RemAccountActionPlansDrv(acntID); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.AccountActionPlansPrefix, acntID, // this are used to get the host IDs from cache utils.ReplicatorSv1RemAccountActionPlans, - &RemAccountActionPlansArgs{AcntID: acntID, ApIDs: apIDs}, &reply) + &RemAccountActionPlansArgs{ + AcntID: acntID, + ApIDs: apIDs, + }) } return } @@ -1608,13 +1631,12 @@ func (dm *DataManager) SetRatingPlan(rp *RatingPlan, transactionID string) (err if err = dm.CacheDataFromDB(utils.RATING_PLAN_PREFIX, []string{rp.Id}, true); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetRatingPlan, rp, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.RATING_PLAN_PREFIX, rp.Id, // this are used to get the host IDs from cache + utils.ReplicatorSv1SetRatingPlan, + rp) } return } @@ -1625,11 +1647,13 @@ func (dm *DataManager) RemoveRatingPlan(key string, transactionID string) (err e } Cache.Remove(utils.CacheRatingPlans, key, cacheCommit(transactionID), transactionID) - if config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveRatingPlan, key, &reply) - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.RATING_PLAN_PREFIX, key, // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveRatingPlan, + key) + } return } @@ -1678,13 +1702,12 @@ func (dm *DataManager) SetRatingProfile(rpf *RatingProfile, if err = dm.CacheDataFromDB(utils.RATING_PROFILE_PREFIX, []string{rpf.Id}, true); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetRatingProfile, rpf, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.RATING_PROFILE_PREFIX, rpf.Id, // this are used to get the host IDs from cache + utils.ReplicatorSv1SetRatingProfile, + rpf) } return } @@ -1696,11 +1719,13 @@ func (dm *DataManager) RemoveRatingProfile(key string, } Cache.Remove(utils.CacheRatingProfiles, key, cacheCommit(transactionID), transactionID) - if config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveRatingProfile, key, &reply) - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.RATING_PROFILE_PREFIX, key, // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveRatingProfile, + key) + } return } @@ -1738,7 +1763,7 @@ func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string, indexes, commit, transactionID); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes].Replicate { + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate { var reply string if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, utils.ReplicatorSv1SetFilterIndexes, @@ -2013,13 +2038,12 @@ func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool) } } } - if config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles].Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetAttributeProfile, ap, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.AttributeProfilePrefix, ap.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetAttributeProfile, + ap) } return } @@ -2043,10 +2067,16 @@ func (dm *DataManager) RemoveAttributeProfile(tenant, id string, transactionID s } } } - if config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles].Replicate { - var reply string - dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1RemoveAttributeProfile, &utils.TenantID{Tenant: tenant, ID: id}, &reply) + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveAttributeProfile, + &utils.TenantID{ + Tenant: tenant, + ID: id, + }, + ) } return } @@ -2398,3 +2428,56 @@ func (dm *DataManager) Reconnect(marshaller string, newcfg *config.DataDbCfg) (e dm.dataDB = d return } + +func replicate(connMgr *ConnManager, connIDs []string, filtered bool, objType, objID, method string, args interface{}) (err error) { + // the reply is string for Set/Remove APIs + // ignored in favor of the error + var reply string + if !filtered { + // is not partial so send to all defined connections + return utils.CastRPCErr(connMgr.Call(connIDs, nil, method, args, &reply)) + } + // is partial so get all the replicationHosts from cache based on object Type and ID + // alp_cgrates.org:ATTR1 + rplcHostIDsIfaces := Cache.GetGroupItems(utils.CacheReplicationHosts, objType+objID) + rplcHostIDs := make(utils.StringSet) + for _, hostID := range rplcHostIDsIfaces { + rplcHostIDs.Add(hostID.(string)) + } + // using the replication hosts call the method + return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs, + method, args, &reply)) +} + +func UpdateReplicationFilters(objType, objID, connID string) { + if connID == utils.EmptyString { + return + } + Cache.Set(utils.CacheReplicationHosts, objType+objID+utils.CONCATENATED_KEY_SEP+connID, connID, []string{objType + objID}, + true, utils.NonTransactional) +} + +// replicateMultipleIDs will do the same thing as replicate but uses multiple objectIDs +// used when setting the LoadIDs +func replicateMultipleIDs(connMgr *ConnManager, connIDs []string, filtered bool, objType string, objIDs []string, method string, args interface{}) (err error) { + // the reply is string for Set/Remove APIs + // ignored in favor of the error + var reply string + if !filtered { + // is not partial so send to all defined connections + return utils.CastRPCErr(connMgr.Call(connIDs, nil, method, args, &reply)) + } + // is partial so get all the replicationHosts from cache based on object Type and ID + // combine all hosts in a single set so if we receive a get with one ID in list + // send all list to that hos + rplcHostIDs := make(utils.StringSet) + for _, objID := range objIDs { + rplcHostIDsIfaces := Cache.GetGroupItems(utils.CacheReplicationHosts, objType+objID) + for _, hostID := range rplcHostIDsIfaces { + rplcHostIDs.Add(hostID.(string)) + } + } + // using the replication hosts call the method + return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs, + method, args, &reply)) +} diff --git a/engine/libtest.go b/engine/libtest.go index 699365636..59d54bde5 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -621,5 +621,6 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats { Items: 0, Groups: 0, }, + utils.CacheReplicationHosts: {}, } } diff --git a/utils/consts.go b/utils/consts.go index 7adf5f5ec..9db63c2a8 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1517,6 +1517,7 @@ const ( CacheRPCConnections = "*rpc_connections" CacheCDRIDs = "*cdr_ids" CacheRatingProfilesTmp = "*tmp_rating_profiles" + CacheReplicationHosts = "*replication_hosts" ) // Prefix for indexing @@ -1616,15 +1617,16 @@ const ( // DataDbCfg const ( - DataDbTypeCfg = "db_type" - DataDbHostCfg = "db_host" - DataDbPortCfg = "db_port" - DataDbNameCfg = "db_name" - DataDbUserCfg = "db_user" - DataDbPassCfg = "db_password" - DataDbSentinelNameCfg = "redis_sentinel" - RmtConnsCfg = "remote_conns" - RplConnsCfg = "replication_conns" + DataDbTypeCfg = "db_type" + DataDbHostCfg = "db_host" + DataDbPortCfg = "db_port" + DataDbNameCfg = "db_name" + DataDbUserCfg = "db_user" + DataDbPassCfg = "db_password" + DataDbSentinelNameCfg = "redis_sentinel" + RmtConnsCfg = "remote_conns" + RplConnsCfg = "replication_conns" + ReplicationFilteredCfg = "replication_filtered" ) // ItemOpt