From 7d95f1220ac2c833b456441910e40cc2fe1d1f0e Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Mon, 8 Apr 2019 18:37:48 +0300 Subject: [PATCH] Added RPCSet --- cmd/cgr-engine/cgr-engine.go | 76 +++++++++++++++-------------- cmd/cgr-engine/rater.go | 2 - engine/datamanager.go | 2 +- engine/libengine.go | 93 +++++++++++++++++------------------- 4 files changed, 85 insertions(+), 88 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 435a29b3d..275243ac9 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -301,8 +301,6 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in ssv1 := v1.NewSessionSv1(sm) // methods with multiple options if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(ssv1) - } else { - engine.AddInternalRPCClient(utils.SessionSv1, ssv1) } // Register BiRpc handlers if cfg.SessionSCfg().ListenBijson != "" { @@ -680,8 +678,6 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec aSv1 := v1.NewAttributeSv1(aS) if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(aSv1) - } else { - engine.AddInternalRPCClient(utils.AttributeSv1, aSv1) } internalAttributeSChan <- aSv1 } @@ -733,8 +729,6 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection cSv1 := v1.NewChargerSv1(cS) if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(cSv1) - } else { - engine.AddInternalRPCClient(utils.ChargerSv1, cSv1) } internalChargerSChan <- cSv1 } @@ -784,8 +778,6 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac rsV1 := v1.NewResourceSv1(rS) if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(rsV1) - } else { - engine.AddInternalRPCClient(utils.ResourceSv1, rsV1) } internalRsChan <- rsV1 } @@ -835,8 +827,6 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach stsV1 := v1.NewStatSv1(sS) if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(stsV1) - } else { - engine.AddInternalRPCClient(utils.StatSv1, stsV1) } internalStatSChan <- stsV1 } @@ -870,8 +860,6 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec tSv1 := v1.NewThresholdSv1(tS) if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(tSv1) - } else { - engine.AddInternalRPCClient(utils.ThresholdSv1, tSv1) } internalThresholdSChan <- tSv1 } @@ -955,8 +943,6 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti splV1 := v1.NewSupplierSv1(splS) if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(splV1) - } else { - engine.AddInternalRPCClient(utils.SupplierSv1, splV1) } internalSupplierSChan <- splV1 } @@ -972,7 +958,7 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, // loaderService will start and register APIs for LoaderService if enabled func startLoaderS(cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, - filterSChan chan *engine.FilterS, cacheSChan chan rpcclient.RpcClientConnection) { + filterSChan chan *engine.FilterS, internalLoaderSChan, cacheSChan chan rpcclient.RpcClientConnection) { filterS := <-filterSChan filterSChan <- filterS @@ -982,7 +968,9 @@ func startLoaderS(cfg *config.CGRConfig, return } go ldrS.ListenAndServe(exitChan) - server.RpcRegister(v1.NewLoaderSv1(ldrS)) + ldrSv1 := v1.NewLoaderSv1(ldrS) + server.RpcRegister(ldrSv1) + internalLoaderSChan <- ldrSv1 } // startDispatcherService fires up the DispatcherS @@ -1106,32 +1094,29 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection, } }() + chSv1 := v1.NewCacheSv1(chS) if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(v1.NewCacheSv1(chS)) - } else { - engine.AddInternalRPCClient(utils.CacheSv1, v1.NewCacheSv1(chS)) + server.RpcRegister(chSv1) } - internalCacheSChan <- chS + internalCacheSChan <- chS //v1 return } -func initGuardianSv1(server *utils.Server) { +func initGuardianSv1(internalGuardianSChan chan rpcclient.RpcClientConnection, server *utils.Server) { + grdSv1 := v1.NewGuardianSv1() if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(v1.NewGuardianSv1()) - } else { - engine.AddInternalRPCClient(utils.GuardianSv1, v1.NewGuardianSv1()) + server.RpcRegister(grdSv1) } + internalGuardianSChan <- grdSv1 } -func initSchedulerS(internalCacheSChan chan rpcclient.RpcClientConnection, +func initSchedulerS(internalSchedSChan chan rpcclient.RpcClientConnection, srvMngr *servmanager.ServiceManager, server *utils.Server) { schdS := servmanager.NewSchedulerS(srvMngr) if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(v1.NewSchedulerSv1(schdS)) - } else { - engine.AddInternalRPCClient(utils.SchedulerSv1, v1.NewSchedulerSv1(schdS)) } - internalCacheSChan <- schdS + internalSchedSChan <- schdS } func startRpc(server *utils.Server, internalRaterChan, @@ -1427,9 +1412,6 @@ func main() { engine.SetRpSubjectPrefixMatching(cfg.RalsCfg().RpSubjectPrefixMatching) stopHandled := false - // initializate internal RPC conections for dispatcher - engine.InitInternalRPC() - // Rpc/http server server := utils.NewServer() @@ -1439,6 +1421,8 @@ func main() { // Async starts here, will follow cgrates.json start order // Define internal connections via channels + filterSChan := make(chan *engine.FilterS, 1) + internalDispatcherSChan := make(chan *dispatchers.DispatcherService, 1) internalRaterChan := make(chan rpcclient.RpcClientConnection, 1) internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1) internalSMGChan := make(chan rpcclient.RpcClientConnection, 1) @@ -1448,17 +1432,39 @@ func main() { internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1) - filterSChan := make(chan *engine.FilterS, 1) - internalDispatcherSChan := make(chan *dispatchers.DispatcherService, 1) internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1) internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1) internalSchedSChan := make(chan rpcclient.RpcClientConnection, 1) + internalGuardianSChan := make(chan rpcclient.RpcClientConnection, 1) + internalLoaderSChan := make(chan rpcclient.RpcClientConnection, 1) + + // init internalRPCSet + engine.IntRPC = engine.NewRPCClientSet() + if cfg.DispatcherSCfg().Enabled { + engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, internalAnalyzerSChan) + // engine.IntRPC.AddInternalRPCClient(utils.ApierV1, internalApierV1Chan) + // engine.IntRPC.AddInternalRPCClient(utils.ApierV2, internalApierV2Chan) + engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, internalAttributeSChan) + engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier + // engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCdrSChan) + // engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCdrSChan) + engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, internalChargerSChan) + engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan) + engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan) + engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, internalRsChan) + engine.IntRPC.AddInternalRPCClient(utils.Responder, internalRaterChan) + engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, internalSchedSChan) // server or from apier + engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSMGChan) // server or from apier + engine.IntRPC.AddInternalRPCClient(utils.StatSv1, internalStatSChan) + engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, internalSupplierSChan) + engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, internalThresholdSChan) + } // init CacheS cacheS := initCacheS(internalCacheSChan, server, dm, exitChan) // init GuardianSv1 - initGuardianSv1(server) + initGuardianSv1(internalGuardianSChan, server) // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS) @@ -1575,7 +1581,7 @@ func main() { go startAnalyzerService(internalAnalyzerSChan, server, exitChan) } - go startLoaderS(cfg, dm, server, exitChan, filterSChan, internalCacheSChan) + go startLoaderS(cfg, dm, server, exitChan, filterSChan, internalLoaderSChan, internalCacheSChan) // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 10902e2c8..368c92799 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -190,8 +190,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(responder) - } else { - engine.AddInternalRPCClient(utils.Responder, responder) } server.RpcRegister(apierRpcV1) // ToDo: Add apierv1 to dispatcher server.RpcRegister(apierRpcV2) // ToDo: Add apierv2 to dispatcher diff --git a/engine/datamanager.go b/engine/datamanager.go index 0a3ba3938..9f28bdaba 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1333,7 +1333,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - dH.Conns, GetInternalRPCClientChanel(), cfg.GeneralCfg().InternalTtl, false); err != nil { + dH.Conns, IntRPC.GetInternalChanel(), cfg.GeneralCfg().InternalTtl, false); err != nil { return nil, err } if cacheWrite { diff --git a/engine/libengine.go b/engine/libengine.go index eb5465b6e..ac979a51a 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -20,6 +20,7 @@ package engine import ( "fmt" + "strings" "time" "github.com/cgrates/cgrates/config" @@ -59,57 +60,37 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA return rpcPool, err } -var IntRPC *rpcclient.RPCClientSet +var IntRPC *RPCClientSet -func InitInternalRPC() { - if !config.CgrConfig().DispatcherSCfg().Enabled { - return - } - var subsystems []string - subsystems = append(subsystems, utils.CacheSv1) - subsystems = append(subsystems, utils.GuardianSv1) - subsystems = append(subsystems, utils.SchedulerSv1) - subsystems = append(subsystems, utils.LoaderSv1) - if config.CgrConfig().AttributeSCfg().Enabled { - subsystems = append(subsystems, utils.AttributeSv1) - } - if config.CgrConfig().RalsCfg().RALsEnabled { - subsystems = append(subsystems, utils.ApierV1) - subsystems = append(subsystems, utils.ApierV2) - subsystems = append(subsystems, utils.Responder) - } - if config.CgrConfig().CdrsCfg().CDRSEnabled { - subsystems = append(subsystems, utils.CDRsV1) - subsystems = append(subsystems, utils.CDRsV2) - } - if config.CgrConfig().AnalyzerSCfg().Enabled { - subsystems = append(subsystems, utils.AnalyzerSv1) - } - if config.CgrConfig().SessionSCfg().Enabled { - subsystems = append(subsystems, utils.SessionSv1) - } - if config.CgrConfig().ChargerSCfg().Enabled { - subsystems = append(subsystems, utils.ChargerSv1) - } - if config.CgrConfig().ResourceSCfg().Enabled { - subsystems = append(subsystems, utils.ResourceSv1) - } - if config.CgrConfig().StatSCfg().Enabled { - subsystems = append(subsystems, utils.StatSv1) - } - if config.CgrConfig().ThresholdSCfg().Enabled { - subsystems = append(subsystems, utils.ThresholdSv1) - } - if config.CgrConfig().SupplierSCfg().Enabled { - subsystems = append(subsystems, utils.SupplierSv1) - } - IntRPC = rpcclient.NewRPCClientSet(subsystems, config.CgrConfig().GeneralCfg().InternalTtl) +func NewRPCClientSet() (s *RPCClientSet) { + return &RPCClientSet{set: make(map[string]*rpcclient.RpcClient)} } -func AddInternalRPCClient(name string, rpc rpcclient.RpcClientConnection) { - connChan := make(chan rpcclient.RpcClientConnection, 1) - connChan <- rpc - err := IntRPC.AddRPCConnection(name, "", "", false, "", "", "", +type RPCClientSet struct { + set map[string]*rpcclient.RpcClient +} + +func (s *RPCClientSet) AddRPCClient(name string, rpc *rpcclient.RpcClient) { + s.set[name] = rpc +} + +func (s *RPCClientSet) AddRPCConnection(name, transport, addr string, tls bool, + key_path, cert_path, ca_path string, connectAttempts, reconnects int, + connTimeout, replyTimeout time.Duration, codec string, + internalChan chan rpcclient.RpcClientConnection, lazyConnect bool) error { + rpc, err := rpcclient.NewRpcClient(transport, addr, tls, key_path, cert_path, + ca_path, connectAttempts, reconnects, connTimeout, replyTimeout, + codec, internalChan, lazyConnect) + if err != nil { + return err + } + s.AddRPCClient(name, rpc) + return nil +} + +func (s *RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient.RpcClientConnection) { + err := s.AddRPCConnection(name, utils.EmptyString, utils.EmptyString, + false, utils.EmptyString, utils.EmptyString, utils.EmptyString, config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects, config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout, rpcclient.INTERNAL_RPC, connChan, true) @@ -118,8 +99,20 @@ func AddInternalRPCClient(name string, rpc rpcclient.RpcClientConnection) { } } -func GetInternalRPCClientChanel() chan rpcclient.RpcClientConnection { +func (s *RPCClientSet) GetInternalChanel() chan rpcclient.RpcClientConnection { connChan := make(chan rpcclient.RpcClientConnection, 1) - connChan <- IntRPC + connChan <- s return connChan } + +func (s *RPCClientSet) Call(method string, args interface{}, reply interface{}) error { + methodSplit := strings.Split(method, ".") + if len(methodSplit) != 2 { + return rpcclient.ErrUnsupporteServiceMethod + } + conn, has := s.set[methodSplit[0]] + if !has { + return rpcclient.ErrUnsupporteServiceMethod + } + return conn.Call(method, args, reply) +}