From 59f8b1379e179bce0cdee350269c372e7f940bed Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 11 Nov 2024 14:24:29 +0200 Subject: [PATCH] Use channel instead of wait func (CLS) --- services/accounts.go | 12 +++---- services/actions.go | 11 +++---- services/adminsv1.go | 11 +++---- services/analyzers.go | 11 +++---- services/attributes.go | 11 +++---- services/caches.go | 18 +++++------ services/cdrs.go | 11 +++---- services/cgr-engine.go | 63 +++++++++++++++++++------------------ services/chargers.go | 11 +++---- services/commonlisteners.go | 37 +++++++--------------- services/cores.go | 15 ++++----- services/dispatchers.go | 11 +++---- services/ees.go | 12 +++---- services/efs.go | 11 +++---- services/ers.go | 11 +++---- services/httpagent.go | 12 +++---- services/janus.go | 14 ++++----- services/libcgr-engine.go | 27 +++++++++------- services/loaders.go | 11 +++---- services/rankings.go | 11 +++---- services/rates.go | 11 +++---- services/resources.go | 11 +++---- services/routes.go | 11 +++---- services/sessions.go | 11 +++---- services/stats.go | 11 +++---- services/thresholds.go | 11 +++---- services/tpes.go | 13 ++++---- services/trends.go | 11 +++---- 28 files changed, 191 insertions(+), 230 deletions(-) diff --git a/services/accounts.go b/services/accounts.go index eae646159..b14454635 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -37,7 +37,7 @@ import ( // NewAccountService returns the Account Service func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, cls *CommonListenerService, + connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &AccountService{ @@ -47,7 +47,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, cacheS: cacheS, filterSChan: filterSChan, connMgr: connMgr, - cls: cls, + clSChan: clSChan, anz: anz, srvDep: srvDep, rldChan: make(chan struct{}, 1), @@ -58,7 +58,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, type AccountService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService cacheS *CacheService anz *AnalyzerService @@ -81,10 +81,8 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e return utils.ErrServiceAlreadyRunning } - acts.cl, err = acts.cls.WaitForCLS(ctx) - if err != nil { - return err - } + acts.cl = <-acts.clSChan + acts.clSChan <- acts.cl if err = acts.cacheS.WaitToPrecache(ctx, utils.CacheAccounts, utils.CacheAccountsFilterIndexes); err != nil { diff --git a/services/actions.go b/services/actions.go index 691f23dbc..20530112c 100644 --- a/services/actions.go +++ b/services/actions.go @@ -38,7 +38,7 @@ import ( func NewActionService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - cls *CommonListenerService, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &ActionService{ connChan: internalChan, @@ -47,7 +47,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, anz: anz, srvDep: srvDep, rldChan: make(chan struct{}, 1), @@ -58,7 +58,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, type ActionService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService cacheS *CacheService @@ -82,9 +82,8 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er return utils.ErrServiceAlreadyRunning } - if acts.cl, err = acts.cls.WaitForCLS(ctx); err != nil { - return err - } + acts.cl = <-acts.clSChan + acts.clSChan <- acts.cl if err = acts.cacheS.WaitToPrecache(ctx, utils.CacheActionProfiles, utils.CacheActionProfilesFilterIndexes); err != nil { diff --git a/services/adminsv1.go b/services/adminsv1.go index dc2e95f6c..4f762dcd7 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -34,7 +34,7 @@ import ( // NewAPIerSv1Service returns the APIerSv1 Service func NewAdminSv1Service(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, - filterSChan chan *engine.FilterS, cls *CommonListenerService, + filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalAPIerSv1Chan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { @@ -44,7 +44,7 @@ func NewAdminSv1Service(cfg *config.CGRConfig, dm: dm, storDB: storDB, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -55,7 +55,7 @@ func NewAdminSv1Service(cfg *config.CGRConfig, type AdminSv1Service struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService storDB *StorDBService anz *AnalyzerService @@ -78,9 +78,8 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF return utils.ErrServiceAlreadyRunning } - if apiService.cl, err = apiService.cls.WaitForCLS(ctx); err != nil { - return err - } + apiService.cl = <-apiService.clSChan + apiService.clSChan <- apiService.cl var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil { return diff --git a/services/analyzers.go b/services/analyzers.go index 27250397f..8bd61a49d 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -32,14 +32,14 @@ import ( ) // NewAnalyzerService returns the Analyzer Service -func NewAnalyzerService(cfg *config.CGRConfig, clSrv *CommonListenerService, +func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, filterSChan chan *engine.FilterS, internalAnalyzerSChan chan birpc.ClientConnector, srvDep map[string]*sync.WaitGroup) *AnalyzerService { return &AnalyzerService{ connChan: internalAnalyzerSChan, cfg: cfg, - cls: clSrv, + clSChan: clSChan, filterSChan: filterSChan, srvDep: srvDep, } @@ -49,7 +49,7 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSrv *CommonListenerService, type AnalyzerService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS filterSChan chan *engine.FilterS anz *analyzers.AnalyzerS @@ -68,9 +68,8 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun return utils.ErrServiceAlreadyRunning } - if anz.cl, err = anz.cls.WaitForCLS(ctx); err != nil { - return - } + anz.cl = <-anz.clSChan + anz.clSChan <- anz.cl anz.Lock() defer anz.Unlock() diff --git a/services/attributes.go b/services/attributes.go index 898048c9a..6e61dd8bf 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -35,7 +35,7 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - cls *CommonListenerService, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, anz *AnalyzerService, dspS *DispatcherService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &AttributeService{ @@ -44,7 +44,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, anz: anz, srvDep: srvDep, dspS: dspS, @@ -55,7 +55,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, type AttributeService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService cacheS *CacheService @@ -77,9 +77,8 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) return utils.ErrServiceAlreadyRunning } - if attrS.cl, err = attrS.cls.WaitForCLS(ctx); err != nil { - return err - } + attrS.cl = <-attrS.clSChan + attrS.clSChan <- attrS.cl if err = attrS.cacheS.WaitToPrecache(ctx, utils.CacheAttributeProfiles, utils.CacheAttributeFilterIndexes); err != nil { diff --git a/services/caches.go b/services/caches.go index 0fc1a7a8e..74f4032f7 100644 --- a/services/caches.go +++ b/services/caches.go @@ -32,7 +32,7 @@ import ( // NewCacheService . func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager, - cls *CommonListenerService, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, anz *AnalyzerService, // dspS *DispatcherService, cores *CoreService, srvDep map[string]*sync.WaitGroup) *CacheService { @@ -41,7 +41,7 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C srvDep: srvDep, anz: anz, cores: cores, - cls: cls, + clSChan: clSChan, dm: dm, connMgr: connMgr, rpc: internalChan, @@ -51,10 +51,10 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C // CacheService implements Agent interface type CacheService struct { - anz *AnalyzerService - cores *CoreService - cls *CommonListenerService - dm *DataDBService + anz *AnalyzerService + cores *CoreService + clSChan chan *commonlisteners.CommonListenerS + dm *DataDBService cl *commonlisteners.CommonListenerS @@ -67,10 +67,8 @@ type CacheService struct { // Start should handle the sercive start func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (err error) { - - if cS.cl, err = cS.cls.WaitForCLS(ctx); err != nil { - return err - } + cS.cl = <-cS.clSChan + cS.clSChan <- cS.cl var dm *engine.DataManager if dm, err = cS.dm.WaitForDM(ctx); err != nil { return diff --git a/services/cdrs.go b/services/cdrs.go index 4fe7cd2e7..ed3b66c66 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -36,7 +36,7 @@ import ( // NewCDRServer returns the CDR Server func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, filterSChan chan *engine.FilterS, - cls *CommonListenerService, internalCDRServerChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalCDRServerChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &CDRService{ @@ -45,7 +45,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, dm: dm, storDB: storDB, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -56,7 +56,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, type CDRService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService storDB *StorDBService anz *AnalyzerService @@ -80,9 +80,8 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs)) - if cs.cl, err = cs.cls.WaitForCLS(ctx); err != nil { - return err - } + cs.cl = <-cs.clSChan + cs.clSChan <- cs.cl var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil { return diff --git a/services/cgr-engine.go b/services/cgr-engine.go index ca0266d27..20bfa4cf7 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -29,6 +29,7 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/efs" @@ -84,6 +85,7 @@ func NewCGREngine(cfg *config.CGRConfig) *CGREngine { utils.ThresholdS: new(sync.WaitGroup), utils.TPeS: new(sync.WaitGroup), }, + clsCh: make(chan *commonlisteners.CommonListenerS, 1), iFilterSCh: make(chan *engine.FilterS, 1), } } @@ -111,6 +113,7 @@ type CGREngine struct { efs *ExportFailoverService // chans (need to move this as services) + clsCh chan *commonlisteners.CommonListenerS iFilterSCh chan *engine.FilterS iGuardianSCh chan birpc.ClientConnector iConfigCh chan birpc.ClientConnector @@ -191,70 +194,70 @@ func (cgr *CGREngine) InitServices(setVersions bool) { cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep) cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep) cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep) - cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.srvDep) - cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.cls, + cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.clsCh, cgr.srvDep) + cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.clsCh, cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep) - cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.cls, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep) + cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.clsCh, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep) cgr.cacheS = NewCacheService(cgr.cfg, cgr.dmS, cgr.cM, - cgr.cls, iCacheSCh, cgr.anzS, cgr.coreS, + cgr.clsCh, iCacheSCh, cgr.anzS, cgr.coreS, cgr.srvDep) dspS := NewDispatcherService(cgr.cfg, cgr.dmS, cgr.cacheS, - cgr.iFilterSCh, cgr.cls, cgr.iDispatcherSCh, cgr.cM, + cgr.iFilterSCh, cgr.clsCh, cgr.iDispatcherSCh, cgr.cM, cgr.anzS, cgr.srvDep) - cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.cls, + cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, iLoaderSCh, cgr.cM, cgr.anzS, cgr.srvDep) - cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.cls, cgr.srvDep) + cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.clsCh, cgr.srvDep) cgr.srvManager.AddServices(cgr.gvS, cgr.cls, cgr.coreS, cgr.cacheS, cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs, - NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls, + NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh, iAdminSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.cls, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iAttributeSCh, + NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep), + NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iAttributeSCh, cgr.anzS, dspS, cgr.srvDep), - NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, + NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iChargerSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, + NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iRouteSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, + NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iResourceSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, + NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iTrendSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, + NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iRankingSCh, cgr.cM, cgr.anzS, cgr.srvDep), NewThresholdService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, - cgr.cM, cgr.cls, iThresholdSCh, cgr.anzS, cgr.srvDep), - NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, + cgr.cM, cgr.clsCh, iThresholdSCh, cgr.anzS, cgr.srvDep), + NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iStatSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.cls, iERsCh, cgr.anzS, cgr.srvDep), + NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iERsCh, cgr.anzS, cgr.srvDep), NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep), NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep), - NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cls, cgr.cM, cgr.srvDep), + NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep), NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep), // partial reload - NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cls, cgr.cM, cgr.srvDep), // no reload + NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep), // no reload NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), NewEventExporterService(cgr.cfg, cgr.iFilterSCh, - cgr.cM, cgr.cls, iEEsCh, cgr.anzS, cgr.srvDep), - NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls, iCDRServerCh, + cgr.cM, cgr.clsCh, iEEsCh, cgr.anzS, cgr.srvDep), + NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh, iCDRServerCh, cgr.cM, cgr.anzS, cgr.srvDep), NewRegistrarCService(cgr.cfg, cgr.cM, cgr.anzS, cgr.srvDep), NewRateService(cgr.cfg, cgr.cacheS, cgr.iFilterSCh, cgr.dmS, - cgr.cls, iRateSCh, cgr.anzS, cgr.srvDep), - NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.cls, iActionSCh, cgr.anzS, cgr.srvDep), - NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.cls, iAccountSCh, cgr.anzS, cgr.srvDep), - NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.cls, cgr.srvDep), + cgr.clsCh, iRateSCh, cgr.anzS, cgr.srvDep), + NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iActionSCh, cgr.anzS, cgr.srvDep), + NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iAccountSCh, cgr.anzS, cgr.srvDep), + NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.clsCh, cgr.srvDep), ) } @@ -321,9 +324,9 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu go cgrStartFilterService(ctx, cgr.iFilterSCh, cgr.cacheS.GetCacheSChan(), cgr.cM, cgr.cfg, cgr.dmS) - cgrInitServiceManagerV1(ctx, cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.cls, cgr.anzS) - cgrInitGuardianSv1(ctx, cgr.iGuardianSCh, cgr.cfg, cgr.cls, cgr.anzS) - cgrInitConfigSv1(ctx, cgr.iConfigCh, cgr.cfg, cgr.cls, cgr.anzS) + cgrInitServiceManagerV1(cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.clsCh, cgr.anzS) + cgrInitGuardianSv1(cgr.iGuardianSCh, cgr.cfg, cgr.clsCh, cgr.anzS) + cgrInitConfigSv1(cgr.iConfigCh, cgr.cfg, cgr.clsCh, cgr.anzS) if preload != utils.EmptyString { if err = cgrRunPreload(ctx, cgr.cfg, preload, cgr.ldrs); err != nil { @@ -332,7 +335,7 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu } // Serve rpc connections - cgrStartRPC(ctx, shtDw, cgr.cfg, cgr.cls, cgr.iDispatcherSCh) + cgrStartRPC(ctx, shtDw, cgr.cfg, cgr.clsCh, cgr.iDispatcherSCh) // TODO: find a better location for this if block if memProfParams.DirPath != "" { diff --git a/services/chargers.go b/services/chargers.go index 3f363547c..28d343189 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -34,7 +34,7 @@ import ( // NewChargerService returns the Charger Service func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, - cacheS *CacheService, filterSChan chan *engine.FilterS, cls *CommonListenerService, + cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalChargerSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &ChargerService{ @@ -43,7 +43,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -54,7 +54,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, type ChargerService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService cacheS *CacheService anz *AnalyzerService @@ -75,9 +75,8 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e return utils.ErrServiceAlreadyRunning } - if chrS.cl, err = chrS.cls.WaitForCLS(ctx); err != nil { - return err - } + chrS.cl = <-chrS.clSChan + chrS.clSChan <- chrS.cl if err = chrS.cacheS.WaitToPrecache(ctx, utils.CacheChargerProfiles, utils.CacheChargerFilterIndexes); err != nil { diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 21349ddf2..4f885c15b 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -30,12 +30,12 @@ import ( ) // NewCommonListenerService instantiates a new CommonListenerService. -func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) *CommonListenerService { +func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *CommonListenerService { return &CommonListenerService{ - cfg: cfg, - caps: caps, - clsCh: make(chan *commonlisteners.CommonListenerS, 1), - srvDep: srvDep, + cfg: cfg, + caps: caps, + clSChan: clSChan, + srvDep: srvDep, } } @@ -45,10 +45,10 @@ type CommonListenerService struct { cls *commonlisteners.CommonListenerS - clsCh chan *commonlisteners.CommonListenerS - caps *engine.Caps - cfg *config.CGRConfig - srvDep map[string]*sync.WaitGroup + clSChan chan *commonlisteners.CommonListenerS + caps *engine.Caps + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // Start handles the service start. @@ -59,7 +59,7 @@ func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) err cl.mu.Lock() defer cl.mu.Unlock() cl.cls = commonlisteners.NewCommonListenerS(cl.caps) - cl.clsCh <- cl.cls + cl.clSChan <- cl.cls if len(cl.cfg.HTTPCfg().RegistrarSURL) != 0 { cl.cls.RegisterHTTPFunc(cl.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) } @@ -79,7 +79,7 @@ func (cl *CommonListenerService) Shutdown() error { cl.mu.Lock() defer cl.mu.Unlock() cl.cls = nil - <-cl.clsCh + <-cl.clSChan return nil } @@ -99,18 +99,3 @@ func (cl *CommonListenerService) ServiceName() string { func (cl *CommonListenerService) ShouldRun() bool { return true } - -// WaitForCLS waits for the CommonListenerS structure to be initialized. -func (cl *CommonListenerService) WaitForCLS(ctx *context.Context) (*commonlisteners.CommonListenerS, error) { - cl.mu.RLock() - clsCh := cl.clsCh - cl.mu.RUnlock() - var cls *commonlisteners.CommonListenerS - select { - case <-ctx.Done(): - return nil, ctx.Err() - case cls = <-clsCh: - clsCh <- cls - } - return cls, nil -} diff --git a/services/cores.go b/services/cores.go index b0d263ec0..d2499d2fa 100644 --- a/services/cores.go +++ b/services/cores.go @@ -33,7 +33,7 @@ import ( ) // NewCoreService returns the Core Service -func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, cls *CommonListenerService, +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS, internalCoreSChan chan birpc.ClientConnector, anz *AnalyzerService, fileCPU *os.File, shdWg *sync.WaitGroup, srvDep map[string]*sync.WaitGroup) *CoreService { @@ -43,7 +43,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, cls *CommonListene cfg: cfg, caps: caps, fileCPU: fileCPU, - cls: cls, + clSChan: clSChan, anz: anz, srvDep: srvDep, csCh: make(chan *cores.CoreS, 1), @@ -54,8 +54,8 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, cls *CommonListene type CoreService struct { mu sync.RWMutex - anz *AnalyzerService - cls *CommonListenerService + anz *AnalyzerService + clSChan chan *commonlisteners.CommonListenerS cS *cores.CoreS cl *commonlisteners.CommonListenerS @@ -76,11 +76,8 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err return utils.ErrServiceAlreadyRunning } - var err error - cS.cl, err = cS.cls.WaitForCLS(ctx) - if err != nil { - return err - } + cS.cl = <-cS.clSChan + cS.clSChan <- cS.cl if err := cS.anz.WaitForAnalyzerS(ctx); err != nil { return err } diff --git a/services/dispatchers.go b/services/dispatchers.go index 0bf5aff26..ee397a793 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -33,7 +33,7 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - cls *CommonListenerService, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) *DispatcherService { return &DispatcherService{ @@ -42,7 +42,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -54,7 +54,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, type DispatcherService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService cacheS *CacheService @@ -76,9 +76,8 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) return utils.ErrServiceAlreadyRunning } utils.Logger.Info("Starting CGRateS DispatcherS service.") - if dspS.cl, err = dspS.cls.WaitForCLS(ctx); err != nil { - return err - } + dspS.cl = <-dspS.clSChan + dspS.clSChan <- dspS.cl if err = dspS.cacheS.WaitToPrecache(ctx, utils.CacheDispatcherProfiles, utils.CacheDispatcherHosts, diff --git a/services/ees.go b/services/ees.go index 6656d3cdb..0d8d71f0a 100644 --- a/services/ees.go +++ b/services/ees.go @@ -34,13 +34,13 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, cls *CommonListenerService, intConnChan chan birpc.ClientConnector, + connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, intConnChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &EventExporterService{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, - cls: cls, + clSChan: clSChan, intConnChan: intConnChan, anz: anz, srvDep: srvDep, @@ -51,7 +51,7 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil type EventExporterService struct { mu sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS anz *AnalyzerService filterSChan chan *engine.FilterS @@ -107,10 +107,8 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc return utils.ErrServiceAlreadyRunning } - var err error - if es.cl, err = es.cls.WaitForCLS(ctx); err != nil { - return err - } + es.cl = <-es.clSChan + es.clSChan <- es.cl fltrS, err := waitForFilterS(ctx, es.filterSChan) if err != nil { return err diff --git a/services/efs.go b/services/efs.go index 23cc77a94..09a03268a 100644 --- a/services/efs.go +++ b/services/efs.go @@ -36,7 +36,7 @@ import ( type ExportFailoverService struct { sync.Mutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS efS *efs.EfS cl *commonlisteners.CommonListenerS @@ -52,10 +52,10 @@ type ExportFailoverService struct { // NewExportFailoverService is the constructor for the TpeService func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager, intConnChan chan birpc.ClientConnector, - cls *CommonListenerService, srvDep map[string]*sync.WaitGroup) *ExportFailoverService { + clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *ExportFailoverService { return &ExportFailoverService{ cfg: cfg, - cls: cls, + clSChan: clSChan, connMgr: connMgr, intConnChan: intConnChan, srvDep: srvDep, @@ -67,9 +67,8 @@ func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.Cance if efServ.IsRunning() { return utils.ErrServiceAlreadyRunning } - if efServ.cl, err = efServ.cls.WaitForCLS(ctx); err != nil { - return err - } + efServ.cl = <-efServ.clSChan + efServ.clSChan <- efServ.cl efServ.Lock() efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EFs)) diff --git a/services/ers.go b/services/ers.go index 524b67faf..5136bfd83 100644 --- a/services/ers.go +++ b/services/ers.go @@ -37,7 +37,7 @@ func NewEventReaderService( cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - cls *CommonListenerService, + clSChan chan *commonlisteners.CommonListenerS, intConn chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { @@ -46,7 +46,7 @@ func NewEventReaderService( cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, - cls: cls, + clSChan: clSChan, intConn: intConn, anz: anz, srvDep: srvDep, @@ -57,7 +57,7 @@ func NewEventReaderService( type EventReaderService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS anz *AnalyzerService filterSChan chan *engine.FilterS @@ -78,9 +78,8 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel return utils.ErrServiceAlreadyRunning } - if erS.cl, err = erS.cls.WaitForCLS(ctx); err != nil { - return err - } + erS.cl = <-erS.clSChan + erS.clSChan <- erS.cl var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil { return diff --git a/services/httpagent.go b/services/httpagent.go index 56c59c6bb..d4e184941 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -33,12 +33,12 @@ import ( // NewHTTPAgent returns the HTTP Agent func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - cls *CommonListenerService, connMgr *engine.ConnManager, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &HTTPAgent{ cfg: cfg, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, srvDep: srvDep, } @@ -48,7 +48,7 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, type HTTPAgent struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS filterSChan chan *engine.FilterS cl *commonlisteners.CommonListenerS @@ -68,10 +68,8 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro return utils.ErrServiceAlreadyRunning } - cl, err := ha.cls.WaitForCLS(ctx) - if err != nil { - return err - } + cl := <-ha.clSChan + ha.clSChan <- cl var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ha.filterSChan); err != nil { return diff --git a/services/janus.go b/services/janus.go index 7f83bc27b..5222e3b39 100644 --- a/services/janus.go +++ b/services/janus.go @@ -25,6 +25,7 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -33,12 +34,12 @@ import ( // NewJanusAgent returns the Janus Agent func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - cls *CommonListenerService, connMgr *engine.ConnManager, + clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &JanusAgent{ cfg: cfg, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, srvDep: srvDep, } @@ -48,7 +49,7 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, type JanusAgent struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS filterSChan chan *engine.FilterS jA *agents.JanusAgent @@ -64,11 +65,8 @@ type JanusAgent struct { // Start should jandle the sercive start func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err error) { - - cl, err := ja.cls.WaitForCLS(ctx) - if err != nil { - return err - } + cl := <-ja.clSChan + ja.clSChan <- cl var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ja.filterSChan); err != nil { return diff --git a/services/libcgr-engine.go b/services/libcgr-engine.go index df9e48a71..4c7fdaf4c 100644 --- a/services/libcgr-engine.go +++ b/services/libcgr-engine.go @@ -32,6 +32,7 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/apis" + "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/guardian" @@ -183,9 +184,10 @@ func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS } } -func cgrInitGuardianSv1(ctx *context.Context, iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig, - cls *CommonListenerService, anz *AnalyzerService) { - cl, _ := cls.WaitForCLS(ctx) +func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig, + clSChan chan *commonlisteners.CommonListenerS, anz *AnalyzerService) { + cl := <-clSChan + clSChan <- cl srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) if !cfg.DispatcherSCfg().Enabled { for _, s := range srv { @@ -195,10 +197,11 @@ func cgrInitGuardianSv1(ctx *context.Context, iGuardianSCh chan birpc.ClientConn iGuardianSCh <- anz.GetInternalCodec(srv, utils.GuardianS) } -func cgrInitServiceManagerV1(ctx *context.Context, iServMngrCh chan birpc.ClientConnector, +func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig, - cls *CommonListenerService, anz *AnalyzerService) { - cl, _ := cls.WaitForCLS(ctx) + clSChan chan *commonlisteners.CommonListenerS, anz *AnalyzerService) { + cl := <-clSChan + clSChan <- cl srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false) if !cfg.DispatcherSCfg().Enabled { cl.RpcRegister(srv) @@ -206,9 +209,10 @@ func cgrInitServiceManagerV1(ctx *context.Context, iServMngrCh chan birpc.Client iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager) } -func cgrInitConfigSv1(ctx *context.Context, iConfigCh chan birpc.ClientConnector, - cfg *config.CGRConfig, cls *CommonListenerService, anz *AnalyzerService) { - cl, _ := cls.WaitForCLS(ctx) +func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, + cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, anz *AnalyzerService) { + cl := <-clSChan + clSChan <- cl srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true) // srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false) if !cfg.DispatcherSCfg().Enabled { @@ -220,8 +224,9 @@ func cgrInitConfigSv1(ctx *context.Context, iConfigCh chan birpc.ClientConnector } func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, - cfg *config.CGRConfig, cls *CommonListenerService, internalDispatcherSChan chan birpc.ClientConnector) { - cl, _ := cls.WaitForCLS(ctx) + cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, internalDispatcherSChan chan birpc.ClientConnector) { + cl := <-clSChan + clSChan <- cl if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this select { case dispatcherS := <-internalDispatcherSChan: diff --git a/services/loaders.go b/services/loaders.go index f5d9e2e61..5ee7a907c 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -33,7 +33,7 @@ import ( // NewLoaderService returns the Loader Service func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, - filterSChan chan *engine.FilterS, cls *CommonListenerService, + filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalLoaderSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) *LoaderService { @@ -42,7 +42,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, cfg: cfg, dm: dm, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, stopChan: make(chan struct{}), anz: anz, @@ -54,7 +54,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, type LoaderService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService filterSChan chan *engine.FilterS @@ -75,9 +75,8 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er return utils.ErrServiceAlreadyRunning } - if ldrs.cl, err = ldrs.cls.WaitForCLS(ctx); err != nil { - return err - } + ldrs.cl = <-ldrs.clSChan + ldrs.clSChan <- ldrs.cl var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil { return diff --git a/services/rankings.go b/services/rankings.go index e05697f5b..d0d5926fb 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -35,7 +35,7 @@ import ( // NewRankingService returns the RankingS Service func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - cls *CommonListenerService, internalRankingSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalRankingSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &RankingService{ @@ -44,7 +44,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -54,7 +54,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, type RankingService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService cacheS *CacheService @@ -76,9 +76,8 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er } ran.srvDep[utils.DataDB].Add(1) - if ran.cl, err = ran.cls.WaitForCLS(ctx); err != nil { - return err - } + ran.cl = <-ran.clSChan + ran.clSChan <- ran.cl if err = ran.cacheS.WaitToPrecache(ctx, utils.CacheRankingProfiles, utils.CacheRankings, diff --git a/services/rates.go b/services/rates.go index 8e1533cab..c603bf00c 100644 --- a/services/rates.go +++ b/services/rates.go @@ -34,7 +34,7 @@ import ( // NewRateService constructs RateService func NewRateService(cfg *config.CGRConfig, cacheS *CacheService, filterSChan chan *engine.FilterS, - dmS *DataDBService, cls *CommonListenerService, + dmS *DataDBService, clSChan chan *commonlisteners.CommonListenerS, intConnChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &RateService{ @@ -42,7 +42,7 @@ func NewRateService(cfg *config.CGRConfig, cacheS: cacheS, filterSChan: filterSChan, dmS: dmS, - cls: cls, + clSChan: clSChan, intConnChan: intConnChan, rldChan: make(chan struct{}), anz: anz, @@ -54,7 +54,7 @@ func NewRateService(cfg *config.CGRConfig, type RateService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS anz *AnalyzerService dmS *DataDBService cacheS *CacheService @@ -111,9 +111,8 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er return utils.ErrServiceAlreadyRunning } - if rs.cl, err = rs.cls.WaitForCLS(ctx); err != nil { - return err - } + rs.cl = <-rs.clSChan + rs.clSChan <- rs.cl if err = rs.cacheS.WaitToPrecache(ctx, utils.CacheRateProfiles, utils.CacheRateProfilesFilterIndexes, diff --git a/services/resources.go b/services/resources.go index 4e08b4615..7f9363f3d 100644 --- a/services/resources.go +++ b/services/resources.go @@ -34,7 +34,7 @@ import ( // NewResourceService returns the Resource Service func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - cls *CommonListenerService, internalResourceSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalResourceSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &ResourceService{ @@ -43,7 +43,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -54,7 +54,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, type ResourceService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService cacheS *CacheService @@ -76,9 +76,8 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e } reS.srvDep[utils.DataDB].Add(1) - if reS.cl, err = reS.cls.WaitForCLS(ctx); err != nil { - return err - } + reS.cl = <-reS.clSChan + reS.clSChan <- reS.cl if err = reS.cacheS.WaitToPrecache(ctx, utils.CacheResourceProfiles, utils.CacheResources, diff --git a/services/routes.go b/services/routes.go index f2d6195cb..bba1803fc 100644 --- a/services/routes.go +++ b/services/routes.go @@ -35,7 +35,7 @@ import ( // NewRouteService returns the Route Service func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - cls *CommonListenerService, internalRouteSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalRouteSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &RouteService{ @@ -44,7 +44,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -55,7 +55,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, type RouteService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService cacheS *CacheService @@ -76,9 +76,8 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e return utils.ErrServiceAlreadyRunning } - if routeS.cl, err = routeS.cls.WaitForCLS(ctx); err != nil { - return err - } + routeS.cl = <-routeS.clSChan + routeS.clSChan <- routeS.cl if err = routeS.cacheS.WaitToPrecache(ctx, utils.CacheRouteProfiles, utils.CacheRouteFilterIndexes); err != nil { diff --git a/services/sessions.go b/services/sessions.go index e9acd0d5f..361befbd7 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -36,7 +36,7 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - cls *CommonListenerService, internalChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &SessionService{ @@ -44,7 +44,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha cfg: cfg, dm: dm, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -55,7 +55,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha type SessionService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService filterSChan chan *engine.FilterS @@ -77,9 +77,8 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) return utils.ErrServiceAlreadyRunning } - if smg.cl, err = smg.cls.WaitForCLS(ctx); err != nil { - return err - } + smg.cl = <-smg.clSChan + smg.clSChan <- smg.cl var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil { return diff --git a/services/stats.go b/services/stats.go index ac5eb9520..f764f9b36 100644 --- a/services/stats.go +++ b/services/stats.go @@ -34,7 +34,7 @@ import ( // NewStatService returns the Stat Service func NewStatService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - cls *CommonListenerService, internalStatSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalStatSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &StatService{ @@ -43,7 +43,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -54,7 +54,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, type StatService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService cacheS *CacheService @@ -76,9 +76,8 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e } sts.srvDep[utils.DataDB].Add(1) - if sts.cl, err = sts.cls.WaitForCLS(ctx); err != nil { - return err - } + sts.cl = <-sts.clSChan + sts.clSChan <- sts.cl if err = sts.cacheS.WaitToPrecache(ctx, utils.CacheStatQueueProfiles, utils.CacheStatQueues, diff --git a/services/thresholds.go b/services/thresholds.go index da11f4924..e40399428 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -35,7 +35,7 @@ import ( func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - cls *CommonListenerService, internalThresholdSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalThresholdSChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &ThresholdService{ connChan: internalThresholdSChan, @@ -43,7 +43,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - cls: cls, + clSChan: clSChan, anz: anz, srvDep: srvDep, connMgr: connMgr, @@ -54,7 +54,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, type ThresholdService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService cacheS *CacheService @@ -76,9 +76,8 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) } thrs.srvDep[utils.DataDB].Add(1) - if thrs.cl, err = thrs.cls.WaitForCLS(ctx); err != nil { - return err - } + thrs.cl = <-thrs.clSChan + thrs.clSChan <- thrs.cl if err = thrs.cacheS.WaitToPrecache(ctx, utils.CacheThresholdProfiles, utils.CacheThresholds, diff --git a/services/tpes.go b/services/tpes.go index bf0f21f66..c38e04960 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -34,13 +34,13 @@ import ( // NewTPeService is the constructor for the TpeService func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService, - cls *CommonListenerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { + clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &TPeService{ cfg: cfg, srvDep: srvDep, dm: dm, connMgr: connMgr, - cls: cls, + clSChan: clSChan, } } @@ -48,8 +48,8 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD type TPeService struct { sync.RWMutex - cls *CommonListenerService - dm *DataDBService + clSChan chan *commonlisteners.CommonListenerS + dm *DataDBService tpes *tpes.TPeS cl *commonlisteners.CommonListenerS @@ -63,9 +63,8 @@ type TPeService struct { // Start should handle the service start func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { - if ts.cl, err = ts.cls.WaitForCLS(ctx); err != nil { - return err - } + ts.cl = <-ts.clSChan + ts.clSChan <- ts.cl var datadb *engine.DataManager if datadb, err = ts.dm.WaitForDM(ctx); err != nil { return diff --git a/services/trends.go b/services/trends.go index 45b7381b6..ff3b708f7 100644 --- a/services/trends.go +++ b/services/trends.go @@ -34,7 +34,7 @@ import ( // NewTrendsService returns the TrendS Service func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - cls *CommonListenerService, internalTrendSChan chan birpc.ClientConnector, + clSChan chan *commonlisteners.CommonListenerS, internalTrendSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &TrendService{ @@ -42,7 +42,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, cfg: cfg, dm: dm, cacheS: cacheS, - cls: cls, + clSChan: clSChan, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -52,7 +52,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, type TrendService struct { sync.RWMutex - cls *CommonListenerService + clSChan chan *commonlisteners.CommonListenerS dm *DataDBService anz *AnalyzerService cacheS *CacheService @@ -74,9 +74,8 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err } trs.srvDep[utils.DataDB].Add(1) - if trs.cl, err = trs.cls.WaitForCLS(ctx); err != nil { - return err - } + trs.cl = <-trs.clSChan + trs.clSChan <- trs.cl if err = trs.cacheS.WaitToPrecache(ctx, utils.CacheTrendProfiles, utils.CacheTrends,