Remove clsChan in favor of using the service indexer

This commit is contained in:
ionutboangiu
2024-12-10 19:53:54 +02:00
committed by Dan Christian Bogos
parent 98b5b74b23
commit 55ecdf45e4
27 changed files with 180 additions and 181 deletions

View File

@@ -134,7 +134,6 @@ func runCGREngine(fs []string) (err error) {
iGuardianSCh := make(chan birpc.ClientConnector, 1)
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh)
clsCh := make(chan *commonlisteners.CommonListenerS, 1)
iFilterSCh := make(chan *engine.FilterS, 1)
// ServiceIndexer will share service references to all services
@@ -142,40 +141,40 @@ func runCGREngine(fs []string) (err error) {
gvS := services.NewGlobalVarS(cfg, srvIdxr)
dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep, srvIdxr)
sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvIdxr)
cls := services.NewCommonListenerService(cfg, caps, clsCh, srvIdxr)
anzS := services.NewAnalyzerService(cfg, clsCh, iFilterSCh, srvIdxr)
coreS := services.NewCoreService(cfg, caps, clsCh, cpuPrfF, shdWg, srvIdxr)
cacheS := services.NewCacheService(cfg, dmS, connMgr, clsCh, coreS, srvIdxr)
dspS := services.NewDispatcherService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr)
ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr)
efs := services.NewExportFailoverService(cfg, connMgr, clsCh, srvIdxr)
adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, srvIdxr)
sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr)
attrS := services.NewAttributeService(cfg, dmS, iFilterSCh, clsCh, dspS, srvIdxr)
chrgS := services.NewChargerService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr)
routeS := services.NewRouteService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr)
resourceS := services.NewResourceService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr)
trendS := services.NewTrendService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr)
rankingS := services.NewRankingService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr)
thS := services.NewThresholdService(cfg, dmS, iFilterSCh, connMgr, clsCh, srvDep, srvIdxr)
stS := services.NewStatService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr)
erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, clsCh, srvIdxr)
cls := services.NewCommonListenerService(cfg, caps, srvIdxr)
anzS := services.NewAnalyzerService(cfg, iFilterSCh, srvIdxr)
coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg, srvIdxr)
cacheS := services.NewCacheService(cfg, dmS, connMgr, coreS, srvIdxr)
dspS := services.NewDispatcherService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
efs := services.NewExportFailoverService(cfg, connMgr, srvIdxr)
adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, connMgr, srvIdxr)
sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
attrS := services.NewAttributeService(cfg, dmS, iFilterSCh, dspS, srvIdxr)
chrgS := services.NewChargerService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
routeS := services.NewRouteService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
resourceS := services.NewResourceService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
trendS := services.NewTrendService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
rankingS := services.NewRankingService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
thS := services.NewThresholdService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
stS := services.NewStatService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, srvIdxr)
dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvIdxr)
fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvIdxr)
kamAgent := services.NewKamailioAgent(cfg, connMgr, srvIdxr)
janusAgent := services.NewJanusAgent(cfg, iFilterSCh, clsCh, connMgr, srvIdxr)
janusAgent := services.NewJanusAgent(cfg, iFilterSCh, connMgr, srvIdxr)
astAgent := services.NewAsteriskAgent(cfg, connMgr, srvIdxr)
radAgent := services.NewRadiusAgent(cfg, iFilterSCh, connMgr, srvIdxr)
diamAgent := services.NewDiameterAgent(cfg, iFilterSCh, connMgr, caps, srvIdxr)
httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, clsCh, connMgr, srvIdxr)
httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, connMgr, srvIdxr)
sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvIdxr)
eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, clsCh, srvIdxr)
cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, srvIdxr)
eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, srvIdxr)
cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, connMgr, srvIdxr)
registrarcS := services.NewRegistrarCService(cfg, connMgr, srvIdxr)
rateS := services.NewRateService(cfg, iFilterSCh, dmS, clsCh, srvIdxr)
actionS := services.NewActionService(cfg, dmS, iFilterSCh, connMgr, clsCh, srvIdxr)
accS := services.NewAccountService(cfg, dmS, iFilterSCh, connMgr, clsCh, srvIdxr)
tpeS := services.NewTPeService(cfg, connMgr, dmS, clsCh, srvIdxr)
rateS := services.NewRateService(cfg, iFilterSCh, dmS, srvIdxr)
actionS := services.NewActionService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
accS := services.NewAccountService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
tpeS := services.NewTPeService(cfg, connMgr, dmS, srvIdxr)
srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{
gvS,
@@ -312,9 +311,9 @@ func runCGREngine(fs []string) (err error) {
// Start FilterS
go cgrStartFilterService(ctx, iFilterSCh, cacheS.GetCacheSChan(), connMgr, cfg, dmS)
cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, clsCh, anzS)
cgrInitGuardianSv1(iGuardianSCh, cfg, clsCh, anzS)
cgrInitConfigSv1(iConfigCh, cfg, clsCh, anzS)
cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS)
cgrInitGuardianSv1(iGuardianSCh, cfg, cls.CLS(), anzS)
cgrInitConfigSv1(iConfigCh, cfg, cls.CLS(), anzS)
if *flags.Preload != utils.EmptyString {
if err = cgrRunPreload(ctx, cfg, *flags.Preload, srvIdxr); err != nil {
@@ -323,7 +322,7 @@ func runCGREngine(fs []string) (err error) {
}
// Serve rpc connections
cgrStartRPC(ctx, cancel, cfg, clsCh, srvIdxr)
cgrStartRPC(ctx, cancel, cfg, srvIdxr)
// TODO: find a better location for this if block
if *flags.MemPrfDir != "" {
@@ -393,9 +392,7 @@ func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS
}
func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig,
clSChan chan *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
cl := <-clSChan
clSChan <- cl
cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
if !cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
@@ -407,9 +404,7 @@ func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGR
func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig,
clSChan chan *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
cl := <-clSChan
clSChan <- cl
cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false)
if !cfg.DispatcherSCfg().Enabled {
cl.RpcRegister(srv)
@@ -418,9 +413,7 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
}
func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector,
cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
cl := <-clSChan
clSChan <- cl
cfg *config.CGRConfig, cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true)
// srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false)
if !cfg.DispatcherSCfg().Enabled {
@@ -432,9 +425,7 @@ func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector,
}
func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS, sIdxr *servmanager.ServiceIndexer) {
cl := <-clSChan
clSChan <- cl
cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer) {
if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this
select {
case <-sIdxr.GetService(utils.DispatcherS).StateChan(utils.StateServiceUP):
@@ -442,6 +433,7 @@ func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
return
}
}
cl := sIdxr.GetService(utils.CommonListenerS).(*services.CommonListenerService).CLS()
cl.StartServer(ctx, shtdwnEngine, cfg)
}

