mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added RPCSet
This commit is contained in:
committed by
Dan Christian Bogos
parent
0580c5ba39
commit
7d95f1220a
@@ -301,8 +301,6 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
|
||||
ssv1 := v1.NewSessionSv1(sm) // methods with multiple options
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(ssv1)
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.SessionSv1, ssv1)
|
||||
}
|
||||
// Register BiRpc handlers
|
||||
if cfg.SessionSCfg().ListenBijson != "" {
|
||||
@@ -680,8 +678,6 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec
|
||||
aSv1 := v1.NewAttributeSv1(aS)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(aSv1)
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.AttributeSv1, aSv1)
|
||||
}
|
||||
internalAttributeSChan <- aSv1
|
||||
}
|
||||
@@ -733,8 +729,6 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection
|
||||
cSv1 := v1.NewChargerSv1(cS)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(cSv1)
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.ChargerSv1, cSv1)
|
||||
}
|
||||
internalChargerSChan <- cSv1
|
||||
}
|
||||
@@ -784,8 +778,6 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac
|
||||
rsV1 := v1.NewResourceSv1(rS)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(rsV1)
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.ResourceSv1, rsV1)
|
||||
}
|
||||
internalRsChan <- rsV1
|
||||
}
|
||||
@@ -835,8 +827,6 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach
|
||||
stsV1 := v1.NewStatSv1(sS)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(stsV1)
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.StatSv1, stsV1)
|
||||
}
|
||||
internalStatSChan <- stsV1
|
||||
}
|
||||
@@ -870,8 +860,6 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec
|
||||
tSv1 := v1.NewThresholdSv1(tS)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(tSv1)
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.ThresholdSv1, tSv1)
|
||||
}
|
||||
internalThresholdSChan <- tSv1
|
||||
}
|
||||
@@ -955,8 +943,6 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti
|
||||
splV1 := v1.NewSupplierSv1(splS)
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(splV1)
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.SupplierSv1, splV1)
|
||||
}
|
||||
internalSupplierSChan <- splV1
|
||||
}
|
||||
@@ -972,7 +958,7 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
|
||||
// loaderService will start and register APIs for LoaderService if enabled
|
||||
func startLoaderS(cfg *config.CGRConfig,
|
||||
dm *engine.DataManager, server *utils.Server, exitChan chan bool,
|
||||
filterSChan chan *engine.FilterS, cacheSChan chan rpcclient.RpcClientConnection) {
|
||||
filterSChan chan *engine.FilterS, internalLoaderSChan, cacheSChan chan rpcclient.RpcClientConnection) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
|
||||
@@ -982,7 +968,9 @@ func startLoaderS(cfg *config.CGRConfig,
|
||||
return
|
||||
}
|
||||
go ldrS.ListenAndServe(exitChan)
|
||||
server.RpcRegister(v1.NewLoaderSv1(ldrS))
|
||||
ldrSv1 := v1.NewLoaderSv1(ldrS)
|
||||
server.RpcRegister(ldrSv1)
|
||||
internalLoaderSChan <- ldrSv1
|
||||
}
|
||||
|
||||
// startDispatcherService fires up the DispatcherS
|
||||
@@ -1106,32 +1094,29 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection,
|
||||
}
|
||||
}()
|
||||
|
||||
chSv1 := v1.NewCacheSv1(chS)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(v1.NewCacheSv1(chS))
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.CacheSv1, v1.NewCacheSv1(chS))
|
||||
server.RpcRegister(chSv1)
|
||||
}
|
||||
internalCacheSChan <- chS
|
||||
internalCacheSChan <- chS //v1
|
||||
return
|
||||
}
|
||||
|
||||
func initGuardianSv1(server *utils.Server) {
|
||||
func initGuardianSv1(internalGuardianSChan chan rpcclient.RpcClientConnection, server *utils.Server) {
|
||||
grdSv1 := v1.NewGuardianSv1()
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(v1.NewGuardianSv1())
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.GuardianSv1, v1.NewGuardianSv1())
|
||||
server.RpcRegister(grdSv1)
|
||||
}
|
||||
internalGuardianSChan <- grdSv1
|
||||
}
|
||||
|
||||
func initSchedulerS(internalCacheSChan chan rpcclient.RpcClientConnection,
|
||||
func initSchedulerS(internalSchedSChan chan rpcclient.RpcClientConnection,
|
||||
srvMngr *servmanager.ServiceManager, server *utils.Server) {
|
||||
schdS := servmanager.NewSchedulerS(srvMngr)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(v1.NewSchedulerSv1(schdS))
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.SchedulerSv1, v1.NewSchedulerSv1(schdS))
|
||||
}
|
||||
internalCacheSChan <- schdS
|
||||
internalSchedSChan <- schdS
|
||||
}
|
||||
|
||||
func startRpc(server *utils.Server, internalRaterChan,
|
||||
@@ -1427,9 +1412,6 @@ func main() {
|
||||
engine.SetRpSubjectPrefixMatching(cfg.RalsCfg().RpSubjectPrefixMatching)
|
||||
stopHandled := false
|
||||
|
||||
// initializate internal RPC conections for dispatcher
|
||||
engine.InitInternalRPC()
|
||||
|
||||
// Rpc/http server
|
||||
server := utils.NewServer()
|
||||
|
||||
@@ -1439,6 +1421,8 @@ func main() {
|
||||
// Async starts here, will follow cgrates.json start order
|
||||
|
||||
// Define internal connections via channels
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
internalDispatcherSChan := make(chan *dispatchers.DispatcherService, 1)
|
||||
internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalSMGChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
@@ -1448,17 +1432,39 @@ func main() {
|
||||
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
internalDispatcherSChan := make(chan *dispatchers.DispatcherService, 1)
|
||||
internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalSchedSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalGuardianSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalLoaderSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
|
||||
// init internalRPCSet
|
||||
engine.IntRPC = engine.NewRPCClientSet()
|
||||
if cfg.DispatcherSCfg().Enabled {
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, internalAnalyzerSChan)
|
||||
// engine.IntRPC.AddInternalRPCClient(utils.ApierV1, internalApierV1Chan)
|
||||
// engine.IntRPC.AddInternalRPCClient(utils.ApierV2, internalApierV2Chan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, internalAttributeSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier
|
||||
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCdrSChan)
|
||||
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCdrSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, internalChargerSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, internalRsChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.Responder, internalRaterChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, internalSchedSChan) // server or from apier
|
||||
engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSMGChan) // server or from apier
|
||||
engine.IntRPC.AddInternalRPCClient(utils.StatSv1, internalStatSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, internalSupplierSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, internalThresholdSChan)
|
||||
}
|
||||
|
||||
// init CacheS
|
||||
cacheS := initCacheS(internalCacheSChan, server, dm, exitChan)
|
||||
|
||||
// init GuardianSv1
|
||||
initGuardianSv1(server)
|
||||
initGuardianSv1(internalGuardianSChan, server)
|
||||
|
||||
// Start ServiceManager
|
||||
srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS)
|
||||
@@ -1575,7 +1581,7 @@ func main() {
|
||||
go startAnalyzerService(internalAnalyzerSChan, server, exitChan)
|
||||
}
|
||||
|
||||
go startLoaderS(cfg, dm, server, exitChan, filterSChan, internalCacheSChan)
|
||||
go startLoaderS(cfg, dm, server, exitChan, filterSChan, internalLoaderSChan, internalCacheSChan)
|
||||
|
||||
// Serve rpc connections
|
||||
go startRpc(server, internalRaterChan, internalCdrSChan,
|
||||
|
||||
@@ -190,8 +190,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
|
||||
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(responder)
|
||||
} else {
|
||||
engine.AddInternalRPCClient(utils.Responder, responder)
|
||||
}
|
||||
server.RpcRegister(apierRpcV1) // ToDo: Add apierv1 to dispatcher
|
||||
server.RpcRegister(apierRpcV2) // ToDo: Add apierv2 to dispatcher
|
||||
|
||||
@@ -1333,7 +1333,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
dH.Conns, GetInternalRPCClientChanel(), cfg.GeneralCfg().InternalTtl, false); err != nil {
|
||||
dH.Conns, IntRPC.GetInternalChanel(), cfg.GeneralCfg().InternalTtl, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cacheWrite {
|
||||
|
||||
@@ -20,6 +20,7 @@ package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -59,57 +60,37 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA
|
||||
return rpcPool, err
|
||||
}
|
||||
|
||||
var IntRPC *rpcclient.RPCClientSet
|
||||
var IntRPC *RPCClientSet
|
||||
|
||||
func InitInternalRPC() {
|
||||
if !config.CgrConfig().DispatcherSCfg().Enabled {
|
||||
return
|
||||
}
|
||||
var subsystems []string
|
||||
subsystems = append(subsystems, utils.CacheSv1)
|
||||
subsystems = append(subsystems, utils.GuardianSv1)
|
||||
subsystems = append(subsystems, utils.SchedulerSv1)
|
||||
subsystems = append(subsystems, utils.LoaderSv1)
|
||||
if config.CgrConfig().AttributeSCfg().Enabled {
|
||||
subsystems = append(subsystems, utils.AttributeSv1)
|
||||
}
|
||||
if config.CgrConfig().RalsCfg().RALsEnabled {
|
||||
subsystems = append(subsystems, utils.ApierV1)
|
||||
subsystems = append(subsystems, utils.ApierV2)
|
||||
subsystems = append(subsystems, utils.Responder)
|
||||
}
|
||||
if config.CgrConfig().CdrsCfg().CDRSEnabled {
|
||||
subsystems = append(subsystems, utils.CDRsV1)
|
||||
subsystems = append(subsystems, utils.CDRsV2)
|
||||
}
|
||||
if config.CgrConfig().AnalyzerSCfg().Enabled {
|
||||
subsystems = append(subsystems, utils.AnalyzerSv1)
|
||||
}
|
||||
if config.CgrConfig().SessionSCfg().Enabled {
|
||||
subsystems = append(subsystems, utils.SessionSv1)
|
||||
}
|
||||
if config.CgrConfig().ChargerSCfg().Enabled {
|
||||
subsystems = append(subsystems, utils.ChargerSv1)
|
||||
}
|
||||
if config.CgrConfig().ResourceSCfg().Enabled {
|
||||
subsystems = append(subsystems, utils.ResourceSv1)
|
||||
}
|
||||
if config.CgrConfig().StatSCfg().Enabled {
|
||||
subsystems = append(subsystems, utils.StatSv1)
|
||||
}
|
||||
if config.CgrConfig().ThresholdSCfg().Enabled {
|
||||
subsystems = append(subsystems, utils.ThresholdSv1)
|
||||
}
|
||||
if config.CgrConfig().SupplierSCfg().Enabled {
|
||||
subsystems = append(subsystems, utils.SupplierSv1)
|
||||
}
|
||||
IntRPC = rpcclient.NewRPCClientSet(subsystems, config.CgrConfig().GeneralCfg().InternalTtl)
|
||||
func NewRPCClientSet() (s *RPCClientSet) {
|
||||
return &RPCClientSet{set: make(map[string]*rpcclient.RpcClient)}
|
||||
}
|
||||
|
||||
func AddInternalRPCClient(name string, rpc rpcclient.RpcClientConnection) {
|
||||
connChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
connChan <- rpc
|
||||
err := IntRPC.AddRPCConnection(name, "", "", false, "", "", "",
|
||||
type RPCClientSet struct {
|
||||
set map[string]*rpcclient.RpcClient
|
||||
}
|
||||
|
||||
func (s *RPCClientSet) AddRPCClient(name string, rpc *rpcclient.RpcClient) {
|
||||
s.set[name] = rpc
|
||||
}
|
||||
|
||||
func (s *RPCClientSet) AddRPCConnection(name, transport, addr string, tls bool,
|
||||
key_path, cert_path, ca_path string, connectAttempts, reconnects int,
|
||||
connTimeout, replyTimeout time.Duration, codec string,
|
||||
internalChan chan rpcclient.RpcClientConnection, lazyConnect bool) error {
|
||||
rpc, err := rpcclient.NewRpcClient(transport, addr, tls, key_path, cert_path,
|
||||
ca_path, connectAttempts, reconnects, connTimeout, replyTimeout,
|
||||
codec, internalChan, lazyConnect)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.AddRPCClient(name, rpc)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient.RpcClientConnection) {
|
||||
err := s.AddRPCConnection(name, utils.EmptyString, utils.EmptyString,
|
||||
false, utils.EmptyString, utils.EmptyString, utils.EmptyString,
|
||||
config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects,
|
||||
config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout,
|
||||
rpcclient.INTERNAL_RPC, connChan, true)
|
||||
@@ -118,8 +99,20 @@ func AddInternalRPCClient(name string, rpc rpcclient.RpcClientConnection) {
|
||||
}
|
||||
}
|
||||
|
||||
func GetInternalRPCClientChanel() chan rpcclient.RpcClientConnection {
|
||||
func (s *RPCClientSet) GetInternalChanel() chan rpcclient.RpcClientConnection {
|
||||
connChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
connChan <- IntRPC
|
||||
connChan <- s
|
||||
return connChan
|
||||
}
|
||||
|
||||
func (s *RPCClientSet) Call(method string, args interface{}, reply interface{}) error {
|
||||
methodSplit := strings.Split(method, ".")
|
||||
if len(methodSplit) != 2 {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
}
|
||||
conn, has := s.set[methodSplit[0]]
|
||||
if !has {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
}
|
||||
return conn.Call(method, args, reply)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user