From f14a7046913e260c8f9fcd2ab981341f6283345f Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 28 May 2020 12:06:12 +0300 Subject: [PATCH] Add lazy connect when get dispatcher host from database --- dispatchers/attributes_it_test.go | 44 +++++++++++++++++++++++++++++++ engine/datamanager.go | 4 +-- utils/errors.go | 9 ++++--- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/dispatchers/attributes_it_test.go b/dispatchers/attributes_it_test.go index b9da25a96..cd5358717 100755 --- a/dispatchers/attributes_it_test.go +++ b/dispatchers/attributes_it_test.go @@ -31,6 +31,7 @@ import ( var sTestsDspAttr = []func(t *testing.T){ testDspAttrPingFailover, + testDspAttrPingFailover2, testDspAttrGetAttrFailover, testDspAttrGetAttrRoundRobin, @@ -132,6 +133,49 @@ func testDspAttrPingFailover(t *testing.T) { } } +func testDspAttrPingFailover2(t *testing.T) { + var reply string + if err := allEngine.RPC.Call(utils.AttributeSv1Ping, new(utils.CGREvent), &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Received: %s", reply) + } + reply = "" + if err := allEngine2.RPC.Call(utils.AttributeSv1Ping, new(utils.CGREvent), &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Received: %s", reply) + } + reply = "" + ev := utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + }, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer("attr12345"), + }, + } + allEngine.stopEngine(t) // stop the engine and the call should go to the second engine + if err := dispEngine.RPC.Call(utils.AttributeSv1Ping, &ev, &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Received: %s", reply) + } + allEngine2.stopEngine(t) + reply = "" + if err := dispEngine.RPC.Call(utils.AttributeSv1Ping, &ev, &reply); err == nil { + t.Errorf("Expected error but recived %v and reply %v\n", err, reply) + } + allEngine.startEngine(t) + allEngine2.startEngine(t) + reply = "" + if err := dispEngine.RPC.Call(utils.AttributeSv1Ping, &ev, &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Received: %s", reply) + } +} + func testDspAttrGetAttrFailover(t *testing.T) { args := &engine.AttrArgsProcessEvent{ Context: utils.StringPointer("simpleauth"), diff --git a/engine/datamanager.go b/engine/datamanager.go index 5e3123d19..3e00485b4 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -2172,13 +2172,13 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit } if cacheWrite { cfg := config.CgrConfig() - if dH.rpcConn, err = NewRPCPool( + if dH.rpcConn, err = NewRPCPool( // send it wil lazy connect on true and try to connect only when the call is make rpcclient.PoolFirst, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - dH.Conns, IntRPC.GetInternalChanel(), false); err != nil { + dH.Conns, IntRPC.GetInternalChanel(), true); err != nil { return nil, err } Cache.Set(utils.CacheDispatcherHosts, tntID, dH, nil, diff --git a/utils/errors.go b/utils/errors.go index 6c7fee4eb..7b00756be 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -245,8 +245,10 @@ func IsNetworkError(err error) bool { return false } if operr, ok := err.(*net.OpError); ok && - strings.HasSuffix(operr.Err.Error(), - syscall.ECONNRESET.Error()) { // connection reset + (strings.HasSuffix(operr.Err.Error(), + syscall.ECONNRESET.Error()) || + (strings.HasSuffix(operr.Err.Error(), + syscall.ECONNREFUSED.Error()))) { // connection reset return true } return err.Error() == rpc.ErrShutdown.Error() || @@ -254,7 +256,8 @@ func IsNetworkError(err error) bool { err.Error() == ErrDisconnected.Error() || err.Error() == ErrReplyTimeout.Error() || err.Error() == ErrSessionNotFound.Error() || - strings.HasPrefix(err.Error(), "rpc: can't find service") + strings.HasPrefix(err.Error(), "rpc: can't find service") || + strings.HasSuffix(err.Error(), "no such host") } func ErrPathNotReachable(path string) error {