View File

@@ -37,14 +37,13 @@ import (
// NewAccountService returns the Account Service
func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &AccountService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
clSChan: clSChan,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -55,7 +54,6 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
type AccountService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -77,8 +75,11 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
if acts.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
acts.cl = <-acts.clSChan
acts.clSChan <- acts.cl
cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP)
}
acts.cl = cls.CLS()
cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AccountS, utils.CacheS, utils.StateServiceUP)

View File

@@ -38,14 +38,12 @@ import (
func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
clSChan chan *commonlisteners.CommonListenerS,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ActionService{
connMgr: connMgr,
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -56,7 +54,6 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
type ActionService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -80,8 +77,11 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
return utils.ErrServiceAlreadyRunning
}
acts.cl = <-acts.clSChan
acts.clSChan <- acts.cl
cls := acts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CommonListenerS, utils.StateServiceUP)
}
acts.cl = cls.CLS()
cacheS := acts.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.CacheS, utils.StateServiceUP)

View File

@@ -34,7 +34,7 @@ import (
// NewAPIerSv1Service returns the APIerSv1 Service
func NewAdminSv1Service(cfg *config.CGRConfig,
dm *DataDBService, storDB *StorDBService,
filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &AdminSv1Service{
@@ -42,7 +42,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
dm: dm,
storDB: storDB,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -53,7 +52,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
type AdminSv1Service struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
storDB *StorDBService
filterSChan chan *engine.FilterS
@@ -77,8 +75,11 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
return utils.ErrServiceAlreadyRunning
}
apiService.cl = <-apiService.clSChan
apiService.clSChan <- apiService.cl
cls := apiService.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.CommonListenerS, utils.StateServiceUP)
}
apiService.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil {
return

View File

@@ -33,12 +33,11 @@ import (
)
// NewAnalyzerService returns the Analyzer Service
func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS,
func NewAnalyzerService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
srvIndexer *servmanager.ServiceIndexer) *AnalyzerService {
return &AnalyzerService{
cfg: cfg,
clSChan: clSChan,
filterSChan: filterSChan,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -49,7 +48,6 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.Com
type AnalyzerService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
filterSChan chan *engine.FilterS
anz *analyzers.AnalyzerS
@@ -70,8 +68,11 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun
return utils.ErrServiceAlreadyRunning
}
anz.cl = <-anz.clSChan
anz.clSChan <- anz.cl
cls := anz.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), anz.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AnalyzerS, utils.CommonListenerS, utils.StateServiceUP)
}
anz.cl = cls.CLS()
anz.Lock()
defer anz.Unlock()

