From 79619b3d52a5d0f419d2d77e9c5fb41b4cec3abf Mon Sep 17 00:00:00 2001 From: nickolasdaniel Date: Fri, 16 Jul 2021 12:56:21 +0300 Subject: [PATCH] Redesigned NewRPCConnection and getConnWithConfig in order to use custom config over default config --- engine/connmanager.go | 8 +-- engine/libengine.go | 9 ++-- engine/libengine_test.go | 114 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 8 deletions(-) create mode 100644 engine/libengine_test.go diff --git a/engine/connmanager.go b/engine/connmanager.go index ed074a9a2..7fe614392 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -107,10 +107,10 @@ func (cM *ConnManager) getConnWithConfig(ctx *context.Context, connID string, co return } if conn, err = rpcclient.NewRPCParallelClientPool(ctx, utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, - cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, - cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, - cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, - cM.cfg.GeneralCfg().ReplyTimeout, codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, ctx.Client); err != nil { + utils.FirstNonEmpty(rpcConnCfg.ClientKey, cM.cfg.TLSCfg().ClientKey), utils.FirstNonEmpty(rpcConnCfg.ClientCertificate, cM.cfg.TLSCfg().ClientCerificate), + utils.FirstNonEmpty(rpcConnCfg.CaCertificate, cM.cfg.TLSCfg().CaCertificate), utils.FirstIntNonEmpty(rpcConnCfg.ConnectAttempts, cM.cfg.GeneralCfg().ConnectAttempts), + utils.FirstIntNonEmpty(rpcConnCfg.Reconnects, cM.cfg.GeneralCfg().Reconnects), utils.FirstDurationNonEmpty(rpcConnCfg.ConnectTimeout, cM.cfg.GeneralCfg().ConnectTimeout), + utils.FirstDurationNonEmpty(rpcConnCfg.ReplyTimeout, cM.cfg.GeneralCfg().ReplyTimeout), codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, ctx.Client); err != nil { return } } else { diff --git a/engine/libengine.go b/engine/libengine.go index 86edfbeeb..753cd02d2 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -76,11 +76,12 @@ func NewRPCConnection(ctx *context.Context, cfg *config.RemoteHost, keyPath, cer } if cfg.Address == rpcclient.InternalRPC || cfg.Address == rpcclient.BiRPCInternal { - client, err = rpcclient.NewRPCClient(ctx, "", "", cfg.TLS, keyPath, certPath, caPath, connAttempts, - reconnects, connectTimeout, replyTimeout, cfg.Address, internalConnChan, lazyConnect, biRPCClient) + client, err = rpcclient.NewRPCClient(ctx, "", "", cfg.TLS, utils.FirstNonEmpty(cfg.ClientKey, keyPath), utils.FirstNonEmpty(cfg.ClientCertificate, certPath), utils.FirstNonEmpty(cfg.CaCertificate, caPath), utils.FirstIntNonEmpty(cfg.ConnectAttempts, connAttempts), + utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout), cfg.Address, internalConnChan, lazyConnect, biRPCClient) } else { - client, err = rpcclient.NewRPCClient(ctx, utils.TCP, cfg.Address, cfg.TLS, keyPath, certPath, caPath, - connAttempts, reconnects, connectTimeout, replyTimeout, + client, err = rpcclient.NewRPCClient(ctx, utils.TCP, cfg.Address, cfg.TLS, utils.FirstNonEmpty(cfg.ClientKey, keyPath), utils.FirstNonEmpty(cfg.ClientCertificate, certPath), utils.FirstNonEmpty(cfg.CaCertificate, caPath), + utils.FirstIntNonEmpty(cfg.ConnectAttempts, connAttempts), + utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout), utils.FirstNonEmpty(cfg.Transport, rpcclient.GOBrpc), nil, lazyConnect, biRPCClient) } if connID != utils.EmptyString && diff --git a/engine/libengine_test.go b/engine/libengine_test.go new file mode 100644 index 000000000..5810dbebc --- /dev/null +++ b/engine/libengine_test.go @@ -0,0 +1,114 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "reflect" + "testing" + "time" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" + "github.com/cgrates/rpcclient" +) + +// For the purpose of this test, we don't need our client to establish a connection +// we only want to check if the client loaded with the given config where needed +func TestLibengineNewRPCConnection(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + cfg := &config.RemoteHost{ + ID: "a4f3f", + Address: "localhost:6012", + Transport: "*json", + Synchronous: false, + ConnectAttempts: 2, + Reconnects: 5, + ConnectTimeout: 2 * time.Minute, + ReplyTimeout: 3 * time.Minute, + TLS: true, + ClientKey: "key1", + } + expectedErr := "dial tcp [::1]:6012: connect: connection refused" + cM := NewConnManager(config.NewDefaultCGRConfig(), nil) + exp, err := rpcclient.NewRPCClient(context.Background(), utils.TCP, cfg.Address, cfg.TLS, cfg.ClientKey, cM.cfg.TLSCfg().ClientCerificate, + cM.cfg.TLSCfg().CaCertificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.Transport, nil, false, nil) + + if err.Error() != expectedErr { + t.Errorf("Expected %v \n but received \n %v", expectedErr, err) + } + + conn, err := NewRPCConnection(context.Background(), cfg, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + nil, false, nil, "*localhost", "a4f3f", new(ltcache.Cache)) + if err.Error() != expectedErr { + t.Errorf("Expected %v \n but received \n %v", expectedErr, err) + } + if !reflect.DeepEqual(exp, conn) { + t.Error("Connections don't match") + } +} + +func TestLibengineNewRPCConnectionInternal(t *testing.T) { + tmp := Cache + defer func() { + Cache = tmp + }() + + cfg := &config.RemoteHost{ + ID: "a4f3f", + Address: rpcclient.InternalRPC, + Transport: "", + Synchronous: false, + ConnectAttempts: 2, + Reconnects: 5, + ConnectTimeout: 2 * time.Minute, + ReplyTimeout: 3 * time.Minute, + TLS: true, + ClientKey: "key1", + } + cM := NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan birpc.ClientConnector)) + exp, err := rpcclient.NewRPCClient(context.Background(), "", "", cfg.TLS, cfg.ClientKey, cM.cfg.TLSCfg().ClientCerificate, + cM.cfg.TLSCfg().ClientCerificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + rpcclient.InternalRPC, cM.rpcInternal["a4f3f"], false, nil) + + // We only want to check if the client loaded with the correct config, + // therefore connection is not mandatory + if err != rpcclient.ErrInternallyDisconnected { + t.Error(err) + } + + conn, err := NewRPCConnection(context.Background(), cfg, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + cM.rpcInternal["a4f3f"], false, nil, "*internal", "a4f3f", new(ltcache.Cache)) + + if err != rpcclient.ErrInternallyDisconnected { + t.Error(err) + } + if !reflect.DeepEqual(exp, conn) { + t.Error("Connections don't match") + } +}