mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 13:19:53 +05:00
Let ServManager manage all services except itself
This commit is contained in:
committed by
Dan Christian Bogos
parent
dcb38c78bf
commit
09b75a1045
@@ -34,7 +34,6 @@ import (
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/commonlisteners"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/efs"
|
||||
@@ -97,6 +96,7 @@ func runCGREngine(fs []string) (err error) {
|
||||
}
|
||||
shdWg.Add(1)
|
||||
go func() { // Schedule shutdown
|
||||
defer shdWg.Done()
|
||||
tm := time.NewTimer(shtDwDur)
|
||||
select {
|
||||
case <-tm.C:
|
||||
@@ -104,7 +104,6 @@ func runCGREngine(fs []string) (err error) {
|
||||
case <-shutdown:
|
||||
tm.Stop()
|
||||
}
|
||||
shdWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -240,73 +239,11 @@ func runCGREngine(fs []string) (err error) {
|
||||
// TODO: check if there's any need to manually stop memory profiling.
|
||||
// It should be stopped automatically during CoreS service shutdown.
|
||||
|
||||
utils.Logger.Info("<CoreS> stopped all components. CGRateS shutdown!")
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> stopped all services. CGRateS shutdown!", utils.ServiceManager))
|
||||
}()
|
||||
|
||||
shdWg.Add(1)
|
||||
if err = gvS.Start(shutdown); err != nil {
|
||||
shdWg.Done()
|
||||
srvManager.ShutdownServices()
|
||||
return
|
||||
}
|
||||
if cls.ShouldRun() {
|
||||
shdWg.Add(1)
|
||||
if err = cls.Start(shutdown); err != nil {
|
||||
shdWg.Done()
|
||||
srvManager.ShutdownServices()
|
||||
return
|
||||
}
|
||||
}
|
||||
if efs.ShouldRun() { // efs checking first because of loggers
|
||||
shdWg.Add(1)
|
||||
if err = efs.Start(shutdown); err != nil {
|
||||
shdWg.Done()
|
||||
srvManager.ShutdownServices()
|
||||
return
|
||||
}
|
||||
}
|
||||
if dmS.ShouldRun() { // Some services can run without db, ie: ERs
|
||||
shdWg.Add(1)
|
||||
if err = dmS.Start(shutdown); err != nil {
|
||||
shdWg.Done()
|
||||
srvManager.ShutdownServices()
|
||||
return
|
||||
}
|
||||
}
|
||||
if sdbS.ShouldRun() {
|
||||
shdWg.Add(1)
|
||||
if err = sdbS.Start(shutdown); err != nil {
|
||||
shdWg.Done()
|
||||
srvManager.ShutdownServices()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if anzS.ShouldRun() {
|
||||
shdWg.Add(1)
|
||||
if err = anzS.Start(shutdown); err != nil {
|
||||
shdWg.Done()
|
||||
srvManager.ShutdownServices()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
close(anzS.StateChan(utils.StateServiceUP))
|
||||
}
|
||||
|
||||
shdWg.Add(1)
|
||||
if err = coreS.Start(shutdown); err != nil {
|
||||
shdWg.Done()
|
||||
srvManager.ShutdownServices()
|
||||
return
|
||||
}
|
||||
shdWg.Add(1)
|
||||
if err = cacheS.Start(shutdown); err != nil {
|
||||
shdWg.Done()
|
||||
srvManager.ShutdownServices()
|
||||
return
|
||||
}
|
||||
srvManager.StartServices(shutdown)
|
||||
cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS)
|
||||
cgrInitServiceManagerV1(iServeManagerCh, cfg, srvManager, srvIdxr)
|
||||
|
||||
if *flags.Preload != utils.EmptyString {
|
||||
if err = cgrRunPreload(cfg, *flags.Preload, srvIdxr); err != nil {
|
||||
@@ -360,9 +297,17 @@ func cgrRunPreload(cfg *config.CGRConfig, loaderIDs string,
|
||||
return
|
||||
}
|
||||
|
||||
func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
|
||||
srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig,
|
||||
cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
|
||||
func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, cfg *config.CGRConfig,
|
||||
srvMngr *servmanager.ServiceManager, srvIndexer *servmanager.ServiceIndexer) {
|
||||
cls := srvIndexer.GetService(utils.CommonListenerS).(*services.CommonListenerService)
|
||||
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) {
|
||||
return
|
||||
}
|
||||
cl := cls.CLS()
|
||||
anz := srvIndexer.GetService(utils.AnalyzerS).(*services.AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) {
|
||||
return
|
||||
}
|
||||
srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
cl.RpcRegister(srv)
|
||||
@@ -383,6 +328,7 @@ func cgrStartRPC(cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer, shutd
|
||||
}
|
||||
|
||||
func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.WaitGroup) {
|
||||
defer shdWg.Done()
|
||||
shutdownSignal := make(chan os.Signal, 1)
|
||||
reloadSignal := make(chan os.Signal, 1)
|
||||
signal.Notify(shutdownSignal, os.Interrupt,
|
||||
@@ -391,7 +337,6 @@ func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.Wa
|
||||
for {
|
||||
select {
|
||||
case <-stopChan:
|
||||
shdWg.Done()
|
||||
return
|
||||
case <-shutdownSignal:
|
||||
close(stopChan)
|
||||
|
||||
@@ -35,11 +35,18 @@ import (
|
||||
// NewAnalyzerService returns the Analyzer Service
|
||||
func NewAnalyzerService(cfg *config.CGRConfig,
|
||||
srvIndexer *servmanager.ServiceIndexer) *AnalyzerService {
|
||||
return &AnalyzerService{
|
||||
anz := &AnalyzerService{
|
||||
cfg: cfg,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
|
||||
// Wait for AnalyzerService only when it should run.
|
||||
if !anz.ShouldRun() {
|
||||
close(anz.StateChan(utils.StateServiceUP))
|
||||
}
|
||||
|
||||
return anz
|
||||
}
|
||||
|
||||
// AnalyzerService implements Service interface
|
||||
|
||||
Reference in New Issue
Block a user