Implement FilterService

And use ServiceIndexer to sync with it
This commit is contained in:
ionutboangiu
2024-12-11 18:39:48 +02:00
committed by Dan Christian Bogos
parent db301f7901
commit da2052e7b3
29 changed files with 404 additions and 406 deletions

View File

@@ -134,46 +134,45 @@ func runCGREngine(fs []string) (err error) {
iGuardianSCh := make(chan birpc.ClientConnector, 1)
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh)
iFilterSCh := make(chan *engine.FilterS, 1)
// ServiceIndexer will share service references to all services
srvIdxr := servmanager.NewServiceIndexer()
gvS := services.NewGlobalVarS(cfg, srvIdxr)
dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep, srvIdxr)
sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvIdxr)
cls := services.NewCommonListenerService(cfg, caps, srvIdxr)
anzS := services.NewAnalyzerService(cfg, iFilterSCh, srvIdxr)
anzS := services.NewAnalyzerService(cfg, srvIdxr)
coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg, srvIdxr)
cacheS := services.NewCacheService(cfg, connMgr, coreS, srvIdxr)
dspS := services.NewDispatcherService(cfg, iFilterSCh, connMgr, srvIdxr)
ldrs := services.NewLoaderService(cfg, iFilterSCh, connMgr, srvIdxr)
cacheS := services.NewCacheService(cfg, connMgr, srvIdxr)
fltrS := services.NewFilterService(cfg, connMgr, srvIdxr)
dspS := services.NewDispatcherService(cfg, connMgr, srvIdxr)
ldrs := services.NewLoaderService(cfg, connMgr, srvIdxr)
efs := services.NewExportFailoverService(cfg, connMgr, srvIdxr)
adminS := services.NewAdminSv1Service(cfg, iFilterSCh, connMgr, srvIdxr)
sessionS := services.NewSessionService(cfg, iFilterSCh, connMgr, srvIdxr)
attrS := services.NewAttributeService(cfg, iFilterSCh, dspS, srvIdxr)
chrgS := services.NewChargerService(cfg, iFilterSCh, connMgr, srvIdxr)
routeS := services.NewRouteService(cfg, iFilterSCh, connMgr, srvIdxr)
resourceS := services.NewResourceService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
trendS := services.NewTrendService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
rankingS := services.NewRankingService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
thS := services.NewThresholdService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
stS := services.NewStatService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr)
erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, srvIdxr)
dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvIdxr)
adminS := services.NewAdminSv1Service(cfg, connMgr, srvIdxr)
sessionS := services.NewSessionService(cfg, connMgr, srvIdxr)
attrS := services.NewAttributeService(cfg, dspS, srvIdxr)
chrgS := services.NewChargerService(cfg, connMgr, srvIdxr)
routeS := services.NewRouteService(cfg, connMgr, srvIdxr)
resourceS := services.NewResourceService(cfg, connMgr, srvDep, srvIdxr)
trendS := services.NewTrendService(cfg, connMgr, srvDep, srvIdxr)
rankingS := services.NewRankingService(cfg, connMgr, srvDep, srvIdxr)
thS := services.NewThresholdService(cfg, connMgr, srvDep, srvIdxr)
stS := services.NewStatService(cfg, connMgr, srvDep, srvIdxr)
erS := services.NewEventReaderService(cfg, connMgr, srvIdxr)
dnsAgent := services.NewDNSAgent(cfg, connMgr, srvIdxr)
fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvIdxr)
kamAgent := services.NewKamailioAgent(cfg, connMgr, srvIdxr)
janusAgent := services.NewJanusAgent(cfg, iFilterSCh, connMgr, srvIdxr)
janusAgent := services.NewJanusAgent(cfg, connMgr, srvIdxr)
astAgent := services.NewAsteriskAgent(cfg, connMgr, srvIdxr)
radAgent := services.NewRadiusAgent(cfg, iFilterSCh, connMgr, srvIdxr)
diamAgent := services.NewDiameterAgent(cfg, iFilterSCh, connMgr, caps, srvIdxr)
httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, connMgr, srvIdxr)
sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvIdxr)
eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, srvIdxr)
cdrS := services.NewCDRServer(cfg, iFilterSCh, connMgr, srvIdxr)
radAgent := services.NewRadiusAgent(cfg, connMgr, srvIdxr)
diamAgent := services.NewDiameterAgent(cfg, connMgr, caps, srvIdxr)
httpAgent := services.NewHTTPAgent(cfg, connMgr, srvIdxr)
sipAgent := services.NewSIPAgent(cfg, connMgr, srvIdxr)
eeS := services.NewEventExporterService(cfg, connMgr, srvIdxr)
cdrS := services.NewCDRServer(cfg, connMgr, srvIdxr)
registrarcS := services.NewRegistrarCService(cfg, connMgr, srvIdxr)
rateS := services.NewRateService(cfg, iFilterSCh, srvIdxr)
actionS := services.NewActionService(cfg, iFilterSCh, connMgr, srvIdxr)
accS := services.NewAccountService(cfg, iFilterSCh, connMgr, srvIdxr)
rateS := services.NewRateService(cfg, srvIdxr)
actionS := services.NewActionService(cfg, connMgr, srvIdxr)
accS := services.NewAccountService(cfg, connMgr, srvIdxr)
tpeS := services.NewTPeService(cfg, connMgr, srvIdxr)
srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{
@@ -184,6 +183,7 @@ func runCGREngine(fs []string) (err error) {
anzS,
coreS,
cacheS,
fltrS,
dspS,
ldrs,
efs,
@@ -308,8 +308,6 @@ func runCGREngine(fs []string) (err error) {
return
}
srvManager.StartServices(ctx, cancel)
// Start FilterS
go cgrStartFilterService(ctx, iFilterSCh, cacheS.GetCacheSChan(), connMgr, cfg, dmS)
cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS)
cgrInitGuardianSv1(iGuardianSCh, cfg, cls.CLS(), anzS)
@@ -326,7 +324,7 @@ func runCGREngine(fs []string) (err error) {
// TODO: find a better location for this if block
if *flags.MemPrfDir != "" {
if err := coreS.GetCoreS().StartMemoryProfiling(cores.MemoryProfilingParams{
if err := coreS.CoreS().StartMemoryProfiling(cores.MemoryProfilingParams{
DirPath: *flags.MemPrfDir,
MaxFiles: *flags.MemPrfMaxF,
Interval: *flags.MemPrfInterval,
@@ -369,24 +367,6 @@ func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string
return
}
// cgrStartFilterService fires up the FilterS
func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS,
cacheSCh chan *engine.CacheS, connMgr *engine.ConnManager,
cfg *config.CGRConfig, db *services.DataDBService) {
var cacheS *engine.CacheS
select {
case cacheS = <-cacheSCh:
cacheSCh <- cacheS
case <-ctx.Done():
return
}
select {
case <-cacheS.GetPrecacheChannel(utils.CacheFilters):
iFilterSCh <- engine.NewFilterS(cfg, connMgr, db.DataManager())
case <-ctx.Done():
}
}
func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig,
cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)

View File

@@ -36,16 +36,14 @@ import (
// NewAccountService returns the Account Service
func NewAccountService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &AccountService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -53,8 +51,6 @@ func NewAccountService(cfg *config.CGRConfig,
type AccountService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
acts *accounts.AccountS
cl *commonlisteners.CommonListenerS
@@ -87,9 +83,9 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
utils.CacheAccountsFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil {
return
fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AccountS, utils.FilterS, utils.StateServiceUP)
}
dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
@@ -102,7 +98,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
acts.Lock()
defer acts.Unlock()
acts.acts = accounts.NewAccountS(acts.cfg, filterS, acts.connMgr, dbs.DataManager())
acts.acts = accounts.NewAccountS(acts.cfg, fs.FilterS(), acts.connMgr, dbs.DataManager())
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)

View File

@@ -36,16 +36,14 @@ import (
// NewActionService returns the Action Service
func NewActionService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ActionService{
connMgr: connMgr,
cfg: cfg,
filterSChan: filterSChan,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
connMgr: connMgr,
cfg: cfg,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -53,8 +51,6 @@ func NewActionService(cfg *config.CGRConfig,
type ActionService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
acts *actions.ActionS
cl *commonlisteners.CommonListenerS
@@ -89,9 +85,9 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
utils.CacheActionProfilesFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil {
return
fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ActionS, utils.FilterS, utils.StateServiceUP)
}
dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) {
@@ -104,7 +100,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
acts.Lock()
defer acts.Unlock()
acts.acts = actions.NewActionS(acts.cfg, filterS, dbs.DataManager(), acts.connMgr)
acts.acts = actions.NewActionS(acts.cfg, fs.FilterS(), dbs.DataManager(), acts.connMgr)
acts.stopChan = make(chan struct{})
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)

View File

@@ -33,15 +33,13 @@ import (
// NewAPIerSv1Service returns the APIerSv1 Service
func NewAdminSv1Service(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &AdminSv1Service{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -49,8 +47,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
type AdminSv1Service struct {
sync.RWMutex
filterSChan chan *engine.FilterS
api *apis.AdminSv1
cl *commonlisteners.CommonListenerS
@@ -75,9 +71,9 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.CommonListenerS, utils.StateServiceUP)
}
apiService.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil {
return
fs := apiService.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AdminS, utils.FilterS, utils.StateServiceUP)
}
dbs := apiService.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) {
@@ -95,7 +91,7 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
apiService.Lock()
defer apiService.Unlock()
apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), apiService.connMgr, filterS, sdbs.DB())
apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), apiService.connMgr, fs.FilterS(), sdbs.DB())
srv, _ := engine.NewService(apiService.api)
// srv, _ := birpc.NewService(apiService.api, "", false)

View File

@@ -34,13 +34,11 @@ import (
// NewAnalyzerService returns the Analyzer Service
func NewAnalyzerService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
srvIndexer *servmanager.ServiceIndexer) *AnalyzerService {
return &AnalyzerService{
cfg: cfg,
filterSChan: filterSChan,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -48,8 +46,6 @@ func NewAnalyzerService(cfg *config.CGRConfig,
type AnalyzerService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
anz *analyzers.AnalyzerS
cl *commonlisteners.CommonListenerS
@@ -89,22 +85,23 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun
}
}(anz.anz)
anz.cl.SetAnalyzer(anz.anz)
go anz.start(ctx)
go anz.start()
close(anz.stateDeps.StateChan(utils.StateServiceUP))
return
}
func (anz *AnalyzerService) start(ctx *context.Context) {
fS, err := waitForFilterS(ctx, anz.filterSChan)
if err != nil {
func (anz *AnalyzerService) start() {
fs := anz.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), anz.cfg.GeneralCfg().ConnectTimeout) {
return
// return utils.NewServiceStateTimeoutError(utils.AnalyzerS, utils.FilterS, utils.StateServiceUP)
}
if !anz.IsRunning() {
return
}
anz.Lock()
anz.anz.SetFilterS(fS)
anz.anz.SetFilterS(fs.FilterS())
srv, _ := engine.NewService(anz.anz)
// srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false)

View File

@@ -34,15 +34,13 @@ import (
// NewAttributeService returns the Attribute Service
func NewAttributeService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
dspS *DispatcherService,
sIndxr *servmanager.ServiceIndexer) servmanager.Service {
return &AttributeService{
cfg: cfg,
filterSChan: filterSChan,
dspS: dspS,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
serviceIndexer: sIndxr,
cfg: cfg,
dspS: dspS,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
srvIndexer: sIndxr,
}
}
@@ -50,8 +48,7 @@ func NewAttributeService(cfg *config.CGRConfig,
type AttributeService struct {
sync.RWMutex
dspS *DispatcherService
filterSChan chan *engine.FilterS
dspS *DispatcherService
attrS *engine.AttributeS
cl *commonlisteners.CommonListenerS
@@ -59,9 +56,9 @@ type AttributeService struct {
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
serviceIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies
}
// Start should handle the service start
@@ -70,16 +67,16 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
if utils.StructChanTimeout(
attrS.serviceIndexer.GetService(utils.CommonListenerS).StateChan(utils.StateServiceUP),
attrS.srvIndexer.GetService(utils.CommonListenerS).StateChan(utils.StateServiceUP),
attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP)
}
cls := attrS.serviceIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
cls := attrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP)
}
attrS.cl = cls.CLS()
cacheS := attrS.serviceIndexer.GetService(utils.CacheS).(*CacheService)
cacheS := attrS.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CacheS, utils.StateServiceUP)
}
@@ -88,22 +85,22 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
utils.CacheAttributeFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, attrS.filterSChan); err != nil {
return
fs := attrS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.FilterS, utils.StateServiceUP)
}
dbs := attrS.serviceIndexer.GetService(utils.DataDB).(*DataDBService)
dbs := attrS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.DataDB, utils.StateServiceUP)
}
anz := attrS.serviceIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
anz := attrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.AnalyzerS, utils.StateServiceUP)
}
attrS.Lock()
defer attrS.Unlock()
attrS.attrS = engine.NewAttributeService(dbs.DataManager(), filterS, attrS.cfg)
attrS.attrS = engine.NewAttributeService(dbs.DataManager(), fs.FilterS(), attrS.cfg)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS))
attrS.rpc = apis.NewAttributeSv1(attrS.attrS)
srv, _ := engine.NewService(attrS.rpc)

