diff --git a/apier/v1/dm_remote_it_test.go b/apier/v1/dm_remote_it_test.go index c2b03abdb..60b8ecf45 100644 --- a/apier/v1/dm_remote_it_test.go +++ b/apier/v1/dm_remote_it_test.go @@ -35,159 +35,111 @@ import ( var ( internalCfgPath string - internalCfgDirPath string + internalCfgDirPath = "internal" internalCfg *config.CGRConfig internalRPC *rpc.Client - rmtDM *engine.DataManager + + engineOneCfgPath string + engineOneCfgDirPath string + engineOneCfg *config.CGRConfig + engineOneRPC *rpc.Client + + engineTwoCfgPath string + engineTwoCfgDirPath string + engineTwoCfg *config.CGRConfig + engineTwoRPC *rpc.Client ) var sTestsInternalRemoteIT = []func(t *testing.T){ - testInternalRemoteITDataFlush, - testInternalRemoteITCheckEmpty, - testInternalRemoteITLoadData, - testInternalRemoteITVerifyLoadedDataInRemote, testInternalRemoteITInitCfg, + testInternalRemoteITDataFlush, testInternalRemoteITStartEngine, testInternalRemoteITRPCConn, + testInternalRemoteLoadDataInEngineTwo, testInternalRemoteITGetAccount, - testInternalRemoteITGetAttribute, - testInternalRemoteITGetThreshold, - testInternalRemoteITGetThresholdProfile, - testInternalRemoteITGetResource, - testInternalRemoteITGetResourceProfile, - testInternalRemoteITGetStatQueueProfile, - testInternalRemoteITGetSupplier, - testInternalRemoteITGetFilter, - testInternalRemoteITGetRatingPlan, - testInternalRemoteITGetRatingProfile, - testInternalRemoteITGetAction, - testInternalRemoteITGetActionPlan, - testInternalRemoteITGetAccountActionPlan, - //testInternalReplicationSetThreshold, + //testInternalRemoteITGetAttribute, + //testInternalRemoteITGetThreshold, + //testInternalRemoteITGetThresholdProfile, + //testInternalRemoteITGetResource, + //testInternalRemoteITGetResourceProfile, + //testInternalRemoteITGetStatQueueProfile, + //testInternalRemoteITGetSupplier, + //testInternalRemoteITGetFilter, + //testInternalRemoteITGetRatingPlan, + //testInternalRemoteITGetRatingProfile, + //testInternalRemoteITGetAction, + //testInternalRemoteITGetActionPlan, + //testInternalRemoteITGetAccountActionPlan, + ////testInternalReplicationSetThreshold, testInternalRemoteITKillEngine, } func TestInternalRemoteITRedis(t *testing.T) { - internalCfgDirPath = "internal_redis" - cfg, _ := config.NewDefaultCGRConfig() - dataDB, err := engine.NewRedisStorage( - fmt.Sprintf("%s:%s", cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort), - 10, cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, - utils.REDIS_MAX_CONNS, "") - if err != nil { - t.Fatal("Could not connect to Redis", err.Error()) - } - rmtDM = engine.NewDataManager(dataDB, nil, nil, nil) + engineOneCfgDirPath = "engine1_redis" + engineTwoCfgDirPath = "engine2_redis" for _, stest := range sTestsInternalRemoteIT { t.Run("TestInternalRemoteITRedis", stest) } } func TestInternalRemoteITMongo(t *testing.T) { - internalCfgDirPath = "internal_mongo" - mgoITCfg, err := config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "tutmongo")) - if err != nil { - t.Fatal(err) - } - dataDB, err := engine.NewMongoStorage(mgoITCfg.DataDbCfg().DataDbHost, - mgoITCfg.DataDbCfg().DataDbPort, mgoITCfg.DataDbCfg().DataDbName, - mgoITCfg.DataDbCfg().DataDbUser, mgoITCfg.DataDbCfg().DataDbPass, - utils.DataDB, nil, false) - if err != nil { - t.Fatal("Could not connect to Mongo", err.Error()) - } - rmtDM = engine.NewDataManager(dataDB, nil, nil, nil) + engineOneCfgDirPath = "engine1_mongo" + engineTwoCfgDirPath = "engine2_mongo" for _, stest := range sTestsInternalRemoteIT { t.Run("TestInternalRemoteITMongo", stest) } } -func testInternalRemoteITDataFlush(t *testing.T) { - if err := rmtDM.DataDB().Flush(""); err != nil { - t.Error(err) - } -} - -func testInternalRemoteITCheckEmpty(t *testing.T) { - test, err := rmtDM.DataDB().IsDBEmpty() - if err != nil { - t.Error(err) - } else if test != true { - t.Errorf("\nExpecting: true got :%+v", test) - } -} - -func testInternalRemoteITLoadData(t *testing.T) { - loader, err := engine.NewTpReader(rmtDM.DataDB(), - engine.NewFileCSVStorage(utils.CSV_SEP, path.Join(*dataDir, "tariffplans", "tutorial"), false), - "", "", nil, nil) - if err != nil { - t.Error(err) - } - if err := loader.LoadAll(); err != nil { - t.Error(err) - } - if err := loader.WriteToDatabase(false, false); err != nil { - t.Error(err) - } - acc := &engine.Account{ - ID: "cgrates.org:testAccount", - BalanceMap: map[string]engine.Balances{ - "utils.MONETARY": []*engine.Balance{ - { - ID: "testAccount", - Value: 10, - Weight: 10, - }, - }, - }, - } - if err := rmtDM.DataDB().SetAccount(acc); err != nil { - t.Error(err) - } -} - -func testInternalRemoteITVerifyLoadedDataInRemote(t *testing.T) { - exp := &engine.AttributeProfile{ - Tenant: "cgrates.org", - ID: "ATTR_1001_SIMPLEAUTH", - Contexts: []string{"simpleauth"}, - FilterIDs: []string{"*string:~Account:1001"}, - Attributes: []*engine.Attribute{ - { - FieldName: "Password", - FilterIDs: []string{}, - Type: utils.META_CONSTANT, - Value: config.NewRSRParsersMustCompile("CGRateS.org", true, utils.INFIELD_SEP), - }, - }, - Weight: 20, - } - if tempAttr, err := rmtDM.GetAttributeProfile("cgrates.org", "ATTR_1001_SIMPLEAUTH", - false, false, utils.NonTransactional); err != nil { - t.Errorf("Error: %+v", err) - } else { - exp.Compile() - tempAttr.Compile() - if !reflect.DeepEqual(exp, tempAttr) { - t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(tempAttr)) - } - } -} - func testInternalRemoteITInitCfg(t *testing.T) { var err error - internalCfgPath = path.Join(*dataDir, "conf", "samples", internalCfgDirPath) + internalCfgPath = path.Join(*dataDir, "conf", "samples", + "remote_replication", internalCfgDirPath) internalCfg, err = config.NewCGRConfigFromPath(internalCfgPath) if err != nil { t.Error(err) } internalCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() config.SetCgrConfig(internalCfg) + + // prepare config for engine1 + engineOneCfgPath = path.Join(*dataDir, "conf", "samples", + "remote_replication", engineOneCfgDirPath) + engineOneCfg, err = config.NewCGRConfigFromPath(engineOneCfgPath) + if err != nil { + t.Error(err) + } + engineOneCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + + // prepare config for engine2 + engineTwoCfgPath = path.Join(*dataDir, "conf", "samples", + "remote_replication", engineTwoCfgDirPath) + engineTwoCfg, err = config.NewCGRConfigFromPath(engineTwoCfgPath) + if err != nil { + t.Error(err) + } + engineTwoCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + +} + +func testInternalRemoteITDataFlush(t *testing.T) { + if err := engine.InitDataDb(engineOneCfg); err != nil { + t.Fatal(err) + } + time.Sleep(100 * time.Millisecond) + if err := engine.InitDataDb(engineTwoCfg); err != nil { + t.Fatal(err) + } + time.Sleep(100 * time.Millisecond) } func testInternalRemoteITStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(internalCfgPath, *waitRater); err != nil { + if _, err := engine.StartEngine(engineOneCfgPath, 500); err != nil { + t.Fatal(err) + } + if _, err := engine.StartEngine(engineTwoCfgPath, 500); err != nil { + t.Fatal(err) + } + if _, err := engine.StartEngine(internalCfgPath, 500); err != nil { t.Fatal(err) } } @@ -196,8 +148,31 @@ func testInternalRemoteITRPCConn(t *testing.T) { var err error internalRPC, err = jsonrpc.Dial("tcp", internalCfg.ListenCfg().RPCJSONListen) if err != nil { + fmt.Println(err) t.Fatal(err) } + time.Sleep(200 * time.Millisecond) + engineOneRPC, err = jsonrpc.Dial("tcp", engineOneCfg.ListenCfg().RPCJSONListen) + if err != nil { + fmt.Println(err) + t.Fatal(err) + } + time.Sleep(200 * time.Millisecond) + engineTwoRPC, err = jsonrpc.Dial("tcp", engineTwoCfg.ListenCfg().RPCJSONListen) + if err != nil { + fmt.Println(err) + t.Fatal(err) + } + time.Sleep(200 * time.Millisecond) +} + +func testInternalRemoteLoadDataInEngineTwo(t *testing.T) { + var reply string + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + if err := engineTwoRPC.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { + t.Error(err) + } + time.Sleep(500 * time.Millisecond) } func testInternalRemoteITGetAccount(t *testing.T) { @@ -609,33 +584,33 @@ func testInternalReplicationSetThreshold(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply) } - // verify threshold profile in replication dataDB - if rcv, err := rmtDM.GetThresholdProfile("cgrates.org", "THD_Replication", - false, false, utils.NonTransactional); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rcv) { - t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rcv) - } - // - eIdxes := map[string]utils.StringMap{ - "*string:~Account:1001": { - "THD_ACNT_1001": true, - "THD_Replication": true, - }, - "*string:~Account:1002": { - "THD_ACNT_1002": true, - }, - "*string:~CustomField:CustomValue": { - "THD_Replication": true, - }, - } - if rcvIdx, err := rmtDM.GetFilterIndexes( - utils.PrefixToIndexCache[utils.ThresholdProfilePrefix], tPrfl.Tenant, - utils.EmptyString, nil); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eIdxes, rcvIdx) { - t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) - } + //// verify threshold profile in replication dataDB + //if rcv, err := rmtDM.GetThresholdProfile("cgrates.org", "THD_Replication", + // false, false, utils.NonTransactional); err != nil { + // t.Error(err) + //} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rcv) { + // t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rcv) + //} + //// + //eIdxes := map[string]utils.StringMap{ + // "*string:~Account:1001": { + // "THD_ACNT_1001": true, + // "THD_Replication": true, + // }, + // "*string:~Account:1002": { + // "THD_ACNT_1002": true, + // }, + // "*string:~CustomField:CustomValue": { + // "THD_Replication": true, + // }, + //} + //if rcvIdx, err := rmtDM.GetFilterIndexes( + // utils.PrefixToIndexCache[utils.ThresholdProfilePrefix], tPrfl.Tenant, + // utils.EmptyString, nil); err != nil { + // t.Error(err) + //} else if !reflect.DeepEqual(eIdxes, rcvIdx) { + // t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) + //} } func testInternalRemoteITKillEngine(t *testing.T) { diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go index 4994ff399..63764918d 100644 --- a/apier/v1/replicator.go +++ b/apier/v1/replicator.go @@ -37,6 +37,236 @@ func (rplSv1 *ReplicatorSv1) Call(serviceMethod string, args interface{}, reply return utils.APIerRPCCall(rplSv1, serviceMethod, args, reply) } +//GetAccount +func (rplSv1 *ReplicatorSv1) GetAccount(id string, reply *engine.Account) error { + if rcv, err := rplSv1.dm.GetAccount(id); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetStatQueue +func (rplSv1 *ReplicatorSv1) GetStatQueue(tntID *utils.TenantID, reply *engine.StoredStatQueue) error { + if rcv, err := rplSv1.dm.DataDB().GetStoredStatQueueDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetFilter +func (rplSv1 *ReplicatorSv1) GetFilter(tntID *utils.TenantID, reply *engine.Filter) error { + if rcv, err := rplSv1.dm.DataDB().GetFilterDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetThreshold +func (rplSv1 *ReplicatorSv1) GetThreshold(tntID *utils.TenantID, reply *engine.Threshold) error { + if rcv, err := rplSv1.dm.DataDB().GetThresholdDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetThresholdProfile +func (rplSv1 *ReplicatorSv1) GetThresholdProfile(tntID *utils.TenantID, reply *engine.ThresholdProfile) error { + if rcv, err := rplSv1.dm.DataDB().GetThresholdProfileDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetStatQueueProfile +func (rplSv1 *ReplicatorSv1) GetStatQueueProfile(tntID *utils.TenantID, reply *engine.StatQueueProfile) error { + if rcv, err := rplSv1.dm.DataDB().GetStatQueueProfileDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetTiming +func (rplSv1 *ReplicatorSv1) GetTiming(id string, reply *utils.TPTiming) error { + if rcv, err := rplSv1.dm.DataDB().GetTimingDrv(id); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetResource +func (rplSv1 *ReplicatorSv1) GetResource(tntID *utils.TenantID, reply *engine.Resource) error { + if rcv, err := rplSv1.dm.DataDB().GetResourceDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetResourceProfile +func (rplSv1 *ReplicatorSv1) GetResourceProfile(tntID *utils.TenantID, reply *engine.ResourceProfile) error { + if rcv, err := rplSv1.dm.DataDB().GetResourceProfileDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetActionTriggers +func (rplSv1 *ReplicatorSv1) GetActionTriggers(id string, reply engine.ActionTriggers) error { + if rcv, err := rplSv1.dm.DataDB().GetActionTriggersDrv(id); err != nil { + return err + } else { + reply = rcv + } + return nil +} + +//GetShareGroup +func (rplSv1 *ReplicatorSv1) GetShareGroup(id string, reply *engine.SharedGroup) error { + if rcv, err := rplSv1.dm.DataDB().GetSharedGroupDrv(id); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetActions +func (rplSv1 *ReplicatorSv1) GetActions(id string, reply engine.Actions) error { + if rcv, err := rplSv1.dm.DataDB().GetActionsDrv(id); err != nil { + return err + } else { + reply = rcv + } + return nil +} + +//GetActions +func (rplSv1 *ReplicatorSv1) GetActionPlan(id string, reply *engine.ActionPlan) error { + if rcv, err := rplSv1.dm.DataDB().GetActionPlanDrv(id, true, utils.NonTransactional); err != nil { + return err + } else { + reply = rcv + } + return nil +} + +//GetAllActionPlans +func (rplSv1 *ReplicatorSv1) GetAllActionPlans(_ string, reply *map[string]*engine.ActionPlan) error { + if rcv, err := rplSv1.dm.DataDB().GetAllActionPlansDrv(); err != nil { + return err + } else { + *reply = rcv + } + return nil +} + +//GetAccountActionPlans +func (rplSv1 *ReplicatorSv1) GetAccountActionPlans(id string, reply *[]string) error { + if rcv, err := rplSv1.dm.DataDB().GetAccountActionPlansDrv(id, false, utils.NonTransactional); err != nil { + return err + } else { + *reply = rcv + } + return nil +} + +//GetAllActionPlans +func (rplSv1 *ReplicatorSv1) GetRatingPlan(id string, reply *engine.RatingPlan) error { + if rcv, err := rplSv1.dm.DataDB().GetRatingPlanDrv(id); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetAllActionPlans +func (rplSv1 *ReplicatorSv1) GetRatingProfile(id string, reply *engine.RatingProfile) error { + if rcv, err := rplSv1.dm.DataDB().GetRatingProfileDrv(id); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetResourceProfile +func (rplSv1 *ReplicatorSv1) GetSupplierProfile(tntID *utils.TenantID, reply *engine.SupplierProfile) error { + if rcv, err := rplSv1.dm.DataDB().GetSupplierProfileDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetResourceProfile +func (rplSv1 *ReplicatorSv1) GetAttributeProfile(tntID *utils.TenantID, reply *engine.AttributeProfile) error { + if rcv, err := rplSv1.dm.DataDB().GetAttributeProfileDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetResourceProfile +func (rplSv1 *ReplicatorSv1) GetChargerProfile(tntID *utils.TenantID, reply *engine.ChargerProfile) error { + if rcv, err := rplSv1.dm.DataDB().GetChargerProfileDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetResourceProfile +func (rplSv1 *ReplicatorSv1) GetDispatcherProfile(tntID *utils.TenantID, reply *engine.DispatcherProfile) error { + if rcv, err := rplSv1.dm.DataDB().GetDispatcherProfileDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetResourceProfile +func (rplSv1 *ReplicatorSv1) GetDispatcherHost(tntID *utils.TenantID, reply *engine.DispatcherHost) error { + if rcv, err := rplSv1.dm.DataDB().GetDispatcherHostDrv(tntID.Tenant, tntID.ID); err != nil { + return err + } else { + *reply = *rcv + } + return nil +} + +//GetResourceProfile +func (rplSv1 *ReplicatorSv1) GetItemLoadIDs(itemID string, reply *map[string]int64) error { + if rcv, err := rplSv1.dm.DataDB().GetItemLoadIDsDrv(itemID); err != nil { + return err + } else { + *reply = rcv + } + return nil +} + // SetThresholdProfile alters/creates a ThresholdProfile func (rplSv1 *ReplicatorSv1) SetThresholdProfile(th *engine.ThresholdProfile, reply *string) error { if err := rplSv1.dm.DataDB().SetThresholdProfileDrv(th); err != nil { diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 6d026f296..0ccf27839 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -239,20 +239,16 @@ func main() { if err != nil { log.Fatalf("Coud not open dataDB connection: %s", err.Error()) } - var rmtDBConns []*engine.DataManager - var rplConns *rpcclient.RpcClientPool - if len(ldrCfg.DataDbCfg().RmtDataDBCfgs) != 0 { - rmtDBConns = make([]*engine.DataManager, len(ldrCfg.DataDbCfg().RmtDataDBCfgs)) - for i, dbCfg := range ldrCfg.DataDbCfg().RmtDataDBCfgs { - dbConn, err := engine.NewDataDBConn(dbCfg.DataDbType, - dbCfg.DataDbHost, dbCfg.DataDbPort, - dbCfg.DataDbName, dbCfg.DataDbUser, - dbCfg.DataDbPass, ldrCfg.GeneralCfg().DBDataEncoding, - dbCfg.DataDbSentinelName) - if err != nil { - log.Fatalf("Coud not open dataDB connection: %s", err.Error()) - } - rmtDBConns[i] = engine.NewDataManager(dbConn, nil, nil, nil) + var rmtConns, rplConns *rpcclient.RpcClientPool + if len(ldrCfg.DataDbCfg().RmtConns) != 0 { + var err error + rmtConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, ldrCfg.TlsCfg().ClientKey, + ldrCfg.TlsCfg().ClientCerificate, ldrCfg.TlsCfg().CaCertificate, + ldrCfg.GeneralCfg().ConnectAttempts, ldrCfg.GeneralCfg().Reconnects, + ldrCfg.GeneralCfg().ConnectTimeout, ldrCfg.GeneralCfg().ReplyTimeout, + ldrCfg.DataDbCfg().RmtConns, nil, false) + if err != nil { + log.Fatalf("Coud not confignure dataDB remote connections: %s", err.Error()) } } if len(ldrCfg.DataDbCfg().RplConns) != 0 { @@ -266,7 +262,7 @@ func main() { log.Fatalf("Coud not confignure dataDB replication connections: %s", err.Error()) } } - dm = engine.NewDataManager(d, config.CgrConfig().CacheCfg(), rmtDBConns, rplConns) + dm = engine.NewDataManager(d, config.CgrConfig().CacheCfg(), rmtConns, rplConns) defer dm.DataDB().Close() } diff --git a/config/config_json_test.go b/config/config_json_test.go index e2437d66c..2f84eb902 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -198,7 +198,7 @@ func TestDfDataDbJsonCfg(t *testing.T) { Redis_sentinel: utils.StringPointer(""), Query_timeout: utils.StringPointer("10s"), Replication_conns: &[]*RemoteHostJson{}, - Remote_conns: &[]*DbJsonCfg{}, + Remote_conns: &[]*RemoteHostJson{}, } if cfg, err := dfCgrJsonCfg.DbJsonCfg(DATADB_JSN); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index 206afabf7..68619f8bc 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -420,8 +420,8 @@ func TestCgrCfgJSONDefaultsjsnDataDb(t *testing.T) { if cgrCfg.DataDbCfg().DataDbPass != "" { t.Errorf("Expecting: , recived: %+v", cgrCfg.DataDbCfg().DataDbPass) } - if len(cgrCfg.DataDbCfg().RmtDataDBCfgs) != 0 { - t.Errorf("Expecting: 0, recived: %+v", len(cgrCfg.DataDbCfg().RmtDataDBCfgs)) + if len(cgrCfg.DataDbCfg().RmtConns) != 0 { + t.Errorf("Expecting: 0, recived: %+v", len(cgrCfg.DataDbCfg().RmtConns)) } if len(cgrCfg.DataDbCfg().RplConns) != 0 { t.Errorf("Expecting: 0, recived: %+v", len(cgrCfg.DataDbCfg().RplConns)) diff --git a/config/datadbcfg.go b/config/datadbcfg.go index 3cc9aa874..18eb373dc 100644 --- a/config/datadbcfg.go +++ b/config/datadbcfg.go @@ -36,7 +36,7 @@ type DataDbCfg struct { DataDbPass string // The user's password. DataDbSentinelName string QueryTimeout time.Duration - RmtDataDBCfgs []*DataDbCfg // Remote DataDB configurations + RmtConns []*RemoteHost // Remote DataDB configurations RplConns []*RemoteHost // Replication conns } @@ -76,10 +76,10 @@ func (dbcfg *DataDbCfg) loadFromJsonCfg(jsnDbCfg *DbJsonCfg) (err error) { } } if jsnDbCfg.Remote_conns != nil { - dbcfg.RmtDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Remote_conns)) + dbcfg.RmtConns = make([]*RemoteHost, len(*jsnDbCfg.Remote_conns)) for i, cfg := range *jsnDbCfg.Remote_conns { - dbcfg.RmtDataDBCfgs[i] = newDefaultDataDbCfg() - if err = dbcfg.RmtDataDBCfgs[i].loadFromJsonCfg(cfg); err != nil { + dbcfg.RmtConns[i] = NewDfltRemoteHost() + if err = dbcfg.RmtConns[i].loadFromJsonCfg(cfg); err != nil { return } } @@ -107,18 +107,3 @@ func (dbcfg *DataDbCfg) Clone() *DataDbCfg { QueryTimeout: dbcfg.QueryTimeout, } } - -func newDefaultDataDbCfg() *DataDbCfg { - return &DataDbCfg{ - DataDbType: utils.REDIS, - DataDbHost: "127.0.0.1", - DataDbPort: "6379", - DataDbName: "10", - DataDbUser: "cgrates", - DataDbPass: "", - DataDbSentinelName: "", - QueryTimeout: 10 * time.Second, - RmtDataDBCfgs: nil, - RplConns: nil, - } -} diff --git a/config/datadbcfg_test.go b/config/datadbcfg_test.go index 5e77e2cae..60708d446 100644 --- a/config/datadbcfg_test.go +++ b/config/datadbcfg_test.go @@ -20,7 +20,6 @@ package config import ( "reflect" "testing" - "time" "github.com/cgrates/cgrates/utils" ) @@ -138,12 +137,7 @@ func TestDataDBRemoteReplication(t *testing.T) { "db_password": "password", // password to use when connecting to data_db "redis_sentinel":"sentinel", // redis_sentinel is the name of sentinel "remote_conns":[ - { - "db_host": "0.0.0.0", // data_db host address - "db_port": 1234, // data_db port to reach the database - "db_name": "1", // data_db database name to connect to - "db_user": "cgrates", // username to use when connecting to data_db - }, + {"address": "127.0.0.1:2022","transport":"*json"}, ], } }` @@ -156,17 +150,10 @@ func TestDataDBRemoteReplication(t *testing.T) { DataDbUser: "cgrates", DataDbPass: "password", DataDbSentinelName: "sentinel", - RmtDataDBCfgs: []*DataDbCfg{ - &DataDbCfg{ - DataDbType: utils.REDIS, - DataDbHost: "0.0.0.0", - DataDbPort: "1234", - DataDbName: "1", - DataDbUser: "cgrates", - DataDbPass: "", - QueryTimeout: 10 * time.Second, - RmtDataDBCfgs: nil, - RplConns: nil, + RmtConns: []*RemoteHost{ + &RemoteHost{ + Address: "127.0.0.1:2022", + Transport: "*json", }, }, } @@ -184,12 +171,7 @@ func TestDataDBRemoteReplication(t *testing.T) { "data_db": { // database used to store runtime data (eg: accounts, cdr stats) "db_type": "*internal", // data_db type: <*redis|*mongo|*internal> "remote_conns":[ - { - "db_host": "0.0.0.0", // data_db host address - "db_port": 1234, // data_db port to reach the database - "db_name": "1", // data_db database name to connect to - "db_user": "cgrates", // username to use when connecting to data_db - }, + {"address": "127.0.0.1:2022","transport":"*json"}, ], "replication_conns":[ {"address": "127.0.0.1:2022","transport":"*json"}, @@ -205,15 +187,10 @@ func TestDataDBRemoteReplication(t *testing.T) { DataDbUser: "cgrates", DataDbPass: "password", DataDbSentinelName: "sentinel", - RmtDataDBCfgs: []*DataDbCfg{ - &DataDbCfg{ - DataDbType: utils.REDIS, - DataDbHost: "0.0.0.0", - DataDbPort: "1234", - DataDbName: "1", - DataDbUser: "cgrates", - DataDbPass: "", - QueryTimeout: 10 * time.Second, + RmtConns: []*RemoteHost{ + &RemoteHost{ + Address: "127.0.0.1:2022", + Transport: "*json", }, }, RplConns: []*RemoteHost{ @@ -237,19 +214,9 @@ func TestDataDBRemoteReplication(t *testing.T) { "data_db": { // database used to store runtime data (eg: accounts, cdr stats) "db_type": "*internal", // data_db type: <*redis|*mongo|*internal> "remote_conns":[ - { - "db_host": "0.0.0.0", // data_db host address - "db_port": 1234, // data_db port to reach the database - "db_name": "1", // data_db database name to connect to - "db_user": "cgrates", // username to use when connecting to data_db - }, - { - "db_type": "*mongo", - "db_host": "1.2.3.4", // data_db host address - "db_port": 1235, // data_db port to reach the database - "db_name": "internal_mongo", // data_db database name to connect to - "db_user": "remote_mongo_user", // username to use when connecting to data_db - }, + {"address": "127.0.0.1:2032","transport":"*json"}, + {"address": "127.0.0.1:2042","transport":"*json"}, + {"address": "127.0.0.1:2052","transport":"*json"}, ], "replication_conns":[ {"address": "127.0.0.1:2032","transport":"*json"}, @@ -267,24 +234,18 @@ func TestDataDBRemoteReplication(t *testing.T) { DataDbUser: "cgrates", DataDbPass: "password", DataDbSentinelName: "sentinel", - RmtDataDBCfgs: []*DataDbCfg{ - &DataDbCfg{ - DataDbType: utils.REDIS, - DataDbHost: "0.0.0.0", - DataDbPort: "1234", - DataDbName: "1", - DataDbUser: "cgrates", - DataDbPass: "", - QueryTimeout: 10 * time.Second, + RmtConns: []*RemoteHost{ + &RemoteHost{ + Address: "127.0.0.1:2032", + Transport: "*json", }, - &DataDbCfg{ - DataDbType: utils.MONGO, - DataDbHost: "1.2.3.4", - DataDbPort: "1235", - DataDbName: "internal_mongo", - DataDbUser: "remote_mongo_user", - DataDbPass: "", - QueryTimeout: 10 * time.Second, + &RemoteHost{ + Address: "127.0.0.1:2042", + Transport: "*json", + }, + &RemoteHost{ + Address: "127.0.0.1:2052", + Transport: "*json", }, }, RplConns: []*RemoteHost{ diff --git a/config/libconfig_json.go b/config/libconfig_json.go index e05f06ac6..b426fafde 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -90,7 +90,7 @@ type DbJsonCfg struct { Redis_sentinel *string Query_timeout *string Sslmode *string // Used only in case of storDb - Remote_conns *[]*DbJsonCfg + Remote_conns *[]*RemoteHostJson Replication_conns *[]*RemoteHostJson } diff --git a/data/conf/samples/internal_mongo/cgrates.json b/data/conf/samples/internal_mongo/cgrates.json deleted file mode 100644 index 927ed5f0b..000000000 --- a/data/conf/samples/internal_mongo/cgrates.json +++ /dev/null @@ -1,137 +0,0 @@ -{ -"general": { - "log_level": 7, - "node_id": "RemoteMongo", - "reply_timeout": "50s" -}, - - -"listen": { - "rpc_json": ":2012", - "rpc_gob": ":2013", - "http": ":2080" -}, - - -"data_db": { - "db_type": "*internal", - "remote_conns": [ - { - "db_type": "mongo", - "db_name": "10", - "db_port": 27017 - } - ], - "replication_conns": [ - {"address": "127.0.0.1:2022", "transport":"*json"} - ] -}, - - -"stor_db": { - "db_type": "*internal" -}, - - -"rals": { - "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], - "max_increments":3000000 -}, - - -"scheduler": { - "enabled": true, - "cdrs_conns": [ - {"address": "*internal"} - ] -}, - - -"cdrs": { - "enabled": true, - "chargers_conns":[ - {"address": "*internal"} - ] -}, - - -"attributes": { - "enabled": true -}, - - -"chargers": { - "enabled": true, - "attributes_conns": [ - {"address": "*internal"} - ] -}, - - -"resources": { - "enabled": true, - "store_interval": "-1", - "thresholds_conns": [ - {"address": "*internal"} - ] -}, - - -"stats": { - "enabled": true, - "store_interval": "-1", - "thresholds_conns": [ - {"address": "*internal"} - ] -}, - -"thresholds": { - "enabled": true, - "store_interval": "-1" -}, - - -"suppliers": { - "enabled": true, - "prefix_indexed_fields":["Destination"], - "stats_conns": [ - {"address": "*internal"} - ], - "resources_conns": [ - {"address": "*internal"} - ] -}, - - -"sessions": { - "enabled": true, - "suppliers_conns": [ - {"address": "*internal"} - ], - "resources_conns": [ - {"address": "*internal"} - ], - "attributes_conns": [ - {"address": "*internal"} - ], - "rals_conns": [ - {"address": "*internal"} - ], - "cdrs_conns": [ - {"address": "*internal"} - ], - "chargers_conns": [ - {"address": "*internal"} - ] -}, - - - "apier": { - "scheduler_conns": [ - {"address": "*internal"} - ] - } -} diff --git a/data/conf/samples/internal_redis/cgrates.json b/data/conf/samples/internal_redis/cgrates.json deleted file mode 100644 index 559394c20..000000000 --- a/data/conf/samples/internal_redis/cgrates.json +++ /dev/null @@ -1,141 +0,0 @@ -{ -// CGRateS Configuration file -// - - -"general": { - "log_level": 7, - "node_id": "RemoteRedis", - "reply_timeout": "50s", -}, - - -"listen": { - "rpc_json": ":2012", - "rpc_gob": ":2013", - "http": ":2080", -}, - - -"data_db": { - "db_type": "*internal", - "remote_conns": [ - { - "db_type": "*redis", - "db_port": 6379, - "db_name": "10", - }, - ], - "replication_conns": [ - {"address": "127.0.0.1:2022", "transport":"*json"} - ] -}, - - -"stor_db": { - "db_type": "*internal", -}, - - -"rals": { - "enabled": true, - "thresholds_conns": [ - {"address": "*internal"}, - ], - "max_increments":3000000, -}, - - -"scheduler": { - "enabled": true, - "cdrs_conns": [ - {"address": "*internal"}, - ], -}, - - -"cdrs": { - "enabled": true, - "chargers_conns":[ - {"address": "*internal"}, - ], -}, - - -"attributes": { - "enabled": true, -}, - - -"chargers": { - "enabled": true, - "attributes_conns": [ - {"address": "*internal"}, - ], -}, - - -"resources": { - "enabled": true, - "store_interval": "-1", - "thresholds_conns": [ - {"address": "*internal"} - ], -}, - - -"stats": { - "enabled": true, - "store_interval": "-1", - "thresholds_conns": [ - {"address": "*internal"} - ], -}, - -"thresholds": { - "enabled": true, - "store_interval": "-1", -}, - - -"suppliers": { - "enabled": true, - "prefix_indexed_fields":["Destination"], - "stats_conns": [ - {"address": "*internal"}, - ], - "resources_conns": [ - {"address": "*internal"}, - ], -}, - - -"sessions": { - "enabled": true, - "suppliers_conns": [ - {"address": "*internal"} - ], - "resources_conns": [ - {"address": "*internal"} - ], - "attributes_conns": [ - {"address": "*internal"} - ], - "rals_conns": [ - {"address": "*internal"}, - ], - "cdrs_conns": [ - {"address": "*internal"} - ], - "chargers_conns": [ - {"address": "*internal"}, - ], -}, - - - "apier": { - "scheduler_conns": [ // connections to SchedulerS for reloads - {"address": "*internal"}, - ], - }, -} diff --git a/data/conf/samples/remote_replication/engine1_mongo/cgrates.json b/data/conf/samples/remote_replication/engine1_mongo/cgrates.json new file mode 100644 index 000000000..ca8ea370b --- /dev/null +++ b/data/conf/samples/remote_replication/engine1_mongo/cgrates.json @@ -0,0 +1,45 @@ +{ +"general": { + "log_level": 7, + "node_id" : "EngineMongo1", +}, + + +"listen": { + "rpc_json": ":2022", + "rpc_gob": ":2023", + "http": ":2280", +}, + + +"data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, +}, + + +"stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, +}, + + +"rals": { + "enabled": true, +}, + + +"scheduler": { + "enabled": true, +}, + +"apier": { + "scheduler_conns": [ + {"address": "*internal"}, + ], +}, + + +} diff --git a/data/conf/samples/remote_replication/engine1_redis/cgrates.json b/data/conf/samples/remote_replication/engine1_redis/cgrates.json new file mode 100644 index 000000000..172ab11e3 --- /dev/null +++ b/data/conf/samples/remote_replication/engine1_redis/cgrates.json @@ -0,0 +1,42 @@ +{ +"general": { + "log_level": 7, + "node_id": "EngineRedis1" +}, + + +"listen": { + "rpc_json": ":2022", + "rpc_gob": ":2023", + "http": ":2280", +}, + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10", +}, + +"stor_db": { + "db_password": "CGRateS.org", +}, + + +"rals": { + "enabled": true, +}, + + +"scheduler": { + "enabled": true, +}, + + +"apier": { + "scheduler_conns": [ + {"address": "*internal"}, + ], +}, + + +} diff --git a/data/conf/samples/remote_replication/engine2_mongo/cgrates.json b/data/conf/samples/remote_replication/engine2_mongo/cgrates.json new file mode 100644 index 000000000..4c8e561e4 --- /dev/null +++ b/data/conf/samples/remote_replication/engine2_mongo/cgrates.json @@ -0,0 +1,46 @@ +{ +"general": { + "log_level": 7, + "node_id" : "EngineMongo2", +}, + + +"listen": { + "rpc_json": ":2032", + "rpc_gob": ":2033", + "http": ":2380", +}, + + +"data_db": { + "db_type": "mongo", + "db_name": "11", + "db_port": 27017, +}, + + +"stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, +}, + + +"rals": { + "enabled": true, +}, + + +"scheduler": { + "enabled": true, +}, + + +"apier": { + "scheduler_conns": [ + {"address": "*internal"}, + ], +}, + + +} diff --git a/data/conf/samples/remote_replication/engine2_redis/cgrates.json b/data/conf/samples/remote_replication/engine2_redis/cgrates.json new file mode 100644 index 000000000..e7337f37d --- /dev/null +++ b/data/conf/samples/remote_replication/engine2_redis/cgrates.json @@ -0,0 +1,41 @@ +{ +"general": { + "log_level": 7, + "node_id": "EngineRedis2" +}, + + +"listen": { + "rpc_json": ":2032", + "rpc_gob": ":2033", + "http": ":2380", +}, + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "11", +}, + +"stor_db": { + "db_password": "CGRateS.org", +}, + + +"rals": { + "enabled": true, +}, + + +"scheduler": { + "enabled": true, +}, + + +"apier": { + "scheduler_conns": [ + {"address": "*internal"}, + ], +}, + +} diff --git a/data/conf/samples/remote_replication/internal/cgrates.json b/data/conf/samples/remote_replication/internal/cgrates.json new file mode 100644 index 000000000..cd71475ba --- /dev/null +++ b/data/conf/samples/remote_replication/internal/cgrates.json @@ -0,0 +1,37 @@ +{ +"general": { + "log_level": 7, + "node_id": "InternalEngine", +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + + +"data_db": { + "db_type": "*internal", + "remote_conns": [ + {"address": "127.0.0.1:2022", "transport":"*json"}, + {"address": "127.0.0.1:2032", "transport":"*json"} + ], + "replication_conns": [ + {"address": "127.0.0.1:2022", "transport":"*json"}, + {"address": "127.0.0.1:2032", "transport":"*json"} + ] +}, + + +"stor_db": { + "db_type": "*internal", +}, + + +"rals": { + "enabled": true, +}, + +} diff --git a/engine/datamanager.go b/engine/datamanager.go index 45b251279..aa9d078bc 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -89,23 +89,22 @@ var ( ) // NewDataManager returns a new DataManager -func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtDataDBs []*DataManager, +func NewDataManager(dataDB DataDB, cacheCfg config.CacheCfg, rmtConns, rplConns *rpcclient.RpcClientPool) *DataManager { return &DataManager{ - dataDB: dataDB, - cacheCfg: cacheCfg, - rmtDataDBs: rmtDataDBs, - rplConns: rplConns, + dataDB: dataDB, + cacheCfg: cacheCfg, + rmtConns: rmtConns, + rplConns: rplConns, } } // DataManager is the data storage manager for CGRateS // transparently manages data retrieval, further serialization and caching type DataManager struct { - dataDB DataDB - rmtDataDBs []*DataManager - cacheCfg config.CacheCfg - rplConns *rpcclient.RpcClientPool + dataDB DataDB + cacheCfg config.CacheCfg + rmtConns, rplConns *rpcclient.RpcClientPool } // DataDB exports access to dataDB @@ -320,14 +319,8 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b func (dm *DataManager) GetAccount(id string) (acc *Account, err error) { acc, err = dm.dataDB.GetAccountDrv(id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if acc, rmtErr = rmtDM.dataDB.GetAccountDrv(id); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetAccount, id, &acc) } if err != nil { return nil, err @@ -351,14 +344,9 @@ func (dm *DataManager) GetStatQueue(tenant, id string, } ssq, err := dm.dataDB.GetStoredStatQueueDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if ssq, rmtErr = rmtDM.dataDB.GetStoredStatQueueDrv(tenant, id); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetStatQueue, + &utils.TenantID{Tenant: tenant, ID: id}, &ssq) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -417,15 +405,9 @@ func (dm *DataManager) GetFilter(tenant, id string, cacheRead, cacheWrite bool, fltr, err = dm.DataDB().GetFilterDrv(tenant, id) } if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if fltr, rmtErr = rmtDM.GetFilter(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetFilter, + &utils.TenantID{Tenant: tenant, ID: id}, &fltr) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -472,15 +454,9 @@ func (dm *DataManager) GetThreshold(tenant, id string, } th, err = dm.dataDB.GetThresholdDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if th, rmtErr = rmtDM.GetThreshold(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetThreshold, + &utils.TenantID{Tenant: tenant, ID: id}, &th) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -525,15 +501,9 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, cacheRead, cacheWr } th, err = dm.dataDB.GetThresholdProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if th, rmtErr = rmtDM.GetThresholdProfile(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetThresholdProfile, + &utils.TenantID{Tenant: tenant, ID: id}, &th) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -630,15 +600,9 @@ func (dm *DataManager) GetStatQueueProfile(tenant, id string, cacheRead, cacheWr } sqp, err = dm.dataDB.GetStatQueueProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if sqp, rmtErr = rmtDM.GetStatQueueProfile(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetStatQueueProfile, + &utils.TenantID{Tenant: tenant, ID: id}, &sqp) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -720,15 +684,9 @@ func (dm *DataManager) GetTiming(id string, skipCache bool, } t, err = dm.dataDB.GetTimingDrv(id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if t, rmtErr = rmtDM.GetTiming(id, false, - utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetTiming, + id, &t) } if err != nil { if err == utils.ErrNotFound { @@ -776,15 +734,9 @@ func (dm *DataManager) GetResource(tenant, id string, cacheRead, cacheWrite bool } rs, err = dm.dataDB.GetResourceDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if rs, rmtErr = rmtDM.GetResource(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetResource, + &utils.TenantID{Tenant: tenant, ID: id}, &rs) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -829,15 +781,9 @@ func (dm *DataManager) GetResourceProfile(tenant, id string, cacheRead, cacheWri } rp, err = dm.dataDB.GetResourceProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if rp, rmtErr = rmtDM.GetResourceProfile(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetResourceProfile, + &utils.TenantID{Tenant: tenant, ID: id}, &rp) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -918,15 +864,9 @@ func (dm *DataManager) GetActionTriggers(id string, skipCache bool, } attrs, err = dm.dataDB.GetActionTriggersDrv(id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if attrs, rmtErr = rmtDM.GetActionTriggers(id, true, - utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetActionTriggers, + id, attrs) } if err != nil { if err == utils.ErrNotFound { @@ -973,15 +913,9 @@ func (dm *DataManager) GetSharedGroup(key string, skipCache bool, } sg, err = dm.DataDB().GetSharedGroupDrv(key) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if sg, rmtErr = rmtDM.GetSharedGroup(key, true, - utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetShareGroup, + key, &sg) } if err != nil { if err == utils.ErrNotFound { @@ -1031,15 +965,9 @@ func (dm *DataManager) GetActions(key string, skipCache bool, transactionID stri } as, err = dm.DataDB().GetActionsDrv(key) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if as, rmtErr = rmtDM.GetActions(key, true, - utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetActions, + key, &as) } if err != nil { if err == utils.ErrNotFound { @@ -1075,15 +1003,9 @@ func (dm *DataManager) RemoveActions(key, transactionID string) (err error) { func (dm *DataManager) GetActionPlan(key string, skipCache bool, transactionID string) (ats *ActionPlan, err error) { ats, err = dm.dataDB.GetActionPlanDrv(key, skipCache, transactionID) - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if ats, rmtErr = rmtDM.GetActionPlan(key, true, - utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetActionPlan, + key, &ats) } if err != nil { return nil, err @@ -1093,14 +1015,9 @@ func (dm *DataManager) GetActionPlan(key string, skipCache bool, transactionID s func (dm *DataManager) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { ats, err = dm.dataDB.GetAllActionPlansDrv() - if ((err == nil && len(ats) == 0) || err == utils.ErrNotFound) && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if ats, rmtErr = rmtDM.GetAllActionPlans(); rmtErr == nil { - break - } - } - err = rmtErr + if ((err == nil && len(ats) == 0) || err == utils.ErrNotFound) && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetAllActionPlans, + utils.EmptyString, &ats) } if err != nil { return nil, err @@ -1111,14 +1028,9 @@ func (dm *DataManager) GetAllActionPlans() (ats map[string]*ActionPlan, err erro func (dm *DataManager) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) { apIDs, err = dm.dataDB.GetAccountActionPlansDrv(acntID, skipCache, transactionID) - if ((err == nil && len(apIDs) == 0) || err == utils.ErrNotFound) && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if apIDs, rmtErr = rmtDM.dataDB.GetAccountActionPlansDrv(acntID, skipCache, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if ((err == nil && len(apIDs) == 0) || err == utils.ErrNotFound) && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetAccountActionPlans, + acntID, &apIDs) } if err != nil { return nil, err @@ -1138,15 +1050,9 @@ func (dm *DataManager) GetRatingPlan(key string, skipCache bool, } rp, err = dm.DataDB().GetRatingPlanDrv(key) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if rp, rmtErr = rmtDM.GetRatingPlan(key, true, - utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetRatingPlan, + key, &rp) } if err != nil { if err == utils.ErrNotFound { @@ -1192,15 +1098,9 @@ func (dm *DataManager) GetRatingProfile(key string, skipCache bool, } rpf, err = dm.DataDB().GetRatingProfileDrv(key) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if rpf, rmtErr = rmtDM.GetRatingProfile(key, true, - utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetRatingProfile, + key, &rpf) } if err != nil { if err == utils.ErrNotFound { @@ -1243,16 +1143,16 @@ func (dm *DataManager) HasData(category, subject, tenant string) (has bool, err func (dm *DataManager) GetFilterIndexes(cacheID, itemIDPrefix, filterType string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { if indexes, err = dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal); err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if indexes, rmtErr = rmtDM.GetFilterIndexes(cacheID, itemIDPrefix, - filterType, fldNameVal); rmtErr == nil { - break - } - } - err = rmtErr - } + //if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { + // var rmtErr error + // for _, rmtDM := range dm.rmtDataDBs { + // if indexes, rmtErr = rmtDM.GetFilterIndexes(cacheID, itemIDPrefix, + // filterType, fldNameVal); rmtErr == nil { + // break + // } + // } + // err = rmtErr + //} if err != nil { return nil, err } @@ -1302,16 +1202,16 @@ func (dm *DataManager) MatchFilterIndex(cacheID, itemIDPrefix, // Not found in cache, check in DB itemIDs, err = dm.DataDB().MatchFilterIndexDrv(cacheID, itemIDPrefix, filterType, fieldName, fieldVal) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if itemIDs, rmtErr = rmtDM.MatchFilterIndex(cacheID, itemIDPrefix, - filterType, fieldName, fieldVal); rmtErr == nil { - break - } - } - err = rmtErr - } + //if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { + // var rmtErr error + // for _, rmtDM := range dm.rmtDataDBs { + // if itemIDs, rmtErr = rmtDM.MatchFilterIndex(cacheID, itemIDPrefix, + // filterType, fieldName, fieldVal); rmtErr == nil { + // break + // } + // } + // err = rmtErr + //} if err != nil { if err == utils.ErrNotFound { Cache.Set(cacheID, fieldValKey, nil, nil, @@ -1339,15 +1239,9 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, cacheRead, cacheWri } supp, err = dm.dataDB.GetSupplierProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if supp, rmtErr = rmtDM.GetSupplierProfile(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetSupplierProfile, + &utils.TenantID{Tenant: tenant, ID: id}, &supp) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -1433,15 +1327,9 @@ func (dm *DataManager) GetAttributeProfile(tenant, id string, cacheRead, cacheWr } attrPrfl, err = dm.dataDB.GetAttributeProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if attrPrfl, rmtErr = rmtDM.GetAttributeProfile(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetAttributeProfile, + &utils.TenantID{Tenant: tenant, ID: id}, &attrPrfl) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -1536,15 +1424,9 @@ func (dm *DataManager) GetChargerProfile(tenant, id string, cacheRead, cacheWrit } cpp, err = dm.dataDB.GetChargerProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if cpp, rmtErr = rmtDM.GetChargerProfile(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetChargerProfile, + &utils.TenantID{Tenant: tenant, ID: id}, &cpp) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -1627,15 +1509,9 @@ func (dm *DataManager) GetDispatcherProfile(tenant, id string, cacheRead, cacheW } dpp, err = dm.dataDB.GetDispatcherProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if dpp, rmtErr = rmtDM.GetDispatcherProfile(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetDispatcherProfile, + &utils.TenantID{Tenant: tenant, ID: id}, &dpp) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -1727,15 +1603,9 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit } dH, err = dm.dataDB.GetDispatcherHostDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if dH, rmtErr = rmtDM.GetDispatcherHost(tenant, id, false, - false, utils.NonTransactional); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetDispatcherHost, + &utils.TenantID{Tenant: tenant, ID: id}, &dH) } if err != nil { if err == utils.ErrNotFound && cacheWrite { @@ -1788,14 +1658,9 @@ func (dm *DataManager) RemoveDispatcherHost(tenant, id string, func (dm *DataManager) GetItemLoadIDs(itemIDPrefix string, cacheWrite bool) (loadIDs map[string]int64, err error) { loadIDs, err = dm.DataDB().GetItemLoadIDsDrv(itemIDPrefix) if err != nil { - if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - var rmtErr error - for _, rmtDM := range dm.rmtDataDBs { - if loadIDs, rmtErr = rmtDM.GetItemLoadIDs(itemIDPrefix, false); rmtErr == nil { - break - } - } - err = rmtErr + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetItemLoadIDs, + itemIDPrefix, &loadIDs) } if err != nil { if err == utils.ErrNotFound && cacheWrite { diff --git a/engine/libtest.go b/engine/libtest.go index 916f043c1..83d46cd15 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -292,33 +292,7 @@ func InitDataDb(cfg *config.CGRConfig) error { if err != nil { return err } - var rmtDBConns []*DataManager - var rplConns *rpcclient.RpcClientPool - if len(cfg.DataDbCfg().RmtDataDBCfgs) != 0 { - rmtDBConns = make([]*DataManager, len(cfg.DataDbCfg().RmtDataDBCfgs)) - for i, dbCfg := range cfg.DataDbCfg().RmtDataDBCfgs { - dbConn, err := NewDataDBConn(dbCfg.DataDbType, - dbCfg.DataDbHost, dbCfg.DataDbPort, - dbCfg.DataDbName, dbCfg.DataDbUser, - dbCfg.DataDbPass, cfg.GeneralCfg().DBDataEncoding, - dbCfg.DataDbSentinelName) - if err != nil { - return err - } - rmtDBConns[i] = NewDataManager(dbConn, nil, nil, nil) - } - } - if len(cfg.DataDbCfg().RplConns) != 0 { - rplConns, err = NewRPCPool(rpcclient.POOL_BROADCAST, cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DataDbCfg().RplConns, nil, false) - if err != nil { - return err - } - } - dm := NewDataManager(d, cfg.CacheCfg(), rmtDBConns, rplConns) + dm := NewDataManager(d, cfg.CacheCfg(), nil, nil) if err := dm.DataDB().Flush(""); err != nil { return err diff --git a/engine/tpreader.go b/engine/tpreader.go index 0cc105f8c..4d20069c2 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -68,20 +68,16 @@ type TpReader struct { func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string, cacheS rpcclient.RpcClientConnection, schedulerS rpcclient.RpcClientConnection) (*TpReader, error) { - var rmtDBConns []*DataManager - var rplConns *rpcclient.RpcClientPool - if len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs) != 0 { - rmtDBConns = make([]*DataManager, len(config.CgrConfig().DataDbCfg().RmtDataDBCfgs)) - for i, dbCfg := range config.CgrConfig().DataDbCfg().RmtDataDBCfgs { - dbConn, err := NewDataDBConn(dbCfg.DataDbType, - dbCfg.DataDbHost, dbCfg.DataDbPort, - dbCfg.DataDbName, dbCfg.DataDbUser, - dbCfg.DataDbPass, config.CgrConfig().GeneralCfg().DBDataEncoding, - dbCfg.DataDbSentinelName) - if err != nil { - return nil, err - } - rmtDBConns[i] = NewDataManager(dbConn, nil, nil, nil) + var rmtConns, rplConns *rpcclient.RpcClientPool + if len(config.CgrConfig().DataDbCfg().RmtConns) != 0 { + var err error + rmtConns, err = NewRPCPool(rpcclient.POOL_FIRST, config.CgrConfig().TlsCfg().ClientKey, + config.CgrConfig().TlsCfg().ClientCerificate, config.CgrConfig().TlsCfg().CaCertificate, + config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects, + config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout, + config.CgrConfig().DataDbCfg().RmtConns, nil, false) + if err != nil { + return nil, err } } if len(config.CgrConfig().DataDbCfg().RplConns) != 0 { @@ -98,7 +94,7 @@ func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string, tpr := &TpReader{ tpid: tpid, timezone: timezone, - dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtDBConns, rplConns), // ToDo: add ChacheCfg as parameter to the NewTpReader + dm: NewDataManager(db, config.CgrConfig().CacheCfg(), rmtConns, rplConns), // ToDo: add ChacheCfg as parameter to the NewTpReader lr: lr, cacheS: cacheS, schedulerS: schedulerS, diff --git a/services/datadb.go b/services/datadb.go index 1f866ae79..4c5968910 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -68,20 +68,16 @@ func (db *DataDBService) Start() (err error) { utils.Logger.Warning(fmt.Sprintf("Could not configure dataDb: %s.Some SessionS APIs will not work", err)) return } - var rmtDBConns []*engine.DataManager - var rplConns *rpcclient.RpcClientPool - if len(db.cfg.DataDbCfg().RmtDataDBCfgs) != 0 { - rmtDBConns = make([]*engine.DataManager, len(db.cfg.DataDbCfg().RmtDataDBCfgs)) - for i, dbCfg := range db.cfg.DataDbCfg().RmtDataDBCfgs { - dbConn, err := engine.NewDataDBConn(dbCfg.DataDbType, - dbCfg.DataDbHost, dbCfg.DataDbPort, - dbCfg.DataDbName, dbCfg.DataDbUser, - dbCfg.DataDbPass, db.cfg.GeneralCfg().DBDataEncoding, - dbCfg.DataDbSentinelName) - if err != nil { - log.Fatalf("Coud not open dataDB connection: %s", err.Error()) - } - rmtDBConns[i] = engine.NewDataManager(dbConn, nil, nil, nil) + var rmtConns, rplConns *rpcclient.RpcClientPool + if len(db.cfg.DataDbCfg().RmtConns) != 0 { + var err error + rmtConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, db.cfg.TlsCfg().ClientKey, + db.cfg.TlsCfg().ClientCerificate, db.cfg.TlsCfg().CaCertificate, + db.cfg.GeneralCfg().ConnectAttempts, db.cfg.GeneralCfg().Reconnects, + db.cfg.GeneralCfg().ConnectTimeout, db.cfg.GeneralCfg().ReplyTimeout, + db.cfg.DataDbCfg().RmtConns, nil, false) + if err != nil { + log.Fatalf("Coud not confignure dataDB remote connections: %s", err.Error()) } } if len(config.CgrConfig().DataDbCfg().RplConns) != 0 { @@ -95,7 +91,7 @@ func (db *DataDBService) Start() (err error) { log.Fatalf("Coud not confignure dataDB replication connections: %s", err.Error()) } } - db.db = engine.NewDataManager(d, db.cfg.CacheCfg(), rmtDBConns, rplConns) + db.db = engine.NewDataManager(d, db.cfg.CacheCfg(), rmtConns, rplConns) engine.SetDataStorage(db.db) if err = engine.CheckVersions(db.db.DataDB()); err != nil { fmt.Println(err) diff --git a/utils/consts.go b/utils/consts.go index 7bb2cb5d8..d9fc551fb 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -792,9 +792,32 @@ const ( // ReplicatorSv1 APIs const ( - ReplicatorSv1 = "ReplicatorSv1" - ReplicatorSv1Ping = "ReplicatorSv1.Ping" - ReplicatorSv1SetThresholdProfile = "ReplicatorSv1.SetThresholdProfile" + ReplicatorSv1 = "ReplicatorSv1" + ReplicatorSv1Ping = "ReplicatorSv1.Ping" + ReplicatorSv1GetAccount = "ReplicatorSv1.GetAccount" + ReplicatorSv1GetStatQueue = "ReplicatorSv1.GetStatQueue" + ReplicatorSv1GetFilter = "ReplicatorSv1.GetFilter" + ReplicatorSv1GetThreshold = "ReplicatorSv1.GetThreshold" + ReplicatorSv1GetThresholdProfile = "ReplicatorSv1.GetThresholdProfile" + ReplicatorSv1GetStatQueueProfile = "ReplicatorSv1.GetStatQueueProfile" + ReplicatorSv1GetTiming = "ReplicatorSv1.GetTiming" + ReplicatorSv1GetResource = "ReplicatorSv1.GetResource" + ReplicatorSv1GetResourceProfile = "ReplicatorSv1.GetResourceProfile" + ReplicatorSv1GetActionTriggers = "ReplicatorSv1.GetActionTriggers" + ReplicatorSv1GetShareGroup = "ReplicatorSv1.GetShareGroup" + ReplicatorSv1GetActions = "ReplicatorSv1.GetActions" + ReplicatorSv1GetActionPlan = "ReplicatorSv1.GetActionPlan" + ReplicatorSv1GetAllActionPlans = "ReplicatorSv1.GetAllActionPlans" + ReplicatorSv1GetAccountActionPlans = "ReplicatorSv1.GetAccountActionPlans" + ReplicatorSv1GetRatingPlan = "ReplicatorSv1.GetRatingPlan" + ReplicatorSv1GetRatingProfile = "ReplicatorSv1.GetRatingProfile" + ReplicatorSv1GetSupplierProfile = "ReplicatorSv1.GetSupplierProfile" + ReplicatorSv1GetAttributeProfile = "ReplicatorSv1.GetAttributeProfile" + ReplicatorSv1GetChargerProfile = "ReplicatorSv1.GetChargerProfile" + ReplicatorSv1GetDispatcherProfile = "ReplicatorSv1.GetDispatcherProfile" + ReplicatorSv1GetDispatcherHost = "ReplicatorSv1.GetDispatcheHost" + ReplicatorSv1GetItemLoadIDs = "ReplicatorSv1.GetItemLoadIDs" + ReplicatorSv1SetThresholdProfile = "ReplicatorSv1.SetThresholdProfile" ) // ApierV1 APIs