diff --git a/actions/accounts_test.go b/actions/accounts_test.go index e8f2080eb..24581ac4a 100644 --- a/actions/accounts_test.go +++ b/actions/accounts_test.go @@ -74,10 +74,12 @@ func TestACExecuteAccountsSetBalance(t *testing.T) { //invalid to parse a value from diktats actCdrLG.aCfg.Diktats[0].Value = "10" + ctx, cancel := context.WithTimeout(context.Background(), 10) expected = context.DeadlineExceeded.Error() - if err := actCdrLG.execute(context.Background(), dataStorage, utils.MetaBalanceLimit); err == nil || err.Error() != expected { + if err := actCdrLG.execute(ctx, dataStorage, utils.MetaBalanceLimit); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } + cancel() } func TestACExecuteAccountsRemBalance(t *testing.T) { @@ -110,10 +112,12 @@ func TestACExecuteAccountsRemBalance(t *testing.T) { } actRemBal.config.ActionSCfg().AccountSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts)} + ctx, cancel := context.WithTimeout(context.Background(), 10) expected = context.DeadlineExceeded.Error() - if err := actRemBal.execute(context.Background(), nil, utils.MetaRemBalance); err == nil || err.Error() != expected { + if err := actRemBal.execute(ctx, nil, utils.MetaRemBalance); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } + cancel() } func TestACExecuteAccountsParseError(t *testing.T) { diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index bd45a8a0a..72a6de3aa 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -131,7 +131,7 @@ func main() { } var err error - client, err = rpcclient.NewRPCClient(utils.TCP, *server, *tls, *keyPath, *certificatePath, *caPath, 3, 3, + client, err = rpcclient.NewRPCClient(context.Background(), utils.TCP, *server, *tls, *keyPath, *certificatePath, *caPath, 3, 3, time.Second, time.Duration(*replyTimeOut)*time.Second, *rpcEncoding, nil, false, nil) if err != nil { cgrConsoleFlags.PrintDefaults() diff --git a/engine/connmanager.go b/engine/connmanager.go index b6fd1eb75..32492589c 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -106,7 +106,7 @@ func (cM *ConnManager) getConnWithConfig(ctx *context.Context, connID string, co err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) return } - if conn, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, + if conn, err = rpcclient.NewRPCParallelClientPool(ctx, utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, @@ -114,7 +114,7 @@ func (cM *ConnManager) getConnWithConfig(ctx *context.Context, connID string, co return } } else { - if conn, err = NewRPCPool(connCfg.Strategy, + if conn, err = NewRPCPool(ctx, connCfg.Strategy, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go index 288aa6bf2..881ecf331 100644 --- a/engine/dispatcherprfl.go +++ b/engine/dispatcherprfl.go @@ -153,7 +153,7 @@ func (dH *DispatcherHost) Call(ctx *context.Context, serviceMethod string, args if dH.rpcConn == nil { // connect the rpcConn cfg := config.CgrConfig() - if dH.rpcConn, err = NewRPCConnection(dH.RemoteHost, + if dH.rpcConn, err = NewRPCConnection(ctx, dH.RemoteHost, cfg.TLSCfg().ClientKey, cfg.TLSCfg().ClientCerificate, cfg.TLSCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, diff --git a/engine/libengine.go b/engine/libengine.go index 68fbba148..99b2ec368 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -32,7 +32,7 @@ import ( ) // NewRPCPool returns a new pool of connection with the given configuration -func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connAttempts, reconnects int, +func NewRPCPool(ctx *context.Context, dispatchStrategy string, keyPath, certPath, caPath string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.RemoteHost, internalConnChan chan birpc.ClientConnector, lazyConnect bool, biRPCClient interface{}, poolID string, connCache *ltcache.Cache) (rpcPool *rpcclient.RPCPool, err error) { @@ -46,7 +46,7 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA err = rpcclient.ErrDisconnected continue } - if rpcClient, err = NewRPCConnection(rpcConnCfg, keyPath, certPath, caPath, connAttempts, reconnects, + if rpcClient, err = NewRPCConnection(ctx, rpcConnCfg, keyPath, certPath, caPath, connAttempts, reconnects, connectTimeout, replyTimeout, internalConnChan, lazyConnect, biRPCClient, poolID, rpcConnCfg.ID, connCache); err == rpcclient.ErrUnsupportedCodec { return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) @@ -64,7 +64,7 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA // NewRPCConnection creates a new connection based on the RemoteHost structure // connCache is used to cache the connection with ID -func NewRPCConnection(cfg *config.RemoteHost, keyPath, certPath, caPath string, connAttempts, reconnects int, +func NewRPCConnection(ctx *context.Context, cfg *config.RemoteHost, keyPath, certPath, caPath string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, internalConnChan chan birpc.ClientConnector, lazyConnect bool, biRPCClient interface{}, poolID, connID string, connCache *ltcache.Cache) (client birpc.ClientConnector, err error) { var id string @@ -76,10 +76,10 @@ func NewRPCConnection(cfg *config.RemoteHost, keyPath, certPath, caPath string, } if cfg.Address == rpcclient.InternalRPC || cfg.Address == rpcclient.BiRPCInternal { - client, err = rpcclient.NewRPCClient("", "", cfg.TLS, keyPath, certPath, caPath, connAttempts, + client, err = rpcclient.NewRPCClient(ctx, "", "", cfg.TLS, keyPath, certPath, caPath, connAttempts, reconnects, connectTimeout, replyTimeout, cfg.Address, internalConnChan, lazyConnect, biRPCClient) } else { - client, err = rpcclient.NewRPCClient(utils.TCP, cfg.Address, cfg.TLS, keyPath, certPath, caPath, + client, err = rpcclient.NewRPCClient(ctx, utils.TCP, cfg.Address, cfg.TLS, keyPath, certPath, caPath, connAttempts, reconnects, connectTimeout, replyTimeout, utils.FirstNonEmpty(cfg.Transport, rpcclient.GOBrpc), nil, lazyConnect, biRPCClient) } @@ -103,7 +103,7 @@ type RPCClientSet map[string]*rpcclient.RPCClient // AddInternalRPCClient creates and adds to the set a new rpc client using the provided configuration func (s RPCClientSet) AddInternalRPCClient(name string, connChan chan birpc.ClientConnector) { - rpc, err := rpcclient.NewRPCClient(utils.EmptyString, utils.EmptyString, false, + rpc, err := rpcclient.NewRPCClient(context.Background(), utils.EmptyString, utils.EmptyString, false, utils.EmptyString, utils.EmptyString, utils.EmptyString, config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects, config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout, diff --git a/go.mod b/go.mod index b36f26409..06736e43a 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/cgrates/kamevapi v0.0.0-20191001125829-7dbc3ad58817 github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca github.com/cgrates/radigo v0.0.0-20201113143731-162035428d72 - github.com/cgrates/rpcclient v0.0.0-20210413084509-dc66fe9852ca + github.com/cgrates/rpcclient v0.0.0-20210419061328-2c8f9a923648 github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60 github.com/creack/pty v1.1.11 diff --git a/go.sum b/go.sum index 2920cae23..d14d28343 100644 --- a/go.sum +++ b/go.sum @@ -97,8 +97,8 @@ github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca h1:Ejj4m0Ccl8dMMVn github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca/go.mod h1:q7c996DUu8OrJRnewVSQzM+y/bRcxZAHoo+zCD8bFBo= github.com/cgrates/radigo v0.0.0-20201113143731-162035428d72 h1:cTAWQEbab3gKkDSeaxkTaoiP/cNFx+7/kC96wYckk3g= github.com/cgrates/radigo v0.0.0-20201113143731-162035428d72/go.mod h1:3IDSbfIqU5VsYKjrwa3HhuAK1jlI65wa1coHetoaN20= -github.com/cgrates/rpcclient v0.0.0-20210413084509-dc66fe9852ca h1:ucFZvI96qqO9ukR2EF1lyNEl8t8I1GNaxs/zjIAAkZA= -github.com/cgrates/rpcclient v0.0.0-20210413084509-dc66fe9852ca/go.mod h1:5LEYQt4uXkY4TeYsmAm/2gzAK08igOAblyUPTSh+k3Q= +github.com/cgrates/rpcclient v0.0.0-20210419061328-2c8f9a923648 h1:CuAVnDD/QnHXplsgZmaktdS+zlTLG8beSkqde0PZ6eo= +github.com/cgrates/rpcclient v0.0.0-20210419061328-2c8f9a923648/go.mod h1:5LEYQt4uXkY4TeYsmAm/2gzAK08igOAblyUPTSh+k3Q= github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e h1:izFjZB83/XRXInc+gMIssUxdbleGsGIuGCPj2u7RQo0= github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e/go.mod h1:0f2+3dq5Iiv3VlcuY83VPJ0QzqRlzDG1Cr8okogQE3g= github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60 h1:TQDg+HGB17LU8FitLiLvYazYSy62GQ1lO3lGKI3xUrU=