View File

@@ -35,14 +35,12 @@ import (
// NewAttributeService returns the Attribute Service
func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS,
dspS *DispatcherService,
sIndxr *servmanager.ServiceIndexer) servmanager.Service {
return &AttributeService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
dspS: dspS,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
serviceIndexer: sIndxr,
@@ -53,7 +51,6 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
type AttributeService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
dspS *DispatcherService
filterSChan chan *engine.FilterS
@@ -79,8 +76,11 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP)
}
attrS.cl = <-attrS.clSChan
attrS.clSChan <- attrS.cl
cls := attrS.serviceIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP)
}
attrS.cl = cls.CLS()
cacheS := attrS.serviceIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CacheS, utils.StateServiceUP)

View File

@@ -31,13 +31,11 @@ import (
// NewCacheService .
func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager,
clSChan chan *commonlisteners.CommonListenerS,
cores *CoreService,
srvIndexer *servmanager.ServiceIndexer) *CacheService {
return &CacheService{
cfg: cfg,
cores: cores,
clSChan: clSChan,
dm: dm,
connMgr: connMgr,
cacheCh: make(chan *engine.CacheS, 1),
@@ -48,9 +46,8 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C
// CacheService implements Agent interface
type CacheService struct {
cores *CoreService
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
cores *CoreService
dm *DataDBService
cl *commonlisteners.CommonListenerS
@@ -65,8 +62,11 @@ type CacheService struct {
// Start should handle the sercive start
func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (err error) {
cS.cl = <-cS.clSChan
cS.clSChan <- cS.cl
cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CommonListenerS, utils.StateServiceUP)
}
cS.cl = cls.CLS()
var dm *engine.DataManager
if dm, err = cS.dm.WaitForDM(ctx); err != nil {
return

View File

@@ -36,7 +36,6 @@ import (
// NewCDRServer returns the CDR Server
func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
storDB *StorDBService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &CDRService{
@@ -44,7 +43,6 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
storDB: storDB,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -55,7 +53,6 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
type CDRService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
storDB *StorDBService
filterSChan chan *engine.FilterS
@@ -80,8 +77,11 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs))
cs.cl = <-cs.clSChan
cs.clSChan <- cs.cl
cls := cs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.CommonListenerS, utils.StateServiceUP)
}
cs.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil {
return

View File

@@ -34,14 +34,13 @@ import (
// NewChargerService returns the Charger Service
func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ChargerService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -52,7 +51,6 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
type ChargerService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -73,8 +71,11 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
return utils.ErrServiceAlreadyRunning
}
chrS.cl = <-chrS.clSChan
chrS.clSChan <- chrS.cl
cls := chrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CommonListenerS, utils.StateServiceUP)
}
chrS.cl = cls.CLS()
cacheS := chrS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.CacheS, utils.StateServiceUP)

View File

