From c49e67b2ed6c271e6894e8d8f7c4ebdcd921666d Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 9 Jan 2025 17:55:31 +0200 Subject: [PATCH] Handle SERVICE_UP/DOWN states on ServManager layer --- services/accounts.go | 2 -- services/actions.go | 2 -- services/adminsv1.go | 2 -- services/analyzers.go | 8 -------- services/asteriskagent.go | 2 -- services/attributes.go | 2 -- services/caches.go | 2 -- services/cdrs.go | 2 -- services/chargers.go | 2 -- services/commonlisteners.go | 2 -- services/config.go | 2 -- services/cores.go | 2 -- services/datadb.go | 2 -- services/diameteragent.go | 2 -- services/dispatchers.go | 2 -- services/dnsagent.go | 2 -- services/ees.go | 2 -- services/efs.go | 2 -- services/ers.go | 2 -- services/filters.go | 2 -- services/freeswitchagent.go | 2 -- services/globalvars.go | 2 -- services/guardian.go | 2 -- services/httpagent.go | 2 -- services/janus.go | 2 -- services/kamailioagent.go | 2 -- services/loaders.go | 2 -- services/radiusagent.go | 2 -- services/rankings.go | 2 -- services/rates.go | 2 -- services/registrarc.go | 2 -- services/resources.go | 2 -- services/routes.go | 2 -- services/sessions.go | 2 -- services/sipagent.go | 2 -- services/statedeps.go | 6 ++++++ services/stats.go | 2 -- services/stordb.go | 2 -- services/thresholds.go | 2 -- services/tpes.go | 2 -- services/trends.go | 2 -- servmanager/servmanager.go | 8 ++++++++ 42 files changed, 14 insertions(+), 86 deletions(-) diff --git a/services/accounts.go b/services/accounts.go index 0b8b16a22..35469ba34 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -98,7 +98,6 @@ func (acts *AccountService) Start(shutdown chan struct{}, registry *servmanager. } acts.intRPCconn = anz.GetInternalCodec(srv, utils.AccountS) - close(acts.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -115,7 +114,6 @@ func (acts *AccountService) Shutdown(_ *servmanager.ServiceRegistry) (err error) acts.acts = nil acts.Unlock() acts.cl.RpcUnregisterName(utils.AccountSv1) - close(acts.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/actions.go b/services/actions.go index 79b590c85..e344d1a32 100644 --- a/services/actions.go +++ b/services/actions.go @@ -99,7 +99,6 @@ func (acts *ActionService) Start(shutdown chan struct{}, registry *servmanager.S } acts.intRPCconn = anz.GetInternalCodec(srv, utils.ActionS) - close(acts.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -117,7 +116,6 @@ func (acts *ActionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) acts.acts.Shutdown() acts.acts = nil acts.cl.RpcUnregisterName(utils.ActionSv1) - close(acts.stateDeps.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/adminsv1.go b/services/adminsv1.go index 59e68434f..1da692742 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -96,7 +96,6 @@ func (apiService *AdminSv1Service) Start(_ chan struct{}, registry *servmanager. //backwards compatible apiService.intRPCconn = anz.GetInternalCodec(srv, utils.AdminSv1) - close(apiService.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -112,7 +111,6 @@ func (apiService *AdminSv1Service) Shutdown(_ *servmanager.ServiceRegistry) (err apiService.api = nil apiService.cl.RpcUnregisterName(utils.AdminSv1) apiService.Unlock() - close(apiService.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/analyzers.go b/services/analyzers.go index 8d33dbfcc..7cc58a37a 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -38,12 +38,6 @@ func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService { cfg: cfg, stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } - - // Wait for AnalyzerService only when it should run. - if !anz.ShouldRun() { - close(anz.StateChan(utils.StateServiceUP)) - } - return anz } @@ -86,7 +80,6 @@ func (anz *AnalyzerService) Start(shutdown chan struct{}, registry *servmanager. }(anz.anz) anz.cl.SetAnalyzer(anz.anz) go anz.start(registry) - close(anz.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -123,7 +116,6 @@ func (anz *AnalyzerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) anz.anz = nil anz.Unlock() anz.cl.RpcUnregisterName(utils.AnalyzerSv1) - close(anz.stateDeps.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 766bd4866..4ce71e969 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -71,7 +71,6 @@ func (ast *AsteriskAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRe ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr) go listenAndServe(ast.smas[connIdx], ast.stopChan) } - close(ast.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -84,7 +83,6 @@ func (ast *AsteriskAgent) Reload(shutdown chan struct{}, registry *servmanager.S // Shutdown stops the service func (ast *AsteriskAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { ast.shutdown() - close(ast.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/attributes.go b/services/attributes.go index 8684a0272..77abb0118 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -106,7 +106,6 @@ func (attrS *AttributeService) Start(shutdown chan struct{}, registry *servmanag }() attrS.intRPCconn = anz.GetInternalCodec(srv, utils.AttributeS) - close(attrS.stateDeps.StateChan(utils.StateServiceUP)) // inform listeners about the service reaching UP state return } @@ -123,7 +122,6 @@ func (attrS *AttributeService) Shutdown(_ *servmanager.ServiceRegistry) (err err attrS.cl.RpcUnregisterName(utils.AttributeSv1) attrS.dspS.UnregisterShutdownChan(attrS.ServiceName()) attrS.Unlock() - close(attrS.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/caches.go b/services/caches.go index eba4cfe6b..b6f9ac728 100644 --- a/services/caches.go +++ b/services/caches.go @@ -86,7 +86,6 @@ func (cS *CacheService) Start(shutdown chan struct{}, registry *servmanager.Serv } } cS.intRPCconn = anz.GetInternalCodec(srv, utils.CacheS) - close(cS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -98,7 +97,6 @@ func (cS *CacheService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) // Shutdown stops the service func (cS *CacheService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { cS.cl.RpcUnregisterName(utils.CacheSv1) - close(cS.stateDeps.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/cdrs.go b/services/cdrs.go index 7be02262b..33c776ba3 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -89,7 +89,6 @@ func (cs *CDRService) Start(_ chan struct{}, registry *servmanager.ServiceRegist } cs.intRPCconn = anz.GetInternalCodec(srv, utils.CDRServer) - close(cs.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -104,7 +103,6 @@ func (cs *CDRService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { cs.cdrS = nil cs.Unlock() cs.cl.RpcUnregisterName(utils.CDRsV1) - close(cs.stateDeps.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/chargers.go b/services/chargers.go index edb1d46c3..69152f525 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -90,7 +90,6 @@ func (chrS *ChargerService) Start(shutdown chan struct{}, registry *servmanager. } chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS) - close(chrS.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -105,7 +104,6 @@ func (chrS *ChargerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) defer chrS.Unlock() chrS.chrS = nil chrS.cl.RpcUnregisterName(utils.ChargerSv1) - close(chrS.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 1af102eb1..e131ada6a 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -63,7 +63,6 @@ func (cl *CommonListenerService) Start(_ chan struct{}, _ *servmanager.ServiceRe if cl.cfg.ConfigSCfg().Enabled { cl.cls.RegisterHTTPFunc(cl.cfg.ConfigSCfg().URL, config.HandlerConfigS) } - close(cl.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -77,7 +76,6 @@ func (cl *CommonListenerService) Shutdown(_ *servmanager.ServiceRegistry) error cl.mu.Lock() defer cl.mu.Unlock() cl.cls = nil - close(cl.StateChan(utils.StateServiceDOWN)) return nil } diff --git a/services/config.go b/services/config.go index d9f13f5e6..54bf8fe0e 100644 --- a/services/config.go +++ b/services/config.go @@ -67,7 +67,6 @@ func (s *ConfigService) Start(_ chan struct{}, registry *servmanager.ServiceRegi } } s.intRPCconn = anz.GetInternalCodec(svcs, utils.ConfigSv1) - close(s.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -79,7 +78,6 @@ func (s *ConfigService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) // Shutdown stops the service. func (s *ConfigService) Shutdown(_ *servmanager.ServiceRegistry) error { s.cl.RpcUnregisterName(utils.ConfigSv1) - close(s.StateChan(utils.StateServiceDOWN)) return nil } diff --git a/services/cores.go b/services/cores.go index e7cb748a4..09ab664c9 100644 --- a/services/cores.go +++ b/services/cores.go @@ -92,7 +92,6 @@ func (cS *CoreService) Start(shutdown chan struct{}, registry *servmanager.Servi } cS.intRPCconn = anz.GetInternalCodec(srv, utils.CoreS) - close(cS.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -112,7 +111,6 @@ func (cS *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error { cS.cS = nil <-cS.csCh cS.cl.RpcUnregisterName(utils.CoreSv1) - close(cS.StateChan(utils.StateServiceDOWN)) return nil } diff --git a/services/datadb.go b/services/datadb.go index 05bc84629..0c394f0b9 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -82,7 +82,6 @@ func (db *DataDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) return err } - close(db.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -122,7 +121,6 @@ func (db *DataDBService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { db.dm.DataDB().Close() db.dm = nil db.Unlock() - close(db.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/diameteragent.go b/services/diameteragent.go index 9451b5fe5..2c83bd8a2 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -89,7 +89,6 @@ func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutd close(shutdown) } }(da.da) - close(da.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -117,7 +116,6 @@ func (da *DiameterAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { close(da.stopChan) da.da = nil da.Unlock() - close(da.StateChan(utils.StateServiceDOWN)) return // no shutdown for the momment } diff --git a/services/dispatchers.go b/services/dispatchers.go index 4bac8a486..889aba913 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -99,7 +99,6 @@ func (dspS *DispatcherService) Start(shutdown chan struct{}, registry *servmanag // until we figured out a better sollution in case of gob server // dspS.server.SetDispatched() dspS.intRPCconn = anz.GetInternalCodec(srv, utils.DispatcherS) - close(dspS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -119,7 +118,6 @@ func (dspS *DispatcherService) Shutdown(_ *servmanager.ServiceRegistry) (err err dspS.unregisterAllDispatchedSubsystems() dspS.connMgr.DisableDispatcher() dspS.sync() - close(dspS.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/dnsagent.go b/services/dnsagent.go index 915d18b3a..a001b8b11 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -72,7 +72,6 @@ func (dns *DNSAgent) Start(shutdown chan struct{}, registry *servmanager.Service } dns.stopChan = make(chan struct{}) go dns.listenAndServe(dns.stopChan, shutdown) - close(dns.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -124,7 +123,6 @@ func (dns *DNSAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { dns.Lock() defer dns.Unlock() dns.dns = nil - close(dns.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/ees.go b/services/ees.go index 794bffa88..330c1e5ca 100644 --- a/services/ees.go +++ b/services/ees.go @@ -79,7 +79,6 @@ func (es *EventExporterService) Shutdown(_ *servmanager.ServiceRegistry) error { es.eeS.ClearExporterCache() es.eeS = nil es.cl.RpcUnregisterName(utils.EeSv1) - close(es.StateChan(utils.StateServiceDOWN)) return nil } @@ -114,7 +113,6 @@ func (es *EventExporterService) Start(_ chan struct{}, registry *servmanager.Ser } es.intRPCconn = anz.GetInternalCodec(srv, utils.EEs) - close(es.stateDeps.StateChan(utils.StateServiceUP)) return nil } diff --git a/services/efs.go b/services/efs.go index 6913f9489..e4c2f3363 100644 --- a/services/efs.go +++ b/services/efs.go @@ -71,7 +71,6 @@ func (efServ *ExportFailoverService) Start(_ chan struct{}, registry *servmanage efServ.stopChan = make(chan struct{}) efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx) efServ.cl.RpcRegister(efServ.srv) - close(efServ.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -85,7 +84,6 @@ func (efServ *ExportFailoverService) Shutdown(_ *servmanager.ServiceRegistry) (e efServ.srv = nil close(efServ.stopChan) // NEXT SHOULD EXPORT ALL THE SHUTDOWN LOGGERS TO WRITE - close(efServ.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/ers.go b/services/ers.go index 92ae2fcae..caa251247 100644 --- a/services/ers.go +++ b/services/ers.go @@ -93,7 +93,6 @@ func (erS *EventReaderService) Start(shutdown chan struct{}, registry *servmanag erS.cl.RpcRegister(srv) } erS.intRPCconn = anz.GetInternalCodec(srv, utils.ERs) - close(erS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -120,7 +119,6 @@ func (erS *EventReaderService) Shutdown(_ *servmanager.ServiceRegistry) (err err close(erS.stopChan) erS.ers = nil erS.cl.RpcUnregisterName(utils.ErSv1) - close(erS.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/filters.go b/services/filters.go index 4b312a719..a59e8a185 100644 --- a/services/filters.go +++ b/services/filters.go @@ -71,7 +71,6 @@ func (s *FilterService) Start(shutdown chan struct{}, registry *servmanager.Serv defer s.mu.Unlock() s.fltrS = engine.NewFilterS(s.cfg, s.connMgr, dbs.DataManager()) - close(s.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -85,7 +84,6 @@ func (s *FilterService) Shutdown(_ *servmanager.ServiceRegistry) error { s.mu.Lock() defer s.mu.Unlock() s.fltrS = nil - close(s.stateDeps.StateChan(utils.StateServiceDOWN)) return nil } diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index b0e33870f..b2d526820 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -61,7 +61,6 @@ func (fS *FreeswitchAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceR fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, fS.connMgr) go fS.connect(shutdown) - close(fS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -91,7 +90,6 @@ func (fS *FreeswitchAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) defer fS.Unlock() err = fS.fS.Shutdown() fS.fS = nil - close(fS.stateDeps.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/globalvars.go b/services/globalvars.go index 6efbf934c..07a775326 100644 --- a/services/globalvars.go +++ b/services/globalvars.go @@ -50,7 +50,6 @@ func (gv *GlobalVarS) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) err utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale utils.DecimalContext.Precision = gv.cfg.GeneralCfg().DecimalPrecision utils.DecimalContext.RoundingMode = gv.cfg.GeneralCfg().DecimalRoundingMode - close(gv.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -66,7 +65,6 @@ func (gv *GlobalVarS) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) er // Shutdown stops the service func (gv *GlobalVarS) Shutdown(_ *servmanager.ServiceRegistry) error { - close(gv.StateChan(utils.StateServiceDOWN)) return nil } diff --git a/services/guardian.go b/services/guardian.go index 514b2433c..42d91ddc3 100644 --- a/services/guardian.go +++ b/services/guardian.go @@ -71,7 +71,6 @@ func (s *GuardianService) Start(_ chan struct{}, registry *servmanager.ServiceRe } } s.intRPCconn = anz.GetInternalCodec(svcs, utils.GuardianS) - close(s.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -85,7 +84,6 @@ func (s *GuardianService) Shutdown(_ *servmanager.ServiceRegistry) error { s.mu.Lock() defer s.mu.Unlock() s.cl.RpcUnregisterName(utils.GuardianSv1) - close(s.StateChan(utils.StateServiceDOWN)) return nil } diff --git a/services/httpagent.go b/services/httpagent.go index 6f3e4e0ec..636981dda 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -81,7 +81,6 @@ func (ha *HTTPAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegistr ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, agntCfg.ReplyPayload, agntCfg.RequestProcessors)) } - close(ha.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -95,7 +94,6 @@ func (ha *HTTPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { ha.Lock() ha.started = false ha.Unlock() - close(ha.stateDeps.StateChan(utils.StateServiceDOWN)) return // no shutdown for the momment } diff --git a/services/janus.go b/services/janus.go index 3530b8ab1..af6c5848f 100644 --- a/services/janus.go +++ b/services/janus.go @@ -95,7 +95,6 @@ func (ja *JanusAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegist ja.started = true ja.Unlock() - close(ja.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -110,7 +109,6 @@ func (ja *JanusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { err = ja.jA.Shutdown() ja.started = false ja.Unlock() - close(ja.stateDeps.StateChan(utils.StateServiceDOWN)) return // no shutdown for the momment } diff --git a/services/kamailioagent.go b/services/kamailioagent.go index 8c26eb726..20178960e 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -62,7 +62,6 @@ func (kam *KamailioAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRe utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone)) go kam.connect(kam.kam, shutdown) - close(kam.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -96,7 +95,6 @@ func (kam *KamailioAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { defer kam.Unlock() err = kam.kam.Shutdown() kam.kam = nil - close(kam.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/loaders.go b/services/loaders.go index 17cbfcea6..a855f5c80 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -93,7 +93,6 @@ func (ldrs *LoaderService) Start(_ chan struct{}, registry *servmanager.ServiceR } } ldrs.intRPCconn = anz.GetInternalCodec(srv, utils.LoaderS) - close(ldrs.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -127,7 +126,6 @@ func (ldrs *LoaderService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { close(ldrs.stopChan) ldrs.cl.RpcUnregisterName(utils.LoaderSv1) ldrs.Unlock() - close(ldrs.stateDeps.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/radiusagent.go b/services/radiusagent.go index 1f2d8280f..045b9f9f2 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -78,7 +78,6 @@ func (rad *RadiusAgent) Start(shutdown chan struct{}, registry *servmanager.Serv rad.stopChan = make(chan struct{}) go rad.listenAndServe(rad.rad, shutdown) - close(rad.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -105,7 +104,6 @@ func (rad *RadiusAgent) Reload(shutdown chan struct{}, registry *servmanager.Ser // Shutdown stops the service func (rad *RadiusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { rad.shutdown() - close(rad.StateChan(utils.StateServiceDOWN)) return // no shutdown for the momment } diff --git a/services/rankings.go b/services/rankings.go index 945c4742b..e13f6cb4d 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -100,7 +100,6 @@ func (ran *RankingService) Start(shutdown chan struct{}, registry *servmanager.S } } ran.intRPCconn = anz.GetInternalCodec(srv, utils.RankingS) - close(ran.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -120,7 +119,6 @@ func (ran *RankingService) Shutdown(_ *servmanager.ServiceRegistry) (err error) ran.ran.StopRankingS() ran.ran = nil ran.cl.RpcUnregisterName(utils.RankingSv1) - close(ran.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/rates.go b/services/rates.go index 5b681d7f6..5789d4165 100644 --- a/services/rates.go +++ b/services/rates.go @@ -77,7 +77,6 @@ func (rs *RateService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { close(rs.stopChan) rs.rateS = nil rs.cl.RpcUnregisterName(utils.RateSv1) - close(rs.StateChan(utils.StateServiceDOWN)) return } @@ -124,7 +123,6 @@ func (rs *RateService) Start(shutdown chan struct{}, registry *servmanager.Servi } rs.intRPCconn = anz.GetInternalCodec(srv, utils.RateS) - close(rs.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/registrarc.go b/services/registrarc.go index 6ed3290df..ce81e27f5 100644 --- a/services/registrarc.go +++ b/services/registrarc.go @@ -62,7 +62,6 @@ func (dspS *RegistrarCService) Start(_ chan struct{}, _ *servmanager.ServiceRegi dspS.rldChan = make(chan struct{}) dspS.dspS = registrarc.NewRegistrarCService(dspS.cfg, dspS.connMgr) go dspS.dspS.ListenAndServe(dspS.stopChan, dspS.rldChan) - close(dspS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -79,7 +78,6 @@ func (dspS *RegistrarCService) Shutdown(_ *servmanager.ServiceRegistry) (err err dspS.dspS.Shutdown() dspS.dspS = nil dspS.Unlock() - close(dspS.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/resources.go b/services/resources.go index b35280aa4..87769197a 100644 --- a/services/resources.go +++ b/services/resources.go @@ -98,7 +98,6 @@ func (reS *ResourceService) Start(shutdown chan struct{}, registry *servmanager. } reS.intRPCconn = anz.GetInternalCodec(srv, utils.ResourceS) - close(reS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -118,7 +117,6 @@ func (reS *ResourceService) Shutdown(_ *servmanager.ServiceRegistry) (err error) reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error reS.reS = nil reS.cl.RpcUnregisterName(utils.ResourceSv1) - close(reS.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/routes.go b/services/routes.go index 814cb4620..eb59e4d2c 100644 --- a/services/routes.go +++ b/services/routes.go @@ -89,7 +89,6 @@ func (routeS *RouteService) Start(shutdown chan struct{}, registry *servmanager. } } routeS.intRPCconn = anz.GetInternalCodec(srv, utils.RouteS) - close(routeS.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -104,7 +103,6 @@ func (routeS *RouteService) Shutdown(_ *servmanager.ServiceRegistry) (err error) defer routeS.Unlock() routeS.routeS = nil routeS.cl.RpcUnregisterName(utils.RouteSv1) - close(routeS.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/sessions.go b/services/sessions.go index 92cc5c404..5954f624e 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -103,7 +103,6 @@ func (smg *SessionService) Start(shutdown chan struct{}, registry *servmanager.S go smg.start(shutdown) } smg.intRPCconn = anz.GetInternalCodec(srv, utils.SessionS) - close(smg.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -139,7 +138,6 @@ func (smg *SessionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) smg.sm = nil smg.cl.RpcUnregisterName(utils.SessionSv1) // smg.server.BiRPCUnregisterName(utils.SessionSv1) - close(smg.stateDeps.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/sipagent.go b/services/sipagent.go index 75da8a2ed..a1d2ffd78 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -72,7 +72,6 @@ func (sip *SIPAgent) Start(shutdown chan struct{}, registry *servmanager.Service return } go sip.listenAndServe(shutdown) - close(sip.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -103,7 +102,6 @@ func (sip *SIPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { defer sip.Unlock() sip.sip.Shutdown() sip.sip = nil - close(sip.stateDeps.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/statedeps.go b/services/statedeps.go index ee1e27bc0..63901e7e0 100644 --- a/services/statedeps.go +++ b/services/statedeps.go @@ -24,6 +24,7 @@ import ( "time" "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" ) // NewStateDependencies constructs a StateDependencies struct @@ -70,6 +71,11 @@ func waitForServicesToReachState(state string, serviceIDs []string, indexer *ser func waitForServiceState(state, serviceID string, indexer *servmanager.ServiceRegistry, timeout time.Duration, ) (servmanager.Service, error) { srv := indexer.Lookup(serviceID) + if serviceID == utils.AnalyzerS && !srv.ShouldRun() { + // Return disabled analyzer service immediately since dependent + // services still need the instance. + return srv, nil + } select { case <-srv.StateChan(state): return srv, nil diff --git a/services/stats.go b/services/stats.go index 266224016..880454801 100644 --- a/services/stats.go +++ b/services/stats.go @@ -95,7 +95,6 @@ func (sts *StatService) Start(shutdown chan struct{}, registry *servmanager.Serv } } sts.intRPCconn = anz.GetInternalCodec(srv, utils.StatS) - close(sts.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -115,7 +114,6 @@ func (sts *StatService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { sts.sts.Shutdown(context.TODO()) sts.sts = nil sts.cl.RpcUnregisterName(utils.StatSv1) - close(sts.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/stordb.go b/services/stordb.go index c31b16edf..b9aa6deaf 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -75,7 +75,6 @@ func (db *StorDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) if err != nil { return err } - close(db.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -132,7 +131,6 @@ func (db *StorDBService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { db.db.Close() db.db = nil db.Unlock() - close(db.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/thresholds.go b/services/thresholds.go index f5fb196a4..cf61ccdae 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -97,7 +97,6 @@ func (thrs *ThresholdService) Start(shutdown chan struct{}, registry *servmanage } } thrs.intRPCconn = anz.GetInternalCodec(srv, utils.ThresholdS) - close(thrs.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -117,7 +116,6 @@ func (thrs *ThresholdService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) thrs.thrs.Shutdown(context.TODO()) thrs.thrs = nil thrs.cl.RpcUnregisterName(utils.ThresholdSv1) - close(thrs.stateDeps.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/tpes.go b/services/tpes.go index 78cf3b40c..e67a12101 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -74,7 +74,6 @@ func (ts *TPeService) Start(_ chan struct{}, registry *servmanager.ServiceRegist ts.stopChan = make(chan struct{}) ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false) ts.cl.RpcRegister(ts.srv) - close(ts.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -87,7 +86,6 @@ func (ts *TPeService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (e func (ts *TPeService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { ts.srv = nil close(ts.stopChan) - close(ts.StateChan(utils.StateServiceDOWN)) return } diff --git a/services/trends.go b/services/trends.go index 9c14ad9d0..da634971e 100644 --- a/services/trends.go +++ b/services/trends.go @@ -99,7 +99,6 @@ func (trs *TrendService) Start(shutdown chan struct{}, registry *servmanager.Ser } } trs.intRPCconn = anz.GetInternalCodec(srv, utils.Trends) - close(trs.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -119,7 +118,6 @@ func (trs *TrendService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { trs.trs.StopTrendS() trs.trs = nil trs.cl.RpcUnregisterName(utils.TrendSv1) - close(trs.StateChan(utils.StateServiceDOWN)) return } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 675f32460..8d716f5b9 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -58,6 +58,9 @@ type ServiceManager struct { func (m *ServiceManager) StartServices(shutdown chan struct{}) { go m.handleReload(shutdown) for _, svc := range m.registry.List() { + // TODO: verify if IsServiceInState check is needed. It should + // be redundant since ServManager manages all services and this + // runs only at startup if svc.ShouldRun() && !IsServiceInState(svc, utils.StateServiceUP) { m.shdWg.Add(1) go func() { @@ -66,6 +69,7 @@ func (m *ServiceManager) StartServices(shutdown chan struct{}) { utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) } + close(svc.StateChan(utils.StateServiceUP)) utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) }() } @@ -124,6 +128,7 @@ func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err e isUp := IsServiceInState(svc, utils.StateServiceUP) if svc.ShouldRun() { if isUp { + // TODO: state channels must be reinitiated for both SERVICE_UP and SERVICE_DOWN. if err = svc.Reload(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) @@ -137,6 +142,7 @@ func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err e close(shutdown) return // stop if we encounter an error } + close(svc.StateChan(utils.StateServiceUP)) utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) } } else if isUp { @@ -144,6 +150,7 @@ func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err e utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) } + close(svc.StateChan(utils.StateServiceDOWN)) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) m.shdWg.Done() } @@ -161,6 +168,7 @@ func (m *ServiceManager) ShutdownServices() { utils.ServiceManager, svc.ServiceName(), err)) return } + close(svc.StateChan(utils.StateServiceDOWN)) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) }() }