mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Redesigned NewRPCConnection and getConnWithConfig in order to use custom config over default config
This commit is contained in:
committed by
Dan Christian Bogos
parent
016e913190
commit
aa6951eef8
@@ -105,11 +105,13 @@ func (cM *ConnManager) getConnWithConfig(connID string, connCfg *config.RPCConn,
|
||||
err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
|
||||
return
|
||||
}
|
||||
// fmt.Println(utils.ToJSON(rpcConnCfg))
|
||||
// fmt.Println(utils.ToJSON(cM.cfg.GeneralCfg()))
|
||||
if conn, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS,
|
||||
cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate,
|
||||
cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts,
|
||||
cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout,
|
||||
cM.cfg.GeneralCfg().ReplyTimeout, codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, biRPCClient); err != nil {
|
||||
utils.FirstNonEmpty(rpcConnCfg.ClientKey, cM.cfg.TLSCfg().ClientKey), utils.FirstNonEmpty(rpcConnCfg.ClientCertificate, cM.cfg.TLSCfg().ClientCerificate),
|
||||
utils.FirstNonEmpty(rpcConnCfg.CaCertificate, cM.cfg.TLSCfg().CaCertificate), utils.FirstIntNonEmpty(rpcConnCfg.ConnectAttempts, cM.cfg.GeneralCfg().ConnectAttempts),
|
||||
utils.FirstIntNonEmpty(rpcConnCfg.Reconnects, cM.cfg.GeneralCfg().Reconnects), utils.FirstDurationNonEmpty(rpcConnCfg.ConnectTimeout, cM.cfg.GeneralCfg().ConnectTimeout),
|
||||
utils.FirstDurationNonEmpty(rpcConnCfg.ReplyTimeout, cM.cfg.GeneralCfg().ReplyTimeout), codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, biRPCClient); err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -74,11 +74,12 @@ func NewRPCConnection(cfg *config.RemoteHost, keyPath, certPath, caPath string,
|
||||
}
|
||||
if cfg.Address == rpcclient.InternalRPC ||
|
||||
cfg.Address == rpcclient.BiRPCInternal {
|
||||
client, err = rpcclient.NewRPCClient("", "", cfg.TLS, keyPath, certPath, caPath, connAttempts,
|
||||
reconnects, connectTimeout, replyTimeout, cfg.Address, internalConnChan, lazyConnect, biRPCClient)
|
||||
client, err = rpcclient.NewRPCClient("", "", cfg.TLS, utils.FirstNonEmpty(cfg.ClientKey, keyPath), utils.FirstNonEmpty(cfg.ClientCertificate, certPath), utils.FirstNonEmpty(cfg.CaCertificate, caPath), utils.FirstIntNonEmpty(cfg.ConnectAttempts, connAttempts),
|
||||
utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout), cfg.Address, internalConnChan, lazyConnect, biRPCClient)
|
||||
} else {
|
||||
client, err = rpcclient.NewRPCClient(utils.TCP, cfg.Address, cfg.TLS, keyPath, certPath, caPath,
|
||||
connAttempts, reconnects, connectTimeout, replyTimeout,
|
||||
client, err = rpcclient.NewRPCClient(utils.TCP, cfg.Address, cfg.TLS, utils.FirstNonEmpty(cfg.ClientKey, keyPath), utils.FirstNonEmpty(cfg.ClientCertificate, certPath), utils.FirstNonEmpty(cfg.CaCertificate, caPath),
|
||||
utils.FirstIntNonEmpty(cfg.ConnectAttempts, connAttempts),
|
||||
utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout),
|
||||
utils.FirstNonEmpty(cfg.Transport, rpcclient.GOBrpc), nil, lazyConnect, biRPCClient)
|
||||
}
|
||||
if connID != utils.EmptyString &&
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -65,6 +66,87 @@ func TestLibengineNewRPCPoolNoAddress(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// For the purpose of this test, we don't need our client to establish a connection
|
||||
// we only want to check if the client loaded with the given config where needed
|
||||
func TestLibengineNewRPCConnection(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
cfg := &config.RemoteHost{
|
||||
ID: "a4f3f",
|
||||
Address: "localhost:6012",
|
||||
Transport: "*json",
|
||||
Synchronous: false,
|
||||
ConnectAttempts: 2,
|
||||
Reconnects: 5,
|
||||
ConnectTimeout: 2 * time.Minute,
|
||||
ReplyTimeout: 3 * time.Minute,
|
||||
TLS: true,
|
||||
ClientKey: "key1",
|
||||
}
|
||||
expectedErr := "dial tcp [::1]:6012: connect: connection refused"
|
||||
cM := NewConnManager(config.NewDefaultCGRConfig(), nil)
|
||||
exp, err := rpcclient.NewRPCClient(utils.TCP, cfg.Address, cfg.TLS, cfg.ClientKey, cM.cfg.TLSCfg().ClientCerificate,
|
||||
cM.cfg.TLSCfg().CaCertificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.Transport, nil, false, nil)
|
||||
|
||||
if err.Error() != expectedErr {
|
||||
t.Errorf("Expected %v \n but received \n %v", expectedErr, err)
|
||||
}
|
||||
|
||||
conn, err := NewRPCConnection(cfg, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate,
|
||||
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
|
||||
nil, false, nil, "*localhost", "a4f3f", new(ltcache.Cache))
|
||||
if err.Error() != expectedErr {
|
||||
t.Errorf("Expected %v \n but received \n %v", expectedErr, err)
|
||||
}
|
||||
if !reflect.DeepEqual(exp, conn) {
|
||||
t.Error("Connections don't match")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLibengineNewRPCConnectionInternal(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
Cache = tmp
|
||||
}()
|
||||
|
||||
cfg := &config.RemoteHost{
|
||||
ID: "a4f3f",
|
||||
Address: rpcclient.InternalRPC,
|
||||
Transport: "",
|
||||
Synchronous: false,
|
||||
ConnectAttempts: 2,
|
||||
Reconnects: 5,
|
||||
ConnectTimeout: 2 * time.Minute,
|
||||
ReplyTimeout: 3 * time.Minute,
|
||||
TLS: true,
|
||||
ClientKey: "key1",
|
||||
}
|
||||
cM := NewConnManager(config.NewDefaultCGRConfig(), make(map[string]chan rpcclient.ClientConnector))
|
||||
exp, err := rpcclient.NewRPCClient("", "", cfg.TLS, cfg.ClientKey, cM.cfg.TLSCfg().ClientCerificate,
|
||||
cM.cfg.TLSCfg().ClientCerificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
rpcclient.InternalRPC, cM.rpcInternal["a4f3f"], false, nil)
|
||||
|
||||
// We only want to check if the client loaded with the correct config,
|
||||
// therefore connection is not mandatory
|
||||
if err != rpcclient.ErrInternallyDisconnected {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
conn, err := NewRPCConnection(cfg, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate,
|
||||
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
|
||||
cM.rpcInternal["a4f3f"], false, nil, "*internal", "a4f3f", new(ltcache.Cache))
|
||||
|
||||
if err != rpcclient.ErrInternallyDisconnected {
|
||||
t.Error(err)
|
||||
}
|
||||
if !reflect.DeepEqual(exp, conn) {
|
||||
t.Error("Connections don't match")
|
||||
}
|
||||
}
|
||||
func TestLibengineNewRPCPoolUnsupportedTransport(t *testing.T) {
|
||||
tmp := Cache
|
||||
defer func() {
|
||||
|
||||
Reference in New Issue
Block a user