View File

@@ -23,7 +23,6 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -31,11 +30,9 @@ import (
// NewCacheService .
func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
cores *CoreService,
srvIndexer *servmanager.ServiceIndexer) *CacheService {
return &CacheService{
cfg: cfg,
cores: cores,
connMgr: connMgr,
cacheCh: make(chan *engine.CacheS, 1),
srvIndexer: srvIndexer,
@@ -45,8 +42,6 @@ func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
// CacheService implements Agent interface
type CacheService struct {
cores *CoreService
cl *commonlisteners.CommonListenerS
cacheCh chan *engine.CacheS
@@ -73,11 +68,11 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.AnalyzerS, utils.StateServiceUP)
}
var cs *cores.CoreS
if cs, err = cS.cores.WaitForCoreS(ctx); err != nil {
return
cs := cS.srvIndexer.GetService(utils.CoreS).(*CoreService)
if utils.StructChanTimeout(cs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CoreS, utils.StateServiceUP)
}
engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CapsStats)
engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CoreS().CapsStats)
go engine.Cache.Precache(ctx, shtDw)
cS.cacheCh <- engine.Cache

View File

@@ -35,15 +35,13 @@ import (
// NewCDRServer returns the CDR Server
func NewCDRServer(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &CDRService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -51,8 +49,6 @@ func NewCDRServer(cfg *config.CGRConfig,
type CDRService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
cdrS *cdrs.CDRServer
cl *commonlisteners.CommonListenerS
@@ -77,9 +73,9 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.CommonListenerS, utils.StateServiceUP)
}
cs.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil {
return
fs := cs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.CDRs, utils.FilterS, utils.StateServiceUP)
}
dbs := cs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) {
@@ -97,7 +93,7 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err
cs.Lock()
defer cs.Unlock()
cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), filterS, cs.connMgr, sdbs.DB())
cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs.FilterS(), cs.connMgr, sdbs.DB())
runtime.Gosched()
utils.Logger.Info("Registering CDRS RPC service.")
srv, err := engine.NewServiceWithPing(cs.cdrS, utils.CDRsV1, utils.V1Prfx)

