diff --git a/engine/connmanager.go b/engine/connmanager.go index d2ab7db08..101f23fea 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -105,11 +105,13 @@ func (cM *ConnManager) getConnWithConfig(connID string, connCfg *config.RPCConn, err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) return } + // fmt.Println(utils.ToJSON(rpcConnCfg)) + // fmt.Println(utils.ToJSON(cM.cfg.GeneralCfg())) if conn, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, - cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, - cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, - cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, - cM.cfg.GeneralCfg().ReplyTimeout, codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, biRPCClient); err != nil { + utils.FirstNonEmpty(rpcConnCfg.ClientKey, cM.cfg.TLSCfg().ClientKey), utils.FirstNonEmpty(rpcConnCfg.ClientCertificate, cM.cfg.TLSCfg().ClientCerificate), + utils.FirstNonEmpty(rpcConnCfg.CaCertificate, cM.cfg.TLSCfg().CaCertificate), utils.FirstIntNonEmpty(rpcConnCfg.ConnectAttempts, cM.cfg.GeneralCfg().ConnectAttempts), + utils.FirstIntNonEmpty(rpcConnCfg.Reconnects, cM.cfg.GeneralCfg().Reconnects), utils.FirstDurationNonEmpty(rpcConnCfg.ConnectTimeout, cM.cfg.GeneralCfg().ConnectTimeout), + utils.FirstDurationNonEmpty(rpcConnCfg.ReplyTimeout, cM.cfg.GeneralCfg().ReplyTimeout), codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, biRPCClient); err != nil { return } } else { diff --git a/engine/libengine.go b/engine/libengine.go index c4f8adea0..a61a10818 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -74,11 +74,12 @@ func NewRPCConnection(cfg *config.RemoteHost, keyPath, certPath, caPath string, } if cfg.Address == rpcclient.InternalRPC || cfg.Address == rpcclient.BiRPCInternal { - client, err = rpcclient.NewRPCClient("", "", cfg.TLS, keyPath, certPath, caPath, connAttempts, - reconnects, connectTimeout, replyTimeout, cfg.Address, internalConnChan, lazyConnect, biRPCClient) + client, err = rpcclient.NewRPCClient("", "", cfg.TLS, utils.FirstNonEmpty(cfg.ClientKey, keyPath), utils.FirstNonEmpty(cfg.ClientCertificate, certPath), utils.FirstNonEmpty(cfg.CaCertificate, caPath), utils.FirstIntNonEmpty(cfg.ConnectAttempts, connAttempts), + utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout), cfg.Address, internalConnChan, lazyConnect, biRPCClient) } else { - client, err = rpcclient.NewRPCClient(utils.TCP, cfg.Address, cfg.TLS, keyPath, certPath, caPath, - connAttempts, reconnects, connectTimeout, replyTimeout, + client, err = rpcclient.NewRPCClient(utils.TCP, cfg.Address, cfg.TLS, utils.FirstNonEmpty(cfg.ClientKey, keyPath), utils.FirstNonEmpty(cfg.ClientCertificate, certPath), utils.FirstNonEmpty(cfg.CaCertificate, caPath), + utils.FirstIntNonEmpty(cfg.ConnectAttempts, connAttempts), + utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout), utils.FirstNonEmpty(cfg.Transport, rpcclient.GOBrpc), nil, lazyConnect, biRPCClient) } if connID != utils.EmptyString && diff --git a/engine/libengine_test.go b/engine/libengine_test.go index 32d41f6c5..7c951243e 100644 --- a/engine/libengine_test.go +++ b/engine/libengine_test.go @@ -25,6 +25,7 @@ import ( "os" "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -65,6 +66,87 @@ func TestLibengineNewRPCPoolNoAddress(t *testing.T) { } } +// For the purpose of this test, we don't need our client to establish a connection +// we only want to check if the client loaded with the given config where needed +func TestLibengineNewRPCConnection(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + cfg := &config.RemoteHost{ + ID: "a4f3f", + Address: "localhost:6012", + Transport: "*json", + Synchronous: false, + ConnectAttempts: 2, + Reconnects: 5, + ConnectTimeout: 2 * time.Minute, + ReplyTimeout: 3 * time.Minute, + TLS: true, + ClientKey: "key1", + } + expectedErr := "dial tcp [::1]:6012: connect: connection refused" + cM := NewConnManager(config.NewDefaultCGRConfig(), nil) + exp, err := rpcclient.NewRPCClient(utils.TCP, cfg.Address, cfg.TLS, cfg.ClientKey, cM.cfg.TLSCfg().ClientCerificate, + cM.cfg.TLSCfg().CaCertificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.Transport, nil, false, nil) + + if err.Error() != expectedErr { + t.Errorf("Expected %v \n but received \n %v", expectedErr, err) + } + + conn, err := NewRPCConnection(cfg, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + nil, false, nil, "*localhost", "a4f3f", new(ltcache.Cache)) + if err.Error() != expectedErr { + t.Errorf("Expected %v \n but received \n %v", expectedErr, err) + } + if !reflect.DeepEqual(exp, conn) { + t.Error("Connections don't match") + } +} + +func TestLibengineNewRPCConnectionInternal(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + cfg := &config.RemoteHost{ + ID: "a4f3f", + Address: rpcclient.InternalRPC, + Transport: "", + Synchronous: false, + ConnectAttempts: 2, + Reconnects: 5, + ConnectTimeout: 2 * time.Minute, + ReplyTimeout: 3 * time.Minute, + TLS: true, + ClientKey: "key1", + } + cM := NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan rpcclient.ClientConnector)) + exp, err := rpcclient.NewRPCClient("", "", cfg.TLS, cfg.ClientKey, cM.cfg.TLSCfg().ClientCerificate, + cM.cfg.TLSCfg().ClientCerificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + rpcclient.InternalRPC, cM.rpcInternal["a4f3f"], false, nil) + + // We only want to check if the client loaded with the correct config, + // therefore connection is not mandatory + if err != rpcclient.ErrInternallyDisconnected { + t.Error(err) + } + + conn, err := NewRPCConnection(cfg, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + cM.rpcInternal["a4f3f"], false, nil, "*internal", "a4f3f", new(ltcache.Cache)) + + if err != rpcclient.ErrInternallyDisconnected { + t.Error(err) + } + if !reflect.DeepEqual(exp, conn) { + t.Error("Connections don't match") + } +} func TestLibengineNewRPCPoolUnsupportedTransport(t *testing.T) { tmp := Cache defer func() {