Remove dmS parameter from service constructors

Use ServiceIndexer instead
This commit is contained in:
ionutboangiu
2024-12-10 20:37:16 +02:00
committed by Dan Christian Bogos
parent 55ecdf45e4
commit 10dfcc5e48
20 changed files with 116 additions and 171 deletions

View File

@@ -144,20 +144,20 @@ func runCGREngine(fs []string) (err error) {
cls := services.NewCommonListenerService(cfg, caps, srvIdxr)
anzS := services.NewAnalyzerService(cfg, iFilterSCh, srvIdxr)
coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg, srvIdxr)
cacheS := services.NewCacheService(cfg, dmS, connMgr, coreS, srvIdxr)
dspS := services.NewDispatcherService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
ldrs := services.NewLoaderService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
cacheS := services.NewCacheService(cfg, connMgr, coreS, srvIdxr)
dspS := services.NewDispatcherService(cfg, iFilterSCh, connMgr, srvIdxr)
ldrs := services.NewLoaderService(cfg, iFilterSCh, connMgr, srvIdxr)
efs := services.NewExportFailoverService(cfg, connMgr, srvIdxr)
adminS := services.NewAdminSv1Service(cfg, dmS, sdbS, iFilterSCh, connMgr, srvIdxr)
sessionS := services.NewSessionService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
attrS := services.NewAttributeService(cfg, dmS, iFilterSCh, dspS, srvIdxr)
chrgS := services.NewChargerService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
routeS := services.NewRouteService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
resourceS := services.NewResourceService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
trendS := services.NewTrendService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
rankingS := services.NewRankingService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
thS := services.NewThresholdService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
stS := services.NewStatService(cfg, dmS, iFilterSCh, connMgr, srvDep, srvIdxr)
adminS := services.NewAdminSv1Service(cfg, sdbS, iFilterSCh, connMgr, srvIdxr)
sessionS := services.NewSessionService(cfg, iFilterSCh, connMgr, srvIdxr)
attrS := services.NewAttributeService(cfg, iFilterSCh, dspS, srvIdxr)
chrgS := services.NewChargerService(cfg, iFilterSCh, connMgr, srvIdxr)
routeS := services.NewRouteService(cfg, iFilterSCh, connMgr, srvIdxr)
resourceS := services.NewResourceService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
trendS := services.NewTrendService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
rankingS := services.NewRankingService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
thS := services.NewThresholdService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
stS := services.NewStatService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, srvIdxr)
dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvIdxr)
fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvIdxr)
@@ -169,12 +169,12 @@ func runCGREngine(fs []string) (err error) {
httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, connMgr, srvIdxr)
sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvIdxr)
eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, srvIdxr)
cdrS := services.NewCDRServer(cfg, dmS, sdbS, iFilterSCh, connMgr, srvIdxr)
cdrS := services.NewCDRServer(cfg, sdbS, iFilterSCh, connMgr, srvIdxr)
registrarcS := services.NewRegistrarCService(cfg, connMgr, srvIdxr)
rateS := services.NewRateService(cfg, iFilterSCh, dmS, srvIdxr)
actionS := services.NewActionService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
accS := services.NewAccountService(cfg, dmS, iFilterSCh, connMgr, srvIdxr)
tpeS := services.NewTPeService(cfg, connMgr, dmS, srvIdxr)
rateS := services.NewRateService(cfg, iFilterSCh, srvIdxr)
actionS := services.NewActionService(cfg, iFilterSCh, connMgr, srvIdxr)
accS := services.NewAccountService(cfg, iFilterSCh, connMgr, srvIdxr)
tpeS := services.NewTPeService(cfg, connMgr, srvIdxr)
srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{
gvS,
@@ -380,13 +380,9 @@ func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS
case <-ctx.Done():
return
}
dm, err := db.WaitForDM(ctx)
if err != nil {
return
}
select {
case <-cacheS.GetPrecacheChannel(utils.CacheFilters):
iFilterSCh <- engine.NewFilterS(cfg, connMgr, dm)
iFilterSCh <- engine.NewFilterS(cfg, connMgr, db.DataManager())
case <-ctx.Done():
}
}

