From 09b75a1045648f7c199548b8d67612ca2b76bd20 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 18 Dec 2024 07:55:31 +0200 Subject: [PATCH] Let ServManager manage all services except itself --- cmd/cgr-engine/cgr-engine.go | 85 +++++++----------------------------- services/analyzers.go | 9 +++- 2 files changed, 23 insertions(+), 71 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 22242bce5..c051d5859 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -34,7 +34,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/apis" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/efs" @@ -97,6 +96,7 @@ func runCGREngine(fs []string) (err error) { } shdWg.Add(1) go func() { // Schedule shutdown + defer shdWg.Done() tm := time.NewTimer(shtDwDur) select { case <-tm.C: @@ -104,7 +104,6 @@ func runCGREngine(fs []string) (err error) { case <-shutdown: tm.Stop() } - shdWg.Done() }() } @@ -240,73 +239,11 @@ func runCGREngine(fs []string) (err error) { // TODO: check if there's any need to manually stop memory profiling. // It should be stopped automatically during CoreS service shutdown. - utils.Logger.Info(" stopped all components. CGRateS shutdown!") + utils.Logger.Info(fmt.Sprintf("<%s> stopped all services. CGRateS shutdown!", utils.ServiceManager)) }() - shdWg.Add(1) - if err = gvS.Start(shutdown); err != nil { - shdWg.Done() - srvManager.ShutdownServices() - return - } - if cls.ShouldRun() { - shdWg.Add(1) - if err = cls.Start(shutdown); err != nil { - shdWg.Done() - srvManager.ShutdownServices() - return - } - } - if efs.ShouldRun() { // efs checking first because of loggers - shdWg.Add(1) - if err = efs.Start(shutdown); err != nil { - shdWg.Done() - srvManager.ShutdownServices() - return - } - } - if dmS.ShouldRun() { // Some services can run without db, ie: ERs - shdWg.Add(1) - if err = dmS.Start(shutdown); err != nil { - shdWg.Done() - srvManager.ShutdownServices() - return - } - } - if sdbS.ShouldRun() { - shdWg.Add(1) - if err = sdbS.Start(shutdown); err != nil { - shdWg.Done() - srvManager.ShutdownServices() - return - } - } - - if anzS.ShouldRun() { - shdWg.Add(1) - if err = anzS.Start(shutdown); err != nil { - shdWg.Done() - srvManager.ShutdownServices() - return - } - } else { - close(anzS.StateChan(utils.StateServiceUP)) - } - - shdWg.Add(1) - if err = coreS.Start(shutdown); err != nil { - shdWg.Done() - srvManager.ShutdownServices() - return - } - shdWg.Add(1) - if err = cacheS.Start(shutdown); err != nil { - shdWg.Done() - srvManager.ShutdownServices() - return - } srvManager.StartServices(shutdown) - cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS) + cgrInitServiceManagerV1(iServeManagerCh, cfg, srvManager, srvIdxr) if *flags.Preload != utils.EmptyString { if err = cgrRunPreload(cfg, *flags.Preload, srvIdxr); err != nil { @@ -360,9 +297,17 @@ func cgrRunPreload(cfg *config.CGRConfig, loaderIDs string, return } -func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, - srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig, - cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { +func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, cfg *config.CGRConfig, + srvMngr *servmanager.ServiceManager, srvIndexer *servmanager.ServiceIndexer) { + cls := srvIndexer.GetService(utils.CommonListenerS).(*services.CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) { + return + } + cl := cls.CLS() + anz := srvIndexer.GetService(utils.AnalyzerS).(*services.AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) { + return + } srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false) if !cfg.DispatcherSCfg().Enabled { cl.RpcRegister(srv) @@ -383,6 +328,7 @@ func cgrStartRPC(cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer, shutd } func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.WaitGroup) { + defer shdWg.Done() shutdownSignal := make(chan os.Signal, 1) reloadSignal := make(chan os.Signal, 1) signal.Notify(shutdownSignal, os.Interrupt, @@ -391,7 +337,6 @@ func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.Wa for { select { case <-stopChan: - shdWg.Done() return case <-shutdownSignal: close(stopChan) diff --git a/services/analyzers.go b/services/analyzers.go index 106764c3d..e31dc436b 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -35,11 +35,18 @@ import ( // NewAnalyzerService returns the Analyzer Service func NewAnalyzerService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceIndexer) *AnalyzerService { - return &AnalyzerService{ + anz := &AnalyzerService{ cfg: cfg, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } + + // Wait for AnalyzerService only when it should run. + if !anz.ShouldRun() { + close(anz.StateChan(utils.StateServiceUP)) + } + + return anz } // AnalyzerService implements Service interface