From 5cc9f94cc02e49e77e6829dfab3d540cdafdbe68 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 13 Nov 2019 18:20:35 +0200 Subject: [PATCH] Add ReplicatorSv1 for remote/replication functionality --- apier/v1/dm_remote_it_test.go | 6 +- apier/v1/replicator.go | 52 +++ cmd/cgr-engine/cgr-engine.go | 1 - cmd/cgr-loader/cgr-loader.go | 26 +- config/config_json_test.go | 2 +- config/config_test.go | 4 +- config/datadbcfg.go | 16 +- config/datadbcfg_test.go | 72 ++-- config/libconfig_json.go | 2 +- console/ping.go | 2 + .../cgrates.json | 12 +- .../cgrates.json | 8 +- .../samples/replication_mongo/cgrates.json | 49 +++ engine/datamanager.go | 311 +----------------- engine/libtest.go | 25 +- engine/tpreader.go | 26 +- services/apierv1.go | 1 + services/datadb.go | 26 +- services/rals.go | 2 +- utils/consts.go | 8 + 20 files changed, 217 insertions(+), 434 deletions(-) create mode 100644 apier/v1/replicator.go rename data/conf/samples/{remote_mongo => internal_mongo}/cgrates.json (90%) rename data/conf/samples/{remote_redis => internal_redis}/cgrates.json (95%) create mode 100644 data/conf/samples/replication_mongo/cgrates.json diff --git a/apier/v1/dm_remote_it_test.go b/apier/v1/dm_remote_it_test.go index a0425ef63..c2b03abdb 100644 --- a/apier/v1/dm_remote_it_test.go +++ b/apier/v1/dm_remote_it_test.go @@ -63,12 +63,12 @@ var sTestsInternalRemoteIT = []func(t *testing.T){ testInternalRemoteITGetAction, testInternalRemoteITGetActionPlan, testInternalRemoteITGetAccountActionPlan, - testInternalReplicationSetThreshold, + //testInternalReplicationSetThreshold, testInternalRemoteITKillEngine, } func TestInternalRemoteITRedis(t *testing.T) { - internalCfgDirPath = "remote_redis" + internalCfgDirPath = "internal_redis" cfg, _ := config.NewDefaultCGRConfig() dataDB, err := engine.NewRedisStorage( fmt.Sprintf("%s:%s", cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort), @@ -84,7 +84,7 @@ func TestInternalRemoteITRedis(t *testing.T) { } func TestInternalRemoteITMongo(t *testing.T) { - internalCfgDirPath = "remote_mongo" + internalCfgDirPath = "internal_mongo" mgoITCfg, err := config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "tutmongo")) if err != nil { t.Fatal(err) diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go new file mode 100644 index 000000000..4994ff399 --- /dev/null +++ b/apier/v1/replicator.go @@ -0,0 +1,52 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func NewReplicatorSv1(dm *engine.DataManager) *ReplicatorSv1 { + return &ReplicatorSv1{dm: dm} +} + +// Exports RPC +type ReplicatorSv1 struct { + dm *engine.DataManager +} + +// Call implements rpcclient.RpcClientConnection interface for internal RPC +func (rplSv1 *ReplicatorSv1) Call(serviceMethod string, args interface{}, reply interface{}) error { + return utils.APIerRPCCall(rplSv1, serviceMethod, args, reply) +} + +// SetThresholdProfile alters/creates a ThresholdProfile +func (rplSv1 *ReplicatorSv1) SetThresholdProfile(th *engine.ThresholdProfile, reply *string) error { + if err := rplSv1.dm.DataDB().SetThresholdProfileDrv(th); err != nil { + return err + } + *reply = utils.OK + return nil +} + +func (rplSv1 *ReplicatorSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error { + *reply = utils.Pong + return nil +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1d03b16c8..b9e763001 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -526,7 +526,6 @@ func main() { rals.GetResponder().GetIntenternalChan(), reS.GetIntenternalChan(), tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(), attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), dspS.GetIntenternalChan(), exitChan) - ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalCacheSChan, dspS.GetIntenternalChan(), exitChan) anz := services.NewAnalyzerService(cfg, server, exitChan) srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals, diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 77b936de2..6d026f296 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -239,7 +239,8 @@ func main() { if err != nil { log.Fatalf("Coud not open dataDB connection: %s", err.Error()) } - var rmtDBConns, rplDBConns []*engine.DataManager + var rmtDBConns []*engine.DataManager + var rplConns *rpcclient.RpcClientPool if len(ldrCfg.DataDbCfg().RmtDataDBCfgs) != 0 { rmtDBConns = make([]*engine.DataManager, len(ldrCfg.DataDbCfg().RmtDataDBCfgs)) for i, dbCfg := range ldrCfg.DataDbCfg().RmtDataDBCfgs { @@ -254,21 +255,18 @@ func main() { rmtDBConns[i] = engine.NewDataManager(dbConn, nil, nil, nil) } } - if len(ldrCfg.DataDbCfg().RplDataDBCfgs) != 0 { - rplDBConns = make([]*engine.DataManager, len(ldrCfg.DataDbCfg().RplDataDBCfgs)) - for i, dbCfg := range ldrCfg.DataDbCfg().RplDataDBCfgs { - dbConn, err := engine.NewDataDBConn(dbCfg.DataDbType, - dbCfg.DataDbHost, dbCfg.DataDbPort, - dbCfg.DataDbName, dbCfg.DataDbUser, - dbCfg.DataDbPass, ldrCfg.GeneralCfg().DBDataEncoding, - dbCfg.DataDbSentinelName) - if err != nil { - log.Fatalf("Coud not open dataDB connection: %s", err.Error()) - } - rplDBConns[i] = engine.NewDataManager(dbConn, nil, nil, nil) + if len(ldrCfg.DataDbCfg().RplConns) != 0 { + var err error + rplConns, err = engine.NewRPCPool(rpcclient.POOL_BROADCAST, ldrCfg.TlsCfg().ClientKey, + ldrCfg.TlsCfg().ClientCerificate, ldrCfg.TlsCfg().CaCertificate, + ldrCfg.GeneralCfg().ConnectAttempts, ldrCfg.GeneralCfg().Reconnects, + ldrCfg.GeneralCfg().ConnectTimeout, ldrCfg.GeneralCfg().ReplyTimeout, + ldrCfg.DataDbCfg().RplConns, nil, false) + if err != nil { + log.Fatalf("Coud not confignure dataDB replication connections: %s", err.Error()) } } - dm = engine.NewDataManager(d, config.CgrConfig().CacheCfg(), rmtDBConns, rplDBConns) + dm = engine.NewDataManager(d, config.CgrConfig().CacheCfg(), rmtDBConns, rplConns) defer dm.DataDB().Close() } diff --git a/config/config_json_test.go b/config/config_json_test.go index e53a6c782..e2437d66c 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -197,7 +197,7 @@ func TestDfDataDbJsonCfg(t *testing.T) { Db_password: utils.StringPointer(""), Redis_sentinel: utils.StringPointer(""), Query_timeout: utils.StringPointer("10s"), - Replication_conns: &[]*DbJsonCfg{}, + Replication_conns: &[]*RemoteHostJson{}, Remote_conns: &[]*DbJsonCfg{}, } if cfg, err := dfCgrJsonCfg.DbJsonCfg(DATADB_JSN); err != nil { diff --git a/config/config_test.go b/config/config_test.go index f976d7e05..206afabf7 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -423,8 +423,8 @@ func TestCgrCfgJSONDefaultsjsnDataDb(t *testing.T) { if len(cgrCfg.DataDbCfg().RmtDataDBCfgs) != 0 { t.Errorf("Expecting: 0, recived: %+v", len(cgrCfg.DataDbCfg().RmtDataDBCfgs)) } - if len(cgrCfg.DataDbCfg().RplDataDBCfgs) != 0 { - t.Errorf("Expecting: 0, recived: %+v", len(cgrCfg.DataDbCfg().RplDataDBCfgs)) + if len(cgrCfg.DataDbCfg().RplConns) != 0 { + t.Errorf("Expecting: 0, recived: %+v", len(cgrCfg.DataDbCfg().RplConns)) } } diff --git a/config/datadbcfg.go b/config/datadbcfg.go index 4ea4f74fc..3cc9aa874 100644 --- a/config/datadbcfg.go +++ b/config/datadbcfg.go @@ -36,8 +36,8 @@ type DataDbCfg struct { DataDbPass string // The user's password. DataDbSentinelName string QueryTimeout time.Duration - RmtDataDBCfgs []*DataDbCfg // Remote DataDB configurations - RplDataDBCfgs []*DataDbCfg // Replication DataDB configurations + RmtDataDBCfgs []*DataDbCfg // Remote DataDB configurations + RplConns []*RemoteHost // Replication conns } //loadFromJsonCfg loads Database config from JsonCfg @@ -85,12 +85,10 @@ func (dbcfg *DataDbCfg) loadFromJsonCfg(jsnDbCfg *DbJsonCfg) (err error) { } } if jsnDbCfg.Replication_conns != nil { - dbcfg.RplDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Replication_conns)) - for i, cfg := range *jsnDbCfg.Replication_conns { - dbcfg.RplDataDBCfgs[i] = newDefaultDataDbCfg() - if err = dbcfg.RplDataDBCfgs[i].loadFromJsonCfg(cfg); err != nil { - return - } + dbcfg.RplConns = make([]*RemoteHost, len(*jsnDbCfg.Replication_conns)) + for idx, jsnRplCfg := range *jsnDbCfg.Replication_conns { + dbcfg.RplConns[idx] = NewDfltRemoteHost() + dbcfg.RplConns[idx].loadFromJsonCfg(jsnRplCfg) } } return nil @@ -121,6 +119,6 @@ func newDefaultDataDbCfg() *DataDbCfg { DataDbSentinelName: "", QueryTimeout: 10 * time.Second, RmtDataDBCfgs: nil, - RplDataDBCfgs: nil, + RplConns: nil, } } diff --git a/config/datadbcfg_test.go b/config/datadbcfg_test.go index 9519418d2..5e77e2cae 100644 --- a/config/datadbcfg_test.go +++ b/config/datadbcfg_test.go @@ -166,7 +166,7 @@ func TestDataDBRemoteReplication(t *testing.T) { DataDbPass: "", QueryTimeout: 10 * time.Second, RmtDataDBCfgs: nil, - RplDataDBCfgs: nil, + RplConns: nil, }, }, } @@ -192,14 +192,7 @@ func TestDataDBRemoteReplication(t *testing.T) { }, ], "replication_conns":[ - { - "db_type": "*mongo", - "db_host": "1.1.1.1", // data_db host address - "db_port": 1234, // data_db port to reach the database - "db_name": "test", // data_db database name to connect to - "db_user": "user", // username to use when connecting to data_db - "db_password": "pass", - }, + {"address": "127.0.0.1:2022","transport":"*json"}, ], } }` @@ -223,15 +216,10 @@ func TestDataDBRemoteReplication(t *testing.T) { QueryTimeout: 10 * time.Second, }, }, - RplDataDBCfgs: []*DataDbCfg{ - &DataDbCfg{ - DataDbType: utils.MONGO, - DataDbHost: "1.1.1.1", - DataDbPort: "1234", - DataDbName: "test", - DataDbUser: "user", - DataDbPass: "pass", - QueryTimeout: 10 * time.Second, + RplConns: []*RemoteHost{ + &RemoteHost{ + Address: "127.0.0.1:2022", + Transport: "*json", }, }, } @@ -259,24 +247,14 @@ func TestDataDBRemoteReplication(t *testing.T) { "db_type": "*mongo", "db_host": "1.2.3.4", // data_db host address "db_port": 1235, // data_db port to reach the database - "db_name": "remote_mongo", // data_db database name to connect to + "db_name": "internal_mongo", // data_db database name to connect to "db_user": "remote_mongo_user", // username to use when connecting to data_db }, ], "replication_conns":[ - { - "db_type": "*mongo", - "db_host": "1.1.1.1", // data_db host address - "db_port": 1234, // data_db port to reach the database - "db_name": "replication_mongo", // data_db database name to connect to - "db_user": "user", // username to use when connecting to data_db - "db_password": "pass", - }, - { - "db_host": "127.0.0.1", // data_db host address - "db_port": 8888, // data_db port to reach the database - "db_name": "15", // data_db database name to connect to - }, + {"address": "127.0.0.1:2032","transport":"*json"}, + {"address": "127.0.0.1:2042","transport":"*json"}, + {"address": "127.0.0.1:2052","transport":"*json"}, ], } }` @@ -303,30 +281,24 @@ func TestDataDBRemoteReplication(t *testing.T) { DataDbType: utils.MONGO, DataDbHost: "1.2.3.4", DataDbPort: "1235", - DataDbName: "remote_mongo", + DataDbName: "internal_mongo", DataDbUser: "remote_mongo_user", DataDbPass: "", QueryTimeout: 10 * time.Second, }, }, - RplDataDBCfgs: []*DataDbCfg{ - &DataDbCfg{ - DataDbType: utils.MONGO, - DataDbHost: "1.1.1.1", - DataDbPort: "1234", - DataDbName: "replication_mongo", - DataDbUser: "user", - DataDbPass: "pass", - QueryTimeout: 10 * time.Second, + RplConns: []*RemoteHost{ + &RemoteHost{ + Address: "127.0.0.1:2032", + Transport: "*json", }, - &DataDbCfg{ - DataDbType: utils.REDIS, - DataDbHost: "127.0.0.1", - DataDbPort: "8888", - DataDbName: "15", - DataDbUser: "cgrates", - DataDbPass: "", - QueryTimeout: 10 * time.Second, + &RemoteHost{ + Address: "127.0.0.1:2042", + Transport: "*json", + }, + &RemoteHost{ + Address: "127.0.0.1:2052", + Transport: "*json", }, }, } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 911544fbc..e05f06ac6 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -91,7 +91,7 @@ type DbJsonCfg struct { Query_timeout *string Sslmode *string // Used only in case of storDb Remote_conns *[]*DbJsonCfg - Replication_conns *[]*DbJsonCfg + Replication_conns *[]*RemoteHostJson } // Filters config diff --git a/console/ping.go b/console/ping.go index 615e8a186..c643e1aa5 100644 --- a/console/ping.go +++ b/console/ping.go @@ -75,6 +75,8 @@ func (self *CmdApierPing) RpcMethod() string { return utils.SchedulerSv1Ping case utils.RALsLow: return utils.RALsV1Ping + case utils.ReplicatorLow: + return utils.ReplicatorSv1Ping default: } return self.rpcMethod diff --git a/data/conf/samples/remote_mongo/cgrates.json b/data/conf/samples/internal_mongo/cgrates.json similarity index 90% rename from data/conf/samples/remote_mongo/cgrates.json rename to data/conf/samples/internal_mongo/cgrates.json index 52ca2453c..927ed5f0b 100644 --- a/data/conf/samples/remote_mongo/cgrates.json +++ b/data/conf/samples/internal_mongo/cgrates.json @@ -1,8 +1,4 @@ { -// CGRateS Configuration file -// - - "general": { "log_level": 7, "node_id": "RemoteMongo", @@ -27,11 +23,7 @@ } ], "replication_conns": [ - { - "db_type": "mongo", - "db_name": "10", - "db_port": 27017 - } + {"address": "127.0.0.1:2022", "transport":"*json"} ] }, @@ -138,7 +130,7 @@ "apier": { - "scheduler_conns": [ // connections to SchedulerS for reloads + "scheduler_conns": [ {"address": "*internal"} ] } diff --git a/data/conf/samples/remote_redis/cgrates.json b/data/conf/samples/internal_redis/cgrates.json similarity index 95% rename from data/conf/samples/remote_redis/cgrates.json rename to data/conf/samples/internal_redis/cgrates.json index e46d1dbd8..559394c20 100644 --- a/data/conf/samples/remote_redis/cgrates.json +++ b/data/conf/samples/internal_redis/cgrates.json @@ -24,14 +24,10 @@ "db_type": "*redis", "db_port": 6379, "db_name": "10", - } + }, ], "replication_conns": [ - { - "db_type": "*redis", - "db_port": 6379, - "db_name": "10", - } + {"address": "127.0.0.1:2022", "transport":"*json"} ] }, diff --git a/data/conf/samples/replication_mongo/cgrates.json b/data/conf/samples/replication_mongo/cgrates.json new file mode 100644 index 000000000..fbce5edd8 --- /dev/null +++ b/data/conf/samples/replication_mongo/cgrates.json @@ -0,0 +1,49 @@ +{ +// CGRateS Configuration file + + +"general": { + "log_level": 7, + "reply_timeout": "30s", +}, + + +"listen": { + "rpc_json": ":2022", + "rpc_gob": ":2023", + "http": ":2280" +}, + + +"data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, +}, + + +"stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, +}, + + +"rals": { + "enabled": true, +}, + + +"scheduler": { + "enabled": true, +}, + + +"apier": { + "scheduler_conns": [ // connections to SchedulerS for reloads + {"address": "*internal"}, + ], +}, + + +} diff --git a/engine/datamanager.go b/engine/datamanager.go index 88f3d7846..45b251279 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -89,12 +89,13 @@ var ( ) // NewDataManager returns a new DataManager -func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtDataDBs, rplDataDBs []*DataManager) *DataManager { +func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtDataDBs []*DataManager, + rplConns *rpcclient.RpcClientPool) *DataManager { return &DataManager{ dataDB: dataDB, cacheCfg: cacheCfg, rmtDataDBs: rmtDataDBs, - rplDataDBs: rplDataDBs, + rplConns: rplConns, } } @@ -103,8 +104,8 @@ func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtDataDBs, rplData type DataManager struct { dataDB DataDB rmtDataDBs []*DataManager - rplDataDBs []*DataManager cacheCfg config.CacheCfg + rplConns *rpcclient.RpcClientPool } // DataDB exports access to dataDB @@ -387,13 +388,6 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) { if err = dm.dataDB.SetStoredStatQueueDrv(ssq); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.dataDB.SetStoredStatQueueDrv(ssq); err != nil { - return - } - } - } return } @@ -402,14 +396,6 @@ func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string) if err = dm.dataDB.RemStoredStatQueueDrv(tenant, id); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveStatQueue(tenant, id, - utils.NonTransactional); err != nil { - return - } - } - } return } @@ -462,13 +448,6 @@ func (dm *DataManager) SetFilter(fltr *Filter) (err error) { if err = dm.DataDB().SetFilterDrv(fltr); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetFilter(fltr); err != nil { - return - } - } - } return } @@ -477,14 +456,6 @@ func (dm *DataManager) RemoveFilter(tenant, id, transactionID string) (err error if err = dm.DataDB().RemoveFilterDrv(tenant, id); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveFilter(tenant, id, - utils.NonTransactional); err != nil { - return - } - } - } return } @@ -531,13 +502,6 @@ func (dm *DataManager) SetThreshold(th *Threshold) (err error) { if err = dm.DataDB().SetThresholdDrv(th); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetThreshold(th); err != nil { - return - } - } - } return } @@ -545,14 +509,6 @@ func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err er if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveThreshold(tenant, id, - utils.NonTransactional); err != nil { - return - } - } - } return } @@ -623,11 +579,17 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) return err } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetThresholdProfile(th, withIndex); err != nil { - return - } + if dm.rplConns != nil { + //call set threshold from replicator + var reply string + if err = dm.rplConns.Call("ReplicatorSv1.SetThresholdProfile", th, &reply); err != nil { + return + } + if err = dm.rplConns.Call("ReplicatorSv1.SetIndexes", th, &reply); err != nil { + return + } + if err = dm.rplConns.Call("ReplicatorSv1.SetThreshold", th, &reply); err != nil { + return } } return @@ -651,14 +613,7 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id, return } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveThresholdProfile(tenant, id, - utils.NonTransactional, withIndex); err != nil { - return - } - } - } + return } @@ -729,13 +684,6 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool return } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetStatQueueProfile(sqp, withIndex); err != nil { - return - } - } - } return } @@ -757,14 +705,6 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id, return } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveStatQueueProfile(tenant, id, - utils.NonTransactional, withIndex); err != nil { - return - } - } - } return } @@ -811,13 +751,6 @@ func (dm *DataManager) SetTiming(t *utils.TPTiming) (err error) { if err = dm.CacheDataFromDB(utils.TimingsPrefix, []string{t.ID}, true); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.DataDB().SetTimingDrv(t); err != nil { - return - } - } - } return } @@ -825,14 +758,6 @@ func (dm *DataManager) RemoveTiming(id, transactionID string) (err error) { if err = dm.DataDB().RemoveTimingDrv(id); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveTiming(id, - utils.NonTransactional); err != nil { - return - } - } - } Cache.Remove(utils.CacheTimings, id, cacheCommit(transactionID), transactionID) return @@ -881,13 +806,6 @@ func (dm *DataManager) SetResource(rs *Resource) (err error) { if err = dm.DataDB().SetResourceDrv(rs); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetResource(rs); err != nil { - return - } - } - } return } @@ -895,14 +813,6 @@ func (dm *DataManager) RemoveResource(tenant, id, transactionID string) (err err if err = dm.DataDB().RemoveResourceDrv(tenant, id); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveResource(tenant, id, - utils.NonTransactional); err != nil { - return - } - } - } return } @@ -973,13 +883,6 @@ func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) ( } Cache.Clear([]string{utils.CacheEventResources}) } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetResourceProfile(rp, withIndex); err != nil { - return - } - } - } return } @@ -1000,14 +903,6 @@ func (dm *DataManager) RemoveResourceProfile(tenant, id, transactionID string, w return } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveResourceProfile(tenant, id, - utils.NonTransactional, withIndex); err != nil { - return - } - } - } return } @@ -1050,14 +945,6 @@ func (dm *DataManager) RemoveActionTriggers(id, transactionID string) (err error if err = dm.DataDB().RemoveActionTriggersDrv(id); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveActionTriggers(id, - utils.NonTransactional); err != nil { - return - } - } - } Cache.Remove(utils.CacheActionTriggers, id, cacheCommit(transactionID), transactionID) return @@ -1071,13 +958,6 @@ func (dm *DataManager) SetActionTriggers(key string, attr ActionTriggers, if err = dm.CacheDataFromDB(utils.ACTION_TRIGGER_PREFIX, []string{key}, true); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.DataDB().SetActionTriggersDrv(key, attr); err != nil { - return - } - } - } return } @@ -1125,13 +1005,6 @@ func (dm *DataManager) SetSharedGroup(sg *SharedGroup, []string{sg.Id}, true); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.DataDB().SetSharedGroupDrv(sg); err != nil { - return - } - } - } return } @@ -1139,14 +1012,6 @@ func (dm *DataManager) RemoveSharedGroup(id, transactionID string) (err error) { if err = dm.DataDB().RemoveSharedGroupDrv(id); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveSharedGroup(id, - utils.NonTransactional); err != nil { - return - } - } - } Cache.Remove(utils.CacheSharedGroups, id, cacheCommit(transactionID), transactionID) return @@ -1196,13 +1061,6 @@ func (dm *DataManager) SetActions(key string, as Actions, transactionID string) if err = dm.CacheDataFromDB(utils.ACTION_PREFIX, []string{key}, true); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.DataDB().SetActionsDrv(key, as); err != nil { - return - } - } - } return } @@ -1210,14 +1068,6 @@ func (dm *DataManager) RemoveActions(key, transactionID string) (err error) { if err = dm.DataDB().RemoveActionsDrv(key); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveActions(key, - utils.NonTransactional); err != nil { - return - } - } - } Cache.Remove(utils.CacheActions, key, cacheCommit(transactionID), transactionID) return @@ -1318,13 +1168,6 @@ func (dm *DataManager) SetRatingPlan(rp *RatingPlan, transactionID string) (err if err = dm.CacheDataFromDB(utils.RATING_PLAN_PREFIX, []string{rp.Id}, true); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.DataDB().SetRatingPlanDrv(rp); err != nil { - return - } - } - } return } @@ -1332,14 +1175,6 @@ func (dm *DataManager) RemoveRatingPlan(key string, transactionID string) (err e if err = dm.DataDB().RemoveRatingPlanDrv(key); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveRatingPlan(key, - utils.NonTransactional); err != nil { - return - } - } - } Cache.Remove(utils.CacheRatingPlans, key, cacheCommit(transactionID), transactionID) return @@ -1388,13 +1223,6 @@ func (dm *DataManager) SetRatingProfile(rpf *RatingProfile, if err = dm.CacheDataFromDB(utils.RATING_PROFILE_PREFIX, []string{rpf.Id}, true); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.DataDB().SetRatingProfileDrv(rpf); err != nil { - return - } - } - } return } @@ -1403,14 +1231,6 @@ func (dm *DataManager) RemoveRatingProfile(key string, if err = dm.DataDB().RemoveRatingProfileDrv(key); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveRatingProfile(key, - utils.NonTransactional); err != nil { - return - } - } - } Cache.Remove(utils.CacheRatingProfiles, key, cacheCommit(transactionID), transactionID) return @@ -1446,14 +1266,6 @@ func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string, indexes, commit, transactionID); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.DataDB().SetFilterIndexesDrv(cacheID, itemIDPrefix, - indexes, commit, transactionID); err != nil { - return - } - } - } return } @@ -1461,14 +1273,6 @@ func (dm *DataManager) RemoveFilterIndexes(cacheID, itemIDPrefix string) (err er if err = dm.DataDB().RemoveFilterIndexesDrv(cacheID, itemIDPrefix); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveFilterIndexes(cacheID, - itemIDPrefix); err != nil { - return - } - } - } return } @@ -1593,13 +1397,6 @@ func (dm *DataManager) SetSupplierProfile(supp *SupplierProfile, withIndex bool) return } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetSupplierProfile(supp, withIndex); err != nil { - return - } - } - } return } @@ -1620,14 +1417,6 @@ func (dm *DataManager) RemoveSupplierProfile(tenant, id, transactionID string, w return } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveSupplierProfile(tenant, id, - utils.NonTransactional, withIndex); err != nil { - return - } - } - } return } @@ -1709,13 +1498,6 @@ func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool) } } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetAttributeProfile(ap, withIndex); err != nil { - return - } - } - } return } @@ -1738,14 +1520,6 @@ func (dm *DataManager) RemoveAttributeProfile(tenant, id string, transactionID s } } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveAttributeProfile(tenant, id, - utils.NonTransactional, withIndex); err != nil { - return - } - } - } return } @@ -1816,13 +1590,6 @@ func (dm *DataManager) SetChargerProfile(cpp *ChargerProfile, withIndex bool) (e return } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetChargerProfile(cpp, withIndex); err != nil { - return - } - } - } return } @@ -1844,14 +1611,6 @@ func (dm *DataManager) RemoveChargerProfile(tenant, id string, return } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveChargerProfile(tenant, id, - utils.NonTransactional, withIndex); err != nil { - return - } - } - } return } @@ -1929,13 +1688,6 @@ func (dm *DataManager) SetDispatcherProfile(dpp *DispatcherProfile, withIndex bo } } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetDispatcherProfile(dpp, withIndex); err != nil { - return - } - } - } return } @@ -1959,14 +1711,6 @@ func (dm *DataManager) RemoveDispatcherProfile(tenant, id string, } } } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveDispatcherProfile(tenant, id, - utils.NonTransactional, withIndex); err != nil { - return - } - } - } return } @@ -2021,13 +1765,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit func (dm *DataManager) SetDispatcherHost(dpp *DispatcherHost) (err error) { if err = dm.DataDB().SetDispatcherHostDrv(dpp); err != nil { - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetDispatcherHost(dpp); err != nil { - return - } - } - } + return } return } @@ -2044,14 +1782,6 @@ func (dm *DataManager) RemoveDispatcherHost(tenant, id string, if oldDpp == nil { return utils.ErrNotFound } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.RemoveDispatcherHost(tenant, id, - utils.NonTransactional); err != nil { - return - } - } - } return } @@ -2091,13 +1821,6 @@ func (dm *DataManager) SetLoadIDs(loadIDs map[string]int64) (err error) { if err = dm.DataDB().SetLoadIDsDrv(loadIDs); err != nil { return } - if len(dm.rplDataDBs) != 0 { - for _, rplDM := range dm.rplDataDBs { - if err = rplDM.SetLoadIDs(loadIDs); err != nil { - return - } - } - } return } diff --git a/engine/libtest.go b/engine/libtest.go index 313529b6d..916f043c1 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -292,7 +292,8 @@ func InitDataDb(cfg *config.CGRConfig) error { if err != nil { return err } - var rmtDBConns, rplDBConns []*DataManager + var rmtDBConns []*DataManager + var rplConns *rpcclient.RpcClientPool if len(cfg.DataDbCfg().RmtDataDBCfgs) != 0 { rmtDBConns = make([]*DataManager, len(cfg.DataDbCfg().RmtDataDBCfgs)) for i, dbCfg := range cfg.DataDbCfg().RmtDataDBCfgs { @@ -307,21 +308,17 @@ func InitDataDb(cfg *config.CGRConfig) error { rmtDBConns[i] = NewDataManager(dbConn, nil, nil, nil) } } - if len(cfg.DataDbCfg().RplDataDBCfgs) != 0 { - rplDBConns = make([]*DataManager, len(cfg.DataDbCfg().RplDataDBCfgs)) - for i, dbCfg := range cfg.DataDbCfg().RplDataDBCfgs { - dbConn, err := NewDataDBConn(dbCfg.DataDbType, - dbCfg.DataDbHost, dbCfg.DataDbPort, - dbCfg.DataDbName, dbCfg.DataDbUser, - dbCfg.DataDbPass, cfg.GeneralCfg().DBDataEncoding, - dbCfg.DataDbSentinelName) - if err != nil { - return err - } - rplDBConns[i] = NewDataManager(dbConn, nil, nil, nil) + if len(cfg.DataDbCfg().RplConns) != 0 { + rplConns, err = NewRPCPool(rpcclient.POOL_BROADCAST, cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.DataDbCfg().RplConns, nil, false) + if err != nil { + return err } } - dm := NewDataManager(d, cfg.CacheCfg(), rmtDBConns, rplDBConns) + dm := NewDataManager(d, cfg.CacheCfg(), rmtDBConns, rplConns) if err := dm.DataDB().Flush(""); err != nil { return err diff --git a/engine/tpreader.go b/engine/tpreader.go index b70447f19..0cc105f8c 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -68,7 +68,8 @@ type TpReader struct { func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string, cacheS rpcclient.RpcClientConnection, schedulerS rpcclient.RpcClientConnection) (*TpReader, error) { - var rmtDBConns, rplDBConns []*DataManager + var rmtDBConns []*DataManager + var rplConns *rpcclient.RpcClientPool if len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs) != 0 { rmtDBConns = make([]*DataManager, len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs)) for i, dbCfg := range config.CgrConfig().DataDbCfg().RmtDataDBCfgs { @@ -83,24 +84,21 @@ func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string, rmtDBConns[i] = NewDataManager(dbConn, nil, nil, nil) } } - if len(config.CgrConfig().DataDbCfg().RplDataDBCfgs) != 0 { - rplDBConns = make([]*DataManager, len(config.CgrConfig().DataDbCfg().RplDataDBCfgs)) - for i, dbCfg := range config.CgrConfig().DataDbCfg().RplDataDBCfgs { - dbConn, err := NewDataDBConn(dbCfg.DataDbType, - dbCfg.DataDbHost, dbCfg.DataDbPort, - dbCfg.DataDbName, dbCfg.DataDbUser, - dbCfg.DataDbPass, config.CgrConfig().GeneralCfg().DBDataEncoding, - dbCfg.DataDbSentinelName) - if err != nil { - return nil, err - } - rplDBConns[i] = NewDataManager(dbConn, nil, nil, nil) + if len(config.CgrConfig().DataDbCfg().RplConns) != 0 { + var err error + rplConns, err = NewRPCPool(rpcclient.POOL_BROADCAST, config.CgrConfig().TlsCfg().ClientKey, + config.CgrConfig().TlsCfg().ClientCerificate, config.CgrConfig().TlsCfg().CaCertificate, + config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects, + config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout, + config.CgrConfig().DataDbCfg().RplConns, nil, false) + if err != nil { + return nil, err } } tpr := &TpReader{ tpid: tpid, timezone: timezone, - dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtDBConns, rplDBConns), // ToDo: add ChacheCfg as parameter to the NewTpReader + dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtDBConns, rplConns), // ToDo: add ChacheCfg as parameter to the NewTpReader lr: lr, cacheS: cacheS, schedulerS: schedulerS, diff --git a/services/apierv1.go b/services/apierv1.go index 19213adc3..90e93ce10 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -122,6 +122,7 @@ func (api *ApierV1Service) Start() (err error) { if !api.cfg.DispatcherSCfg().Enabled { api.server.RpcRegister(api.api) + api.server.RpcRegister(v1.NewReplicatorSv1(api.dm.GetDM())) } utils.RegisterRpcParams("", &v1.CDRsV1{}) diff --git a/services/datadb.go b/services/datadb.go index 8a584547c..1f866ae79 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -68,7 +68,8 @@ func (db *DataDBService) Start() (err error) { utils.Logger.Warning(fmt.Sprintf("Could not configure dataDb: %s.Some SessionS APIs will not work", err)) return } - var rmtDBConns, rplDBConns []*engine.DataManager + var rmtDBConns []*engine.DataManager + var rplConns *rpcclient.RpcClientPool if len(db.cfg.DataDbCfg().RmtDataDBCfgs) != 0 { rmtDBConns = make([]*engine.DataManager, len(db.cfg.DataDbCfg().RmtDataDBCfgs)) for i, dbCfg := range db.cfg.DataDbCfg().RmtDataDBCfgs { @@ -83,21 +84,18 @@ func (db *DataDBService) Start() (err error) { rmtDBConns[i] = engine.NewDataManager(dbConn, nil, nil, nil) } } - if len(db.cfg.DataDbCfg().RplDataDBCfgs) != 0 { - rplDBConns = make([]*engine.DataManager, len(db.cfg.DataDbCfg().RplDataDBCfgs)) - for i, dbCfg := range db.cfg.DataDbCfg().RplDataDBCfgs { - dbConn, err := engine.NewDataDBConn(dbCfg.DataDbType, - dbCfg.DataDbHost, dbCfg.DataDbPort, - dbCfg.DataDbName, dbCfg.DataDbUser, - dbCfg.DataDbPass, db.cfg.GeneralCfg().DBDataEncoding, - dbCfg.DataDbSentinelName) - if err != nil { - log.Fatalf("Coud not open dataDB connection: %s", err.Error()) - } - rplDBConns[i] = engine.NewDataManager(dbConn, nil, nil, nil) + if len(config.CgrConfig().DataDbCfg().RplConns) != 0 { + var err error + rplConns, err = engine.NewRPCPool(rpcclient.POOL_BROADCAST, db.cfg.TlsCfg().ClientKey, + db.cfg.TlsCfg().ClientCerificate, db.cfg.TlsCfg().CaCertificate, + db.cfg.GeneralCfg().ConnectAttempts, db.cfg.GeneralCfg().Reconnects, + db.cfg.GeneralCfg().ConnectTimeout, db.cfg.GeneralCfg().ReplyTimeout, + db.cfg.DataDbCfg().RplConns, nil, false) + if err != nil { + log.Fatalf("Coud not confignure dataDB replication connections: %s", err.Error()) } } - db.db = engine.NewDataManager(d, db.cfg.CacheCfg(), rmtDBConns, rplDBConns) + db.db = engine.NewDataManager(d, db.cfg.CacheCfg(), rmtDBConns, rplConns) engine.SetDataStorage(db.db) if err = engine.CheckVersions(db.db.DataDB()); err != nil { fmt.Println(err) diff --git a/services/rals.go b/services/rals.go index 49838110f..14eb7d98b 100644 --- a/services/rals.go +++ b/services/rals.go @@ -176,4 +176,4 @@ func (rals *RalService) GetAPIv2() servmanager.Service { // GetResponder returns the responder service func (rals *RalService) GetResponder() servmanager.Service { return rals.responder -} +} \ No newline at end of file diff --git a/utils/consts.go b/utils/consts.go index 176a0e720..7bb2cb5d8 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -628,6 +628,7 @@ const ( SchedulerSLow = "schedulers" LoaderSLow = "loaders" RALsLow = "rals" + ReplicatorLow = "replicator" ) // Actions @@ -789,6 +790,13 @@ const ( MetaNotEqual = "*noteq" ) +// ReplicatorSv1 APIs +const ( + ReplicatorSv1 = "ReplicatorSv1" + ReplicatorSv1Ping = "ReplicatorSv1.Ping" + ReplicatorSv1SetThresholdProfile = "ReplicatorSv1.SetThresholdProfile" +) + // ApierV1 APIs const ( ApierV1 = "ApierV1"