From 37c853cb9187fdec7cdc36b15bcfd3d03c7e3576 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 8 Nov 2016 16:03:30 +0100 Subject: [PATCH] RPCClient with lazyConnect --- cmd/cgr-console/cgr-console.go | 3 ++- cmd/cgr-engine/cgr-engine.go | 2 +- cmd/cgr-loader/cgr-loader.go | 3 ++- cmd/cgr-tester/cdr_repl/process_cdr.go | 3 ++- engine/action.go | 3 ++- engine/libengine.go | 4 ++-- engine/pubsub.go | 2 +- general_tests/cdrs_replication_it_test.go | 9 ++++++--- general_tests/rpcclient_it_test.go | 12 ++++++++---- glide.lock | 2 +- 10 files changed, 27 insertions(+), 16 deletions(-) diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index ed3be9374..7fcd8c60b 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -109,7 +109,8 @@ func main() { return } var err error - client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), *rpc_encoding, nil) + client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, + time.Duration(1*time.Second), time.Duration(5*time.Minute), *rpc_encoding, nil, false) if err != nil { flag.PrintDefaults() log.Fatal("Could not connect to server " + *server) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 9fac4aa90..e48f92c9d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -153,7 +153,7 @@ func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRate smgReplConns := make([]*sessionmanager.SMGReplicationConn, len(cfg.SmGenericConfig.SMGReplicationConns)) for i, replConnCfg := range cfg.SmGenericConfig.SMGReplicationConns { if replCon, err := rpcclient.NewRpcClient("tcp", replConnCfg.Address, cfg.ConnectAttempts, cfg.Reconnects, - cfg.ConnectTimeout, cfg.ReplyTimeout, replConnCfg.Transport[1:], nil); err != nil { + cfg.ConnectTimeout, cfg.ReplyTimeout, replConnCfg.Transport[1:], nil, true); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMGReplicationConn: <%s>, error: <%s>", replConnCfg.Address, err.Error())) exitChan <- true return diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 195d4f953..c53c6ad53 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -299,7 +299,8 @@ func main() { return } if *historyServer != "" { // Init scribeAgent so we can store the differences - if scribeAgent, err := rpcclient.NewRpcClient("tcp", *historyServer, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.GOB, nil); err != nil { + if scribeAgent, err := rpcclient.NewRpcClient("tcp", *historyServer, 3, 3, + time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.GOB, nil, false); err != nil { log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error()) return } else { diff --git a/cmd/cgr-tester/cdr_repl/process_cdr.go b/cmd/cgr-tester/cdr_repl/process_cdr.go index 1de0fd4da..6ca1df8b3 100644 --- a/cmd/cgr-tester/cdr_repl/process_cdr.go +++ b/cmd/cgr-tester/cdr_repl/process_cdr.go @@ -42,7 +42,8 @@ func main() { if cdrsMasterCfg, err = config.NewCGRConfigFromFolder(cdrsMasterCfgPath); err != nil { log.Fatal("Got config error: ", err.Error()) } - cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil) + cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, + time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false) if err != nil { log.Fatal("Could not connect to rater: ", err.Error()) } diff --git a/engine/action.go b/engine/action.go index 44e867e60..f9b64ae98 100644 --- a/engine/action.go +++ b/engine/action.go @@ -698,7 +698,8 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti } var client rpcclient.RpcClientConnection if req.Address != utils.MetaInternal { - if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, config.CgrConfig().ConnectTimeout, config.CgrConfig().ReplyTimeout, req.Transport, nil); err != nil { + if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, + config.CgrConfig().ConnectTimeout, config.CgrConfig().ReplyTimeout, req.Transport, nil, false); err != nil { return err } } else { diff --git a/engine/libengine.go b/engine/libengine.go index b01ea0609..7c9fd4d41 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -42,13 +42,13 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTi case <-time.After(ttl): return nil, errors.New("TTL triggered") } - rpcClient, err = rpcclient.NewRpcClient("", "", connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn) + rpcClient, err = rpcclient.NewRpcClient("", "", connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, false) } else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) { codec := utils.GOB if rpcConnCfg.Transport != "" { codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient } - rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil) + rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, false) } else { return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) } diff --git a/engine/pubsub.go b/engine/pubsub.go index 1b9d70183..95776e3ae 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -231,7 +231,7 @@ type ProxyPubSub struct { } func NewProxyPubSub(addr string, attempts, reconnects int, connectTimeout, replyTimeout time.Duration) (*ProxyPubSub, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil) + client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil, false) if err != nil { return nil, err } diff --git a/general_tests/cdrs_replication_it_test.go b/general_tests/cdrs_replication_it_test.go index fb5f455b1..b92ef3ba5 100644 --- a/general_tests/cdrs_replication_it_test.go +++ b/general_tests/cdrs_replication_it_test.go @@ -96,7 +96,8 @@ func TestCdrsHttpCdrReplication(t *testing.T) { if !*testIntegration { return } - cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil) + cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, + time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -113,13 +114,15 @@ func TestCdrsHttpCdrReplication(t *testing.T) { t.Error("Unexpected reply received: ", reply) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil) + cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, + time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } // ToDo: Fix cdr_http to be compatible with rest of processCdr methods var rcvedCdrs []*engine.ExternalCDR - if err := cdrsSlaveRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{CGRIDs: []string{testCdr1.CGRID}, RunIDs: []string{utils.META_DEFAULT}}, &rcvedCdrs); err != nil { + if err := cdrsSlaveRpc.Call("ApierV2.GetCdrs", + utils.RPCCDRsFilter{CGRIDs: []string{testCdr1.CGRID}, RunIDs: []string{utils.META_DEFAULT}}, &rcvedCdrs); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(rcvedCdrs) != 1 { t.Error("Unexpected number of CDRs returned: ", len(rcvedCdrs)) diff --git a/general_tests/rpcclient_it_test.go b/general_tests/rpcclient_it_test.go index ad9724420..54ef16fb5 100644 --- a/general_tests/rpcclient_it_test.go +++ b/general_tests/rpcclient_it_test.go @@ -76,12 +76,14 @@ func TestRPCITRpcConnPoolFirst(t *testing.T) { return } rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0) - rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) + rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1, + time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false) if err == nil { t.Fatal("Should receive cannot connect error here") } rpcPoolFirst.AddClient(rpcRAL1) - rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) + rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, 3, 1, + time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false) if err != nil { t.Fatal(err) } @@ -218,12 +220,14 @@ func TestRPCITRmtRpcConnPool(t *testing.T) { return } rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0) - rpcRALRmt, err := rpcclient.NewRpcClient("tcp", "172.16.254.83:2012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) + rpcRALRmt, err := rpcclient.NewRpcClient("tcp", "172.16.254.83:2012", 1, 1, + time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false) if err != nil { t.Fatal(err) } rpcPoolFirst.AddClient(rpcRALRmt) - rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) + rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 1, 1, + time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false) if err != nil { t.Fatal(err) } diff --git a/glide.lock b/glide.lock index ae6a76e85..d270875b0 100644 --- a/glide.lock +++ b/glide.lock @@ -16,7 +16,7 @@ imports: - name: github.com/cgrates/osipsdagram version: 3d6beed663452471dec3ca194137a30d379d9e8f - name: github.com/cgrates/rpcclient - version: 75134dcad8238f8bc8b8d61b37aecf89cf8db797 + version: 8bb59b56f8c7bee0d5603914a044a1d64ab8781e - name: github.com/ChrisTrenkamp/goxpath version: 4aad8d0161aae7d17df4755d2c1e86cd1fcaaab6 subpackages: