Channels for CDRS sent to SM

This commit is contained in:
DanB
2015-12-21 19:27:40 +01:00
parent d91dc98ebc
commit 2f66d81ff9

View File

@@ -136,7 +136,7 @@ func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *
}
}
func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internalRaterChan chan *engine.Responder, server *utils.Server, exitChan chan bool) {
func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internalRaterChan chan *engine.Responder, internalCDRSChan chan *engine.CdrServer, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-Generic service.")
raterConn := rpcclient.NewRpcClientPool(rpcclient.POOL_BROADCAST)
cdrsConn := rpcclient.NewRpcClientPool(rpcclient.POOL_BROADCAST)
@@ -164,9 +164,9 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
} else if len(cfg.SmGenericConfig.CdrsConns) != 0 {
for _, cdrsCfg := range cfg.SmGenericConfig.CdrsConns {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
cdrsConn.AddClient(client)
internalRaterChan <- resp
resp := <-internalCDRSChan
cdrsConn.AddClient(resp)
internalCDRSChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
@@ -230,7 +230,7 @@ func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit
exitChan <- true
}
func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, internalCDRSChan chan *engine.CdrServer, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-FreeSWITCH service.")
raterConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
cdrsConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
@@ -258,9 +258,9 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd
} else if len(cfg.SmFsConfig.CdrsConns) != 0 {
for _, cdrsCfg := range cfg.SmFsConfig.CdrsConns {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
resp := <-internalCDRSChan
cdrsConn.AddClient(resp)
internalRaterChan <- resp
internalCDRSChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
@@ -280,7 +280,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd
exitChan <- true
}
func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
func startSmKamailio(internalRaterChan chan *engine.Responder, internalCDRSChan chan *engine.CdrServer, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-Kamailio service.")
raterConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
cdrsConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
@@ -308,9 +308,9 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
} else if len(cfg.SmKamConfig.CdrsConns) != 0 {
for _, cdrsCfg := range cfg.SmKamConfig.CdrsConns {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
resp := <-internalCDRSChan
cdrsConn.AddClient(resp)
internalRaterChan <- resp
internalCDRSChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
@@ -330,7 +330,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
exitChan <- true
}
func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
func startSmOpenSIPS(internalRaterChan chan *engine.Responder, internalCDRSChan chan *engine.CdrServer, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-OpenSIPS service.")
raterConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
cdrsConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
@@ -358,9 +358,9 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
} else if len(cfg.SmOsipsConfig.CdrsConns) != 0 {
for _, cdrsCfg := range cfg.SmOsipsConfig.CdrsConns {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
resp := <-internalCDRSChan
cdrsConn.AddClient(resp)
internalRaterChan <- resp
internalCDRSChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
@@ -718,23 +718,23 @@ func main() {
// Start SM-Generic
if cfg.SmGenericConfig.Enabled {
go startSmGeneric(internalSMGChan, internalRaterChan, server, exitChan)
go startSmGeneric(internalSMGChan, internalRaterChan, internalCdrSChan, server, exitChan)
}
// Start SM-FreeSWITCH
if cfg.SmFsConfig.Enabled {
go startSmFreeSWITCH(internalRaterChan, cdrDb, exitChan)
go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, cdrDb, exitChan)
// close all sessions on shutdown
go shutdownSessionmanagerSingnalHandler(exitChan)
}
// Start SM-Kamailio
if cfg.SmKamConfig.Enabled {
go startSmKamailio(internalRaterChan, cdrDb, exitChan)
go startSmKamailio(internalRaterChan, internalCdrSChan, cdrDb, exitChan)
}
// Start SM-OpenSIPS
if cfg.SmOsipsConfig.Enabled {
go startSmOpenSIPS(internalRaterChan, cdrDb, exitChan)
go startSmOpenSIPS(internalRaterChan, internalCdrSChan, cdrDb, exitChan)
}
// Register session manager service // FixMe: make sure this is thread safe