From 20ee079e12e6fc4b429248bd791eb312af15c72b Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 18 Dec 2024 13:58:52 +0200 Subject: [PATCH] Use the waitForServicesToReachState helper --- services/accounts.go | 39 ++++++++++++++++------------------ services/actions.go | 38 +++++++++++++++------------------ services/adminsv1.go | 36 ++++++++++++++------------------ services/analyzers.go | 9 ++++---- services/attributes.go | 42 +++++++++++++++---------------------- services/caches.go | 39 +++++++++++++++++++++------------- services/cdrs.go | 37 ++++++++++++++------------------ services/chargers.go | 42 +++++++++++++++++-------------------- services/config.go | 32 ++++++++++++++++++---------- services/cores.go | 18 +++++++++------- services/diameteragent.go | 10 +++++---- services/dispatchers.go | 39 +++++++++++++++------------------- services/dnsagent.go | 10 +++++---- services/ees.go | 25 +++++++++++----------- services/efs.go | 14 ++++++++----- services/ers.go | 24 ++++++++++----------- services/filters.go | 25 +++++++++++++++------- services/guardian.go | 38 +++++++++++++++++++++++---------- services/httpagent.go | 23 ++++++++++---------- services/janus.go | 19 +++++++++-------- services/loaders.go | 31 +++++++++++++-------------- services/radiusagent.go | 10 ++++----- services/rankings.go | 44 ++++++++++++++++++--------------------- services/rates.go | 38 +++++++++++++++------------------ services/resources.go | 37 +++++++++++++++----------------- services/routes.go | 36 ++++++++++++++------------------ services/sessions.go | 31 +++++++++++++-------------- services/sipagent.go | 9 ++++---- services/stats.go | 37 +++++++++++++++----------------- services/thresholds.go | 37 +++++++++++++++----------------- services/tpes.go | 19 ++++++++++------- services/trends.go | 44 ++++++++++++++++++--------------------- 32 files changed, 466 insertions(+), 466 deletions(-) diff --git a/services/accounts.go b/services/accounts.go index d988a94bc..a13d2959f 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -66,32 +66,29 @@ func (acts *AccountService) Start(shutdown chan struct{}) (err error) { if acts.IsRunning() { return utils.ErrServiceAlreadyRunning } - cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP) - } - acts.cl = cls.CLS() - cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AccountS, utils.CacheS, utils.StateServiceUP) + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + acts.srvIndexer, acts.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheAccounts, utils.CacheAccountsFilterIndexes); err != nil { - return - } - fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AccountS, utils.FilterS, utils.StateServiceUP) - } - dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AccountS, utils.DataDB, utils.StateServiceUP) - } - anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AccountS, utils.AnalyzerS, utils.StateServiceUP) + return err } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) acts.Lock() defer acts.Unlock() diff --git a/services/actions.go b/services/actions.go index 2c0575a17..891c6ec2e 100644 --- a/services/actions.go +++ b/services/actions.go @@ -68,32 +68,28 @@ func (acts *ActionService) Start(shutdown chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP) - } - acts.cl = cls.CLS() - cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CacheS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + acts.srvIndexer, acts.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + acts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheActionProfiles, utils.CacheActionProfilesFilterIndexes); err != nil { - return - } - fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ActionS, utils.FilterS, utils.StateServiceUP) - } - dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ActionS, utils.DataDB, utils.StateServiceUP) - } - anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ActionS, utils.AnalyzerS, utils.StateServiceUP) + return err } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) acts.Lock() defer acts.Unlock() diff --git a/services/adminsv1.go b/services/adminsv1.go index 193be5b5f..bbaffabe6 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -65,27 +65,23 @@ func (apiService *AdminSv1Service) Start(_ chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := apiService.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AdminS, utils.CommonListenerS, utils.StateServiceUP) - } - apiService.cl = cls.CLS() - fs := apiService.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AdminS, utils.FilterS, utils.StateServiceUP) - } - dbs := apiService.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AdminS, utils.DataDB, utils.StateServiceUP) - } - anz := apiService.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AdminS, utils.AnalyzerS, utils.StateServiceUP) - } - sdbs := apiService.srvIndexer.GetService(utils.StorDB).(*StorDBService) - if utils.StructChanTimeout(sdbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AdminS, utils.StorDB, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + utils.StorDB, + }, + apiService.srvIndexer, apiService.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + apiService.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + sdbs := srvDeps[utils.StorDB].(*StorDBService) apiService.Lock() defer apiService.Unlock() diff --git a/services/analyzers.go b/services/analyzers.go index e31dc436b..b4a570177 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -71,11 +71,12 @@ func (anz *AnalyzerService) Start(shutdown chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := anz.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), anz.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AnalyzerS, utils.CommonListenerS, utils.StateServiceUP) + cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, anz.srvIndexer, + anz.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return } - anz.cl = cls.CLS() + anz.cl = cls.(*CommonListenerService).CLS() anz.Lock() defer anz.Unlock() diff --git a/services/attributes.go b/services/attributes.go index f2d4de397..7b2add4ee 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -64,37 +64,29 @@ func (attrS *AttributeService) Start(shutdown chan struct{}) (err error) { if attrS.IsRunning() { return utils.ErrServiceAlreadyRunning } - if utils.StructChanTimeout( - attrS.srvIndexer.GetService(utils.CommonListenerS).StateChan(utils.StateServiceUP), - attrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP) - } - cls := attrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP) - } - attrS.cl = cls.CLS() - cacheS := attrS.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CacheS, utils.StateServiceUP) + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + attrS.srvIndexer, attrS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return } + attrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheAttributeProfiles, utils.CacheAttributeFilterIndexes); err != nil { return } - fs := attrS.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.FilterS, utils.StateServiceUP) - } - dbs := attrS.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.DataDB, utils.StateServiceUP) - } - anz := attrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.AnalyzerS, utils.StateServiceUP) - } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) attrS.Lock() defer attrS.Unlock() diff --git a/services/caches.go b/services/caches.go index bd5161948..dbd55fe9b 100644 --- a/services/caches.go +++ b/services/caches.go @@ -19,6 +19,8 @@ along with this program. If not, see package services import ( + "sync" + "github.com/cgrates/birpc" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" @@ -41,6 +43,7 @@ func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager, // CacheService implements Agent interface type CacheService struct { + mu sync.Mutex cl *commonlisteners.CommonListenerS cacheCh chan *engine.CacheS @@ -54,23 +57,29 @@ type CacheService struct { // Start should handle the sercive start func (cS *CacheService) Start(shutdown chan struct{}) (err error) { - cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CommonListenerS, utils.StateServiceUP) + if cS.IsRunning() { + return utils.ErrServiceAlreadyRunning } - cS.cl = cls.CLS() - dbs := cS.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CacheS, utils.DataDB, utils.StateServiceUP) - } - anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CacheS, utils.AnalyzerS, utils.StateServiceUP) - } - cs := cS.srvIndexer.GetService(utils.CoreS).(*CoreService) - if utils.StructChanTimeout(cs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CoreS, utils.StateServiceUP) + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.DataDB, + utils.AnalyzerS, + utils.CoreS, + }, + cS.srvIndexer, cS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + cs := srvDeps[utils.CoreS].(*CoreService) + + cS.mu.Lock() + defer cS.mu.Unlock() + engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CoreS().CapsStats) go engine.Cache.Precache(shutdown) diff --git a/services/cdrs.go b/services/cdrs.go index 1d966b55b..5ddf80451 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -64,34 +64,29 @@ func (cs *CDRService) Start(_ chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := cs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CDRs, utils.CommonListenerS, utils.StateServiceUP) - } - cs.cl = cls.CLS() - fs := cs.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CDRs, utils.FilterS, utils.StateServiceUP) - } - dbs := cs.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CDRs, utils.DataDB, utils.StateServiceUP) - } - anz := cs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CDRs, utils.AnalyzerS, utils.StateServiceUP) - } - sdbs := cs.srvIndexer.GetService(utils.StorDB).(*StorDBService) - if utils.StructChanTimeout(sdbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CDRs, utils.StorDB, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + utils.StorDB, + }, + cs.srvIndexer, cs.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + cs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + sdbs := srvDeps[utils.StorDB].(*StorDBService) cs.Lock() defer cs.Unlock() cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs.FilterS(), cs.connMgr, sdbs.DB()) runtime.Gosched() - utils.Logger.Info("Registering CDRS RPC service.") srv, err := engine.NewServiceWithPing(cs.cdrS, utils.CDRsV1, utils.V1Prfx) if err != nil { return err diff --git a/services/chargers.go b/services/chargers.go index 18d0d6805..ed3cf8888 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -57,37 +57,33 @@ type ChargerService struct { } // Start should handle the service start -func (chrS *ChargerService) Start(shutdown chan struct{}) (err error) { +func (chrS *ChargerService) Start(shutdown chan struct{}) error { if chrS.IsRunning() { return utils.ErrServiceAlreadyRunning } - cls := chrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CommonListenerS, utils.StateServiceUP) - } - chrS.cl = cls.CLS() - cacheS := chrS.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CacheS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + chrS.srvIndexer, chrS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + chrS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheChargerProfiles, utils.CacheChargerFilterIndexes); err != nil { - return - } - fs := chrS.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.FilterS, utils.StateServiceUP) - } - dbs := chrS.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.DataDB, utils.StateServiceUP) - } - anz := chrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.AnalyzerS, utils.StateServiceUP) + return err } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) chrS.Lock() defer chrS.Unlock() @@ -102,7 +98,7 @@ func (chrS *ChargerService) Start(shutdown chan struct{}) (err error) { chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS) close(chrS.stateDeps.StateChan(utils.StateServiceUP)) - return + return nil } // Reload handles the change of config diff --git a/services/config.go b/services/config.go index 5eee72fa2..decfa0f79 100644 --- a/services/config.go +++ b/services/config.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/cgrates/birpc" + "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -41,6 +42,7 @@ func NewConfigService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceInde type ConfigService struct { mu sync.RWMutex cfg *config.CGRConfig + cl *commonlisteners.CommonListenerS intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes @@ -48,22 +50,29 @@ type ConfigService struct { // Start handles the service start. func (s *ConfigService) Start(_ chan struct{}) error { - cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP) - } - anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP) + if s.IsRunning() { + return utils.ErrServiceAlreadyRunning } - srv, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.AnalyzerS, + }, + s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err + } + s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + + svcs, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true) if !s.cfg.DispatcherSCfg().Enabled { - for _, s := range srv { - cls.CLS().RpcRegister(s) + for _, svc := range svcs { + s.cl.RpcRegister(svc) } } - s.intRPCconn = anz.GetInternalCodec(srv, utils.ConfigSv1) + s.intRPCconn = anz.GetInternalCodec(svcs, utils.ConfigSv1) close(s.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -75,6 +84,7 @@ func (s *ConfigService) Reload(_ chan struct{}) error { // Shutdown stops the service. func (s *ConfigService) Shutdown() error { + s.cl.RpcUnregisterName(utils.ConfigSv1) return nil } diff --git a/services/cores.go b/services/cores.go index 441635259..957693590 100644 --- a/services/cores.go +++ b/services/cores.go @@ -71,15 +71,17 @@ func (cS *CoreService) Start(shutdown chan struct{}) error { return utils.ErrServiceAlreadyRunning } - cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CoreS, utils.CommonListenerS, utils.StateServiceUP) - } - cS.cl = cls.CLS() - anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.CoreS, utils.AnalyzerS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.AnalyzerS, + }, + cS.srvIndexer, cS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + cS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) cS.mu.Lock() defer cS.mu.Unlock() diff --git a/services/diameteragent.go b/services/diameteragent.go index ed4b2cd93..f87aa3206 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -67,13 +67,15 @@ func (da *DiameterAgent) Start(shutdown chan struct{}) error { return utils.ErrServiceAlreadyRunning } - fs := da.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), da.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.DiameterAgent, utils.FilterS, utils.StateServiceUP) + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, da.srvIndexer, + da.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + da.Lock() defer da.Unlock() - return da.start(fs.FilterS(), da.caps, shutdown) + return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown) } func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown chan struct{}) error { diff --git a/services/dispatchers.go b/services/dispatchers.go index 8457e7943..83ce35164 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -64,35 +64,30 @@ func (dspS *DispatcherService) Start(shutdown chan struct{}) (err error) { if dspS.IsRunning() { return utils.ErrServiceAlreadyRunning } - utils.Logger.Info("Starting CGRateS DispatcherS service.") - cls := dspS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CommonListenerS, utils.StateServiceUP) - } - dspS.cl = cls.CLS() - cacheS := dspS.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CacheS, utils.StateServiceUP) + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + dspS.srvIndexer, dspS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + dspS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheDispatcherProfiles, utils.CacheDispatcherHosts, utils.CacheDispatcherFilterIndexes); err != nil { return } - - fs := dspS.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.FilterS, utils.StateServiceUP) - } - dbs := dspS.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.DataDB, utils.StateServiceUP) - } - anz := dspS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.AnalyzerS, utils.StateServiceUP) - } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) dspS.Lock() defer dspS.Unlock() diff --git a/services/dnsagent.go b/services/dnsagent.go index 9b3f5a851..9144c1adb 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -62,14 +62,16 @@ func (dns *DNSAgent) Start(shutdown chan struct{}) (err error) { if dns.IsRunning() { return utils.ErrServiceAlreadyRunning } - fs := dns.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dns.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.DNSAgent, utils.FilterS, utils.StateServiceUP) + + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, dns.srvIndexer, + dns.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return } dns.Lock() defer dns.Unlock() - dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), dns.connMgr) + dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.(*FilterService).FilterS(), dns.connMgr) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) dns.dns = nil diff --git a/services/ees.go b/services/ees.go index 35bb24095..35451f57d 100644 --- a/services/ees.go +++ b/services/ees.go @@ -98,24 +98,23 @@ func (es *EventExporterService) Start(_ chan struct{}) error { return utils.ErrServiceAlreadyRunning } - cls := es.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.EEs, utils.CommonListenerS, utils.StateServiceUP) - } - es.cl = cls.CLS() - fs := es.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.EEs, utils.FilterS, utils.StateServiceUP) - } - anz := es.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.EEs, utils.AnalyzerS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.FilterS, + utils.AnalyzerS, + }, + es.srvIndexer, es.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + es.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + fs := srvDeps[utils.FilterS].(*FilterService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) es.mu.Lock() defer es.mu.Unlock() - var err error es.eeS, err = ees.NewEventExporterS(es.cfg, fs.FilterS(), es.connMgr) if err != nil { return err diff --git a/services/efs.go b/services/efs.go index 1a68b2b18..750131a3f 100644 --- a/services/efs.go +++ b/services/efs.go @@ -63,17 +63,21 @@ func (efServ *ExportFailoverService) Start(_ chan struct{}) (err error) { if efServ.IsRunning() { return utils.ErrServiceAlreadyRunning } - cls := efServ.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), efServ.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.EFs, utils.CommonListenerS, utils.StateServiceUP) + + cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, efServ.srvIndexer, + efServ.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return } - efServ.cl = cls.CLS() + efServ.cl = cls.(*CommonListenerService).CLS() + efServ.Lock() + defer efServ.Unlock() + efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr) efServ.stopChan = make(chan struct{}) efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx) efServ.cl.RpcRegister(efServ.srv) - efServ.Unlock() close(efServ.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/ers.go b/services/ers.go index 6748edc59..d8e9c6c0c 100644 --- a/services/ers.go +++ b/services/ers.go @@ -68,19 +68,19 @@ func (erS *EventReaderService) Start(shutdown chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := erS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ERs, utils.CommonListenerS, utils.StateServiceUP) - } - erS.cl = cls.CLS() - fs := erS.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ERs, utils.FilterS, utils.StateServiceUP) - } - anz := erS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ERs, utils.AnalyzerS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.FilterS, + utils.AnalyzerS, + }, + erS.srvIndexer, erS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + erS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + fs := srvDeps[utils.FilterS].(*FilterService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) erS.Lock() defer erS.Unlock() diff --git a/services/filters.go b/services/filters.go index 17f1fce36..227763b5d 100644 --- a/services/filters.go +++ b/services/filters.go @@ -55,17 +55,28 @@ type FilterService struct { // Start handles the service start. func (s *FilterService) Start(shutdown chan struct{}) error { - cacheS := s.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.FilterS, utils.CacheS, utils.StateServiceUP) + if s.IsRunning() { + return utils.ErrServiceAlreadyRunning } - if err := cacheS.WaitToPrecache(shutdown, utils.CacheFilters); err != nil { + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CacheS, + utils.DataDB, + }, + s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout) + if err != nil { return err } - dbs := s.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.FilterS, utils.DataDB, utils.StateServiceUP) + cacheS := srvDeps[utils.CacheS].(*CacheService) + if err = cacheS.WaitToPrecache(shutdown, utils.CacheFilters); err != nil { + return err } + dbs := srvDeps[utils.DataDB].(*DataDBService) + + s.mu.Lock() + defer s.mu.Unlock() + s.fltrS = engine.NewFilterS(s.cfg, s.connMgr, dbs.DataManager()) close(s.stateDeps.StateChan(utils.StateServiceUP)) return nil diff --git a/services/guardian.go b/services/guardian.go index 668875cb8..bb8f5de78 100644 --- a/services/guardian.go +++ b/services/guardian.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/cgrates/birpc" + "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/guardian" @@ -42,6 +43,7 @@ func NewGuardianService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceIn type GuardianService struct { mu sync.RWMutex cfg *config.CGRConfig + cl *commonlisteners.CommonListenerS intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here stateDeps *StateDependencies // channel subscriptions for state changes @@ -49,21 +51,32 @@ type GuardianService struct { // Start handles the service start. func (s *GuardianService) Start(_ chan struct{}) error { - cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP) + if s.IsRunning() { + return utils.ErrServiceAlreadyRunning } - anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP) + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.AnalyzerS, + }, + s.srvIndexer, s.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } - srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) + s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) + + s.mu.Lock() + defer s.mu.Unlock() + + svcs, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) if !s.cfg.DispatcherSCfg().Enabled { - for _, s := range srv { - cls.CLS().RpcRegister(s) + for _, svc := range svcs { + s.cl.RpcRegister(svc) } } - s.intRPCconn = anz.GetInternalCodec(srv, utils.GuardianS) + s.intRPCconn = anz.GetInternalCodec(svcs, utils.GuardianS) close(s.stateDeps.StateChan(utils.StateServiceUP)) return nil } @@ -75,6 +88,9 @@ func (s *GuardianService) Reload(_ chan struct{}) error { // Shutdown stops the service. func (s *GuardianService) Shutdown() error { + s.mu.Lock() + defer s.mu.Unlock() + s.cl.RpcUnregisterName(utils.GuardianSv1) return nil } @@ -85,7 +101,7 @@ func (s *GuardianService) IsRunning() bool { // ServiceName returns the service name func (s *GuardianService) ServiceName() string { - return utils.FilterS + return utils.GuardianS } // ShouldRun returns if the service should be running. diff --git a/services/httpagent.go b/services/httpagent.go index aaa76c21e..d9922ac02 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -19,7 +19,6 @@ along with this program. If not, see package services import ( - "fmt" "sync" "github.com/cgrates/birpc" @@ -67,26 +66,28 @@ func (ha *HTTPAgent) Start(_ chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := ha.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.CommonListenerS, utils.StateServiceUP) - } - cl := cls.CLS() - fs := ha.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.FilterS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.FilterS, + }, + ha.srvIndexer, ha.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + fs := srvDeps[utils.FilterS].(*FilterService) ha.Lock() + defer ha.Unlock() + ha.started = true - utils.Logger.Info(fmt.Sprintf("<%s> successfully started HTTPAgent", utils.HTTPAgent)) for _, agntCfg := range ha.cfg.HTTPAgentCfg() { cl.RegisterHttpHandler(agntCfg.URL, agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, fs.FilterS(), ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, agntCfg.ReplyPayload, agntCfg.RequestProcessors)) } - ha.Unlock() close(ha.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/janus.go b/services/janus.go index 60679ff6a..5d2134183 100644 --- a/services/janus.go +++ b/services/janus.go @@ -63,15 +63,17 @@ type JanusAgent struct { // Start should jandle the sercive start func (ja *JanusAgent) Start(_ chan struct{}) (err error) { - cls := ja.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.CommonListenerS, utils.StateServiceUP) - } - cl := cls.CLS() - fs := ja.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.FilterS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.FilterS, + }, + ja.srvIndexer, ja.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + cl := srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + fs := srvDeps[utils.FilterS].(*FilterService) ja.Lock() if ja.started { @@ -97,7 +99,6 @@ func (ja *JanusAgent) Start(_ chan struct{}) (err error) { ja.started = true ja.Unlock() close(ja.stateDeps.StateChan(utils.StateServiceUP)) - utils.Logger.Info(fmt.Sprintf("<%s> successfully started.", utils.JanusAgent)) return } diff --git a/services/loaders.go b/services/loaders.go index 22207ea4c..add5423e2 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -65,24 +65,21 @@ func (ldrs *LoaderService) Start(_ chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := ldrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.CommonListenerS, utils.StateServiceUP) - } - ldrs.cl = cls.CLS() - - fs := ldrs.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.FilterS, utils.StateServiceUP) - } - dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.DataDB, utils.StateServiceUP) - } - anz := ldrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.AnalyzerS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + ldrs.srvIndexer, ldrs.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + ldrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) ldrs.Lock() defer ldrs.Unlock() diff --git a/services/radiusagent.go b/services/radiusagent.go index 714d0f33a..879db8b68 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -66,9 +66,10 @@ func (rad *RadiusAgent) Start(shutdown chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - fs := rad.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rad.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RadiusAgent, utils.FilterS, utils.StateServiceUP) + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, rad.srvIndexer, + rad.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return } rad.Lock() @@ -78,8 +79,7 @@ func (rad *RadiusAgent) Start(shutdown chan struct{}) (err error) { rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth rad.lacct = rad.cfg.RadiusAgentCfg().ListenAcct - if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.FilterS(), rad.connMgr); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) + if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.(*FilterService).FilterS(), rad.connMgr); err != nil { return } rad.stopChan = make(chan struct{}) diff --git a/services/rankings.go b/services/rankings.go index 80a1b9b3d..8d06c5aea 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -67,33 +67,29 @@ func (ran *RankingService) Start(shutdown chan struct{}) (err error) { } ran.srvDep[utils.DataDB].Add(1) - cls := ran.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CommonListenerS, utils.StateServiceUP) - } - ran.cl = cls.CLS() - cacheS := ran.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CacheS, utils.StateServiceUP) - } - if err = cacheS.WaitToPrecache(shutdown, - utils.CacheRankingProfiles, - utils.CacheRankings, - ); err != nil { + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + ran.srvIndexer, ran.cfg.GeneralCfg().ConnectTimeout) + if err != nil { return err } - dbs := ran.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RankingS, utils.DataDB, utils.StateServiceUP) - } - fs := ran.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RankingS, utils.FilterS, utils.StateServiceUP) - } - anz := ran.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RankingS, utils.AnalyzerS, utils.StateServiceUP) + ran.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) + if err = cacheS.WaitToPrecache(shutdown, + utils.CacheRankingProfiles, + utils.CacheRankings); err != nil { + return err } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) ran.Lock() defer ran.Unlock() diff --git a/services/rates.go b/services/rates.go index 1ff179fef..51e07b070 100644 --- a/services/rates.go +++ b/services/rates.go @@ -96,33 +96,29 @@ func (rs *RateService) Start(shutdown chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := rs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RateS, utils.CommonListenerS, utils.StateServiceUP) - } - rs.cl = cls.CLS() - cacheS := rs.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RateS, utils.CacheS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + rs.srvIndexer, rs.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + rs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheRateProfiles, utils.CacheRateProfilesFilterIndexes, utils.CacheRateFilterIndexes); err != nil { - return - } - fs := rs.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RateS, utils.FilterS, utils.StateServiceUP) - } - dbs := rs.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RateS, utils.DataDB, utils.StateServiceUP) - } - anz := rs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RateS, utils.AnalyzerS, utils.StateServiceUP) + return err } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) rs.Lock() rs.rateS = rates.NewRateS(rs.cfg, fs.FilterS(), dbs.DataManager()) diff --git a/services/resources.go b/services/resources.go index 19a5fc1c2..610f62b7f 100644 --- a/services/resources.go +++ b/services/resources.go @@ -67,33 +67,30 @@ func (reS *ResourceService) Start(shutdown chan struct{}) (err error) { } reS.srvDep[utils.DataDB].Add(1) - cls := reS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CommonListenerS, utils.StateServiceUP) - } - reS.cl = cls.CLS() - cacheS := reS.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CacheS, utils.StateServiceUP) + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + reS.srvIndexer, reS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + reS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheResourceProfiles, utils.CacheResources, utils.CacheResourceFilterIndexes); err != nil { return } - fs := reS.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.FilterS, utils.StateServiceUP) - } - dbs := reS.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.DataDB, utils.StateServiceUP) - } - anz := reS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.AnalyzerS, utils.StateServiceUP) - } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) reS.Lock() defer reS.Unlock() diff --git a/services/routes.go b/services/routes.go index 9710042e1..f002181f6 100644 --- a/services/routes.go +++ b/services/routes.go @@ -62,32 +62,28 @@ func (routeS *RouteService) Start(shutdown chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := routeS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CommonListenerS, utils.StateServiceUP) - } - routeS.cl = cls.CLS() - cacheS := routeS.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CacheS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + routeS.srvIndexer, routeS.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + routeS.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheRouteProfiles, utils.CacheRouteFilterIndexes); err != nil { return } - fs := routeS.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RouteS, utils.FilterS, utils.StateServiceUP) - } - dbs := routeS.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RouteS, utils.DataDB, utils.StateServiceUP) - } - anz := routeS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.RouteS, utils.AnalyzerS, utils.StateServiceUP) - } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) routeS.Lock() defer routeS.Unlock() diff --git a/services/sessions.go b/services/sessions.go index 45355b009..c13c9b018 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -67,23 +67,21 @@ func (smg *SessionService) Start(shutdown chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - cls := smg.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.SessionS, utils.CommonListenerS, utils.StateServiceUP) - } - smg.cl = cls.CLS() - fs := smg.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.SessionS, utils.FilterS, utils.StateServiceUP) - } - dbs := smg.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.SessionS, utils.DataDB, utils.StateServiceUP) - } - anz := smg.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.SessionS, utils.AnalyzerS, utils.StateServiceUP) + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + smg.srvIndexer, smg.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + smg.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) smg.Lock() defer smg.Unlock() @@ -111,6 +109,7 @@ func (smg *SessionService) Start(shutdown chan struct{}) (err error) { // run this in it's own goroutine go smg.start(shutdown) } + smg.intRPCconn = anz.GetInternalCodec(srv, utils.SessionS) close(smg.stateDeps.StateChan(utils.StateServiceUP)) return } diff --git a/services/sipagent.go b/services/sipagent.go index 03621692d..6a9354fe8 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -63,15 +63,16 @@ func (sip *SIPAgent) Start(shutdown chan struct{}) (err error) { return utils.ErrServiceAlreadyRunning } - fs := sip.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sip.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.SIPAgent, utils.FilterS, utils.StateServiceUP) + fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, sip.srvIndexer, + sip.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return } sip.Lock() defer sip.Unlock() sip.oldListen = sip.cfg.SIPAgentCfg().Listen - sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.FilterS()) + sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.(*FilterService).FilterS()) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SIPAgent, err)) diff --git a/services/stats.go b/services/stats.go index c303c6fd3..aa26e3410 100644 --- a/services/stats.go +++ b/services/stats.go @@ -67,33 +67,30 @@ func (sts *StatService) Start(shutdown chan struct{}) (err error) { } sts.srvDep[utils.DataDB].Add(1) - cls := sts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.StatS, utils.CommonListenerS, utils.StateServiceUP) - } - sts.cl = cls.CLS() - cacheS := sts.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.StatS, utils.CacheS, utils.StateServiceUP) + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + sts.srvIndexer, sts.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + sts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheStatQueueProfiles, utils.CacheStatQueues, utils.CacheStatFilterIndexes); err != nil { return } - fs := sts.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.StatS, utils.FilterS, utils.StateServiceUP) - } - dbs := sts.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.StatS, utils.DataDB, utils.StateServiceUP) - } - anz := sts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.StatS, utils.AnalyzerS, utils.StateServiceUP) - } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) sts.Lock() defer sts.Unlock() diff --git a/services/thresholds.go b/services/thresholds.go index 204e21ee6..754e3980c 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -67,33 +67,30 @@ func (thrs *ThresholdService) Start(shutdown chan struct{}) (err error) { } thrs.srvDep[utils.DataDB].Add(1) - cls := thrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CommonListenerS, utils.StateServiceUP) - } - thrs.cl = cls.CLS() - cacheS := thrs.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CacheS, utils.StateServiceUP) + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + thrs.srvIndexer, thrs.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + thrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) if err = cacheS.WaitToPrecache(shutdown, utils.CacheThresholdProfiles, utils.CacheThresholds, utils.CacheThresholdFilterIndexes); err != nil { return } - fs := thrs.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.FilterS, utils.StateServiceUP) - } - dbs := thrs.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.DataDB, utils.StateServiceUP) - } - anz := thrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.AnalyzerS, utils.StateServiceUP) - } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) thrs.Lock() defer thrs.Unlock() diff --git a/services/tpes.go b/services/tpes.go index 628119f27..ca3915c9c 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -60,15 +60,18 @@ type TPeService struct { // Start should handle the service start func (ts *TPeService) Start(_ chan struct{}) (err error) { - cls := ts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.TPeS, utils.CommonListenerS, utils.StateServiceUP) - } - ts.cl = cls.CLS() - dbs := ts.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.TPeS, utils.DataDB, utils.StateServiceUP) + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.DataDB, + }, + ts.srvIndexer, ts.cfg.GeneralCfg().ConnectTimeout) + if err != nil { + return err } + ts.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + dbs := srvDeps[utils.DataDB].(*DataDBService) ts.tpes = tpes.NewTPeS(ts.cfg, dbs.DataManager(), ts.connMgr) ts.stopChan = make(chan struct{}) diff --git a/services/trends.go b/services/trends.go index e24f2df1e..d76438fcd 100644 --- a/services/trends.go +++ b/services/trends.go @@ -66,33 +66,29 @@ func (trs *TrendService) Start(shutdown chan struct{}) (err error) { } trs.srvDep[utils.DataDB].Add(1) - cls := trs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) - if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CommonListenerS, utils.StateServiceUP) - } - trs.cl = cls.CLS() - cacheS := trs.srvIndexer.GetService(utils.CacheS).(*CacheService) - if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CacheS, utils.StateServiceUP) - } - if err = cacheS.WaitToPrecache(shutdown, - utils.CacheTrendProfiles, - utils.CacheTrends, - ); err != nil { + + srvDeps, err := waitForServicesToReachState(utils.StateServiceUP, + []string{ + utils.CommonListenerS, + utils.CacheS, + utils.FilterS, + utils.DataDB, + utils.AnalyzerS, + }, + trs.srvIndexer, trs.cfg.GeneralCfg().ConnectTimeout) + if err != nil { return err } - dbs := trs.srvIndexer.GetService(utils.DataDB).(*DataDBService) - if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.TrendS, utils.DataDB, utils.StateServiceUP) - } - fs := trs.srvIndexer.GetService(utils.FilterS).(*FilterService) - if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.TrendS, utils.FilterS, utils.StateServiceUP) - } - anz := trs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) - if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.TrendS, utils.AnalyzerS, utils.StateServiceUP) + trs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + cacheS := srvDeps[utils.CacheS].(*CacheService) + if err = cacheS.WaitToPrecache(shutdown, + utils.CacheTrendProfiles, + utils.CacheTrends); err != nil { + return err } + fs := srvDeps[utils.FilterS].(*FilterService) + dbs := srvDeps[utils.DataDB].(*DataDBService) + anz := srvDeps[utils.AnalyzerS].(*AnalyzerService) trs.Lock() defer trs.Unlock()