From 3593bb95575dacc9bead18dcc1044290681e6f08 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 16 Feb 2021 17:17:57 +0200 Subject: [PATCH] Moved BiRPC support in rpcclient library --- cmd/cgr-console/cgr-console.go | 3 +- cmd/cgr-tester/cdr_repl/process_cdr.go | 2 +- engine/action.go | 2 +- engine/connmanager.go | 76 +++++++++++++------------- engine/dispatcherprfl.go | 2 +- engine/libengine.go | 17 +++--- general_tests/cdrs_onlexp_it_test.go | 4 +- general_tests/rpcclient_it_test.go | 8 +-- general_tests/tls_it_test.go | 6 +- go.mod | 4 +- go.sum | 2 + utils/birpcint_client.go | 32 ----------- utils/birpcint_client_test.go | 32 ----------- 13 files changed, 65 insertions(+), 125 deletions(-) diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index 418aa1a9b..4287728c9 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -129,8 +129,9 @@ func main() { return } var err error + client, err = rpcclient.NewRPCClient(utils.TCP, *server, *tls, *keyPath, *certificatePath, *caPath, 3, 3, - time.Second, time.Duration(*replyTimeOut)*time.Second, *rpcEncoding, nil, false) + time.Second, time.Duration(*replyTimeOut)*time.Second, *rpcEncoding, nil, false, nil) if err != nil { cgrConsoleFlags.PrintDefaults() log.Fatal("Could not connect to server " + *server) diff --git a/cmd/cgr-tester/cdr_repl/process_cdr.go b/cmd/cgr-tester/cdr_repl/process_cdr.go index 44bc25cf1..735064d5f 100644 --- a/cmd/cgr-tester/cdr_repl/process_cdr.go +++ b/cmd/cgr-tester/cdr_repl/process_cdr.go @@ -44,7 +44,7 @@ func main() { log.Fatal("Got config error: ", err.Error()) } cdrsMasterRpc, err = rpcclient.NewRPCClient(utils.TCP, cdrsMasterCfg.ListenCfg().RPCJSONListen, false, "", "", "", 1, 1, - time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false) + time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil) if err != nil { log.Fatal("Could not connect to rater: ", err.Error()) } diff --git a/engine/action.go b/engine/action.go index 8f7f31c82..9dcd0e2b3 100644 --- a/engine/action.go +++ b/engine/action.go @@ -704,7 +704,7 @@ func cgrRPCAction(ub *Account, a *Action, acs Actions, extraData interface{}) (e } else if client, err = rpcclient.NewRPCClient(utils.TCP, req.Address, false, "", "", "", req.Attempts, 0, config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout, req.Transport, - nil, false); err != nil { + nil, false, nil); err != nil { return } in, out := params.InParam, params.OutParam diff --git a/engine/connmanager.go b/engine/connmanager.go index e5046f754..63d6b3f54 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -41,7 +41,7 @@ type ConnManager struct { // getConn is used to retrieve a connection from cache // in case this doesn't exist create it and cache it -func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnector) (conn rpcclient.ClientConnector, err error) { +func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.BiRPCConector) (conn rpcclient.ClientConnector, err error) { //try to get the connection from cache if x, ok := Cache.Get(utils.CacheRPCConnections, connID); ok { if x == nil { @@ -51,13 +51,9 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec } // in case we don't find in cache create the connection and add this in cache var intChan chan rpcclient.ClientConnector - var connCfg *config.RPCConn - isBiRPCCLient := false - if internalChan, has := cM.rpcInternal[connID]; has { - connCfg = cM.cfg.RPCConns()[utils.MetaInternal] - intChan = internalChan - isBiRPCCLient = true - } else { + var isInternalRPC bool + connCfg := cM.cfg.RPCConns()[utils.MetaInternal] + if intChan, isInternalRPC = cM.rpcInternal[connID]; !isInternalRPC { connCfg = cM.cfg.RPCConns()[connID] for _, rpcConn := range connCfg.Conns { if rpcConn.Address == utils.MetaInternal { @@ -67,12 +63,15 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec } } switch { - case biRPCClient != nil && isBiRPCCLient: // special handling for SessionS BiJSONRPCClient + case biRPCClient != nil && isInternalRPC: // special handling for SessionS BiJSONRPCClient + if conn, err = rpcclient.NewRPCClient(utils.EmptyString, utils.EmptyString, false, + utils.EmptyString, utils.EmptyString, utils.EmptyString, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, + cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + rpcclient.BiRPCInternal, intChan, false, biRPCClient); err != nil { + return + } var rply string - sSIntConn := <-intChan - intChan <- sSIntConn - conn = utils.NewBiRPCInternalClient(sSIntConn.(utils.BiRPCServer)) - conn.(*utils.BiRPCInternalClient).SetClientConn(biRPCClient) if err = conn.Call(utils.SessionSv1RegisterInternalBiJSONConn, utils.EmptyString, &rply); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not register biRPCClient, error: <%s>", @@ -81,41 +80,40 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec } case connCfg.Strategy == rpcclient.PoolParallel: rpcConnCfg := connCfg.Conns[0] // for parrallel we need only the first connection - var conPool *rpcclient.RPCParallelClientPool - if rpcConnCfg.Address == utils.MetaInternal { - conPool, err = rpcclient.NewRPCParallelClientPool("", "", rpcConnCfg.TLS, - cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, - cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, - cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, - cM.cfg.GeneralCfg().ReplyTimeout, rpcclient.InternalRPC, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false) - } else if utils.SliceHasMember([]string{utils.EmptyString, utils.MetaGOB, utils.MetaJSON}, rpcConnCfg.Transport) { - codec := rpcclient.GOBrpc - if rpcConnCfg.Transport != "" { - codec = rpcConnCfg.Transport - } - conPool, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, - cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, - cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, - cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, - cM.cfg.GeneralCfg().ReplyTimeout, codec, nil, int64(cM.cfg.GeneralCfg().MaxParallelConns), false) - } else { + codec := rpcclient.GOBrpc + switch { + case rpcConnCfg.Address == rpcclient.InternalRPC: + codec = rpcclient.InternalRPC + case rpcConnCfg.Address == rpcclient.BiRPCInternal: + codec = rpcclient.BiRPCInternal + case rpcConnCfg.Transport == utils.EmptyString: + intChan = nil + case rpcConnCfg.Transport == rpcclient.GOBrpc, + rpcConnCfg.Transport == rpcclient.JSONrpc, + rpcConnCfg.Transport == rpcclient.BiRPCGOB, + rpcConnCfg.Transport == rpcclient.BiRPCJSON: + codec = rpcConnCfg.Transport + intChan = nil + default: err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) - } - if err != nil { return } - conn = conPool + if conn, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, + cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, + cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, + cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, + cM.cfg.GeneralCfg().ReplyTimeout, codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, biRPCClient); err != nil { + return + } default: - var conPool *rpcclient.RPCPool - if conPool, err = NewRPCPool(connCfg.Strategy, + if conn, err = NewRPCPool(connCfg.Strategy, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, - connCfg.Conns, intChan, false); err != nil { + connCfg.Conns, intChan, false, biRPCClient); err != nil { return } - conn = conPool } if err = Cache.Set(utils.CacheRPCConnections, connID, conn, nil, @@ -126,7 +124,7 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec } // Call gets the connection calls the method on it -func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.ClientConnector, +func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.BiRPCConector, method string, arg, reply interface{}) (err error) { if len(connIDs) == 0 { return utils.NewErrMandatoryIeMissing("connIDs") diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go index 27613cc30..95a3e1491 100644 --- a/engine/dispatcherprfl.go +++ b/engine/dispatcherprfl.go @@ -160,7 +160,7 @@ func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply int cfg.TLSCfg().ClientCerificate, cfg.TLSCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - IntRPC.GetInternalChanel(), false); err != nil { + IntRPC.GetInternalChanel(), false, nil); err != nil { return } diff --git a/engine/libengine.go b/engine/libengine.go index 175ebc685..fc9da5bc8 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -31,13 +31,14 @@ import ( // NewRPCPool returns a new pool of connection with the given configuration func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.RemoteHost, - internalConnChan chan rpcclient.ClientConnector, lazyConnect bool) (rpcPool *rpcclient.RPCPool, err error) { + internalConnChan chan rpcclient.ClientConnector, lazyConnect bool, + biRPCClient rpcclient.BiRPCConector) (rpcPool *rpcclient.RPCPool, err error) { var rpcClient rpcclient.ClientConnector var atLestOneConnected bool // If one connected we don't longer return errors rpcPool = rpcclient.NewRPCPool(dispatchStrategy, replyTimeout) for _, rpcConnCfg := range rpcConnCfgs { rpcClient, err = NewRPCConnection(rpcConnCfg, keyPath, certPath, caPath, connAttempts, reconnects, - connectTimeout, replyTimeout, internalConnChan, lazyConnect) + connectTimeout, replyTimeout, internalConnChan, lazyConnect, biRPCClient) if err == rpcclient.ErrUnsupportedCodec { return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) } @@ -54,14 +55,16 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA // NewRPCConnection creates a new connection based on the RemoteHost structure func NewRPCConnection(cfg *config.RemoteHost, keyPath, certPath, caPath string, connAttempts, reconnects int, - connectTimeout, replyTimeout time.Duration, internalConnChan chan rpcclient.ClientConnector, lazyConnect bool) (client rpcclient.ClientConnector, err error) { - if cfg.Address == utils.MetaInternal { + connectTimeout, replyTimeout time.Duration, internalConnChan chan rpcclient.ClientConnector, lazyConnect bool, + biRPCClient rpcclient.BiRPCConector) (client rpcclient.ClientConnector, err error) { + if cfg.Address == rpcclient.InternalRPC || + cfg.Address == rpcclient.BiRPCInternal { return rpcclient.NewRPCClient("", "", cfg.TLS, keyPath, certPath, caPath, connAttempts, - reconnects, connectTimeout, replyTimeout, rpcclient.InternalRPC, internalConnChan, lazyConnect) + reconnects, connectTimeout, replyTimeout, cfg.Address, internalConnChan, lazyConnect, biRPCClient) } return rpcclient.NewRPCClient(utils.TCP, cfg.Address, cfg.TLS, keyPath, certPath, caPath, connAttempts, reconnects, connectTimeout, replyTimeout, - utils.FirstNonEmpty(cfg.Transport, rpcclient.GOBrpc), nil, lazyConnect) + utils.FirstNonEmpty(cfg.Transport, rpcclient.GOBrpc), nil, lazyConnect, biRPCClient) } // IntRPC is the global variable that is used to comunicate with all the subsystems internally @@ -81,7 +84,7 @@ func (s RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient. utils.EmptyString, utils.EmptyString, utils.EmptyString, config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects, config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout, - rpcclient.InternalRPC, connChan, true) + rpcclient.InternalRPC, connChan, true, nil) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Error adding %s to the set: %s", utils.InternalRPCSet, name, err.Error())) return diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index 6a699443a..c65518f24 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -185,7 +185,7 @@ func testCDRsOnExpAMQPQueuesCreation(t *testing.T) { func testCDRsOnExpInitMasterRPC(t *testing.T) { var err error cdrsMasterRpc, err = rpcclient.NewRPCClient(utils.TCP, cdrsMasterCfg.ListenCfg().RPCJSONListen, false, "", "", "", 1, 1, - time.Second, 5*time.Second, rpcclient.JSONrpc, nil, false) + time.Second, 5*time.Second, rpcclient.JSONrpc, nil, false, nil) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -287,7 +287,7 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) { } time.Sleep(time.Duration(*waitRater) * time.Millisecond) cdrsSlaveRpc, err := rpcclient.NewRPCClient(utils.TCP, "127.0.0.1:12012", false, "", "", "", 1, 1, - time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false) + time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } diff --git a/general_tests/rpcclient_it_test.go b/general_tests/rpcclient_it_test.go index 989579c62..fdaab071e 100644 --- a/general_tests/rpcclient_it_test.go +++ b/general_tests/rpcclient_it_test.go @@ -119,13 +119,13 @@ func testRPCITLclStartSecondEngine(t *testing.T) { func testRPCITLclRpcConnPoolFirst(t *testing.T) { rpcPoolFirst = rpcclient.NewRPCPool(rpcclient.PoolFirst, 0) rpcRAL1, err = rpcclient.NewRPCClient(utils.TCP, rpcITCfg1.ListenCfg().RPCJSONListen, false, "", "", "", 3, 1, - time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false) + time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil) if err == nil { t.Fatal("Should receive cannot connect error here") } rpcPoolFirst.AddClient(rpcRAL1) rpcRAL2, err = rpcclient.NewRPCClient(utils.TCP, rpcITCfg2.ListenCfg().RPCJSONListen, false, "", "", "", 3, 1, - time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false) + time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil) if err != nil { t.Fatal(err) } @@ -353,13 +353,13 @@ func TestRPCITRmtRpcConnPool(t *testing.T) { } rpcPoolFirst = rpcclient.NewRPCPool(rpcclient.PoolFirst, 0) rpcRALRmt, err := rpcclient.NewRPCClient(utils.TCP, RemoteRALsAddr1, false, "", "", "", 1, 1, - time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false) + time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil) if err != nil { t.Fatal(err) } rpcPoolFirst.AddClient(rpcRALRmt) rpcRAL1, err = rpcclient.NewRPCClient(utils.TCP, RemoteRALsAddr2, false, "", "", "", 1, 1, - time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false) + time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil) if err != nil { t.Fatal(err) } diff --git a/general_tests/tls_it_test.go b/general_tests/tls_it_test.go index 506d857df..c5fe877d0 100755 --- a/general_tests/tls_it_test.go +++ b/general_tests/tls_it_test.go @@ -86,21 +86,21 @@ func testTLSRpcConn(t *testing.T) { var err error tlsRpcClientJson, err = rpcclient.NewRPCClient(utils.TCP, "localhost:2022", true, tlsCfg.TLSCfg().ClientKey, tlsCfg.TLSCfg().ClientCerificate, tlsCfg.TLSCfg().CaCertificate, 3, 3, - time.Second, 5*time.Minute, rpcclient.JSONrpc, nil, false) + time.Second, 5*time.Minute, rpcclient.JSONrpc, nil, false, nil) if err != nil { t.Errorf("Error: %s when dialing", err) } tlsRpcClientGob, err = rpcclient.NewRPCClient(utils.TCP, "localhost:2023", true, tlsCfg.TLSCfg().ClientKey, tlsCfg.TLSCfg().ClientCerificate, tlsCfg.TLSCfg().CaCertificate, 3, 3, - time.Second, 5*time.Minute, rpcclient.GOBrpc, nil, false) + time.Second, 5*time.Minute, rpcclient.GOBrpc, nil, false, nil) if err != nil { t.Errorf("Error: %s when dialing", err) } tlsHTTPJson, err = rpcclient.NewRPCClient(utils.TCP, "https://localhost:2280/jsonrpc", true, tlsCfg.TLSCfg().ClientKey, tlsCfg.TLSCfg().ClientCerificate, tlsCfg.TLSCfg().CaCertificate, 3, 3, - time.Second, 5*time.Minute, rpcclient.HTTPjson, nil, false) + time.Second, 5*time.Minute, rpcclient.HTTPjson, nil, false, nil) if err != nil { t.Errorf("Error: %s when dialing", err) } diff --git a/go.mod b/go.mod index a00d3ca3e..5bda91a78 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.15 // replace github.com/cgrates/radigo => ../radigo -// replace github.com/cgrates/rpcclient => ../rpcclient +replace github.com/cgrates/rpcclient => ../rpcclient require ( cloud.google.com/go v0.75.0 // indirect @@ -16,7 +16,7 @@ require ( github.com/blevesearch/bleve v1.0.14 github.com/cenk/hub v1.0.1 // indirect github.com/cenkalti/hub v1.0.1 // indirect - github.com/cenkalti/rpc2 v0.0.0-20201118113917-be2cde9a479f + github.com/cenkalti/rpc2 v0.0.0-20210206021708-de76ddb08fa8 github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d github.com/cgrates/baningo v0.0.0-20201105145354-6e3173f6a91b github.com/cgrates/cron v0.0.0-20201022095836-3522d5b72c70 diff --git a/go.sum b/go.sum index 87b9de551..8298529c8 100644 --- a/go.sum +++ b/go.sum @@ -84,6 +84,8 @@ github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg= github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs= github.com/cenkalti/rpc2 v0.0.0-20201118113917-be2cde9a479f h1:A4x+jqy1uTFFQTb6o9oyRmRPHPciBacoDYzALCNyIs0= github.com/cenkalti/rpc2 v0.0.0-20201118113917-be2cde9a479f/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M= +github.com/cenkalti/rpc2 v0.0.0-20210206021708-de76ddb08fa8 h1:p9C5lr3/fgr78scg68qXzJ781JPSOPrqNxNQY29MZmA= +github.com/cenkalti/rpc2 v0.0.0-20210206021708-de76ddb08fa8/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d h1:1PLz/t3XZy5KF8EY/ShzBZoVLaY50+tnAbE1wu8rCfg= github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d/go.mod h1:mMAzSIjK11XfRMrOIa7DXYl64REdPldRCbAgzKB47XQ= diff --git a/utils/birpcint_client.go b/utils/birpcint_client.go index 2702cd7d8..2762cf608 100644 --- a/utils/birpcint_client.go +++ b/utils/birpcint_client.go @@ -23,7 +23,6 @@ import ( "github.com/cenkalti/rpc2" rpc2_jsonrpc "github.com/cenkalti/rpc2/jsonrpc" - "github.com/cgrates/rpcclient" ) // NewBiJSONrpcClient will create a bidirectional JSON client connection @@ -39,34 +38,3 @@ func NewBiJSONrpcClient(addr string, handlers map[string]interface{}) (*rpc2.Cli go clnt.Run() return clnt, nil } - -// Interface which the server needs to work as BiRPCServer -type BiRPCServer interface { - Call(string, interface{}, interface{}) error // So we can use it also as rpcclient.ClientConnector - CallBiRPC(rpcclient.ClientConnector, string, interface{}, interface{}) error -} - -type BiRPCClient interface { - Call(string, interface{}, interface{}) error // So we can use it also as rpcclient.ClientConnector - ID() string -} - -func NewBiRPCInternalClient(serverConn BiRPCServer) *BiRPCInternalClient { - return &BiRPCInternalClient{serverConn: serverConn} -} - -// Need separate client from the original RpcClientConnection since diretly passing the server is not enough without passing the client's reference -type BiRPCInternalClient struct { - serverConn BiRPCServer - clntConn rpcclient.ClientConnector // conn to reach client and do calls over it -} - -// Used in case when clientConn is not available at init time (eg: SMGAsterisk who needs the biRPCConn at initialization) -func (clnt *BiRPCInternalClient) SetClientConn(clntConn rpcclient.ClientConnector) { - clnt.clntConn = clntConn -} - -// Part of rpcclient.ClientConnector interface -func (clnt *BiRPCInternalClient) Call(serviceMethod string, args interface{}, reply interface{}) error { - return clnt.serverConn.CallBiRPC(clnt.clntConn, serviceMethod, args, reply) -} diff --git a/utils/birpcint_client_test.go b/utils/birpcint_client_test.go index da76669fd..0ed292cf9 100644 --- a/utils/birpcint_client_test.go +++ b/utils/birpcint_client_test.go @@ -20,7 +20,6 @@ package utils import ( "net" - "reflect" "testing" "github.com/cenkalti/rpc2" @@ -64,34 +63,3 @@ func (t *testBiRPCServer) CallBiRPC(_ rpcclient.ClientConnector, metod string, a t.reply = reply return nil } - -func TestNewBiRPCInternalClient(t *testing.T) { - //empty check - - rpc := &testBiRPCServer{} - eOut := &BiRPCInternalClient{serverConn: rpc} - rcv := NewBiRPCInternalClient(rpc) - if !reflect.DeepEqual(eOut, rcv) { - t.Errorf("Expecting: %+v, received: %+v", eOut, rcv) - } - - rcv.SetClientConn(&testBiRPCServer{}) - - if rcv.clntConn == nil { - t.Error("Client Connection must be not nil") - } - - err := rcv.Call(APIerSv1ComputeActionPlanIndexes, "arg1", "reply") - if err != nil { - t.Error(err) - } - testrpc := &testBiRPCServer{ - metod: APIerSv1ComputeActionPlanIndexes, - args: "arg1", - reply: "reply", - } - if !reflect.DeepEqual(testrpc, rpc) { - t.Errorf("Expecting: %+v, received: %+v", testrpc, rpc) - } - -}