Moved BiRPC support in rpcclient library

This commit is contained in:
Trial97
2021-02-16 17:17:57 +02:00
committed by Dan Christian Bogos
parent e695b852fa
commit 3593bb9557
13 changed files with 65 additions and 125 deletions

View File

@@ -129,8 +129,9 @@ func main() {
return
}
var err error
client, err = rpcclient.NewRPCClient(utils.TCP, *server, *tls, *keyPath, *certificatePath, *caPath, 3, 3,
time.Second, time.Duration(*replyTimeOut)*time.Second, *rpcEncoding, nil, false)
time.Second, time.Duration(*replyTimeOut)*time.Second, *rpcEncoding, nil, false, nil)
if err != nil {
cgrConsoleFlags.PrintDefaults()
log.Fatal("Could not connect to server " + *server)

View File

@@ -44,7 +44,7 @@ func main() {
log.Fatal("Got config error: ", err.Error())
}
cdrsMasterRpc, err = rpcclient.NewRPCClient(utils.TCP, cdrsMasterCfg.ListenCfg().RPCJSONListen, false, "", "", "", 1, 1,
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false)
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil)
if err != nil {
log.Fatal("Could not connect to rater: ", err.Error())
}

View File

@@ -704,7 +704,7 @@ func cgrRPCAction(ub *Account, a *Action, acs Actions, extraData interface{}) (e
} else if client, err = rpcclient.NewRPCClient(utils.TCP, req.Address, false, "", "", "",
req.Attempts, 0, config.CgrConfig().GeneralCfg().ConnectTimeout,
config.CgrConfig().GeneralCfg().ReplyTimeout, req.Transport,
nil, false); err != nil {
nil, false, nil); err != nil {
return
}
in, out := params.InParam, params.OutParam

View File

