diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 842b39e60..dabc6d1f5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -135,7 +135,6 @@ func runCGREngine(fs []string) (err error) { connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh) clsCh := make(chan *commonlisteners.CommonListenerS, 1) - anzCh := make(chan *services.AnalyzerService, 1) iFilterSCh := make(chan *engine.FilterS, 1) // ServiceIndexer will share service references to all services @@ -144,23 +143,23 @@ func runCGREngine(fs []string) (err error) { dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep, srvIdxr) sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvIdxr) cls := services.NewCommonListenerService(cfg, caps, clsCh, srvIdxr) - anzS := services.NewAnalyzerService(cfg, clsCh, iFilterSCh, anzCh, srvIdxr) - coreS := services.NewCoreService(cfg, caps, clsCh, anzCh, cpuPrfF, shdWg, srvIdxr) - cacheS := services.NewCacheService(cfg, dmS, connMgr, clsCh, anzCh, coreS, srvIdxr) - dspS := services.NewDispatcherService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr) - ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr) + anzS := services.NewAnalyzerService(cfg, clsCh, iFilterSCh, srvIdxr) + coreS := services.NewCoreService(cfg, caps, clsCh, cpuPrfF, shdWg, srvIdxr) + cacheS := services.NewCacheService(cfg, dmS, connMgr, clsCh, coreS, srvIdxr) + dspS := services.NewDispatcherService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvIdxr) + ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr) efs := services.NewExportFailoverService(cfg, connMgr, clsCh, srvIdxr) - adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr) - sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr) - attrS := services.NewAttributeService(cfg, dmS, cacheS, iFilterSCh, clsCh, anzCh, dspS, srvIdxr) - chrgS := services.NewChargerService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr) - routeS := services.NewRouteService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr) - resourceS := services.NewResourceService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) - trendS := services.NewTrendService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) - rankingS := services.NewRankingService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) - thS := services.NewThresholdService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, anzCh, srvDep, srvIdxr) - stS := services.NewStatService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr) - erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, clsCh, anzCh, srvIdxr) + adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, srvIdxr) + sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr) + attrS := services.NewAttributeService(cfg, dmS, cacheS, iFilterSCh, clsCh, dspS, srvIdxr) + chrgS := services.NewChargerService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvIdxr) + routeS := services.NewRouteService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvIdxr) + resourceS := services.NewResourceService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) + trendS := services.NewTrendService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) + rankingS := services.NewRankingService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) + thS := services.NewThresholdService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, srvDep, srvIdxr) + stS := services.NewStatService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr) + erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, clsCh, srvIdxr) dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvIdxr) fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvIdxr) kamAgent := services.NewKamailioAgent(cfg, connMgr, srvIdxr) @@ -170,12 +169,12 @@ func runCGREngine(fs []string) (err error) { diamAgent := services.NewDiameterAgent(cfg, iFilterSCh, connMgr, caps, srvIdxr) httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, clsCh, connMgr, srvIdxr) sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvIdxr) - eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, clsCh, anzCh, srvIdxr) - cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr) + eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, clsCh, srvIdxr) + cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, srvIdxr) registrarcS := services.NewRegistrarCService(cfg, connMgr, srvIdxr) - rateS := services.NewRateService(cfg, cacheS, iFilterSCh, dmS, clsCh, anzCh, srvIdxr) - actionS := services.NewActionService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, anzCh, srvIdxr) - accS := services.NewAccountService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, anzCh, srvIdxr) + rateS := services.NewRateService(cfg, cacheS, iFilterSCh, dmS, clsCh, srvIdxr) + actionS := services.NewActionService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, srvIdxr) + accS := services.NewAccountService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, srvIdxr) tpeS := services.NewTPeService(cfg, connMgr, dmS, clsCh, srvIdxr) srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{ @@ -294,7 +293,7 @@ func runCGREngine(fs []string) (err error) { return } } else { - anzCh <- anzS + close(anzS.StateChan(utils.StateServiceUP)) } shdWg.Add(1) @@ -339,7 +338,6 @@ func runCGREngine(fs []string) (err error) { } <-ctx.Done() - //<-stopChan return } @@ -464,7 +462,7 @@ func handleSignals(ctx *context.Context, shutdown context.CancelFunc, shdWg.Done() return case <-reloadSignal: - // do it in it's own goroutine in order to not block the signal handler with the reload functionality + // do it in its own goroutine in order to not block the signal handler with the reload functionality go func() { var reply string if err := cfg.V1ReloadConfig(ctx, diff --git a/services/accounts.go b/services/accounts.go index 2437bdc6f..f9637ca08 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -38,7 +38,6 @@ import ( func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, - anzChan chan *AnalyzerService, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AccountService{ cfg: cfg, @@ -47,7 +46,6 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, connMgr: connMgr, clSChan: clSChan, - anzChan: anzChan, rldChan: make(chan struct{}, 1), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -61,7 +59,6 @@ type AccountService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService cacheS *CacheService - anzChan chan *AnalyzerService filterSChan chan *engine.FilterS acts *accounts.AccountS @@ -97,8 +94,10 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e if datadb, err = acts.dm.WaitForDM(ctx); err != nil { return } - anz := <-acts.anzChan - acts.anzChan <- anz + anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AccountS, utils.AnalyzerS, utils.StateServiceUP) + } acts.Lock() defer acts.Unlock() diff --git a/services/actions.go b/services/actions.go index 3a50d71f9..b2199a674 100644 --- a/services/actions.go +++ b/services/actions.go @@ -39,7 +39,6 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, - anzChan chan *AnalyzerService, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ActionService{ connMgr: connMgr, @@ -48,7 +47,6 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, cacheS: cacheS, filterSChan: filterSChan, clSChan: clSChan, - anzChan: anzChan, rldChan: make(chan struct{}, 1), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -61,7 +59,6 @@ type ActionService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS @@ -100,8 +97,10 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er if datadb, err = acts.dm.WaitForDM(ctx); err != nil { return } - anz := <-acts.anzChan - acts.anzChan <- anz + anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ActionS, utils.AnalyzerS, utils.StateServiceUP) + } acts.Lock() defer acts.Unlock() diff --git a/services/adminsv1.go b/services/adminsv1.go index d05bee1bd..f0f576946 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -35,7 +35,7 @@ import ( func NewAdminSv1Service(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AdminSv1Service{ cfg: cfg, @@ -44,7 +44,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -57,7 +56,6 @@ type AdminSv1Service struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService storDB *StorDBService - anzChan chan *AnalyzerService filterSChan chan *engine.FilterS api *apis.AdminSv1 @@ -89,8 +87,10 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF if datadb, err = apiService.dm.WaitForDM(ctx); err != nil { return } - anz := <-apiService.anzChan - apiService.anzChan <- anz + anz := apiService.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AdminS, utils.AnalyzerS, utils.StateServiceUP) + } storDBChan := make(chan engine.StorDB, 1) apiService.stopChan = make(chan struct{}) diff --git a/services/analyzers.go b/services/analyzers.go index 802a5f300..2bec8ba7f 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -35,13 +35,11 @@ import ( // NewAnalyzerService returns the Analyzer Service func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, filterSChan chan *engine.FilterS, - anzChan chan *AnalyzerService, srvIndexer *servmanager.ServiceIndexer) *AnalyzerService { return &AnalyzerService{ cfg: cfg, clSChan: clSChan, filterSChan: filterSChan, - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -53,7 +51,6 @@ type AnalyzerService struct { clSChan chan *commonlisteners.CommonListenerS filterSChan chan *engine.FilterS - anzChan chan *AnalyzerService anz *analyzers.AnalyzerS cl *commonlisteners.CommonListenerS @@ -82,7 +79,6 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error())) return } - anz.anzChan <- anz anzCtx, cancel := context.WithCancel(ctx) anz.cancelFunc = cancel go func(a *analyzers.AnalyzerS) { @@ -129,12 +125,6 @@ func (anz *AnalyzerService) Shutdown() (err error) { anz.Lock() anz.cancelFunc() anz.cl.SetAnalyzer(nil) - - // Close the channel before making it nil to prevent stale goroutines - // in case there are other services waiting on AnalyzerS. - close(anz.anzChan) - - anz.anzChan = nil anz.anz.Shutdown() anz.anz = nil anz.Unlock() diff --git a/services/attributes.go b/services/attributes.go index c05790972..eccbcea75 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -36,7 +36,7 @@ import ( func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - anzChan chan *AnalyzerService, dspS *DispatcherService, + dspS *DispatcherService, sIndxr *servmanager.ServiceIndexer) servmanager.Service { return &AttributeService{ cfg: cfg, @@ -44,7 +44,6 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS: cacheS, filterSChan: filterSChan, clSChan: clSChan, - anzChan: anzChan, dspS: dspS, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), serviceIndexer: sIndxr, @@ -57,7 +56,6 @@ type AttributeService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService cacheS *CacheService dspS *DispatcherService filterSChan chan *engine.FilterS @@ -98,8 +96,10 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) if datadb, err = attrS.dm.WaitForDM(ctx); err != nil { return } - anz := <-attrS.anzChan - attrS.anzChan <- anz + anz := attrS.serviceIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.AnalyzerS, utils.StateServiceUP) + } attrS.Lock() defer attrS.Unlock() diff --git a/services/caches.go b/services/caches.go index 85190fd05..66ff97bf1 100644 --- a/services/caches.go +++ b/services/caches.go @@ -32,12 +32,10 @@ import ( // NewCacheService . func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, - anzChan chan *AnalyzerService, // dspS *DispatcherService, cores *CoreService, srvIndexer *servmanager.ServiceIndexer) *CacheService { return &CacheService{ cfg: cfg, - anzChan: anzChan, cores: cores, clSChan: clSChan, dm: dm, @@ -50,7 +48,6 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C // CacheService implements Agent interface type CacheService struct { - anzChan chan *AnalyzerService cores *CoreService clSChan chan *commonlisteners.CommonListenerS dm *DataDBService @@ -75,8 +72,10 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e return } - anz := <-cS.anzChan - cS.anzChan <- anz + anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CacheS, utils.AnalyzerS, utils.StateServiceUP) + } var cs *cores.CoreS if cs, err = cS.cores.WaitForCoreS(ctx); err != nil { return diff --git a/services/cdrs.go b/services/cdrs.go index 194e70aa2..8451a4d72 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -37,7 +37,7 @@ import ( func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &CDRService{ cfg: cfg, @@ -46,7 +46,6 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -59,7 +58,6 @@ type CDRService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService storDB *StorDBService - anzChan chan *AnalyzerService filterSChan chan *engine.FilterS cdrS *cdrs.CDRServer @@ -92,8 +90,10 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err if datadb, err = cs.dm.WaitForDM(ctx); err != nil { return } - anz := <-cs.anzChan - cs.anzChan <- anz + anz := cs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CDRs, utils.AnalyzerS, utils.StateServiceUP) + } storDBChan := make(chan engine.StorDB, 1) cs.stopChan = make(chan struct{}) diff --git a/services/chargers.go b/services/chargers.go index d12318514..26b1fcc26 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -36,7 +36,6 @@ import ( func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, - anzChan chan *AnalyzerService, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ChargerService{ cfg: cfg, @@ -45,7 +44,6 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -58,7 +56,6 @@ type ChargerService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService cacheS *CacheService - anzChan chan *AnalyzerService filterSChan chan *engine.FilterS chrS *engine.ChargerS @@ -93,8 +90,10 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e if datadb, err = chrS.dm.WaitForDM(ctx); err != nil { return } - anz := <-chrS.anzChan - chrS.anzChan <- anz + anz := chrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.AnalyzerS, utils.StateServiceUP) + } chrS.Lock() defer chrS.Unlock() diff --git a/services/cores.go b/services/cores.go index fc953deb8..d9dcba3b0 100644 --- a/services/cores.go +++ b/services/cores.go @@ -35,7 +35,6 @@ import ( // NewCoreService returns the Core Service func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS, - anzChan chan *AnalyzerService, fileCPU *os.File, shdWg *sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) *CoreService { return &CoreService{ @@ -44,7 +43,6 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm caps: caps, fileCPU: fileCPU, clSChan: clSChan, - anzChan: anzChan, csCh: make(chan *cores.CoreS, 1), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -55,7 +53,6 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm type CoreService struct { mu sync.RWMutex - anzChan chan *AnalyzerService clSChan chan *commonlisteners.CommonListenerS cS *cores.CoreS @@ -81,8 +78,10 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err cS.cl = <-cS.clSChan cS.clSChan <- cS.cl - anz := <-cS.anzChan - cS.anzChan <- anz + anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CoreS, utils.AnalyzerS, utils.StateServiceUP) + } cS.mu.Lock() defer cS.mu.Unlock() diff --git a/services/dispatchers.go b/services/dispatchers.go index 1eee9d444..4482ff1ee 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -35,7 +35,7 @@ import ( func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) *DispatcherService { return &DispatcherService{ cfg: cfg, @@ -44,7 +44,6 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvsReload: make(map[string]chan struct{}), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -57,7 +56,6 @@ type DispatcherService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS @@ -95,8 +93,10 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) if datadb, err = dspS.dm.WaitForDM(ctx); err != nil { return } - anz := <-dspS.anzChan - dspS.anzChan <- anz + anz := dspS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.AnalyzerS, utils.StateServiceUP) + } dspS.Lock() defer dspS.Unlock() diff --git a/services/ees.go b/services/ees.go index 881c9291a..1fbcf8969 100644 --- a/services/ees.go +++ b/services/ees.go @@ -35,14 +35,12 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, - anzChan chan *AnalyzerService, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &EventExporterService{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, clSChan: clSChan, - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -53,7 +51,6 @@ type EventExporterService struct { mu sync.RWMutex clSChan chan *commonlisteners.CommonListenerS - anzChan chan *AnalyzerService filterSChan chan *engine.FilterS eeS *ees.EeS @@ -115,8 +112,10 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc if err != nil { return err } - anz := <-es.anzChan - es.anzChan <- anz + anz := es.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.EEs, utils.AnalyzerS, utils.StateServiceUP) + } utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EEs)) diff --git a/services/ers.go b/services/ers.go index 8618cce9d..e4f77121f 100644 --- a/services/ers.go +++ b/services/ers.go @@ -38,7 +38,6 @@ func NewEventReaderService( filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, - anzChan chan *AnalyzerService, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &EventReaderService{ rldChan: make(chan struct{}, 1), @@ -46,7 +45,6 @@ func NewEventReaderService( filterSChan: filterSChan, connMgr: connMgr, clSChan: clSChan, - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -57,7 +55,6 @@ type EventReaderService struct { sync.RWMutex clSChan chan *commonlisteners.CommonListenerS - anzChan chan *AnalyzerService filterSChan chan *engine.FilterS ers *ers.ERService @@ -85,8 +82,10 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil { return } - anz := <-erS.anzChan - erS.anzChan <- anz + anz := erS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ERs, utils.AnalyzerS, utils.StateServiceUP) + } erS.Lock() defer erS.Unlock() diff --git a/services/loaders.go b/services/loaders.go index b9022b565..e0bcf1129 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -35,7 +35,7 @@ import ( // NewLoaderService returns the Loader Service func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) *LoaderService { return &LoaderService{ cfg: cfg, @@ -44,7 +44,6 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, clSChan: clSChan, connMgr: connMgr, stopChan: make(chan struct{}), - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -56,7 +55,6 @@ type LoaderService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService filterSChan chan *engine.FilterS ldrs *loaders.LoaderS @@ -87,8 +85,10 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er if datadb, err = ldrs.dm.WaitForDM(ctx); err != nil { return } - anz := <-ldrs.anzChan - ldrs.anzChan <- anz + anz := ldrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.AnalyzerS, utils.StateServiceUP) + } ldrs.Lock() defer ldrs.Unlock() diff --git a/services/rankings.go b/services/rankings.go index 4e335fe64..bac3ec673 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -36,7 +36,7 @@ import ( func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RankingService{ @@ -46,7 +46,6 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -58,7 +57,6 @@ type RankingService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS @@ -97,8 +95,10 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er if filterS, err = waitForFilterS(ctx, ran.filterSChan); err != nil { return } - anz := <-ran.anzChan - ran.anzChan <- anz + anz := ran.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RankingS, utils.AnalyzerS, utils.StateServiceUP) + } ran.Lock() defer ran.Unlock() diff --git a/services/rates.go b/services/rates.go index 890751ffd..26cd6592a 100644 --- a/services/rates.go +++ b/services/rates.go @@ -35,7 +35,6 @@ import ( func NewRateService(cfg *config.CGRConfig, cacheS *CacheService, filterSChan chan *engine.FilterS, dmS *DataDBService, clSChan chan *commonlisteners.CommonListenerS, - anzChan chan *AnalyzerService, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RateService{ cfg: cfg, @@ -44,7 +43,6 @@ func NewRateService(cfg *config.CGRConfig, dmS: dmS, clSChan: clSChan, rldChan: make(chan struct{}), - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -55,7 +53,6 @@ type RateService struct { sync.RWMutex clSChan chan *commonlisteners.CommonListenerS - anzChan chan *AnalyzerService dmS *DataDBService cacheS *CacheService filterSChan chan *engine.FilterS @@ -128,8 +125,10 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er if datadb, err = rs.dmS.WaitForDM(ctx); err != nil { return } - anz := <-rs.anzChan - rs.anzChan <- anz + anz := rs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RateS, utils.AnalyzerS, utils.StateServiceUP) + } rs.Lock() rs.rateS = rates.NewRateS(rs.cfg, filterS, datadb) diff --git a/services/resources.go b/services/resources.go index d512ac7ad..70f73b7e5 100644 --- a/services/resources.go +++ b/services/resources.go @@ -35,7 +35,7 @@ import ( func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ResourceService{ @@ -45,7 +45,6 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -58,7 +57,6 @@ type ResourceService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS @@ -97,8 +95,10 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e if datadb, err = reS.dm.WaitForDM(ctx); err != nil { return } - anz := <-reS.anzChan - reS.anzChan <- anz + anz := reS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.AnalyzerS, utils.StateServiceUP) + } reS.Lock() defer reS.Unlock() diff --git a/services/routes.go b/services/routes.go index 7031e69c3..a9c8c34df 100644 --- a/services/routes.go +++ b/services/routes.go @@ -36,7 +36,7 @@ import ( func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RouteService{ cfg: cfg, @@ -45,7 +45,6 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -57,7 +56,6 @@ type RouteService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS @@ -93,8 +91,10 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e if datadb, err = routeS.dm.WaitForDM(ctx); err != nil { return } - anz := <-routeS.anzChan - routeS.anzChan <- anz + anz := routeS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RouteS, utils.AnalyzerS, utils.StateServiceUP) + } routeS.Lock() defer routeS.Unlock() diff --git a/services/sessions.go b/services/sessions.go index 008235796..03c5dcbc0 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -37,7 +37,7 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &SessionService{ cfg: cfg, @@ -45,7 +45,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } @@ -57,7 +56,6 @@ type SessionService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService filterSChan chan *engine.FilterS sm *sessions.SessionS @@ -89,8 +87,10 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) if datadb, err = smg.dm.WaitForDM(ctx); err != nil { return } - anz := <-smg.anzChan - smg.anzChan <- anz + anz := smg.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.SessionS, utils.AnalyzerS, utils.StateServiceUP) + } smg.Lock() defer smg.Unlock() diff --git a/services/stats.go b/services/stats.go index 06b613347..1f519fad8 100644 --- a/services/stats.go +++ b/services/stats.go @@ -35,7 +35,7 @@ import ( func NewStatService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &StatService{ @@ -45,7 +45,6 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvDep: srvDep, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -58,7 +57,6 @@ type StatService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS @@ -97,8 +95,10 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e if datadb, err = sts.dm.WaitForDM(ctx); err != nil { return } - anz := <-sts.anzChan - sts.anzChan <- anz + anz := sts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.StatS, utils.AnalyzerS, utils.StateServiceUP) + } sts.Lock() defer sts.Unlock() diff --git a/services/thresholds.go b/services/thresholds.go index 279c01eb7..37165f72b 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -36,7 +36,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, - anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, + srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ThresholdService{ cfg: cfg, @@ -44,7 +44,6 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS: cacheS, filterSChan: filterSChan, clSChan: clSChan, - anzChan: anzChan, srvDep: srvDep, connMgr: connMgr, srvIndexer: srvIndexer, @@ -58,7 +57,6 @@ type ThresholdService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS @@ -97,8 +95,10 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) if datadb, err = thrs.dm.WaitForDM(ctx); err != nil { return } - anz := <-thrs.anzChan - thrs.anzChan <- anz + anz := thrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.AnalyzerS, utils.StateServiceUP) + } thrs.Lock() defer thrs.Unlock() diff --git a/services/trends.go b/services/trends.go index 41baf00f3..0aa54ee1a 100644 --- a/services/trends.go +++ b/services/trends.go @@ -35,7 +35,7 @@ import ( func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anzChan chan *AnalyzerService, + connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &TrendService{ @@ -44,7 +44,6 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, cacheS: cacheS, clSChan: clSChan, connMgr: connMgr, - anzChan: anzChan, srvDep: srvDep, filterSChan: filterSChan, srvIndexer: srvIndexer, @@ -57,7 +56,6 @@ type TrendService struct { clSChan chan *commonlisteners.CommonListenerS dm *DataDBService - anzChan chan *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS @@ -96,8 +94,10 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err if filterS, err = waitForFilterS(ctx, trs.filterSChan); err != nil { return } - anz := <-trs.anzChan - trs.anzChan <- anz + anz := trs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.TrendS, utils.AnalyzerS, utils.StateServiceUP) + } trs.Lock() defer trs.Unlock()