View File

@@ -34,15 +34,13 @@ import (
// NewChargerService returns the Charger Service
func NewChargerService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ChargerService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -50,8 +48,6 @@ func NewChargerService(cfg *config.CGRConfig,
type ChargerService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
chrS *engine.ChargerS
cl *commonlisteners.CommonListenerS
@@ -83,9 +79,9 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
utils.CacheChargerFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, chrS.filterSChan); err != nil {
return
fs := chrS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.FilterS, utils.StateServiceUP)
}
dbs := chrS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) {
@@ -98,7 +94,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
chrS.Lock()
defer chrS.Unlock()
chrS.chrS = engine.NewChargerService(dbs.DataManager(), filterS, chrS.cfg, chrS.connMgr)
chrS.chrS = engine.NewChargerService(dbs.DataManager(), fs.FilterS(), chrS.cfg, chrS.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ChargerS))
srv, _ := engine.NewService(chrS.chrS)
// srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false)

View File

@@ -140,20 +140,6 @@ func (cS *CoreService) ShouldRun() bool {
return true
}
// GetCoreS returns the coreS
func (cS *CoreService) WaitForCoreS(ctx *context.Context) (cs *cores.CoreS, err error) {
cS.mu.RLock()
cSCh := cS.csCh
cS.mu.RUnlock()
select {
case <-ctx.Done():
err = ctx.Err()
case cs = <-cSCh:
cSCh <- cs
}
return
}
// StateChan returns signaling channel of specific state
func (cS *CoreService) StateChan(stateID string) chan struct{} {
return cS.stateDeps.StateChan(stateID)
@@ -164,7 +150,8 @@ func (cS *CoreService) IntRPCConn() birpc.ClientConnector {
return cS.intRPCconn
}
func (cS *CoreService) GetCoreS() *cores.CoreS {
// CoreS returns the CoreS object.
func (cS *CoreService) CoreS() *cores.CoreS {
cS.mu.RLock()
defer cS.mu.RUnlock()
return cS.cS

View File

@@ -32,25 +32,23 @@ import (
)
// NewDiameterAgent returns the Diameter Agent
func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewDiameterAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager, caps *engine.Caps,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &DiameterAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
caps: caps,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
caps: caps,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
// DiameterAgent implements Agent interface
type DiameterAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
stopChan chan struct{}
cfg *config.CGRConfig
stopChan chan struct{}
da *agents.DiameterAgent
connMgr *engine.ConnManager
@@ -70,13 +68,13 @@ func (da *DiameterAgent) Start(ctx *context.Context, shtDwn context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
filterS, err := waitForFilterS(ctx, da.filterSChan)
if err != nil {
return err
fs := da.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), da.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DiameterAgent, utils.FilterS, utils.StateServiceUP)
}
da.Lock()
defer da.Unlock()
return da.start(filterS, shtDwn, da.caps)
return da.start(fs.FilterS(), shtDwn, da.caps)
}
func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFunc, caps *engine.Caps) error {
@@ -110,11 +108,11 @@ func (da *DiameterAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc)
return
}
close(da.stopChan)
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, da.filterSChan); err != nil {
return
fs := da.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), da.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DiameterAgent, utils.FilterS, utils.StateServiceUP)
}
return da.start(filterS, shtDwn, da.caps)
return da.start(fs.FilterS(), shtDwn, da.caps)
}
// Shutdown stops the service

