diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 94434b5bb..e6e62711f 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -817,7 +817,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach } cs.CdrStats = len(queueIds) } - if self.Config.RaterUserServer == utils.INTERNAL { + if self.Users != nil { var ups engine.UserProfiles if err := self.Users.Call("UsersV1.GetUsers", &engine.UserProfile{}, &ups); err != nil { return utils.NewErrServerError(err) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e5fd78bcd..00b983106 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -104,25 +104,16 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn // Fires up a cdrc instance func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}, exitChan chan bool) { - var cdrsConn rpcclient.RpcClientConnection var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break } - if cdrcCfg.Cdrs == utils.INTERNAL { - cdrsChan := <-internalCdrSChan // This will signal that the cdrs part is populated in internalRaterChan - internalCdrSChan <- cdrsChan // Put it back for other components - resp := <-internalRaterChan - cdrsConn = resp - internalRaterChan <- resp - } else { - conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) - exitChan <- true - return - } - cdrsConn = conn + cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cdrcCfg.CdrsConns, internalCdrSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %s", err.Error())) + exitChan <- true + return } cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone) if err != nil { @@ -139,18 +130,18 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS SM-Generic service.") ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.SmGenericConfig.RaterConns, internalRaterChan) + cfg.SmGenericConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) exitChan <- true return } var cdrsConn *rpcclient.RpcClientPool - if reflect.DeepEqual(cfg.SmGenericConfig.RaterConns, cfg.SmGenericConfig.CdrsConns) { + if reflect.DeepEqual(cfg.SmGenericConfig.RALsConns, cfg.SmGenericConfig.CDRsConns) { cdrsConn = ralConn } else { cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.SmGenericConfig.CdrsConns, internalCDRSChan) + cfg.SmGenericConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) exitChan <- true @@ -179,18 +170,23 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { utils.Logger.Info("Starting CGRateS DiameterAgent service.") smgConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan) + cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMG: %s", err.Error())) exitChan <- true return } - pubsubConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) - exitChan <- true - return + var pubsubConn *rpcclient.RpcClientPool + if reflect.DeepEqual(cfg.DiameterAgentCfg().SMGenericConns, cfg.DiameterAgentCfg().PubSubConns) { + pubsubConn = smgConn + } else { + pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) + exitChan <- true + return + } } da, err := agents.NewDiameterAgent(cfg, smgConn, pubsubConn) if err != nil { @@ -205,68 +201,83 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC } func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { - utils.Logger.Info("Starting CGRateS SM-FreeSWITCH service.") + utils.Logger.Info("Starting CGRateS SMFreeSWITCH service.") ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.SmFsConfig.RaterConns, internalRaterChan) + cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) + utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) exitChan <- true return } - cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.SmFsConfig.CdrsConns, internalRaterChan) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) - exitChan <- true - return + var cdrsConn *rpcclient.RpcClientPool + if reflect.DeepEqual(cfg.SmFsConfig.RALsConns, cfg.SmFsConfig.CDRsConns) { + cdrsConn = ralConn + } else { + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.SmFsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) + exitChan <- true + return + } } sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, ralConn, cdrsConn, cfg.DefaultTimezone) smRpc.SMs = append(smRpc.SMs, sm) if err = sm.Connect(); err != nil { - utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) + utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) } exitChan <- true } func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { - utils.Logger.Info("Starting CGRateS SM-Kamailio service.") + utils.Logger.Info("Starting CGRateS SMKamailio service.") ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.SmKamConfig.RaterConns, internalRaterChan) + cfg.SmKamConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) + utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) exitChan <- true return } - cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.SmKamConfig.CdrsConns, internalRaterChan) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) - exitChan <- true - return + var cdrsConn *rpcclient.RpcClientPool + if reflect.DeepEqual(cfg.SmKamConfig.RALsConns, cfg.SmKamConfig.CDRsConns) { + cdrsConn = ralConn + } else { + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.SmKamConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) + exitChan <- true + return + } } sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, ralConn, cdrsConn, cfg.DefaultTimezone) smRpc.SMs = append(smRpc.SMs, sm) if err = sm.Connect(); err != nil { - utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) + utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) } exitChan <- true } func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { - utils.Logger.Info("Starting CGRateS SM-OpenSIPS service.") + utils.Logger.Info("Starting CGRateS SMOpenSIPS service.") ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.SmOsipsConfig.RaterConns, internalRaterChan) + cfg.SmOsipsConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) + utils.Logger.Crit(fmt.Sprintf(" Could not connect to RALs: %s", err.Error())) exitChan <- true return } - cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.SmOsipsConfig.CdrsConns, internalRaterChan) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) - exitChan <- true - return + var cdrsConn *rpcclient.RpcClientPool + if reflect.DeepEqual(cfg.SmOsipsConfig.RALsConns, cfg.SmOsipsConfig.CDRsConns) { + cdrsConn = ralConn + } else { + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.SmOsipsConfig.CDRsConns, internalRaterChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRs: %s", err.Error())) + exitChan <- true + return + } } sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, cfg.Reconnects, ralConn, cdrsConn, cfg.DefaultTimezone) smRpc.SMs = append(smRpc.SMs, sm) @@ -283,45 +294,64 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine utils.Logger.Info("Starting CGRateS CDRS service.") // Conn pool towards RAL ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSRaterConns, internalRaterChan) + cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) exitChan <- true return } // Pubsub connection init - pubSubConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSPubSubSConns, internalPubSubSChan) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubSystem: %s", err.Error())) - exitChan <- true - return + var pubSubConn *rpcclient.RpcClientPool + if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSPubSubSConns) { + pubSubConn = ralConn + } else { + pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubSystem: %s", err.Error())) + exitChan <- true + return + } } // Users connection init - usersConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSUserSConns, internalUserSChan) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to UserS: %s", err.Error())) - exitChan <- true - return + var usersConn *rpcclient.RpcClientPool + if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSUserSConns) { + pubSubConn = ralConn + } else { + usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to UserS: %s", err.Error())) + exitChan <- true + return + } } // Aliases connection init - aliasesConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSAliaseSConns, internalAliaseSChan) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS: %s", err.Error())) - exitChan <- true - return + var aliasesConn *rpcclient.RpcClientPool + if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSAliaseSConns) { + pubSubConn = ralConn + } else { + aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS: %s", err.Error())) + exitChan <- true + return + } } // Stats connection init - statsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, - cfg.CDRSStatSConns, internalCdrStatSChan) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) - exitChan <- true - return + var statsConn *rpcclient.RpcClientPool + if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSStatSConns) { + pubSubConn = ralConn + } else { + statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) + exitChan <- true + return + } } - cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, ralConn, pubSubConn, usersConn, aliasesConn, statsConn) cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil) utils.Logger.Info("Registering CDRS HTTP Handlers.") @@ -459,7 +489,7 @@ func main() { } config.SetCgrConfig(cfg) // Share the config object if *raterEnabled { - cfg.RaterEnabled = *raterEnabled + cfg.RALsEnabled = *raterEnabled } if *schedEnabled { cfg.SchedulerEnabled = *schedEnabled @@ -472,7 +502,7 @@ func main() { var logDb engine.LogStorage var loadDb engine.LoadStorage var cdrDb engine.CdrStorage - if cfg.RaterEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary + if cfg.RALsEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary ratingDb, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort, cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding) if err != nil { // Cannot configure getter database, show stopper @@ -482,7 +512,7 @@ func main() { defer ratingDb.Close() engine.SetRatingStorage(ratingDb) } - if cfg.RaterEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled { + if cfg.RALsEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled { accountDb, err = engine.ConfigureAccountingStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort, cfg.DataDbName, cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding) if err != nil { // Cannot configure getter database, show stopper @@ -492,7 +522,7 @@ func main() { defer accountDb.Close() engine.SetAccountingStorage(accountDb) } - if cfg.RaterEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary + if cfg.RALsEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns, cfg.StorDBCDRSIndexes) if err != nil { // Cannot configure logger database, show stopper @@ -536,7 +566,7 @@ func main() { } // Start rater service - if cfg.RaterEnabled { + if cfg.RALsEnabled { go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, server, ratingDb, accountDb, loadDb, cdrDb, logDb, &stopHandled, exitChan) } diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index e56892ee4..b1f21d4b4 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -87,12 +87,12 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC // Connection to balancer var bal *balancer2go.Balancer - if cfg.RaterBalancer != "" { + if cfg.RALsBalancer != "" { balTaskChan := make(chan struct{}) waitTasks = append(waitTasks, balTaskChan) go func() { defer close(balTaskChan) - if cfg.RaterBalancer == utils.INTERNAL { + if cfg.RALsBalancer == utils.INTERNAL { select { case bal = <-internalBalancerChan: internalBalancerChan <- bal // Put it back if someone else is interested about @@ -108,39 +108,37 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } }() } - - // Connection to CDRStats - var cdrStats rpcclient.RpcClientConnection - if cfg.RaterCdrStats != "" { + // Connections to CDRStats + var cdrStats *rpcclient.RpcClientPool + if len(cfg.RALsCDRStatSConns) != 0 { cdrstatTaskChan := make(chan struct{}) waitTasks = append(waitTasks, cdrstatTaskChan) go func() { defer close(cdrstatTaskChan) - if cfg.RaterCdrStats == utils.INTERNAL { - select { - case cdrStats = <-internalCdrStatSChan: - internalCdrStatSChan <- cdrStats - case <-time.After(cfg.InternalTtl): - utils.Logger.Crit(": Internal cdrstats connection timeout.") - exitChan <- true - return - } - } else if cdrStats, err = rpcclient.NewRpcClient("tcp", cfg.RaterCdrStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to cdrstats, error: %s", err.Error())) + cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cfg.CDRSRaterConns, internalCdrStatSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRStatS, error: %s", err.Error())) exitChan <- true return } }() } - // Connection to HistoryS - if cfg.RaterHistoryServer != "" { + // Connection to HistoryS, + // FixMe via multiple connections + var ralsHistoryServer string + for _, connCfg := range cfg.RALsHistorySConns { + ralsHistoryServer = connCfg.Address + break + } + if ralsHistoryServer != "" { histTaskChan := make(chan struct{}) waitTasks = append(waitTasks, histTaskChan) go func() { defer close(histTaskChan) var scribeServer rpcclient.RpcClientConnection - if cfg.RaterHistoryServer == utils.INTERNAL { + if ralsHistoryServer == utils.INTERNAL { select { case scribeServer = <-internalHistorySChan: internalHistorySChan <- scribeServer @@ -149,7 +147,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC exitChan <- true return } - } else if scribeServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterHistoryServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { + } else if scribeServer, err = rpcclient.NewRpcClient("tcp", ralsHistoryServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect historys, error: %s", err.Error())) exitChan <- true return @@ -157,15 +155,19 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC engine.SetHistoryScribe(scribeServer) }() } - // Connection to pubsubs - if cfg.RaterPubSubServer != "" { + var ralsPubSubServer string + for _, connCfg := range cfg.RALsPubSubSConns { + ralsPubSubServer = connCfg.Address + break + } + if ralsPubSubServer != "" { pubsubTaskChan := make(chan struct{}) waitTasks = append(waitTasks, pubsubTaskChan) go func() { defer close(pubsubTaskChan) var pubSubServer rpcclient.RpcClientConnection - if cfg.RaterPubSubServer == utils.INTERNAL { + if ralsPubSubServer == utils.INTERNAL { select { case pubSubServer = <-internalPubSubSChan: internalPubSubSChan <- pubSubServer @@ -174,7 +176,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC exitChan <- true return } - } else if pubSubServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { + } else if pubSubServer, err = rpcclient.NewRpcClient("tcp", ralsPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to pubsubs: %s", err.Error())) exitChan <- true return @@ -184,13 +186,18 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } // Connection to AliasService - if cfg.RaterAliasesServer != "" { + var ralsAliasServer string + for _, connCfg := range cfg.RALsAliasSConns { + ralsAliasServer = connCfg.Address + break + } + if ralsAliasServer != "" { aliasesTaskChan := make(chan struct{}) waitTasks = append(waitTasks, aliasesTaskChan) go func() { defer close(aliasesTaskChan) var aliasesServer rpcclient.RpcClientConnection - if cfg.RaterAliasesServer == utils.INTERNAL { + if ralsAliasServer == utils.INTERNAL { select { case aliasesServer = <-internalAliaseSChan: internalAliaseSChan <- aliasesServer @@ -199,7 +206,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC exitChan <- true return } - } else if aliasesServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterAliasesServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { + } else if aliasesServer, err = rpcclient.NewRpcClient("tcp", ralsAliasServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to aliases, error: %s", err.Error())) exitChan <- true return @@ -209,13 +216,18 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } // Connection to UserService + var ralsUserServer string + for _, connCfg := range cfg.RALsUserSConns { + ralsUserServer = connCfg.Address + break + } var userServer rpcclient.RpcClientConnection - if cfg.RaterUserServer != "" { + if ralsUserServer != "" { usersTaskChan := make(chan struct{}) waitTasks = append(waitTasks, usersTaskChan) go func() { defer close(usersTaskChan) - if cfg.RaterUserServer == utils.INTERNAL { + if ralsUserServer == utils.INTERNAL { select { case userServer = <-internalUserSChan: internalUserSChan <- userServer @@ -224,7 +236,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC exitChan <- true return } - } else if userServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterUserServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { + } else if userServer, err = rpcclient.NewRpcClient("tcp", ralsUserServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect users, error: %s", err.Error())) exitChan <- true return diff --git a/cmd/cgr-engine/registration.go b/cmd/cgr-engine/registration.go index a25514911..de7a724f1 100644 --- a/cmd/cgr-engine/registration.go +++ b/cmd/cgr-engine/registration.go @@ -83,14 +83,14 @@ func stopRaterSignalHandler(internalCdrStatSChan chan rpcclient.RpcClientConnect Connects to the balancer and calls unregister RPC method. */ func unregisterFromBalancer(exitChan chan bool) { - client, err := rpc.Dial("tcp", cfg.RaterBalancer) + client, err := rpc.Dial("tcp", cfg.RALsBalancer) if err != nil { utils.Logger.Crit("Cannot contact the balancer!") exitChan <- true return } var reply int - utils.Logger.Info(fmt.Sprintf("Unregistering from balancer %s", cfg.RaterBalancer)) + utils.Logger.Info(fmt.Sprintf("Unregistering from balancer %s", cfg.RALsBalancer)) client.Call("Responder.UnRegisterRater", cfg.RPCGOBListen, &reply) if err := client.Close(); err != nil { utils.Logger.Crit("Could not close balancer unregistration!") @@ -102,14 +102,14 @@ func unregisterFromBalancer(exitChan chan bool) { Connects to the balancer and rehisters the engine to the server. */ func registerToBalancer(exitChan chan bool) { - client, err := rpc.Dial("tcp", cfg.RaterBalancer) + client, err := rpc.Dial("tcp", cfg.RALsBalancer) if err != nil { utils.Logger.Crit(fmt.Sprintf("Cannot contact the balancer: %v", err)) exitChan <- true return } var reply int - utils.Logger.Info(fmt.Sprintf("Registering to balancer %s", cfg.RaterBalancer)) + utils.Logger.Info(fmt.Sprintf("Registering to balancer %s", cfg.RALsBalancer)) client.Call("Responder.RegisterRater", cfg.RPCGOBListen, &reply) if err := client.Close(); err != nil { utils.Logger.Crit("Could not close balancer registration!") diff --git a/engine/cdr_test.go b/engine/cdr_test.go index 013e0c8dc..af2529958 100644 --- a/engine/cdr_test.go +++ b/engine/cdr_test.go @@ -566,7 +566,7 @@ func TestUsageReqAsCD(t *testing.T) { Account: "1001", Subject: "1001", Destination: "1002", SetupTime: "2013-11-07T08:42:20Z", AnswerTime: "2013-11-07T08:42:26Z", Usage: "0.00000001", } - eCD := &CallDescriptor{CgrId: "9473e7b2e075d168b9da10ae957ee68fe5a217e4", TOR: req.ToR, Direction: req.Direction, Tenant: req.Tenant, Category: req.Category, Account: req.Account, Subject: req.Subject, Destination: req.Destination, TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), TimeEnd: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).Add(time.Duration(10))} + eCD := &CallDescriptor{CgrID: "9473e7b2e075d168b9da10ae957ee68fe5a217e4", TOR: req.ToR, Direction: req.Direction, Tenant: req.Tenant, Category: req.Category, Account: req.Account, Subject: req.Subject, Destination: req.Destination, TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), TimeEnd: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).Add(time.Duration(10))} if cd, err := req.AsCallDescriptor(""); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCD, cd) { diff --git a/engine/cdrs.go b/engine/cdrs.go index 4431b9137..c31badcd3 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -486,7 +486,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { self.cgrCfg.HttpFailedDir, rplCfg.FallbackFileName()) _, err := utils.HttpPoster( - rplCfg.Server, self.cgrCfg.HttpSkipTlsVerify, body, + rplCfg.Address, self.cgrCfg.HttpSkipTlsVerify, body, content, rplCfg.Attempts, fallbackPath) if err != nil { utils.Logger.Err(fmt.Sprintf( diff --git a/engine/libengine.go b/engine/libengine.go index e694f599c..d541de76d 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -19,23 +19,31 @@ along with this program. If not, see package engine import ( + "errors" + "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, codec string, - rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection) (*rpcclient.RpcClientPool, error) { + rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) { var rpcClient *rpcclient.RpcClient var err error rpcPool := rpcclient.NewRpcClientPool(dispatchStrategy) for _, rpcConnCfg := range rpcConnCfgs { - if rpcConnCfg.Server == utils.INTERNAL { - internalConn := <-internalConnChan - internalConnChan <- internalConn + if rpcConnCfg.Address == utils.MetaInternal { + var internalConn rpcclient.RpcClientConnection + select { + case internalConn := <-internalConnChan: + internalConnChan <- internalConn + case <-time.After(ttl): + return nil, errors.New("TTL triggered") + } rpcClient, err = rpcclient.NewRpcClient("", "", 0, 0, rpcclient.INTERNAL_RPC, internalConn) } else { - rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Server, connAttempts, reconnects, codec, nil) + rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, codec, nil) } if err != nil { break diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 7b64860be..f18ee52ec 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -231,9 +231,9 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) { func (sm *FSSessionManager) Connect() error { eventFilters := map[string]string{"Call-Direction": "inbound"} errChan := make(chan error) - for _, connCfg := range sm.cfg.Connections { + for _, connCfg := range sm.cfg.EventSocketConns { connId := utils.GenUUID() - fSock, err := fsock.NewFSock(connCfg.Server, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, utils.Logger.(*syslog.Writer), connId) + fSock, err := fsock.NewFSock(connCfg.Address, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, utils.Logger.(*syslog.Writer), connId) if err != nil { return err } else if !fSock.Connected() { @@ -246,7 +246,7 @@ func (sm *FSSessionManager) Connect() error { errChan <- err } }() - if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1, sm.cfg.MaxWaitConnection, + if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Address, connCfg.Password, 1, sm.cfg.MaxWaitConnection, make(map[string][]func(string, string)), make(map[string]string), utils.Logger.(*syslog.Writer), connId); err != nil { return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error()) } else if fsSenderPool == nil { diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index b56d0c96d..af7c5fb53 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -170,9 +170,9 @@ func (self *KamailioSessionManager) Connect() error { regexp.MustCompile("CGR_CALL_END"): []func([]byte, string){self.onCallEnd}, } errChan := make(chan error) - for _, connCfg := range self.cfg.Connections { + for _, connCfg := range self.cfg.EvapiConns { connId := utils.GenUUID() - if self.conns[connId], err = kamevapi.NewKamEvapi(connCfg.EvapiAddr, connId, connCfg.Reconnects, eventHandlers, utils.Logger.(*syslog.Writer)); err != nil { + if self.conns[connId], err = kamevapi.NewKamEvapi(connCfg.Address, connId, connCfg.Reconnects, eventHandlers, utils.Logger.(*syslog.Writer)); err != nil { return err } go func() { // Start reading in own goroutine, return on error