mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 13:19:53 +05:00
Pass connection pool to engine services
This commit is contained in:
@@ -126,122 +126,67 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
}
|
||||
|
||||
// Connection to HistoryS,
|
||||
// FixMe via multiple connections
|
||||
var ralsHistoryServer string
|
||||
for _, connCfg := range cfg.RALsHistorySConns {
|
||||
ralsHistoryServer = connCfg.Address
|
||||
break
|
||||
}
|
||||
if ralsHistoryServer != "" {
|
||||
if len(cfg.RALsHistorySConns) != 0 {
|
||||
histTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, histTaskChan)
|
||||
go func() {
|
||||
defer close(histTaskChan)
|
||||
var scribeServer rpcclient.RpcClientConnection
|
||||
if ralsHistoryServer == utils.INTERNAL {
|
||||
select {
|
||||
case scribeServer = <-internalHistorySChan:
|
||||
internalHistorySChan <- scribeServer
|
||||
case <-time.After(cfg.InternalTtl):
|
||||
utils.Logger.Crit("<Rater>: Internal historys connection timeout.")
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
} else if scribeServer, err = rpcclient.NewRpcClient("tcp", ralsHistoryServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect historys, error: %s", err.Error()))
|
||||
if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.RALsHistorySConns, internalHistorySChan, cfg.InternalTtl); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect HistoryS, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
} else {
|
||||
engine.SetHistoryScribe(historySConns)
|
||||
}
|
||||
engine.SetHistoryScribe(scribeServer)
|
||||
}()
|
||||
}
|
||||
// Connection to pubsubs
|
||||
var ralsPubSubServer string
|
||||
for _, connCfg := range cfg.RALsPubSubSConns {
|
||||
ralsPubSubServer = connCfg.Address
|
||||
break
|
||||
}
|
||||
if ralsPubSubServer != "" {
|
||||
if len(cfg.RALsPubSubSConns) != 0 {
|
||||
pubsubTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, pubsubTaskChan)
|
||||
go func() {
|
||||
defer close(pubsubTaskChan)
|
||||
var pubSubServer rpcclient.RpcClientConnection
|
||||
if ralsPubSubServer == utils.INTERNAL {
|
||||
select {
|
||||
case pubSubServer = <-internalPubSubSChan:
|
||||
internalPubSubSChan <- pubSubServer
|
||||
case <-time.After(cfg.InternalTtl):
|
||||
utils.Logger.Crit("<Rater>: Internal pubsub connection timeout.")
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
} else if pubSubServer, err = rpcclient.NewRpcClient("tcp", ralsPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to pubsubs: %s", err.Error()))
|
||||
if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.RALsPubSubSConns, internalPubSubSChan, cfg.InternalTtl); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to PubSubS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
} else {
|
||||
engine.SetPubSub(pubSubSConns)
|
||||
}
|
||||
engine.SetPubSub(pubSubServer)
|
||||
}()
|
||||
}
|
||||
|
||||
// Connection to AliasService
|
||||
var ralsAliasServer string
|
||||
for _, connCfg := range cfg.RALsAliasSConns {
|
||||
ralsAliasServer = connCfg.Address
|
||||
break
|
||||
}
|
||||
if ralsAliasServer != "" {
|
||||
if len(cfg.RALsAliasSConns) != 0 {
|
||||
aliasesTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, aliasesTaskChan)
|
||||
go func() {
|
||||
defer close(aliasesTaskChan)
|
||||
var aliasesServer rpcclient.RpcClientConnection
|
||||
if ralsAliasServer == utils.INTERNAL {
|
||||
select {
|
||||
case aliasesServer = <-internalAliaseSChan:
|
||||
internalAliaseSChan <- aliasesServer
|
||||
case <-time.After(cfg.InternalTtl):
|
||||
utils.Logger.Crit("<Rater>: Internal aliases connection timeout.")
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
} else if aliasesServer, err = rpcclient.NewRpcClient("tcp", ralsAliasServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to aliases, error: %s", err.Error()))
|
||||
if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to AliaseS, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
} else {
|
||||
engine.SetAliasService(aliaseSCons)
|
||||
}
|
||||
engine.SetAliasService(aliasesServer)
|
||||
}()
|
||||
}
|
||||
|
||||
// Connection to UserService
|
||||
var ralsUserServer string
|
||||
for _, connCfg := range cfg.RALsUserSConns {
|
||||
ralsUserServer = connCfg.Address
|
||||
break
|
||||
}
|
||||
var userServer rpcclient.RpcClientConnection
|
||||
if ralsUserServer != "" {
|
||||
var usersConns rpcclient.RpcClientConnection
|
||||
if len(cfg.RALsUserSConns) != 0 {
|
||||
usersTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, usersTaskChan)
|
||||
go func() {
|
||||
defer close(usersTaskChan)
|
||||
if ralsUserServer == utils.INTERNAL {
|
||||
select {
|
||||
case userServer = <-internalUserSChan:
|
||||
internalUserSChan <- userServer
|
||||
case <-time.After(cfg.InternalTtl):
|
||||
utils.Logger.Crit("<Rater>: Internal users connection timeout.")
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
} else if userServer, err = rpcclient.NewRpcClient("tcp", ralsUserServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect users, error: %s", err.Error()))
|
||||
if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect UserS, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
engine.SetUserService(userServer)
|
||||
engine.SetUserService(usersConns)
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -253,7 +198,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
responder := &engine.Responder{Bal: bal, ExitChan: exitChan, Stats: cdrStats}
|
||||
responder.SetTimeToLive(cfg.ResponseCacheTTL, nil)
|
||||
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Sched: sched,
|
||||
Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: userServer}
|
||||
Config: cfg, Responder: responder, CdrStatsSrv: cdrStats, Users: usersConns}
|
||||
apierRpcV2 := &v2.ApierV2{
|
||||
ApierV1: *apierRpcV1}
|
||||
// internalSchedulerChan shared here
|
||||
|
||||
Reference in New Issue
Block a user