diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0af0389cb..1f9d16fc8 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -144,20 +144,20 @@ func runCGREngine(fs []string) (err error) { cls := services.NewCommonListenerService(cfg, caps, srvIdxr) anzS := services.NewAnalyzerService(cfg, iFilterSCh, srvIdxr) coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg, srvIdxr) - cacheS := services.NewCacheService(cfg, dmS, connMgr, coreS, srvIdxr) - dspS := services.NewDispatcherService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) - ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) + cacheS := services.NewCacheService(cfg, connMgr, coreS, srvIdxr) + dspS := services.NewDispatcherService(cfg, iFilterSCh, connMgr, srvIdxr) + ldrs := services.NewLoaderService(cfg, iFilterSCh, connMgr, srvIdxr) efs := services.NewExportFailoverService(cfg, connMgr, srvIdxr) - adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, connMgr, srvIdxr) - sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) - attrS := services.NewAttributeService(cfg, dmS, iFilterSCh, dspS, srvIdxr) - chrgS := services.NewChargerService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) - routeS := services.NewRouteService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) - resourceS := services.NewResourceService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) - trendS := services.NewTrendService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) - rankingS := services.NewRankingService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) - thS := services.NewThresholdService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) - stS := services.NewStatService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr) + adminS := services.NewAdminSv1Service(cfg, sdbS, iFilterSCh, connMgr, srvIdxr) + sessionS := services.NewSessionService(cfg, iFilterSCh, connMgr, srvIdxr) + attrS := services.NewAttributeService(cfg, iFilterSCh, dspS, srvIdxr) + chrgS := services.NewChargerService(cfg, iFilterSCh, connMgr, srvIdxr) + routeS := services.NewRouteService(cfg, iFilterSCh, connMgr, srvIdxr) + resourceS := services.NewResourceService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) + trendS := services.NewTrendService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) + rankingS := services.NewRankingService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) + thS := services.NewThresholdService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) + stS := services.NewStatService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, srvIdxr) dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvIdxr) fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvIdxr) @@ -169,12 +169,12 @@ func runCGREngine(fs []string) (err error) { httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, connMgr, srvIdxr) sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvIdxr) eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, srvIdxr) - cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, connMgr, srvIdxr) + cdrS := services.NewCDRServer(cfg, sdbS, iFilterSCh, connMgr, srvIdxr) registrarcS := services.NewRegistrarCService(cfg, connMgr, srvIdxr) - rateS := services.NewRateService(cfg, iFilterSCh, dmS, srvIdxr) - actionS := services.NewActionService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) - accS := services.NewAccountService(cfg, dmS, iFilterSCh, connMgr, srvIdxr) - tpeS := services.NewTPeService(cfg, connMgr, dmS, srvIdxr) + rateS := services.NewRateService(cfg, iFilterSCh, srvIdxr) + actionS := services.NewActionService(cfg, iFilterSCh, connMgr, srvIdxr) + accS := services.NewAccountService(cfg, iFilterSCh, connMgr, srvIdxr) + tpeS := services.NewTPeService(cfg, connMgr, srvIdxr) srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{ gvS, @@ -380,13 +380,9 @@ func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS case <-ctx.Done(): return } - dm, err := db.WaitForDM(ctx) - if err != nil { - return - } select { case <-cacheS.GetPrecacheChannel(utils.CacheFilters): - iFilterSCh <- engine.NewFilterS(cfg, connMgr, dm) + iFilterSCh <- engine.NewFilterS(cfg, connMgr, db.DataManager()) case <-ctx.Done(): } } diff --git a/services/accounts.go b/services/accounts.go index 7860abcfe..bf68b8e92 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -35,13 +35,12 @@ import ( ) // NewAccountService returns the Account Service -func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, +func NewAccountService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AccountService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, connMgr: connMgr, rldChan: make(chan struct{}, 1), @@ -54,7 +53,6 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, type AccountService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS acts *accounts.AccountS @@ -93,9 +91,9 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = acts.dm.WaitForDM(ctx); err != nil { - return + dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AccountS, utils.DataDB, utils.StateServiceUP) } anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { @@ -104,7 +102,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e acts.Lock() defer acts.Unlock() - acts.acts = accounts.NewAccountS(acts.cfg, filterS, acts.connMgr, datadb) + acts.acts = accounts.NewAccountS(acts.cfg, filterS, acts.connMgr, dbs.DataManager()) acts.stopChan = make(chan struct{}) go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) diff --git a/services/actions.go b/services/actions.go index dc44726d9..b31a35d19 100644 --- a/services/actions.go +++ b/services/actions.go @@ -35,14 +35,13 @@ import ( ) // NewActionService returns the Action Service -func NewActionService(cfg *config.CGRConfig, dm *DataDBService, +func NewActionService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ActionService{ connMgr: connMgr, cfg: cfg, - dm: dm, filterSChan: filterSChan, rldChan: make(chan struct{}, 1), srvIndexer: srvIndexer, @@ -54,7 +53,6 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, type ActionService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS acts *actions.ActionS @@ -95,9 +93,9 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = acts.dm.WaitForDM(ctx); err != nil { - return + dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ActionS, utils.DataDB, utils.StateServiceUP) } anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { @@ -106,7 +104,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er acts.Lock() defer acts.Unlock() - acts.acts = actions.NewActionS(acts.cfg, filterS, datadb, acts.connMgr) + acts.acts = actions.NewActionS(acts.cfg, filterS, dbs.DataManager(), acts.connMgr) acts.stopChan = make(chan struct{}) go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) diff --git a/services/adminsv1.go b/services/adminsv1.go index 0bc12d7af..ceba872b6 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -33,13 +33,12 @@ import ( // NewAPIerSv1Service returns the APIerSv1 Service func NewAdminSv1Service(cfg *config.CGRConfig, - dm *DataDBService, storDB *StorDBService, + storDB *StorDBService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AdminSv1Service{ cfg: cfg, - dm: dm, storDB: storDB, filterSChan: filterSChan, connMgr: connMgr, @@ -52,7 +51,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig, type AdminSv1Service struct { sync.RWMutex - dm *DataDBService storDB *StorDBService filterSChan chan *engine.FilterS @@ -84,9 +82,9 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = apiService.dm.WaitForDM(ctx); err != nil { - return + dbs := apiService.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AdminS, utils.DataDB, utils.StateServiceUP) } anz := apiService.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { @@ -100,7 +98,7 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF apiService.Lock() defer apiService.Unlock() - apiService.api = apis.NewAdminSv1(apiService.cfg, datadb, apiService.connMgr, filterS, storDBChan) + apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), apiService.connMgr, filterS, storDBChan) // go apiService.api.ListenAndServe(apiService.stopChan) // runtime.Gosched() @@ -111,7 +109,7 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF for _, s := range srv { apiService.cl.RpcRegister(s) } - rpl, _ := engine.NewService(apis.NewReplicatorSv1(datadb, apiService.api)) + rpl, _ := engine.NewService(apis.NewReplicatorSv1(dbs.DataManager(), apiService.api)) for _, s := range rpl { apiService.cl.RpcRegister(s) } diff --git a/services/attributes.go b/services/attributes.go index c42aea8d0..989472d0e 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -33,13 +33,12 @@ import ( ) // NewAttributeService returns the Attribute Service -func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, +func NewAttributeService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, dspS *DispatcherService, sIndxr *servmanager.ServiceIndexer) servmanager.Service { return &AttributeService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, dspS: dspS, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -51,7 +50,6 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, type AttributeService struct { sync.RWMutex - dm *DataDBService dspS *DispatcherService filterSChan chan *engine.FilterS @@ -94,9 +92,9 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) if filterS, err = waitForFilterS(ctx, attrS.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = attrS.dm.WaitForDM(ctx); err != nil { - return + dbs := attrS.serviceIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.DataDB, utils.StateServiceUP) } anz := attrS.serviceIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { @@ -105,7 +103,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) attrS.Lock() defer attrS.Unlock() - attrS.attrS = engine.NewAttributeService(datadb, filterS, attrS.cfg) + attrS.attrS = engine.NewAttributeService(dbs.DataManager(), filterS, attrS.cfg) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS)) attrS.rpc = apis.NewAttributeSv1(attrS.attrS) srv, _ := engine.NewService(attrS.rpc) diff --git a/services/caches.go b/services/caches.go index 9d4685a32..070efaba0 100644 --- a/services/caches.go +++ b/services/caches.go @@ -30,13 +30,12 @@ import ( ) // NewCacheService . -func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager, +func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager, cores *CoreService, srvIndexer *servmanager.ServiceIndexer) *CacheService { return &CacheService{ cfg: cfg, cores: cores, - dm: dm, connMgr: connMgr, cacheCh: make(chan *engine.CacheS, 1), srvIndexer: srvIndexer, @@ -47,7 +46,6 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C // CacheService implements Agent interface type CacheService struct { cores *CoreService - dm *DataDBService cl *commonlisteners.CommonListenerS @@ -67,11 +65,10 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CommonListenerS, utils.StateServiceUP) } cS.cl = cls.CLS() - var dm *engine.DataManager - if dm, err = cS.dm.WaitForDM(ctx); err != nil { - return + dbs := cS.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CacheS, utils.DataDB, utils.StateServiceUP) } - anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.CacheS, utils.AnalyzerS, utils.StateServiceUP) @@ -80,7 +77,7 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e if cs, err = cS.cores.WaitForCoreS(ctx); err != nil { return } - engine.Cache = engine.NewCacheS(cS.cfg, dm, cS.connMgr, cs.CapsStats) + engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CapsStats) go engine.Cache.Precache(ctx, shtDw) cS.cacheCh <- engine.Cache diff --git a/services/cdrs.go b/services/cdrs.go index 233b51e5f..675a838c2 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -34,13 +34,12 @@ import ( ) // NewCDRServer returns the CDR Server -func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, +func NewCDRServer(cfg *config.CGRConfig, storDB *StorDBService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &CDRService{ cfg: cfg, - dm: dm, storDB: storDB, filterSChan: filterSChan, connMgr: connMgr, @@ -53,7 +52,6 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, type CDRService struct { sync.RWMutex - dm *DataDBService storDB *StorDBService filterSChan chan *engine.FilterS @@ -86,9 +84,9 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = cs.dm.WaitForDM(ctx); err != nil { - return + dbs := cs.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CDRs, utils.DataDB, utils.StateServiceUP) } anz := cs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { @@ -102,7 +100,7 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err cs.Lock() defer cs.Unlock() - cs.cdrS = cdrs.NewCDRServer(cs.cfg, datadb, filterS, cs.connMgr, storDBChan) + cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), filterS, cs.connMgr, storDBChan) go cs.cdrS.ListenAndServe(cs.stopChan) runtime.Gosched() utils.Logger.Info("Registering CDRS RPC service.") diff --git a/services/chargers.go b/services/chargers.go index 93b34c0d3..cec4aba7d 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -33,13 +33,12 @@ import ( ) // NewChargerService returns the Charger Service -func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, +func NewChargerService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ChargerService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, connMgr: connMgr, srvIndexer: srvIndexer, @@ -51,7 +50,6 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, type ChargerService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS chrS *engine.ChargerS @@ -89,9 +87,9 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e if filterS, err = waitForFilterS(ctx, chrS.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = chrS.dm.WaitForDM(ctx); err != nil { - return + dbs := chrS.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.DataDB, utils.StateServiceUP) } anz := chrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { @@ -100,7 +98,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e chrS.Lock() defer chrS.Unlock() - chrS.chrS = engine.NewChargerService(datadb, filterS, chrS.cfg, chrS.connMgr) + chrS.chrS = engine.NewChargerService(dbs.DataManager(), filterS, chrS.cfg, chrS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ChargerS)) srv, _ := engine.NewService(chrS.chrS) // srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false) diff --git a/services/datadb.go b/services/datadb.go index 704d3a8cf..643ba0132 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -36,7 +36,6 @@ func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVer srvIndexer *servmanager.ServiceIndexer) *DataDBService { return &DataDBService{ cfg: cfg, - dbchan: make(chan *engine.DataManager, 1), connMgr: connMgr, setVersions: setVersions, srvDep: srvDep, @@ -53,7 +52,6 @@ type DataDBService struct { connMgr *engine.ConnManager dm *engine.DataManager - dbchan chan *engine.DataManager setVersions bool srvDep map[string]*sync.WaitGroup @@ -91,7 +89,6 @@ func (db *DataDBService) Start(*context.Context, context.CancelFunc) (err error) return err } - db.dbchan <- db.dm close(db.stateDeps.StateChan(utils.StateServiceUP)) return } @@ -131,7 +128,6 @@ func (db *DataDBService) Shutdown() (_ error) { db.Lock() db.dm.DataDB().Close() db.dm = nil - <-db.dbchan db.Unlock() return } @@ -186,18 +182,9 @@ func (db *DataDBService) needsConnectionReload() bool { db.oldDBCfg.Opts.RedisPoolPipelineLimit != db.cfg.DataDbCfg().Opts.RedisPoolPipelineLimit) } -// GetDM returns the DataManager -func (db *DataDBService) WaitForDM(ctx *context.Context) (datadb *engine.DataManager, err error) { - db.RLock() - dbCh := db.dbchan - db.RUnlock() - select { - case <-ctx.Done(): - err = ctx.Err() - case datadb = <-dbCh: - dbCh <- datadb - } - return +// DataManager returns the DataManager object. +func (db *DataDBService) DataManager() *engine.DataManager { + return db.dm } // StateChan returns signaling channel of specific state diff --git a/services/dispatchers.go b/services/dispatchers.go index ac38f871c..993a9af84 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -32,13 +32,12 @@ import ( ) // NewDispatcherService returns the Dispatcher Service -func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, +func NewDispatcherService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) *DispatcherService { return &DispatcherService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, connMgr: connMgr, srvsReload: make(map[string]chan struct{}), @@ -51,7 +50,6 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, type DispatcherService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS dspS *dispatchers.DispatcherService @@ -91,9 +89,9 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) if filterS, err = waitForFilterS(ctx, dspS.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = dspS.dm.WaitForDM(ctx); err != nil { - return + dbs := dspS.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.DataDB, utils.StateServiceUP) } anz := dspS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { @@ -103,7 +101,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) dspS.Lock() defer dspS.Unlock() - dspS.dspS = dispatchers.NewDispatcherService(datadb, dspS.cfg, filterS, dspS.connMgr) + dspS.dspS = dispatchers.NewDispatcherService(dbs.DataManager(), dspS.cfg, filterS, dspS.connMgr) dspS.unregisterAllDispatchedSubsystems() // unregister all rpc services that can be dispatched diff --git a/services/loaders.go b/services/loaders.go index 1f0023355..cec70afda 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -33,13 +33,12 @@ import ( ) // NewLoaderService returns the Loader Service -func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, +func NewLoaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) *LoaderService { return &LoaderService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, connMgr: connMgr, stopChan: make(chan struct{}), @@ -52,7 +51,6 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, type LoaderService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS ldrs *loaders.LoaderS @@ -82,9 +80,9 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = ldrs.dm.WaitForDM(ctx); err != nil { - return + dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.DataDB, utils.StateServiceUP) } anz := ldrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { @@ -94,7 +92,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er ldrs.Lock() defer ldrs.Unlock() - ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, datadb, filterS, ldrs.connMgr) + ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), filterS, ldrs.connMgr) if !ldrs.ldrs.Enabled() { return @@ -120,9 +118,9 @@ func (ldrs *LoaderService) Reload(ctx *context.Context, _ context.CancelFunc) er if err != nil { return err } - datadb, err := ldrs.dm.WaitForDM(ctx) - if err != nil { - return err + dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.DataDB, utils.StateServiceUP) } close(ldrs.stopChan) ldrs.stopChan = make(chan struct{}) @@ -130,7 +128,7 @@ func (ldrs *LoaderService) Reload(ctx *context.Context, _ context.CancelFunc) er ldrs.RLock() defer ldrs.RUnlock() - ldrs.ldrs.Reload(datadb, filterS, ldrs.connMgr) + ldrs.ldrs.Reload(dbs.DataManager(), filterS, ldrs.connMgr) return ldrs.ldrs.ListenAndServe(ldrs.stopChan) } diff --git a/services/rankings.go b/services/rankings.go index 40178313c..9401a047b 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -33,14 +33,13 @@ import ( ) // NewRankingService returns the RankingS Service -func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, +func NewRankingService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RankingService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, connMgr: connMgr, srvDep: srvDep, @@ -52,7 +51,6 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, type RankingService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS ran *engine.RankingS @@ -89,9 +87,9 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er ); err != nil { return err } - var datadb *engine.DataManager - if datadb, err = ran.dm.WaitForDM(ctx); err != nil { - return + dbs := ran.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RankingS, utils.DataDB, utils.StateServiceUP) } var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ran.filterSChan); err != nil { @@ -104,7 +102,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er ran.Lock() defer ran.Unlock() - ran.ran = engine.NewRankingS(datadb, ran.connMgr, filterS, ran.cfg) + ran.ran = engine.NewRankingS(dbs.DataManager(), ran.connMgr, filterS, ran.cfg) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RankingS)) diff --git a/services/rates.go b/services/rates.go index c1cdf7945..c85bd6204 100644 --- a/services/rates.go +++ b/services/rates.go @@ -34,12 +34,10 @@ import ( // NewRateService constructs RateService func NewRateService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - dmS *DataDBService, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RateService{ cfg: cfg, filterSChan: filterSChan, - dmS: dmS, rldChan: make(chan struct{}), srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -124,9 +122,9 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er if filterS, err = waitForFilterS(ctx, rs.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = rs.dmS.WaitForDM(ctx); err != nil { - return + dbs := rs.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RateS, utils.DataDB, utils.StateServiceUP) } anz := rs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { @@ -134,7 +132,7 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er } rs.Lock() - rs.rateS = rates.NewRateS(rs.cfg, filterS, datadb) + rs.rateS = rates.NewRateS(rs.cfg, filterS, dbs.DataManager()) rs.Unlock() rs.stopChan = make(chan struct{}) diff --git a/services/resources.go b/services/resources.go index a41fab605..118fe5671 100644 --- a/services/resources.go +++ b/services/resources.go @@ -32,14 +32,13 @@ import ( ) // NewResourceService returns the Resource Service -func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, +func NewResourceService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ResourceService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, connMgr: connMgr, srvDep: srvDep, @@ -52,7 +51,6 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, type ResourceService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS reS *engine.ResourceS @@ -93,9 +91,9 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e if filterS, err = waitForFilterS(ctx, reS.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = reS.dm.WaitForDM(ctx); err != nil { - return + dbs := reS.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.DataDB, utils.StateServiceUP) } anz := reS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { @@ -104,7 +102,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e reS.Lock() defer reS.Unlock() - reS.reS = engine.NewResourceService(datadb, reS.cfg, filterS, reS.connMgr) + reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, filterS, reS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS)) reS.reS.StartLoop(ctx) srv, _ := engine.NewService(reS.reS) diff --git a/services/routes.go b/services/routes.go index a0fee78fd..2e80cf59c 100644 --- a/services/routes.go +++ b/services/routes.go @@ -33,13 +33,12 @@ import ( ) // NewRouteService returns the Route Service -func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, +func NewRouteService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RouteService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, connMgr: connMgr, srvIndexer: srvIndexer, @@ -51,7 +50,6 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, type RouteService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS routeS *engine.RouteS @@ -89,9 +87,9 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e if filterS, err = waitForFilterS(ctx, routeS.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = routeS.dm.WaitForDM(ctx); err != nil { - return + dbs := routeS.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RouteS, utils.DataDB, utils.StateServiceUP) } anz := routeS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { @@ -100,7 +98,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e routeS.Lock() defer routeS.Unlock() - routeS.routeS = engine.NewRouteService(datadb, filterS, routeS.cfg, routeS.connMgr) + routeS.routeS = engine.NewRouteService(dbs.DataManager(), filterS, routeS.cfg, routeS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RouteS)) srv, _ := engine.NewService(routeS.routeS) diff --git a/services/sessions.go b/services/sessions.go index d01035079..b75eb1240 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -35,12 +35,11 @@ import ( ) // NewSessionService returns the Session Service -func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, +func NewSessionService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &SessionService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, connMgr: connMgr, srvIndexer: srvIndexer, @@ -52,7 +51,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha type SessionService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS sm *sessions.SessionS @@ -83,9 +81,9 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = smg.dm.WaitForDM(ctx); err != nil { - return + dbs := smg.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.SessionS, utils.DataDB, utils.StateServiceUP) } anz := smg.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { @@ -95,7 +93,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) smg.Lock() defer smg.Unlock() - smg.sm = sessions.NewSessionS(smg.cfg, datadb, filterS, smg.connMgr) + smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), filterS, smg.connMgr) //start sync session in a separate goroutine smg.stopChan = make(chan struct{}) go smg.sm.ListenAndServe(smg.stopChan) diff --git a/services/stats.go b/services/stats.go index 90d7704fa..4ffa82171 100644 --- a/services/stats.go +++ b/services/stats.go @@ -32,14 +32,13 @@ import ( ) // NewStatService returns the Stat Service -func NewStatService(cfg *config.CGRConfig, dm *DataDBService, +func NewStatService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &StatService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, connMgr: connMgr, srvDep: srvDep, @@ -52,7 +51,6 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, type StatService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS sts *engine.StatS @@ -93,9 +91,9 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e if filterS, err = waitForFilterS(ctx, sts.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = sts.dm.WaitForDM(ctx); err != nil { - return + dbs := sts.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.StatS, utils.DataDB, utils.StateServiceUP) } anz := sts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { @@ -104,7 +102,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e sts.Lock() defer sts.Unlock() - sts.sts = engine.NewStatService(datadb, sts.cfg, filterS, sts.connMgr) + sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, filterS, sts.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS)) diff --git a/services/thresholds.go b/services/thresholds.go index 892f2eb2c..84cbb4ea9 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -32,14 +32,13 @@ import ( ) // NewThresholdService returns the Threshold Service -func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, +func NewThresholdService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ThresholdService{ cfg: cfg, - dm: dm, filterSChan: filterSChan, srvDep: srvDep, connMgr: connMgr, @@ -52,7 +51,6 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, type ThresholdService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS thrs *engine.ThresholdS @@ -93,9 +91,9 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) if filterS, err = waitForFilterS(ctx, thrs.filterSChan); err != nil { return } - var datadb *engine.DataManager - if datadb, err = thrs.dm.WaitForDM(ctx); err != nil { - return + dbs := thrs.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.DataDB, utils.StateServiceUP) } anz := thrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { @@ -104,7 +102,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) thrs.Lock() defer thrs.Unlock() - thrs.thrs = engine.NewThresholdService(datadb, thrs.cfg, filterS, thrs.connMgr) + thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, filterS, thrs.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS)) thrs.thrs.StartLoop(ctx) diff --git a/services/tpes.go b/services/tpes.go index cb485f6c7..296bae308 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -33,11 +33,10 @@ import ( ) // NewTPeService is the constructor for the TpeService -func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService, +func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &TPeService{ cfg: cfg, - dm: dm, connMgr: connMgr, srvIndexer: srvIndexer, stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), @@ -48,8 +47,6 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD type TPeService struct { sync.RWMutex - dm *DataDBService - tpes *tpes.TPeS cl *commonlisteners.CommonListenerS srv *birpc.Service @@ -70,12 +67,12 @@ func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err err return utils.NewServiceStateTimeoutError(utils.TPeS, utils.CommonListenerS, utils.StateServiceUP) } ts.cl = cls.CLS() - var datadb *engine.DataManager - if datadb, err = ts.dm.WaitForDM(ctx); err != nil { - return + dbs := ts.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.TPeS, utils.DataDB, utils.StateServiceUP) } - ts.tpes = tpes.NewTPeS(ts.cfg, datadb, ts.connMgr) + ts.tpes = tpes.NewTPeS(ts.cfg, dbs.DataManager(), ts.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TPeS)) ts.stopChan = make(chan struct{}) ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false) diff --git a/services/trends.go b/services/trends.go index f37ec0b01..541999940 100644 --- a/services/trends.go +++ b/services/trends.go @@ -32,14 +32,13 @@ import ( ) // NewTrendsService returns the TrendS Service -func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, +func NewTrendService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &TrendService{ cfg: cfg, - dm: dm, connMgr: connMgr, srvDep: srvDep, filterSChan: filterSChan, @@ -51,7 +50,6 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, type TrendService struct { sync.RWMutex - dm *DataDBService filterSChan chan *engine.FilterS trs *engine.TrendS @@ -88,9 +86,9 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err ); err != nil { return err } - var datadb *engine.DataManager - if datadb, err = trs.dm.WaitForDM(ctx); err != nil { - return + dbs := trs.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.TrendS, utils.DataDB, utils.StateServiceUP) } var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, trs.filterSChan); err != nil { @@ -103,7 +101,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err trs.Lock() defer trs.Unlock() - trs.trs = engine.NewTrendService(datadb, trs.cfg, filterS, trs.connMgr) + trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, filterS, trs.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TrendS)) if err := trs.trs.StartTrendS(ctx); err != nil { return err