RPCClient with lazyConnect

This commit is contained in:
DanB
2016-11-08 16:03:30 +01:00
parent d2afaba0a8
commit 37c853cb91
10 changed files with 27 additions and 16 deletions

View File

@@ -109,7 +109,8 @@ func main() {
return
}
var err error
client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), *rpc_encoding, nil)
client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3,
time.Duration(1*time.Second), time.Duration(5*time.Minute), *rpc_encoding, nil, false)
if err != nil {
flag.PrintDefaults()
log.Fatal("Could not connect to server " + *server)

View File

@@ -153,7 +153,7 @@ func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRate
smgReplConns := make([]*sessionmanager.SMGReplicationConn, len(cfg.SmGenericConfig.SMGReplicationConns))
for i, replConnCfg := range cfg.SmGenericConfig.SMGReplicationConns {
if replCon, err := rpcclient.NewRpcClient("tcp", replConnCfg.Address, cfg.ConnectAttempts, cfg.Reconnects,
cfg.ConnectTimeout, cfg.ReplyTimeout, replConnCfg.Transport[1:], nil); err != nil {
cfg.ConnectTimeout, cfg.ReplyTimeout, replConnCfg.Transport[1:], nil, true); err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to SMGReplicationConn: <%s>, error: <%s>", replConnCfg.Address, err.Error()))
exitChan <- true
return

View File

@@ -299,7 +299,8 @@ func main() {
return
}
if *historyServer != "" { // Init scribeAgent so we can store the differences
if scribeAgent, err := rpcclient.NewRpcClient("tcp", *historyServer, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.GOB, nil); err != nil {
if scribeAgent, err := rpcclient.NewRpcClient("tcp", *historyServer, 3, 3,
time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.GOB, nil, false); err != nil {
log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error())
return
} else {

View File

@@ -42,7 +42,8 @@ func main() {
if cdrsMasterCfg, err = config.NewCGRConfigFromFolder(cdrsMasterCfgPath); err != nil {
log.Fatal("Got config error: ", err.Error())
}
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil)
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false)
if err != nil {
log.Fatal("Could not connect to rater: ", err.Error())
}

View File

@@ -698,7 +698,8 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti
}
var client rpcclient.RpcClientConnection
if req.Address != utils.MetaInternal {
if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, config.CgrConfig().ConnectTimeout, config.CgrConfig().ReplyTimeout, req.Transport, nil); err != nil {
if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0,
config.CgrConfig().ConnectTimeout, config.CgrConfig().ReplyTimeout, req.Transport, nil, false); err != nil {
return err
}
} else {

View File

@@ -42,13 +42,13 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTi
case <-time.After(ttl):
return nil, errors.New("TTL triggered")
}
rpcClient, err = rpcclient.NewRpcClient("", "", connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn)
rpcClient, err = rpcclient.NewRpcClient("", "", connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, false)
} else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) {
codec := utils.GOB
if rpcConnCfg.Transport != "" {
codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient
}
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil)
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, false)
} else {
return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
}

View File

@@ -231,7 +231,7 @@ type ProxyPubSub struct {
}
func NewProxyPubSub(addr string, attempts, reconnects int, connectTimeout, replyTimeout time.Duration) (*ProxyPubSub, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil)
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil, false)
if err != nil {
return nil, err
}

View File

@@ -96,7 +96,8 @@ func TestCdrsHttpCdrReplication(t *testing.T) {
if !*testIntegration {
return
}
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil)
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
@@ -113,13 +114,15 @@ func TestCdrsHttpCdrReplication(t *testing.T) {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil)
cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil, false)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
// ToDo: Fix cdr_http to be compatible with rest of processCdr methods
var rcvedCdrs []*engine.ExternalCDR
if err := cdrsSlaveRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{CGRIDs: []string{testCdr1.CGRID}, RunIDs: []string{utils.META_DEFAULT}}, &rcvedCdrs); err != nil {
if err := cdrsSlaveRpc.Call("ApierV2.GetCdrs",
utils.RPCCDRsFilter{CGRIDs: []string{testCdr1.CGRID}, RunIDs: []string{utils.META_DEFAULT}}, &rcvedCdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(rcvedCdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(rcvedCdrs))

View File

@@ -76,12 +76,14 @@ func TestRPCITRpcConnPoolFirst(t *testing.T) {
return
}
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false)
if err == nil {
t.Fatal("Should receive cannot connect error here")
}
rpcPoolFirst.AddClient(rpcRAL1)
rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, 3, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false)
if err != nil {
t.Fatal(err)
}
@@ -218,12 +220,14 @@ func TestRPCITRmtRpcConnPool(t *testing.T) {
return
}
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0)
rpcRALRmt, err := rpcclient.NewRpcClient("tcp", "172.16.254.83:2012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
rpcRALRmt, err := rpcclient.NewRpcClient("tcp", "172.16.254.83:2012", 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false)
if err != nil {
t.Fatal(err)
}
rpcPoolFirst.AddClient(rpcRALRmt)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 1, 1,
time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil, false)
if err != nil {
t.Fatal(err)
}

2
glide.lock generated
View File

@@ -16,7 +16,7 @@ imports:
- name: github.com/cgrates/osipsdagram
version: 3d6beed663452471dec3ca194137a30d379d9e8f
- name: github.com/cgrates/rpcclient
version: 75134dcad8238f8bc8b8d61b37aecf89cf8db797
version: 8bb59b56f8c7bee0d5603914a044a1d64ab8781e
- name: github.com/ChrisTrenkamp/goxpath
version: 4aad8d0161aae7d17df4755d2c1e86cd1fcaaab6
subpackages: