mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 12:49:54 +05:00
Fix build on hapool
This commit is contained in:
@@ -817,7 +817,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach
|
||||
}
|
||||
cs.CdrStats = len(queueIds)
|
||||
}
|
||||
if self.Config.RaterUserServer == utils.INTERNAL {
|
||||
if self.Users != nil {
|
||||
var ups engine.UserProfiles
|
||||
if err := self.Users.Call("UsersV1.GetUsers", &engine.UserProfile{}, &ups); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
|
||||
@@ -104,25 +104,16 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn
|
||||
// Fires up a cdrc instance
|
||||
func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool,
|
||||
closeChan chan struct{}, exitChan chan bool) {
|
||||
var cdrsConn rpcclient.RpcClientConnection
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
if cdrcCfg.Cdrs == utils.INTERNAL {
|
||||
cdrsChan := <-internalCdrSChan // This will signal that the cdrs part is populated in internalRaterChan
|
||||
internalCdrSChan <- cdrsChan // Put it back for other components
|
||||
resp := <-internalRaterChan
|
||||
cdrsConn = resp
|
||||
internalRaterChan <- resp
|
||||
} else {
|
||||
conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %v", err))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
cdrsConn = conn
|
||||
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cdrcCfg.CdrsConns, internalCdrSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone)
|
||||
if err != nil {
|
||||
@@ -139,18 +130,18 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
|
||||
func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
|
||||
utils.Logger.Info("Starting CGRateS SM-Generic service.")
|
||||
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmGenericConfig.RaterConns, internalRaterChan)
|
||||
cfg.SmGenericConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
var cdrsConn *rpcclient.RpcClientPool
|
||||
if reflect.DeepEqual(cfg.SmGenericConfig.RaterConns, cfg.SmGenericConfig.CdrsConns) {
|
||||
if reflect.DeepEqual(cfg.SmGenericConfig.RALsConns, cfg.SmGenericConfig.CDRsConns) {
|
||||
cdrsConn = ralConn
|
||||
} else {
|
||||
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmGenericConfig.CdrsConns, internalCDRSChan)
|
||||
cfg.SmGenericConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
@@ -179,18 +170,23 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
|
||||
func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
|
||||
utils.Logger.Info("Starting CGRateS DiameterAgent service.")
|
||||
smgConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan)
|
||||
cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to SMG: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
pubsubConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to PubSubS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
var pubsubConn *rpcclient.RpcClientPool
|
||||
if reflect.DeepEqual(cfg.DiameterAgentCfg().SMGenericConns, cfg.DiameterAgentCfg().PubSubConns) {
|
||||
pubsubConn = smgConn
|
||||
} else {
|
||||
pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to PubSubS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
da, err := agents.NewDiameterAgent(cfg, smgConn, pubsubConn)
|
||||
if err != nil {
|
||||
@@ -205,68 +201,83 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC
|
||||
}
|
||||
|
||||
func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
|
||||
utils.Logger.Info("Starting CGRateS SM-FreeSWITCH service.")
|
||||
utils.Logger.Info("Starting CGRateS SMFreeSWITCH service.")
|
||||
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmFsConfig.RaterConns, internalRaterChan)
|
||||
cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmFsConfig.CdrsConns, internalRaterChan)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
var cdrsConn *rpcclient.RpcClientPool
|
||||
if reflect.DeepEqual(cfg.SmFsConfig.RALsConns, cfg.SmFsConfig.CDRsConns) {
|
||||
cdrsConn = ralConn
|
||||
} else {
|
||||
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmFsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, ralConn, cdrsConn, cfg.DefaultTimezone)
|
||||
smRpc.SMs = append(smRpc.SMs, sm)
|
||||
if err = sm.Connect(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> error: %s!", err))
|
||||
utils.Logger.Err(fmt.Sprintf("<SMFreeSWITCH> error: %s!", err))
|
||||
}
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
|
||||
utils.Logger.Info("Starting CGRateS SM-Kamailio service.")
|
||||
utils.Logger.Info("Starting CGRateS SMKamailio service.")
|
||||
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmKamConfig.RaterConns, internalRaterChan)
|
||||
cfg.SmKamConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmKamConfig.CdrsConns, internalRaterChan)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
var cdrsConn *rpcclient.RpcClientPool
|
||||
if reflect.DeepEqual(cfg.SmKamConfig.RALsConns, cfg.SmKamConfig.CDRsConns) {
|
||||
cdrsConn = ralConn
|
||||
} else {
|
||||
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmKamConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, ralConn, cdrsConn, cfg.DefaultTimezone)
|
||||
smRpc.SMs = append(smRpc.SMs, sm)
|
||||
if err = sm.Connect(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> error: %s!", err))
|
||||
utils.Logger.Err(fmt.Sprintf("<SMKamailio> error: %s!", err))
|
||||
}
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
|
||||
utils.Logger.Info("Starting CGRateS SM-OpenSIPS service.")
|
||||
utils.Logger.Info("Starting CGRateS SMOpenSIPS service.")
|
||||
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmOsipsConfig.RaterConns, internalRaterChan)
|
||||
cfg.SmOsipsConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMOpenSIPS> Could not connect to RALs: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmOsipsConfig.CdrsConns, internalRaterChan)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
var cdrsConn *rpcclient.RpcClientPool
|
||||
if reflect.DeepEqual(cfg.SmOsipsConfig.RALsConns, cfg.SmOsipsConfig.CDRsConns) {
|
||||
cdrsConn = ralConn
|
||||
} else {
|
||||
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.SmOsipsConfig.CDRsConns, internalRaterChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMOpenSIPS> Could not connect to CDRs: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, cfg.Reconnects, ralConn, cdrsConn, cfg.DefaultTimezone)
|
||||
smRpc.SMs = append(smRpc.SMs, sm)
|
||||
@@ -283,45 +294,64 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
|
||||
utils.Logger.Info("Starting CGRateS CDRS service.")
|
||||
// Conn pool towards RAL
|
||||
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSRaterConns, internalRaterChan)
|
||||
cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to RAL: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
// Pubsub connection init
|
||||
pubSubConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSPubSubSConns, internalPubSubSChan)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to PubSubSystem: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
var pubSubConn *rpcclient.RpcClientPool
|
||||
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSPubSubSConns) {
|
||||
pubSubConn = ralConn
|
||||
} else {
|
||||
pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to PubSubSystem: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
// Users connection init
|
||||
usersConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSUserSConns, internalUserSChan)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to UserS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
var usersConn *rpcclient.RpcClientPool
|
||||
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSUserSConns) {
|
||||
pubSubConn = ralConn
|
||||
} else {
|
||||
usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to UserS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
// Aliases connection init
|
||||
aliasesConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSAliaseSConns, internalAliaseSChan)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to AliaseS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
var aliasesConn *rpcclient.RpcClientPool
|
||||
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSAliaseSConns) {
|
||||
pubSubConn = ralConn
|
||||
} else {
|
||||
aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to AliaseS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
// Stats connection init
|
||||
statsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSStatSConns, internalCdrStatSChan)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
var statsConn *rpcclient.RpcClientPool
|
||||
if reflect.DeepEqual(cfg.CDRSRaterConns, cfg.CDRSStatSConns) {
|
||||
pubSubConn = ralConn
|
||||
} else {
|
||||
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, ralConn, pubSubConn, usersConn, aliasesConn, statsConn)
|
||||
cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil)
|
||||
utils.Logger.Info("Registering CDRS HTTP Handlers.")
|
||||
@@ -459,7 +489,7 @@ func main() {
|
||||
}
|
||||
config.SetCgrConfig(cfg) // Share the config object
|
||||
if *raterEnabled {
|
||||
cfg.RaterEnabled = *raterEnabled
|
||||
cfg.RALsEnabled = *raterEnabled
|
||||
}
|
||||
if *schedEnabled {
|
||||
cfg.SchedulerEnabled = *schedEnabled
|
||||
@@ -472,7 +502,7 @@ func main() {
|
||||
var logDb engine.LogStorage
|
||||
var loadDb engine.LoadStorage
|
||||
var cdrDb engine.CdrStorage
|
||||
if cfg.RaterEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary
|
||||
if cfg.RALsEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary
|
||||
ratingDb, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort,
|
||||
cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
@@ -482,7 +512,7 @@ func main() {
|
||||
defer ratingDb.Close()
|
||||
engine.SetRatingStorage(ratingDb)
|
||||
}
|
||||
if cfg.RaterEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled {
|
||||
if cfg.RALsEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled {
|
||||
accountDb, err = engine.ConfigureAccountingStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort,
|
||||
cfg.DataDbName, cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
@@ -492,7 +522,7 @@ func main() {
|
||||
defer accountDb.Close()
|
||||
engine.SetAccountingStorage(accountDb)
|
||||
}
|
||||
if cfg.RaterEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary
|
||||
if cfg.RALsEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary
|
||||
logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
|
||||
cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns, cfg.StorDBCDRSIndexes)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
@@ -536,7 +566,7 @@ func main() {
|
||||
}
|
||||
|
||||
// Start rater service
|
||||
if cfg.RaterEnabled {
|
||||
if cfg.RALsEnabled {
|
||||
go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
|
||||
server, ratingDb, accountDb, loadDb, cdrDb, logDb, &stopHandled, exitChan)
|
||||
}
|
||||
|
||||
@@ -87,12 +87,12 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
|
||||
// Connection to balancer
|
||||
var bal *balancer2go.Balancer
|
||||
if cfg.RaterBalancer != "" {
|
||||
if cfg.RALsBalancer != "" {
|
||||
balTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, balTaskChan)
|
||||
go func() {
|
||||
defer close(balTaskChan)
|
||||
if cfg.RaterBalancer == utils.INTERNAL {
|
||||
if cfg.RALsBalancer == utils.INTERNAL {
|
||||
select {
|
||||
case bal = <-internalBalancerChan:
|
||||
internalBalancerChan <- bal // Put it back if someone else is interested about
|
||||
@@ -108,39 +108,37 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Connection to CDRStats
|
||||
var cdrStats rpcclient.RpcClientConnection
|
||||
if cfg.RaterCdrStats != "" {
|
||||
// Connections to CDRStats
|
||||
var cdrStats *rpcclient.RpcClientPool
|
||||
if len(cfg.RALsCDRStatSConns) != 0 {
|
||||
cdrstatTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, cdrstatTaskChan)
|
||||
go func() {
|
||||
defer close(cdrstatTaskChan)
|
||||
if cfg.RaterCdrStats == utils.INTERNAL {
|
||||
select {
|
||||
case cdrStats = <-internalCdrStatSChan:
|
||||
internalCdrStatSChan <- cdrStats
|
||||
case <-time.After(cfg.InternalTtl):
|
||||
utils.Logger.Crit("<Rater>: Internal cdrstats connection timeout.")
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
} else if cdrStats, err = rpcclient.NewRpcClient("tcp", cfg.RaterCdrStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to cdrstats, error: %s", err.Error()))
|
||||
cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
|
||||
cfg.CDRSRaterConns, internalCdrStatSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to CDRStatS, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Connection to HistoryS
|
||||
if cfg.RaterHistoryServer != "" {
|
||||
// Connection to HistoryS,
|
||||
// FixMe via multiple connections
|
||||
var ralsHistoryServer string
|
||||
for _, connCfg := range cfg.RALsHistorySConns {
|
||||
ralsHistoryServer = connCfg.Address
|
||||
break
|
||||
}
|
||||
if ralsHistoryServer != "" {
|
||||
histTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, histTaskChan)
|
||||
go func() {
|
||||
defer close(histTaskChan)
|
||||
var scribeServer rpcclient.RpcClientConnection
|
||||
if cfg.RaterHistoryServer == utils.INTERNAL {
|
||||
if ralsHistoryServer == utils.INTERNAL {
|
||||
select {
|
||||
case scribeServer = <-internalHistorySChan:
|
||||
internalHistorySChan <- scribeServer
|
||||
@@ -149,7 +147,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
} else if scribeServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterHistoryServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
} else if scribeServer, err = rpcclient.NewRpcClient("tcp", ralsHistoryServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect historys, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
@@ -157,15 +155,19 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
engine.SetHistoryScribe(scribeServer)
|
||||
}()
|
||||
}
|
||||
|
||||
// Connection to pubsubs
|
||||
if cfg.RaterPubSubServer != "" {
|
||||
var ralsPubSubServer string
|
||||
for _, connCfg := range cfg.RALsPubSubSConns {
|
||||
ralsPubSubServer = connCfg.Address
|
||||
break
|
||||
}
|
||||
if ralsPubSubServer != "" {
|
||||
pubsubTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, pubsubTaskChan)
|
||||
go func() {
|
||||
defer close(pubsubTaskChan)
|
||||
var pubSubServer rpcclient.RpcClientConnection
|
||||
if cfg.RaterPubSubServer == utils.INTERNAL {
|
||||
if ralsPubSubServer == utils.INTERNAL {
|
||||
select {
|
||||
case pubSubServer = <-internalPubSubSChan:
|
||||
internalPubSubSChan <- pubSubServer
|
||||
@@ -174,7 +176,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
} else if pubSubServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
} else if pubSubServer, err = rpcclient.NewRpcClient("tcp", ralsPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to pubsubs: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
@@ -184,13 +186,18 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
}
|
||||
|
||||
// Connection to AliasService
|
||||
if cfg.RaterAliasesServer != "" {
|
||||
var ralsAliasServer string
|
||||
for _, connCfg := range cfg.RALsAliasSConns {
|
||||
ralsAliasServer = connCfg.Address
|
||||
break
|
||||
}
|
||||
if ralsAliasServer != "" {
|
||||
aliasesTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, aliasesTaskChan)
|
||||
go func() {
|
||||
defer close(aliasesTaskChan)
|
||||
var aliasesServer rpcclient.RpcClientConnection
|
||||
if cfg.RaterAliasesServer == utils.INTERNAL {
|
||||
if ralsAliasServer == utils.INTERNAL {
|
||||
select {
|
||||
case aliasesServer = <-internalAliaseSChan:
|
||||
internalAliaseSChan <- aliasesServer
|
||||
@@ -199,7 +206,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
} else if aliasesServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterAliasesServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
} else if aliasesServer, err = rpcclient.NewRpcClient("tcp", ralsAliasServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to aliases, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
@@ -209,13 +216,18 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
}
|
||||
|
||||
// Connection to UserService
|
||||
var ralsUserServer string
|
||||
for _, connCfg := range cfg.RALsUserSConns {
|
||||
ralsUserServer = connCfg.Address
|
||||
break
|
||||
}
|
||||
var userServer rpcclient.RpcClientConnection
|
||||
if cfg.RaterUserServer != "" {
|
||||
if ralsUserServer != "" {
|
||||
usersTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, usersTaskChan)
|
||||
go func() {
|
||||
defer close(usersTaskChan)
|
||||
if cfg.RaterUserServer == utils.INTERNAL {
|
||||
if ralsUserServer == utils.INTERNAL {
|
||||
select {
|
||||
case userServer = <-internalUserSChan:
|
||||
internalUserSChan <- userServer
|
||||
@@ -224,7 +236,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
} else if userServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterUserServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
} else if userServer, err = rpcclient.NewRpcClient("tcp", ralsUserServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect users, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
|
||||
@@ -83,14 +83,14 @@ func stopRaterSignalHandler(internalCdrStatSChan chan rpcclient.RpcClientConnect
|
||||
Connects to the balancer and calls unregister RPC method.
|
||||
*/
|
||||
func unregisterFromBalancer(exitChan chan bool) {
|
||||
client, err := rpc.Dial("tcp", cfg.RaterBalancer)
|
||||
client, err := rpc.Dial("tcp", cfg.RALsBalancer)
|
||||
if err != nil {
|
||||
utils.Logger.Crit("Cannot contact the balancer!")
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
var reply int
|
||||
utils.Logger.Info(fmt.Sprintf("Unregistering from balancer %s", cfg.RaterBalancer))
|
||||
utils.Logger.Info(fmt.Sprintf("Unregistering from balancer %s", cfg.RALsBalancer))
|
||||
client.Call("Responder.UnRegisterRater", cfg.RPCGOBListen, &reply)
|
||||
if err := client.Close(); err != nil {
|
||||
utils.Logger.Crit("Could not close balancer unregistration!")
|
||||
@@ -102,14 +102,14 @@ func unregisterFromBalancer(exitChan chan bool) {
|
||||
Connects to the balancer and rehisters the engine to the server.
|
||||
*/
|
||||
func registerToBalancer(exitChan chan bool) {
|
||||
client, err := rpc.Dial("tcp", cfg.RaterBalancer)
|
||||
client, err := rpc.Dial("tcp", cfg.RALsBalancer)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Cannot contact the balancer: %v", err))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
var reply int
|
||||
utils.Logger.Info(fmt.Sprintf("Registering to balancer %s", cfg.RaterBalancer))
|
||||
utils.Logger.Info(fmt.Sprintf("Registering to balancer %s", cfg.RALsBalancer))
|
||||
client.Call("Responder.RegisterRater", cfg.RPCGOBListen, &reply)
|
||||
if err := client.Close(); err != nil {
|
||||
utils.Logger.Crit("Could not close balancer registration!")
|
||||
|
||||
@@ -566,7 +566,7 @@ func TestUsageReqAsCD(t *testing.T) {
|
||||
Account: "1001", Subject: "1001", Destination: "1002",
|
||||
SetupTime: "2013-11-07T08:42:20Z", AnswerTime: "2013-11-07T08:42:26Z", Usage: "0.00000001",
|
||||
}
|
||||
eCD := &CallDescriptor{CgrId: "9473e7b2e075d168b9da10ae957ee68fe5a217e4", TOR: req.ToR, Direction: req.Direction, Tenant: req.Tenant, Category: req.Category, Account: req.Account, Subject: req.Subject, Destination: req.Destination, TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), TimeEnd: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).Add(time.Duration(10))}
|
||||
eCD := &CallDescriptor{CgrID: "9473e7b2e075d168b9da10ae957ee68fe5a217e4", TOR: req.ToR, Direction: req.Direction, Tenant: req.Tenant, Category: req.Category, Account: req.Account, Subject: req.Subject, Destination: req.Destination, TimeStart: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), TimeEnd: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).Add(time.Duration(10))}
|
||||
if cd, err := req.AsCallDescriptor(""); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eCD, cd) {
|
||||
|
||||
@@ -486,7 +486,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
|
||||
self.cgrCfg.HttpFailedDir,
|
||||
rplCfg.FallbackFileName())
|
||||
_, err := utils.HttpPoster(
|
||||
rplCfg.Server, self.cgrCfg.HttpSkipTlsVerify, body,
|
||||
rplCfg.Address, self.cgrCfg.HttpSkipTlsVerify, body,
|
||||
content, rplCfg.Attempts, fallbackPath)
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf(
|
||||
|
||||
@@ -19,23 +19,31 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, codec string,
|
||||
rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection) (*rpcclient.RpcClientPool, error) {
|
||||
rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) {
|
||||
var rpcClient *rpcclient.RpcClient
|
||||
var err error
|
||||
rpcPool := rpcclient.NewRpcClientPool(dispatchStrategy)
|
||||
for _, rpcConnCfg := range rpcConnCfgs {
|
||||
if rpcConnCfg.Server == utils.INTERNAL {
|
||||
internalConn := <-internalConnChan
|
||||
internalConnChan <- internalConn
|
||||
if rpcConnCfg.Address == utils.MetaInternal {
|
||||
var internalConn rpcclient.RpcClientConnection
|
||||
select {
|
||||
case internalConn := <-internalConnChan:
|
||||
internalConnChan <- internalConn
|
||||
case <-time.After(ttl):
|
||||
return nil, errors.New("TTL triggered")
|
||||
}
|
||||
rpcClient, err = rpcclient.NewRpcClient("", "", 0, 0, rpcclient.INTERNAL_RPC, internalConn)
|
||||
} else {
|
||||
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Server, connAttempts, reconnects, codec, nil)
|
||||
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, codec, nil)
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
|
||||
@@ -231,9 +231,9 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) {
|
||||
func (sm *FSSessionManager) Connect() error {
|
||||
eventFilters := map[string]string{"Call-Direction": "inbound"}
|
||||
errChan := make(chan error)
|
||||
for _, connCfg := range sm.cfg.Connections {
|
||||
for _, connCfg := range sm.cfg.EventSocketConns {
|
||||
connId := utils.GenUUID()
|
||||
fSock, err := fsock.NewFSock(connCfg.Server, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, utils.Logger.(*syslog.Writer), connId)
|
||||
fSock, err := fsock.NewFSock(connCfg.Address, connCfg.Password, connCfg.Reconnects, sm.createHandlers(), eventFilters, utils.Logger.(*syslog.Writer), connId)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if !fSock.Connected() {
|
||||
@@ -246,7 +246,7 @@ func (sm *FSSessionManager) Connect() error {
|
||||
errChan <- err
|
||||
}
|
||||
}()
|
||||
if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1, sm.cfg.MaxWaitConnection,
|
||||
if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Address, connCfg.Password, 1, sm.cfg.MaxWaitConnection,
|
||||
make(map[string][]func(string, string)), make(map[string]string), utils.Logger.(*syslog.Writer), connId); err != nil {
|
||||
return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error())
|
||||
} else if fsSenderPool == nil {
|
||||
|
||||
@@ -170,9 +170,9 @@ func (self *KamailioSessionManager) Connect() error {
|
||||
regexp.MustCompile("CGR_CALL_END"): []func([]byte, string){self.onCallEnd},
|
||||
}
|
||||
errChan := make(chan error)
|
||||
for _, connCfg := range self.cfg.Connections {
|
||||
for _, connCfg := range self.cfg.EvapiConns {
|
||||
connId := utils.GenUUID()
|
||||
if self.conns[connId], err = kamevapi.NewKamEvapi(connCfg.EvapiAddr, connId, connCfg.Reconnects, eventHandlers, utils.Logger.(*syslog.Writer)); err != nil {
|
||||
if self.conns[connId], err = kamevapi.NewKamEvapi(connCfg.Address, connId, connCfg.Reconnects, eventHandlers, utils.Logger.(*syslog.Writer)); err != nil {
|
||||
return err
|
||||
}
|
||||
go func() { // Start reading in own goroutine, return on error
|
||||
|
||||
Reference in New Issue
Block a user