From 0d9358cf300fa4d9963985c9879bdc40ddf82b8f Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 7 Nov 2024 14:49:34 +0200 Subject: [PATCH] Ensure services wait for AnalyzerS to be initiated Same logic as waiting for FilterS/DataDBService. The difference will be that services will only wait when AnalyzerS is enabled. --- services/accounts.go | 5 +++-- services/actions.go | 6 ++++-- services/adminsv1.go | 4 +++- services/analyzers.go | 27 +++++++++++++++++++++++++-- services/attributes.go | 5 +++-- services/caches.go | 3 +++ services/cdrs.go | 4 +++- services/cgr-engine.go | 2 +- services/chargers.go | 5 +++-- services/cores.go | 6 +++++- services/dispatchers.go | 5 +++-- services/ees.go | 3 +++ services/ers.go | 4 ++++ services/globalvars.go | 3 +-- services/loaders.go | 4 ++++ services/rankings.go | 6 +++++- services/rates.go | 5 +++-- services/resources.go | 7 ++++--- services/routes.go | 6 ++++-- services/sessions.go | 5 ++++- services/stats.go | 6 ++++-- services/thresholds.go | 6 ++++-- services/trends.go | 6 +++++- 23 files changed, 101 insertions(+), 32 deletions(-) diff --git a/services/accounts.go b/services/accounts.go index 011ee4f7a..3c1ced45e 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -84,16 +84,17 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e utils.CacheAccountsFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = acts.dm.WaitForDM(ctx); err != nil { return } + if err = acts.anz.WaitForAnalyzerS(ctx); err != nil { + return + } acts.Lock() defer acts.Unlock() diff --git a/services/actions.go b/services/actions.go index 1b5481bde..7e690c39a 100644 --- a/services/actions.go +++ b/services/actions.go @@ -84,16 +84,18 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er utils.CacheActionProfilesFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = acts.dm.WaitForDM(ctx); err != nil { return } + if err = acts.anz.WaitForAnalyzerS(ctx); err != nil { + return + } + acts.Lock() defer acts.Unlock() acts.acts = actions.NewActionS(acts.cfg, filterS, datadb, acts.connMgr) diff --git a/services/adminsv1.go b/services/adminsv1.go index 6dcfb5863..340cf4be1 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -81,11 +81,13 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = apiService.dm.WaitForDM(ctx); err != nil { return } + if err = apiService.anz.WaitForAnalyzerS(ctx); err != nil { + return + } storDBChan := make(chan engine.StorDB, 1) apiService.stopChan = make(chan struct{}) diff --git a/services/analyzers.go b/services/analyzers.go index cd1f747e2..43e96a3a4 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -54,7 +54,9 @@ type AnalyzerService struct { ctx *context.Context cancelFunc context.CancelFunc - anz *analyzers.AnalyzerS + anz *analyzers.AnalyzerS + started chan struct{} + connChan chan birpc.ClientConnector srvDep map[string]*sync.WaitGroup } @@ -67,10 +69,12 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun anz.Lock() defer anz.Unlock() + anz.started = make(chan struct{}) if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error())) return } + close(anz.started) anz.ctx, anz.cancelFunc = context.WithCancel(ctx) go func(a *analyzers.AnalyzerS) { if err := a.ListenAndServe(anz.ctx); err != nil { @@ -119,6 +123,12 @@ func (anz *AnalyzerService) Shutdown() (err error) { anz.server.SetAnalyzer(nil) anz.anz.Shutdown() anz.anz = nil + + // Close the channel before making it nil to prevent stale goroutines + // in case there are other services waiting on AnalyzerS. + close(anz.started) + + anz.started = nil <-anz.connChan anz.Unlock() anz.server.RpcUnregisterName(utils.AnalyzerSv1) @@ -129,7 +139,7 @@ func (anz *AnalyzerService) Shutdown() (err error) { func (anz *AnalyzerService) IsRunning() bool { anz.RLock() defer anz.RUnlock() - return anz != nil && anz.anz != nil + return anz.anz != nil } // ServiceName returns the service name @@ -149,3 +159,16 @@ func (anz *AnalyzerService) GetInternalCodec(c birpc.ClientConnector, to string) } return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to) } + +// WaitForAnalyzerS waits for the AnalyzerS structure to be initialized. +func (a *AnalyzerService) WaitForAnalyzerS(ctx *context.Context) error { + if a.started == nil { // AnalyzerS is disabled + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-a.started: + } + return nil +} diff --git a/services/attributes.go b/services/attributes.go index 9d23b198b..8be5153e1 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -79,16 +79,17 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) utils.CacheAttributeFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, attrS.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = attrS.dm.WaitForDM(ctx); err != nil { return } + if err = attrS.anz.WaitForAnalyzerS(ctx); err != nil { + return + } attrS.Lock() defer attrS.Unlock() diff --git a/services/caches.go b/services/caches.go index 7b4081731..8574345b0 100644 --- a/services/caches.go +++ b/services/caches.go @@ -69,6 +69,9 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e if dm, err = cS.dm.WaitForDM(ctx); err != nil { return } + if err = cS.anz.WaitForAnalyzerS(ctx); err != nil { + return + } var cs *cores.CoreS if cs, err = cS.cores.WaitForCoreS(ctx); err != nil { return diff --git a/services/cdrs.go b/services/cdrs.go index 3cf3cc89c..a253241c1 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -83,11 +83,13 @@ func (cdrSrv *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err if filterS, err = waitForFilterS(ctx, cdrSrv.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = cdrSrv.dm.WaitForDM(ctx); err != nil { return } + if err = cdrSrv.anz.WaitForAnalyzerS(ctx); err != nil { + return + } storDBChan := make(chan engine.StorDB, 1) cdrSrv.stopChan = make(chan struct{}) diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 36b5eb667..ec8226127 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -102,7 +102,7 @@ type CGREngine struct { cpuPrfF *os.File // services - gvS servmanager.Service + gvS *GlobalVarS dmS *DataDBService sdbS *StorDBService anzS *AnalyzerService diff --git a/services/chargers.go b/services/chargers.go index c00300955..132b94e52 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -77,16 +77,17 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e utils.CacheChargerFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, chrS.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = chrS.dm.WaitForDM(ctx); err != nil { return } + if err = chrS.anz.WaitForAnalyzerS(ctx); err != nil { + return + } chrS.Lock() defer chrS.Unlock() diff --git a/services/cores.go b/services/cores.go index 6e0d767f1..ab4180bd8 100644 --- a/services/cores.go +++ b/services/cores.go @@ -67,11 +67,15 @@ type CoreService struct { } // Start should handle the service start -func (cS *CoreService) Start(_ *context.Context, shtDw context.CancelFunc) error { +func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) error { if cS.IsRunning() { return utils.ErrServiceAlreadyRunning } + if err := cS.anz.WaitForAnalyzerS(ctx); err != nil { + return err + } + cS.mu.Lock() defer cS.mu.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS)) diff --git a/services/dispatchers.go b/services/dispatchers.go index 209881ab1..e574a14b9 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -80,16 +80,17 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) utils.CacheDispatcherFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, dspS.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = dspS.dm.WaitForDM(ctx); err != nil { return } + if err = dspS.anz.WaitForAnalyzerS(ctx); err != nil { + return + } dspS.Lock() defer dspS.Unlock() diff --git a/services/ees.go b/services/ees.go index e156c5791..fb136a8b6 100644 --- a/services/ees.go +++ b/services/ees.go @@ -109,6 +109,9 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc if err != nil { return err } + if err := es.anz.WaitForAnalyzerS(ctx); err != nil { + return err + } utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EEs)) diff --git a/services/ers.go b/services/ers.go index e4ffd453a..ca9248073 100644 --- a/services/ers.go +++ b/services/ers.go @@ -74,10 +74,14 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel if erS.IsRunning() { return utils.ErrServiceAlreadyRunning } + var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil { return } + if err = erS.anz.WaitForAnalyzerS(ctx); err != nil { + return + } erS.Lock() defer erS.Unlock() diff --git a/services/globalvars.go b/services/globalvars.go index b19cb5993..ca45b87c4 100644 --- a/services/globalvars.go +++ b/services/globalvars.go @@ -25,13 +25,12 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewGlobalVarS . func NewGlobalVarS(cfg *config.CGRConfig, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup) *GlobalVarS { return &GlobalVarS{ cfg: cfg, srvDep: srvDep, diff --git a/services/loaders.go b/services/loaders.go index 448437495..d9bfeca50 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -71,6 +71,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er if ldrs.IsRunning() { return utils.ErrServiceAlreadyRunning } + var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil { return @@ -79,6 +80,9 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er if datadb, err = ldrs.dm.WaitForDM(ctx); err != nil { return } + if err = ldrs.anz.WaitForAnalyzerS(ctx); err != nil { + return + } ldrs.Lock() defer ldrs.Unlock() diff --git a/services/rankings.go b/services/rankings.go index a77a1fcd1..e2ddb6a84 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -70,6 +70,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er if ran.IsRunning() { return utils.ErrServiceAlreadyRunning } + ran.srvDep[utils.DataDB].Add(1) if err = ran.cacheS.WaitToPrecache(ctx, utils.CacheRankingProfiles, @@ -81,11 +82,14 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er if datadb, err = ran.dm.WaitForDM(ctx); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ran.filterSChan); err != nil { return } + if err = ran.anz.WaitForAnalyzerS(ctx); err != nil { + return + } + ran.Lock() defer ran.Unlock() ran.ran = engine.NewRankingS(datadb, ran.connMgr, filterS, ran.cfg) diff --git a/services/rates.go b/services/rates.go index 19f57d0ee..49c057050 100644 --- a/services/rates.go +++ b/services/rates.go @@ -116,16 +116,17 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er utils.CacheRateFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, rs.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = rs.dmS.WaitForDM(ctx); err != nil { return } + if err = rs.anz.WaitForAnalyzerS(ctx); err != nil { + return + } rs.Lock() rs.rateS = rates.NewRateS(rs.cfg, filterS, datadb) diff --git a/services/resources.go b/services/resources.go index 5da084676..4ca197e0f 100644 --- a/services/resources.go +++ b/services/resources.go @@ -71,24 +71,25 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e if reS.IsRunning() { return utils.ErrServiceAlreadyRunning } - reS.srvDep[utils.DataDB].Add(1) + reS.srvDep[utils.DataDB].Add(1) if err = reS.cacheS.WaitToPrecache(ctx, utils.CacheResourceProfiles, utils.CacheResources, utils.CacheResourceFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, reS.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = reS.dm.WaitForDM(ctx); err != nil { return } + if err = reS.anz.WaitForAnalyzerS(ctx); err != nil { + return + } reS.Lock() defer reS.Unlock() diff --git a/services/routes.go b/services/routes.go index 759d20568..7518277de 100644 --- a/services/routes.go +++ b/services/routes.go @@ -78,16 +78,18 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e utils.CacheRouteFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, routeS.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = routeS.dm.WaitForDM(ctx); err != nil { return } + if err = routeS.anz.WaitForAnalyzerS(ctx); err != nil { + return + } + routeS.Lock() defer routeS.Unlock() routeS.routeS = engine.NewRouteService(datadb, filterS, routeS.cfg, routeS.connMgr) diff --git a/services/sessions.go b/services/sessions.go index 42fe68fca..1584a4e46 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -80,11 +80,14 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = smg.dm.WaitForDM(ctx); err != nil { return } + if err = smg.anz.WaitForAnalyzerS(ctx); err != nil { + return + } + smg.Lock() defer smg.Unlock() diff --git a/services/stats.go b/services/stats.go index 1d382a969..ee08127f2 100644 --- a/services/stats.go +++ b/services/stats.go @@ -71,6 +71,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e if sts.IsRunning() { return utils.ErrServiceAlreadyRunning } + sts.srvDep[utils.DataDB].Add(1) if err = sts.cacheS.WaitToPrecache(ctx, utils.CacheStatQueueProfiles, @@ -78,16 +79,17 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e utils.CacheStatFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, sts.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = sts.dm.WaitForDM(ctx); err != nil { return } + if err = sts.anz.WaitForAnalyzerS(ctx); err != nil { + return + } sts.Lock() defer sts.Unlock() diff --git a/services/thresholds.go b/services/thresholds.go index 764efe32e..61a5ad8f7 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -71,6 +71,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) if thrs.IsRunning() { return utils.ErrServiceAlreadyRunning } + thrs.srvDep[utils.DataDB].Add(1) if err = thrs.cacheS.WaitToPrecache(ctx, utils.CacheThresholdProfiles, @@ -78,16 +79,17 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) utils.CacheThresholdFilterIndexes); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, thrs.filterSChan); err != nil { return } - var datadb *engine.DataManager if datadb, err = thrs.dm.WaitForDM(ctx); err != nil { return } + if err = thrs.anz.WaitForAnalyzerS(ctx); err != nil { + return + } thrs.Lock() defer thrs.Unlock() diff --git a/services/trends.go b/services/trends.go index 7516bb97c..cd066bad1 100644 --- a/services/trends.go +++ b/services/trends.go @@ -68,6 +68,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err if trs.IsRunning() { return utils.ErrServiceAlreadyRunning } + trs.srvDep[utils.DataDB].Add(1) if err = trs.cacheS.WaitToPrecache(ctx, utils.CacheTrendProfiles, @@ -79,11 +80,14 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err if datadb, err = trs.dm.WaitForDM(ctx); err != nil { return } - var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, trs.filterSChan); err != nil { return } + if err = trs.anz.WaitForAnalyzerS(ctx); err != nil { + return + } + trs.Lock() defer trs.Unlock() trs.trs = engine.NewTrendService(datadb, trs.cfg, filterS, trs.connMgr)