diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index b1f21d4b4..4f8e17be6 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -126,122 +126,67 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } // Connection to HistoryS, - // FixMe via multiple connections - var ralsHistoryServer string - for _, connCfg := range cfg.RALsHistorySConns { - ralsHistoryServer = connCfg.Address - break - } - if ralsHistoryServer != "" { + if len(cfg.RALsHistorySConns) != 0 { histTaskChan := make(chan struct{}) waitTasks = append(waitTasks, histTaskChan) go func() { defer close(histTaskChan) - var scribeServer rpcclient.RpcClientConnection - if ralsHistoryServer == utils.INTERNAL { - select { - case scribeServer = <-internalHistorySChan: - internalHistorySChan <- scribeServer - case <-time.After(cfg.InternalTtl): - utils.Logger.Crit(": Internal historys connection timeout.") - exitChan <- true - return - } - } else if scribeServer, err = rpcclient.NewRpcClient("tcp", ralsHistoryServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect historys, error: %s", err.Error())) + if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.RALsHistorySConns, internalHistorySChan, cfg.InternalTtl); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect HistoryS, error: %s", err.Error())) exitChan <- true return + } else { + engine.SetHistoryScribe(historySConns) } - engine.SetHistoryScribe(scribeServer) }() } // Connection to pubsubs - var ralsPubSubServer string - for _, connCfg := range cfg.RALsPubSubSConns { - ralsPubSubServer = connCfg.Address - break - } - if ralsPubSubServer != "" { + if len(cfg.RALsPubSubSConns) != 0 { pubsubTaskChan := make(chan struct{}) waitTasks = append(waitTasks, pubsubTaskChan) go func() { defer close(pubsubTaskChan) - var pubSubServer rpcclient.RpcClientConnection - if ralsPubSubServer == utils.INTERNAL { - select { - case pubSubServer = <-internalPubSubSChan: - internalPubSubSChan <- pubSubServer - case <-time.After(cfg.InternalTtl): - utils.Logger.Crit(": Internal pubsub connection timeout.") - exitChan <- true - return - } - } else if pubSubServer, err = rpcclient.NewRpcClient("tcp", ralsPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to pubsubs: %s", err.Error())) + if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.RALsPubSubSConns, internalPubSubSChan, cfg.InternalTtl); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) exitChan <- true return + } else { + engine.SetPubSub(pubSubSConns) } - engine.SetPubSub(pubSubServer) }() } - // Connection to AliasService - var ralsAliasServer string - for _, connCfg := range cfg.RALsAliasSConns { - ralsAliasServer = connCfg.Address - break - } - if ralsAliasServer != "" { + if len(cfg.RALsAliasSConns) != 0 { aliasesTaskChan := make(chan struct{}) waitTasks = append(waitTasks, aliasesTaskChan) go func() { defer close(aliasesTaskChan) - var aliasesServer rpcclient.RpcClientConnection - if ralsAliasServer == utils.INTERNAL { - select { - case aliasesServer = <-internalAliaseSChan: - internalAliaseSChan <- aliasesServer - case <-time.After(cfg.InternalTtl): - utils.Logger.Crit(": Internal aliases connection timeout.") - exitChan <- true - return - } - } else if aliasesServer, err = rpcclient.NewRpcClient("tcp", ralsAliasServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to aliases, error: %s", err.Error())) + if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS, error: %s", err.Error())) exitChan <- true return + } else { + engine.SetAliasService(aliaseSCons) } - engine.SetAliasService(aliasesServer) }() } - // Connection to UserService - var ralsUserServer string - for _, connCfg := range cfg.RALsUserSConns { - ralsUserServer = connCfg.Address - break - } - var userServer rpcclient.RpcClientConnection - if ralsUserServer != "" { + var usersConns rpcclient.RpcClientConnection + if len(cfg.RALsUserSConns) != 0 { usersTaskChan := make(chan struct{}) waitTasks = append(waitTasks, usersTaskChan) go func() { defer close(usersTaskChan) - if ralsUserServer == utils.INTERNAL { - select { - case userServer = <-internalUserSChan: - internalUserSChan <- userServer - case <-time.After(cfg.InternalTtl): - utils.Logger.Crit(": Internal users connection timeout.") - exitChan <- true - return - } - } else if userServer, err = rpcclient.NewRpcClient("tcp", ralsUserServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect users, error: %s", err.Error())) + if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect UserS, error: %s", err.Error())) exitChan <- true return } - engine.SetUserService(userServer) + engine.SetUserService(usersConns) }() } @@ -253,7 +198,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC responder := &engine.Responder{Bal: bal, ExitChan: exitChan, Stats: cdrStats} responder.SetTimeToLive(cfg.ResponseCacheTTL, nil) apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Sched: sched, - Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer} + Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: usersConns} apierRpcV2 := &v2.ApierV2{ ApierV1: *apierRpcV1} // internalSchedulerChan shared here