View File

@@ -33,16 +33,14 @@ import (
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) *DispatcherService {
return &DispatcherService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvsReload: make(map[string]chan struct{}),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvsReload: make(map[string]chan struct{}),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -50,8 +48,6 @@ func NewDispatcherService(cfg *config.CGRConfig,
type DispatcherService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
dspS *dispatchers.DispatcherService
cl *commonlisteners.CommonListenerS
@@ -85,9 +81,10 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
utils.CacheDispatcherFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, dspS.filterSChan); err != nil {
return
fs := dspS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.FilterS, utils.StateServiceUP)
}
dbs := dspS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) {
@@ -101,7 +98,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
dspS.Lock()
defer dspS.Unlock()
dspS.dspS = dispatchers.NewDispatcherService(dbs.DataManager(), dspS.cfg, filterS, dspS.connMgr)
dspS.dspS = dispatchers.NewDispatcherService(dbs.DataManager(), dspS.cfg, fs.FilterS(), dspS.connMgr)
dspS.unregisterAllDispatchedSubsystems() // unregister all rpc services that can be dispatched

View File

@@ -32,23 +32,21 @@ import (
)
// NewDNSAgent returns the DNS Agent
func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewDNSAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &DNSAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
// DNSAgent implements Agent interface
type DNSAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
cfg *config.CGRConfig
stopChan chan struct{}
@@ -65,14 +63,14 @@ func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err
if dns.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, dns.filterSChan); err != nil {
return
fs := dns.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dns.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DNSAgent, utils.FilterS, utils.StateServiceUP)
}
dns.Lock()
defer dns.Unlock()
dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr)
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), dns.connMgr)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.dns = nil
@@ -86,8 +84,10 @@ func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err
// Reload handles the change of config
func (dns *DNSAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (err error) {
filterS := <-dns.filterSChan
dns.filterSChan <- filterS
fs := dns.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dns.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.DNSAgent, utils.FilterS, utils.StateServiceUP)
}
dns.Lock()
defer dns.Unlock()
@@ -96,7 +96,7 @@ func (dns *DNSAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (er
close(dns.stopChan)
}
dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr)
dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), dns.connMgr)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.dns = nil