@@ -41,7 +41,7 @@ type ConnManager struct {
// getConn is used to retrieve a connection from cache
// in case this doesn't exist create it and cache it
func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnector) (conn rpcclient.ClientConnector, err error) {
func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.BiRPCConector) (conn rpcclient.ClientConnector, err error) {
//try to get the connection from cache
if x, ok := Cache.Get(utils.CacheRPCConnections, connID); ok {
if x == nil {
@@ -51,13 +51,9 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec
}
// in case we don't find in cache create the connection and add this in cache
var intChan chan rpcclient.ClientConnector
var connCfg *config.RPCConn
isBiRPCCLient := false
if internalChan, has := cM.rpcInternal[connID]; has {
connCfg = cM.cfg.RPCConns()[utils.MetaInternal]
intChan = internalChan
isBiRPCCLient = true
} else {
var isInternalRPC bool
connCfg := cM.cfg.RPCConns()[utils.MetaInternal]
if intChan, isInternalRPC = cM.rpcInternal[connID]; !isInternalRPC {
connCfg = cM.cfg.RPCConns()[connID]
for _, rpcConn := range connCfg.Conns {
if rpcConn.Address == utils.MetaInternal {
@@ -67,12 +63,15 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec
}
}
switch {
case biRPCClient != nil && isBiRPCCLient: // special handling for SessionS BiJSONRPCClient
case biRPCClient != nil && isInternalRPC: // special handling for SessionS BiJSONRPCClient
if conn, err = rpcclient.NewRPCClient(utils.EmptyString, utils.EmptyString, false,
utils.EmptyString, utils.EmptyString, utils.EmptyString,
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
rpcclient.BiRPCInternal, intChan, false, biRPCClient); err != nil {
return
}
var rply string
sSIntConn := <-intChan
intChan <- sSIntConn
conn = utils.NewBiRPCInternalClient(sSIntConn.(utils.BiRPCServer))
conn.(*utils.BiRPCInternalClient).SetClientConn(biRPCClient)
if err = conn.Call(utils.SessionSv1RegisterInternalBiJSONConn,
utils.EmptyString, &rply); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not register biRPCClient, error: <%s>",
@@ -81,41 +80,40 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec
}
case connCfg.Strategy == rpcclient.PoolParallel:
rpcConnCfg := connCfg.Conns[0] // for parrallel we need only the first connection
var conPool *rpcclient.RPCParallelClientPool
if rpcConnCfg.Address == utils.MetaInternal {
conPool, err = rpcclient.NewRPCParallelClientPool("", "", rpcConnCfg.TLS,
cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate,
cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts,
cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout,
cM.cfg.GeneralCfg().ReplyTimeout, rpcclient.InternalRPC, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false)
} else if utils.SliceHasMember([]string{utils.EmptyString, utils.MetaGOB, utils.MetaJSON}, rpcConnCfg.Transport) {
codec := rpcclient.GOBrpc
if rpcConnCfg.Transport != "" {
codec = rpcConnCfg.Transport
}
conPool, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS,
cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate,
cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts,
cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout,
cM.cfg.GeneralCfg().ReplyTimeout, codec, nil, int64(cM.cfg.GeneralCfg().MaxParallelConns), false)
} else {
codec := rpcclient.GOBrpc
switch {
case rpcConnCfg.Address == rpcclient.InternalRPC:
codec = rpcclient.InternalRPC
case rpcConnCfg.Address == rpcclient.BiRPCInternal:
codec = rpcclient.BiRPCInternal
case rpcConnCfg.Transport == utils.EmptyString:
intChan = nil
case rpcConnCfg.Transport == rpcclient.GOBrpc,
rpcConnCfg.Transport == rpcclient.JSONrpc,
rpcConnCfg.Transport == rpcclient.BiRPCGOB,
rpcConnCfg.Transport == rpcclient.BiRPCJSON:
codec = rpcConnCfg.Transport
intChan = nil
default:
err = fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
}
if err != nil {
return
}
conn = conPool
if conn, err = rpcclient.NewRPCParallelClientPool(utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS,
cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate,
cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts,
cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout,
cM.cfg.GeneralCfg().ReplyTimeout, codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, biRPCClient); err != nil {
return
}
default:
var conPool *rpcclient.RPCPool
if conPool, err = NewRPCPool(connCfg.Strategy,
if conn, err = NewRPCPool(connCfg.Strategy,
cM.cfg.TLSCfg().ClientKey,
cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate,
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout,
connCfg.Conns, intChan, false); err != nil {
connCfg.Conns, intChan, false, biRPCClient); err != nil {
return
}
conn = conPool
}
if err = Cache.Set(utils.CacheRPCConnections, connID, conn, nil,
@@ -126,7 +124,7 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec
}
// Call gets the connection calls the method on it
func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.ClientConnector,
func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.BiRPCConector,
method string, arg, reply interface{}) (err error) {
if len(connIDs) == 0 {
return utils.NewErrMandatoryIeMissing("connIDs")

View File

@@ -160,7 +160,7 @@ func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply int
cfg.TLSCfg().ClientCerificate, cfg.TLSCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
IntRPC.GetInternalChanel(), false); err != nil {
IntRPC.GetInternalChanel(), false, nil); err != nil {
return
}

View File

@@ -31,13 +31,14 @@ import (
// NewRPCPool returns a new pool of connection with the given configuration
func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connAttempts, reconnects int,
connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.RemoteHost,
internalConnChan chan rpcclient.ClientConnector, lazyConnect bool) (rpcPool *rpcclient.RPCPool, err error) {
internalConnChan chan rpcclient.ClientConnector, lazyConnect bool,
biRPCClient rpcclient.BiRPCConector) (rpcPool *rpcclient.RPCPool, err error) {
var rpcClient rpcclient.ClientConnector
var atLestOneConnected bool // If one connected we don't longer return errors
rpcPool = rpcclient.NewRPCPool(dispatchStrategy, replyTimeout)
for _, rpcConnCfg := range rpcConnCfgs {
rpcClient, err = NewRPCConnection(rpcConnCfg, keyPath, certPath, caPath, connAttempts, reconnects,
connectTimeout, replyTimeout, internalConnChan, lazyConnect)
connectTimeout, replyTimeout, internalConnChan, lazyConnect, biRPCClient)
if err == rpcclient.ErrUnsupportedCodec {
return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
}
@@ -54,14 +55,16 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA
// NewRPCConnection creates a new connection based on the RemoteHost structure
func NewRPCConnection(cfg *config.RemoteHost, keyPath, certPath, caPath string, connAttempts, reconnects int,
connectTimeout, replyTimeout time.Duration, internalConnChan chan rpcclient.ClientConnector, lazyConnect bool) (client rpcclient.ClientConnector, err error) {
if cfg.Address == utils.MetaInternal {
connectTimeout, replyTimeout time.Duration, internalConnChan chan rpcclient.ClientConnector, lazyConnect bool,
biRPCClient rpcclient.BiRPCConector) (client rpcclient.ClientConnector, err error) {
if cfg.Address == rpcclient.InternalRPC ||
cfg.Address == rpcclient.BiRPCInternal {
return rpcclient.NewRPCClient("", "", cfg.TLS, keyPath, certPath, caPath, connAttempts,
reconnects, connectTimeout, replyTimeout, rpcclient.InternalRPC, internalConnChan, lazyConnect)
reconnects, connectTimeout, replyTimeout, cfg.Address, internalConnChan, lazyConnect, biRPCClient)
}
return rpcclient.NewRPCClient(utils.TCP, cfg.Address, cfg.TLS, keyPath, certPath, caPath,
connAttempts, reconnects, connectTimeout, replyTimeout,
utils.FirstNonEmpty(cfg.Transport, rpcclient.GOBrpc), nil, lazyConnect)
utils.FirstNonEmpty(cfg.Transport, rpcclient.GOBrpc), nil, lazyConnect, biRPCClient)
}
// IntRPC is the global variable that is used to comunicate with all the subsystems internally
@@ -81,7 +84,7 @@ func (s RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient.
utils.EmptyString, utils.EmptyString, utils.EmptyString,
config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects,
config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout,
rpcclient.InternalRPC, connChan, true)
rpcclient.InternalRPC, connChan, true, nil)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error adding %s to the set: %s", utils.InternalRPCSet, name, err.Error()))
return

View File

@@ -185,7 +185,7 @@ func testCDRsOnExpAMQPQueuesCreation(t *testing.T) {
func testCDRsOnExpInitMasterRPC(t *testing.T) {
var err error
cdrsMasterRpc, err = rpcclient.NewRPCClient(utils.TCP, cdrsMasterCfg.ListenCfg().RPCJSONListen, false, "", "", "", 1, 1,
time.Second, 5*time.Second, rpcclient.JSONrpc, nil, false)
time.Second, 5*time.Second, rpcclient.JSONrpc, nil, false, nil)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
@@ -287,7 +287,7 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
cdrsSlaveRpc, err := rpcclient.NewRPCClient(utils.TCP, "127.0.0.1:12012", false, "", "", "", 1, 1,
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false)
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}

View File

@@ -119,13 +119,13 @@ func testRPCITLclStartSecondEngine(t *testing.T) {
func testRPCITLclRpcConnPoolFirst(t *testing.T) {
rpcPoolFirst = rpcclient.NewRPCPool(rpcclient.PoolFirst, 0)
rpcRAL1, err = rpcclient.NewRPCClient(utils.TCP, rpcITCfg1.ListenCfg().RPCJSONListen, false, "", "", "", 3, 1,
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false)
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil)
if err == nil {
t.Fatal("Should receive cannot connect error here")
}
rpcPoolFirst.AddClient(rpcRAL1)
rpcRAL2, err = rpcclient.NewRPCClient(utils.TCP, rpcITCfg2.ListenCfg().RPCJSONListen, false, "", "", "", 3, 1,
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false)
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil)
if err != nil {
t.Fatal(err)
}
@@ -353,13 +353,13 @@ func TestRPCITRmtRpcConnPool(t *testing.T) {
}
rpcPoolFirst = rpcclient.NewRPCPool(rpcclient.PoolFirst, 0)
rpcRALRmt, err := rpcclient.NewRPCClient(utils.TCP, RemoteRALsAddr1, false, "", "", "", 1, 1,
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false)
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil)
if err != nil {
t.Fatal(err)
}
rpcPoolFirst.AddClient(rpcRALRmt)
rpcRAL1, err = rpcclient.NewRPCClient(utils.TCP, RemoteRALsAddr2, false, "", "", "", 1, 1,
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false)
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false, nil)
if err != nil {
t.Fatal(err)
}

View File

@@ -86,21 +86,21 @@ func testTLSRpcConn(t *testing.T) {
var err error
tlsRpcClientJson, err = rpcclient.NewRPCClient(utils.TCP, "localhost:2022", true, tlsCfg.TLSCfg().ClientKey,
tlsCfg.TLSCfg().ClientCerificate, tlsCfg.TLSCfg().CaCertificate, 3, 3,
time.Second, 5*time.Minute, rpcclient.JSONrpc, nil, false)
time.Second, 5*time.Minute, rpcclient.JSONrpc, nil, false, nil)
if err != nil {
t.Errorf("Error: %s when dialing", err)
}
tlsRpcClientGob, err = rpcclient.NewRPCClient(utils.TCP, "localhost:2023", true, tlsCfg.TLSCfg().ClientKey,
tlsCfg.TLSCfg().ClientCerificate, tlsCfg.TLSCfg().CaCertificate, 3, 3,
time.Second, 5*time.Minute, rpcclient.GOBrpc, nil, false)
time.Second, 5*time.Minute, rpcclient.GOBrpc, nil, false, nil)
if err != nil {
t.Errorf("Error: %s when dialing", err)
}
tlsHTTPJson, err = rpcclient.NewRPCClient(utils.TCP, "https://localhost:2280/jsonrpc", true, tlsCfg.TLSCfg().ClientKey,
tlsCfg.TLSCfg().ClientCerificate, tlsCfg.TLSCfg().CaCertificate, 3, 3,
time.Second, 5*time.Minute, rpcclient.HTTPjson, nil, false)
time.Second, 5*time.Minute, rpcclient.HTTPjson, nil, false, nil)
if err != nil {
t.Errorf("Error: %s when dialing", err)
}

