From 55ecdf45e404ddb0c8fefa812a691ab6dae8b598 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 10 Dec 2024 19:53:54 +0200 Subject: [PATCH] Remove clsChan in favor of using the service indexer --- cmd/cgr-engine/cgr-engine.go | 78 ++++++++++++++++-------------------- services/accounts.go | 11 ++--- services/actions.go | 10 ++--- services/adminsv1.go | 11 ++--- services/analyzers.go | 11 ++--- services/attributes.go | 10 ++--- services/caches.go | 14 +++---- services/cdrs.go | 10 ++--- services/chargers.go | 11 ++--- services/commonlisteners.go | 14 +++---- services/cores.go | 12 +++--- services/dispatchers.go | 10 ++--- services/ees.go | 11 ++--- services/efs.go | 11 +++-- services/ers.go | 10 ++--- services/httpagent.go | 11 ++--- services/janus.go | 12 +++--- services/loaders.go | 11 ++--- services/rankings.go | 10 ++--- services/rates.go | 11 ++--- services/resources.go | 10 ++--- services/routes.go | 10 ++--- services/sessions.go | 10 ++--- services/stats.go | 10 ++--- services/thresholds.go | 10 ++--- services/tpes.go | 12 +++--- services/trends.go | 10 ++--- 27 files changed, 180 insertions(+), 181 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 13da69960..0af0389cb 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -134,7 +134,6 @@ func runCGREngine(fs []string) (err error) { iGuardianSCh := make(chan birpc.ClientConnector, 1) connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh) - clsCh := make(chan *commonlisteners.CommonListenerS, 1) iFilterSCh := make(chan *engine.FilterS, 1) // ServiceIndexer will share service references to all services @@ -142,40 +141,40 @@ func runCGREngine(fs []string) (err error) { gvS := services.NewGlobalVarS(cfg, srvIdxr) dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep, srvIdxr) sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvIdxr) - cls := services.NewCommonListenerService(cfg, caps, clsCh, srvIdxr) - anzS := services.NewAnalyzerService(cfg, clsCh, iFilterSCh, srvIdxr) - coreS := services.NewCoreService(cfg, caps, clsCh, cpuPrfF, shdWg, srvIdxr) - cacheS := services.NewCacheService(cfg, dmS, connMgr, clsCh, coreS, srvIdxr) - dspS := services.NewDispatcherService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr) - ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr) - efs := services.NewExportFailoverService(cfg, connMgr, clsCh, srvIdxr) - adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, srvIdxr) - sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr) - attrS := services.NewAttributeService(cfg, dmS, iFilterSCh, clsCh, dspS, srvIdxr) - chrgS := services.NewChargerService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr) - routeS := services.NewRouteService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr) - resourceS := services.NewResourceService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) - trendS := services.NewTrendService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) - rankingS := services.NewRankingService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) - thS := services.NewThresholdService(cfg, dmS, iFilterSCh, connMgr, clsCh, srvDep, srvIdxr) - stS := services.NewStatService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) - erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, clsCh, srvIdxr) + cls := services.NewCommonListenerService(cfg, caps, srvIdxr) + anzS := services.NewAnalyzerService(cfg, iFilterSCh, srvIdxr) + coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg, srvIdxr) + cacheS := services.NewCacheService(cfg, dmS, connMgr, coreS, srvIdxr) + dspS := services.NewDispatcherService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) + ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) + efs := services.NewExportFailoverService(cfg, connMgr, srvIdxr) + adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, connMgr, srvIdxr) + sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) + attrS := services.NewAttributeService(cfg, dmS, iFilterSCh, dspS, srvIdxr) + chrgS := services.NewChargerService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) + routeS := services.NewRouteService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) + resourceS := services.NewResourceService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) + trendS := services.NewTrendService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) + rankingS := services.NewRankingService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) + thS := services.NewThresholdService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) + stS := services.NewStatService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) + erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, srvIdxr) dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvIdxr) fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvIdxr) kamAgent := services.NewKamailioAgent(cfg, connMgr, srvIdxr) - janusAgent := services.NewJanusAgent(cfg, iFilterSCh, clsCh, connMgr, srvIdxr) + janusAgent := services.NewJanusAgent(cfg, iFilterSCh, connMgr, srvIdxr) astAgent := services.NewAsteriskAgent(cfg, connMgr, srvIdxr) radAgent := services.NewRadiusAgent(cfg, iFilterSCh, connMgr, srvIdxr) diamAgent := services.NewDiameterAgent(cfg, iFilterSCh, connMgr, caps, srvIdxr) - httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, clsCh, connMgr, srvIdxr) + httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, connMgr, srvIdxr) sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvIdxr) - eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, clsCh, srvIdxr) - cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, srvIdxr) + eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, srvIdxr) + cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, connMgr, srvIdxr) registrarcS := services.NewRegistrarCService(cfg, connMgr, srvIdxr) - rateS := services.NewRateService(cfg, iFilterSCh, dmS, clsCh, srvIdxr) - actionS := services.NewActionService(cfg, dmS, iFilterSCh, connMgr, clsCh, srvIdxr) - accS := services.NewAccountService(cfg, dmS, iFilterSCh, connMgr, clsCh, srvIdxr) - tpeS := services.NewTPeService(cfg, connMgr, dmS, clsCh, srvIdxr) + rateS := services.NewRateService(cfg, iFilterSCh, dmS, srvIdxr) + actionS := services.NewActionService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) + accS := services.NewAccountService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) + tpeS := services.NewTPeService(cfg, connMgr, dmS, srvIdxr) srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{ gvS, @@ -312,9 +311,9 @@ func runCGREngine(fs []string) (err error) { // Start FilterS go cgrStartFilterService(ctx, iFilterSCh, cacheS.GetCacheSChan(), connMgr, cfg, dmS) - cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, clsCh, anzS) - cgrInitGuardianSv1(iGuardianSCh, cfg, clsCh, anzS) - cgrInitConfigSv1(iConfigCh, cfg, clsCh, anzS) + cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS) + cgrInitGuardianSv1(iGuardianSCh, cfg, cls.CLS(), anzS) + cgrInitConfigSv1(iConfigCh, cfg, cls.CLS(), anzS) if *flags.Preload != utils.EmptyString { if err = cgrRunPreload(ctx, cfg, *flags.Preload, srvIdxr); err != nil { @@ -323,7 +322,7 @@ func runCGREngine(fs []string) (err error) { } // Serve rpc connections - cgrStartRPC(ctx, cancel, cfg, clsCh, srvIdxr) + cgrStartRPC(ctx, cancel, cfg, srvIdxr) // TODO: find a better location for this if block if *flags.MemPrfDir != "" { @@ -393,9 +392,7 @@ func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS } func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig, - clSChan chan *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { - cl := <-clSChan - clSChan <- cl + cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) if !cfg.DispatcherSCfg().Enabled { for _, s := range srv { @@ -407,9 +404,7 @@ func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGR func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig, - clSChan chan *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { - cl := <-clSChan - clSChan <- cl + cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false) if !cfg.DispatcherSCfg().Enabled { cl.RpcRegister(srv) @@ -418,9 +413,7 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, } func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, - cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { - cl := <-clSChan - clSChan <- cl + cfg *config.CGRConfig, cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true) // srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false) if !cfg.DispatcherSCfg().Enabled { @@ -432,9 +425,7 @@ func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, } func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, - cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, sIdxr *servmanager.ServiceIndexer) { - cl := <-clSChan - clSChan <- cl + cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer) { if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this select { case <-sIdxr.GetService(utils.DispatcherS).StateChan(utils.StateServiceUP): @@ -442,6 +433,7 @@ func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, return } } + cl := sIdxr.GetService(utils.CommonListenerS).(*services.CommonListenerService).CLS() cl.StartServer(ctx, shtdwnEngine, cfg) } diff --git a/services/accounts.go b/services/accounts.go index 9f2034b6a..7860abcfe 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -37,14 +37,13 @@ import ( // NewAccountService returns the Account Service func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AccountService{ cfg: cfg, dm: dm, filterSChan: filterSChan, connMgr: connMgr, - clSChan: clSChan, rldChan: make(chan struct{}, 1), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -55,7 +54,6 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, type AccountService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -77,8 +75,11 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e if acts.IsRunning() { return utils.ErrServiceAlreadyRunning } - acts.cl = <-acts.clSChan - acts.clSChan <- acts.cl + cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP) + } + acts.cl = cls.CLS() cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AccountS, utils.CacheS, utils.StateServiceUP) diff --git a/services/actions.go b/services/actions.go index 282d18db8..dc44726d9 100644 --- a/services/actions.go +++ b/services/actions.go @@ -38,14 +38,12 @@ import ( func NewActionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - clSChan chan *commonlisteners.CommonListenerS, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ActionService{ connMgr: connMgr, cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, rldChan: make(chan struct{}, 1), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -56,7 +54,6 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, type ActionService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -80,8 +77,11 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er return utils.ErrServiceAlreadyRunning } - acts.cl = <-acts.clSChan - acts.clSChan <- acts.cl + cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP) + } + acts.cl = cls.CLS() cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CacheS, utils.StateServiceUP) diff --git a/services/adminsv1.go b/services/adminsv1.go index f0f576946..0bc12d7af 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -34,7 +34,7 @@ import ( // NewAPIerSv1Service returns the APIerSv1 Service func NewAdminSv1Service(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, - filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, + filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AdminSv1Service{ @@ -42,7 +42,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig, dm: dm, storDB: storDB, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -53,7 +52,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig, type AdminSv1Service struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService storDB *StorDBService filterSChan chan *engine.FilterS @@ -77,8 +75,11 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF return utils.ErrServiceAlreadyRunning } - apiService.cl = <-apiService.clSChan - apiService.clSChan <- apiService.cl + cls := apiService.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AdminS, utils.CommonListenerS, utils.StateServiceUP) + } + apiService.cl = cls.CLS() var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil { return diff --git a/services/analyzers.go b/services/analyzers.go index 2bec8ba7f..12e5d3265 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -33,12 +33,11 @@ import ( ) // NewAnalyzerService returns the Analyzer Service -func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, +func NewAnalyzerService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, srvIndexer *servmanager.ServiceIndexer) *AnalyzerService { return &AnalyzerService{ cfg: cfg, - clSChan: clSChan, filterSChan: filterSChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -49,7 +48,6 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.Com type AnalyzerService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS filterSChan chan *engine.FilterS anz *analyzers.AnalyzerS @@ -70,8 +68,11 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun return utils.ErrServiceAlreadyRunning } - anz.cl = <-anz.clSChan - anz.clSChan <- anz.cl + cls := anz.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), anz.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AnalyzerS, utils.CommonListenerS, utils.StateServiceUP) + } + anz.cl = cls.CLS() anz.Lock() defer anz.Unlock() diff --git a/services/attributes.go b/services/attributes.go index 54a6b3658..c42aea8d0 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -35,14 +35,12 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, dspS *DispatcherService, sIndxr *servmanager.ServiceIndexer) servmanager.Service { return &AttributeService{ cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, dspS: dspS, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), serviceIndexer: sIndxr, @@ -53,7 +51,6 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, type AttributeService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService dspS *DispatcherService filterSChan chan *engine.FilterS @@ -79,8 +76,11 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) attrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP) } - attrS.cl = <-attrS.clSChan - attrS.clSChan <- attrS.cl + cls := attrS.serviceIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP) + } + attrS.cl = cls.CLS() cacheS := attrS.serviceIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CacheS, utils.StateServiceUP) diff --git a/services/caches.go b/services/caches.go index 66ff97bf1..9d4685a32 100644 --- a/services/caches.go +++ b/services/caches.go @@ -31,13 +31,11 @@ import ( // NewCacheService . func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager, - clSChan chan *commonlisteners.CommonListenerS, cores *CoreService, srvIndexer *servmanager.ServiceIndexer) *CacheService { return &CacheService{ cfg: cfg, cores: cores, - clSChan: clSChan, dm: dm, connMgr: connMgr, cacheCh: make(chan *engine.CacheS, 1), @@ -48,9 +46,8 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C // CacheService implements Agent interface type CacheService struct { - cores *CoreService - clSChan chan *commonlisteners.CommonListenerS - dm *DataDBService + cores *CoreService + dm *DataDBService cl *commonlisteners.CommonListenerS @@ -65,8 +62,11 @@ type CacheService struct { // Start should handle the sercive start func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (err error) { - cS.cl = <-cS.clSChan - cS.clSChan <- cS.cl + cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CommonListenerS, utils.StateServiceUP) + } + cS.cl = cls.CLS() var dm *engine.DataManager if dm, err = cS.dm.WaitForDM(ctx); err != nil { return diff --git a/services/cdrs.go b/services/cdrs.go index 8451a4d72..233b51e5f 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -36,7 +36,6 @@ import ( // NewCDRServer returns the CDR Server func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &CDRService{ @@ -44,7 +43,6 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, dm: dm, storDB: storDB, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -55,7 +53,6 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, type CDRService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService storDB *StorDBService filterSChan chan *engine.FilterS @@ -80,8 +77,11 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs)) - cs.cl = <-cs.clSChan - cs.clSChan <- cs.cl + cls := cs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CDRs, utils.CommonListenerS, utils.StateServiceUP) + } + cs.cl = cls.CLS() var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil { return diff --git a/services/chargers.go b/services/chargers.go index 0ecd167af..93b34c0d3 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -34,14 +34,13 @@ import ( // NewChargerService returns the Charger Service func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, - filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, + filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ChargerService{ cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -52,7 +51,6 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, type ChargerService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -73,8 +71,11 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e return utils.ErrServiceAlreadyRunning } - chrS.cl = <-chrS.clSChan - chrS.clSChan <- chrS.cl + cls := chrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CommonListenerS, utils.StateServiceUP) + } + chrS.cl = cls.CLS() cacheS := chrS.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CacheS, utils.StateServiceUP) diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 1b0802164..83ac17b22 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -33,12 +33,10 @@ import ( // NewCommonListenerService instantiates a new CommonListenerService. func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, - clSChan chan *commonlisteners.CommonListenerS, srvIndexer *servmanager.ServiceIndexer) *CommonListenerService { return &CommonListenerService{ cfg: cfg, caps: caps, - clSChan: clSChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -50,9 +48,8 @@ type CommonListenerService struct { cls *commonlisteners.CommonListenerS - clSChan chan *commonlisteners.CommonListenerS - caps *engine.Caps - cfg *config.CGRConfig + caps *engine.Caps + cfg *config.CGRConfig intRPCconn birpc.ClientConnector // expose API methods over internal connection srvIndexer *servmanager.ServiceIndexer // access directly services from here @@ -67,7 +64,6 @@ func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) err cl.mu.Lock() defer cl.mu.Unlock() cl.cls = commonlisteners.NewCommonListenerS(cl.caps) - cl.clSChan <- cl.cls if len(cl.cfg.HTTPCfg().RegistrarSURL) != 0 { cl.cls.RegisterHTTPFunc(cl.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) } @@ -88,7 +84,6 @@ func (cl *CommonListenerService) Shutdown() error { cl.mu.Lock() defer cl.mu.Unlock() cl.cls = nil - <-cl.clSChan return nil } @@ -118,3 +113,8 @@ func (cl *CommonListenerService) StateChan(stateID string) chan struct{} { func (cl *CommonListenerService) IntRPCConn() birpc.ClientConnector { return cl.intRPCconn } + +// CLS returns the CommonListenerS object. +func (cl *CommonListenerService) CLS() *commonlisteners.CommonListenerS { + return cl.cls +} diff --git a/services/cores.go b/services/cores.go index d9dcba3b0..ed7e2b8e5 100644 --- a/services/cores.go +++ b/services/cores.go @@ -34,7 +34,7 @@ import ( ) // NewCoreService returns the Core Service -func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS, +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU *os.File, shdWg *sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) *CoreService { return &CoreService{ @@ -42,7 +42,6 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm cfg: cfg, caps: caps, fileCPU: fileCPU, - clSChan: clSChan, csCh: make(chan *cores.CoreS, 1), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -53,8 +52,6 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm type CoreService struct { mu sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS - cS *cores.CoreS cl *commonlisteners.CommonListenerS @@ -76,8 +73,11 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err return utils.ErrServiceAlreadyRunning } - cS.cl = <-cS.clSChan - cS.clSChan <- cS.cl + cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CoreS, utils.CommonListenerS, utils.StateServiceUP) + } + cS.cl = cls.CLS() anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.CoreS, utils.AnalyzerS, utils.StateServiceUP) diff --git a/services/dispatchers.go b/services/dispatchers.go index bffbbab9c..ac38f871c 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -34,14 +34,12 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) *DispatcherService { return &DispatcherService{ cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvsReload: make(map[string]chan struct{}), srvIndexer: srvIndexer, @@ -53,7 +51,6 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, type DispatcherService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -75,8 +72,11 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) return utils.ErrServiceAlreadyRunning } utils.Logger.Info("Starting CGRateS DispatcherS service.") - dspS.cl = <-dspS.clSChan - dspS.clSChan <- dspS.cl + cls := dspS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CommonListenerS, utils.StateServiceUP) + } + dspS.cl = cls.CLS() cacheS := dspS.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CacheS, utils.StateServiceUP) diff --git a/services/ees.go b/services/ees.go index 1fbcf8969..da4c453e7 100644 --- a/services/ees.go +++ b/services/ees.go @@ -34,13 +34,12 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &EventExporterService{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, - clSChan: clSChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -50,7 +49,6 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil type EventExporterService struct { mu sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS filterSChan chan *engine.FilterS eeS *ees.EeS @@ -106,8 +104,11 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc return utils.ErrServiceAlreadyRunning } - es.cl = <-es.clSChan - es.clSChan <- es.cl + cls := es.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.EEs, utils.CommonListenerS, utils.StateServiceUP) + } + es.cl = cls.CLS() fltrS, err := waitForFilterS(ctx, es.filterSChan) if err != nil { return err diff --git a/services/efs.go b/services/efs.go index f1abf3312..813bee5ea 100644 --- a/services/efs.go +++ b/services/efs.go @@ -37,8 +37,6 @@ import ( type ExportFailoverService struct { sync.Mutex - clSChan chan *commonlisteners.CommonListenerS - efS *efs.EfS cl *commonlisteners.CommonListenerS srv *birpc.Service @@ -54,11 +52,9 @@ type ExportFailoverService struct { // NewExportFailoverService is the constructor for the TpeService func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - clSChan chan *commonlisteners.CommonListenerS, srvIndexer *servmanager.ServiceIndexer) *ExportFailoverService { return &ExportFailoverService{ cfg: cfg, - clSChan: clSChan, connMgr: connMgr, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -70,8 +66,11 @@ func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.Cance if efServ.IsRunning() { return utils.ErrServiceAlreadyRunning } - efServ.cl = <-efServ.clSChan - efServ.clSChan <- efServ.cl + cls := efServ.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), efServ.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.EFs, utils.CommonListenerS, utils.StateServiceUP) + } + efServ.cl = cls.CLS() efServ.Lock() efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EFs)) diff --git a/services/ers.go b/services/ers.go index e4f77121f..047130f18 100644 --- a/services/ers.go +++ b/services/ers.go @@ -37,14 +37,12 @@ func NewEventReaderService( cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - clSChan chan *commonlisteners.CommonListenerS, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, - clSChan: clSChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -54,7 +52,6 @@ func NewEventReaderService( type EventReaderService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS filterSChan chan *engine.FilterS ers *ers.ERService @@ -76,8 +73,11 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel return utils.ErrServiceAlreadyRunning } - erS.cl = <-erS.clSChan - erS.clSChan <- erS.cl + cls := erS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ERs, utils.CommonListenerS, utils.StateServiceUP) + } + erS.cl = cls.CLS() var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil { return diff --git a/services/httpagent.go b/services/httpagent.go index cbd12a0ad..1edf35a10 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -34,12 +34,11 @@ import ( // NewHTTPAgent returns the HTTP Agent func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &HTTPAgent{ cfg: cfg, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -50,7 +49,6 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, type HTTPAgent struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS filterSChan chan *engine.FilterS cl *commonlisteners.CommonListenerS @@ -73,8 +71,11 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro return utils.ErrServiceAlreadyRunning } - cl := <-ha.clSChan - ha.clSChan <- cl + cls := ha.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.CommonListenerS, utils.StateServiceUP) + } + cl := cls.CLS() var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ha.filterSChan); err != nil { return diff --git a/services/janus.go b/services/janus.go index f2248f302..2b18e0a5d 100644 --- a/services/janus.go +++ b/services/janus.go @@ -26,7 +26,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -35,12 +34,11 @@ import ( // NewJanusAgent returns the Janus Agent func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &JanusAgent{ cfg: cfg, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -51,7 +49,6 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, type JanusAgent struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS filterSChan chan *engine.FilterS jA *agents.JanusAgent @@ -70,8 +67,11 @@ type JanusAgent struct { // Start should jandle the sercive start func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err error) { - cl := <-ja.clSChan - ja.clSChan <- cl + cls := ja.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.CommonListenerS, utils.StateServiceUP) + } + cl := cls.CLS() var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ja.filterSChan); err != nil { return diff --git a/services/loaders.go b/services/loaders.go index e0bcf1129..1f0023355 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -34,14 +34,13 @@ import ( // NewLoaderService returns the Loader Service func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, - filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, + filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) *LoaderService { return &LoaderService{ cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, stopChan: make(chan struct{}), srvIndexer: srvIndexer, @@ -53,7 +52,6 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, type LoaderService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -75,8 +73,11 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er return utils.ErrServiceAlreadyRunning } - ldrs.cl = <-ldrs.clSChan - ldrs.clSChan <- ldrs.cl + cls := ldrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.CommonListenerS, utils.StateServiceUP) + } + ldrs.cl = cls.CLS() var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil { return diff --git a/services/rankings.go b/services/rankings.go index 499c89e49..40178313c 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -35,7 +35,6 @@ import ( // NewRankingService returns the RankingS Service func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { @@ -43,7 +42,6 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, @@ -54,7 +52,6 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, type RankingService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -77,8 +74,11 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er } ran.srvDep[utils.DataDB].Add(1) - ran.cl = <-ran.clSChan - ran.clSChan <- ran.cl + cls := ran.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CommonListenerS, utils.StateServiceUP) + } + ran.cl = cls.CLS() cacheS := ran.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CacheS, utils.StateServiceUP) diff --git a/services/rates.go b/services/rates.go index fb9cef0d0..c1cdf7945 100644 --- a/services/rates.go +++ b/services/rates.go @@ -34,13 +34,12 @@ import ( // NewRateService constructs RateService func NewRateService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - dmS *DataDBService, clSChan chan *commonlisteners.CommonListenerS, + dmS *DataDBService, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RateService{ cfg: cfg, filterSChan: filterSChan, dmS: dmS, - clSChan: clSChan, rldChan: make(chan struct{}), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -51,7 +50,6 @@ func NewRateService(cfg *config.CGRConfig, type RateService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dmS *DataDBService filterSChan chan *engine.FilterS @@ -107,8 +105,11 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er return utils.ErrServiceAlreadyRunning } - rs.cl = <-rs.clSChan - rs.clSChan <- rs.cl + cls := rs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RateS, utils.CommonListenerS, utils.StateServiceUP) + } + rs.cl = cls.CLS() cacheS := rs.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.RateS, utils.CacheS, utils.StateServiceUP) diff --git a/services/resources.go b/services/resources.go index f48d15678..a41fab605 100644 --- a/services/resources.go +++ b/services/resources.go @@ -34,7 +34,6 @@ import ( // NewResourceService returns the Resource Service func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { @@ -42,7 +41,6 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, @@ -54,7 +52,6 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, type ResourceService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -77,8 +74,11 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e } reS.srvDep[utils.DataDB].Add(1) - reS.cl = <-reS.clSChan - reS.clSChan <- reS.cl + cls := reS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CommonListenerS, utils.StateServiceUP) + } + reS.cl = cls.CLS() cacheS := reS.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CacheS, utils.StateServiceUP) diff --git a/services/routes.go b/services/routes.go index 3c996d501..a0fee78fd 100644 --- a/services/routes.go +++ b/services/routes.go @@ -35,14 +35,12 @@ import ( // NewRouteService returns the Route Service func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RouteService{ cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -53,7 +51,6 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, type RouteService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -74,8 +71,11 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e return utils.ErrServiceAlreadyRunning } - routeS.cl = <-routeS.clSChan - routeS.clSChan <- routeS.cl + cls := routeS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CommonListenerS, utils.StateServiceUP) + } + routeS.cl = cls.CLS() cacheS := routeS.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CacheS, utils.StateServiceUP) diff --git a/services/sessions.go b/services/sessions.go index 03c5dcbc0..d01035079 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -36,14 +36,12 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &SessionService{ cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -54,7 +52,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha type SessionService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -77,8 +74,11 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) return utils.ErrServiceAlreadyRunning } - smg.cl = <-smg.clSChan - smg.clSChan <- smg.cl + cls := smg.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.SessionS, utils.CommonListenerS, utils.StateServiceUP) + } + smg.cl = cls.CLS() var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil { return diff --git a/services/stats.go b/services/stats.go index 19c8a080d..90d7704fa 100644 --- a/services/stats.go +++ b/services/stats.go @@ -34,7 +34,6 @@ import ( // NewStatService returns the Stat Service func NewStatService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { @@ -42,7 +41,6 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, connMgr: connMgr, srvDep: srvDep, srvIndexer: srvIndexer, @@ -54,7 +52,6 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, type StatService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -77,8 +74,11 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e } sts.srvDep[utils.DataDB].Add(1) - sts.cl = <-sts.clSChan - sts.clSChan <- sts.cl + cls := sts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.StatS, utils.CommonListenerS, utils.StateServiceUP) + } + sts.cl = cls.CLS() cacheS := sts.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.StatS, utils.CacheS, utils.StateServiceUP) diff --git a/services/thresholds.go b/services/thresholds.go index e7e52b7a3..892f2eb2c 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -35,14 +35,12 @@ import ( func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ThresholdService{ cfg: cfg, dm: dm, filterSChan: filterSChan, - clSChan: clSChan, srvDep: srvDep, connMgr: connMgr, srvIndexer: srvIndexer, @@ -54,7 +52,6 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, type ThresholdService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -77,8 +74,11 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) } thrs.srvDep[utils.DataDB].Add(1) - thrs.cl = <-thrs.clSChan - thrs.clSChan <- thrs.cl + cls := thrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CommonListenerS, utils.StateServiceUP) + } + thrs.cl = cls.CLS() cacheS := thrs.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CacheS, utils.StateServiceUP) diff --git a/services/tpes.go b/services/tpes.go index ae1fb15ba..cb485f6c7 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -34,13 +34,11 @@ import ( // NewTPeService is the constructor for the TpeService func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService, - clSChan chan *commonlisteners.CommonListenerS, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &TPeService{ cfg: cfg, dm: dm, connMgr: connMgr, - clSChan: clSChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -50,8 +48,7 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD type TPeService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS - dm *DataDBService + dm *DataDBService tpes *tpes.TPeS cl *commonlisteners.CommonListenerS @@ -68,8 +65,11 @@ type TPeService struct { // Start should handle the service start func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { - ts.cl = <-ts.clSChan - ts.clSChan <- ts.cl + cls := ts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.TPeS, utils.CommonListenerS, utils.StateServiceUP) + } + ts.cl = cls.CLS() var datadb *engine.DataManager if datadb, err = ts.dm.WaitForDM(ctx); err != nil { return diff --git a/services/trends.go b/services/trends.go index a75a4e337..f37ec0b01 100644 --- a/services/trends.go +++ b/services/trends.go @@ -34,14 +34,12 @@ import ( // NewTrendsService returns the TrendS Service func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &TrendService{ cfg: cfg, dm: dm, - clSChan: clSChan, connMgr: connMgr, srvDep: srvDep, filterSChan: filterSChan, @@ -53,7 +51,6 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, type TrendService struct { sync.RWMutex - clSChan chan *commonlisteners.CommonListenerS dm *DataDBService filterSChan chan *engine.FilterS @@ -76,8 +73,11 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err } trs.srvDep[utils.DataDB].Add(1) - trs.cl = <-trs.clSChan - trs.clSChan <- trs.cl + cls := trs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CommonListenerS, utils.StateServiceUP) + } + trs.cl = cls.CLS() cacheS := trs.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CacheS, utils.StateServiceUP)