Fix rater and cdrs connection in pool for session managers

This commit is contained in:
DanB
2015-09-19 19:58:53 +02:00
parent 99076bb7e4
commit 9daa3ae4e0
2 changed files with 75 additions and 102 deletions

View File

@@ -135,51 +135,41 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd
var raterConn, cdrsConn engine.ConnectorPool
var client *rpcclient.RpcClient
var err error
delay := utils.Fib()
// Connect to rater
for _, raterCfg := range cfg.SmFsConfig.HaRater {
if raterCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn = append(raterConn, resp)
internalRaterChan <- resp
}
for i := 0; i < cfg.Reconnects; i++ {
} else {
var err error
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(delay())
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to rater via RPC: %v", err))
exitChan <- true
return
}
raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout})
}
if reflect.DeepEqual(cfg.SmFsConfig.HaCdrs, cfg.SmFsConfig.HaRater) {
cdrsConn = raterConn
} else if len(cfg.SmFsConfig.HaCdrs) != 0 {
delay = utils.Fib()
for _, cdrsCfg := range cfg.SmFsConfig.HaCdrs {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn = append(raterConn, resp)
internalRaterChan <- resp
}
for i := 0; i < cfg.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(delay())
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to CDRS via RPC: %v", err))
if err != nil { //Connected so no need to reiterate
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to rater via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout})
raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout})
}
}
// Connect to CDRS
if reflect.DeepEqual(cfg.SmFsConfig.HaCdrs, cfg.SmFsConfig.HaRater) {
cdrsConn = raterConn
} else if len(cfg.SmFsConfig.HaCdrs) != 0 {
for _, cdrsCfg := range cfg.SmFsConfig.HaCdrs {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
cdrsConn = append(cdrsConn, resp)
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout})
}
}
}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, raterConn, cdrsConn, cfg.DefaultTimezone)
@@ -195,46 +185,37 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
engine.Logger.Info("Starting CGRateS SM-Kamailio service.")
var raterConn, cdrsConn engine.ConnectorPool
var client *rpcclient.RpcClient
var err error
delay := utils.Fib()
// Connect to rater
for _, raterCfg := range cfg.SmKamConfig.HaRater {
if raterCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn = append(raterConn, resp)
internalRaterChan <- resp
}
for i := 0; i < cfg.Reconnects; i++ {
} else {
var err error
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
if err != nil { //Connected so no need to reiterate
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to rater via RPC: %v", err))
exitChan <- true
return
}
time.Sleep(delay())
raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout})
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to rater: %v", err))
exitChan <- true
}
raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout})
if reflect.DeepEqual(cfg.SmKamConfig.HaCdrs, cfg.SmKamConfig.HaRater) {
cdrsConn = raterConn
} else if len(cfg.SmKamConfig.HaCdrs) != 0 {
delay := utils.Fib()
for _, cdrsCfg := range cfg.SmKamConfig.HaCdrs {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn = append(raterConn, resp)
internalRaterChan <- resp
}
for i := 0; i < cfg.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(delay())
}
}
// Connect to CDRS
if reflect.DeepEqual(cfg.SmKamConfig.HaCdrs, cfg.SmKamConfig.HaRater) {
cdrsConn = raterConn
} else if len(cfg.SmKamConfig.HaCdrs) != 0 {
for _, cdrsCfg := range cfg.SmKamConfig.HaCdrs {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
cdrsConn = append(cdrsConn, resp)
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-Kamailio> Could not connect to CDRS via RPC: %v", err))
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
@@ -254,52 +235,42 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) {
var raterConn, cdrsConn engine.ConnectorPool
var client *rpcclient.RpcClient
var err error
delay := utils.Fib()
// Connect to rater
for _, raterCfg := range cfg.SmOsipsConfig.HaRater {
if raterCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn = append(raterConn, resp)
internalRaterChan <- resp
}
for i := 0; i < cfg.Reconnects; i++ {
} else {
var err error
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(delay())
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to rater: %v", err))
exitChan <- true
}
raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout})
}
if reflect.DeepEqual(cfg.SmOsipsConfig.HaCdrs, cfg.SmOsipsConfig.HaRater) {
cdrsConn = raterConn
}
for _, cdrsCfg := range cfg.SmOsipsConfig.HaCdrs {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn = append(raterConn, resp)
internalRaterChan <- resp
}
if len(cfg.SmOsipsConfig.HaCdrs) != 0 {
delay := utils.Fib()
for i := 0; i < cfg.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(delay())
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
if err != nil { //Connected so no need to reiterate
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to rater via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout})
raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout})
}
}
// Connect to CDRS
if reflect.DeepEqual(cfg.SmOsipsConfig.HaCdrs, cfg.SmOsipsConfig.HaRater) {
cdrsConn = raterConn
} else if len(cfg.SmOsipsConfig.HaCdrs) != 0 {
for _, cdrsCfg := range cfg.SmOsipsConfig.HaCdrs {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
cdrsConn = append(cdrsConn, resp)
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout})
}
}
}
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, cfg.Reconnects, raterConn, cdrsConn, cfg.DefaultTimezone)

View File

@@ -72,6 +72,7 @@ func TestTutFsCallsResetStorDb(t *testing.T) {
}
}
/*
// start FS server
func TestTutFsCallsStartFS(t *testing.T) {
if !*testCalls {
@@ -103,6 +104,7 @@ func TestTutFsCallsRestartFS(t *testing.T) {
t.Fatal(err)
}
}
*/
// Connect rpc client to rater
func TestTutFsCallsRpcConn(t *testing.T) {