View File

@@ -35,13 +35,12 @@ import (
)
// NewAccountService returns the Account Service
func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
func NewAccountService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &AccountService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
rldChan: make(chan struct{}, 1),
@@ -54,7 +53,6 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
type AccountService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
acts *accounts.AccountS
@@ -93,9 +91,9 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = acts.dm.WaitForDM(ctx); err != nil {
return
dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AccountS, utils.DataDB, utils.StateServiceUP)
}
anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
@@ -104,7 +102,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
acts.Lock()
defer acts.Unlock()
acts.acts = accounts.NewAccountS(acts.cfg, filterS, acts.connMgr, datadb)
acts.acts = accounts.NewAccountS(acts.cfg, filterS, acts.connMgr, dbs.DataManager())
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)

View File

@@ -35,14 +35,13 @@ import (
)
// NewActionService returns the Action Service
func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
func NewActionService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ActionService{
connMgr: connMgr,
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
@@ -54,7 +53,6 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
type ActionService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
acts *actions.ActionS
@@ -95,9 +93,9 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = acts.dm.WaitForDM(ctx); err != nil {
return
dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.DataDB, utils.StateServiceUP)
}
anz := acts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
@@ -106,7 +104,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
acts.Lock()
defer acts.Unlock()
acts.acts = actions.NewActionS(acts.cfg, filterS, datadb, acts.connMgr)
acts.acts = actions.NewActionS(acts.cfg, filterS, dbs.DataManager(), acts.connMgr)
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)

View File

@@ -33,13 +33,12 @@ import (
// NewAPIerSv1Service returns the APIerSv1 Service
func NewAdminSv1Service(cfg *config.CGRConfig,
dm *DataDBService, storDB *StorDBService,
storDB *StorDBService,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &AdminSv1Service{
cfg: cfg,
dm: dm,
storDB: storDB,
filterSChan: filterSChan,
connMgr: connMgr,
@@ -52,7 +51,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
type AdminSv1Service struct {
sync.RWMutex
dm *DataDBService
storDB *StorDBService
filterSChan chan *engine.FilterS
@@ -84,9 +82,9 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = apiService.dm.WaitForDM(ctx); err != nil {
return
dbs := apiService.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.DataDB, utils.StateServiceUP)
}
anz := apiService.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
@@ -100,7 +98,7 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
apiService.Lock()
defer apiService.Unlock()
apiService.api = apis.NewAdminSv1(apiService.cfg, datadb, apiService.connMgr, filterS, storDBChan)
apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), apiService.connMgr, filterS, storDBChan)
// go apiService.api.ListenAndServe(apiService.stopChan)
// runtime.Gosched()
@@ -111,7 +109,7 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
for _, s := range srv {
apiService.cl.RpcRegister(s)
}
rpl, _ := engine.NewService(apis.NewReplicatorSv1(datadb, apiService.api))
rpl, _ := engine.NewService(apis.NewReplicatorSv1(dbs.DataManager(), apiService.api))
for _, s := range rpl {
apiService.cl.RpcRegister(s)
}

View File

@@ -33,13 +33,12 @@ import (
)
// NewAttributeService returns the Attribute Service
func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
func NewAttributeService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
dspS *DispatcherService,
sIndxr *servmanager.ServiceIndexer) servmanager.Service {
return &AttributeService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
dspS: dspS,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -51,7 +50,6 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
type AttributeService struct {
sync.RWMutex
dm *DataDBService
dspS *DispatcherService
filterSChan chan *engine.FilterS
@@ -94,9 +92,9 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
if filterS, err = waitForFilterS(ctx, attrS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = attrS.dm.WaitForDM(ctx); err != nil {
return
dbs := attrS.serviceIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.DataDB, utils.StateServiceUP)
}
anz := attrS.serviceIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
@@ -105,7 +103,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
attrS.Lock()
defer attrS.Unlock()
attrS.attrS = engine.NewAttributeService(datadb, filterS, attrS.cfg)
attrS.attrS = engine.NewAttributeService(dbs.DataManager(), filterS, attrS.cfg)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS))
attrS.rpc = apis.NewAttributeSv1(attrS.attrS)
srv, _ := engine.NewService(attrS.rpc)