View File

@@ -33,15 +33,14 @@ import (
)
// NewEventExporterService constructs EventExporterService
func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewEventExporterService(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &EventExporterService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -49,8 +48,6 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil
type EventExporterService struct {
mu sync.RWMutex
filterSChan chan *engine.FilterS
eeS *ees.EeS
cl *commonlisteners.CommonListenerS
@@ -109,9 +106,9 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
return utils.NewServiceStateTimeoutError(utils.EEs, utils.CommonListenerS, utils.StateServiceUP)
}
es.cl = cls.CLS()
fltrS, err := waitForFilterS(ctx, es.filterSChan)
if err != nil {
return err
fs := es.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.EEs, utils.FilterS, utils.StateServiceUP)
}
anz := es.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) {
@@ -123,7 +120,8 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
es.mu.Lock()
defer es.mu.Unlock()
es.eeS, err = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr)
var err error
es.eeS, err = ees.NewEventExporterS(es.cfg, fs.FilterS(), es.connMgr)
if err != nil {
return err
}

View File

@@ -35,16 +35,14 @@ import (
// NewEventReaderService returns the EventReader Service
func NewEventReaderService(
cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &EventReaderService{
rldChan: make(chan struct{}, 1),
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
rldChan: make(chan struct{}, 1),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -52,8 +50,6 @@ func NewEventReaderService(
type EventReaderService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
ers *ers.ERService
cl *commonlisteners.CommonListenerS
@@ -78,9 +74,9 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
return utils.NewServiceStateTimeoutError(utils.ERs, utils.CommonListenerS, utils.StateServiceUP)
}
erS.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil {
return
fs := erS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ERs, utils.FilterS, utils.StateServiceUP)
}
anz := erS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) {
@@ -96,7 +92,7 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
// build the service
erS.ers = ers.NewERService(erS.cfg, filterS, erS.connMgr)
erS.ers = ers.NewERService(erS.cfg, fs.FilterS(), erS.connMgr)
go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan, shtDwn)
srv, err := engine.NewServiceWithPing(erS.ers, utils.ErSv1, utils.V1Prfx)

118
services/filters.go Normal file
View File

@@ -0,0 +1,118 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewFilterService instantiates a new FilterService.
func NewFilterService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) *FilterService {
return &FilterService{
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
// FilterService implements Service interface.
type FilterService struct {
mu sync.RWMutex
fltrS *engine.FilterS
cfg *config.CGRConfig
connMgr *engine.ConnManager
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
func (s *FilterService) Start(ctx *context.Context, _ context.CancelFunc) error {
cacheS := s.srvIndexer.GetService(utils.CacheS).(*CacheService)
if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.FilterS, utils.CacheS, utils.StateServiceUP)
}
if err := cacheS.WaitToPrecache(ctx, utils.CacheFilters); err != nil {
return err
}
dbs := s.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.FilterS, utils.DataDB, utils.StateServiceUP)
}
s.fltrS = engine.NewFilterS(s.cfg, s.connMgr, dbs.DataManager())
close(s.stateDeps.StateChan(utils.StateServiceUP))
return nil
}
// Reload handles the config changes.
func (s *FilterService) Reload(*context.Context, context.CancelFunc) error {
return nil
}
// Shutdown stops the service.
func (s *FilterService) Shutdown() error {
s.mu.Lock()
defer s.mu.Unlock()
s.fltrS = nil
return nil
}
// IsRunning returns whether the service is running or not.
func (s *FilterService) IsRunning() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.fltrS != nil
}
// ServiceName returns the service name
func (s *FilterService) ServiceName() string {
return utils.FilterS
}
// ShouldRun returns if the service should be running.
func (s *FilterService) ShouldRun() bool {
return true
}
// StateChan returns signaling channel of specific state
func (s *FilterService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (s *FilterService) IntRPCConn() birpc.ClientConnector {
return s.intRPCconn
}
// FilterS returns the FilterS object.
func (s *FilterService) FilterS() *engine.FilterS {
return s.fltrS
}

View File

@@ -33,15 +33,14 @@ import (
)
// NewHTTPAgent returns the HTTP Agent
func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewHTTPAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &HTTPAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -49,8 +48,6 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
type HTTPAgent struct {
sync.RWMutex
filterSChan chan *engine.FilterS
cl *commonlisteners.CommonListenerS
// we can realy stop the HTTPAgent so keep a flag
@@ -76,9 +73,9 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro
return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.CommonListenerS, utils.StateServiceUP)
}
cl := cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ha.filterSChan); err != nil {
return
fs := ha.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.FilterS, utils.StateServiceUP)
}
ha.Lock()
@@ -86,7 +83,7 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro
utils.Logger.Info(fmt.Sprintf("<%s> successfully started HTTPAgent", utils.HTTPAgent))
for _, agntCfg := range ha.cfg.HTTPAgentCfg() {
cl.RegisterHttpHandler(agntCfg.URL,
agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, filterS,
agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, fs.FilterS(),
ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
}

View File

@@ -33,15 +33,14 @@ import (
)
// NewJanusAgent returns the Janus Agent
func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewJanusAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &JanusAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -49,8 +48,6 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
type JanusAgent struct {
sync.RWMutex
filterSChan chan *engine.FilterS
jA *agents.JanusAgent
// we can realy stop the JanusAgent so keep a flag
@@ -72,9 +69,9 @@ func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err err
return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.CommonListenerS, utils.StateServiceUP)
}
cl := cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ja.filterSChan); err != nil {
return
fs := ja.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.FilterS, utils.StateServiceUP)
}
ja.Lock()
@@ -82,7 +79,7 @@ func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err err
ja.Unlock()
return utils.ErrServiceAlreadyRunning
}
ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, filterS)
ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, fs.FilterS())
if err != nil {
return
}

