From 0d1d762572c82137a9bbbd0d4b3c1c91fb4f49f1 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 12 Sep 2019 14:33:06 +0300 Subject: [PATCH] Updated ServiceManager --- cmd/cgr-engine/cgr-engine.go | 29 +++++---- servmanager/servmanager.go | 113 +++++++++++++++++++++++++++++++++-- 2 files changed, 124 insertions(+), 18 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ab569a3fa..6b6ea413d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -1680,6 +1680,22 @@ func main() { internalCoreSv1Chan := make(chan rpcclient.RpcClientConnection, 1) internalRALsv1Chan := make(chan rpcclient.RpcClientConnection, 1) + // init CacheS + cacheS := initCacheS(internalCacheSChan, server, dm, exitChan) + + // init GuardianSv1 + initGuardianSv1(internalGuardianSChan, server) + + // init CoreSv1 + initCoreSv1(internalCoreSv1Chan, server) + + // Start ServiceManager + srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, cdrDb, + loadDb, filterSChan, server, exitChan) + go srvManager.StartServices() + + initServiceManagerV1(internalServeManagerChan, srvManager, server) + // init internalRPCSet engine.IntRPC = engine.NewRPCClientSet() if cfg.DispatcherSCfg().Enabled { @@ -1706,19 +1722,6 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.RALsV1, internalRALsv1Chan) } - // init CacheS - cacheS := initCacheS(internalCacheSChan, server, dm, exitChan) - - // init GuardianSv1 - initGuardianSv1(internalGuardianSChan, server) - - // init CoreSv1 - initCoreSv1(internalCoreSv1Chan, server) - - // Start ServiceManager - srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, exitChan) - initServiceManagerV1(internalServeManagerChan, srvManager, server) - // init SchedulerS initSchedulerS(internalSchedSChan, srvManager, server) diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 0b6ab8e5c..c609fd1c9 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -20,6 +20,7 @@ package servmanager import ( "errors" + "fmt" "reflect" "strings" "sync" @@ -32,13 +33,22 @@ import ( ) func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager, - cacheS *engine.CacheS, engineShutdown chan bool) *ServiceManager { - return &ServiceManager{ + cacheS *engine.CacheS, cdrStorage engine.CdrStorage, + loadStorage engine.LoadStorage, filterSChan chan *engine.FilterS, + server *utils.Server, engineShutdown chan bool) *ServiceManager { + sm := &ServiceManager{ cfg: cfg, dm: dm, engineShutdown: engineShutdown, cacheS: cacheS, + + cdrStorage: cdrStorage, + loadStorage: loadStorage, + filterS: filterSChan, + server: server, + subsystems: make(map[string]Service), } + return sm } // ServiceManager handles service management ran by the engine @@ -49,6 +59,12 @@ type ServiceManager struct { engineShutdown chan bool cacheS *engine.CacheS sched *scheduler.Scheduler + + cdrStorage engine.CdrStorage + loadStorage engine.LoadStorage + filterS chan *engine.FilterS + server *utils.Server + subsystems map[string]Service } func (srvMngr *ServiceManager) StartScheduler(waitCache bool) error { @@ -187,6 +203,85 @@ func (srvMngr *ServiceManager) V1ServiceStatus(args ArgStartService, reply *stri return nil } +// GetDM returns the DataManager +func (srvMngr *ServiceManager) GetDM() *engine.DataManager { + srvMngr.RLock() + defer srvMngr.RUnlock() + return srvMngr.dm +} + +// GetCDRStorage returns the CdrStorage +func (srvMngr *ServiceManager) GetCDRStorage() engine.CdrStorage { + srvMngr.RLock() + defer srvMngr.RUnlock() + return srvMngr.cdrStorage +} + +// GetLoadStorage returns the LoadStorage +func (srvMngr *ServiceManager) GetLoadStorage() engine.LoadStorage { + srvMngr.RLock() + defer srvMngr.RUnlock() + return srvMngr.loadStorage +} + +// GetConfig returns the Configuration +func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig { + srvMngr.RLock() + defer srvMngr.RUnlock() + return srvMngr.cfg +} + +// GetCacheS returns the CacheS +func (srvMngr *ServiceManager) GetCacheS() *engine.CacheS { + srvMngr.RLock() + defer srvMngr.RUnlock() + return srvMngr.cacheS +} + +// GetFilterS returns the FilterS +func (srvMngr *ServiceManager) GetFilterS() (fS *engine.FilterS) { + srvMngr.RLock() + defer srvMngr.RUnlock() + fS = <-srvMngr.filterS + srvMngr.filterS <- fS + return +} + +// GetServer returns the Server +func (srvMngr *ServiceManager) GetServer() *utils.Server { + srvMngr.RLock() + defer srvMngr.RUnlock() + return srvMngr.server +} + +// GetExitChan returns the exit chanel +func (srvMngr *ServiceManager) GetExitChan() chan bool { + return srvMngr.engineShutdown +} + +// GetConnection creates a rpcClient to the specified subsystem +func (srvMngr *ServiceManager) GetConnection(subsystem string, cfg *config.RemoteHost) (rpcclient.RpcClientConnection, error) { + return nil, nil +} + +// StartServices starts all enabled services +func (srvMngr *ServiceManager) StartServices() (err error) { + // go hendleReloads() + + // startServer() + return +} + +// AddService adds given services +func (srvMngr *ServiceManager) AddService(services ...Service) { + for _, srv := range services { + if _, has := srvMngr.subsystems[srv.ServiceName()]; has { // do not rewrite the service + continue + } + srvMngr.subsystems[srv.ServiceName()] = srv + } +} + // ServiceProvider should implement this to provide information for service type ServiceProvider interface { // GetDM returns the DataManager @@ -212,9 +307,17 @@ type ServiceProvider interface { // Service interface that describes what functions should a service implement type Service interface { // Start should handle the sercive start - Start(sp ServiceProvider) + Start(sp ServiceProvider, waitCache bool) error // Reload handles the change of config - Reload(sp ServiceProvider) + Reload(sp ServiceProvider) error + // GetRPCInterface returns the interface to register for server + GetRPCInterface() interface{} + // GetIntenternalChan returns the internal connection chanel + GetIntenternalChan() chan rpcclient.RpcClientConnection + // IsRunning returns if the service is running + IsRunning() bool + // ServiceName returns the service name + ServiceName() string // Shutdown stops the service - Shutdown() + Shutdown() error }