View File

@@ -30,13 +30,12 @@ import (
)
// NewCacheService .
func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager,
func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
cores *CoreService,
srvIndexer *servmanager.ServiceIndexer) *CacheService {
return &CacheService{
cfg: cfg,
cores: cores,
dm: dm,
connMgr: connMgr,
cacheCh: make(chan *engine.CacheS, 1),
srvIndexer: srvIndexer,
@@ -47,7 +46,6 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C
// CacheService implements Agent interface
type CacheService struct {
cores *CoreService
dm *DataDBService
cl *commonlisteners.CommonListenerS
@@ -67,11 +65,10 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CommonListenerS, utils.StateServiceUP)
}
cS.cl = cls.CLS()
var dm *engine.DataManager
if dm, err = cS.dm.WaitForDM(ctx); err != nil {
return
dbs := cS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.DataDB, utils.StateServiceUP)
}
anz := cS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.AnalyzerS, utils.StateServiceUP)
@@ -80,7 +77,7 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e
if cs, err = cS.cores.WaitForCoreS(ctx); err != nil {
return
}
engine.Cache = engine.NewCacheS(cS.cfg, dm, cS.connMgr, cs.CapsStats)
engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CapsStats)
go engine.Cache.Precache(ctx, shtDw)
cS.cacheCh <- engine.Cache

View File

@@ -34,13 +34,12 @@ import (
)
// NewCDRServer returns the CDR Server
func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
func NewCDRServer(cfg *config.CGRConfig,
storDB *StorDBService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &CDRService{
cfg: cfg,
dm: dm,
storDB: storDB,
filterSChan: filterSChan,
connMgr: connMgr,
@@ -53,7 +52,6 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
type CDRService struct {
sync.RWMutex
dm *DataDBService
storDB *StorDBService
filterSChan chan *engine.FilterS
@@ -86,9 +84,9 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err
if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = cs.dm.WaitForDM(ctx); err != nil {
return
dbs := cs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.DataDB, utils.StateServiceUP)
}
anz := cs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
@@ -102,7 +100,7 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err
cs.Lock()
defer cs.Unlock()
cs.cdrS = cdrs.NewCDRServer(cs.cfg, datadb, filterS, cs.connMgr, storDBChan)
cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), filterS, cs.connMgr, storDBChan)
go cs.cdrS.ListenAndServe(cs.stopChan)
runtime.Gosched()
utils.Logger.Info("Registering CDRS RPC service.")

View File

@@ -33,13 +33,12 @@ import (
)
// NewChargerService returns the Charger Service
func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
func NewChargerService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ChargerService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
@@ -51,7 +50,6 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
type ChargerService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
chrS *engine.ChargerS
@@ -89,9 +87,9 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
if filterS, err = waitForFilterS(ctx, chrS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = chrS.dm.WaitForDM(ctx); err != nil {
return
dbs := chrS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.DataDB, utils.StateServiceUP)
}
anz := chrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
@@ -100,7 +98,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
chrS.Lock()
defer chrS.Unlock()
chrS.chrS = engine.NewChargerService(datadb, filterS, chrS.cfg, chrS.connMgr)
chrS.chrS = engine.NewChargerService(dbs.DataManager(), filterS, chrS.cfg, chrS.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ChargerS))
srv, _ := engine.NewService(chrS.chrS)
// srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false)

View File