View File

@@ -34,16 +34,14 @@ import (
// NewLoaderService returns the Loader Service
func NewLoaderService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) *LoaderService {
return &LoaderService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
stopChan: make(chan struct{}),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
stopChan: make(chan struct{}),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -51,8 +49,6 @@ func NewLoaderService(cfg *config.CGRConfig,
type LoaderService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
ldrs *loaders.LoaderS
cl *commonlisteners.CommonListenerS
@@ -76,9 +72,10 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.CommonListenerS, utils.StateServiceUP)
}
ldrs.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil {
return
fs := ldrs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.FilterS, utils.StateServiceUP)
}
dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
@@ -92,7 +89,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
ldrs.Lock()
defer ldrs.Unlock()
ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), filterS, ldrs.connMgr)
ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), fs.FilterS(), ldrs.connMgr)
if !ldrs.ldrs.Enabled() {
return
@@ -114,9 +111,9 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
// Reload handles the change of config
func (ldrs *LoaderService) Reload(ctx *context.Context, _ context.CancelFunc) error {
filterS, err := waitForFilterS(ctx, ldrs.filterSChan)
if err != nil {
return err
fs := ldrs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.FilterS, utils.StateServiceUP)
}
dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) {
@@ -128,7 +125,7 @@ func (ldrs *LoaderService) Reload(ctx *context.Context, _ context.CancelFunc) er
ldrs.RLock()
defer ldrs.RUnlock()
ldrs.ldrs.Reload(dbs.DataManager(), filterS, ldrs.connMgr)
ldrs.ldrs.Reload(dbs.DataManager(), fs.FilterS(), ldrs.connMgr)
return ldrs.ldrs.ListenAndServe(ldrs.stopChan)
}

