From 0580c5ba392455e76d9d7d216411275ef0a45542 Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Mon, 8 Apr 2019 14:44:15 +0300 Subject: [PATCH] Updated internal RPC --- cmd/cgr-engine/cgr-engine.go | 23 +++++---- cmd/cgr-engine/rater.go | 6 +-- engine/datamanager.go | 2 +- engine/libengine.go | 97 +++++++++++++++++++++--------------- utils/consts.go | 5 +- 5 files changed, 77 insertions(+), 56 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 8f7053416..435a29b3d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -302,7 +302,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(ssv1) } else { - engine.IntRPC.AddConnection(utils.SessionSv1, ssv1) + engine.AddInternalRPCClient(utils.SessionSv1, ssv1) } // Register BiRpc handlers if cfg.SessionSCfg().ListenBijson != "" { @@ -681,7 +681,7 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(aSv1) } else { - engine.IntRPC.AddConnection(utils.AttributeSv1, aSv1) + engine.AddInternalRPCClient(utils.AttributeSv1, aSv1) } internalAttributeSChan <- aSv1 } @@ -734,7 +734,7 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(cSv1) } else { - engine.IntRPC.AddConnection(utils.ChargerSv1, cSv1) + engine.AddInternalRPCClient(utils.ChargerSv1, cSv1) } internalChargerSChan <- cSv1 } @@ -785,7 +785,7 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(rsV1) } else { - engine.IntRPC.AddConnection(utils.ResourceSv1, rsV1) + engine.AddInternalRPCClient(utils.ResourceSv1, rsV1) } internalRsChan <- rsV1 } @@ -836,7 +836,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(stsV1) } else { - engine.IntRPC.AddConnection(utils.StatSv1, stsV1) + engine.AddInternalRPCClient(utils.StatSv1, stsV1) } internalStatSChan <- stsV1 } @@ -871,7 +871,7 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(tSv1) } else { - engine.IntRPC.AddConnection(utils.ThresholdSv1, tSv1) + engine.AddInternalRPCClient(utils.ThresholdSv1, tSv1) } internalThresholdSChan <- tSv1 } @@ -956,7 +956,7 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti if !config.CgrConfig().DispatcherSCfg().Enabled { server.RpcRegister(splV1) } else { - engine.IntRPC.AddConnection(utils.SupplierSv1, splV1) + engine.AddInternalRPCClient(utils.SupplierSv1, splV1) } internalSupplierSChan <- splV1 } @@ -1109,7 +1109,7 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection, if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(v1.NewCacheSv1(chS)) } else { - engine.IntRPC.AddConnection(utils.CacheSv1, v1.NewCacheSv1(chS)) + engine.AddInternalRPCClient(utils.CacheSv1, v1.NewCacheSv1(chS)) } internalCacheSChan <- chS return @@ -1119,7 +1119,7 @@ func initGuardianSv1(server *utils.Server) { if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(v1.NewGuardianSv1()) } else { - engine.IntRPC.AddConnection(utils.GuardianSv1, v1.NewGuardianSv1()) + engine.AddInternalRPCClient(utils.GuardianSv1, v1.NewGuardianSv1()) } } @@ -1129,7 +1129,7 @@ func initSchedulerS(internalCacheSChan chan rpcclient.RpcClientConnection, if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(v1.NewSchedulerSv1(schdS)) } else { - engine.IntRPC.AddConnection(utils.SchedulerSv1, v1.NewSchedulerSv1(schdS)) + engine.AddInternalRPCClient(utils.SchedulerSv1, v1.NewSchedulerSv1(schdS)) } internalCacheSChan <- schdS } @@ -1427,6 +1427,9 @@ func main() { engine.SetRpSubjectPrefixMatching(cfg.RalsCfg().RpSubjectPrefixMatching) stopHandled := false + // initializate internal RPC conections for dispatcher + engine.InitInternalRPC() + // Rpc/http server server := utils.NewServer() diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 1a800e5cc..10902e2c8 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -191,10 +191,10 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(responder) } else { - engine.IntRPC.AddConnection(utils.Responder, responder) + engine.AddInternalRPCClient(utils.Responder, responder) } - server.RpcRegister(apierRpcV1) - server.RpcRegister(apierRpcV2) + server.RpcRegister(apierRpcV1) // ToDo: Add apierv1 to dispatcher + server.RpcRegister(apierRpcV2) // ToDo: Add apierv2 to dispatcher utils.RegisterRpcParams("", &v1.CDRsV1{}) utils.RegisterRpcParams("", &v2.CDRsV2{}) diff --git a/engine/datamanager.go b/engine/datamanager.go index 2241fe741..0a3ba3938 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1333,7 +1333,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - dH.Conns, IntRPC.GetConnChan(), cfg.GeneralCfg().InternalTtl, false); err != nil { + dH.Conns, GetInternalRPCClientChanel(), cfg.GeneralCfg().InternalTtl, false); err != nil { return nil, err } if cacheWrite { diff --git a/engine/libengine.go b/engine/libengine.go index cb932f294..eb5465b6e 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -19,10 +19,7 @@ along with this program. If not, see package engine import ( - "errors" "fmt" - "strings" - "sync" "time" "github.com/cgrates/cgrates/config" @@ -39,15 +36,8 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA atLestOneConnected := false // If one connected we don't longer return errors for _, rpcConnCfg := range rpcConnCfgs { if rpcConnCfg.Address == utils.MetaInternal { - var internalConn rpcclient.RpcClientConnection - select { - case internalConn = <-internalConnChan: - internalConnChan <- internalConn - case <-time.After(ttl): - return nil, errors.New("TTL triggered") - } rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.TLS, keyPath, certPath, caPath, connAttempts, - reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, lazyConnect) + reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConnChan, lazyConnect) } else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) { codec := utils.GOB if rpcConnCfg.Transport != "" { @@ -69,42 +59,67 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA return rpcPool, err } -var IntRPC *InternalRPC +var IntRPC *rpcclient.RPCClientSet -func init() { - IntRPC = &InternalRPC{subsystems: make(map[string]rpcclient.RpcClientConnection)} -} - -type InternalRPC struct { - sync.Mutex - subsystems map[string]rpcclient.RpcClientConnection -} - -func (irpc *InternalRPC) AddConnection(name string, conn rpcclient.RpcClientConnection) { - if conn == nil { +func InitInternalRPC() { + if !config.CgrConfig().DispatcherSCfg().Enabled { return } - irpc.Lock() - irpc.subsystems[name] = conn - irpc.Unlock() + var subsystems []string + subsystems = append(subsystems, utils.CacheSv1) + subsystems = append(subsystems, utils.GuardianSv1) + subsystems = append(subsystems, utils.SchedulerSv1) + subsystems = append(subsystems, utils.LoaderSv1) + if config.CgrConfig().AttributeSCfg().Enabled { + subsystems = append(subsystems, utils.AttributeSv1) + } + if config.CgrConfig().RalsCfg().RALsEnabled { + subsystems = append(subsystems, utils.ApierV1) + subsystems = append(subsystems, utils.ApierV2) + subsystems = append(subsystems, utils.Responder) + } + if config.CgrConfig().CdrsCfg().CDRSEnabled { + subsystems = append(subsystems, utils.CDRsV1) + subsystems = append(subsystems, utils.CDRsV2) + } + if config.CgrConfig().AnalyzerSCfg().Enabled { + subsystems = append(subsystems, utils.AnalyzerSv1) + } + if config.CgrConfig().SessionSCfg().Enabled { + subsystems = append(subsystems, utils.SessionSv1) + } + if config.CgrConfig().ChargerSCfg().Enabled { + subsystems = append(subsystems, utils.ChargerSv1) + } + if config.CgrConfig().ResourceSCfg().Enabled { + subsystems = append(subsystems, utils.ResourceSv1) + } + if config.CgrConfig().StatSCfg().Enabled { + subsystems = append(subsystems, utils.StatSv1) + } + if config.CgrConfig().ThresholdSCfg().Enabled { + subsystems = append(subsystems, utils.ThresholdSv1) + } + if config.CgrConfig().SupplierSCfg().Enabled { + subsystems = append(subsystems, utils.SupplierSv1) + } + IntRPC = rpcclient.NewRPCClientSet(subsystems, config.CgrConfig().GeneralCfg().InternalTtl) } -func (irpc *InternalRPC) Call(method string, args interface{}, reply interface{}) error { - methodSplit := strings.Split(method, ".") - if len(methodSplit) != 2 { - return rpcclient.ErrUnsupporteServiceMethod +func AddInternalRPCClient(name string, rpc rpcclient.RpcClientConnection) { + connChan := make(chan rpcclient.RpcClientConnection, 1) + connChan <- rpc + err := IntRPC.AddRPCConnection(name, "", "", false, "", "", "", + config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects, + config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout, + rpcclient.INTERNAL_RPC, connChan, true) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Error adding %s to the set: %v", name, err.Error())) } - irpc.Lock() - defer irpc.Unlock() - conn, has := irpc.subsystems[methodSplit[0]] - if !has { - return rpcclient.ErrUnsupporteServiceMethod - } - return conn.Call(method, args, reply) } -func (irpc *InternalRPC) GetConnChan() (connChan chan rpcclient.RpcClientConnection) { - connChan = make(chan rpcclient.RpcClientConnection, 1) - connChan <- irpc - return +func GetInternalRPCClientChanel() chan rpcclient.RpcClientConnection { + connChan := make(chan rpcclient.RpcClientConnection, 1) + connChan <- IntRPC + return connChan } diff --git a/utils/consts.go b/utils/consts.go index 48c1afe38..251114af3 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -822,11 +822,13 @@ const ( // AnalyzerS APIs const ( + AnalyzerSv1 = "AnalyzerSv1" AnalyzerSv1Ping = "AnalyzerSv1.Ping" ) // LoaderS APIs const ( + LoaderSv1 = "LoaderSv1" LoaderSv1Load = "LoaderSv1.Load" LoaderSv1Ping = "LoaderSv1.Ping" ) @@ -868,8 +870,9 @@ const ( CDRsV1ProcessExternalCDR = "CDRsV1.ProcessExternalCDR" CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost" CDRsV1ProcessEvent = "CDRsV1.ProcessEvent" - CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost" CDRsV1Ping = "CDRsV1.Ping" + CDRsV2 = "CDRsV2" + CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost" ) // Scheduler