Updated internal RPCSet

This commit is contained in:
Trial97
2020-03-03 08:18:27 +02:00
committed by Dan Christian Bogos
parent df5313da61
commit 5d2c05902e
2 changed files with 13 additions and 21 deletions

View File

@@ -28,6 +28,7 @@ import (
"github.com/cgrates/rpcclient"
)
// NewRPCPool returns a new pool of connection with the given configuration
func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connAttempts, reconnects int,
connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.RemoteHost,
internalConnChan chan rpcclient.ClientConnector, lazyConnect bool) (*rpcclient.RPCPool, error) {
@@ -60,51 +61,41 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA
return rpcPool, err
}
// IntRPC is the global variable that is used to comunicate with all the subsystems internally
var IntRPC *RPCClientSet
// NewRPCClientSet initilalizates the map of connections
func NewRPCClientSet() (s *RPCClientSet) {
return &RPCClientSet{set: make(map[string]*rpcclient.RPCClient)}
}
// RPCClientSet is a RPC ClientConnector for the internal subsystems
type RPCClientSet struct {
set map[string]*rpcclient.RPCClient
}
func (s *RPCClientSet) AddRPCClient(name string, rpc *rpcclient.RPCClient) {
s.set[name] = rpc
}
func (s *RPCClientSet) AddRPCConnection(name, transport, addr string, tls bool,
key_path, cert_path, ca_path string, connectAttempts, reconnects int,
connTimeout, replyTimeout time.Duration, codec string,
internalChan chan rpcclient.ClientConnector, lazyConnect bool) error {
rpc, err := rpcclient.NewRPCClient(transport, addr, tls, key_path, cert_path,
ca_path, connectAttempts, reconnects, connTimeout, replyTimeout,
codec, internalChan, lazyConnect)
if err != nil {
return err
}
s.AddRPCClient(name, rpc)
return nil
}
// AddInternalRPCClient creates and adds to the set a new rpc client using the provided configuration
func (s *RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient.ClientConnector) {
err := s.AddRPCConnection(name, utils.EmptyString, utils.EmptyString,
false, utils.EmptyString, utils.EmptyString, utils.EmptyString,
rpc, err := rpcclient.NewRPCClient(utils.EmptyString, utils.EmptyString, false,
utils.EmptyString, utils.EmptyString, utils.EmptyString,
config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects,
config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout,
rpcclient.InternalRPC, connChan, true)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<InternalRCP> Error adding %s to the set: %v", name, err.Error()))
utils.Logger.Err(fmt.Sprintf("<%s> Error adding %s to the set: %s", utils.InternalRPCSet, name, err.Error()))
return
}
s.set[name] = rpc
}
// GetInternalChanel is used when RPCClientSet is passed as internal connection for RPCPool
func (s *RPCClientSet) GetInternalChanel() chan rpcclient.ClientConnector {
connChan := make(chan rpcclient.ClientConnector, 1)
connChan <- s
return connChan
}
// Call the implementation of the rpcclient.ClientConnector interface
func (s *RPCClientSet) Call(method string, args interface{}, reply interface{}) error {
methodSplit := strings.Split(method, ".")
if len(methodSplit) != 2 {

View File

@@ -654,6 +654,7 @@ const (
TmpSuffix = ".tmp"
MetaDiamreq = "*diamreq"
MetaGroup = "*group"
InternalRPCSet = "InternalRPCSet"
)
// Migrator Action