View File

@@ -32,24 +32,22 @@ import (
)
// NewRadiusAgent returns the Radius Agent
func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewRadiusAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RadiusAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
// RadiusAgent implements Agent interface
type RadiusAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
stopChan chan struct{}
cfg *config.CGRConfig
stopChan chan struct{}
rad *agents.RadiusAgent
connMgr *engine.ConnManager
@@ -69,9 +67,9 @@ func (rad *RadiusAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (
return utils.ErrServiceAlreadyRunning
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, rad.filterSChan); err != nil {
return
fs := rad.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rad.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RadiusAgent, utils.FilterS, utils.StateServiceUP)
}
rad.Lock()
@@ -81,7 +79,7 @@ func (rad *RadiusAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (
rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth
rad.lacct = rad.cfg.RadiusAgentCfg().ListenAcct
if rad.rad, err = agents.NewRadiusAgent(rad.cfg, filterS, rad.connMgr); err != nil {
if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.FilterS(), rad.connMgr); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error()))
return
}

View File

@@ -34,25 +34,21 @@ import (
// NewRankingService returns the RankingS Service
func NewRankingService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RankingService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
type RankingService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
ran *engine.RankingS
cl *commonlisteners.CommonListenerS
@@ -91,9 +87,9 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.DataDB, utils.StateServiceUP)
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ran.filterSChan); err != nil {
return
fs := ran.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RankingS, utils.FilterS, utils.StateServiceUP)
}
anz := ran.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) {
@@ -102,7 +98,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
ran.Lock()
defer ran.Unlock()
ran.ran = engine.NewRankingS(dbs.DataManager(), ran.connMgr, filterS, ran.cfg)
ran.ran = engine.NewRankingS(dbs.DataManager(), ran.connMgr, fs.FilterS(), ran.cfg)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
utils.CoreS, utils.RankingS))

View File

@@ -33,14 +33,12 @@ import (
// NewRateService constructs RateService
func NewRateService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RateService{
cfg: cfg,
filterSChan: filterSChan,
rldChan: make(chan struct{}),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
rldChan: make(chan struct{}),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -48,9 +46,6 @@ func NewRateService(cfg *config.CGRConfig,
type RateService struct {
sync.RWMutex
dmS *DataDBService
filterSChan chan *engine.FilterS
rateS *rates.RateS
cl *commonlisteners.CommonListenerS
@@ -118,9 +113,9 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
utils.CacheRateFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, rs.filterSChan); err != nil {
return
fs := rs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RateS, utils.FilterS, utils.StateServiceUP)
}
dbs := rs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) {
@@ -132,7 +127,7 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
}
rs.Lock()
rs.rateS = rates.NewRateS(rs.cfg, filterS, dbs.DataManager())
rs.rateS = rates.NewRateS(rs.cfg, fs.FilterS(), dbs.DataManager())
rs.Unlock()
rs.stopChan = make(chan struct{})

View File

@@ -33,17 +33,15 @@ import (
// NewResourceService returns the Resource Service
func NewResourceService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ResourceService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -51,8 +49,6 @@ func NewResourceService(cfg *config.CGRConfig,
type ResourceService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
reS *engine.ResourceS
cl *commonlisteners.CommonListenerS
@@ -87,9 +83,9 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
utils.CacheResourceFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, reS.filterSChan); err != nil {
return
fs := reS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.FilterS, utils.StateServiceUP)
}
dbs := reS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) {
@@ -102,7 +98,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
reS.Lock()
defer reS.Unlock()
reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, filterS, reS.connMgr)
reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), reS.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS))
reS.reS.StartLoop(ctx)
srv, _ := engine.NewService(reS.reS)

View File