4
go.mod
View File

@@ -4,7 +4,7 @@ go 1.15
// replace github.com/cgrates/radigo => ../radigo
// replace github.com/cgrates/rpcclient => ../rpcclient
replace github.com/cgrates/rpcclient => ../rpcclient
require (
cloud.google.com/go v0.75.0 // indirect
@@ -16,7 +16,7 @@ require (
github.com/blevesearch/bleve v1.0.14
github.com/cenk/hub v1.0.1 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20201118113917-be2cde9a479f
github.com/cenkalti/rpc2 v0.0.0-20210206021708-de76ddb08fa8
github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d
github.com/cgrates/baningo v0.0.0-20201105145354-6e3173f6a91b
github.com/cgrates/cron v0.0.0-20201022095836-3522d5b72c70

2
go.sum
View File

@@ -84,6 +84,8 @@ github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg=
github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs=
github.com/cenkalti/rpc2 v0.0.0-20201118113917-be2cde9a479f h1:A4x+jqy1uTFFQTb6o9oyRmRPHPciBacoDYzALCNyIs0=
github.com/cenkalti/rpc2 v0.0.0-20201118113917-be2cde9a479f/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/cenkalti/rpc2 v0.0.0-20210206021708-de76ddb08fa8 h1:p9C5lr3/fgr78scg68qXzJ781JPSOPrqNxNQY29MZmA=
github.com/cenkalti/rpc2 v0.0.0-20210206021708-de76ddb08fa8/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d h1:1PLz/t3XZy5KF8EY/ShzBZoVLaY50+tnAbE1wu8rCfg=
github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d/go.mod h1:mMAzSIjK11XfRMrOIa7DXYl64REdPldRCbAgzKB47XQ=

View File

@@ -23,7 +23,6 @@ import (
"github.com/cenkalti/rpc2"
rpc2_jsonrpc "github.com/cenkalti/rpc2/jsonrpc"
"github.com/cgrates/rpcclient"
)
// NewBiJSONrpcClient will create a bidirectional JSON client connection
@@ -39,34 +38,3 @@ func NewBiJSONrpcClient(addr string, handlers map[string]interface{}) (*rpc2.Cli
go clnt.Run()
return clnt, nil
}
// Interface which the server needs to work as BiRPCServer
type BiRPCServer interface {
Call(string, interface{}, interface{}) error // So we can use it also as rpcclient.ClientConnector
CallBiRPC(rpcclient.ClientConnector, string, interface{}, interface{}) error
}
type BiRPCClient interface {
Call(string, interface{}, interface{}) error // So we can use it also as rpcclient.ClientConnector
ID() string
}
func NewBiRPCInternalClient(serverConn BiRPCServer) *BiRPCInternalClient {
return &BiRPCInternalClient{serverConn: serverConn}
}
// Need separate client from the original RpcClientConnection since diretly passing the server is not enough without passing the client's reference
type BiRPCInternalClient struct {
serverConn BiRPCServer
clntConn rpcclient.ClientConnector // conn to reach client and do calls over it
}
// Used in case when clientConn is not available at init time (eg: SMGAsterisk who needs the biRPCConn at initialization)
func (clnt *BiRPCInternalClient) SetClientConn(clntConn rpcclient.ClientConnector) {
clnt.clntConn = clntConn
}
// Part of rpcclient.ClientConnector interface
func (clnt *BiRPCInternalClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
return clnt.serverConn.CallBiRPC(clnt.clntConn, serviceMethod, args, reply)
}

View File

@@ -20,7 +20,6 @@ package utils
import (
"net"
"reflect"
"testing"
"github.com/cenkalti/rpc2"
@@ -64,34 +63,3 @@ func (t *testBiRPCServer) CallBiRPC(_ rpcclient.ClientConnector, metod string, a
t.reply = reply
return nil
}
func TestNewBiRPCInternalClient(t *testing.T) {
//empty check
rpc := &testBiRPCServer{}
eOut := &BiRPCInternalClient{serverConn: rpc}
rcv := NewBiRPCInternalClient(rpc)
if !reflect.DeepEqual(eOut, rcv) {
t.Errorf("Expecting: %+v, received: %+v", eOut, rcv)
}
rcv.SetClientConn(&testBiRPCServer{})
if rcv.clntConn == nil {
t.Error("Client Connection must be not nil")
}
err := rcv.Call(APIerSv1ComputeActionPlanIndexes, "arg1", "reply")
if err != nil {
t.Error(err)
}
testrpc := &testBiRPCServer{
metod: APIerSv1ComputeActionPlanIndexes,
args: "arg1",
reply: "reply",
}
if !reflect.DeepEqual(testrpc, rpc) {
t.Errorf("Expecting: %+v, received: %+v", testrpc, rpc)
}
}