Updated ServiceManager

This commit is contained in:
Trial97
2019-09-12 14:33:06 +03:00
committed by Dan Christian Bogos
parent 82c64e22b7
commit 0d1d762572
2 changed files with 124 additions and 18 deletions

View File

@@ -1680,6 +1680,22 @@ func main() {
internalCoreSv1Chan := make(chan rpcclient.RpcClientConnection, 1)
internalRALsv1Chan := make(chan rpcclient.RpcClientConnection, 1)
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dm, exitChan)
// init GuardianSv1
initGuardianSv1(internalGuardianSChan, server)
// init CoreSv1
initCoreSv1(internalCoreSv1Chan, server)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, cdrDb,
loadDb, filterSChan, server, exitChan)
go srvManager.StartServices()
initServiceManagerV1(internalServeManagerChan, srvManager, server)
// init internalRPCSet
engine.IntRPC = engine.NewRPCClientSet()
if cfg.DispatcherSCfg().Enabled {
@@ -1706,19 +1722,6 @@ func main() {
engine.IntRPC.AddInternalRPCClient(utils.RALsV1, internalRALsv1Chan)
}
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dm, exitChan)
// init GuardianSv1
initGuardianSv1(internalGuardianSChan, server)
// init CoreSv1
initCoreSv1(internalCoreSv1Chan, server)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, exitChan)
initServiceManagerV1(internalServeManagerChan, srvManager, server)
// init SchedulerS
initSchedulerS(internalSchedSChan, srvManager, server)

View File

@@ -20,6 +20,7 @@ package servmanager
import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
@@ -32,13 +33,22 @@ import (
)
func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, engineShutdown chan bool) *ServiceManager {
return &ServiceManager{
cacheS *engine.CacheS, cdrStorage engine.CdrStorage,
loadStorage engine.LoadStorage, filterSChan chan *engine.FilterS,
server *utils.Server, engineShutdown chan bool) *ServiceManager {
sm := &ServiceManager{
cfg: cfg,
dm: dm,
engineShutdown: engineShutdown,
cacheS: cacheS,
cdrStorage: cdrStorage,
loadStorage: loadStorage,
filterS: filterSChan,
server: server,
subsystems: make(map[string]Service),
}
return sm
}
// ServiceManager handles service management ran by the engine
@@ -49,6 +59,12 @@ type ServiceManager struct {
engineShutdown chan bool
cacheS *engine.CacheS
sched *scheduler.Scheduler
cdrStorage engine.CdrStorage
loadStorage engine.LoadStorage
filterS chan *engine.FilterS
server *utils.Server
subsystems map[string]Service
}
func (srvMngr *ServiceManager) StartScheduler(waitCache bool) error {
@@ -187,6 +203,85 @@ func (srvMngr *ServiceManager) V1ServiceStatus(args ArgStartService, reply *stri
return nil
}
// GetDM returns the DataManager
func (srvMngr *ServiceManager) GetDM() *engine.DataManager {
srvMngr.RLock()
defer srvMngr.RUnlock()
return srvMngr.dm
}
// GetCDRStorage returns the CdrStorage
func (srvMngr *ServiceManager) GetCDRStorage() engine.CdrStorage {
srvMngr.RLock()
defer srvMngr.RUnlock()
return srvMngr.cdrStorage
}
// GetLoadStorage returns the LoadStorage
func (srvMngr *ServiceManager) GetLoadStorage() engine.LoadStorage {
srvMngr.RLock()
defer srvMngr.RUnlock()
return srvMngr.loadStorage
}
// GetConfig returns the Configuration
func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig {
srvMngr.RLock()
defer srvMngr.RUnlock()
return srvMngr.cfg
}
// GetCacheS returns the CacheS
func (srvMngr *ServiceManager) GetCacheS() *engine.CacheS {
srvMngr.RLock()
defer srvMngr.RUnlock()
return srvMngr.cacheS
}
// GetFilterS returns the FilterS
func (srvMngr *ServiceManager) GetFilterS() (fS *engine.FilterS) {
srvMngr.RLock()
defer srvMngr.RUnlock()
fS = <-srvMngr.filterS
srvMngr.filterS <- fS
return
}
// GetServer returns the Server
func (srvMngr *ServiceManager) GetServer() *utils.Server {
srvMngr.RLock()
defer srvMngr.RUnlock()
return srvMngr.server
}
// GetExitChan returns the exit chanel
func (srvMngr *ServiceManager) GetExitChan() chan bool {
return srvMngr.engineShutdown
}
// GetConnection creates a rpcClient to the specified subsystem
func (srvMngr *ServiceManager) GetConnection(subsystem string, cfg *config.RemoteHost) (rpcclient.RpcClientConnection, error) {
return nil, nil
}
// StartServices starts all enabled services
func (srvMngr *ServiceManager) StartServices() (err error) {
// go hendleReloads()
// startServer()
return
}
// AddService adds given services
func (srvMngr *ServiceManager) AddService(services ...Service) {
for _, srv := range services {
if _, has := srvMngr.subsystems[srv.ServiceName()]; has { // do not rewrite the service
continue
}
srvMngr.subsystems[srv.ServiceName()] = srv
}
}
// ServiceProvider should implement this to provide information for service
type ServiceProvider interface {
// GetDM returns the DataManager
@@ -212,9 +307,17 @@ type ServiceProvider interface {
// Service interface that describes what functions should a service implement
type Service interface {
// Start should handle the sercive start
Start(sp ServiceProvider)
Start(sp ServiceProvider, waitCache bool) error
// Reload handles the change of config
Reload(sp ServiceProvider)
Reload(sp ServiceProvider) error
// GetRPCInterface returns the interface to register for server
GetRPCInterface() interface{}
// GetIntenternalChan returns the internal connection chanel
GetIntenternalChan() chan rpcclient.RpcClientConnection
// IsRunning returns if the service is running
IsRunning() bool
// ServiceName returns the service name
ServiceName() string
// Shutdown stops the service
Shutdown()
Shutdown() error
}