From dab152f976b633ddcb2ad183fb249e081858ace3 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 18 Dec 2024 19:12:17 +0200 Subject: [PATCH] Integrate StateServiceDOWN + registry related refactor Removed redundant IsRunning service method Removed registry from constructors Pass registry to Start/Reload/Shutdown service methods --- cmd/cgr-engine/cgr-engine.go | 80 +++++++++++++++--------------- services/accounts.go | 37 +++++--------- services/actions.go | 37 +++++--------- services/adminsv1.go | 35 +++++--------- services/analyzers.go | 59 +++++++++------------- services/asteriskagent.go | 35 +++++--------- services/attributes.go | 35 +++++--------- services/caches.go | 35 +++++--------- services/cdrs.go | 35 +++++--------- services/chargers.go | 35 +++++--------- services/commonlisteners.go | 32 ++++-------- services/config.go | 31 ++++-------- services/cores.go | 41 ++++++---------- services/datadb.go | 28 +++-------- services/diameteragent.go | 47 +++++++----------- services/dispatchers.go | 31 ++++-------- services/dnsagent.go | 44 ++++++----------- services/ees.go | 35 +++++--------- services/efs.go | 35 +++++--------- services/ers.go | 37 +++++--------- services/filters.go | 35 +++++--------- services/freeswitchagent.go | 33 ++++--------- services/globalvars.go | 25 ++++------ services/guardian.go | 30 ++++-------- services/httpagent.go | 35 +++++--------- services/janus.go | 31 +++++------- services/kamailioagent.go | 33 ++++--------- services/loaders.go | 54 +++++++++------------ services/radiusagent.go | 37 +++++--------- services/rankings.go | 35 +++++--------- services/rates.go | 35 +++++--------- services/registrarc.go | 32 ++++-------- services/resources.go | 37 +++++--------- services/routes.go | 35 +++++--------- services/sessions.go | 35 +++++--------- services/sipagent.go | 35 +++++--------- services/statedeps.go | 11 ----- services/stats.go | 39 +++++---------- services/stordb.go | 28 +++-------- services/thresholds.go | 37 +++++--------- services/tpes.go | 31 +++++------- services/trends.go | 35 +++++--------- servmanager/servmanager.go | 94 +++++++++++++++++++----------------- utils/consts.go | 4 +- 44 files changed, 566 insertions(+), 1054 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e91fb976f..060c7c8e1 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -129,46 +129,46 @@ func runCGREngine(fs []string) (err error) { // ServiceIndexer will share service references to all services registry := servmanager.NewServiceRegistry() - gvS := services.NewGlobalVarS(cfg, registry) - dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep, registry) - sdbS := services.NewStorDBService(cfg, *flags.SetVersions, registry) - cls := services.NewCommonListenerService(cfg, caps, registry) - anzS := services.NewAnalyzerService(cfg, registry) - configS := services.NewConfigService(cfg, registry) - guardianS := services.NewGuardianService(cfg, registry) - coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg, registry) - cacheS := services.NewCacheService(cfg, connMgr, registry) - fltrS := services.NewFilterService(cfg, connMgr, registry) - dspS := services.NewDispatcherService(cfg, connMgr, registry) - ldrs := services.NewLoaderService(cfg, connMgr, registry) - efs := services.NewExportFailoverService(cfg, connMgr, registry) - adminS := services.NewAdminSv1Service(cfg, connMgr, registry) - sessionS := services.NewSessionService(cfg, connMgr, registry) - attrS := services.NewAttributeService(cfg, dspS, registry) - chrgS := services.NewChargerService(cfg, connMgr, registry) - routeS := services.NewRouteService(cfg, connMgr, registry) - resourceS := services.NewResourceService(cfg, connMgr, srvDep, registry) - trendS := services.NewTrendService(cfg, connMgr, srvDep, registry) - rankingS := services.NewRankingService(cfg, connMgr, srvDep, registry) - thS := services.NewThresholdService(cfg, connMgr, srvDep, registry) - stS := services.NewStatService(cfg, connMgr, srvDep, registry) - erS := services.NewEventReaderService(cfg, connMgr, registry) - dnsAgent := services.NewDNSAgent(cfg, connMgr, registry) - fsAgent := services.NewFreeswitchAgent(cfg, connMgr, registry) - kamAgent := services.NewKamailioAgent(cfg, connMgr, registry) - janusAgent := services.NewJanusAgent(cfg, connMgr, registry) - astAgent := services.NewAsteriskAgent(cfg, connMgr, registry) - radAgent := services.NewRadiusAgent(cfg, connMgr, registry) - diamAgent := services.NewDiameterAgent(cfg, connMgr, caps, registry) - httpAgent := services.NewHTTPAgent(cfg, connMgr, registry) - sipAgent := services.NewSIPAgent(cfg, connMgr, registry) - eeS := services.NewEventExporterService(cfg, connMgr, registry) - cdrS := services.NewCDRServer(cfg, connMgr, registry) - registrarcS := services.NewRegistrarCService(cfg, connMgr, registry) - rateS := services.NewRateService(cfg, registry) - actionS := services.NewActionService(cfg, connMgr, registry) - accS := services.NewAccountService(cfg, connMgr, registry) - tpeS := services.NewTPeService(cfg, connMgr, registry) + gvS := services.NewGlobalVarS(cfg) + dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep) + sdbS := services.NewStorDBService(cfg, *flags.SetVersions) + cls := services.NewCommonListenerService(cfg, caps) + anzS := services.NewAnalyzerService(cfg) + configS := services.NewConfigService(cfg) + guardianS := services.NewGuardianService(cfg) + coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg) + cacheS := services.NewCacheService(cfg, connMgr) + fltrS := services.NewFilterService(cfg, connMgr) + dspS := services.NewDispatcherService(cfg, connMgr) + ldrs := services.NewLoaderService(cfg, connMgr) + efs := services.NewExportFailoverService(cfg, connMgr) + adminS := services.NewAdminSv1Service(cfg, connMgr) + sessionS := services.NewSessionService(cfg, connMgr) + attrS := services.NewAttributeService(cfg, dspS) + chrgS := services.NewChargerService(cfg, connMgr) + routeS := services.NewRouteService(cfg, connMgr) + resourceS := services.NewResourceService(cfg, connMgr, srvDep) + trendS := services.NewTrendService(cfg, connMgr, srvDep) + rankingS := services.NewRankingService(cfg, connMgr, srvDep) + thS := services.NewThresholdService(cfg, connMgr, srvDep) + stS := services.NewStatService(cfg, connMgr, srvDep) + erS := services.NewEventReaderService(cfg, connMgr) + dnsAgent := services.NewDNSAgent(cfg, connMgr) + fsAgent := services.NewFreeswitchAgent(cfg, connMgr) + kamAgent := services.NewKamailioAgent(cfg, connMgr) + janusAgent := services.NewJanusAgent(cfg, connMgr) + astAgent := services.NewAsteriskAgent(cfg, connMgr) + radAgent := services.NewRadiusAgent(cfg, connMgr) + diamAgent := services.NewDiameterAgent(cfg, connMgr, caps) + httpAgent := services.NewHTTPAgent(cfg, connMgr) + sipAgent := services.NewSIPAgent(cfg, connMgr) + eeS := services.NewEventExporterService(cfg, connMgr) + cdrS := services.NewCDRServer(cfg, connMgr) + registrarcS := services.NewRegistrarCService(cfg, connMgr) + rateS := services.NewRateService(cfg) + actionS := services.NewActionService(cfg, connMgr) + accS := services.NewAccountService(cfg, connMgr) + tpeS := services.NewTPeService(cfg, connMgr) srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, registry, []servmanager.Service{ gvS, diff --git a/services/accounts.go b/services/accounts.go index 1d7b7bae1..0b8b16a22 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -33,14 +33,12 @@ import ( // NewAccountService returns the Account Service func NewAccountService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *AccountService { + connMgr *engine.ConnManager) *AccountService { return &AccountService{ - cfg: cfg, - connMgr: connMgr, - rldChan: make(chan struct{}, 1), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + rldChan: make(chan struct{}, 1), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -56,17 +54,12 @@ type AccountService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (acts *AccountService) Start(shutdown chan struct{}) (err error) { - if acts.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (acts *AccountService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -75,7 +68,7 @@ func (acts *AccountService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - acts.srvIndexer, acts.cfg.GeneralCfg().ConnectTimeout) + registry, acts.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -110,28 +103,22 @@ func (acts *AccountService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (acts *AccountService) Reload(_ chan struct{}) (err error) { +func (acts *AccountService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { acts.rldChan <- struct{}{} return // for the moment nothing to reload } // Shutdown stops the service -func (acts *AccountService) Shutdown() (err error) { +func (acts *AccountService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { acts.Lock() close(acts.stopChan) acts.acts = nil acts.Unlock() acts.cl.RpcUnregisterName(utils.AccountSv1) + close(acts.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (acts *AccountService) IsRunning() bool { - acts.RLock() - defer acts.RUnlock() - return acts.acts != nil -} - // ServiceName returns the service name func (acts *AccountService) ServiceName() string { return utils.AccountS diff --git a/services/actions.go b/services/actions.go index d10a3ac74..79b590c85 100644 --- a/services/actions.go +++ b/services/actions.go @@ -33,14 +33,12 @@ import ( // NewActionService returns the Action Service func NewActionService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *ActionService { + connMgr *engine.ConnManager) *ActionService { return &ActionService{ - connMgr: connMgr, - cfg: cfg, - rldChan: make(chan struct{}, 1), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + connMgr: connMgr, + cfg: cfg, + rldChan: make(chan struct{}, 1), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -57,17 +55,12 @@ type ActionService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (acts *ActionService) Start(shutdown chan struct{}) (err error) { - if acts.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (acts *ActionService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -76,7 +69,7 @@ func (acts *ActionService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - acts.srvIndexer, acts.cfg.GeneralCfg().ConnectTimeout) + registry, acts.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -111,29 +104,23 @@ func (acts *ActionService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (acts *ActionService) Reload(_ chan struct{}) (err error) { +func (acts *ActionService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { acts.rldChan <- struct{}{} return // for the moment nothing to reload } // Shutdown stops the service -func (acts *ActionService) Shutdown() (err error) { +func (acts *ActionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { acts.Lock() defer acts.Unlock() close(acts.stopChan) acts.acts.Shutdown() acts.acts = nil acts.cl.RpcUnregisterName(utils.ActionSv1) + close(acts.stateDeps.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (acts *ActionService) IsRunning() bool { - acts.RLock() - defer acts.RUnlock() - return acts.acts != nil -} - // ServiceName returns the service name func (acts *ActionService) ServiceName() string { return utils.ActionS diff --git a/services/adminsv1.go b/services/adminsv1.go index e9751dc4e..59e68434f 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -32,13 +32,11 @@ import ( // NewAPIerSv1Service returns the APIerSv1 Service func NewAdminSv1Service(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *AdminSv1Service { + connMgr *engine.ConnManager) *AdminSv1Service { return &AdminSv1Service{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -53,18 +51,13 @@ type AdminSv1Service struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // RPC connector with internal APIs - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // RPC connector with internal APIs + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start // For this service the start should be called from RAL Service -func (apiService *AdminSv1Service) Start(_ chan struct{}) (err error) { - if apiService.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (apiService *AdminSv1Service) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -73,7 +66,7 @@ func (apiService *AdminSv1Service) Start(_ chan struct{}) (err error) { utils.AnalyzerS, utils.StorDB, }, - apiService.srvIndexer, apiService.cfg.GeneralCfg().ConnectTimeout) + registry, apiService.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -108,27 +101,21 @@ func (apiService *AdminSv1Service) Start(_ chan struct{}) (err error) { } // Reload handles the change of config -func (apiService *AdminSv1Service) Reload(_ chan struct{}) (err error) { +func (apiService *AdminSv1Service) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return } // Shutdown stops the service -func (apiService *AdminSv1Service) Shutdown() (err error) { +func (apiService *AdminSv1Service) Shutdown(_ *servmanager.ServiceRegistry) (err error) { apiService.Lock() // close(apiService.stopChan) apiService.api = nil apiService.cl.RpcUnregisterName(utils.AdminSv1) apiService.Unlock() + close(apiService.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (apiService *AdminSv1Service) IsRunning() bool { - apiService.RLock() - defer apiService.RUnlock() - return apiService.api != nil -} - // ServiceName returns the service name func (apiService *AdminSv1Service) ServiceName() string { return utils.AdminS diff --git a/services/analyzers.go b/services/analyzers.go index e47dae293..8d33dbfcc 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -33,12 +33,10 @@ import ( ) // NewAnalyzerService returns the Analyzer Service -func NewAnalyzerService(cfg *config.CGRConfig, - srvIndexer *servmanager.ServiceRegistry) *AnalyzerService { +func NewAnalyzerService(cfg *config.CGRConfig) *AnalyzerService { anz := &AnalyzerService{ - cfg: cfg, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } // Wait for AnalyzerService only when it should run. @@ -59,19 +57,14 @@ type AnalyzerService struct { cancelFunc context.CancelFunc cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (anz *AnalyzerService) Start(shutdown chan struct{}) (err error) { - if anz.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - - cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, anz.srvIndexer, +func (anz *AnalyzerService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { + cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry, anz.cfg.GeneralCfg().ConnectTimeout) if err != nil { return @@ -81,7 +74,6 @@ func (anz *AnalyzerService) Start(shutdown chan struct{}) (err error) { anz.Lock() defer anz.Unlock() if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error())) return } anzCtx, cancel := context.WithCancel(context.TODO()) @@ -93,23 +85,19 @@ func (anz *AnalyzerService) Start(shutdown chan struct{}) (err error) { } }(anz.anz) anz.cl.SetAnalyzer(anz.anz) - go anz.start() + go anz.start(registry) close(anz.stateDeps.StateChan(utils.StateServiceUP)) return } -func (anz *AnalyzerService) start() { - fs := anz.srvIndexer.Lookup(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), anz.cfg.GeneralCfg().ConnectTimeout) { - return - // return utils.NewServiceStateTimeoutError(utils.AnalyzerS, utils.FilterS, utils.StateServiceUP) - } - - if !anz.IsRunning() { +func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry) { + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, + anz.cfg.GeneralCfg().ConnectTimeout) + if err != nil { return } anz.Lock() - anz.anz.SetFilterS(fs.FilterS()) + anz.anz.SetFilterS(fs.(*FilterService).FilterS()) srv, _ := engine.NewService(anz.anz) // srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false) @@ -122,12 +110,12 @@ func (anz *AnalyzerService) start() { } // Reload handles the change of config -func (anz *AnalyzerService) Reload(_ chan struct{}) (err error) { +func (anz *AnalyzerService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return // for the momment nothing to reload } // Shutdown stops the service -func (anz *AnalyzerService) Shutdown() (err error) { +func (anz *AnalyzerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { anz.Lock() anz.cancelFunc() anz.cl.SetAnalyzer(nil) @@ -135,16 +123,10 @@ func (anz *AnalyzerService) Shutdown() (err error) { anz.anz = nil anz.Unlock() anz.cl.RpcUnregisterName(utils.AnalyzerSv1) + close(anz.stateDeps.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (anz *AnalyzerService) IsRunning() bool { - anz.RLock() - defer anz.RUnlock() - return anz.anz != nil -} - // ServiceName returns the service name func (anz *AnalyzerService) ServiceName() string { return utils.AnalyzerS @@ -155,9 +137,14 @@ func (anz *AnalyzerService) ShouldRun() bool { return anz.cfg.AnalyzerSCfg().Enabled } -// GetInternalCodec returns the connection wrapped in analyzer connector +// GetInternalCodec wraps the provided ClientConnector in an analyzer connector +// if the analyzer service should run. Otherwise, it returns the original connector +// unchanged. func (anz *AnalyzerService) GetInternalCodec(c birpc.ClientConnector, to string) birpc.ClientConnector { - if !anz.IsRunning() { + if !anz.ShouldRun() { + // It's enough to check the result of ShouldRun as other + // services calling GetInternalCodec had already waited for + // AnalyzerService to be initiated/started. return c } return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to) diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 1c18b408a..766bd4866 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -33,13 +33,11 @@ import ( // NewAsteriskAgent returns the Asterisk Agent func NewAsteriskAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *AsteriskAgent { + connMgr *engine.ConnManager) *AsteriskAgent { return &AsteriskAgent{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -52,17 +50,12 @@ type AsteriskAgent struct { smas []*agents.AsteriskAgent connMgr *engine.ConnManager - intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // share the API object implementing API calls for internal + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (ast *AsteriskAgent) Start(shutdown chan struct{}) (err error) { - if ast.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (ast *AsteriskAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { ast.Lock() defer ast.Unlock() @@ -83,14 +76,15 @@ func (ast *AsteriskAgent) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (ast *AsteriskAgent) Reload(shutdown chan struct{}) (err error) { +func (ast *AsteriskAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { ast.shutdown() - return ast.Start(shutdown) + return ast.Start(shutdown, registry) } // Shutdown stops the service -func (ast *AsteriskAgent) Shutdown() (err error) { +func (ast *AsteriskAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { ast.shutdown() + close(ast.StateChan(utils.StateServiceDOWN)) return } @@ -102,13 +96,6 @@ func (ast *AsteriskAgent) shutdown() { return // no shutdown for the momment } -// IsRunning returns if the service is running -func (ast *AsteriskAgent) IsRunning() bool { - ast.RLock() - defer ast.RUnlock() - return ast.smas != nil -} - // ServiceName returns the service name func (ast *AsteriskAgent) ServiceName() string { return utils.AsteriskAgent diff --git a/services/attributes.go b/services/attributes.go index 20f2315ec..8684a0272 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -32,13 +32,11 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, - dspS *DispatcherService, - sIndxr *servmanager.ServiceRegistry) *AttributeService { + dspS *DispatcherService) *AttributeService { return &AttributeService{ - cfg: cfg, - dspS: dspS, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), - srvIndexer: sIndxr, + cfg: cfg, + dspS: dspS, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -54,17 +52,12 @@ type AttributeService struct { cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here + intRPCconn birpc.ClientConnector // expose API methods over internal connection stateDeps *StateDependencies } // Start should handle the service start -func (attrS *AttributeService) Start(shutdown chan struct{}) (err error) { - if attrS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (attrS *AttributeService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -73,7 +66,7 @@ func (attrS *AttributeService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - attrS.srvIndexer, attrS.cfg.GeneralCfg().ConnectTimeout) + registry, attrS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return } @@ -105,7 +98,7 @@ func (attrS *AttributeService) Start(shutdown chan struct{}) (err error) { if _, closed := <-dspShtdChan; closed { return } - if attrS.IsRunning() { + if servmanager.IsServiceInState(attrS, utils.StateServiceUP) { attrS.cl.RpcRegister(srv) } @@ -118,28 +111,22 @@ func (attrS *AttributeService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (attrS *AttributeService) Reload(_ chan struct{}) (err error) { +func (attrS *AttributeService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return // for the moment nothing to reload } // Shutdown stops the service -func (attrS *AttributeService) Shutdown() (err error) { +func (attrS *AttributeService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { attrS.Lock() attrS.attrS = nil attrS.rpc = nil attrS.cl.RpcUnregisterName(utils.AttributeSv1) attrS.dspS.UnregisterShutdownChan(attrS.ServiceName()) attrS.Unlock() + close(attrS.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (attrS *AttributeService) IsRunning() bool { - attrS.RLock() - defer attrS.RUnlock() - return attrS.attrS != nil -} - // ServiceName returns the service name func (attrS *AttributeService) ServiceName() string { return utils.AttributeS diff --git a/services/caches.go b/services/caches.go index 0979c91e9..eba4cfe6b 100644 --- a/services/caches.go +++ b/services/caches.go @@ -30,14 +30,12 @@ import ( ) // NewCacheService . -func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *CacheService { +func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *CacheService { return &CacheService{ - cfg: cfg, - connMgr: connMgr, - cacheCh: make(chan *engine.CacheS, 1), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + cacheCh: make(chan *engine.CacheS, 1), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -50,17 +48,12 @@ type CacheService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (cS *CacheService) Start(shutdown chan struct{}) (err error) { - if cS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (cS *CacheService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -68,7 +61,7 @@ func (cS *CacheService) Start(shutdown chan struct{}) (err error) { utils.AnalyzerS, utils.CoreS, }, - cS.srvIndexer, cS.cfg.GeneralCfg().ConnectTimeout) + registry, cS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -98,21 +91,17 @@ func (cS *CacheService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (cS *CacheService) Reload(_ chan struct{}) (_ error) { +func (cS *CacheService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (_ error) { return } // Shutdown stops the service -func (cS *CacheService) Shutdown() (_ error) { +func (cS *CacheService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { cS.cl.RpcUnregisterName(utils.CacheSv1) + close(cS.stateDeps.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (cS *CacheService) IsRunning() bool { - return IsServiceInState(cS.ServiceName(), utils.StateServiceUP, cS.srvIndexer) -} - // ServiceName returns the service name func (cS *CacheService) ServiceName() string { return utils.CacheS diff --git a/services/cdrs.go b/services/cdrs.go index 673eb1fda..7be02262b 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -33,13 +33,11 @@ import ( // NewCDRServer returns the CDR Server func NewCDRServer(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *CDRService { + connMgr *engine.ConnManager) *CDRService { return &CDRService{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -53,17 +51,12 @@ type CDRService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (cs *CDRService) Start(_ chan struct{}) (err error) { - if cs.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (cs *CDRService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -72,7 +65,7 @@ func (cs *CDRService) Start(_ chan struct{}) (err error) { utils.AnalyzerS, utils.StorDB, }, - cs.srvIndexer, cs.cfg.GeneralCfg().ConnectTimeout) + registry, cs.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -101,26 +94,20 @@ func (cs *CDRService) Start(_ chan struct{}) (err error) { } // Reload handles the change of config -func (cs *CDRService) Reload(_ chan struct{}) (err error) { +func (cs *CDRService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return } // Shutdown stops the service -func (cs *CDRService) Shutdown() (err error) { +func (cs *CDRService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { cs.Lock() cs.cdrS = nil cs.Unlock() cs.cl.RpcUnregisterName(utils.CDRsV1) + close(cs.stateDeps.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (cs *CDRService) IsRunning() bool { - cs.RLock() - defer cs.RUnlock() - return cs.cdrS != nil -} - // ServiceName returns the service name func (cs *CDRService) ServiceName() string { return utils.CDRServer diff --git a/services/chargers.go b/services/chargers.go index ee377de8f..edb1d46c3 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -31,13 +31,11 @@ import ( // NewChargerService returns the Charger Service func NewChargerService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *ChargerService { + connMgr *engine.ConnManager) *ChargerService { return &ChargerService{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -51,17 +49,12 @@ type ChargerService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (chrS *ChargerService) Start(shutdown chan struct{}) error { - if chrS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (chrS *ChargerService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -70,7 +63,7 @@ func (chrS *ChargerService) Start(shutdown chan struct{}) error { utils.DataDB, utils.AnalyzerS, }, - chrS.srvIndexer, chrS.cfg.GeneralCfg().ConnectTimeout) + registry, chrS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -102,26 +95,20 @@ func (chrS *ChargerService) Start(shutdown chan struct{}) error { } // Reload handles the change of config -func (chrS *ChargerService) Reload(_ chan struct{}) (err error) { +func (chrS *ChargerService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return } // Shutdown stops the service -func (chrS *ChargerService) Shutdown() (err error) { +func (chrS *ChargerService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { chrS.Lock() defer chrS.Unlock() chrS.chrS = nil chrS.cl.RpcUnregisterName(utils.ChargerSv1) + close(chrS.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (chrS *ChargerService) IsRunning() bool { - chrS.RLock() - defer chrS.RUnlock() - return chrS.chrS != nil -} - // ServiceName returns the service name func (chrS *ChargerService) ServiceName() string { return utils.ChargerS diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 344495aba..1af102eb1 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -31,13 +31,11 @@ import ( ) // NewCommonListenerService instantiates a new CommonListenerService. -func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, - srvIndexer *servmanager.ServiceRegistry) *CommonListenerService { +func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps) *CommonListenerService { return &CommonListenerService{ - cfg: cfg, - caps: caps, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + caps: caps, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -50,16 +48,12 @@ type CommonListenerService struct { caps *engine.Caps cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. -func (cl *CommonListenerService) Start(_ chan struct{}) error { - if cl.IsRunning() { - return utils.ErrServiceAlreadyRunning - } +func (cl *CommonListenerService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) error { cl.mu.Lock() defer cl.mu.Unlock() cl.cls = commonlisteners.NewCommonListenerS(cl.caps) @@ -74,25 +68,19 @@ func (cl *CommonListenerService) Start(_ chan struct{}) error { } // Reload handles the config changes. -func (cl *CommonListenerService) Reload(_ chan struct{}) error { +func (cl *CommonListenerService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { return nil } // Shutdown stops the service. -func (cl *CommonListenerService) Shutdown() error { +func (cl *CommonListenerService) Shutdown(_ *servmanager.ServiceRegistry) error { cl.mu.Lock() defer cl.mu.Unlock() cl.cls = nil + close(cl.StateChan(utils.StateServiceDOWN)) return nil } -// IsRunning returns whether the service is running or not. -func (cl *CommonListenerService) IsRunning() bool { - cl.mu.RLock() - defer cl.mu.RUnlock() - return cl.cls != nil -} - // ServiceName returns the service name func (cl *CommonListenerService) ServiceName() string { return utils.CommonListenerS diff --git a/services/config.go b/services/config.go index b8171c01d..d9f13f5e6 100644 --- a/services/config.go +++ b/services/config.go @@ -30,11 +30,10 @@ import ( ) // NewConfigService instantiates a new ConfigService. -func NewConfigService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceRegistry) *ConfigService { +func NewConfigService(cfg *config.CGRConfig) *ConfigService { return &ConfigService{ - cfg: cfg, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -43,23 +42,18 @@ type ConfigService struct { mu sync.RWMutex cfg *config.CGRConfig cl *commonlisteners.CommonListenerS - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. -func (s *ConfigService) Start(_ chan struct{}) error { - if s.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (s *ConfigService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, utils.AnalyzerS, }, - s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -78,22 +72,17 @@ func (s *ConfigService) Start(_ chan struct{}) error { } // Reload handles the config changes. -func (s *ConfigService) Reload(_ chan struct{}) error { +func (s *ConfigService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { return nil } // Shutdown stops the service. -func (s *ConfigService) Shutdown() error { +func (s *ConfigService) Shutdown(_ *servmanager.ServiceRegistry) error { s.cl.RpcUnregisterName(utils.ConfigSv1) + close(s.StateChan(utils.StateServiceDOWN)) return nil } -// IsRunning returns whether the service is running or not. -func (s *ConfigService) IsRunning() bool { - return IsServiceInState(s.ServiceName(), utils.StateServiceUP, s.srvIndexer) -} - -// ServiceName returns the service name func (s *ConfigService) ServiceName() string { return utils.ConfigS } diff --git a/services/cores.go b/services/cores.go index c03bdf055..e7cb748a4 100644 --- a/services/cores.go +++ b/services/cores.go @@ -33,16 +33,14 @@ import ( // NewCoreService returns the Core Service func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, - fileCPU *os.File, shdWg *sync.WaitGroup, - srvIndexer *servmanager.ServiceRegistry) *CoreService { + fileCPU *os.File, shdWg *sync.WaitGroup) *CoreService { return &CoreService{ - shdWg: shdWg, - cfg: cfg, - caps: caps, - fileCPU: fileCPU, - csCh: make(chan *cores.CoreS, 1), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + shdWg: shdWg, + cfg: cfg, + caps: caps, + fileCPU: fileCPU, + csCh: make(chan *cores.CoreS, 1), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -60,23 +58,18 @@ type CoreService struct { shdWg *sync.WaitGroup cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (cS *CoreService) Start(shutdown chan struct{}) error { - if cS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (cS *CoreService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, utils.AnalyzerS, }, - cS.srvIndexer, cS.cfg.GeneralCfg().ConnectTimeout) + registry, cS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -104,12 +97,12 @@ func (cS *CoreService) Start(shutdown chan struct{}) error { } // Reload handles the change of config -func (cS *CoreService) Reload(_ chan struct{}) error { +func (cS *CoreService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { return nil } // Shutdown stops the service -func (cS *CoreService) Shutdown() error { +func (cS *CoreService) Shutdown(_ *servmanager.ServiceRegistry) error { cS.mu.Lock() defer cS.mu.Unlock() cS.cS.Shutdown() @@ -119,16 +112,10 @@ func (cS *CoreService) Shutdown() error { cS.cS = nil <-cS.csCh cS.cl.RpcUnregisterName(utils.CoreSv1) + close(cS.StateChan(utils.StateServiceDOWN)) return nil } -// IsRunning returns if the service is running -func (cS *CoreService) IsRunning() bool { - cS.mu.RLock() - defer cS.mu.RUnlock() - return cS.cS != nil -} - // ServiceName returns the service name func (cS *CoreService) ServiceName() string { return utils.CoreS diff --git a/services/datadb.go b/services/datadb.go index f3d243d88..05bc84629 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -31,15 +31,13 @@ import ( // NewDataDBService returns the DataDB Service func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVersions bool, - srvDep map[string]*sync.WaitGroup, - srvIndexer *servmanager.ServiceRegistry) *DataDBService { + srvDep map[string]*sync.WaitGroup) *DataDBService { return &DataDBService{ cfg: cfg, connMgr: connMgr, setVersions: setVersions, srvDep: srvDep, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -55,16 +53,12 @@ type DataDBService struct { srvDep map[string]*sync.WaitGroup - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. -func (db *DataDBService) Start(_ chan struct{}) (err error) { - if db.IsRunning() { - return utils.ErrServiceAlreadyRunning - } +func (db *DataDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { db.Lock() defer db.Unlock() db.oldDBCfg = db.cfg.DataDbCfg().Clone() @@ -93,7 +87,7 @@ func (db *DataDBService) Start(_ chan struct{}) (err error) { } // Reload handles the change of config -func (db *DataDBService) Reload(_ chan struct{}) (err error) { +func (db *DataDBService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { db.Lock() defer db.Unlock() if db.needsConnectionReload() { @@ -122,22 +116,16 @@ func (db *DataDBService) Reload(_ chan struct{}) (err error) { } // Shutdown stops the service -func (db *DataDBService) Shutdown() (_ error) { +func (db *DataDBService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { db.srvDep[utils.DataDB].Wait() db.Lock() db.dm.DataDB().Close() db.dm = nil db.Unlock() + close(db.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (db *DataDBService) IsRunning() bool { - db.RLock() - defer db.RUnlock() - return db.dm != nil && db.dm.DataDB() != nil -} - // ServiceName returns the service name func (db *DataDBService) ServiceName() string { return utils.DataDB diff --git a/services/diameteragent.go b/services/diameteragent.go index 72b8148e6..9451b5fe5 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -32,14 +32,12 @@ import ( // NewDiameterAgent returns the Diameter Agent func NewDiameterAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, caps *engine.Caps, - srvIndexer *servmanager.ServiceRegistry) *DiameterAgent { + connMgr *engine.ConnManager, caps *engine.Caps) *DiameterAgent { return &DiameterAgent{ - cfg: cfg, - connMgr: connMgr, - caps: caps, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + caps: caps, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -56,18 +54,13 @@ type DiameterAgent struct { lnet string laddr string - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (da *DiameterAgent) Start(shutdown chan struct{}) error { - if da.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - - fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, da.srvIndexer, +func (da *DiameterAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error { + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, da.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err @@ -101,7 +94,7 @@ func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutd } // Reload handles the change of config -func (da *DiameterAgent) Reload(shutdown chan struct{}) (err error) { +func (da *DiameterAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { da.Lock() defer da.Unlock() if da.lnet == da.cfg.DiameterAgentCfg().ListenNet && @@ -109,29 +102,25 @@ func (da *DiameterAgent) Reload(shutdown chan struct{}) (err error) { return } close(da.stopChan) - fs := da.srvIndexer.Lookup(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), da.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.DiameterAgent, utils.FilterS, utils.StateServiceUP) + + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, + da.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } - return da.start(fs.FilterS(), da.caps, shutdown) + return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown) } // Shutdown stops the service -func (da *DiameterAgent) Shutdown() (err error) { +func (da *DiameterAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { da.Lock() close(da.stopChan) da.da = nil da.Unlock() + close(da.StateChan(utils.StateServiceDOWN)) return // no shutdown for the momment } -// IsRunning returns if the service is running -func (da *DiameterAgent) IsRunning() bool { - da.RLock() - defer da.RUnlock() - return da.da != nil -} - // ServiceName returns the service name func (da *DiameterAgent) ServiceName() string { return utils.DiameterAgent diff --git a/services/dispatchers.go b/services/dispatchers.go index 518b05b57..4bac8a486 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -32,14 +32,12 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *DispatcherService { + connMgr *engine.ConnManager) *DispatcherService { return &DispatcherService{ cfg: cfg, connMgr: connMgr, srvsReload: make(map[string]chan struct{}), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -54,17 +52,12 @@ type DispatcherService struct { cfg *config.CGRConfig srvsReload map[string]chan struct{} - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (dspS *DispatcherService) Start(shutdown chan struct{}) (err error) { - if dspS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (dspS *DispatcherService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -73,7 +66,7 @@ func (dspS *DispatcherService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - dspS.srvIndexer, dspS.cfg.GeneralCfg().ConnectTimeout) + registry, dspS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -111,12 +104,12 @@ func (dspS *DispatcherService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (dspS *DispatcherService) Reload(_ chan struct{}) (err error) { +func (dspS *DispatcherService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return // for the momment nothing to reload } // Shutdown stops the service -func (dspS *DispatcherService) Shutdown() (err error) { +func (dspS *DispatcherService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { dspS.Lock() defer dspS.Unlock() dspS.dspS = nil @@ -126,16 +119,10 @@ func (dspS *DispatcherService) Shutdown() (err error) { dspS.unregisterAllDispatchedSubsystems() dspS.connMgr.DisableDispatcher() dspS.sync() + close(dspS.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (dspS *DispatcherService) IsRunning() bool { - dspS.RLock() - defer dspS.RUnlock() - return dspS.dspS != nil -} - // ServiceName returns the service name func (dspS *DispatcherService) ServiceName() string { return utils.DispatcherS diff --git a/services/dnsagent.go b/services/dnsagent.go index 0ac3f0c98..915d18b3a 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -32,13 +32,11 @@ import ( // NewDNSAgent returns the DNS Agent func NewDNSAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *DNSAgent { + connMgr *engine.ConnManager) *DNSAgent { return &DNSAgent{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -52,18 +50,13 @@ type DNSAgent struct { dns *agents.DNSAgent connMgr *engine.ConnManager - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (dns *DNSAgent) Start(shutdown chan struct{}) (err error) { - if dns.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - - fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, dns.srvIndexer, +func (dns *DNSAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, dns.cfg.GeneralCfg().ConnectTimeout) if err != nil { return @@ -84,10 +77,11 @@ func (dns *DNSAgent) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (dns *DNSAgent) Reload(shutdown chan struct{}) (err error) { - fs := dns.srvIndexer.Lookup(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dns.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.DNSAgent, utils.FilterS, utils.StateServiceUP) +func (dns *DNSAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, + dns.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return } dns.Lock() @@ -97,7 +91,7 @@ func (dns *DNSAgent) Reload(shutdown chan struct{}) (err error) { close(dns.stopChan) } - dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), dns.connMgr) + dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.(*FilterService).FilterS(), dns.connMgr) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) dns.dns = nil @@ -122,7 +116,7 @@ func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shutdown chan struct } // Shutdown stops the service -func (dns *DNSAgent) Shutdown() (err error) { +func (dns *DNSAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { if dns.dns == nil { return } @@ -130,16 +124,10 @@ func (dns *DNSAgent) Shutdown() (err error) { dns.Lock() defer dns.Unlock() dns.dns = nil + close(dns.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (dns *DNSAgent) IsRunning() bool { - dns.RLock() - defer dns.RUnlock() - return dns.dns != nil -} - // ServiceName returns the service name func (dns *DNSAgent) ServiceName() string { return utils.DNSAgent diff --git a/services/ees.go b/services/ees.go index b50fd6ecd..794bffa88 100644 --- a/services/ees.go +++ b/services/ees.go @@ -32,13 +32,11 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *EventExporterService { + connMgr *engine.ConnManager) *EventExporterService { return &EventExporterService{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -52,9 +50,8 @@ type EventExporterService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // ServiceName returns the service name @@ -67,15 +64,8 @@ func (es *EventExporterService) ShouldRun() (should bool) { return es.cfg.EEsCfg().Enabled } -// IsRunning returns if the service is running -func (es *EventExporterService) IsRunning() bool { - es.mu.RLock() - defer es.mu.RUnlock() - return es.eeS != nil -} - // Reload handles the change of config -func (es *EventExporterService) Reload(_ chan struct{}) error { +func (es *EventExporterService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { es.mu.Lock() defer es.mu.Unlock() es.eeS.ClearExporterCache() @@ -83,28 +73,25 @@ func (es *EventExporterService) Reload(_ chan struct{}) error { } // Shutdown stops the service -func (es *EventExporterService) Shutdown() error { +func (es *EventExporterService) Shutdown(_ *servmanager.ServiceRegistry) error { es.mu.Lock() defer es.mu.Unlock() es.eeS.ClearExporterCache() es.eeS = nil es.cl.RpcUnregisterName(utils.EeSv1) + close(es.StateChan(utils.StateServiceDOWN)) return nil } // Start should handle the service start -func (es *EventExporterService) Start(_ chan struct{}) error { - if es.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (es *EventExporterService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, utils.FilterS, utils.AnalyzerS, }, - es.srvIndexer, es.cfg.GeneralCfg().ConnectTimeout) + registry, es.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } diff --git a/services/efs.go b/services/efs.go index 83d53827a..6913f9489 100644 --- a/services/efs.go +++ b/services/efs.go @@ -42,29 +42,22 @@ type ExportFailoverService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // NewExportFailoverService is the constructor for the TpeService -func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *ExportFailoverService { +func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *ExportFailoverService { return &ExportFailoverService{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } // Start should handle the service start -func (efServ *ExportFailoverService) Start(_ chan struct{}) (err error) { - if efServ.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - - cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, efServ.srvIndexer, +func (efServ *ExportFailoverService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { + cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry, efServ.cfg.GeneralCfg().ConnectTimeout) if err != nil { return @@ -83,25 +76,19 @@ func (efServ *ExportFailoverService) Start(_ chan struct{}) (err error) { } // Reload handles the change of config -func (efServ *ExportFailoverService) Reload(_ chan struct{}) (err error) { +func (efServ *ExportFailoverService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return } // Shutdown stops the service -func (efServ *ExportFailoverService) Shutdown() (err error) { +func (efServ *ExportFailoverService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { efServ.srv = nil close(efServ.stopChan) // NEXT SHOULD EXPORT ALL THE SHUTDOWN LOGGERS TO WRITE + close(efServ.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (efServ *ExportFailoverService) IsRunning() bool { - efServ.Lock() - defer efServ.Unlock() - return efServ.efS != nil -} - // ShouldRun returns if the service should be running func (efServ *ExportFailoverService) ShouldRun() bool { return efServ.cfg.EFsCfg().Enabled diff --git a/services/ers.go b/services/ers.go index 1575e225f..92ae2fcae 100644 --- a/services/ers.go +++ b/services/ers.go @@ -34,14 +34,12 @@ import ( // NewEventReaderService returns the EventReader Service func NewEventReaderService( cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *EventReaderService { + connMgr *engine.ConnManager) *EventReaderService { return &EventReaderService{ - rldChan: make(chan struct{}, 1), - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + rldChan: make(chan struct{}, 1), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -57,24 +55,19 @@ type EventReaderService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (erS *EventReaderService) Start(shutdown chan struct{}) (err error) { - if erS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (erS *EventReaderService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, utils.FilterS, utils.AnalyzerS, }, - erS.srvIndexer, erS.cfg.GeneralCfg().ConnectTimeout) + registry, erS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -113,7 +106,7 @@ func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan, rldC } // Reload handles the change of config -func (erS *EventReaderService) Reload(_ chan struct{}) (err error) { +func (erS *EventReaderService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { erS.RLock() erS.rldChan <- struct{}{} erS.RUnlock() @@ -121,22 +114,16 @@ func (erS *EventReaderService) Reload(_ chan struct{}) (err error) { } // Shutdown stops the service -func (erS *EventReaderService) Shutdown() (err error) { +func (erS *EventReaderService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { erS.Lock() defer erS.Unlock() close(erS.stopChan) erS.ers = nil erS.cl.RpcUnregisterName(utils.ErSv1) + close(erS.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (erS *EventReaderService) IsRunning() bool { - erS.RLock() - defer erS.RUnlock() - return erS.ers != nil -} - // ServiceName returns the service name func (erS *EventReaderService) ServiceName() string { return utils.ERs diff --git a/services/filters.go b/services/filters.go index f5558035a..4b312a719 100644 --- a/services/filters.go +++ b/services/filters.go @@ -29,13 +29,11 @@ import ( ) // NewFilterService instantiates a new FilterService. -func NewFilterService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *FilterService { +func NewFilterService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *FilterService { return &FilterService{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -48,23 +46,18 @@ type FilterService struct { cfg *config.CGRConfig connMgr *engine.ConnManager - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. -func (s *FilterService) Start(shutdown chan struct{}) error { - if s.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (s *FilterService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CacheS, utils.DataDB, }, - s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -83,25 +76,19 @@ func (s *FilterService) Start(shutdown chan struct{}) error { } // Reload handles the config changes. -func (s *FilterService) Reload(_ chan struct{}) error { +func (s *FilterService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { return nil } // Shutdown stops the service. -func (s *FilterService) Shutdown() error { +func (s *FilterService) Shutdown(_ *servmanager.ServiceRegistry) error { s.mu.Lock() defer s.mu.Unlock() s.fltrS = nil + close(s.stateDeps.StateChan(utils.StateServiceDOWN)) return nil } -// IsRunning returns whether the service is running or not. -func (s *FilterService) IsRunning() bool { - s.mu.RLock() - defer s.mu.RUnlock() - return s.fltrS != nil -} - // ServiceName returns the service name func (s *FilterService) ServiceName() string { return utils.FilterS diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index ad0a6f367..b0e33870f 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -33,13 +33,11 @@ import ( // NewFreeswitchAgent returns the Freeswitch Agent func NewFreeswitchAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *FreeswitchAgent { + connMgr *engine.ConnManager) *FreeswitchAgent { return &FreeswitchAgent{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -51,17 +49,12 @@ type FreeswitchAgent struct { fS *agents.FSsessions connMgr *engine.ConnManager - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (fS *FreeswitchAgent) Start(shutdown chan struct{}) (err error) { - if fS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (fS *FreeswitchAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { fS.Lock() defer fS.Unlock() @@ -73,7 +66,7 @@ func (fS *FreeswitchAgent) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (fS *FreeswitchAgent) Reload(shutdown chan struct{}) (err error) { +func (fS *FreeswitchAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { fS.Lock() defer fS.Unlock() if err = fS.fS.Shutdown(); err != nil { @@ -93,21 +86,15 @@ func (fS *FreeswitchAgent) connect(shutdown chan struct{}) { } // Shutdown stops the service -func (fS *FreeswitchAgent) Shutdown() (err error) { +func (fS *FreeswitchAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { fS.Lock() defer fS.Unlock() err = fS.fS.Shutdown() fS.fS = nil + close(fS.stateDeps.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (fS *FreeswitchAgent) IsRunning() bool { - fS.RLock() - defer fS.RUnlock() - return fS.fS != nil -} - // ServiceName returns the service name func (fS *FreeswitchAgent) ServiceName() string { return utils.FreeSWITCHAgent diff --git a/services/globalvars.go b/services/globalvars.go index d5cbfdf34..6efbf934c 100644 --- a/services/globalvars.go +++ b/services/globalvars.go @@ -28,12 +28,10 @@ import ( ) // NewGlobalVarS . -func NewGlobalVarS(cfg *config.CGRConfig, - srvIndexer *servmanager.ServiceRegistry) *GlobalVarS { +func NewGlobalVarS(cfg *config.CGRConfig) *GlobalVarS { return &GlobalVarS{ - cfg: cfg, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -41,13 +39,12 @@ func NewGlobalVarS(cfg *config.CGRConfig, type GlobalVarS struct { cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (gv *GlobalVarS) Start(_ chan struct{}) error { +func (gv *GlobalVarS) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) error { engine.SetHTTPPstrTransport(gv.cfg.HTTPCfg().ClientOpts) utils.DecimalContext.MaxScale = gv.cfg.GeneralCfg().DecimalMaxScale utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale @@ -58,7 +55,7 @@ func (gv *GlobalVarS) Start(_ chan struct{}) error { } // Reload handles the change of config -func (gv *GlobalVarS) Reload(_ chan struct{}) error { +func (gv *GlobalVarS) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { engine.SetHTTPPstrTransport(gv.cfg.HTTPCfg().ClientOpts) utils.DecimalContext.MaxScale = gv.cfg.GeneralCfg().DecimalMaxScale utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale @@ -68,15 +65,11 @@ func (gv *GlobalVarS) Reload(_ chan struct{}) error { } // Shutdown stops the service -func (gv *GlobalVarS) Shutdown() error { +func (gv *GlobalVarS) Shutdown(_ *servmanager.ServiceRegistry) error { + close(gv.StateChan(utils.StateServiceDOWN)) return nil } -// IsRunning returns if the service is running -func (gv *GlobalVarS) IsRunning() bool { - return IsServiceInState(gv.ServiceName(), utils.StateServiceUP, gv.srvIndexer) -} - // ServiceName returns the service name func (gv *GlobalVarS) ServiceName() string { return utils.GlobalVarS diff --git a/services/guardian.go b/services/guardian.go index 546204cd5..514b2433c 100644 --- a/services/guardian.go +++ b/services/guardian.go @@ -31,11 +31,10 @@ import ( ) // NewGuardianService instantiates a new GuardianService. -func NewGuardianService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceRegistry) *GuardianService { +func NewGuardianService(cfg *config.CGRConfig) *GuardianService { return &GuardianService{ - cfg: cfg, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -44,23 +43,18 @@ type GuardianService struct { mu sync.RWMutex cfg *config.CGRConfig cl *commonlisteners.CommonListenerS - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. -func (s *GuardianService) Start(_ chan struct{}) error { - if s.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (s *GuardianService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) error { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, utils.AnalyzerS, }, - s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -82,23 +76,19 @@ func (s *GuardianService) Start(_ chan struct{}) error { } // Reload handles the config changes. -func (s *GuardianService) Reload(_ chan struct{}) error { +func (s *GuardianService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error { return nil } // Shutdown stops the service. -func (s *GuardianService) Shutdown() error { +func (s *GuardianService) Shutdown(_ *servmanager.ServiceRegistry) error { s.mu.Lock() defer s.mu.Unlock() s.cl.RpcUnregisterName(utils.GuardianSv1) + close(s.StateChan(utils.StateServiceDOWN)) return nil } -// IsRunning returns whether the service is running or not. -func (s *GuardianService) IsRunning() bool { - return IsServiceInState(s.ServiceName(), utils.StateServiceUP, s.srvIndexer) -} - // ServiceName returns the service name func (s *GuardianService) ServiceName() string { return utils.GuardianS diff --git a/services/httpagent.go b/services/httpagent.go index 38c88fda0..6f3e4e0ec 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -32,13 +32,11 @@ import ( // NewHTTPAgent returns the HTTP Agent func NewHTTPAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *HTTPAgent { + connMgr *engine.ConnManager) *HTTPAgent { return &HTTPAgent{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -55,23 +53,18 @@ type HTTPAgent struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (ha *HTTPAgent) Start(_ chan struct{}) (err error) { - if ha.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (ha *HTTPAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, utils.FilterS, }, - ha.srvIndexer, ha.cfg.GeneralCfg().ConnectTimeout) + registry, ha.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -93,25 +86,19 @@ func (ha *HTTPAgent) Start(_ chan struct{}) (err error) { } // Reload handles the change of config -func (ha *HTTPAgent) Reload(_ chan struct{}) (err error) { +func (ha *HTTPAgent) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return // no reload } // Shutdown stops the service -func (ha *HTTPAgent) Shutdown() (err error) { +func (ha *HTTPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { ha.Lock() ha.started = false ha.Unlock() + close(ha.stateDeps.StateChan(utils.StateServiceDOWN)) return // no shutdown for the momment } -// IsRunning returns if the service is running -func (ha *HTTPAgent) IsRunning() bool { - ha.RLock() - defer ha.RUnlock() - return ha.started -} - // ServiceName returns the service name func (ha *HTTPAgent) ServiceName() string { return utils.HTTPAgent diff --git a/services/janus.go b/services/janus.go index 4df44c2ea..3530b8ab1 100644 --- a/services/janus.go +++ b/services/janus.go @@ -33,13 +33,11 @@ import ( // NewJanusAgent returns the Janus Agent func NewJanusAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *JanusAgent { + connMgr *engine.ConnManager) *JanusAgent { return &JanusAgent{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -56,19 +54,18 @@ type JanusAgent struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should jandle the sercive start -func (ja *JanusAgent) Start(_ chan struct{}) (err error) { +func (ja *JanusAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, utils.FilterS, }, - ja.srvIndexer, ja.cfg.GeneralCfg().ConnectTimeout) + registry, ja.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -103,26 +100,20 @@ func (ja *JanusAgent) Start(_ chan struct{}) (err error) { } // Reload jandles the change of config -func (ja *JanusAgent) Reload(_ chan struct{}) (err error) { +func (ja *JanusAgent) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return // no reload } // Shutdown stops the service -func (ja *JanusAgent) Shutdown() (err error) { +func (ja *JanusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { ja.Lock() err = ja.jA.Shutdown() ja.started = false ja.Unlock() + close(ja.stateDeps.StateChan(utils.StateServiceDOWN)) return // no shutdown for the momment } -// IsRunning returns if the service is running -func (ja *JanusAgent) IsRunning() bool { - ja.RLock() - defer ja.RUnlock() - return ja.started -} - // ServiceName returns the service name func (ja *JanusAgent) ServiceName() string { return utils.JanusAgent diff --git a/services/kamailioagent.go b/services/kamailioagent.go index d92679b1a..8c26eb726 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -33,13 +33,11 @@ import ( // NewKamailioAgent returns the Kamailio Agent func NewKamailioAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *KamailioAgent { + connMgr *engine.ConnManager) *KamailioAgent { return &KamailioAgent{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -51,17 +49,12 @@ type KamailioAgent struct { kam *agents.KamailioAgent connMgr *engine.ConnManager - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (kam *KamailioAgent) Start(shutdown chan struct{}) (err error) { - if kam.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (kam *KamailioAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { kam.Lock() defer kam.Unlock() @@ -74,7 +67,7 @@ func (kam *KamailioAgent) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (kam *KamailioAgent) Reload(shutdown chan struct{}) (err error) { +func (kam *KamailioAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { kam.Lock() defer kam.Unlock() if err = kam.kam.Shutdown(); err != nil { @@ -98,21 +91,15 @@ func (kam *KamailioAgent) connect(k *agents.KamailioAgent, shutdown chan struct{ } // Shutdown stops the service -func (kam *KamailioAgent) Shutdown() (err error) { +func (kam *KamailioAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { kam.Lock() defer kam.Unlock() err = kam.kam.Shutdown() kam.kam = nil + close(kam.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (kam *KamailioAgent) IsRunning() bool { - kam.RLock() - defer kam.RUnlock() - return kam.kam != nil -} - // ServiceName returns the service name func (kam *KamailioAgent) ServiceName() string { return utils.KamailioAgent diff --git a/services/loaders.go b/services/loaders.go index 74c276e94..17cbfcea6 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -32,14 +32,12 @@ import ( // NewLoaderService returns the Loader Service func NewLoaderService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *LoaderService { + connMgr *engine.ConnManager) *LoaderService { return &LoaderService{ - cfg: cfg, - connMgr: connMgr, - stopChan: make(chan struct{}), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stopChan: make(chan struct{}), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -54,17 +52,12 @@ type LoaderService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (ldrs *LoaderService) Start(_ chan struct{}) (err error) { - if ldrs.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (ldrs *LoaderService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -72,7 +65,7 @@ func (ldrs *LoaderService) Start(_ chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - ldrs.srvIndexer, ldrs.cfg.GeneralCfg().ConnectTimeout) + registry, ldrs.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -105,15 +98,18 @@ func (ldrs *LoaderService) Start(_ chan struct{}) (err error) { } // Reload handles the change of config -func (ldrs *LoaderService) Reload(_ chan struct{}) error { - fs := ldrs.srvIndexer.Lookup(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.FilterS, utils.StateServiceUP) - } - dbs := ldrs.srvIndexer.Lookup(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.DataDB, utils.StateServiceUP) +func (ldrs *LoaderService) Reload(_ chan struct{}, registry *servmanager.ServiceRegistry) error { + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.FilterS, + utils.DataDB, + }, + registry, ldrs.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) close(ldrs.stopChan) ldrs.stopChan = make(chan struct{}) @@ -125,22 +121,16 @@ func (ldrs *LoaderService) Reload(_ chan struct{}) error { } // Shutdown stops the service -func (ldrs *LoaderService) Shutdown() (_ error) { +func (ldrs *LoaderService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { ldrs.Lock() ldrs.ldrs = nil close(ldrs.stopChan) ldrs.cl.RpcUnregisterName(utils.LoaderSv1) ldrs.Unlock() + close(ldrs.stateDeps.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (ldrs *LoaderService) IsRunning() bool { - ldrs.RLock() - defer ldrs.RUnlock() - return ldrs.ldrs != nil && ldrs.ldrs.Enabled() -} - // ServiceName returns the service name func (ldrs *LoaderService) ServiceName() string { return utils.LoaderS diff --git a/services/radiusagent.go b/services/radiusagent.go index 070d8c27a..1f2d8280f 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -32,13 +32,11 @@ import ( // NewRadiusAgent returns the Radius Agent func NewRadiusAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *RadiusAgent { + connMgr *engine.ConnManager) *RadiusAgent { return &RadiusAgent{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -55,18 +53,13 @@ type RadiusAgent struct { lauth string lacct string - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (rad *RadiusAgent) Start(shutdown chan struct{}) (err error) { - if rad.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - - fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, rad.srvIndexer, +func (rad *RadiusAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, rad.cfg.GeneralCfg().ConnectTimeout) if err != nil { return @@ -98,7 +91,7 @@ func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent, shutdown chan stru } // Reload handles the change of config -func (rad *RadiusAgent) Reload(shutdown chan struct{}) (err error) { +func (rad *RadiusAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { if rad.lnet == rad.cfg.RadiusAgentCfg().ListenNet && rad.lauth == rad.cfg.RadiusAgentCfg().ListenAuth && rad.lacct == rad.cfg.RadiusAgentCfg().ListenAcct { @@ -106,12 +99,13 @@ func (rad *RadiusAgent) Reload(shutdown chan struct{}) (err error) { } rad.shutdown() - return rad.Start(shutdown) + return rad.Start(shutdown, registry) } // Shutdown stops the service -func (rad *RadiusAgent) Shutdown() (err error) { +func (rad *RadiusAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { rad.shutdown() + close(rad.StateChan(utils.StateServiceDOWN)) return // no shutdown for the momment } @@ -122,13 +116,6 @@ func (rad *RadiusAgent) shutdown() { rad.Unlock() } -// IsRunning returns if the service is running -func (rad *RadiusAgent) IsRunning() bool { - rad.RLock() - defer rad.RUnlock() - return rad.rad != nil -} - // ServiceName returns the service name func (rad *RadiusAgent) ServiceName() string { return utils.RadiusAgent diff --git a/services/rankings.go b/services/rankings.go index 3ae5e215f..945c4742b 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -34,14 +34,12 @@ import ( // NewRankingService returns the RankingS Service func NewRankingService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup, - srvIndexer *servmanager.ServiceRegistry) *RankingService { + srvDep map[string]*sync.WaitGroup) *RankingService { return &RankingService{ - cfg: cfg, - connMgr: connMgr, - srvDep: srvDep, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -55,17 +53,12 @@ type RankingService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (ran *RankingService) Start(shutdown chan struct{}) (err error) { - if ran.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (ran *RankingService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { ran.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -76,7 +69,7 @@ func (ran *RankingService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - ran.srvIndexer, ran.cfg.GeneralCfg().ConnectTimeout) + registry, ran.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -112,7 +105,7 @@ func (ran *RankingService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (ran *RankingService) Reload(_ chan struct{}) (err error) { +func (ran *RankingService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { ran.Lock() ran.ran.Reload(context.TODO()) ran.Unlock() @@ -120,21 +113,17 @@ func (ran *RankingService) Reload(_ chan struct{}) (err error) { } // Shutdown stops the service -func (ran *RankingService) Shutdown() (err error) { +func (ran *RankingService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { defer ran.srvDep[utils.DataDB].Done() ran.Lock() defer ran.Unlock() ran.ran.StopRankingS() ran.ran = nil ran.cl.RpcUnregisterName(utils.RankingSv1) + close(ran.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (ran *RankingService) IsRunning() bool { - return ran.ran != nil -} - // ServiceName returns the service name func (ran *RankingService) ServiceName() string { return utils.RankingS diff --git a/services/rates.go b/services/rates.go index c24157814..5b681d7f6 100644 --- a/services/rates.go +++ b/services/rates.go @@ -31,13 +31,11 @@ import ( ) // NewRateService constructs RateService -func NewRateService(cfg *config.CGRConfig, - srvIndexer *servmanager.ServiceRegistry) *RateService { +func NewRateService(cfg *config.CGRConfig) *RateService { return &RateService{ - cfg: cfg, - rldChan: make(chan struct{}), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + rldChan: make(chan struct{}), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -52,9 +50,8 @@ type RateService struct { stopChan chan struct{} cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // ServiceName returns the service name @@ -67,35 +64,25 @@ func (rs *RateService) ShouldRun() (should bool) { return rs.cfg.RateSCfg().Enabled } -// IsRunning returns if the service is running -func (rs *RateService) IsRunning() bool { - rs.RLock() - defer rs.RUnlock() - return rs.rateS != nil -} - // Reload handles the change of config -func (rs *RateService) Reload(_ chan struct{}) (_ error) { +func (rs *RateService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (_ error) { rs.rldChan <- struct{}{} return } // Shutdown stops the service -func (rs *RateService) Shutdown() (err error) { +func (rs *RateService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { rs.Lock() defer rs.Unlock() close(rs.stopChan) rs.rateS = nil rs.cl.RpcUnregisterName(utils.RateSv1) + close(rs.StateChan(utils.StateServiceDOWN)) return } // Start should handle the service start -func (rs *RateService) Start(shutdown chan struct{}) (err error) { - if rs.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (rs *RateService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -104,7 +91,7 @@ func (rs *RateService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - rs.srvIndexer, rs.cfg.GeneralCfg().ConnectTimeout) + registry, rs.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } diff --git a/services/registrarc.go b/services/registrarc.go index af8c4827b..6ed3290df 100644 --- a/services/registrarc.go +++ b/services/registrarc.go @@ -30,13 +30,11 @@ import ( ) // NewRegistrarCService returns the Dispatcher Service -func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *RegistrarCService { +func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *RegistrarCService { return &RegistrarCService{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -51,16 +49,12 @@ type RegistrarCService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (dspS *RegistrarCService) Start(_ chan struct{}) (err error) { - if dspS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } +func (dspS *RegistrarCService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { dspS.Lock() defer dspS.Unlock() @@ -73,28 +67,22 @@ func (dspS *RegistrarCService) Start(_ chan struct{}) (err error) { } // Reload handles the change of config -func (dspS *RegistrarCService) Reload(_ chan struct{}) (err error) { +func (dspS *RegistrarCService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { dspS.rldChan <- struct{}{} return // for the momment nothing to reload } // Shutdown stops the service -func (dspS *RegistrarCService) Shutdown() (err error) { +func (dspS *RegistrarCService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { dspS.Lock() close(dspS.stopChan) dspS.dspS.Shutdown() dspS.dspS = nil dspS.Unlock() + close(dspS.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (dspS *RegistrarCService) IsRunning() bool { - dspS.RLock() - defer dspS.RUnlock() - return dspS.dspS != nil -} - // ServiceName returns the service name func (dspS *RegistrarCService) ServiceName() string { return utils.RegistrarC diff --git a/services/resources.go b/services/resources.go index 23d8f0277..b35280aa4 100644 --- a/services/resources.go +++ b/services/resources.go @@ -33,14 +33,12 @@ import ( // NewResourceService returns the Resource Service func NewResourceService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup, - srvIndexer *servmanager.ServiceRegistry) *ResourceService { + srvDep map[string]*sync.WaitGroup) *ResourceService { return &ResourceService{ - cfg: cfg, - connMgr: connMgr, - srvDep: srvDep, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -55,17 +53,12 @@ type ResourceService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (reS *ResourceService) Start(shutdown chan struct{}) (err error) { - if reS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (reS *ResourceService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { reS.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -76,7 +69,7 @@ func (reS *ResourceService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - reS.srvIndexer, reS.cfg.GeneralCfg().ConnectTimeout) + registry, reS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -110,7 +103,7 @@ func (reS *ResourceService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (reS *ResourceService) Reload(_ chan struct{}) (err error) { +func (reS *ResourceService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { reS.Lock() reS.reS.Reload(context.TODO()) reS.Unlock() @@ -118,23 +111,17 @@ func (reS *ResourceService) Reload(_ chan struct{}) (err error) { } // Shutdown stops the service -func (reS *ResourceService) Shutdown() (err error) { +func (reS *ResourceService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { defer reS.srvDep[utils.DataDB].Done() reS.Lock() defer reS.Unlock() reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error reS.reS = nil reS.cl.RpcUnregisterName(utils.ResourceSv1) + close(reS.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (reS *ResourceService) IsRunning() bool { - reS.RLock() - defer reS.RUnlock() - return reS.reS != nil -} - // ServiceName returns the service name func (reS *ResourceService) ServiceName() string { return utils.ResourceS diff --git a/services/routes.go b/services/routes.go index f818f35b9..814cb4620 100644 --- a/services/routes.go +++ b/services/routes.go @@ -31,13 +31,11 @@ import ( // NewRouteService returns the Route Service func NewRouteService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *RouteService { + connMgr *engine.ConnManager) *RouteService { return &RouteService{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -51,17 +49,12 @@ type RouteService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (routeS *RouteService) Start(shutdown chan struct{}) (err error) { - if routeS.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (routeS *RouteService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -70,7 +63,7 @@ func (routeS *RouteService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - routeS.srvIndexer, routeS.cfg.GeneralCfg().ConnectTimeout) + registry, routeS.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -101,26 +94,20 @@ func (routeS *RouteService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (routeS *RouteService) Reload(_ chan struct{}) (err error) { +func (routeS *RouteService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return } // Shutdown stops the service -func (routeS *RouteService) Shutdown() (err error) { +func (routeS *RouteService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { routeS.Lock() defer routeS.Unlock() routeS.routeS = nil routeS.cl.RpcUnregisterName(utils.RouteSv1) + close(routeS.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (routeS *RouteService) IsRunning() bool { - routeS.RLock() - defer routeS.RUnlock() - return routeS.routeS != nil -} - // ServiceName returns the service name func (routeS *RouteService) ServiceName() string { return utils.RouteS diff --git a/services/sessions.go b/services/sessions.go index 104ce7d8b..92cc5c404 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -34,13 +34,11 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *SessionService { + connMgr *engine.ConnManager) *SessionService { return &SessionService{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -56,17 +54,12 @@ type SessionService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (smg *SessionService) Start(shutdown chan struct{}) (err error) { - if smg.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (smg *SessionService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -74,7 +67,7 @@ func (smg *SessionService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - smg.srvIndexer, smg.cfg.GeneralCfg().ConnectTimeout) + registry, smg.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -127,12 +120,12 @@ func (smg *SessionService) start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (smg *SessionService) Reload(_ chan struct{}) (err error) { +func (smg *SessionService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return } // Shutdown stops the service -func (smg *SessionService) Shutdown() (err error) { +func (smg *SessionService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { smg.Lock() defer smg.Unlock() close(smg.stopChan) @@ -146,16 +139,10 @@ func (smg *SessionService) Shutdown() (err error) { smg.sm = nil smg.cl.RpcUnregisterName(utils.SessionSv1) // smg.server.BiRPCUnregisterName(utils.SessionSv1) + close(smg.stateDeps.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (smg *SessionService) IsRunning() bool { - smg.RLock() - defer smg.RUnlock() - return smg.sm != nil -} - // ServiceName returns the service name func (smg *SessionService) ServiceName() string { return utils.SessionS diff --git a/services/sipagent.go b/services/sipagent.go index 95674e024..75da8a2ed 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -32,13 +32,11 @@ import ( // NewSIPAgent returns the sip Agent func NewSIPAgent(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *SIPAgent { + connMgr *engine.ConnManager) *SIPAgent { return &SIPAgent{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -52,18 +50,13 @@ type SIPAgent struct { oldListen string - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (sip *SIPAgent) Start(shutdown chan struct{}) (err error) { - if sip.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - - fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, sip.srvIndexer, +func (sip *SIPAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry, sip.cfg.GeneralCfg().ConnectTimeout) if err != nil { return @@ -91,7 +84,7 @@ func (sip *SIPAgent) listenAndServe(shutdown chan struct{}) { } // Reload handles the change of config -func (sip *SIPAgent) Reload(shutdown chan struct{}) (err error) { +func (sip *SIPAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) { if sip.oldListen == sip.cfg.SIPAgentCfg().Listen { return } @@ -105,21 +98,15 @@ func (sip *SIPAgent) Reload(shutdown chan struct{}) (err error) { } // Shutdown stops the service -func (sip *SIPAgent) Shutdown() (err error) { +func (sip *SIPAgent) Shutdown(_ *servmanager.ServiceRegistry) (err error) { sip.Lock() defer sip.Unlock() sip.sip.Shutdown() sip.sip = nil + close(sip.stateDeps.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (sip *SIPAgent) IsRunning() bool { - sip.RLock() - defer sip.RUnlock() - return sip.sip != nil -} - // ServiceName returns the service name func (sip *SIPAgent) ServiceName() string { return utils.SIPAgent diff --git a/services/statedeps.go b/services/statedeps.go index 9b4a8d121..ee1e27bc0 100644 --- a/services/statedeps.go +++ b/services/statedeps.go @@ -77,14 +77,3 @@ func waitForServiceState(state, serviceID string, indexer *servmanager.ServiceRe return nil, fmt.Errorf("timed out waiting for service %q state %q", serviceID, state) } } - -// IsServiceInState performs a non-blocking check to determine if a service is in the specified state. -func IsServiceInState(serviceID, state string, indexer *servmanager.ServiceRegistry) bool { - svc := indexer.Lookup(serviceID) - select { - case <-svc.StateChan(state): - return true - default: - return false - } -} diff --git a/services/stats.go b/services/stats.go index 899fa849c..266224016 100644 --- a/services/stats.go +++ b/services/stats.go @@ -31,16 +31,12 @@ import ( ) // NewStatService returns the Stat Service -func NewStatService(cfg *config.CGRConfig, - connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup, - srvIndexer *servmanager.ServiceRegistry) *StatService { +func NewStatService(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) *StatService { return &StatService{ - cfg: cfg, - connMgr: connMgr, - srvDep: srvDep, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -55,17 +51,12 @@ type StatService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (sts *StatService) Start(shutdown chan struct{}) (err error) { - if sts.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (sts *StatService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { sts.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -76,7 +67,7 @@ func (sts *StatService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - sts.srvIndexer, sts.cfg.GeneralCfg().ConnectTimeout) + registry, sts.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -109,7 +100,7 @@ func (sts *StatService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (sts *StatService) Reload(_ chan struct{}) (err error) { +func (sts *StatService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { sts.Lock() sts.sts.Reload(context.TODO()) sts.Unlock() @@ -117,23 +108,17 @@ func (sts *StatService) Reload(_ chan struct{}) (err error) { } // Shutdown stops the service -func (sts *StatService) Shutdown() (err error) { +func (sts *StatService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { defer sts.srvDep[utils.DataDB].Done() sts.Lock() defer sts.Unlock() sts.sts.Shutdown(context.TODO()) sts.sts = nil sts.cl.RpcUnregisterName(utils.StatSv1) + close(sts.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (sts *StatService) IsRunning() bool { - sts.RLock() - defer sts.RUnlock() - return sts.sts != nil -} - // ServiceName returns the service name func (sts *StatService) ServiceName() string { return utils.StatS diff --git a/services/stordb.go b/services/stordb.go index 9be949571..c31b16edf 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -30,13 +30,11 @@ import ( ) // NewStorDBService returns the StorDB Service -func NewStorDBService(cfg *config.CGRConfig, setVersions bool, - srvIndexer *servmanager.ServiceRegistry) *StorDBService { +func NewStorDBService(cfg *config.CGRConfig, setVersions bool) *StorDBService { return &StorDBService{ cfg: cfg, setVersions: setVersions, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -49,16 +47,12 @@ type StorDBService struct { db engine.StorDB setVersions bool - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (db *StorDBService) Start(_ chan struct{}) (err error) { - if db.IsRunning() { - return utils.ErrServiceAlreadyRunning - } +func (db *StorDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { db.Lock() defer db.Unlock() db.oldDBCfg = db.cfg.StorDbCfg().Clone() @@ -86,7 +80,7 @@ func (db *StorDBService) Start(_ chan struct{}) (err error) { } // Reload handles the change of config -func (db *StorDBService) Reload(_ chan struct{}) (err error) { +func (db *StorDBService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { db.Lock() defer db.Unlock() if db.needsConnectionReload() { @@ -133,21 +127,15 @@ func (db *StorDBService) Reload(_ chan struct{}) (err error) { } // Shutdown stops the service -func (db *StorDBService) Shutdown() (_ error) { +func (db *StorDBService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { db.Lock() db.db.Close() db.db = nil db.Unlock() + close(db.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (db *StorDBService) IsRunning() bool { - db.RLock() - defer db.RUnlock() - return db.isRunning() -} - // isRunning returns if the service is running (not thread safe) func (db *StorDBService) isRunning() bool { return db.db != nil diff --git a/services/thresholds.go b/services/thresholds.go index 76b612318..f5fb196a4 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -33,14 +33,12 @@ import ( // NewThresholdService returns the Threshold Service func NewThresholdService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup, - srvIndexer *servmanager.ServiceRegistry) *ThresholdService { + srvDep map[string]*sync.WaitGroup) *ThresholdService { return &ThresholdService{ - cfg: cfg, - srvDep: srvDep, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + srvDep: srvDep, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -55,17 +53,12 @@ type ThresholdService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (thrs *ThresholdService) Start(shutdown chan struct{}) (err error) { - if thrs.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (thrs *ThresholdService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { thrs.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -76,7 +69,7 @@ func (thrs *ThresholdService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - thrs.srvIndexer, thrs.cfg.GeneralCfg().ConnectTimeout) + registry, thrs.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -109,7 +102,7 @@ func (thrs *ThresholdService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (thrs *ThresholdService) Reload(_ chan struct{}) (_ error) { +func (thrs *ThresholdService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (_ error) { thrs.Lock() thrs.thrs.Reload(context.TODO()) thrs.Unlock() @@ -117,23 +110,17 @@ func (thrs *ThresholdService) Reload(_ chan struct{}) (_ error) { } // Shutdown stops the service -func (thrs *ThresholdService) Shutdown() (_ error) { +func (thrs *ThresholdService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { defer thrs.srvDep[utils.DataDB].Done() thrs.Lock() defer thrs.Unlock() thrs.thrs.Shutdown(context.TODO()) thrs.thrs = nil thrs.cl.RpcUnregisterName(utils.ThresholdSv1) + close(thrs.stateDeps.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (thrs *ThresholdService) IsRunning() bool { - thrs.RLock() - defer thrs.RUnlock() - return thrs.thrs != nil -} - // ServiceName returns the service name func (thrs *ThresholdService) ServiceName() string { return utils.ThresholdS diff --git a/services/tpes.go b/services/tpes.go index ba1e47a21..78cf3b40c 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -31,13 +31,11 @@ import ( ) // NewTPeService is the constructor for the TpeService -func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvIndexer *servmanager.ServiceRegistry) *TPeService { +func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *TPeService { return &TPeService{ - cfg: cfg, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -53,20 +51,19 @@ type TPeService struct { connMgr *engine.ConnManager cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (ts *TPeService) Start(_ chan struct{}) (err error) { +func (ts *TPeService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) { srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, utils.DataDB, }, - ts.srvIndexer, ts.cfg.GeneralCfg().ConnectTimeout) + registry, ts.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -82,24 +79,18 @@ func (ts *TPeService) Start(_ chan struct{}) (err error) { } // Reload handles the change of config -func (ts *TPeService) Reload(_ chan struct{}) (err error) { +func (ts *TPeService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { return } // Shutdown stops the service -func (ts *TPeService) Shutdown() (err error) { +func (ts *TPeService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { ts.srv = nil close(ts.stopChan) + close(ts.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (ts *TPeService) IsRunning() bool { - ts.Lock() - defer ts.Unlock() - return ts.tpes != nil -} - // ServiceName returns the service name func (ts *TPeService) ServiceName() string { return utils.TPeS diff --git a/services/trends.go b/services/trends.go index f3390039d..9c14ad9d0 100644 --- a/services/trends.go +++ b/services/trends.go @@ -33,14 +33,12 @@ import ( // NewTrendsService returns the TrendS Service func NewTrendService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup, - srvIndexer *servmanager.ServiceRegistry) *TrendService { + srvDep map[string]*sync.WaitGroup) *TrendService { return &TrendService{ - cfg: cfg, - connMgr: connMgr, - srvDep: srvDep, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } @@ -54,17 +52,12 @@ type TrendService struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup - intRPCconn birpc.ClientConnector // expose API methods over internal connection - srvIndexer *servmanager.ServiceRegistry // access directly services from here - stateDeps *StateDependencies // channel subscriptions for state changes + intRPCconn birpc.ClientConnector // expose API methods over internal connection + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start -func (trs *TrendService) Start(shutdown chan struct{}) (err error) { - if trs.IsRunning() { - return utils.ErrServiceAlreadyRunning - } - +func (trs *TrendService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) { trs.srvDep[utils.DataDB].Add(1) srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, @@ -75,7 +68,7 @@ func (trs *TrendService) Start(shutdown chan struct{}) (err error) { utils.DataDB, utils.AnalyzerS, }, - trs.srvIndexer, trs.cfg.GeneralCfg().ConnectTimeout) + registry, trs.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } @@ -111,7 +104,7 @@ func (trs *TrendService) Start(shutdown chan struct{}) (err error) { } // Reload handles the change of config -func (trs *TrendService) Reload(_ chan struct{}) (err error) { +func (trs *TrendService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) { trs.Lock() trs.trs.Reload(context.TODO()) trs.Unlock() @@ -119,21 +112,17 @@ func (trs *TrendService) Reload(_ chan struct{}) (err error) { } // Shutdown stops the service -func (trs *TrendService) Shutdown() (err error) { +func (trs *TrendService) Shutdown(_ *servmanager.ServiceRegistry) (err error) { defer trs.srvDep[utils.DataDB].Done() trs.Lock() defer trs.Unlock() trs.trs.StopTrendS() trs.trs = nil trs.cl.RpcUnregisterName(utils.TrendSv1) + close(trs.StateChan(utils.StateServiceDOWN)) return } -// IsRunning returns if the service is running -func (trs *TrendService) IsRunning() bool { - return trs.trs != nil -} - // ServiceName returns the service name func (trs *TrendService) ServiceName() string { return utils.TrendS diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index b21723091..675f32460 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -57,16 +57,16 @@ type ServiceManager struct { // StartServices starts all enabled services func (m *ServiceManager) StartServices(shutdown chan struct{}) { go m.handleReload(shutdown) - for _, service := range m.registry.List() { - if service.ShouldRun() && !service.IsRunning() { + for _, svc := range m.registry.List() { + if svc.ShouldRun() && !IsServiceInState(svc, utils.StateServiceUP) { m.shdWg.Add(1) go func() { - if err := service.Start(shutdown); err != nil && + if err := svc.Start(shutdown, m.registry); err != nil && err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine - utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, service.ServiceName(), err)) + utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) } - utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, service.ServiceName())) + utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) }() } } @@ -76,9 +76,9 @@ func (m *ServiceManager) StartServices(shutdown chan struct{}) { // AddServices adds given services func (m *ServiceManager) AddServices(services ...Service) { m.Lock() - for _, srv := range services { - m.registry.Register(srv) - if sAPIData, hasAPIData := serviceAPIData[srv.ServiceName()]; hasAPIData { // Add the internal connections + for _, svc := range services { + m.registry.Register(svc) + if sAPIData, hasAPIData := serviceAPIData[svc.ServiceName()]; hasAPIData { // Add the internal connections rpcIntChan := make(chan birpc.ClientConnector, 1) m.connMgr.AddInternalConn(sAPIData[1], sAPIData[0], rpcIntChan) if len(sAPIData) > 2 { // Add the bidirectional API @@ -86,14 +86,14 @@ func (m *ServiceManager) AddServices(services ...Service) { } go func() { // ToDo: centralize management into one single goroutine if utils.StructChanTimeout( - m.registry.Lookup(srv.ServiceName()).StateChan(utils.StateServiceUP), + m.registry.Lookup(svc.ServiceName()).StateChan(utils.StateServiceUP), m.cfg.GeneralCfg().ConnectTimeout) { utils.Logger.Err( fmt.Sprintf("<%s> failed to register internal connection to service %s because of timeout waiting for ServiceUP state", - utils.ServiceManager, srv.ServiceName())) + utils.ServiceManager, svc.ServiceName())) // toDo: shutdown service } - rpcIntChan <- srv.IntRPCConn() + rpcIntChan <- svc.IntRPCConn() }() } } @@ -101,49 +101,50 @@ func (m *ServiceManager) AddServices(services ...Service) { } func (m *ServiceManager) handleReload(shutdown chan struct{}) { - var srvName string + var serviceID string for { select { case <-shutdown: m.ShutdownServices() return - case srvName = <-m.rldChan: + case serviceID = <-m.rldChan: } - if srvName == config.RPCConnsJSON { + if serviceID == config.RPCConnsJSON { go m.connMgr.Reload() } else { - go m.reloadService(srvName, shutdown) + go m.reloadService(serviceID, shutdown) } // handle RPC server } } -func (m *ServiceManager) reloadService(srvName string, shutdown chan struct{}) (err error) { - srv := m.registry.Lookup(srvName) - if srv.ShouldRun() { - if srv.IsRunning() { - if err = srv.Reload(shutdown); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> service: %v", utils.ServiceManager, srv.ServiceName(), err)) +func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err error) { + svc := m.registry.Lookup(id) + isUp := IsServiceInState(svc, utils.StateServiceUP) + if svc.ShouldRun() { + if isUp { + if err = svc.Reload(shutdown, m.registry); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) return // stop if we encounter an error } - utils.Logger.Info(fmt.Sprintf("<%s> reloaded <%s> service", utils.ServiceManager, srv.ServiceName())) + utils.Logger.Info(fmt.Sprintf("<%s> reloaded <%s> service", utils.ServiceManager, svc.ServiceName())) } else { m.shdWg.Add(1) - if err = srv.Start(shutdown); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> serivce: %v", utils.ServiceManager, srv.ServiceName(), err)) + if err = svc.Start(shutdown, m.registry); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> serivce: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) return // stop if we encounter an error } - utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, srv.ServiceName())) + utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) } - } else if srv.IsRunning() { - if err = srv.Shutdown(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, srv.ServiceName(), err)) + } else if isUp { + if err = svc.Shutdown(m.registry); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) } - utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, srv.ServiceName())) + utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) m.shdWg.Done() } return @@ -151,16 +152,16 @@ func (m *ServiceManager) reloadService(srvName string, shutdown chan struct{}) ( // ShutdownServices will stop all services func (m *ServiceManager) ShutdownServices() { - for _, srv := range m.registry.List() { - if srv.IsRunning() { + for _, svc := range m.registry.List() { + if IsServiceInState(svc, utils.StateServiceUP) { go func() { defer m.shdWg.Done() - if err := srv.Shutdown(); err != nil { + if err := svc.Shutdown(m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", - utils.ServiceManager, srv.ServiceName(), err)) + utils.ServiceManager, svc.ServiceName(), err)) return } - utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, srv.ServiceName())) + utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) }() } } @@ -169,13 +170,11 @@ func (m *ServiceManager) ShutdownServices() { // Service interface that describes what functions should a service implement type Service interface { // Start should handle the service start - Start(chan struct{}) error + Start(chan struct{}, *ServiceRegistry) error // Reload handles the change of config - Reload(chan struct{}) error + Reload(chan struct{}, *ServiceRegistry) error // Shutdown stops the service - Shutdown() error - // IsRunning returns if the service is running - IsRunning() bool + Shutdown(*ServiceRegistry) error // ShouldRun returns if the service should be running ShouldRun() bool // ServiceName returns the service name @@ -217,13 +216,12 @@ func (m *ServiceManager) V1ServiceStatus(ctx *context.Context, args *ArgsService m.RLock() defer m.RUnlock() - srv := m.registry.Lookup(args.ServiceID) - if srv == nil { + svc := m.registry.Lookup(args.ServiceID) + if svc == nil { return utils.ErrUnsupportedServiceID } - running := srv.IsRunning() - if running { + if IsServiceInState(svc, utils.StateServiceUP) { *reply = utils.RunningCaps } else { *reply = utils.StoppedCaps @@ -413,3 +411,13 @@ var serviceAPIData = map[string][]string{ utils.ErSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs)}, } + +// IsServiceInState performs a non-blocking check to determine if a service is in the specified state. +func IsServiceInState(svc Service, state string) bool { + select { + case <-svc.StateChan(state): + return true + default: + return false + } +} diff --git a/utils/consts.go b/utils/consts.go index 6e1a4fcfa..503dbde7b 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2831,7 +2831,9 @@ const ( ) const ( - StateServiceUP = "SERVICE_UP" + StateServiceUP = "SERVICE_UP" + StateServiceDOWN = "SERVICE_DOWN" + StateServiceInit = "SERVICE_INIT" ) func buildCacheInstRevPrefixes() {