@@ -33,12 +33,10 @@ import (
// NewCommonListenerService instantiates a new CommonListenerService.
func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps,
clSChan chan *commonlisteners.CommonListenerS,
srvIndexer *servmanager.ServiceIndexer) *CommonListenerService {
return &CommonListenerService{
cfg: cfg,
caps: caps,
clSChan: clSChan,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
@@ -50,9 +48,8 @@ type CommonListenerService struct {
cls *commonlisteners.CommonListenerS
clSChan chan *commonlisteners.CommonListenerS
caps *engine.Caps
cfg *config.CGRConfig
caps *engine.Caps
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
@@ -67,7 +64,6 @@ func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) err
cl.mu.Lock()
defer cl.mu.Unlock()
cl.cls = commonlisteners.NewCommonListenerS(cl.caps)
cl.clSChan <- cl.cls
if len(cl.cfg.HTTPCfg().RegistrarSURL) != 0 {
cl.cls.RegisterHTTPFunc(cl.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar)
}
@@ -88,7 +84,6 @@ func (cl *CommonListenerService) Shutdown() error {
cl.mu.Lock()
defer cl.mu.Unlock()
cl.cls = nil
<-cl.clSChan
return nil
}
@@ -118,3 +113,8 @@ func (cl *CommonListenerService) StateChan(stateID string) chan struct{} {
func (cl *CommonListenerService) IntRPCConn() birpc.ClientConnector {
return cl.intRPCconn
}
// CLS returns the CommonListenerS object.
func (cl *CommonListenerService) CLS() *commonlisteners.CommonListenerS {
return cl.cls
}

View File

@@ -34,7 +34,7 @@ import (
)
// NewCoreService returns the Core Service
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS,
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps,
fileCPU *os.File, shdWg *sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *CoreService {
return &CoreService{
@@ -42,7 +42,6 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm
cfg: cfg,
caps: caps,
fileCPU: fileCPU,
clSChan: clSChan,
csCh: make(chan *cores.CoreS, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -53,8 +52,6 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm
type CoreService struct {
mu sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
cS *cores.CoreS
cl *commonlisteners.CommonListenerS
@@ -76,8 +73,11 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err
return utils.ErrServiceAlreadyRunning
}
cS.cl = <-cS.clSChan
cS.clSChan <- cS.cl
cls := cS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CoreS, utils.CommonListenerS, utils.StateServiceUP)
}
cS.cl = cls.CLS()
anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CoreS, utils.AnalyzerS, utils.StateServiceUP)

View File

@@ -34,14 +34,12 @@ import (
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) *DispatcherService {
return &DispatcherService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvsReload: make(map[string]chan struct{}),
srvIndexer: srvIndexer,
@@ -53,7 +51,6 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
type DispatcherService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -75,8 +72,11 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
utils.Logger.Info("Starting CGRateS DispatcherS service.")
dspS.cl = <-dspS.clSChan
dspS.clSChan <- dspS.cl
cls := dspS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CommonListenerS, utils.StateServiceUP)
}
dspS.cl = cls.CLS()
cacheS := dspS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.CacheS, utils.StateServiceUP)

View File

@@ -34,13 +34,12 @@ import (
// NewEventExporterService constructs EventExporterService
func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &EventExporterService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
clSChan: clSChan,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
@@ -50,7 +49,6 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil
type EventExporterService struct {
mu sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
filterSChan chan *engine.FilterS
eeS *ees.EeS
@@ -106,8 +104,11 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
return utils.ErrServiceAlreadyRunning
}
es.cl = <-es.clSChan
es.clSChan <- es.cl
cls := es.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.EEs, utils.CommonListenerS, utils.StateServiceUP)
}
es.cl = cls.CLS()
fltrS, err := waitForFilterS(ctx, es.filterSChan)
if err != nil {
return err

View File

@@ -37,8 +37,6 @@ import (
type ExportFailoverService struct {
sync.Mutex
clSChan chan *commonlisteners.CommonListenerS
efS *efs.EfS
cl *commonlisteners.CommonListenerS
srv *birpc.Service
@@ -54,11 +52,9 @@ type ExportFailoverService struct {
// NewExportFailoverService is the constructor for the TpeService
func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
clSChan chan *commonlisteners.CommonListenerS,
srvIndexer *servmanager.ServiceIndexer) *ExportFailoverService {
return &ExportFailoverService{
cfg: cfg,
clSChan: clSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -70,8 +66,11 @@ func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.Cance
if efServ.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
efServ.cl = <-efServ.clSChan
efServ.clSChan <- efServ.cl
cls := efServ.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), efServ.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.EFs, utils.CommonListenerS, utils.StateServiceUP)
}
efServ.cl = cls.CLS()
efServ.Lock()
efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EFs))