@@ -36,7 +36,6 @@ func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVer
srvIndexer *servmanager.ServiceIndexer) *DataDBService {
return &DataDBService{
cfg: cfg,
dbchan: make(chan *engine.DataManager, 1),
connMgr: connMgr,
setVersions: setVersions,
srvDep: srvDep,
@@ -53,7 +52,6 @@ type DataDBService struct {
connMgr *engine.ConnManager
dm *engine.DataManager
dbchan chan *engine.DataManager
setVersions bool
srvDep map[string]*sync.WaitGroup
@@ -91,7 +89,6 @@ func (db *DataDBService) Start(*context.Context, context.CancelFunc) (err error)
return err
}
db.dbchan <- db.dm
close(db.stateDeps.StateChan(utils.StateServiceUP))
return
}
@@ -131,7 +128,6 @@ func (db *DataDBService) Shutdown() (_ error) {
db.Lock()
db.dm.DataDB().Close()
db.dm = nil
<-db.dbchan
db.Unlock()
return
}
@@ -186,18 +182,9 @@ func (db *DataDBService) needsConnectionReload() bool {
db.oldDBCfg.Opts.RedisPoolPipelineLimit != db.cfg.DataDbCfg().Opts.RedisPoolPipelineLimit)
}
// GetDM returns the DataManager
func (db *DataDBService) WaitForDM(ctx *context.Context) (datadb *engine.DataManager, err error) {
db.RLock()
dbCh := db.dbchan
db.RUnlock()
select {
case <-ctx.Done():
err = ctx.Err()
case datadb = <-dbCh:
dbCh <- datadb
}
return
// DataManager returns the DataManager object.
func (db *DataDBService) DataManager() *engine.DataManager {
return db.dm
}
// StateChan returns signaling channel of specific state

View File

@@ -32,13 +32,12 @@ import (
)
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
func NewDispatcherService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) *DispatcherService {
return &DispatcherService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
srvsReload: make(map[string]chan struct{}),
@@ -51,7 +50,6 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
type DispatcherService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
dspS *dispatchers.DispatcherService
@@ -91,9 +89,9 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
if filterS, err = waitForFilterS(ctx, dspS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = dspS.dm.WaitForDM(ctx); err != nil {
return
dbs := dspS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.DataDB, utils.StateServiceUP)
}
anz := dspS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
@@ -103,7 +101,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
dspS.Lock()
defer dspS.Unlock()
dspS.dspS = dispatchers.NewDispatcherService(datadb, dspS.cfg, filterS, dspS.connMgr)
dspS.dspS = dispatchers.NewDispatcherService(dbs.DataManager(), dspS.cfg, filterS, dspS.connMgr)
dspS.unregisterAllDispatchedSubsystems() // unregister all rpc services that can be dispatched

View File

@@ -33,13 +33,12 @@ import (
)
// NewLoaderService returns the Loader Service
func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
func NewLoaderService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) *LoaderService {
return &LoaderService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
stopChan: make(chan struct{}),
@@ -52,7 +51,6 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
type LoaderService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
ldrs *loaders.LoaderS
@@ -82,9 +80,9 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = ldrs.dm.WaitForDM(ctx); err != nil {
return
dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.DataDB, utils.StateServiceUP)
}
anz := ldrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
@@ -94,7 +92,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
ldrs.Lock()
defer ldrs.Unlock()
ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, datadb, filterS, ldrs.connMgr)
ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), filterS, ldrs.connMgr)
if !ldrs.ldrs.Enabled() {
return
@@ -120,9 +118,9 @@ func (ldrs *LoaderService) Reload(ctx *context.Context, _ context.CancelFunc) er
if err != nil {
return err
}
datadb, err := ldrs.dm.WaitForDM(ctx)
if err != nil {
return err
dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.DataDB, utils.StateServiceUP)
}
close(ldrs.stopChan)
ldrs.stopChan = make(chan struct{})
@@ -130,7 +128,7 @@ func (ldrs *LoaderService) Reload(ctx *context.Context, _ context.CancelFunc) er
ldrs.RLock()
defer ldrs.RUnlock()
ldrs.ldrs.Reload(datadb, filterS, ldrs.connMgr)
ldrs.ldrs.Reload(dbs.DataManager(), filterS, ldrs.connMgr)
return ldrs.ldrs.ListenAndServe(ldrs.stopChan)
}

