Use channel instead of wait func (CLS)

This commit is contained in:
ionutboangiu
2024-11-11 14:24:29 +02:00
committed by Dan Christian Bogos
parent a1ed51b301
commit 59f8b1379e
28 changed files with 191 additions and 230 deletions

View File

@@ -37,7 +37,7 @@ import (
// NewAccountService returns the Account Service
func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, cls *CommonListenerService,
connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS,
internalChan chan birpc.ClientConnector,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &AccountService{
@@ -47,7 +47,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
cacheS: cacheS,
filterSChan: filterSChan,
connMgr: connMgr,
cls: cls,
clSChan: clSChan,
anz: anz,
srvDep: srvDep,
rldChan: make(chan struct{}, 1),
@@ -58,7 +58,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
type AccountService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
cacheS *CacheService
anz *AnalyzerService
@@ -81,10 +81,8 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
return utils.ErrServiceAlreadyRunning
}
acts.cl, err = acts.cls.WaitForCLS(ctx)
if err != nil {
return err
}
acts.cl = <-acts.clSChan
acts.clSChan <- acts.cl
if err = acts.cacheS.WaitToPrecache(ctx,
utils.CacheAccounts,
utils.CacheAccountsFilterIndexes); err != nil {

View File

@@ -38,7 +38,7 @@ import (
func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &ActionService{
connChan: internalChan,
@@ -47,7 +47,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
anz: anz,
srvDep: srvDep,
rldChan: make(chan struct{}, 1),
@@ -58,7 +58,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
type ActionService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
@@ -82,9 +82,8 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
return utils.ErrServiceAlreadyRunning
}
if acts.cl, err = acts.cls.WaitForCLS(ctx); err != nil {
return err
}
acts.cl = <-acts.clSChan
acts.clSChan <- acts.cl
if err = acts.cacheS.WaitToPrecache(ctx,
utils.CacheActionProfiles,
utils.CacheActionProfilesFilterIndexes); err != nil {

View File

@@ -34,7 +34,7 @@ import (
// NewAPIerSv1Service returns the APIerSv1 Service
func NewAdminSv1Service(cfg *config.CGRConfig,
dm *DataDBService, storDB *StorDBService,
filterSChan chan *engine.FilterS, cls *CommonListenerService,
filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
internalAPIerSv1Chan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
@@ -44,7 +44,7 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
dm: dm,
storDB: storDB,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -55,7 +55,7 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
type AdminSv1Service struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
storDB *StorDBService
anz *AnalyzerService
@@ -78,9 +78,8 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
return utils.ErrServiceAlreadyRunning
}
if apiService.cl, err = apiService.cls.WaitForCLS(ctx); err != nil {
return err
}
apiService.cl = <-apiService.clSChan
apiService.clSChan <- apiService.cl
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil {
return

View File

@@ -32,14 +32,14 @@ import (
)
// NewAnalyzerService returns the Analyzer Service
func NewAnalyzerService(cfg *config.CGRConfig, clSrv *CommonListenerService,
func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS,
filterSChan chan *engine.FilterS,
internalAnalyzerSChan chan birpc.ClientConnector,
srvDep map[string]*sync.WaitGroup) *AnalyzerService {
return &AnalyzerService{
connChan: internalAnalyzerSChan,
cfg: cfg,
cls: clSrv,
clSChan: clSChan,
filterSChan: filterSChan,
srvDep: srvDep,
}
@@ -49,7 +49,7 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSrv *CommonListenerService,
type AnalyzerService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
filterSChan chan *engine.FilterS
anz *analyzers.AnalyzerS
@@ -68,9 +68,8 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun
return utils.ErrServiceAlreadyRunning
}
if anz.cl, err = anz.cls.WaitForCLS(ctx); err != nil {
return
}
anz.cl = <-anz.clSChan
anz.clSChan <- anz.cl
anz.Lock()
defer anz.Unlock()

View File

@@ -35,7 +35,7 @@ import (
// NewAttributeService returns the Attribute Service
func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
anz *AnalyzerService, dspS *DispatcherService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &AttributeService{
@@ -44,7 +44,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
anz: anz,
srvDep: srvDep,
dspS: dspS,
@@ -55,7 +55,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
type AttributeService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
@@ -77,9 +77,8 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
if attrS.cl, err = attrS.cls.WaitForCLS(ctx); err != nil {
return err
}
attrS.cl = <-attrS.clSChan
attrS.clSChan <- attrS.cl
if err = attrS.cacheS.WaitToPrecache(ctx,
utils.CacheAttributeProfiles,
utils.CacheAttributeFilterIndexes); err != nil {

View File

@@ -32,7 +32,7 @@ import (
// NewCacheService .
func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
anz *AnalyzerService, // dspS *DispatcherService,
cores *CoreService,
srvDep map[string]*sync.WaitGroup) *CacheService {
@@ -41,7 +41,7 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C
srvDep: srvDep,
anz: anz,
cores: cores,
cls: cls,
clSChan: clSChan,
dm: dm,
connMgr: connMgr,
rpc: internalChan,
@@ -51,10 +51,10 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C
// CacheService implements Agent interface
type CacheService struct {
anz *AnalyzerService
cores *CoreService
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
cores *CoreService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
cl *commonlisteners.CommonListenerS
@@ -67,10 +67,8 @@ type CacheService struct {
// Start should handle the sercive start
func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (err error) {
if cS.cl, err = cS.cls.WaitForCLS(ctx); err != nil {
return err
}
cS.cl = <-cS.clSChan
cS.clSChan <- cS.cl
var dm *engine.DataManager
if dm, err = cS.dm.WaitForDM(ctx); err != nil {
return

View File

@@ -36,7 +36,7 @@ import (
// NewCDRServer returns the CDR Server
func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
storDB *StorDBService, filterSChan chan *engine.FilterS,
cls *CommonListenerService, internalCDRServerChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalCDRServerChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &CDRService{
@@ -45,7 +45,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
storDB: storDB,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -56,7 +56,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
type CDRService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
storDB *StorDBService
anz *AnalyzerService
@@ -80,9 +80,8 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs))
if cs.cl, err = cs.cls.WaitForCLS(ctx); err != nil {
return err
}
cs.cl = <-cs.clSChan
cs.clSChan <- cs.cl
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil {
return

View File

@@ -29,6 +29,7 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/efs"
@@ -84,6 +85,7 @@ func NewCGREngine(cfg *config.CGRConfig) *CGREngine {
utils.ThresholdS: new(sync.WaitGroup),
utils.TPeS: new(sync.WaitGroup),
},
clsCh: make(chan *commonlisteners.CommonListenerS, 1),
iFilterSCh: make(chan *engine.FilterS, 1),
}
}
@@ -111,6 +113,7 @@ type CGREngine struct {
efs *ExportFailoverService
// chans (need to move this as services)
clsCh chan *commonlisteners.CommonListenerS
iFilterSCh chan *engine.FilterS
iGuardianSCh chan birpc.ClientConnector
iConfigCh chan birpc.ClientConnector
@@ -191,70 +194,70 @@ func (cgr *CGREngine) InitServices(setVersions bool) {
cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep)
cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep)
cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep)
cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.srvDep)
cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.cls,
cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.clsCh, cgr.srvDep)
cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.clsCh,
cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep)
cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.cls, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep)
cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.clsCh, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep)
cgr.cacheS = NewCacheService(cgr.cfg, cgr.dmS, cgr.cM,
cgr.cls, iCacheSCh, cgr.anzS, cgr.coreS,
cgr.clsCh, iCacheSCh, cgr.anzS, cgr.coreS,
cgr.srvDep)
dspS := NewDispatcherService(cgr.cfg, cgr.dmS, cgr.cacheS,
cgr.iFilterSCh, cgr.cls, cgr.iDispatcherSCh, cgr.cM,
cgr.iFilterSCh, cgr.clsCh, cgr.iDispatcherSCh, cgr.cM,
cgr.anzS, cgr.srvDep)
cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.cls,
cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh,
iLoaderSCh, cgr.cM, cgr.anzS, cgr.srvDep)
cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.cls, cgr.srvDep)
cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.clsCh, cgr.srvDep)
cgr.srvManager.AddServices(cgr.gvS, cgr.cls, cgr.coreS, cgr.cacheS,
cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs,
NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls,
NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh,
iAdminSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.cls, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, iAttributeSCh,
NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iAttributeSCh,
cgr.anzS, dspS, cgr.srvDep),
NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iChargerSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iRouteSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iResourceSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iTrendSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iRankingSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewThresholdService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh,
cgr.cM, cgr.cls, iThresholdSCh, cgr.anzS, cgr.srvDep),
NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls,
cgr.cM, cgr.clsCh, iThresholdSCh, cgr.anzS, cgr.srvDep),
NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iStatSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.cls, iERsCh, cgr.anzS, cgr.srvDep),
NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iERsCh, cgr.anzS, cgr.srvDep),
NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cls, cgr.cM, cgr.srvDep),
NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep),
NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload
NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload
NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep), // partial reload
NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cls, cgr.cM, cgr.srvDep), // no reload
NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep), // no reload
NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewEventExporterService(cgr.cfg, cgr.iFilterSCh,
cgr.cM, cgr.cls, iEEsCh, cgr.anzS, cgr.srvDep),
NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls, iCDRServerCh,
cgr.cM, cgr.clsCh, iEEsCh, cgr.anzS, cgr.srvDep),
NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh, iCDRServerCh,
cgr.cM, cgr.anzS, cgr.srvDep),
NewRegistrarCService(cgr.cfg, cgr.cM, cgr.anzS, cgr.srvDep),
NewRateService(cgr.cfg, cgr.cacheS, cgr.iFilterSCh, cgr.dmS,
cgr.cls, iRateSCh, cgr.anzS, cgr.srvDep),
NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.cls, iActionSCh, cgr.anzS, cgr.srvDep),
NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.cls, iAccountSCh, cgr.anzS, cgr.srvDep),
NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.cls, cgr.srvDep),
cgr.clsCh, iRateSCh, cgr.anzS, cgr.srvDep),
NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iActionSCh, cgr.anzS, cgr.srvDep),
NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iAccountSCh, cgr.anzS, cgr.srvDep),
NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.clsCh, cgr.srvDep),
)
}
@@ -321,9 +324,9 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu
go cgrStartFilterService(ctx, cgr.iFilterSCh, cgr.cacheS.GetCacheSChan(), cgr.cM,
cgr.cfg, cgr.dmS)
cgrInitServiceManagerV1(ctx, cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.cls, cgr.anzS)
cgrInitGuardianSv1(ctx, cgr.iGuardianSCh, cgr.cfg, cgr.cls, cgr.anzS)
cgrInitConfigSv1(ctx, cgr.iConfigCh, cgr.cfg, cgr.cls, cgr.anzS)
cgrInitServiceManagerV1(cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.clsCh, cgr.anzS)
cgrInitGuardianSv1(cgr.iGuardianSCh, cgr.cfg, cgr.clsCh, cgr.anzS)
cgrInitConfigSv1(cgr.iConfigCh, cgr.cfg, cgr.clsCh, cgr.anzS)
if preload != utils.EmptyString {
if err = cgrRunPreload(ctx, cgr.cfg, preload, cgr.ldrs); err != nil {
@@ -332,7 +335,7 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu
}
// Serve rpc connections
cgrStartRPC(ctx, shtDw, cgr.cfg, cgr.cls, cgr.iDispatcherSCh)
cgrStartRPC(ctx, shtDw, cgr.cfg, cgr.clsCh, cgr.iDispatcherSCh)
// TODO: find a better location for this if block
if memProfParams.DirPath != "" {

View File

@@ -34,7 +34,7 @@ import (
// NewChargerService returns the Charger Service
func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS, cls *CommonListenerService,
cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
internalChargerSChan chan birpc.ClientConnector, connMgr *engine.ConnManager,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &ChargerService{
@@ -43,7 +43,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -54,7 +54,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
type ChargerService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
cacheS *CacheService
anz *AnalyzerService
@@ -75,9 +75,8 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
return utils.ErrServiceAlreadyRunning
}
if chrS.cl, err = chrS.cls.WaitForCLS(ctx); err != nil {
return err
}
chrS.cl = <-chrS.clSChan
chrS.clSChan <- chrS.cl
if err = chrS.cacheS.WaitToPrecache(ctx,
utils.CacheChargerProfiles,
utils.CacheChargerFilterIndexes); err != nil {

View File

@@ -30,12 +30,12 @@ import (
)
// NewCommonListenerService instantiates a new CommonListenerService.
func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) *CommonListenerService {
func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *CommonListenerService {
return &CommonListenerService{
cfg: cfg,
caps: caps,
clsCh: make(chan *commonlisteners.CommonListenerS, 1),
srvDep: srvDep,
cfg: cfg,
caps: caps,
clSChan: clSChan,
srvDep: srvDep,
}
}
@@ -45,10 +45,10 @@ type CommonListenerService struct {
cls *commonlisteners.CommonListenerS
clsCh chan *commonlisteners.CommonListenerS
caps *engine.Caps
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
clSChan chan *commonlisteners.CommonListenerS
caps *engine.Caps
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start handles the service start.
@@ -59,7 +59,7 @@ func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) err
cl.mu.Lock()
defer cl.mu.Unlock()
cl.cls = commonlisteners.NewCommonListenerS(cl.caps)
cl.clsCh <- cl.cls
cl.clSChan <- cl.cls
if len(cl.cfg.HTTPCfg().RegistrarSURL) != 0 {
cl.cls.RegisterHTTPFunc(cl.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar)
}
@@ -79,7 +79,7 @@ func (cl *CommonListenerService) Shutdown() error {
cl.mu.Lock()
defer cl.mu.Unlock()
cl.cls = nil
<-cl.clsCh
<-cl.clSChan
return nil
}
@@ -99,18 +99,3 @@ func (cl *CommonListenerService) ServiceName() string {
func (cl *CommonListenerService) ShouldRun() bool {
return true
}
// WaitForCLS waits for the CommonListenerS structure to be initialized.
func (cl *CommonListenerService) WaitForCLS(ctx *context.Context) (*commonlisteners.CommonListenerS, error) {
cl.mu.RLock()
clsCh := cl.clsCh
cl.mu.RUnlock()
var cls *commonlisteners.CommonListenerS
select {
case <-ctx.Done():
return nil, ctx.Err()
case cls = <-clsCh:
clsCh <- cls
}
return cls, nil
}

View File

@@ -33,7 +33,7 @@ import (
)
// NewCoreService returns the Core Service
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, cls *CommonListenerService,
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS,
internalCoreSChan chan birpc.ClientConnector, anz *AnalyzerService,
fileCPU *os.File, shdWg *sync.WaitGroup,
srvDep map[string]*sync.WaitGroup) *CoreService {
@@ -43,7 +43,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, cls *CommonListene
cfg: cfg,
caps: caps,
fileCPU: fileCPU,
cls: cls,
clSChan: clSChan,
anz: anz,
srvDep: srvDep,
csCh: make(chan *cores.CoreS, 1),
@@ -54,8 +54,8 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, cls *CommonListene
type CoreService struct {
mu sync.RWMutex
anz *AnalyzerService
cls *CommonListenerService
anz *AnalyzerService
clSChan chan *commonlisteners.CommonListenerS
cS *cores.CoreS
cl *commonlisteners.CommonListenerS
@@ -76,11 +76,8 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err
return utils.ErrServiceAlreadyRunning
}
var err error
cS.cl, err = cS.cls.WaitForCLS(ctx)
if err != nil {
return err
}
cS.cl = <-cS.clSChan
cS.clSChan <- cS.cl
if err := cS.anz.WaitForAnalyzerS(ctx); err != nil {
return err
}

View File

@@ -33,7 +33,7 @@ import (
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *DispatcherService {
return &DispatcherService{
@@ -42,7 +42,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -54,7 +54,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
type DispatcherService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
@@ -76,9 +76,8 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
utils.Logger.Info("Starting CGRateS DispatcherS service.")
if dspS.cl, err = dspS.cls.WaitForCLS(ctx); err != nil {
return err
}
dspS.cl = <-dspS.clSChan
dspS.clSChan <- dspS.cl
if err = dspS.cacheS.WaitToPrecache(ctx,
utils.CacheDispatcherProfiles,
utils.CacheDispatcherHosts,

View File

@@ -34,13 +34,13 @@ import (
// NewEventExporterService constructs EventExporterService
func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, cls *CommonListenerService, intConnChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, intConnChan chan birpc.ClientConnector,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &EventExporterService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
cls: cls,
clSChan: clSChan,
intConnChan: intConnChan,
anz: anz,
srvDep: srvDep,
@@ -51,7 +51,7 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil
type EventExporterService struct {
mu sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
anz *AnalyzerService
filterSChan chan *engine.FilterS
@@ -107,10 +107,8 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
return utils.ErrServiceAlreadyRunning
}
var err error
if es.cl, err = es.cls.WaitForCLS(ctx); err != nil {
return err
}
es.cl = <-es.clSChan
es.clSChan <- es.cl
fltrS, err := waitForFilterS(ctx, es.filterSChan)
if err != nil {
return err

View File

@@ -36,7 +36,7 @@ import (
type ExportFailoverService struct {
sync.Mutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
efS *efs.EfS
cl *commonlisteners.CommonListenerS
@@ -52,10 +52,10 @@ type ExportFailoverService struct {
// NewExportFailoverService is the constructor for the TpeService
func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
intConnChan chan birpc.ClientConnector,
cls *CommonListenerService, srvDep map[string]*sync.WaitGroup) *ExportFailoverService {
clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *ExportFailoverService {
return &ExportFailoverService{
cfg: cfg,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
intConnChan: intConnChan,
srvDep: srvDep,
@@ -67,9 +67,8 @@ func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.Cance
if efServ.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
if efServ.cl, err = efServ.cls.WaitForCLS(ctx); err != nil {
return err
}
efServ.cl = <-efServ.clSChan
efServ.clSChan <- efServ.cl
efServ.Lock()
efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EFs))

View File

@@ -37,7 +37,7 @@ func NewEventReaderService(
cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
cls *CommonListenerService,
clSChan chan *commonlisteners.CommonListenerS,
intConn chan birpc.ClientConnector,
anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
@@ -46,7 +46,7 @@ func NewEventReaderService(
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
cls: cls,
clSChan: clSChan,
intConn: intConn,
anz: anz,
srvDep: srvDep,
@@ -57,7 +57,7 @@ func NewEventReaderService(
type EventReaderService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
anz *AnalyzerService
filterSChan chan *engine.FilterS
@@ -78,9 +78,8 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
return utils.ErrServiceAlreadyRunning
}
if erS.cl, err = erS.cls.WaitForCLS(ctx); err != nil {
return err
}
erS.cl = <-erS.clSChan
erS.clSChan <- erS.cl
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil {
return

View File

@@ -33,12 +33,12 @@ import (
// NewHTTPAgent returns the HTTP Agent
func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
cls *CommonListenerService, connMgr *engine.ConnManager,
clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &HTTPAgent{
cfg: cfg,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -48,7 +48,7 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
type HTTPAgent struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
filterSChan chan *engine.FilterS
cl *commonlisteners.CommonListenerS
@@ -68,10 +68,8 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro
return utils.ErrServiceAlreadyRunning
}
cl, err := ha.cls.WaitForCLS(ctx)
if err != nil {
return err
}
cl := <-ha.clSChan
ha.clSChan <- cl
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ha.filterSChan); err != nil {
return

View File

@@ -25,6 +25,7 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -33,12 +34,12 @@ import (
// NewJanusAgent returns the Janus Agent
func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
cls *CommonListenerService, connMgr *engine.ConnManager,
clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &JanusAgent{
cfg: cfg,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -48,7 +49,7 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
type JanusAgent struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
filterSChan chan *engine.FilterS
jA *agents.JanusAgent
@@ -64,11 +65,8 @@ type JanusAgent struct {
// Start should jandle the sercive start
func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
cl, err := ja.cls.WaitForCLS(ctx)
if err != nil {
return err
}
cl := <-ja.clSChan
ja.clSChan <- cl
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ja.filterSChan); err != nil {
return

View File

@@ -32,6 +32,7 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
@@ -183,9 +184,10 @@ func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS
}
}
func cgrInitGuardianSv1(ctx *context.Context, iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig,
cls *CommonListenerService, anz *AnalyzerService) {
cl, _ := cls.WaitForCLS(ctx)
func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig,
clSChan chan *commonlisteners.CommonListenerS, anz *AnalyzerService) {
cl := <-clSChan
clSChan <- cl
srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
if !cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
@@ -195,10 +197,11 @@ func cgrInitGuardianSv1(ctx *context.Context, iGuardianSCh chan birpc.ClientConn
iGuardianSCh <- anz.GetInternalCodec(srv, utils.GuardianS)
}
func cgrInitServiceManagerV1(ctx *context.Context, iServMngrCh chan birpc.ClientConnector,
func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig,
cls *CommonListenerService, anz *AnalyzerService) {
cl, _ := cls.WaitForCLS(ctx)
clSChan chan *commonlisteners.CommonListenerS, anz *AnalyzerService) {
cl := <-clSChan
clSChan <- cl
srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false)
if !cfg.DispatcherSCfg().Enabled {
cl.RpcRegister(srv)
@@ -206,9 +209,10 @@ func cgrInitServiceManagerV1(ctx *context.Context, iServMngrCh chan birpc.Client
iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager)
}
func cgrInitConfigSv1(ctx *context.Context, iConfigCh chan birpc.ClientConnector,
cfg *config.CGRConfig, cls *CommonListenerService, anz *AnalyzerService) {
cl, _ := cls.WaitForCLS(ctx)
func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector,
cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, anz *AnalyzerService) {
cl := <-clSChan
clSChan <- cl
srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true)
// srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false)
if !cfg.DispatcherSCfg().Enabled {
@@ -220,8 +224,9 @@ func cgrInitConfigSv1(ctx *context.Context, iConfigCh chan birpc.ClientConnector
}
func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
cfg *config.CGRConfig, cls *CommonListenerService, internalDispatcherSChan chan birpc.ClientConnector) {
cl, _ := cls.WaitForCLS(ctx)
cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, internalDispatcherSChan chan birpc.ClientConnector) {
cl := <-clSChan
clSChan <- cl
if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this
select {
case dispatcherS := <-internalDispatcherSChan:

View File

@@ -33,7 +33,7 @@ import (
// NewLoaderService returns the Loader Service
func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS, cls *CommonListenerService,
filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
internalLoaderSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *LoaderService {
@@ -42,7 +42,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
stopChan: make(chan struct{}),
anz: anz,
@@ -54,7 +54,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
type LoaderService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
filterSChan chan *engine.FilterS
@@ -75,9 +75,8 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
return utils.ErrServiceAlreadyRunning
}
if ldrs.cl, err = ldrs.cls.WaitForCLS(ctx); err != nil {
return err
}
ldrs.cl = <-ldrs.clSChan
ldrs.clSChan <- ldrs.cl
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil {
return

View File

@@ -35,7 +35,7 @@ import (
// NewRankingService returns the RankingS Service
func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
cls *CommonListenerService, internalRankingSChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalRankingSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &RankingService{
@@ -44,7 +44,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -54,7 +54,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
type RankingService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
@@ -76,9 +76,8 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
}
ran.srvDep[utils.DataDB].Add(1)
if ran.cl, err = ran.cls.WaitForCLS(ctx); err != nil {
return err
}
ran.cl = <-ran.clSChan
ran.clSChan <- ran.cl
if err = ran.cacheS.WaitToPrecache(ctx,
utils.CacheRankingProfiles,
utils.CacheRankings,

View File

@@ -34,7 +34,7 @@ import (
// NewRateService constructs RateService
func NewRateService(cfg *config.CGRConfig,
cacheS *CacheService, filterSChan chan *engine.FilterS,
dmS *DataDBService, cls *CommonListenerService,
dmS *DataDBService, clSChan chan *commonlisteners.CommonListenerS,
intConnChan chan birpc.ClientConnector, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &RateService{
@@ -42,7 +42,7 @@ func NewRateService(cfg *config.CGRConfig,
cacheS: cacheS,
filterSChan: filterSChan,
dmS: dmS,
cls: cls,
clSChan: clSChan,
intConnChan: intConnChan,
rldChan: make(chan struct{}),
anz: anz,
@@ -54,7 +54,7 @@ func NewRateService(cfg *config.CGRConfig,
type RateService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
anz *AnalyzerService
dmS *DataDBService
cacheS *CacheService
@@ -111,9 +111,8 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
return utils.ErrServiceAlreadyRunning
}
if rs.cl, err = rs.cls.WaitForCLS(ctx); err != nil {
return err
}
rs.cl = <-rs.clSChan
rs.clSChan <- rs.cl
if err = rs.cacheS.WaitToPrecache(ctx,
utils.CacheRateProfiles,
utils.CacheRateProfilesFilterIndexes,

View File

@@ -34,7 +34,7 @@ import (
// NewResourceService returns the Resource Service
func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
cls *CommonListenerService, internalResourceSChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalResourceSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &ResourceService{
@@ -43,7 +43,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -54,7 +54,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
type ResourceService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
@@ -76,9 +76,8 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
}
reS.srvDep[utils.DataDB].Add(1)
if reS.cl, err = reS.cls.WaitForCLS(ctx); err != nil {
return err
}
reS.cl = <-reS.clSChan
reS.clSChan <- reS.cl
if err = reS.cacheS.WaitToPrecache(ctx,
utils.CacheResourceProfiles,
utils.CacheResources,

View File

@@ -35,7 +35,7 @@ import (
// NewRouteService returns the Route Service
func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
cls *CommonListenerService, internalRouteSChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalRouteSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &RouteService{
@@ -44,7 +44,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -55,7 +55,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
type RouteService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
@@ -76,9 +76,8 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
return utils.ErrServiceAlreadyRunning
}
if routeS.cl, err = routeS.cls.WaitForCLS(ctx); err != nil {
return err
}
routeS.cl = <-routeS.clSChan
routeS.clSChan <- routeS.cl
if err = routeS.cacheS.WaitToPrecache(ctx,
utils.CacheRouteProfiles,
utils.CacheRouteFilterIndexes); err != nil {

View File

@@ -36,7 +36,7 @@ import (
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &SessionService{
@@ -44,7 +44,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -55,7 +55,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha
type SessionService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
filterSChan chan *engine.FilterS
@@ -77,9 +77,8 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
if smg.cl, err = smg.cls.WaitForCLS(ctx); err != nil {
return err
}
smg.cl = <-smg.clSChan
smg.clSChan <- smg.cl
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil {
return

View File

@@ -34,7 +34,7 @@ import (
// NewStatService returns the Stat Service
func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
cls *CommonListenerService, internalStatSChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalStatSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &StatService{
@@ -43,7 +43,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -54,7 +54,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
type StatService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
@@ -76,9 +76,8 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
}
sts.srvDep[utils.DataDB].Add(1)
if sts.cl, err = sts.cls.WaitForCLS(ctx); err != nil {
return err
}
sts.cl = <-sts.clSChan
sts.clSChan <- sts.cl
if err = sts.cacheS.WaitToPrecache(ctx,
utils.CacheStatQueueProfiles,
utils.CacheStatQueues,

View File

@@ -35,7 +35,7 @@ import (
func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
cls *CommonListenerService, internalThresholdSChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalThresholdSChan chan birpc.ClientConnector,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &ThresholdService{
connChan: internalThresholdSChan,
@@ -43,7 +43,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
cls: cls,
clSChan: clSChan,
anz: anz,
srvDep: srvDep,
connMgr: connMgr,
@@ -54,7 +54,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
type ThresholdService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
@@ -76,9 +76,8 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
}
thrs.srvDep[utils.DataDB].Add(1)
if thrs.cl, err = thrs.cls.WaitForCLS(ctx); err != nil {
return err
}
thrs.cl = <-thrs.clSChan
thrs.clSChan <- thrs.cl
if err = thrs.cacheS.WaitToPrecache(ctx,
utils.CacheThresholdProfiles,
utils.CacheThresholds,

View File

@@ -34,13 +34,13 @@ import (
// NewTPeService is the constructor for the TpeService
func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService,
cls *CommonListenerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &TPeService{
cfg: cfg,
srvDep: srvDep,
dm: dm,
connMgr: connMgr,
cls: cls,
clSChan: clSChan,
}
}
@@ -48,8 +48,8 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD
type TPeService struct {
sync.RWMutex
cls *CommonListenerService
dm *DataDBService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
tpes *tpes.TPeS
cl *commonlisteners.CommonListenerS
@@ -63,9 +63,8 @@ type TPeService struct {
// Start should handle the service start
func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
if ts.cl, err = ts.cls.WaitForCLS(ctx); err != nil {
return err
}
ts.cl = <-ts.clSChan
ts.clSChan <- ts.cl
var datadb *engine.DataManager
if datadb, err = ts.dm.WaitForDM(ctx); err != nil {
return

View File

@@ -34,7 +34,7 @@ import (
// NewTrendsService returns the TrendS Service
func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
cls *CommonListenerService, internalTrendSChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, internalTrendSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &TrendService{
@@ -42,7 +42,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
cfg: cfg,
dm: dm,
cacheS: cacheS,
cls: cls,
clSChan: clSChan,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -52,7 +52,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
type TrendService struct {
sync.RWMutex
cls *CommonListenerService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
@@ -74,9 +74,8 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
}
trs.srvDep[utils.DataDB].Add(1)
if trs.cl, err = trs.cls.WaitForCLS(ctx); err != nil {
return err
}
trs.cl = <-trs.clSChan
trs.clSChan <- trs.cl
if err = trs.cacheS.WaitToPrecache(ctx,
utils.CacheTrendProfiles,
utils.CacheTrends,