View File

@@ -37,14 +37,12 @@ func NewEventReaderService(
cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
clSChan chan *commonlisteners.CommonListenerS,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &EventReaderService{
rldChan: make(chan struct{}, 1),
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
clSChan: clSChan,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
@@ -54,7 +52,6 @@ func NewEventReaderService(
type EventReaderService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
filterSChan chan *engine.FilterS
ers *ers.ERService
@@ -76,8 +73,11 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
return utils.ErrServiceAlreadyRunning
}
erS.cl = <-erS.clSChan
erS.clSChan <- erS.cl
cls := erS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ERs, utils.CommonListenerS, utils.StateServiceUP)
}
erS.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil {
return

View File

@@ -34,12 +34,11 @@ import (
// NewHTTPAgent returns the HTTP Agent
func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &HTTPAgent{
cfg: cfg,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -50,7 +49,6 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
type HTTPAgent struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
filterSChan chan *engine.FilterS
cl *commonlisteners.CommonListenerS
@@ -73,8 +71,11 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro
return utils.ErrServiceAlreadyRunning
}
cl := <-ha.clSChan
ha.clSChan <- cl
cls := ha.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.CommonListenerS, utils.StateServiceUP)
}
cl := cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ha.filterSChan); err != nil {
return

View File

@@ -26,7 +26,6 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -35,12 +34,11 @@ import (
// NewJanusAgent returns the Janus Agent
func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &JanusAgent{
cfg: cfg,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -51,7 +49,6 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
type JanusAgent struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
filterSChan chan *engine.FilterS
jA *agents.JanusAgent
@@ -70,8 +67,11 @@ type JanusAgent struct {
// Start should jandle the sercive start
func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
cl := <-ja.clSChan
ja.clSChan <- cl
cls := ja.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.CommonListenerS, utils.StateServiceUP)
}
cl := cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ja.filterSChan); err != nil {
return

View File

@@ -34,14 +34,13 @@ import (
// NewLoaderService returns the Loader Service
func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) *LoaderService {
return &LoaderService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
stopChan: make(chan struct{}),
srvIndexer: srvIndexer,
@@ -53,7 +52,6 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
type LoaderService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -75,8 +73,11 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
return utils.ErrServiceAlreadyRunning
}
ldrs.cl = <-ldrs.clSChan
ldrs.clSChan <- ldrs.cl
cls := ldrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.CommonListenerS, utils.StateServiceUP)
}
ldrs.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil {
return

View File

@@ -35,7 +35,6 @@ import (
// NewRankingService returns the RankingS Service
func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
@@ -43,7 +42,6 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
@@ -54,7 +52,6 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
type RankingService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -77,8 +74,11 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
}
ran.srvDep[utils.DataDB].Add(1)
ran.cl = <-ran.clSChan
ran.clSChan <- ran.cl
cls := ran.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CommonListenerS, utils.StateServiceUP)
}
ran.cl = cls.CLS()
cacheS := ran.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.CacheS, utils.StateServiceUP)

View File

@@ -34,13 +34,12 @@ import (
// NewRateService constructs RateService
func NewRateService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
dmS *DataDBService, clSChan chan *commonlisteners.CommonListenerS,
dmS *DataDBService,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RateService{
cfg: cfg,
filterSChan: filterSChan,
dmS: dmS,
clSChan: clSChan,
rldChan: make(chan struct{}),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -51,7 +50,6 @@ func NewRateService(cfg *config.CGRConfig,
type RateService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dmS *DataDBService
filterSChan chan *engine.FilterS
@@ -107,8 +105,11 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
return utils.ErrServiceAlreadyRunning
}
rs.cl = <-rs.clSChan
rs.clSChan <- rs.cl
cls := rs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RateS, utils.CommonListenerS, utils.StateServiceUP)
}
rs.cl = cls.CLS()
cacheS := rs.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RateS, utils.CacheS, utils.StateServiceUP)

View File

