Ensure services wait for AnalyzerS to be initiated

Same logic as waiting for FilterS/DataDBService.
The difference will be that services will only wait when AnalyzerS
is enabled.
This commit is contained in:
ionutboangiu
2024-11-07 14:49:34 +02:00
committed by Dan Christian Bogos
parent b7038dd61e
commit 0d9358cf30
23 changed files with 101 additions and 32 deletions

View File

@@ -84,16 +84,17 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
utils.CacheAccountsFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = acts.dm.WaitForDM(ctx); err != nil {
return
}
if err = acts.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
acts.Lock()
defer acts.Unlock()

View File

@@ -84,16 +84,18 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
utils.CacheActionProfilesFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = acts.dm.WaitForDM(ctx); err != nil {
return
}
if err = acts.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
acts.Lock()
defer acts.Unlock()
acts.acts = actions.NewActionS(acts.cfg, filterS, datadb, acts.connMgr)

View File

@@ -81,11 +81,13 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = apiService.dm.WaitForDM(ctx); err != nil {
return
}
if err = apiService.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
storDBChan := make(chan engine.StorDB, 1)
apiService.stopChan = make(chan struct{})

View File

@@ -54,7 +54,9 @@ type AnalyzerService struct {
ctx *context.Context
cancelFunc context.CancelFunc
anz *analyzers.AnalyzerS
anz *analyzers.AnalyzerS
started chan struct{}
connChan chan birpc.ClientConnector
srvDep map[string]*sync.WaitGroup
}
@@ -67,10 +69,12 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun
anz.Lock()
defer anz.Unlock()
anz.started = make(chan struct{})
if anz.anz, err = analyzers.NewAnalyzerS(anz.cfg); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error()))
return
}
close(anz.started)
anz.ctx, anz.cancelFunc = context.WithCancel(ctx)
go func(a *analyzers.AnalyzerS) {
if err := a.ListenAndServe(anz.ctx); err != nil {
@@ -119,6 +123,12 @@ func (anz *AnalyzerService) Shutdown() (err error) {
anz.server.SetAnalyzer(nil)
anz.anz.Shutdown()
anz.anz = nil
// Close the channel before making it nil to prevent stale goroutines
// in case there are other services waiting on AnalyzerS.
close(anz.started)
anz.started = nil
<-anz.connChan
anz.Unlock()
anz.server.RpcUnregisterName(utils.AnalyzerSv1)
@@ -129,7 +139,7 @@ func (anz *AnalyzerService) Shutdown() (err error) {
func (anz *AnalyzerService) IsRunning() bool {
anz.RLock()
defer anz.RUnlock()
return anz != nil && anz.anz != nil
return anz.anz != nil
}
// ServiceName returns the service name
@@ -149,3 +159,16 @@ func (anz *AnalyzerService) GetInternalCodec(c birpc.ClientConnector, to string)
}
return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to)
}
// WaitForAnalyzerS waits for the AnalyzerS structure to be initialized.
func (a *AnalyzerService) WaitForAnalyzerS(ctx *context.Context) error {
if a.started == nil { // AnalyzerS is disabled
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-a.started:
}
return nil
}

View File

@@ -79,16 +79,17 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
utils.CacheAttributeFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, attrS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = attrS.dm.WaitForDM(ctx); err != nil {
return
}
if err = attrS.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
attrS.Lock()
defer attrS.Unlock()

View File

@@ -69,6 +69,9 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e
if dm, err = cS.dm.WaitForDM(ctx); err != nil {
return
}
if err = cS.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
var cs *cores.CoreS
if cs, err = cS.cores.WaitForCoreS(ctx); err != nil {
return

View File

@@ -83,11 +83,13 @@ func (cdrSrv *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err
if filterS, err = waitForFilterS(ctx, cdrSrv.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = cdrSrv.dm.WaitForDM(ctx); err != nil {
return
}
if err = cdrSrv.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
storDBChan := make(chan engine.StorDB, 1)
cdrSrv.stopChan = make(chan struct{})

View File

@@ -102,7 +102,7 @@ type CGREngine struct {
cpuPrfF *os.File
// services
gvS servmanager.Service
gvS *GlobalVarS
dmS *DataDBService
sdbS *StorDBService
anzS *AnalyzerService

View File

@@ -77,16 +77,17 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
utils.CacheChargerFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, chrS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = chrS.dm.WaitForDM(ctx); err != nil {
return
}
if err = chrS.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
chrS.Lock()
defer chrS.Unlock()

View File

@@ -67,11 +67,15 @@ type CoreService struct {
}
// Start should handle the service start
func (cS *CoreService) Start(_ *context.Context, shtDw context.CancelFunc) error {
func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) error {
if cS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
if err := cS.anz.WaitForAnalyzerS(ctx); err != nil {
return err
}
cS.mu.Lock()
defer cS.mu.Unlock()
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS))

View File

@@ -80,16 +80,17 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
utils.CacheDispatcherFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, dspS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = dspS.dm.WaitForDM(ctx); err != nil {
return
}
if err = dspS.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
dspS.Lock()
defer dspS.Unlock()

View File

@@ -109,6 +109,9 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
if err != nil {
return err
}
if err := es.anz.WaitForAnalyzerS(ctx); err != nil {
return err
}
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EEs))

View File

@@ -74,10 +74,14 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
if erS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil {
return
}
if err = erS.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
erS.Lock()
defer erS.Unlock()

View File

@@ -25,13 +25,12 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewGlobalVarS .
func NewGlobalVarS(cfg *config.CGRConfig,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup) *GlobalVarS {
return &GlobalVarS{
cfg: cfg,
srvDep: srvDep,

View File

@@ -71,6 +71,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
if ldrs.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil {
return
@@ -79,6 +80,9 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
if datadb, err = ldrs.dm.WaitForDM(ctx); err != nil {
return
}
if err = ldrs.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
ldrs.Lock()
defer ldrs.Unlock()

View File

@@ -70,6 +70,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
if ran.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
ran.srvDep[utils.DataDB].Add(1)
if err = ran.cacheS.WaitToPrecache(ctx,
utils.CacheRankingProfiles,
@@ -81,11 +82,14 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
if datadb, err = ran.dm.WaitForDM(ctx); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ran.filterSChan); err != nil {
return
}
if err = ran.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
ran.Lock()
defer ran.Unlock()
ran.ran = engine.NewRankingS(datadb, ran.connMgr, filterS, ran.cfg)

View File

@@ -116,16 +116,17 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
utils.CacheRateFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, rs.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = rs.dmS.WaitForDM(ctx); err != nil {
return
}
if err = rs.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
rs.Lock()
rs.rateS = rates.NewRateS(rs.cfg, filterS, datadb)

View File

@@ -71,24 +71,25 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
if reS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
reS.srvDep[utils.DataDB].Add(1)
reS.srvDep[utils.DataDB].Add(1)
if err = reS.cacheS.WaitToPrecache(ctx,
utils.CacheResourceProfiles,
utils.CacheResources,
utils.CacheResourceFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, reS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = reS.dm.WaitForDM(ctx); err != nil {
return
}
if err = reS.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
reS.Lock()
defer reS.Unlock()

View File

@@ -78,16 +78,18 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
utils.CacheRouteFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, routeS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = routeS.dm.WaitForDM(ctx); err != nil {
return
}
if err = routeS.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
routeS.Lock()
defer routeS.Unlock()
routeS.routeS = engine.NewRouteService(datadb, filterS, routeS.cfg, routeS.connMgr)

View File

@@ -80,11 +80,14 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = smg.dm.WaitForDM(ctx); err != nil {
return
}
if err = smg.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
smg.Lock()
defer smg.Unlock()

View File

@@ -71,6 +71,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
if sts.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
sts.srvDep[utils.DataDB].Add(1)
if err = sts.cacheS.WaitToPrecache(ctx,
utils.CacheStatQueueProfiles,
@@ -78,16 +79,17 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
utils.CacheStatFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, sts.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = sts.dm.WaitForDM(ctx); err != nil {
return
}
if err = sts.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
sts.Lock()
defer sts.Unlock()

View File

@@ -71,6 +71,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
if thrs.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
thrs.srvDep[utils.DataDB].Add(1)
if err = thrs.cacheS.WaitToPrecache(ctx,
utils.CacheThresholdProfiles,
@@ -78,16 +79,17 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
utils.CacheThresholdFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, thrs.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = thrs.dm.WaitForDM(ctx); err != nil {
return
}
if err = thrs.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
thrs.Lock()
defer thrs.Unlock()

View File

@@ -68,6 +68,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
if trs.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
trs.srvDep[utils.DataDB].Add(1)
if err = trs.cacheS.WaitToPrecache(ctx,
utils.CacheTrendProfiles,
@@ -79,11 +80,14 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
if datadb, err = trs.dm.WaitForDM(ctx); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, trs.filterSChan); err != nil {
return
}
if err = trs.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
trs.Lock()
defer trs.Unlock()
trs.trs = engine.NewTrendService(datadb, trs.cfg, filterS, trs.connMgr)