mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Remove anzCh in favor of using the service indexer
This commit is contained in:
committed by
Dan Christian Bogos
parent
0783984bfe
commit
77af4f95b3
@@ -135,7 +135,6 @@ func runCGREngine(fs []string) (err error) {
|
||||
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh)
|
||||
|
||||
clsCh := make(chan *commonlisteners.CommonListenerS, 1)
|
||||
anzCh := make(chan *services.AnalyzerService, 1)
|
||||
iFilterSCh := make(chan *engine.FilterS, 1)
|
||||
|
||||
// ServiceIndexer will share service references to all services
|
||||
@@ -144,23 +143,23 @@ func runCGREngine(fs []string) (err error) {
|
||||
dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep, srvIdxr)
|
||||
sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvIdxr)
|
||||
cls := services.NewCommonListenerService(cfg, caps, clsCh, srvIdxr)
|
||||
anzS := services.NewAnalyzerService(cfg, clsCh, iFilterSCh, anzCh, srvIdxr)
|
||||
coreS := services.NewCoreService(cfg, caps, clsCh, anzCh, cpuPrfF, shdWg, srvIdxr)
|
||||
cacheS := services.NewCacheService(cfg, dmS, connMgr, clsCh, anzCh, coreS, srvIdxr)
|
||||
dspS := services.NewDispatcherService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr)
|
||||
ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr)
|
||||
anzS := services.NewAnalyzerService(cfg, clsCh, iFilterSCh, srvIdxr)
|
||||
coreS := services.NewCoreService(cfg, caps, clsCh, cpuPrfF, shdWg, srvIdxr)
|
||||
cacheS := services.NewCacheService(cfg, dmS, connMgr, clsCh, coreS, srvIdxr)
|
||||
dspS := services.NewDispatcherService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvIdxr)
|
||||
ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr)
|
||||
efs := services.NewExportFailoverService(cfg, connMgr, clsCh, srvIdxr)
|
||||
adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr)
|
||||
sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr)
|
||||
attrS := services.NewAttributeService(cfg, dmS, cacheS, iFilterSCh, clsCh, anzCh, dspS, srvIdxr)
|
||||
chrgS := services.NewChargerService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr)
|
||||
routeS := services.NewRouteService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr)
|
||||
resourceS := services.NewResourceService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr)
|
||||
trendS := services.NewTrendService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr)
|
||||
rankingS := services.NewRankingService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr)
|
||||
thS := services.NewThresholdService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, anzCh, srvDep, srvIdxr)
|
||||
stS := services.NewStatService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, anzCh, srvDep, srvIdxr)
|
||||
erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, clsCh, anzCh, srvIdxr)
|
||||
adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, srvIdxr)
|
||||
sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, clsCh, connMgr, srvIdxr)
|
||||
attrS := services.NewAttributeService(cfg, dmS, cacheS, iFilterSCh, clsCh, dspS, srvIdxr)
|
||||
chrgS := services.NewChargerService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvIdxr)
|
||||
routeS := services.NewRouteService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvIdxr)
|
||||
resourceS := services.NewResourceService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr)
|
||||
trendS := services.NewTrendService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr)
|
||||
rankingS := services.NewRankingService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr)
|
||||
thS := services.NewThresholdService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, srvDep, srvIdxr)
|
||||
stS := services.NewStatService(cfg, dmS, cacheS, iFilterSCh, clsCh, connMgr, srvDep, srvIdxr)
|
||||
erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, clsCh, srvIdxr)
|
||||
dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvIdxr)
|
||||
fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvIdxr)
|
||||
kamAgent := services.NewKamailioAgent(cfg, connMgr, srvIdxr)
|
||||
@@ -170,12 +169,12 @@ func runCGREngine(fs []string) (err error) {
|
||||
diamAgent := services.NewDiameterAgent(cfg, iFilterSCh, connMgr, caps, srvIdxr)
|
||||
httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, clsCh, connMgr, srvIdxr)
|
||||
sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvIdxr)
|
||||
eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, clsCh, anzCh, srvIdxr)
|
||||
cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, anzCh, srvIdxr)
|
||||
eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, clsCh, srvIdxr)
|
||||
cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, clsCh, connMgr, srvIdxr)
|
||||
registrarcS := services.NewRegistrarCService(cfg, connMgr, srvIdxr)
|
||||
rateS := services.NewRateService(cfg, cacheS, iFilterSCh, dmS, clsCh, anzCh, srvIdxr)
|
||||
actionS := services.NewActionService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, anzCh, srvIdxr)
|
||||
accS := services.NewAccountService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, anzCh, srvIdxr)
|
||||
rateS := services.NewRateService(cfg, cacheS, iFilterSCh, dmS, clsCh, srvIdxr)
|
||||
actionS := services.NewActionService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, srvIdxr)
|
||||
accS := services.NewAccountService(cfg, dmS, cacheS, iFilterSCh, connMgr, clsCh, srvIdxr)
|
||||
tpeS := services.NewTPeService(cfg, connMgr, dmS, clsCh, srvIdxr)
|
||||
|
||||
srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{
|
||||
@@ -294,7 +293,7 @@ func runCGREngine(fs []string) (err error) {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
anzCh <- anzS
|
||||
close(anzS.StateChan(utils.StateServiceUP))
|
||||
}
|
||||
|
||||
shdWg.Add(1)
|
||||
@@ -339,7 +338,6 @@ func runCGREngine(fs []string) (err error) {
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
//<-stopChan
|
||||
return
|
||||
}
|
||||
|
||||
@@ -464,7 +462,7 @@ func handleSignals(ctx *context.Context, shutdown context.CancelFunc,
|
||||
shdWg.Done()
|
||||
return
|
||||
case <-reloadSignal:
|
||||
// do it in it's own goroutine in order to not block the signal handler with the reload functionality
|
||||
// do it in its own goroutine in order to not block the signal handler with the reload functionality
|
||||
go func() {
|
||||
var reply string
|
||||
if err := cfg.V1ReloadConfig(ctx,
|
||||
|
||||
@@ -38,7 +38,6 @@ import (
|
||||
func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS,
|
||||
anzChan chan *AnalyzerService,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &AccountService{
|
||||
cfg: cfg,
|
||||
@@ -47,7 +46,6 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan: filterSChan,
|
||||
connMgr: connMgr,
|
||||
clSChan: clSChan,
|
||||
anzChan: anzChan,
|
||||
rldChan: make(chan struct{}, 1),
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
@@ -61,7 +59,6 @@ type AccountService struct {
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
cacheS *CacheService
|
||||
anzChan chan *AnalyzerService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
acts *accounts.AccountS
|
||||
@@ -97,8 +94,10 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
|
||||
if datadb, err = acts.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-acts.anzChan
|
||||
acts.anzChan <- anz
|
||||
anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.AccountS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
acts.Lock()
|
||||
defer acts.Unlock()
|
||||
|
||||
@@ -39,7 +39,6 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
connMgr *engine.ConnManager,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
anzChan chan *AnalyzerService,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &ActionService{
|
||||
connMgr: connMgr,
|
||||
@@ -48,7 +47,6 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS: cacheS,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
anzChan: anzChan,
|
||||
rldChan: make(chan struct{}, 1),
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
@@ -61,7 +59,6 @@ type ActionService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
cacheS *CacheService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
@@ -100,8 +97,10 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
|
||||
if datadb, err = acts.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-acts.anzChan
|
||||
acts.anzChan <- anz
|
||||
anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
acts.Lock()
|
||||
defer acts.Unlock()
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
|
||||
func NewAdminSv1Service(cfg *config.CGRConfig,
|
||||
dm *DataDBService, storDB *StorDBService,
|
||||
filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &AdminSv1Service{
|
||||
cfg: cfg,
|
||||
@@ -44,7 +44,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -57,7 +56,6 @@ type AdminSv1Service struct {
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
storDB *StorDBService
|
||||
anzChan chan *AnalyzerService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
api *apis.AdminSv1
|
||||
@@ -89,8 +87,10 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
|
||||
if datadb, err = apiService.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-apiService.anzChan
|
||||
apiService.anzChan <- anz
|
||||
anz := apiService.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
storDBChan := make(chan engine.StorDB, 1)
|
||||
apiService.stopChan = make(chan struct{})
|
||||
|
||||
@@ -35,13 +35,11 @@ import (
|
||||
// NewAnalyzerService returns the Analyzer Service
|
||||
func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.CommonListenerS,
|
||||
filterSChan chan *engine.FilterS,
|
||||
anzChan chan *AnalyzerService,
|
||||
srvIndexer *servmanager.ServiceIndexer) *AnalyzerService {
|
||||
return &AnalyzerService{
|
||||
cfg: cfg,
|
||||
clSChan: clSChan,
|
||||
filterSChan: filterSChan,
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -53,7 +51,6 @@ type AnalyzerService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
filterSChan chan *engine.FilterS
|
||||
anzChan chan *AnalyzerService
|
||||
|
||||
anz *analyzers.AnalyzerS
|
||||
cl *commonlisteners.CommonListenerS
|
||||
@@ -82,7 +79,6 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error()))
|
||||
return
|
||||
}
|
||||
anz.anzChan <- anz
|
||||
anzCtx, cancel := context.WithCancel(ctx)
|
||||
anz.cancelFunc = cancel
|
||||
go func(a *analyzers.AnalyzerS) {
|
||||
@@ -129,12 +125,6 @@ func (anz *AnalyzerService) Shutdown() (err error) {
|
||||
anz.Lock()
|
||||
anz.cancelFunc()
|
||||
anz.cl.SetAnalyzer(nil)
|
||||
|
||||
// Close the channel before making it nil to prevent stale goroutines
|
||||
// in case there are other services waiting on AnalyzerS.
|
||||
close(anz.anzChan)
|
||||
|
||||
anz.anzChan = nil
|
||||
anz.anz.Shutdown()
|
||||
anz.anz = nil
|
||||
anz.Unlock()
|
||||
|
||||
@@ -36,7 +36,7 @@ import (
|
||||
func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
anzChan chan *AnalyzerService, dspS *DispatcherService,
|
||||
dspS *DispatcherService,
|
||||
sIndxr *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &AttributeService{
|
||||
cfg: cfg,
|
||||
@@ -44,7 +44,6 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS: cacheS,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
anzChan: anzChan,
|
||||
dspS: dspS,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
serviceIndexer: sIndxr,
|
||||
@@ -57,7 +56,6 @@ type AttributeService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
cacheS *CacheService
|
||||
dspS *DispatcherService
|
||||
filterSChan chan *engine.FilterS
|
||||
@@ -98,8 +96,10 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
|
||||
if datadb, err = attrS.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-attrS.anzChan
|
||||
attrS.anzChan <- anz
|
||||
anz := attrS.serviceIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
attrS.Lock()
|
||||
defer attrS.Unlock()
|
||||
|
||||
@@ -32,12 +32,10 @@ import (
|
||||
// NewCacheService .
|
||||
func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
anzChan chan *AnalyzerService, // dspS *DispatcherService,
|
||||
cores *CoreService,
|
||||
srvIndexer *servmanager.ServiceIndexer) *CacheService {
|
||||
return &CacheService{
|
||||
cfg: cfg,
|
||||
anzChan: anzChan,
|
||||
cores: cores,
|
||||
clSChan: clSChan,
|
||||
dm: dm,
|
||||
@@ -50,7 +48,6 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C
|
||||
|
||||
// CacheService implements Agent interface
|
||||
type CacheService struct {
|
||||
anzChan chan *AnalyzerService
|
||||
cores *CoreService
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
@@ -75,8 +72,10 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e
|
||||
return
|
||||
}
|
||||
|
||||
anz := <-cS.anzChan
|
||||
cS.anzChan <- anz
|
||||
anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
var cs *cores.CoreS
|
||||
if cs, err = cS.cores.WaitForCoreS(ctx); err != nil {
|
||||
return
|
||||
|
||||
@@ -37,7 +37,7 @@ import (
|
||||
func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
|
||||
storDB *StorDBService, filterSChan chan *engine.FilterS,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &CDRService{
|
||||
cfg: cfg,
|
||||
@@ -46,7 +46,6 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -59,7 +58,6 @@ type CDRService struct {
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
storDB *StorDBService
|
||||
anzChan chan *AnalyzerService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
cdrS *cdrs.CDRServer
|
||||
@@ -92,8 +90,10 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err
|
||||
if datadb, err = cs.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-cs.anzChan
|
||||
cs.anzChan <- anz
|
||||
anz := cs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
storDBChan := make(chan engine.StorDB, 1)
|
||||
cs.stopChan = make(chan struct{})
|
||||
|
||||
@@ -36,7 +36,6 @@ import (
|
||||
func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager,
|
||||
anzChan chan *AnalyzerService,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &ChargerService{
|
||||
cfg: cfg,
|
||||
@@ -45,7 +44,6 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -58,7 +56,6 @@ type ChargerService struct {
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
cacheS *CacheService
|
||||
anzChan chan *AnalyzerService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
chrS *engine.ChargerS
|
||||
@@ -93,8 +90,10 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
|
||||
if datadb, err = chrS.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-chrS.anzChan
|
||||
chrS.anzChan <- anz
|
||||
anz := chrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
chrS.Lock()
|
||||
defer chrS.Unlock()
|
||||
|
||||
@@ -35,7 +35,6 @@ import (
|
||||
|
||||
// NewCoreService returns the Core Service
|
||||
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS,
|
||||
anzChan chan *AnalyzerService,
|
||||
fileCPU *os.File, shdWg *sync.WaitGroup,
|
||||
srvIndexer *servmanager.ServiceIndexer) *CoreService {
|
||||
return &CoreService{
|
||||
@@ -44,7 +43,6 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm
|
||||
caps: caps,
|
||||
fileCPU: fileCPU,
|
||||
clSChan: clSChan,
|
||||
anzChan: anzChan,
|
||||
csCh: make(chan *cores.CoreS, 1),
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
@@ -55,7 +53,6 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm
|
||||
type CoreService struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
anzChan chan *AnalyzerService
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
|
||||
cS *cores.CoreS
|
||||
@@ -81,8 +78,10 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err
|
||||
|
||||
cS.cl = <-cS.clSChan
|
||||
cS.clSChan <- cS.cl
|
||||
anz := <-cS.anzChan
|
||||
cS.anzChan <- anz
|
||||
anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.CoreS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
cS.mu.Lock()
|
||||
defer cS.mu.Unlock()
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
|
||||
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvIndexer *servmanager.ServiceIndexer) *DispatcherService {
|
||||
return &DispatcherService{
|
||||
cfg: cfg,
|
||||
@@ -44,7 +44,6 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvsReload: make(map[string]chan struct{}),
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
@@ -57,7 +56,6 @@ type DispatcherService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
cacheS *CacheService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
@@ -95,8 +93,10 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
|
||||
if datadb, err = dspS.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-dspS.anzChan
|
||||
dspS.anzChan <- anz
|
||||
anz := dspS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
dspS.Lock()
|
||||
defer dspS.Unlock()
|
||||
|
||||
@@ -35,14 +35,12 @@ import (
|
||||
// NewEventExporterService constructs EventExporterService
|
||||
func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
|
||||
connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS,
|
||||
anzChan chan *AnalyzerService,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &EventExporterService{
|
||||
cfg: cfg,
|
||||
filterSChan: filterSChan,
|
||||
connMgr: connMgr,
|
||||
clSChan: clSChan,
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -53,7 +51,6 @@ type EventExporterService struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
anzChan chan *AnalyzerService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
eeS *ees.EeS
|
||||
@@ -115,8 +112,10 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
anz := <-es.anzChan
|
||||
es.anzChan <- anz
|
||||
anz := es.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.EEs, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EEs))
|
||||
|
||||
|
||||
@@ -38,7 +38,6 @@ func NewEventReaderService(
|
||||
filterSChan chan *engine.FilterS,
|
||||
connMgr *engine.ConnManager,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
anzChan chan *AnalyzerService,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &EventReaderService{
|
||||
rldChan: make(chan struct{}, 1),
|
||||
@@ -46,7 +45,6 @@ func NewEventReaderService(
|
||||
filterSChan: filterSChan,
|
||||
connMgr: connMgr,
|
||||
clSChan: clSChan,
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -57,7 +55,6 @@ type EventReaderService struct {
|
||||
sync.RWMutex
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
anzChan chan *AnalyzerService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
ers *ers.ERService
|
||||
@@ -85,8 +82,10 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
|
||||
if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-erS.anzChan
|
||||
erS.anzChan <- anz
|
||||
anz := erS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.ERs, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
erS.Lock()
|
||||
defer erS.Unlock()
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
|
||||
// NewLoaderService returns the Loader Service
|
||||
func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvIndexer *servmanager.ServiceIndexer) *LoaderService {
|
||||
return &LoaderService{
|
||||
cfg: cfg,
|
||||
@@ -44,7 +44,6 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
stopChan: make(chan struct{}),
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -56,7 +55,6 @@ type LoaderService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
ldrs *loaders.LoaderS
|
||||
@@ -87,8 +85,10 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
|
||||
if datadb, err = ldrs.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-ldrs.anzChan
|
||||
ldrs.anzChan <- anz
|
||||
anz := ldrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
ldrs.Lock()
|
||||
defer ldrs.Unlock()
|
||||
|
||||
@@ -36,7 +36,7 @@ import (
|
||||
func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvDep map[string]*sync.WaitGroup,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &RankingService{
|
||||
@@ -46,7 +46,6 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvDep: srvDep,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
@@ -58,7 +57,6 @@ type RankingService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
cacheS *CacheService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
@@ -97,8 +95,10 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
|
||||
if filterS, err = waitForFilterS(ctx, ran.filterSChan); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-ran.anzChan
|
||||
ran.anzChan <- anz
|
||||
anz := ran.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
ran.Lock()
|
||||
defer ran.Unlock()
|
||||
|
||||
@@ -35,7 +35,6 @@ import (
|
||||
func NewRateService(cfg *config.CGRConfig,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
dmS *DataDBService, clSChan chan *commonlisteners.CommonListenerS,
|
||||
anzChan chan *AnalyzerService,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &RateService{
|
||||
cfg: cfg,
|
||||
@@ -44,7 +43,6 @@ func NewRateService(cfg *config.CGRConfig,
|
||||
dmS: dmS,
|
||||
clSChan: clSChan,
|
||||
rldChan: make(chan struct{}),
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -55,7 +53,6 @@ type RateService struct {
|
||||
sync.RWMutex
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
anzChan chan *AnalyzerService
|
||||
dmS *DataDBService
|
||||
cacheS *CacheService
|
||||
filterSChan chan *engine.FilterS
|
||||
@@ -128,8 +125,10 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
|
||||
if datadb, err = rs.dmS.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-rs.anzChan
|
||||
rs.anzChan <- anz
|
||||
anz := rs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.RateS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
rs.Lock()
|
||||
rs.rateS = rates.NewRateS(rs.cfg, filterS, datadb)
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
|
||||
func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvDep map[string]*sync.WaitGroup,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &ResourceService{
|
||||
@@ -45,7 +45,6 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvDep: srvDep,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
@@ -58,7 +57,6 @@ type ResourceService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
cacheS *CacheService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
@@ -97,8 +95,10 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
|
||||
if datadb, err = reS.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-reS.anzChan
|
||||
reS.anzChan <- anz
|
||||
anz := reS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
reS.Lock()
|
||||
defer reS.Unlock()
|
||||
|
||||
@@ -36,7 +36,7 @@ import (
|
||||
func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &RouteService{
|
||||
cfg: cfg,
|
||||
@@ -45,7 +45,6 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -57,7 +56,6 @@ type RouteService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
cacheS *CacheService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
@@ -93,8 +91,10 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
|
||||
if datadb, err = routeS.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-routeS.anzChan
|
||||
routeS.anzChan <- anz
|
||||
anz := routeS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
routeS.Lock()
|
||||
defer routeS.Unlock()
|
||||
|
||||
@@ -37,7 +37,7 @@ import (
|
||||
// NewSessionService returns the Session Service
|
||||
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &SessionService{
|
||||
cfg: cfg,
|
||||
@@ -45,7 +45,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
}
|
||||
@@ -57,7 +56,6 @@ type SessionService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
sm *sessions.SessionS
|
||||
@@ -89,8 +87,10 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
|
||||
if datadb, err = smg.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-smg.anzChan
|
||||
smg.anzChan <- anz
|
||||
anz := smg.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.SessionS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
smg.Lock()
|
||||
defer smg.Unlock()
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
|
||||
func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvDep map[string]*sync.WaitGroup,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &StatService{
|
||||
@@ -45,7 +45,6 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvDep: srvDep,
|
||||
srvIndexer: srvIndexer,
|
||||
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
|
||||
@@ -58,7 +57,6 @@ type StatService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
cacheS *CacheService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
@@ -97,8 +95,10 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
|
||||
if datadb, err = sts.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-sts.anzChan
|
||||
sts.anzChan <- anz
|
||||
anz := sts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.StatS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
sts.Lock()
|
||||
defer sts.Unlock()
|
||||
|
||||
@@ -36,7 +36,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
connMgr *engine.ConnManager,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup,
|
||||
srvDep map[string]*sync.WaitGroup,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &ThresholdService{
|
||||
cfg: cfg,
|
||||
@@ -44,7 +44,6 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS: cacheS,
|
||||
filterSChan: filterSChan,
|
||||
clSChan: clSChan,
|
||||
anzChan: anzChan,
|
||||
srvDep: srvDep,
|
||||
connMgr: connMgr,
|
||||
srvIndexer: srvIndexer,
|
||||
@@ -58,7 +57,6 @@ type ThresholdService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
cacheS *CacheService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
@@ -97,8 +95,10 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
|
||||
if datadb, err = thrs.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-thrs.anzChan
|
||||
thrs.anzChan <- anz
|
||||
anz := thrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
thrs.Lock()
|
||||
defer thrs.Unlock()
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
|
||||
func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *CacheService, filterSChan chan *engine.FilterS,
|
||||
clSChan chan *commonlisteners.CommonListenerS,
|
||||
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
|
||||
connMgr *engine.ConnManager,
|
||||
srvDep map[string]*sync.WaitGroup,
|
||||
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
|
||||
return &TrendService{
|
||||
@@ -44,7 +44,6 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS: cacheS,
|
||||
clSChan: clSChan,
|
||||
connMgr: connMgr,
|
||||
anzChan: anzChan,
|
||||
srvDep: srvDep,
|
||||
filterSChan: filterSChan,
|
||||
srvIndexer: srvIndexer,
|
||||
@@ -57,7 +56,6 @@ type TrendService struct {
|
||||
|
||||
clSChan chan *commonlisteners.CommonListenerS
|
||||
dm *DataDBService
|
||||
anzChan chan *AnalyzerService
|
||||
cacheS *CacheService
|
||||
filterSChan chan *engine.FilterS
|
||||
|
||||
@@ -96,8 +94,10 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
|
||||
if filterS, err = waitForFilterS(ctx, trs.filterSChan); err != nil {
|
||||
return
|
||||
}
|
||||
anz := <-trs.anzChan
|
||||
trs.anzChan <- anz
|
||||
anz := trs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
|
||||
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
|
||||
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.AnalyzerS, utils.StateServiceUP)
|
||||
}
|
||||
|
||||
trs.Lock()
|
||||
defer trs.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user