@@ -34,7 +34,6 @@ import (
// NewResourceService returns the Resource Service
func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
@@ -42,7 +41,6 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
@@ -54,7 +52,6 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
type ResourceService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -77,8 +74,11 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
}
reS.srvDep[utils.DataDB].Add(1)
reS.cl = <-reS.clSChan
reS.clSChan <- reS.cl
cls := reS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CommonListenerS, utils.StateServiceUP)
}
reS.cl = cls.CLS()
cacheS := reS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.CacheS, utils.StateServiceUP)

View File

@@ -35,14 +35,12 @@ import (
// NewRouteService returns the Route Service
func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RouteService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -53,7 +51,6 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
type RouteService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -74,8 +71,11 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
return utils.ErrServiceAlreadyRunning
}
routeS.cl = <-routeS.clSChan
routeS.clSChan <- routeS.cl
cls := routeS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CommonListenerS, utils.StateServiceUP)
}
routeS.cl = cls.CLS()
cacheS := routeS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.CacheS, utils.StateServiceUP)

View File

@@ -36,14 +36,12 @@ import (
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &SessionService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -54,7 +52,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha
type SessionService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -77,8 +74,11 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
smg.cl = <-smg.clSChan
smg.clSChan <- smg.cl
cls := smg.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.SessionS, utils.CommonListenerS, utils.StateServiceUP)
}
smg.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil {
return

View File

@@ -34,7 +34,6 @@ import (
// NewStatService returns the Stat Service
func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
@@ -42,7 +41,6 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
@@ -54,7 +52,6 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
type StatService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -77,8 +74,11 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
}
sts.srvDep[utils.DataDB].Add(1)
sts.cl = <-sts.clSChan
sts.clSChan <- sts.cl
cls := sts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.StatS, utils.CommonListenerS, utils.StateServiceUP)
}
sts.cl = cls.CLS()
cacheS := sts.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.StatS, utils.CacheS, utils.StateServiceUP)

View File

@@ -35,14 +35,12 @@ import (
func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
clSChan chan *commonlisteners.CommonListenerS,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ThresholdService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
clSChan: clSChan,
srvDep: srvDep,
connMgr: connMgr,
srvIndexer: srvIndexer,
@@ -54,7 +52,6 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
type ThresholdService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -77,8 +74,11 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
}
thrs.srvDep[utils.DataDB].Add(1)
thrs.cl = <-thrs.clSChan
thrs.clSChan <- thrs.cl
cls := thrs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CommonListenerS, utils.StateServiceUP)
}
thrs.cl = cls.CLS()
cacheS := thrs.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.CacheS, utils.StateServiceUP)

View File

@@ -34,13 +34,11 @@ import (
// NewTPeService is the constructor for the TpeService
func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService,
clSChan chan *commonlisteners.CommonListenerS,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &TPeService{
cfg: cfg,
dm: dm,
connMgr: connMgr,
clSChan: clSChan,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
@@ -50,8 +48,7 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD
type TPeService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
dm *DataDBService
tpes *tpes.TPeS
cl *commonlisteners.CommonListenerS
@@ -68,8 +65,11 @@ type TPeService struct {
// Start should handle the service start
func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
ts.cl = <-ts.clSChan
ts.clSChan <- ts.cl
cls := ts.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TPeS, utils.CommonListenerS, utils.StateServiceUP)
}
ts.cl = cls.CLS()
var datadb *engine.DataManager
if datadb, err = ts.dm.WaitForDM(ctx); err != nil {
return

View File

@@ -34,14 +34,12 @@ import (
// NewTrendsService returns the TrendS Service
func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &TrendService{
cfg: cfg,
dm: dm,
clSChan: clSChan,
connMgr: connMgr,
srvDep: srvDep,
filterSChan: filterSChan,
@@ -53,7 +51,6 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
type TrendService struct {
sync.RWMutex
clSChan chan *commonlisteners.CommonListenerS
dm *DataDBService
filterSChan chan *engine.FilterS
@@ -76,8 +73,11 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
}
trs.srvDep[utils.DataDB].Add(1)
trs.cl = <-trs.clSChan
trs.clSChan <- trs.cl
cls := trs.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CommonListenerS, utils.StateServiceUP)
}
trs.cl = cls.CLS()
cacheS := trs.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.CacheS, utils.StateServiceUP)