View File

@@ -33,14 +33,13 @@ import (
)
// NewRankingService returns the RankingS Service
func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
func NewRankingService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RankingService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
srvDep: srvDep,
@@ -52,7 +51,6 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
type RankingService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
ran *engine.RankingS
@@ -89,9 +87,9 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
); err != nil {
return err
}
var datadb *engine.DataManager
if datadb, err = ran.dm.WaitForDM(ctx); err != nil {
return
dbs := ran.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.DataDB, utils.StateServiceUP)
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ran.filterSChan); err != nil {
@@ -104,7 +102,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
ran.Lock()
defer ran.Unlock()
ran.ran = engine.NewRankingS(datadb, ran.connMgr, filterS, ran.cfg)
ran.ran = engine.NewRankingS(dbs.DataManager(), ran.connMgr, filterS, ran.cfg)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
utils.CoreS, utils.RankingS))

View File

@@ -34,12 +34,10 @@ import (
// NewRateService constructs RateService
func NewRateService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
dmS *DataDBService,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RateService{
cfg: cfg,
filterSChan: filterSChan,
dmS: dmS,
rldChan: make(chan struct{}),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -124,9 +122,9 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
if filterS, err = waitForFilterS(ctx, rs.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = rs.dmS.WaitForDM(ctx); err != nil {
return
dbs := rs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RateS, utils.DataDB, utils.StateServiceUP)
}
anz := rs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
@@ -134,7 +132,7 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
}
rs.Lock()
rs.rateS = rates.NewRateS(rs.cfg, filterS, datadb)
rs.rateS = rates.NewRateS(rs.cfg, filterS, dbs.DataManager())
rs.Unlock()
rs.stopChan = make(chan struct{})

View File

@@ -32,14 +32,13 @@ import (
)
// NewResourceService returns the Resource Service
func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
func NewResourceService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ResourceService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
srvDep: srvDep,
@@ -52,7 +51,6 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
type ResourceService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
reS *engine.ResourceS
@@ -93,9 +91,9 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
if filterS, err = waitForFilterS(ctx, reS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = reS.dm.WaitForDM(ctx); err != nil {
return
dbs := reS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.DataDB, utils.StateServiceUP)
}
anz := reS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
@@ -104,7 +102,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
reS.Lock()
defer reS.Unlock()
reS.reS = engine.NewResourceService(datadb, reS.cfg, filterS, reS.connMgr)
reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, filterS, reS.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS))
reS.reS.StartLoop(ctx)
srv, _ := engine.NewService(reS.reS)

View File

@@ -33,13 +33,12 @@ import (
)
// NewRouteService returns the Route Service
func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
func NewRouteService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RouteService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
@@ -51,7 +50,6 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
type RouteService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
routeS *engine.RouteS
@@ -89,9 +87,9 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
if filterS, err = waitForFilterS(ctx, routeS.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = routeS.dm.WaitForDM(ctx); err != nil {
return
dbs := routeS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.DataDB, utils.StateServiceUP)
}
anz := routeS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
@@ -100,7 +98,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
routeS.Lock()
defer routeS.Unlock()
routeS.routeS = engine.NewRouteService(datadb, filterS, routeS.cfg, routeS.connMgr)
routeS.routeS = engine.NewRouteService(dbs.DataManager(), filterS, routeS.cfg, routeS.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RouteS))
srv, _ := engine.NewService(routeS.routeS)

View File

@@ -35,12 +35,11 @@ import (
)
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS,
func NewSessionService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &SessionService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
@@ -52,7 +51,6 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha
type SessionService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
sm *sessions.SessionS
@@ -83,9 +81,9 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = smg.dm.WaitForDM(ctx); err != nil {
return
dbs := smg.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.SessionS, utils.DataDB, utils.StateServiceUP)
}
anz := smg.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
@@ -95,7 +93,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
smg.Lock()
defer smg.Unlock()
smg.sm = sessions.NewSessionS(smg.cfg, datadb, filterS, smg.connMgr)
smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), filterS, smg.connMgr)
//start sync session in a separate goroutine
smg.stopChan = make(chan struct{})
go smg.sm.ListenAndServe(smg.stopChan)

