From 9daa3ae4e0bf7c469de8b6166f0082671217b8a9 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 19 Sep 2015 19:58:53 +0200 Subject: [PATCH] Fix rater and cdrs connection in pool for session managers --- cmd/cgr-engine/cgr-engine.go | 175 ++++++++++-------------- general_tests/tutorial_fs_calls_test.go | 2 + 2 files changed, 75 insertions(+), 102 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 63c6648f7..faf816f48 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -135,51 +135,41 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd var raterConn, cdrsConn engine.ConnectorPool var client *rpcclient.RpcClient var err error - delay := utils.Fib() + // Connect to rater for _, raterCfg := range cfg.SmFsConfig.HaRater { if raterCfg.Server == utils.INTERNAL { resp := <-internalRaterChan raterConn = append(raterConn, resp) internalRaterChan <- resp - } - for i := 0; i < cfg.Reconnects; i++ { + } else { + var err error client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break - } - time.Sleep(delay()) - } - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to rater via RPC: %v", err)) - exitChan <- true - return - } - raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout}) - } - if reflect.DeepEqual(cfg.SmFsConfig.HaCdrs, cfg.SmFsConfig.HaRater) { - cdrsConn = raterConn - } else if len(cfg.SmFsConfig.HaCdrs) != 0 { - delay = utils.Fib() - for _, cdrsCfg := range cfg.SmFsConfig.HaCdrs { - if cdrsCfg.Server == utils.INTERNAL { - resp := <-internalRaterChan - raterConn = append(raterConn, resp) - internalRaterChan <- resp - } - for i := 0; i < cfg.Reconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break - } - time.Sleep(delay()) - } - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + if err != nil { //Connected so no need to reiterate + engine.Logger.Crit(fmt.Sprintf(" Could not connect to rater via RPC: %v", err)) exitChan <- true return } - cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout}) - + raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout}) + } + } + // Connect to CDRS + if reflect.DeepEqual(cfg.SmFsConfig.HaCdrs, cfg.SmFsConfig.HaRater) { + cdrsConn = raterConn + } else if len(cfg.SmFsConfig.HaCdrs) != 0 { + for _, cdrsCfg := range cfg.SmFsConfig.HaCdrs { + if cdrsCfg.Server == utils.INTERNAL { + resp := <-internalRaterChan + cdrsConn = append(cdrsConn, resp) + internalRaterChan <- resp + } else { + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + exitChan <- true + return + } + cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout}) + } } } sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, raterConn, cdrsConn, cfg.DefaultTimezone) @@ -195,46 +185,37 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS engine.Logger.Info("Starting CGRateS SM-Kamailio service.") var raterConn, cdrsConn engine.ConnectorPool var client *rpcclient.RpcClient - var err error - delay := utils.Fib() + // Connect to rater for _, raterCfg := range cfg.SmKamConfig.HaRater { if raterCfg.Server == utils.INTERNAL { resp := <-internalRaterChan raterConn = append(raterConn, resp) internalRaterChan <- resp - } - for i := 0; i < cfg.Reconnects; i++ { + } else { + var err error client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break + if err != nil { //Connected so no need to reiterate + engine.Logger.Crit(fmt.Sprintf(" Could not connect to rater via RPC: %v", err)) + exitChan <- true + return } - time.Sleep(delay()) + raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout}) } - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to rater: %v", err)) - exitChan <- true - } - raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout}) - if reflect.DeepEqual(cfg.SmKamConfig.HaCdrs, cfg.SmKamConfig.HaRater) { - cdrsConn = raterConn - } else if len(cfg.SmKamConfig.HaCdrs) != 0 { - delay := utils.Fib() - for _, cdrsCfg := range cfg.SmKamConfig.HaCdrs { - if cdrsCfg.Server == utils.INTERNAL { - resp := <-internalRaterChan - raterConn = append(raterConn, resp) - internalRaterChan <- resp - } - for i := 0; i < cfg.Reconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break - } - time.Sleep(delay()) - } + } + // Connect to CDRS + if reflect.DeepEqual(cfg.SmKamConfig.HaCdrs, cfg.SmKamConfig.HaRater) { + cdrsConn = raterConn + } else if len(cfg.SmKamConfig.HaCdrs) != 0 { + for _, cdrsCfg := range cfg.SmKamConfig.HaCdrs { + if cdrsCfg.Server == utils.INTERNAL { + resp := <-internalRaterChan + cdrsConn = append(cdrsConn, resp) + internalRaterChan <- resp + } else { + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) exitChan <- true return } @@ -254,52 +235,42 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrStorage, exitChan chan bool) { var raterConn, cdrsConn engine.ConnectorPool var client *rpcclient.RpcClient - var err error - delay := utils.Fib() + // Connect to rater for _, raterCfg := range cfg.SmOsipsConfig.HaRater { if raterCfg.Server == utils.INTERNAL { resp := <-internalRaterChan raterConn = append(raterConn, resp) internalRaterChan <- resp - } - for i := 0; i < cfg.Reconnects; i++ { + } else { + var err error client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break - } - time.Sleep(delay()) - } - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to rater: %v", err)) - exitChan <- true - } - raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout}) - } - if reflect.DeepEqual(cfg.SmOsipsConfig.HaCdrs, cfg.SmOsipsConfig.HaRater) { - cdrsConn = raterConn - } - for _, cdrsCfg := range cfg.SmOsipsConfig.HaCdrs { - if cdrsCfg.Server == utils.INTERNAL { - resp := <-internalRaterChan - raterConn = append(raterConn, resp) - internalRaterChan <- resp - } - if len(cfg.SmOsipsConfig.HaCdrs) != 0 { - delay := utils.Fib() - for i := 0; i < cfg.Reconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break - } - time.Sleep(delay()) - } - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + if err != nil { //Connected so no need to reiterate + engine.Logger.Crit(fmt.Sprintf(" Could not connect to rater via RPC: %v", err)) exitChan <- true return } - cdrsConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout}) + raterConn = append(raterConn, &engine.RPCClientConnector{Client: client, Timeout: raterCfg.Timeout}) + } + } + // Connect to CDRS + if reflect.DeepEqual(cfg.SmOsipsConfig.HaCdrs, cfg.SmOsipsConfig.HaRater) { + cdrsConn = raterConn + } else if len(cfg.SmOsipsConfig.HaCdrs) != 0 { + for _, cdrsCfg := range cfg.SmOsipsConfig.HaCdrs { + if cdrsCfg.Server == utils.INTERNAL { + resp := <-internalRaterChan + cdrsConn = append(cdrsConn, resp) + internalRaterChan <- resp + } else { + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + exitChan <- true + return + } + cdrsConn = append(cdrsConn, &engine.RPCClientConnector{Client: client, Timeout: cdrsCfg.Timeout}) + } } } sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, cfg.Reconnects, raterConn, cdrsConn, cfg.DefaultTimezone) diff --git a/general_tests/tutorial_fs_calls_test.go b/general_tests/tutorial_fs_calls_test.go index b8d7458f7..c301a06ba 100644 --- a/general_tests/tutorial_fs_calls_test.go +++ b/general_tests/tutorial_fs_calls_test.go @@ -72,6 +72,7 @@ func TestTutFsCallsResetStorDb(t *testing.T) { } } +/* // start FS server func TestTutFsCallsStartFS(t *testing.T) { if !*testCalls { @@ -103,6 +104,7 @@ func TestTutFsCallsRestartFS(t *testing.T) { t.Fatal(err) } } +*/ // Connect rpc client to rater func TestTutFsCallsRpcConn(t *testing.T) {