From da2052e7b386efe87f2eb72d13f65ecc6e5b5138 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 11 Dec 2024 18:39:48 +0200 Subject: [PATCH] Implement FilterService And use ServiceIndexer to sync with it --- cmd/cgr-engine/cgr-engine.go | 78 +++++++++-------------- services/accounts.go | 22 +++---- services/actions.go | 22 +++---- services/adminsv1.go | 20 +++--- services/analyzers.go | 21 +++---- services/attributes.go | 37 +++++------ services/caches.go | 13 ++-- services/cdrs.go | 20 +++--- services/chargers.go | 20 +++--- services/cores.go | 17 +---- services/diameteragent.go | 34 +++++----- services/dispatchers.go | 23 +++---- services/dnsagent.go | 30 ++++----- services/ees.go | 22 +++---- services/ers.go | 22 +++---- services/filters.go | 118 +++++++++++++++++++++++++++++++++++ services/httpagent.go | 21 +++---- services/janus.go | 21 +++---- services/loaders.go | 31 +++++---- services/radiusagent.go | 24 ++++--- services/rankings.go | 22 +++---- services/rates.go | 21 +++---- services/resources.go | 22 +++---- services/routes.go | 20 +++--- services/sessions.go | 21 +++---- services/sipagent.go | 22 +++---- services/stats.go | 22 +++---- services/thresholds.go | 22 +++---- services/trends.go | 22 +++---- 29 files changed, 404 insertions(+), 406 deletions(-) create mode 100644 services/filters.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a75d3bbf2..4bc4fa2e2 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -134,46 +134,45 @@ func runCGREngine(fs []string) (err error) { iGuardianSCh := make(chan birpc.ClientConnector, 1) connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh) - iFilterSCh := make(chan *engine.FilterS, 1) - // ServiceIndexer will share service references to all services srvIdxr := servmanager.NewServiceIndexer() gvS := services.NewGlobalVarS(cfg, srvIdxr) dmS := services.NewDataDBService(cfg, connMgr, *flags.SetVersions, srvDep, srvIdxr) sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvIdxr) cls := services.NewCommonListenerService(cfg, caps, srvIdxr) - anzS := services.NewAnalyzerService(cfg, iFilterSCh, srvIdxr) + anzS := services.NewAnalyzerService(cfg, srvIdxr) coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg, srvIdxr) - cacheS := services.NewCacheService(cfg, connMgr, coreS, srvIdxr) - dspS := services.NewDispatcherService(cfg, iFilterSCh, connMgr, srvIdxr) - ldrs := services.NewLoaderService(cfg, iFilterSCh, connMgr, srvIdxr) + cacheS := services.NewCacheService(cfg, connMgr, srvIdxr) + fltrS := services.NewFilterService(cfg, connMgr, srvIdxr) + dspS := services.NewDispatcherService(cfg, connMgr, srvIdxr) + ldrs := services.NewLoaderService(cfg, connMgr, srvIdxr) efs := services.NewExportFailoverService(cfg, connMgr, srvIdxr) - adminS := services.NewAdminSv1Service(cfg, iFilterSCh, connMgr, srvIdxr) - sessionS := services.NewSessionService(cfg, iFilterSCh, connMgr, srvIdxr) - attrS := services.NewAttributeService(cfg, iFilterSCh, dspS, srvIdxr) - chrgS := services.NewChargerService(cfg, iFilterSCh, connMgr, srvIdxr) - routeS := services.NewRouteService(cfg, iFilterSCh, connMgr, srvIdxr) - resourceS := services.NewResourceService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) - trendS := services.NewTrendService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) - rankingS := services.NewRankingService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) - thS := services.NewThresholdService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) - stS := services.NewStatService(cfg, iFilterSCh, connMgr, srvDep, srvIdxr) - erS := services.NewEventReaderService(cfg, iFilterSCh, connMgr, srvIdxr) - dnsAgent := services.NewDNSAgent(cfg, iFilterSCh, connMgr, srvIdxr) + adminS := services.NewAdminSv1Service(cfg, connMgr, srvIdxr) + sessionS := services.NewSessionService(cfg, connMgr, srvIdxr) + attrS := services.NewAttributeService(cfg, dspS, srvIdxr) + chrgS := services.NewChargerService(cfg, connMgr, srvIdxr) + routeS := services.NewRouteService(cfg, connMgr, srvIdxr) + resourceS := services.NewResourceService(cfg, connMgr, srvDep, srvIdxr) + trendS := services.NewTrendService(cfg, connMgr, srvDep, srvIdxr) + rankingS := services.NewRankingService(cfg, connMgr, srvDep, srvIdxr) + thS := services.NewThresholdService(cfg, connMgr, srvDep, srvIdxr) + stS := services.NewStatService(cfg, connMgr, srvDep, srvIdxr) + erS := services.NewEventReaderService(cfg, connMgr, srvIdxr) + dnsAgent := services.NewDNSAgent(cfg, connMgr, srvIdxr) fsAgent := services.NewFreeswitchAgent(cfg, connMgr, srvIdxr) kamAgent := services.NewKamailioAgent(cfg, connMgr, srvIdxr) - janusAgent := services.NewJanusAgent(cfg, iFilterSCh, connMgr, srvIdxr) + janusAgent := services.NewJanusAgent(cfg, connMgr, srvIdxr) astAgent := services.NewAsteriskAgent(cfg, connMgr, srvIdxr) - radAgent := services.NewRadiusAgent(cfg, iFilterSCh, connMgr, srvIdxr) - diamAgent := services.NewDiameterAgent(cfg, iFilterSCh, connMgr, caps, srvIdxr) - httpAgent := services.NewHTTPAgent(cfg, iFilterSCh, connMgr, srvIdxr) - sipAgent := services.NewSIPAgent(cfg, iFilterSCh, connMgr, srvIdxr) - eeS := services.NewEventExporterService(cfg, iFilterSCh, connMgr, srvIdxr) - cdrS := services.NewCDRServer(cfg, iFilterSCh, connMgr, srvIdxr) + radAgent := services.NewRadiusAgent(cfg, connMgr, srvIdxr) + diamAgent := services.NewDiameterAgent(cfg, connMgr, caps, srvIdxr) + httpAgent := services.NewHTTPAgent(cfg, connMgr, srvIdxr) + sipAgent := services.NewSIPAgent(cfg, connMgr, srvIdxr) + eeS := services.NewEventExporterService(cfg, connMgr, srvIdxr) + cdrS := services.NewCDRServer(cfg, connMgr, srvIdxr) registrarcS := services.NewRegistrarCService(cfg, connMgr, srvIdxr) - rateS := services.NewRateService(cfg, iFilterSCh, srvIdxr) - actionS := services.NewActionService(cfg, iFilterSCh, connMgr, srvIdxr) - accS := services.NewAccountService(cfg, iFilterSCh, connMgr, srvIdxr) + rateS := services.NewRateService(cfg, srvIdxr) + actionS := services.NewActionService(cfg, connMgr, srvIdxr) + accS := services.NewAccountService(cfg, connMgr, srvIdxr) tpeS := services.NewTPeService(cfg, connMgr, srvIdxr) srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{ @@ -184,6 +183,7 @@ func runCGREngine(fs []string) (err error) { anzS, coreS, cacheS, + fltrS, dspS, ldrs, efs, @@ -308,8 +308,6 @@ func runCGREngine(fs []string) (err error) { return } srvManager.StartServices(ctx, cancel) - // Start FilterS - go cgrStartFilterService(ctx, iFilterSCh, cacheS.GetCacheSChan(), connMgr, cfg, dmS) cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS) cgrInitGuardianSv1(iGuardianSCh, cfg, cls.CLS(), anzS) @@ -326,7 +324,7 @@ func runCGREngine(fs []string) (err error) { // TODO: find a better location for this if block if *flags.MemPrfDir != "" { - if err := coreS.GetCoreS().StartMemoryProfiling(cores.MemoryProfilingParams{ + if err := coreS.CoreS().StartMemoryProfiling(cores.MemoryProfilingParams{ DirPath: *flags.MemPrfDir, MaxFiles: *flags.MemPrfMaxF, Interval: *flags.MemPrfInterval, @@ -369,24 +367,6 @@ func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string return } -// cgrStartFilterService fires up the FilterS -func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS, - cacheSCh chan *engine.CacheS, connMgr *engine.ConnManager, - cfg *config.CGRConfig, db *services.DataDBService) { - var cacheS *engine.CacheS - select { - case cacheS = <-cacheSCh: - cacheSCh <- cacheS - case <-ctx.Done(): - return - } - select { - case <-cacheS.GetPrecacheChannel(utils.CacheFilters): - iFilterSCh <- engine.NewFilterS(cfg, connMgr, db.DataManager()) - case <-ctx.Done(): - } -} - func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig, cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) { srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) diff --git a/services/accounts.go b/services/accounts.go index bf68b8e92..35e15d469 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -36,16 +36,14 @@ import ( // NewAccountService returns the Account Service func NewAccountService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AccountService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - rldChan: make(chan struct{}, 1), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + rldChan: make(chan struct{}, 1), + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -53,8 +51,6 @@ func NewAccountService(cfg *config.CGRConfig, type AccountService struct { sync.RWMutex - filterSChan chan *engine.FilterS - acts *accounts.AccountS cl *commonlisteners.CommonListenerS @@ -87,9 +83,9 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e utils.CacheAccountsFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil { - return + fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AccountS, utils.FilterS, utils.StateServiceUP) } dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { @@ -102,7 +98,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e acts.Lock() defer acts.Unlock() - acts.acts = accounts.NewAccountS(acts.cfg, filterS, acts.connMgr, dbs.DataManager()) + acts.acts = accounts.NewAccountS(acts.cfg, fs.FilterS(), acts.connMgr, dbs.DataManager()) acts.stopChan = make(chan struct{}) go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) diff --git a/services/actions.go b/services/actions.go index b31a35d19..207e98c37 100644 --- a/services/actions.go +++ b/services/actions.go @@ -36,16 +36,14 @@ import ( // NewActionService returns the Action Service func NewActionService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ActionService{ - connMgr: connMgr, - cfg: cfg, - filterSChan: filterSChan, - rldChan: make(chan struct{}, 1), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + connMgr: connMgr, + cfg: cfg, + rldChan: make(chan struct{}, 1), + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -53,8 +51,6 @@ func NewActionService(cfg *config.CGRConfig, type ActionService struct { sync.RWMutex - filterSChan chan *engine.FilterS - acts *actions.ActionS cl *commonlisteners.CommonListenerS @@ -89,9 +85,9 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er utils.CacheActionProfilesFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, acts.filterSChan); err != nil { - return + fs := acts.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ActionS, utils.FilterS, utils.StateServiceUP) } dbs := acts.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), acts.cfg.GeneralCfg().ConnectTimeout) { @@ -104,7 +100,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er acts.Lock() defer acts.Unlock() - acts.acts = actions.NewActionS(acts.cfg, filterS, dbs.DataManager(), acts.connMgr) + acts.acts = actions.NewActionS(acts.cfg, fs.FilterS(), dbs.DataManager(), acts.connMgr) acts.stopChan = make(chan struct{}) go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) diff --git a/services/adminsv1.go b/services/adminsv1.go index 3a09906e8..d10a6e22f 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -33,15 +33,13 @@ import ( // NewAPIerSv1Service returns the APIerSv1 Service func NewAdminSv1Service(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AdminSv1Service{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -49,8 +47,6 @@ func NewAdminSv1Service(cfg *config.CGRConfig, type AdminSv1Service struct { sync.RWMutex - filterSChan chan *engine.FilterS - api *apis.AdminSv1 cl *commonlisteners.CommonListenerS @@ -75,9 +71,9 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF return utils.NewServiceStateTimeoutError(utils.AdminS, utils.CommonListenerS, utils.StateServiceUP) } apiService.cl = cls.CLS() - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil { - return + fs := apiService.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AdminS, utils.FilterS, utils.StateServiceUP) } dbs := apiService.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), apiService.cfg.GeneralCfg().ConnectTimeout) { @@ -95,7 +91,7 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF apiService.Lock() defer apiService.Unlock() - apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), apiService.connMgr, filterS, sdbs.DB()) + apiService.api = apis.NewAdminSv1(apiService.cfg, dbs.DataManager(), apiService.connMgr, fs.FilterS(), sdbs.DB()) srv, _ := engine.NewService(apiService.api) // srv, _ := birpc.NewService(apiService.api, "", false) diff --git a/services/analyzers.go b/services/analyzers.go index 12e5d3265..1fda1a913 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -34,13 +34,11 @@ import ( // NewAnalyzerService returns the Analyzer Service func NewAnalyzerService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, srvIndexer *servmanager.ServiceIndexer) *AnalyzerService { return &AnalyzerService{ - cfg: cfg, - filterSChan: filterSChan, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -48,8 +46,6 @@ func NewAnalyzerService(cfg *config.CGRConfig, type AnalyzerService struct { sync.RWMutex - filterSChan chan *engine.FilterS - anz *analyzers.AnalyzerS cl *commonlisteners.CommonListenerS @@ -89,22 +85,23 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun } }(anz.anz) anz.cl.SetAnalyzer(anz.anz) - go anz.start(ctx) + go anz.start() close(anz.stateDeps.StateChan(utils.StateServiceUP)) return } -func (anz *AnalyzerService) start(ctx *context.Context) { - fS, err := waitForFilterS(ctx, anz.filterSChan) - if err != nil { +func (anz *AnalyzerService) start() { + fs := anz.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), anz.cfg.GeneralCfg().ConnectTimeout) { return + // return utils.NewServiceStateTimeoutError(utils.AnalyzerS, utils.FilterS, utils.StateServiceUP) } if !anz.IsRunning() { return } anz.Lock() - anz.anz.SetFilterS(fS) + anz.anz.SetFilterS(fs.FilterS()) srv, _ := engine.NewService(anz.anz) // srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false) diff --git a/services/attributes.go b/services/attributes.go index 989472d0e..b74e7b2c1 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -34,15 +34,13 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, dspS *DispatcherService, sIndxr *servmanager.ServiceIndexer) servmanager.Service { return &AttributeService{ - cfg: cfg, - filterSChan: filterSChan, - dspS: dspS, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), - serviceIndexer: sIndxr, + cfg: cfg, + dspS: dspS, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + srvIndexer: sIndxr, } } @@ -50,8 +48,7 @@ func NewAttributeService(cfg *config.CGRConfig, type AttributeService struct { sync.RWMutex - dspS *DispatcherService - filterSChan chan *engine.FilterS + dspS *DispatcherService attrS *engine.AttributeS cl *commonlisteners.CommonListenerS @@ -59,9 +56,9 @@ type AttributeService struct { cfg *config.CGRConfig - intRPCconn birpc.ClientConnector // expose API methods over internal connection - serviceIndexer *servmanager.ServiceIndexer // access directly services from here - stateDeps *StateDependencies + intRPCconn birpc.ClientConnector // expose API methods over internal connection + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies } // Start should handle the service start @@ -70,16 +67,16 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) return utils.ErrServiceAlreadyRunning } if utils.StructChanTimeout( - attrS.serviceIndexer.GetService(utils.CommonListenerS).StateChan(utils.StateServiceUP), + attrS.srvIndexer.GetService(utils.CommonListenerS).StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP) } - cls := attrS.serviceIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) + cls := attrS.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService) if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP) } attrS.cl = cls.CLS() - cacheS := attrS.serviceIndexer.GetService(utils.CacheS).(*CacheService) + cacheS := attrS.srvIndexer.GetService(utils.CacheS).(*CacheService) if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CacheS, utils.StateServiceUP) } @@ -88,22 +85,22 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) utils.CacheAttributeFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, attrS.filterSChan); err != nil { - return + fs := attrS.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.FilterS, utils.StateServiceUP) } - dbs := attrS.serviceIndexer.GetService(utils.DataDB).(*DataDBService) + dbs := attrS.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.DataDB, utils.StateServiceUP) } - anz := attrS.serviceIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) + anz := attrS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), attrS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.AnalyzerS, utils.StateServiceUP) } attrS.Lock() defer attrS.Unlock() - attrS.attrS = engine.NewAttributeService(dbs.DataManager(), filterS, attrS.cfg) + attrS.attrS = engine.NewAttributeService(dbs.DataManager(), fs.FilterS(), attrS.cfg) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS)) attrS.rpc = apis.NewAttributeSv1(attrS.attrS) srv, _ := engine.NewService(attrS.rpc) diff --git a/services/caches.go b/services/caches.go index 070efaba0..26d399d0d 100644 --- a/services/caches.go +++ b/services/caches.go @@ -23,7 +23,6 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -31,11 +30,9 @@ import ( // NewCacheService . func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - cores *CoreService, srvIndexer *servmanager.ServiceIndexer) *CacheService { return &CacheService{ cfg: cfg, - cores: cores, connMgr: connMgr, cacheCh: make(chan *engine.CacheS, 1), srvIndexer: srvIndexer, @@ -45,8 +42,6 @@ func NewCacheService(cfg *config.CGRConfig, connMgr *engine.ConnManager, // CacheService implements Agent interface type CacheService struct { - cores *CoreService - cl *commonlisteners.CommonListenerS cacheCh chan *engine.CacheS @@ -73,11 +68,11 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.CacheS, utils.AnalyzerS, utils.StateServiceUP) } - var cs *cores.CoreS - if cs, err = cS.cores.WaitForCoreS(ctx); err != nil { - return + cs := cS.srvIndexer.GetService(utils.CoreS).(*CoreService) + if utils.StructChanTimeout(cs.StateChan(utils.StateServiceUP), cS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CacheS, utils.CoreS, utils.StateServiceUP) } - engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CapsStats) + engine.Cache = engine.NewCacheS(cS.cfg, dbs.DataManager(), cS.connMgr, cs.CoreS().CapsStats) go engine.Cache.Precache(ctx, shtDw) cS.cacheCh <- engine.Cache diff --git a/services/cdrs.go b/services/cdrs.go index c11f04b08..7ce0387ee 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -35,15 +35,13 @@ import ( // NewCDRServer returns the CDR Server func NewCDRServer(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &CDRService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -51,8 +49,6 @@ func NewCDRServer(cfg *config.CGRConfig, type CDRService struct { sync.RWMutex - filterSChan chan *engine.FilterS - cdrS *cdrs.CDRServer cl *commonlisteners.CommonListenerS @@ -77,9 +73,9 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err return utils.NewServiceStateTimeoutError(utils.CDRs, utils.CommonListenerS, utils.StateServiceUP) } cs.cl = cls.CLS() - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil { - return + fs := cs.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.CDRs, utils.FilterS, utils.StateServiceUP) } dbs := cs.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), cs.cfg.GeneralCfg().ConnectTimeout) { @@ -97,7 +93,7 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err cs.Lock() defer cs.Unlock() - cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), filterS, cs.connMgr, sdbs.DB()) + cs.cdrS = cdrs.NewCDRServer(cs.cfg, dbs.DataManager(), fs.FilterS(), cs.connMgr, sdbs.DB()) runtime.Gosched() utils.Logger.Info("Registering CDRS RPC service.") srv, err := engine.NewServiceWithPing(cs.cdrS, utils.CDRsV1, utils.V1Prfx) diff --git a/services/chargers.go b/services/chargers.go index cec4aba7d..ecf63c2c5 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -34,15 +34,13 @@ import ( // NewChargerService returns the Charger Service func NewChargerService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ChargerService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -50,8 +48,6 @@ func NewChargerService(cfg *config.CGRConfig, type ChargerService struct { sync.RWMutex - filterSChan chan *engine.FilterS - chrS *engine.ChargerS cl *commonlisteners.CommonListenerS @@ -83,9 +79,9 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e utils.CacheChargerFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, chrS.filterSChan); err != nil { - return + fs := chrS.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ChargerS, utils.FilterS, utils.StateServiceUP) } dbs := chrS.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), chrS.cfg.GeneralCfg().ConnectTimeout) { @@ -98,7 +94,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e chrS.Lock() defer chrS.Unlock() - chrS.chrS = engine.NewChargerService(dbs.DataManager(), filterS, chrS.cfg, chrS.connMgr) + chrS.chrS = engine.NewChargerService(dbs.DataManager(), fs.FilterS(), chrS.cfg, chrS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ChargerS)) srv, _ := engine.NewService(chrS.chrS) // srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false) diff --git a/services/cores.go b/services/cores.go index ed7e2b8e5..4f5218477 100644 --- a/services/cores.go +++ b/services/cores.go @@ -140,20 +140,6 @@ func (cS *CoreService) ShouldRun() bool { return true } -// GetCoreS returns the coreS -func (cS *CoreService) WaitForCoreS(ctx *context.Context) (cs *cores.CoreS, err error) { - cS.mu.RLock() - cSCh := cS.csCh - cS.mu.RUnlock() - select { - case <-ctx.Done(): - err = ctx.Err() - case cs = <-cSCh: - cSCh <- cs - } - return -} - // StateChan returns signaling channel of specific state func (cS *CoreService) StateChan(stateID string) chan struct{} { return cS.stateDeps.StateChan(stateID) @@ -164,7 +150,8 @@ func (cS *CoreService) IntRPCConn() birpc.ClientConnector { return cS.intRPCconn } -func (cS *CoreService) GetCoreS() *cores.CoreS { +// CoreS returns the CoreS object. +func (cS *CoreService) CoreS() *cores.CoreS { cS.mu.RLock() defer cS.mu.RUnlock() return cS.cS diff --git a/services/diameteragent.go b/services/diameteragent.go index e4dde1d19..1c0b27f62 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -32,25 +32,23 @@ import ( ) // NewDiameterAgent returns the Diameter Agent -func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewDiameterAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, caps *engine.Caps, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &DiameterAgent{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - caps: caps, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + caps: caps, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } // DiameterAgent implements Agent interface type DiameterAgent struct { sync.RWMutex - cfg *config.CGRConfig - filterSChan chan *engine.FilterS - stopChan chan struct{} + cfg *config.CGRConfig + stopChan chan struct{} da *agents.DiameterAgent connMgr *engine.ConnManager @@ -70,13 +68,13 @@ func (da *DiameterAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) return utils.ErrServiceAlreadyRunning } - filterS, err := waitForFilterS(ctx, da.filterSChan) - if err != nil { - return err + fs := da.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), da.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.DiameterAgent, utils.FilterS, utils.StateServiceUP) } da.Lock() defer da.Unlock() - return da.start(filterS, shtDwn, da.caps) + return da.start(fs.FilterS(), shtDwn, da.caps) } func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFunc, caps *engine.Caps) error { @@ -110,11 +108,11 @@ func (da *DiameterAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) return } close(da.stopChan) - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, da.filterSChan); err != nil { - return + fs := da.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), da.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.DiameterAgent, utils.FilterS, utils.StateServiceUP) } - return da.start(filterS, shtDwn, da.caps) + return da.start(fs.FilterS(), shtDwn, da.caps) } // Shutdown stops the service diff --git a/services/dispatchers.go b/services/dispatchers.go index 993a9af84..290f674f6 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -33,16 +33,14 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) *DispatcherService { return &DispatcherService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvsReload: make(map[string]chan struct{}), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvsReload: make(map[string]chan struct{}), + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -50,8 +48,6 @@ func NewDispatcherService(cfg *config.CGRConfig, type DispatcherService struct { sync.RWMutex - filterSChan chan *engine.FilterS - dspS *dispatchers.DispatcherService cl *commonlisteners.CommonListenerS @@ -85,9 +81,10 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) utils.CacheDispatcherFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, dspS.filterSChan); err != nil { - return + + fs := dspS.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.DispatcherS, utils.FilterS, utils.StateServiceUP) } dbs := dspS.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), dspS.cfg.GeneralCfg().ConnectTimeout) { @@ -101,7 +98,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) dspS.Lock() defer dspS.Unlock() - dspS.dspS = dispatchers.NewDispatcherService(dbs.DataManager(), dspS.cfg, filterS, dspS.connMgr) + dspS.dspS = dispatchers.NewDispatcherService(dbs.DataManager(), dspS.cfg, fs.FilterS(), dspS.connMgr) dspS.unregisterAllDispatchedSubsystems() // unregister all rpc services that can be dispatched diff --git a/services/dnsagent.go b/services/dnsagent.go index f9f112b2d..519e6d1a1 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -32,23 +32,21 @@ import ( ) // NewDNSAgent returns the DNS Agent -func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewDNSAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &DNSAgent{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } // DNSAgent implements Agent interface type DNSAgent struct { sync.RWMutex - cfg *config.CGRConfig - filterSChan chan *engine.FilterS + cfg *config.CGRConfig stopChan chan struct{} @@ -65,14 +63,14 @@ func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err if dns.IsRunning() { return utils.ErrServiceAlreadyRunning } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, dns.filterSChan); err != nil { - return + fs := dns.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dns.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.DNSAgent, utils.FilterS, utils.StateServiceUP) } dns.Lock() defer dns.Unlock() - dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr) + dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), dns.connMgr) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) dns.dns = nil @@ -86,8 +84,10 @@ func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err // Reload handles the change of config func (dns *DNSAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (err error) { - filterS := <-dns.filterSChan - dns.filterSChan <- filterS + fs := dns.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), dns.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.DNSAgent, utils.FilterS, utils.StateServiceUP) + } dns.Lock() defer dns.Unlock() @@ -96,7 +96,7 @@ func (dns *DNSAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (er close(dns.stopChan) } - dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr) + dns.dns, err = agents.NewDNSAgent(dns.cfg, fs.FilterS(), dns.connMgr) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) dns.dns = nil diff --git a/services/ees.go b/services/ees.go index da4c453e7..c7df40485 100644 --- a/services/ees.go +++ b/services/ees.go @@ -33,15 +33,14 @@ import ( ) // NewEventExporterService constructs EventExporterService -func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewEventExporterService(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &EventExporterService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -49,8 +48,6 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil type EventExporterService struct { mu sync.RWMutex - filterSChan chan *engine.FilterS - eeS *ees.EeS cl *commonlisteners.CommonListenerS @@ -109,9 +106,9 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc return utils.NewServiceStateTimeoutError(utils.EEs, utils.CommonListenerS, utils.StateServiceUP) } es.cl = cls.CLS() - fltrS, err := waitForFilterS(ctx, es.filterSChan) - if err != nil { - return err + fs := es.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.EEs, utils.FilterS, utils.StateServiceUP) } anz := es.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), es.cfg.GeneralCfg().ConnectTimeout) { @@ -123,7 +120,8 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc es.mu.Lock() defer es.mu.Unlock() - es.eeS, err = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) + var err error + es.eeS, err = ees.NewEventExporterS(es.cfg, fs.FilterS(), es.connMgr) if err != nil { return err } diff --git a/services/ers.go b/services/ers.go index 047130f18..089e32310 100644 --- a/services/ers.go +++ b/services/ers.go @@ -35,16 +35,14 @@ import ( // NewEventReaderService returns the EventReader Service func NewEventReaderService( cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &EventReaderService{ - rldChan: make(chan struct{}, 1), - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + rldChan: make(chan struct{}, 1), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -52,8 +50,6 @@ func NewEventReaderService( type EventReaderService struct { sync.RWMutex - filterSChan chan *engine.FilterS - ers *ers.ERService cl *commonlisteners.CommonListenerS @@ -78,9 +74,9 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel return utils.NewServiceStateTimeoutError(utils.ERs, utils.CommonListenerS, utils.StateServiceUP) } erS.cl = cls.CLS() - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil { - return + fs := erS.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ERs, utils.FilterS, utils.StateServiceUP) } anz := erS.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), erS.cfg.GeneralCfg().ConnectTimeout) { @@ -96,7 +92,7 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) // build the service - erS.ers = ers.NewERService(erS.cfg, filterS, erS.connMgr) + erS.ers = ers.NewERService(erS.cfg, fs.FilterS(), erS.connMgr) go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan, shtDwn) srv, err := engine.NewServiceWithPing(erS.ers, utils.ErSv1, utils.V1Prfx) diff --git a/services/filters.go b/services/filters.go new file mode 100644 index 000000000..1b2f8de09 --- /dev/null +++ b/services/filters.go @@ -0,0 +1,118 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +// NewFilterService instantiates a new FilterService. +func NewFilterService(cfg *config.CGRConfig, connMgr *engine.ConnManager, + srvIndexer *servmanager.ServiceIndexer) *FilterService { + return &FilterService{ + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + } +} + +// FilterService implements Service interface. +type FilterService struct { + mu sync.RWMutex + + fltrS *engine.FilterS + + cfg *config.CGRConfig + connMgr *engine.ConnManager + + intRPCconn birpc.ClientConnector // expose API methods over internal connection + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes +} + +// Start handles the service start. +func (s *FilterService) Start(ctx *context.Context, _ context.CancelFunc) error { + cacheS := s.srvIndexer.GetService(utils.CacheS).(*CacheService) + if utils.StructChanTimeout(cacheS.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.FilterS, utils.CacheS, utils.StateServiceUP) + } + if err := cacheS.WaitToPrecache(ctx, utils.CacheFilters); err != nil { + return err + } + dbs := s.srvIndexer.GetService(utils.DataDB).(*DataDBService) + if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.FilterS, utils.DataDB, utils.StateServiceUP) + } + s.fltrS = engine.NewFilterS(s.cfg, s.connMgr, dbs.DataManager()) + close(s.stateDeps.StateChan(utils.StateServiceUP)) + return nil +} + +// Reload handles the config changes. +func (s *FilterService) Reload(*context.Context, context.CancelFunc) error { + return nil +} + +// Shutdown stops the service. +func (s *FilterService) Shutdown() error { + s.mu.Lock() + defer s.mu.Unlock() + s.fltrS = nil + return nil +} + +// IsRunning returns whether the service is running or not. +func (s *FilterService) IsRunning() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.fltrS != nil +} + +// ServiceName returns the service name +func (s *FilterService) ServiceName() string { + return utils.FilterS +} + +// ShouldRun returns if the service should be running. +func (s *FilterService) ShouldRun() bool { + return true +} + +// StateChan returns signaling channel of specific state +func (s *FilterService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) +} + +// IntRPCConn returns the internal connection used by RPCClient +func (s *FilterService) IntRPCConn() birpc.ClientConnector { + return s.intRPCconn +} + +// FilterS returns the FilterS object. +func (s *FilterService) FilterS() *engine.FilterS { + return s.fltrS +} diff --git a/services/httpagent.go b/services/httpagent.go index 1edf35a10..1c5dd33ae 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -33,15 +33,14 @@ import ( ) // NewHTTPAgent returns the HTTP Agent -func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewHTTPAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &HTTPAgent{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -49,8 +48,6 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, type HTTPAgent struct { sync.RWMutex - filterSChan chan *engine.FilterS - cl *commonlisteners.CommonListenerS // we can realy stop the HTTPAgent so keep a flag @@ -76,9 +73,9 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.CommonListenerS, utils.StateServiceUP) } cl := cls.CLS() - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, ha.filterSChan); err != nil { - return + fs := ha.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ha.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.HTTPAgent, utils.FilterS, utils.StateServiceUP) } ha.Lock() @@ -86,7 +83,7 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro utils.Logger.Info(fmt.Sprintf("<%s> successfully started HTTPAgent", utils.HTTPAgent)) for _, agntCfg := range ha.cfg.HTTPAgentCfg() { cl.RegisterHttpHandler(agntCfg.URL, - agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, filterS, + agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, fs.FilterS(), ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, agntCfg.ReplyPayload, agntCfg.RequestProcessors)) } diff --git a/services/janus.go b/services/janus.go index 2b18e0a5d..e0a358bdf 100644 --- a/services/janus.go +++ b/services/janus.go @@ -33,15 +33,14 @@ import ( ) // NewJanusAgent returns the Janus Agent -func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewJanusAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &JanusAgent{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -49,8 +48,6 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, type JanusAgent struct { sync.RWMutex - filterSChan chan *engine.FilterS - jA *agents.JanusAgent // we can realy stop the JanusAgent so keep a flag @@ -72,9 +69,9 @@ func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err err return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.CommonListenerS, utils.StateServiceUP) } cl := cls.CLS() - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, ja.filterSChan); err != nil { - return + fs := ja.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ja.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.JanusAgent, utils.FilterS, utils.StateServiceUP) } ja.Lock() @@ -82,7 +79,7 @@ func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err err ja.Unlock() return utils.ErrServiceAlreadyRunning } - ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, filterS) + ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, fs.FilterS()) if err != nil { return } diff --git a/services/loaders.go b/services/loaders.go index cec70afda..cf6b975a4 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -34,16 +34,14 @@ import ( // NewLoaderService returns the Loader Service func NewLoaderService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) *LoaderService { return &LoaderService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - stopChan: make(chan struct{}), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + stopChan: make(chan struct{}), + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -51,8 +49,6 @@ func NewLoaderService(cfg *config.CGRConfig, type LoaderService struct { sync.RWMutex - filterSChan chan *engine.FilterS - ldrs *loaders.LoaderS cl *commonlisteners.CommonListenerS @@ -76,9 +72,10 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.CommonListenerS, utils.StateServiceUP) } ldrs.cl = cls.CLS() - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil { - return + + fs := ldrs.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.FilterS, utils.StateServiceUP) } dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { @@ -92,7 +89,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er ldrs.Lock() defer ldrs.Unlock() - ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), filterS, ldrs.connMgr) + ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), fs.FilterS(), ldrs.connMgr) if !ldrs.ldrs.Enabled() { return @@ -114,9 +111,9 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er // Reload handles the change of config func (ldrs *LoaderService) Reload(ctx *context.Context, _ context.CancelFunc) error { - filterS, err := waitForFilterS(ctx, ldrs.filterSChan) - if err != nil { - return err + fs := ldrs.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.LoaderS, utils.FilterS, utils.StateServiceUP) } dbs := ldrs.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ldrs.cfg.GeneralCfg().ConnectTimeout) { @@ -128,7 +125,7 @@ func (ldrs *LoaderService) Reload(ctx *context.Context, _ context.CancelFunc) er ldrs.RLock() defer ldrs.RUnlock() - ldrs.ldrs.Reload(dbs.DataManager(), filterS, ldrs.connMgr) + ldrs.ldrs.Reload(dbs.DataManager(), fs.FilterS(), ldrs.connMgr) return ldrs.ldrs.ListenAndServe(ldrs.stopChan) } diff --git a/services/radiusagent.go b/services/radiusagent.go index a466a6dff..9365adcbb 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -32,24 +32,22 @@ import ( ) // NewRadiusAgent returns the Radius Agent -func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewRadiusAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RadiusAgent{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } // RadiusAgent implements Agent interface type RadiusAgent struct { sync.RWMutex - cfg *config.CGRConfig - filterSChan chan *engine.FilterS - stopChan chan struct{} + cfg *config.CGRConfig + stopChan chan struct{} rad *agents.RadiusAgent connMgr *engine.ConnManager @@ -69,9 +67,9 @@ func (rad *RadiusAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) ( return utils.ErrServiceAlreadyRunning } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, rad.filterSChan); err != nil { - return + fs := rad.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rad.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RadiusAgent, utils.FilterS, utils.StateServiceUP) } rad.Lock() @@ -81,7 +79,7 @@ func (rad *RadiusAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) ( rad.lauth = rad.cfg.RadiusAgentCfg().ListenAuth rad.lacct = rad.cfg.RadiusAgentCfg().ListenAcct - if rad.rad, err = agents.NewRadiusAgent(rad.cfg, filterS, rad.connMgr); err != nil { + if rad.rad, err = agents.NewRadiusAgent(rad.cfg, fs.FilterS(), rad.connMgr); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) return } diff --git a/services/rankings.go b/services/rankings.go index 9401a047b..e518bfdee 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -34,25 +34,21 @@ import ( // NewRankingService returns the RankingS Service func NewRankingService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RankingService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvDep: srvDep, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } type RankingService struct { sync.RWMutex - filterSChan chan *engine.FilterS - ran *engine.RankingS cl *commonlisteners.CommonListenerS @@ -91,9 +87,9 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.RankingS, utils.DataDB, utils.StateServiceUP) } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, ran.filterSChan); err != nil { - return + fs := ran.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RankingS, utils.FilterS, utils.StateServiceUP) } anz := ran.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), ran.cfg.GeneralCfg().ConnectTimeout) { @@ -102,7 +98,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er ran.Lock() defer ran.Unlock() - ran.ran = engine.NewRankingS(dbs.DataManager(), ran.connMgr, filterS, ran.cfg) + ran.ran = engine.NewRankingS(dbs.DataManager(), ran.connMgr, fs.FilterS(), ran.cfg) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RankingS)) diff --git a/services/rates.go b/services/rates.go index c85bd6204..b75191220 100644 --- a/services/rates.go +++ b/services/rates.go @@ -33,14 +33,12 @@ import ( // NewRateService constructs RateService func NewRateService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RateService{ - cfg: cfg, - filterSChan: filterSChan, - rldChan: make(chan struct{}), - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + rldChan: make(chan struct{}), + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -48,9 +46,6 @@ func NewRateService(cfg *config.CGRConfig, type RateService struct { sync.RWMutex - dmS *DataDBService - filterSChan chan *engine.FilterS - rateS *rates.RateS cl *commonlisteners.CommonListenerS @@ -118,9 +113,9 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er utils.CacheRateFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, rs.filterSChan); err != nil { - return + fs := rs.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RateS, utils.FilterS, utils.StateServiceUP) } dbs := rs.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), rs.cfg.GeneralCfg().ConnectTimeout) { @@ -132,7 +127,7 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er } rs.Lock() - rs.rateS = rates.NewRateS(rs.cfg, filterS, dbs.DataManager()) + rs.rateS = rates.NewRateS(rs.cfg, fs.FilterS(), dbs.DataManager()) rs.Unlock() rs.stopChan = make(chan struct{}) diff --git a/services/resources.go b/services/resources.go index 118fe5671..34259a6a4 100644 --- a/services/resources.go +++ b/services/resources.go @@ -33,17 +33,15 @@ import ( // NewResourceService returns the Resource Service func NewResourceService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ResourceService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvDep: srvDep, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -51,8 +49,6 @@ func NewResourceService(cfg *config.CGRConfig, type ResourceService struct { sync.RWMutex - filterSChan chan *engine.FilterS - reS *engine.ResourceS cl *commonlisteners.CommonListenerS @@ -87,9 +83,9 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e utils.CacheResourceFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, reS.filterSChan); err != nil { - return + fs := reS.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ResourceS, utils.FilterS, utils.StateServiceUP) } dbs := reS.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), reS.cfg.GeneralCfg().ConnectTimeout) { @@ -102,7 +98,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e reS.Lock() defer reS.Unlock() - reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, filterS, reS.connMgr) + reS.reS = engine.NewResourceService(dbs.DataManager(), reS.cfg, fs.FilterS(), reS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS)) reS.reS.StartLoop(ctx) srv, _ := engine.NewService(reS.reS) diff --git a/services/routes.go b/services/routes.go index 2e80cf59c..586849fda 100644 --- a/services/routes.go +++ b/services/routes.go @@ -34,15 +34,13 @@ import ( // NewRouteService returns the Route Service func NewRouteService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RouteService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -50,8 +48,6 @@ func NewRouteService(cfg *config.CGRConfig, type RouteService struct { sync.RWMutex - filterSChan chan *engine.FilterS - routeS *engine.RouteS cl *commonlisteners.CommonListenerS @@ -83,9 +79,9 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e utils.CacheRouteFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, routeS.filterSChan); err != nil { - return + fs := routeS.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.RouteS, utils.FilterS, utils.StateServiceUP) } dbs := routeS.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), routeS.cfg.GeneralCfg().ConnectTimeout) { @@ -98,7 +94,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e routeS.Lock() defer routeS.Unlock() - routeS.routeS = engine.NewRouteService(dbs.DataManager(), filterS, routeS.cfg, routeS.connMgr) + routeS.routeS = engine.NewRouteService(dbs.DataManager(), fs.FilterS(), routeS.cfg, routeS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RouteS)) srv, _ := engine.NewService(routeS.routeS) diff --git a/services/sessions.go b/services/sessions.go index b75eb1240..61721e197 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -35,15 +35,14 @@ import ( ) // NewSessionService returns the Session Service -func NewSessionService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewSessionService(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &SessionService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -51,8 +50,6 @@ func NewSessionService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, type SessionService struct { sync.RWMutex - filterSChan chan *engine.FilterS - sm *sessions.SessionS cl *commonlisteners.CommonListenerS @@ -77,9 +74,9 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) return utils.NewServiceStateTimeoutError(utils.SessionS, utils.CommonListenerS, utils.StateServiceUP) } smg.cl = cls.CLS() - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil { - return + fs := smg.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.SessionS, utils.FilterS, utils.StateServiceUP) } dbs := smg.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), smg.cfg.GeneralCfg().ConnectTimeout) { @@ -93,7 +90,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) smg.Lock() defer smg.Unlock() - smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), filterS, smg.connMgr) + smg.sm = sessions.NewSessionS(smg.cfg, dbs.DataManager(), fs.FilterS(), smg.connMgr) //start sync session in a separate goroutine smg.stopChan = make(chan struct{}) go smg.sm.ListenAndServe(smg.stopChan) diff --git a/services/sipagent.go b/services/sipagent.go index 214fe8647..fa5410a7d 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -32,23 +32,21 @@ import ( ) // NewSIPAgent returns the sip Agent -func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewSIPAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &SIPAgent{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } // SIPAgent implements Agent interface type SIPAgent struct { sync.RWMutex - cfg *config.CGRConfig - filterSChan chan *engine.FilterS + cfg *config.CGRConfig sip *agents.SIPAgent connMgr *engine.ConnManager @@ -66,15 +64,15 @@ func (sip *SIPAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err return utils.ErrServiceAlreadyRunning } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, sip.filterSChan); err != nil { - return + fs := sip.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sip.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.SIPAgent, utils.FilterS, utils.StateServiceUP) } sip.Lock() defer sip.Unlock() sip.oldListen = sip.cfg.SIPAgentCfg().Listen - sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS) + sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, fs.FilterS()) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SIPAgent, err)) diff --git a/services/stats.go b/services/stats.go index 4ffa82171..e40606fe2 100644 --- a/services/stats.go +++ b/services/stats.go @@ -33,17 +33,15 @@ import ( // NewStatService returns the Stat Service func NewStatService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &StatService{ - cfg: cfg, - filterSChan: filterSChan, - connMgr: connMgr, - srvDep: srvDep, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -51,8 +49,6 @@ func NewStatService(cfg *config.CGRConfig, type StatService struct { sync.RWMutex - filterSChan chan *engine.FilterS - sts *engine.StatS cl *commonlisteners.CommonListenerS @@ -87,9 +83,9 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e utils.CacheStatFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, sts.filterSChan); err != nil { - return + fs := sts.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.StatS, utils.FilterS, utils.StateServiceUP) } dbs := sts.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), sts.cfg.GeneralCfg().ConnectTimeout) { @@ -102,7 +98,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e sts.Lock() defer sts.Unlock() - sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, filterS, sts.connMgr) + sts.sts = engine.NewStatService(dbs.DataManager(), sts.cfg, fs.FilterS(), sts.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS)) diff --git a/services/thresholds.go b/services/thresholds.go index 84cbb4ea9..f17285659 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -33,17 +33,15 @@ import ( // NewThresholdService returns the Threshold Service func NewThresholdService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ThresholdService{ - cfg: cfg, - filterSChan: filterSChan, - srvDep: srvDep, - connMgr: connMgr, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + srvDep: srvDep, + connMgr: connMgr, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } @@ -51,8 +49,6 @@ func NewThresholdService(cfg *config.CGRConfig, type ThresholdService struct { sync.RWMutex - filterSChan chan *engine.FilterS - thrs *engine.ThresholdS cl *commonlisteners.CommonListenerS @@ -87,9 +83,9 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) utils.CacheThresholdFilterIndexes); err != nil { return } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, thrs.filterSChan); err != nil { - return + fs := thrs.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.ThresholdS, utils.FilterS, utils.StateServiceUP) } dbs := thrs.srvIndexer.GetService(utils.DataDB).(*DataDBService) if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), thrs.cfg.GeneralCfg().ConnectTimeout) { @@ -102,7 +98,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) thrs.Lock() defer thrs.Unlock() - thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, filterS, thrs.connMgr) + thrs.thrs = engine.NewThresholdService(dbs.DataManager(), thrs.cfg, fs.FilterS(), thrs.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS)) thrs.thrs.StartLoop(ctx) diff --git a/services/trends.go b/services/trends.go index 541999940..cf088b0d6 100644 --- a/services/trends.go +++ b/services/trends.go @@ -33,25 +33,21 @@ import ( // NewTrendsService returns the TrendS Service func NewTrendService(cfg *config.CGRConfig, - filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup, srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &TrendService{ - cfg: cfg, - connMgr: connMgr, - srvDep: srvDep, - filterSChan: filterSChan, - srvIndexer: srvIndexer, - stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + srvIndexer: srvIndexer, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), } } type TrendService struct { sync.RWMutex - filterSChan chan *engine.FilterS - trs *engine.TrendS cl *commonlisteners.CommonListenerS @@ -90,9 +86,9 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err if utils.StructChanTimeout(dbs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { return utils.NewServiceStateTimeoutError(utils.TrendS, utils.DataDB, utils.StateServiceUP) } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, trs.filterSChan); err != nil { - return + fs := trs.srvIndexer.GetService(utils.FilterS).(*FilterService) + if utils.StructChanTimeout(fs.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.TrendS, utils.FilterS, utils.StateServiceUP) } anz := trs.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService) if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), trs.cfg.GeneralCfg().ConnectTimeout) { @@ -101,7 +97,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err trs.Lock() defer trs.Unlock() - trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, filterS, trs.connMgr) + trs.trs = engine.NewTrendService(dbs.DataManager(), trs.cfg, fs.FilterS(), trs.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TrendS)) if err := trs.trs.StartTrendS(ctx); err != nil { return err