@@ -34,15 +34,13 @@ import (
// NewRouteService returns the Route Service
func NewRouteService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RouteService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -50,8 +48,6 @@ func NewRouteService(cfg *config.CGRConfig,
type RouteService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
routeS *engine.RouteS
cl *commonlisteners.CommonListenerS
@@ -83,9 +79,9 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
utils.CacheRouteFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, routeS.filterSChan); err != nil {
return
fs := routeS.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.RouteS, utils.FilterS, utils.StateServiceUP)
}
dbs := routeS.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) {
@@ -98,7 +94,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
routeS.Lock()
defer routeS.Unlock()
routeS.routeS = engine.NewRouteService(dbs.DataManager(), filterS, routeS.cfg, routeS.connMgr)
routeS.routeS = engine.NewRouteService(dbs.DataManager(), fs.FilterS(), routeS.cfg, routeS.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RouteS))
srv, _ := engine.NewService(routeS.routeS)

View File

@@ -35,15 +35,14 @@ import (
)
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewSessionService(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &SessionService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -51,8 +50,6 @@ func NewSessionService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
type SessionService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
sm *sessions.SessionS
cl *commonlisteners.CommonListenerS
@@ -77,9 +74,9 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
return utils.NewServiceStateTimeoutError(utils.SessionS, utils.CommonListenerS, utils.StateServiceUP)
}
smg.cl = cls.CLS()
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil {
return
fs := smg.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.SessionS, utils.FilterS, utils.StateServiceUP)
}
dbs := smg.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) {
@@ -93,7 +90,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
smg.Lock()
defer smg.Unlock()
smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), filterS, smg.connMgr)
smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), fs.FilterS(), smg.connMgr)
//start sync session in a separate goroutine
smg.stopChan = make(chan struct{})
go smg.sm.ListenAndServe(smg.stopChan)

View File

@@ -32,23 +32,21 @@ import (
)
// NewSIPAgent returns the sip Agent
func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewSIPAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &SIPAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
// SIPAgent implements Agent interface
type SIPAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
cfg *config.CGRConfig
sip *agents.SIPAgent
connMgr *engine.ConnManager
@@ -66,15 +64,15 @@ func (sip *SIPAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err
return utils.ErrServiceAlreadyRunning
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, sip.filterSChan); err != nil {
return
fs := sip.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sip.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.SIPAgent, utils.FilterS, utils.StateServiceUP)
}
sip.Lock()
defer sip.Unlock()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS)
sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.FilterS())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
utils.SIPAgent, err))

View File

@@ -33,17 +33,15 @@ import (
// NewStatService returns the Stat Service
func NewStatService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &StatService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -51,8 +49,6 @@ func NewStatService(cfg *config.CGRConfig,
type StatService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
sts *engine.StatS
cl *commonlisteners.CommonListenerS
@@ -87,9 +83,9 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
utils.CacheStatFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, sts.filterSChan); err != nil {
return
fs := sts.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.StatS, utils.FilterS, utils.StateServiceUP)
}
dbs := sts.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) {
@@ -102,7 +98,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
sts.Lock()
defer sts.Unlock()
sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, filterS, sts.connMgr)
sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, fs.FilterS(), sts.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
utils.CoreS, utils.StatS))

View File

@@ -33,17 +33,15 @@ import (
// NewThresholdService returns the Threshold Service
func NewThresholdService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ThresholdService{
cfg: cfg,
filterSChan: filterSChan,
srvDep: srvDep,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
srvDep: srvDep,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -51,8 +49,6 @@ func NewThresholdService(cfg *config.CGRConfig,
type ThresholdService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
thrs *engine.ThresholdS
cl *commonlisteners.CommonListenerS
@@ -87,9 +83,9 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
utils.CacheThresholdFilterIndexes); err != nil {
return
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, thrs.filterSChan); err != nil {
return
fs := thrs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.FilterS, utils.StateServiceUP)
}
dbs := thrs.srvIndexer.GetService(utils.DataDB).(*DataDBService)
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) {
@@ -102,7 +98,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
thrs.Lock()
defer thrs.Unlock()
thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, filterS, thrs.connMgr)
thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), thrs.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS))
thrs.thrs.StartLoop(ctx)

View File

@@ -33,25 +33,21 @@ import (
// NewTrendsService returns the TrendS Service
func NewTrendService(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &TrendService{
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
filterSChan: filterSChan,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
type TrendService struct {
sync.RWMutex
filterSChan chan *engine.FilterS
trs *engine.TrendS
cl *commonlisteners.CommonListenerS
@@ -90,9 +86,9 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.DataDB, utils.StateServiceUP)
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, trs.filterSChan); err != nil {
return
fs := trs.srvIndexer.GetService(utils.FilterS).(*FilterService)
if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.TrendS, utils.FilterS, utils.StateServiceUP)
}
anz := trs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) {
@@ -101,7 +97,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
trs.Lock()
defer trs.Unlock()
trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, filterS, trs.connMgr)
trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), trs.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TrendS))
if err := trs.trs.StartTrendS(ctx); err != nil {
return err