View File

@@ -32,14 +32,13 @@ import (
)
// NewStatService returns the Stat Service
func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
func NewStatService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &StatService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
connMgr: connMgr,
srvDep: srvDep,
@@ -52,7 +51,6 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
type StatService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
sts *engine.StatS
@@ -93,9 +91,9 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
if filterS, err = waitForFilterS(ctx, sts.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = sts.dm.WaitForDM(ctx); err != nil {
return
dbs := sts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.StatS, utils.DataDB, utils.StateServiceUP)
}
anz := sts.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
@@ -104,7 +102,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
sts.Lock()
defer sts.Unlock()
sts.sts = engine.NewStatService(datadb, sts.cfg, filterS, sts.connMgr)
sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, filterS, sts.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
utils.CoreS, utils.StatS))

View File

@@ -32,14 +32,13 @@ import (
)
// NewThresholdService returns the Threshold Service
func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
func NewThresholdService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ThresholdService{
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
srvDep: srvDep,
connMgr: connMgr,
@@ -52,7 +51,6 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
type ThresholdService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
thrs *engine.ThresholdS
@@ -93,9 +91,9 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
if filterS, err = waitForFilterS(ctx, thrs.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = thrs.dm.WaitForDM(ctx); err != nil {
return
dbs := thrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.DataDB, utils.StateServiceUP)
}
anz := thrs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
@@ -104,7 +102,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
thrs.Lock()
defer thrs.Unlock()
thrs.thrs = engine.NewThresholdService(datadb, thrs.cfg, filterS, thrs.connMgr)
thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, filterS, thrs.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS))
thrs.thrs.StartLoop(ctx)

View File

@@ -33,11 +33,10 @@ import (
)
// NewTPeService is the constructor for the TpeService
func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService,
func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &TPeService{
cfg: cfg,
dm: dm,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
@@ -48,8 +47,6 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD
type TPeService struct {
sync.RWMutex
dm *DataDBService
tpes *tpes.TPeS
cl *commonlisteners.CommonListenerS
srv *birpc.Service
@@ -70,12 +67,12 @@ func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err err
return utils.NewServiceStateTimeoutError(utils.TPeS, utils.CommonListenerS, utils.StateServiceUP)
}
ts.cl = cls.CLS()
var datadb *engine.DataManager
if datadb, err = ts.dm.WaitForDM(ctx); err != nil {
return
dbs := ts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TPeS, utils.DataDB, utils.StateServiceUP)
}
ts.tpes = tpes.NewTPeS(ts.cfg, datadb, ts.connMgr)
ts.tpes = tpes.NewTPeS(ts.cfg, dbs.DataManager(), ts.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TPeS))
ts.stopChan = make(chan struct{})
ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false)

View File

@@ -32,14 +32,13 @@ import (
)
// NewTrendsService returns the TrendS Service
func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
func NewTrendService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &TrendService{
cfg: cfg,
dm: dm,
connMgr: connMgr,
srvDep: srvDep,
filterSChan: filterSChan,
@@ -51,7 +50,6 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
type TrendService struct {
sync.RWMutex
dm *DataDBService
filterSChan chan *engine.FilterS
trs *engine.TrendS
@@ -88,9 +86,9 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
); err != nil {
return err
}
var datadb *engine.DataManager
if datadb, err = trs.dm.WaitForDM(ctx); err != nil {
return
dbs := trs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.DataDB, utils.StateServiceUP)
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, trs.filterSChan); err != nil {
@@ -103,7 +101,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
trs.Lock()
defer trs.Unlock()
trs.trs = engine.NewTrendService(datadb, trs.cfg, filterS, trs.connMgr)
trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, filterS, trs.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TrendS))
if err := trs.trs.StartTrendS(ctx); err != nil {
return err