diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 9e3f33da2..76e9aafe5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -59,7 +59,7 @@ func runCGREngine(fs []string) (err error) { if cfg, err = services.InitConfigFromPath(ctx, *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig { return } - cgr := services.NewCGREngine(cfg, []servmanager.Service{}) + cgr := services.NewCGREngine(cfg, servmanager.NewServiceIndexer(), []servmanager.Service{}) defer cgr.Stop(*flags.PidFile) if err = cgr.Run(ctx, cancel, flags, vers, diff --git a/services/accounts.go b/services/accounts.go index 9b6806e77..793a3c0bd 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -39,7 +39,8 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, - anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { + anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AccountService{ connChan: internalChan, cfg: cfg, @@ -51,6 +52,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, rldChan: make(chan struct{}, 1), + srvIndexer: srvIndexer, } } @@ -73,6 +75,9 @@ type AccountService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -80,7 +85,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e if acts.IsRunning() { return utils.ErrServiceAlreadyRunning } - + acts.stateDeps = NewStateDependencies([]string{utils.StateServiceUP}) acts.cl = <-acts.clSChan acts.clSChan <- acts.cl if err = acts.cacheS.WaitToPrecache(ctx, @@ -154,3 +159,8 @@ func (acts *AccountService) ServiceName() string { func (acts *AccountService) ShouldRun() bool { return acts.cfg.AccountSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (acts *AccountService) StateChan(stateID string) chan struct{} { + return acts.stateDeps.StateChan(stateID) +} diff --git a/services/actions.go b/services/actions.go index 1ef569664..12f1c0a89 100644 --- a/services/actions.go +++ b/services/actions.go @@ -39,7 +39,8 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, - anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { + anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ActionService{ connChan: internalChan, connMgr: connMgr, @@ -51,6 +52,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, rldChan: make(chan struct{}, 1), + srvIndexer: srvIndexer, } } @@ -74,6 +76,9 @@ type ActionService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -153,3 +158,8 @@ func (acts *ActionService) ServiceName() string { func (acts *ActionService) ShouldRun() bool { return acts.cfg.ActionSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (acts *ActionService) StateChan(stateID string) chan struct{} { + return acts.stateDeps.StateChan(stateID) +} diff --git a/services/adminsv1.go b/services/adminsv1.go index ca71b7ebb..038148ced 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -37,7 +37,8 @@ func NewAdminSv1Service(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalAPIerSv1Chan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AdminSv1Service{ connChan: internalAPIerSv1Chan, cfg: cfg, @@ -48,6 +49,7 @@ func NewAdminSv1Service(cfg *config.CGRConfig, connMgr: connMgr, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -69,6 +71,9 @@ type AdminSv1Service struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -153,3 +158,8 @@ func (apiService *AdminSv1Service) ServiceName() string { func (apiService *AdminSv1Service) ShouldRun() bool { return apiService.cfg.AdminSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (apiService *AdminSv1Service) StateChan(stateID string) chan struct{} { + return apiService.stateDeps.StateChan(stateID) +} diff --git a/services/analyzers.go b/services/analyzers.go index 9714c17ff..406fa73c7 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -36,7 +37,8 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.Com filterSChan chan *engine.FilterS, internalAnalyzerSChan chan birpc.ClientConnector, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) *AnalyzerService { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *AnalyzerService { return &AnalyzerService{ connChan: internalAnalyzerSChan, cfg: cfg, @@ -44,6 +46,7 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.Com filterSChan: filterSChan, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -62,6 +65,10 @@ type AnalyzerService struct { connChan chan birpc.ClientConnector cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes + } // Start should handle the sercive start @@ -165,3 +172,8 @@ func (anz *AnalyzerService) GetInternalCodec(c birpc.ClientConnector, to string) } return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to) } + +// StateChan returns signaling channel of specific state +func (anz *AnalyzerService) StateChan(stateID string) chan struct{} { + return anz.stateDeps.StateChan(stateID) +} diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 097551ef0..21fab6735 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -34,11 +34,13 @@ import ( // NewAsteriskAgent returns the Asterisk Agent func NewAsteriskAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &AsteriskAgent{ - cfg: cfg, - connMgr: connMgr, - srvDep: srvDep, + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -51,6 +53,9 @@ type AsteriskAgent struct { smas []*agents.AsteriskAgent connMgr *engine.ConnManager srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -113,3 +118,8 @@ func (ast *AsteriskAgent) ServiceName() string { func (ast *AsteriskAgent) ShouldRun() bool { return ast.cfg.AsteriskAgentCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (ast *AsteriskAgent) StateChan(stateID string) chan struct{} { + return ast.stateDeps.StateChan(stateID) +} diff --git a/services/attributes.go b/services/attributes.go index 1d76704fc..7946b0f42 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -37,17 +37,19 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, anzChan chan *AnalyzerService, dspS *DispatcherService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, sIndxr *servmanager.ServiceIndexer) servmanager.Service { return &AttributeService{ - connChan: internalChan, - cfg: cfg, - dm: dm, - cacheS: cacheS, - filterSChan: filterSChan, - clSChan: clSChan, - anzChan: anzChan, - srvDep: srvDep, - dspS: dspS, + connChan: internalChan, + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + clSChan: clSChan, + anzChan: anzChan, + srvDep: srvDep, + dspS: dspS, + stateDeps: NewStateDependencies([]string{utils.StateServiceUP}), + serviceIndexer: sIndxr, } } @@ -69,6 +71,9 @@ type AttributeService struct { connChan chan birpc.ClientConnector // publish the internal Subsystem when available cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + serviceIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies } // Start should handle the service start @@ -76,7 +81,11 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) if attrS.IsRunning() { return utils.ErrServiceAlreadyRunning } - + if utils.StructChanTimeout( + attrS.serviceIndexer.GetService(utils.CommonListenerS).StateChan(utils.StateServiceUP), + attrS.cfg.GeneralCfg().ConnectTimeout) { + return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP) + } attrS.cl = <-attrS.clSChan attrS.clSChan <- attrS.cl if err = attrS.cacheS.WaitToPrecache(ctx, @@ -120,6 +129,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) } }() attrS.connChan <- anz.GetInternalCodec(srv, utils.AttributeS) + close(attrS.stateDeps.StateChan(utils.StateServiceUP)) // inform listeners about the service reaching UP state return } @@ -157,3 +167,8 @@ func (attrS *AttributeService) ServiceName() string { func (attrS *AttributeService) ShouldRun() bool { return attrS.cfg.AttributeSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (attrS *AttributeService) StateChan(stateID string) chan struct{} { + return attrS.stateDeps.StateChan(stateID) +} diff --git a/services/caches.go b/services/caches.go index 944c3e946..b4282541e 100644 --- a/services/caches.go +++ b/services/caches.go @@ -27,6 +27,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -35,17 +36,19 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, anzChan chan *AnalyzerService, // dspS *DispatcherService, cores *CoreService, - srvDep map[string]*sync.WaitGroup) *CacheService { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *CacheService { return &CacheService{ - cfg: cfg, - srvDep: srvDep, - anzChan: anzChan, - cores: cores, - clSChan: clSChan, - dm: dm, - connMgr: connMgr, - rpc: internalChan, - cacheCh: make(chan *engine.CacheS, 1), + cfg: cfg, + srvDep: srvDep, + anzChan: anzChan, + cores: cores, + clSChan: clSChan, + dm: dm, + connMgr: connMgr, + rpc: internalChan, + cacheCh: make(chan *engine.CacheS, 1), + srvIndexer: srvIndexer, } } @@ -63,6 +66,9 @@ type CacheService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -144,3 +150,8 @@ func (cS *CacheService) WaitToPrecache(ctx *context.Context, cacheIDs ...string) } return } + +// StateChan returns signaling channel of specific state +func (cS *CacheService) StateChan(stateID string) chan struct{} { + return cS.stateDeps.StateChan(stateID) +} diff --git a/services/cdrs.go b/services/cdrs.go index 434ff2d8a..cb81910ad 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -38,7 +38,8 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalCDRServerChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &CDRService{ connChan: internalCDRServerChan, cfg: cfg, @@ -49,6 +50,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, connMgr: connMgr, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -70,6 +72,9 @@ type CDRService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -147,3 +152,8 @@ func (cs *CDRService) ServiceName() string { func (cs *CDRService) ShouldRun() bool { return cs.cfg.CdrsCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (cs *CDRService) StateChan(stateID string) chan struct{} { + return cs.stateDeps.StateChan(stateID) +} diff --git a/services/cgr-engine.go b/services/cgr-engine.go index ceaa40c4f..c46cf6af1 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -39,7 +39,7 @@ import ( "github.com/cgrates/rpcclient" ) -func NewCGREngine(cfg *config.CGRConfig, +func NewCGREngine(cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer, services []servmanager.Service) *CGREngine { cM := engine.NewConnManager(cfg) caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy) @@ -49,7 +49,7 @@ func NewCGREngine(cfg *config.CGRConfig, cM: cM, caps: caps, // caps is used to limit RPC CPS shdWg: shdWg, // wait for shutdown - srvManager: servmanager.NewServiceManager(shdWg, cM, cfg, services), + srvManager: servmanager.NewServiceManager(shdWg, cM, cfg, sIdxr, services), srvDep: map[string]*sync.WaitGroup{ utils.AccountS: new(sync.WaitGroup), utils.ActionS: new(sync.WaitGroup), @@ -194,73 +194,78 @@ func (cgr *CGREngine) InitServices(setVersions bool) { cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, iEFsCh) cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs), utils.ErSv1, iERsCh) - cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep) - cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep) - cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep) - cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.clsCh, cgr.srvDep) + // ServiceIndexer will share service references to all services + srvIdxr := servmanager.NewServiceIndexer() + cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep, srvIdxr) + cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep, srvIdxr) + cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep, srvIdxr) + cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.clsCh, cgr.srvDep, srvIdxr) cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.clsCh, - cgr.iFilterSCh, iAnalyzerSCh, cgr.anzCh, cgr.srvDep) + cgr.iFilterSCh, iAnalyzerSCh, cgr.anzCh, cgr.srvDep, srvIdxr) - cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.clsCh, iCoreSv1Ch, cgr.anzCh, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep) + cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.clsCh, iCoreSv1Ch, cgr.anzCh, + cgr.cpuPrfF, cgr.shdWg, cgr.srvDep, srvIdxr) cgr.cacheS = NewCacheService(cgr.cfg, cgr.dmS, cgr.cM, cgr.clsCh, iCacheSCh, cgr.anzCh, cgr.coreS, - cgr.srvDep) + cgr.srvDep, srvIdxr) dspS := NewDispatcherService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, cgr.iDispatcherSCh, cgr.cM, - cgr.anzCh, cgr.srvDep) + cgr.anzCh, cgr.srvDep, srvIdxr) cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, - iLoaderSCh, cgr.cM, cgr.anzCh, cgr.srvDep) + iLoaderSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr) - cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.clsCh, cgr.srvDep) + cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.clsCh, cgr.srvDep, srvIdxr) cgr.srvManager.AddServices(cgr.gvS, cgr.cls, cgr.coreS, cgr.cacheS, cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs, NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh, - iAdminSCh, cgr.cM, cgr.anzCh, cgr.srvDep), - NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, iSessionSCh, cgr.cM, cgr.anzCh, cgr.srvDep), + iAdminSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), + NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, iSessionSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iAttributeSCh, - cgr.anzCh, dspS, cgr.srvDep), + cgr.anzCh, dspS, cgr.srvDep, srvIdxr), NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iChargerSCh, cgr.cM, cgr.anzCh, cgr.srvDep), + iChargerSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iRouteSCh, cgr.cM, cgr.anzCh, cgr.srvDep), + iRouteSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iResourceSCh, cgr.cM, cgr.anzCh, cgr.srvDep), + iResourceSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iTrendSCh, cgr.cM, cgr.anzCh, cgr.srvDep), + iTrendSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iRankingSCh, cgr.cM, cgr.anzCh, cgr.srvDep), + iRankingSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), NewThresholdService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, - cgr.cM, cgr.clsCh, iThresholdSCh, cgr.anzCh, cgr.srvDep), + cgr.cM, cgr.clsCh, iThresholdSCh, cgr.anzCh, cgr.srvDep, srvIdxr), NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, - iStatSCh, cgr.cM, cgr.anzCh, cgr.srvDep), + iStatSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iERsCh, cgr.anzCh, cgr.srvDep), - NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), - NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep), - NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep), - NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep), - NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload - NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload - NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep), // partial reload - NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep), // no reload - NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), + NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iERsCh, cgr.anzCh, cgr.srvDep, srvIdxr), + NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep, srvIdxr), + NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr), + NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr), + NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep, srvIdxr), + NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr), // partial reload + NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep, srvIdxr), // partial reload + NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep, srvIdxr), // partial reload + NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep, srvIdxr), // no reload + NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep, srvIdxr), NewEventExporterService(cgr.cfg, cgr.iFilterSCh, - cgr.cM, cgr.clsCh, iEEsCh, cgr.anzCh, cgr.srvDep), + cgr.cM, cgr.clsCh, iEEsCh, cgr.anzCh, cgr.srvDep, srvIdxr), NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh, iCDRServerCh, - cgr.cM, cgr.anzCh, cgr.srvDep), + cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr), - NewRegistrarCService(cgr.cfg, cgr.cM, cgr.srvDep), + NewRegistrarCService(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr), NewRateService(cgr.cfg, cgr.cacheS, cgr.iFilterSCh, cgr.dmS, - cgr.clsCh, iRateSCh, cgr.anzCh, cgr.srvDep), - NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iActionSCh, cgr.anzCh, cgr.srvDep), - NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iAccountSCh, cgr.anzCh, cgr.srvDep), - NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.clsCh, cgr.srvDep), + cgr.clsCh, iRateSCh, cgr.anzCh, cgr.srvDep, srvIdxr), + NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, + cgr.clsCh, iActionSCh, cgr.anzCh, cgr.srvDep, srvIdxr), + NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, + cgr.cM, cgr.clsCh, iAccountSCh, cgr.anzCh, cgr.srvDep, srvIdxr), + NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.clsCh, cgr.srvDep, srvIdxr), ) } diff --git a/services/chargers.go b/services/chargers.go index 951f9ff64..f8777691e 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -36,7 +36,8 @@ import ( func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalChargerSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, - anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { + anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ChargerService{ connChan: internalChargerSChan, cfg: cfg, @@ -47,6 +48,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, connMgr: connMgr, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -67,6 +69,9 @@ type ChargerService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -140,3 +145,8 @@ func (chrS *ChargerService) ServiceName() string { func (chrS *ChargerService) ShouldRun() bool { return chrS.cfg.ChargerSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (chrS *ChargerService) StateChan(stateID string) chan struct{} { + return chrS.stateDeps.StateChan(stateID) +} diff --git a/services/commonlisteners.go b/services/commonlisteners.go index 4f885c15b..cb876f2fc 100644 --- a/services/commonlisteners.go +++ b/services/commonlisteners.go @@ -26,16 +26,20 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/registrarc" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewCommonListenerService instantiates a new CommonListenerService. -func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *CommonListenerService { +func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, + clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *CommonListenerService { return &CommonListenerService{ - cfg: cfg, - caps: caps, - clSChan: clSChan, - srvDep: srvDep, + cfg: cfg, + caps: caps, + clSChan: clSChan, + srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -49,6 +53,9 @@ type CommonListenerService struct { caps *engine.Caps cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. @@ -99,3 +106,8 @@ func (cl *CommonListenerService) ServiceName() string { func (cl *CommonListenerService) ShouldRun() bool { return true } + +// StateChan returns signaling channel of specific state +func (cl *CommonListenerService) StateChan(stateID string) chan struct{} { + return cl.stateDeps.StateChan(stateID) +} diff --git a/services/cores.go b/services/cores.go index 26d31bba0..323b27b93 100644 --- a/services/cores.go +++ b/services/cores.go @@ -29,6 +29,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -36,17 +37,19 @@ import ( func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS, internalCoreSChan chan birpc.ClientConnector, anzChan chan *AnalyzerService, fileCPU *os.File, shdWg *sync.WaitGroup, - srvDep map[string]*sync.WaitGroup) *CoreService { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *CoreService { return &CoreService{ - shdWg: shdWg, - connChan: internalCoreSChan, - cfg: cfg, - caps: caps, - fileCPU: fileCPU, - clSChan: clSChan, - anzChan: anzChan, - srvDep: srvDep, - csCh: make(chan *cores.CoreS, 1), + shdWg: shdWg, + connChan: internalCoreSChan, + cfg: cfg, + caps: caps, + fileCPU: fileCPU, + clSChan: clSChan, + anzChan: anzChan, + srvDep: srvDep, + csCh: make(chan *cores.CoreS, 1), + srvIndexer: srvIndexer, } } @@ -68,6 +71,9 @@ type CoreService struct { connChan chan birpc.ClientConnector cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -150,3 +156,8 @@ func (cS *CoreService) WaitForCoreS(ctx *context.Context) (cs *cores.CoreS, err } return } + +// StateChan returns signaling channel of specific state +func (cS *CoreService) StateChan(stateID string) chan struct{} { + return cS.stateDeps.StateChan(stateID) +} diff --git a/services/datadb.go b/services/datadb.go index 04613f4d7..c78785947 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -25,18 +25,21 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewDataDBService returns the DataDB Service func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVersions bool, - srvDep map[string]*sync.WaitGroup) *DataDBService { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *DataDBService { return &DataDBService{ cfg: cfg, dbchan: make(chan *engine.DataManager, 1), connMgr: connMgr, setVersions: setVersions, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -52,6 +55,9 @@ type DataDBService struct { setVersions bool srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start handles the service start. @@ -189,3 +195,8 @@ func (db *DataDBService) WaitForDM(ctx *context.Context) (datadb *engine.DataMan } return } + +// StateChan returns signaling channel of specific state +func (db *DataDBService) StateChan(stateID string) chan struct{} { + return db.stateDeps.StateChan(stateID) +} diff --git a/services/diameteragent.go b/services/diameteragent.go index f0cc849b7..40ce5bd65 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -33,13 +33,15 @@ import ( // NewDiameterAgent returns the Diameter Agent func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, caps *engine.Caps, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &DiameterAgent{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, caps: caps, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -58,6 +60,9 @@ type DiameterAgent struct { laddr string srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -137,3 +142,8 @@ func (da *DiameterAgent) ServiceName() string { func (da *DiameterAgent) ShouldRun() bool { return da.cfg.DiameterAgentCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (da *DiameterAgent) StateChan(stateID string) chan struct{} { + return da.stateDeps.StateChan(stateID) +} diff --git a/services/dispatchers.go b/services/dispatchers.go index c3e8754a8..babc08703 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -27,6 +27,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -35,7 +36,8 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) *DispatcherService { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *DispatcherService { return &DispatcherService{ connChan: internalChan, cfg: cfg, @@ -47,6 +49,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, srvsReload: make(map[string]chan struct{}), + srvIndexer: srvIndexer, } } @@ -68,6 +71,9 @@ type DispatcherService struct { cfg *config.CGRConfig srvsReload map[string]chan struct{} srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -180,3 +186,8 @@ func (dspS *DispatcherService) sync() { c <- struct{}{} } } + +// StateChan returns signaling channel of specific state +func (dspS *DispatcherService) StateChan(stateID string) chan struct{} { + return dspS.stateDeps.StateChan(stateID) +} diff --git a/services/dnsagent.go b/services/dnsagent.go index 2a0401989..f764370ed 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -33,12 +33,14 @@ import ( // NewDNSAgent returns the DNS Agent func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &DNSAgent{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -53,6 +55,9 @@ type DNSAgent struct { dns *agents.DNSAgent connMgr *engine.ConnManager srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -142,3 +147,8 @@ func (dns *DNSAgent) ServiceName() string { func (dns *DNSAgent) ShouldRun() bool { return dns.cfg.DNSAgentCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (dns *DNSAgent) StateChan(stateID string) chan struct{} { + return dns.stateDeps.StateChan(stateID) +} diff --git a/services/ees.go b/services/ees.go index 7c463ef19..c894e5937 100644 --- a/services/ees.go +++ b/services/ees.go @@ -35,7 +35,8 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, intConnChan chan birpc.ClientConnector, - anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { + anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &EventExporterService{ cfg: cfg, filterSChan: filterSChan, @@ -44,6 +45,7 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil intConnChan: intConnChan, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -62,6 +64,9 @@ type EventExporterService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // ServiceName returns the service name @@ -134,3 +139,8 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc es.intConnChan <- anz.GetInternalCodec(srv, utils.EEs) return nil } + +// StateChan returns signaling channel of specific state +func (es *EventExporterService) StateChan(stateID string) chan struct{} { + return es.stateDeps.StateChan(stateID) +} diff --git a/services/efs.go b/services/efs.go index 09a03268a..870ff6703 100644 --- a/services/efs.go +++ b/services/efs.go @@ -29,6 +29,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -47,18 +48,24 @@ type ExportFailoverService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // NewExportFailoverService is the constructor for the TpeService func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager, intConnChan chan birpc.ClientConnector, - clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *ExportFailoverService { + clSChan chan *commonlisteners.CommonListenerS, + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *ExportFailoverService { return &ExportFailoverService{ cfg: cfg, clSChan: clSChan, connMgr: connMgr, intConnChan: intConnChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -110,3 +117,8 @@ func (efServ *ExportFailoverService) ShouldRun() bool { func (efServ *ExportFailoverService) ServiceName() string { return utils.EFs } + +// StateChan returns signaling channel of specific state +func (efServ *ExportFailoverService) StateChan(stateID string) chan struct{} { + return efServ.stateDeps.StateChan(stateID) +} diff --git a/services/ers.go b/services/ers.go index f93383f23..3d3cedb65 100644 --- a/services/ers.go +++ b/services/ers.go @@ -40,7 +40,8 @@ func NewEventReaderService( clSChan chan *commonlisteners.CommonListenerS, intConn chan birpc.ClientConnector, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, @@ -50,6 +51,7 @@ func NewEventReaderService( intConn: intConn, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -70,6 +72,9 @@ type EventReaderService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -152,3 +157,8 @@ func (erS *EventReaderService) ServiceName() string { func (erS *EventReaderService) ShouldRun() bool { return erS.cfg.ERsCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (erS *EventReaderService) StateChan(stateID string) chan struct{} { + return erS.stateDeps.StateChan(stateID) +} diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index ebe78fcea..f9fdce110 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -34,11 +34,13 @@ import ( // NewFreeswitchAgent returns the Freeswitch Agent func NewFreeswitchAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &FreeswitchAgent{ - cfg: cfg, - connMgr: connMgr, - srvDep: srvDep, + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -50,6 +52,9 @@ type FreeswitchAgent struct { fS *agents.FSsessions connMgr *engine.ConnManager srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -112,3 +117,8 @@ func (fS *FreeswitchAgent) ServiceName() string { func (fS *FreeswitchAgent) ShouldRun() bool { return fS.cfg.FsAgentCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (fS *FreeswitchAgent) StateChan(stateID string) chan struct{} { + return fS.stateDeps.StateChan(stateID) +} diff --git a/services/globalvars.go b/services/globalvars.go index ca45b87c4..875e38f56 100644 --- a/services/globalvars.go +++ b/services/globalvars.go @@ -25,15 +25,18 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewGlobalVarS . func NewGlobalVarS(cfg *config.CGRConfig, - srvDep map[string]*sync.WaitGroup) *GlobalVarS { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *GlobalVarS { return &GlobalVarS{ - cfg: cfg, - srvDep: srvDep, + cfg: cfg, + srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -41,6 +44,9 @@ func NewGlobalVarS(cfg *config.CGRConfig, type GlobalVarS struct { cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -82,3 +88,8 @@ func (gv *GlobalVarS) ServiceName() string { func (gv *GlobalVarS) ShouldRun() bool { return true } + +// StateChan returns signaling channel of specific state +func (gv *GlobalVarS) StateChan(stateID string) chan struct{} { + return gv.stateDeps.StateChan(stateID) +} diff --git a/services/httpagent.go b/services/httpagent.go index d4e184941..d9839e88f 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -34,13 +34,15 @@ import ( // NewHTTPAgent returns the HTTP Agent func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &HTTPAgent{ cfg: cfg, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -60,6 +62,9 @@ type HTTPAgent struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -117,3 +122,8 @@ func (ha *HTTPAgent) ServiceName() string { func (ha *HTTPAgent) ShouldRun() bool { return len(ha.cfg.HTTPAgentCfg()) != 0 } + +// StateChan returns signaling channel of specific state +func (ha *HTTPAgent) StateChan(stateID string) chan struct{} { + return ha.stateDeps.StateChan(stateID) +} diff --git a/services/janus.go b/services/janus.go index 5222e3b39..a14bfc97c 100644 --- a/services/janus.go +++ b/services/janus.go @@ -35,13 +35,15 @@ import ( // NewJanusAgent returns the Janus Agent func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &JanusAgent{ cfg: cfg, filterSChan: filterSChan, clSChan: clSChan, connMgr: connMgr, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -61,6 +63,9 @@ type JanusAgent struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should jandle the sercive start @@ -129,3 +134,8 @@ func (ja *JanusAgent) ServiceName() string { func (ja *JanusAgent) ShouldRun() bool { return ja.cfg.JanusAgentCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (ja *JanusAgent) StateChan(stateID string) chan struct{} { + return ja.stateDeps.StateChan(stateID) +} diff --git a/services/kamailioagent.go b/services/kamailioagent.go index b24242818..9f61e0681 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -35,11 +35,13 @@ import ( // NewKamailioAgent returns the Kamailio Agent func NewKamailioAgent(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &KamailioAgent{ - cfg: cfg, - connMgr: connMgr, - srvDep: srvDep, + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -51,6 +53,9 @@ type KamailioAgent struct { kam *agents.KamailioAgent connMgr *engine.ConnManager srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -118,3 +123,8 @@ func (kam *KamailioAgent) ServiceName() string { func (kam *KamailioAgent) ShouldRun() bool { return kam.cfg.KamAgentCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (kam *KamailioAgent) StateChan(stateID string) chan struct{} { + return kam.stateDeps.StateChan(stateID) +} diff --git a/services/loaders.go b/services/loaders.go index ccd73f7d3..ef3748099 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/loaders" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -36,7 +37,8 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalLoaderSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) *LoaderService { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *LoaderService { return &LoaderService{ connChan: internalLoaderSChan, cfg: cfg, @@ -47,6 +49,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, stopChan: make(chan struct{}), anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -67,6 +70,9 @@ type LoaderService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -167,3 +173,8 @@ func (ldrs *LoaderService) GetLoaderS() *loaders.LoaderS { func (ldrs *LoaderService) GetRPCChan() chan birpc.ClientConnector { return ldrs.connChan } + +// StateChan returns signaling channel of specific state +func (ldrs *LoaderService) StateChan(stateID string) chan struct{} { + return ldrs.stateDeps.StateChan(stateID) +} diff --git a/services/radiusagent.go b/services/radiusagent.go index 13ad500ee..92bc331e4 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -33,12 +33,14 @@ import ( // NewRadiusAgent returns the Radius Agent func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RadiusAgent{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -56,6 +58,9 @@ type RadiusAgent struct { lnet string lauth string lacct string + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -136,3 +141,8 @@ func (rad *RadiusAgent) ServiceName() string { func (rad *RadiusAgent) ShouldRun() bool { return rad.cfg.RadiusAgentCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (rad *RadiusAgent) StateChan(stateID string) chan struct{} { + return rad.stateDeps.StateChan(stateID) +} diff --git a/services/rankings.go b/services/rankings.go index 732ef1e59..0ed070c4b 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -37,7 +37,8 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalRankingSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RankingService{ connChan: internalRankingSChan, cfg: cfg, @@ -48,6 +49,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, connMgr: connMgr, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -67,6 +69,9 @@ type RankingService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -151,3 +156,8 @@ func (ran *RankingService) ServiceName() string { func (ran *RankingService) ShouldRun() bool { return ran.cfg.RankingSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (ran *RankingService) StateChan(stateID string) chan struct{} { + return ran.stateDeps.StateChan(stateID) +} diff --git a/services/rates.go b/services/rates.go index af5cbabef..227e3a7a8 100644 --- a/services/rates.go +++ b/services/rates.go @@ -36,7 +36,8 @@ func NewRateService(cfg *config.CGRConfig, cacheS *CacheService, filterSChan chan *engine.FilterS, dmS *DataDBService, clSChan chan *commonlisteners.CommonListenerS, intConnChan chan birpc.ClientConnector, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RateService{ cfg: cfg, cacheS: cacheS, @@ -47,6 +48,7 @@ func NewRateService(cfg *config.CGRConfig, rldChan: make(chan struct{}), anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -68,6 +70,9 @@ type RateService struct { intConnChan chan birpc.ClientConnector cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // ServiceName returns the service name @@ -148,3 +153,8 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er rs.intConnChan <- anz.GetInternalCodec(srv, utils.RateS) return } + +// StateChan returns signaling channel of specific state +func (rs *RateService) StateChan(stateID string) chan struct{} { + return rs.stateDeps.StateChan(stateID) +} diff --git a/services/registrarc.go b/services/registrarc.go index 322285c1d..389fe1791 100644 --- a/services/registrarc.go +++ b/services/registrarc.go @@ -31,11 +31,13 @@ import ( // NewRegistrarCService returns the Dispatcher Service func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RegistrarCService{ - cfg: cfg, - connMgr: connMgr, - srvDep: srvDep, + cfg: cfg, + connMgr: connMgr, + srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -50,6 +52,9 @@ type RegistrarCService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -102,3 +107,8 @@ func (dspS *RegistrarCService) ShouldRun() bool { return len(dspS.cfg.RegistrarCCfg().RPC.RegistrarSConns) != 0 || len(dspS.cfg.RegistrarCCfg().Dispatchers.RegistrarSConns) != 0 } + +// StateChan returns signaling channel of specific state +func (dspS *RegistrarCService) StateChan(stateID string) chan struct{} { + return dspS.stateDeps.StateChan(stateID) +} diff --git a/services/resources.go b/services/resources.go index 57881a29a..1f5327a88 100644 --- a/services/resources.go +++ b/services/resources.go @@ -36,7 +36,8 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalResourceSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ResourceService{ connChan: internalResourceSChan, cfg: cfg, @@ -47,6 +48,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, connMgr: connMgr, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -67,6 +69,9 @@ type ResourceService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -147,3 +152,8 @@ func (reS *ResourceService) ServiceName() string { func (reS *ResourceService) ShouldRun() bool { return reS.cfg.ResourceSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (reS *ResourceService) StateChan(stateID string) chan struct{} { + return reS.stateDeps.StateChan(stateID) +} diff --git a/services/routes.go b/services/routes.go index eb60aefc0..08b7e2c02 100644 --- a/services/routes.go +++ b/services/routes.go @@ -37,7 +37,8 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalRouteSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &RouteService{ connChan: internalRouteSChan, cfg: cfg, @@ -48,6 +49,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, connMgr: connMgr, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -68,6 +70,9 @@ type RouteService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -142,3 +147,8 @@ func (routeS *RouteService) ServiceName() string { func (routeS *RouteService) ShouldRun() bool { return routeS.cfg.RouteSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (routeS *RouteService) StateChan(stateID string) chan struct{} { + return routeS.stateDeps.StateChan(stateID) +} diff --git a/services/serviceindexer.go b/services/serviceindexer.go deleted file mode 100644 index 86c2377bc..000000000 --- a/services/serviceindexer.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package services - -import ( - "sync" - - "github.com/cgrates/cgrates/servmanager" -) - -// NewServiceIndexer constructs a ServiceIndexer -func NewServiceIndexer() *ServiceIndexer { - return &ServiceIndexer{srvS: make(map[string]servmanager.Service)} -} - -// ServiceIndexer implements servmanager.Service indexing in a thread safe way -type ServiceIndexer struct { - mux sync.RWMutex - - srvS map[string]servmanager.Service // servmanager.Services indexed by ID -} - -// Getservmanager.Service returns one servmanager.Service or nil -func (sI *ServiceIndexer) GetService(srvID string) servmanager.Service { - sI.mux.RLock() - defer sI.mux.RUnlock() - return sI.srvS[srvID] -} - -// Addservmanager.Service adds a servmanager.Service based on it's id to the index -func (sI *ServiceIndexer) AddService(srvID string, srv servmanager.Service) { - sI.mux.Lock() - sI.srvS[srvID] = srv - sI.mux.Unlock() -} - -// Remservmanager.Service will remove a servmanager.Service based on it's ID -func (sI *ServiceIndexer) RemService(srvID string) { - sI.mux.Lock() - defer sI.mux.Unlock() - delete(sI.srvS, srvID) -} - -// Getservmanager.Services returns the list of servmanager.Services indexed -func (sI *ServiceIndexer) GetServices() []servmanager.Service { - sI.mux.RLock() - defer sI.mux.RUnlock() - srvs := make([]servmanager.Service, 0, len(sI.srvS)) - for _, s := range sI.srvS { - srvs = append(srvs, s) - } - return srvs -} diff --git a/services/sessions.go b/services/sessions.go index 5d7685ede..6c3686cbb 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -38,7 +38,8 @@ import ( func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &SessionService{ connChan: internalChan, cfg: cfg, @@ -48,6 +49,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha connMgr: connMgr, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -69,6 +71,9 @@ type SessionService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -172,3 +177,8 @@ func (smg *SessionService) ServiceName() string { func (smg *SessionService) ShouldRun() bool { return smg.cfg.SessionSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (smg *SessionService) StateChan(stateID string) chan struct{} { + return smg.stateDeps.StateChan(stateID) +} diff --git a/services/sipagent.go b/services/sipagent.go index 96f35fdf6..bafbb1378 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -33,12 +33,14 @@ import ( // NewSIPAgent returns the sip Agent func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &SIPAgent{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -53,6 +55,9 @@ type SIPAgent struct { srvDep map[string]*sync.WaitGroup oldListen string + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -125,3 +130,8 @@ func (sip *SIPAgent) ServiceName() string { func (sip *SIPAgent) ShouldRun() bool { return sip.cfg.SIPAgentCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (sip *SIPAgent) StateChan(stateID string) chan struct{} { + return sip.stateDeps.StateChan(stateID) +} diff --git a/services/libstatedeps.go b/services/statedeps.go similarity index 68% rename from services/libstatedeps.go rename to services/statedeps.go index dd9df8275..27795bfa8 100644 --- a/services/libstatedeps.go +++ b/services/statedeps.go @@ -22,28 +22,25 @@ import ( "sync" ) -// newStateDependencies constructs a stateDependencies struct -func newStateDependencies() *stateDependencies { - return &stateDependencies{stateDeps: make(map[string]chan struct{})} +// NewStateDependencies constructs a StateDependencies struct +func NewStateDependencies(servStates []string) (stDeps *StateDependencies) { + stDeps = &StateDependencies{stateDeps: make(map[string]chan struct{})} + for _, stateID := range servStates { + stDeps.stateDeps[stateID] = make(chan struct{}) + } + return } -// stateDependencies enhances a service with state dependencies management -type stateDependencies struct { +// StateDependencies enhances a service with state dependencies management +type StateDependencies struct { stateDeps map[string]chan struct{} // listeners for various states of the service stateDepsMux sync.RWMutex // protects stateDeps } // RegisterStateDependency will be called by a service interested by specific stateID of the service -func (sDs *stateDependencies) RegisterStateDependency(stateID string) (retChan chan struct{}) { +func (sDs *StateDependencies) StateChan(stateID string) (retChan chan struct{}) { sDs.stateDepsMux.RLock() retChan = sDs.stateDeps[stateID] sDs.stateDepsMux.RUnlock() - if retChan != nil { - return - } - sDs.stateDepsMux.Lock() - defer sDs.stateDepsMux.Unlock() - retChan = make(chan struct{}) - sDs.stateDeps[stateID] = retChan return } diff --git a/services/stats.go b/services/stats.go index 6a86d5f58..58d58ac4a 100644 --- a/services/stats.go +++ b/services/stats.go @@ -36,7 +36,8 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalStatSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &StatService{ connChan: internalStatSChan, cfg: cfg, @@ -47,6 +48,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, connMgr: connMgr, anzChan: anzChan, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -67,6 +69,9 @@ type StatService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -149,3 +154,8 @@ func (sts *StatService) ServiceName() string { func (sts *StatService) ShouldRun() bool { return sts.cfg.StatSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (sts *StatService) StateChan(stateID string) chan struct{} { + return sts.stateDeps.StateChan(stateID) +} diff --git a/services/stordb.go b/services/stordb.go index 4376cba4b..648004f18 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -25,16 +25,19 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) // NewStorDBService returns the StorDB Service func NewStorDBService(cfg *config.CGRConfig, setVersions bool, - srvDep map[string]*sync.WaitGroup) *StorDBService { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) *StorDBService { return &StorDBService{ cfg: cfg, setVersions: setVersions, srvDep: srvDep, + srvIndexer: srvIndexer, } } @@ -49,6 +52,9 @@ type StorDBService struct { setVersions bool srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -203,3 +209,8 @@ func (db *StorDBService) needsConnectionReload() bool { db.oldDBCfg.Opts.PgSSLCertMode != db.cfg.StorDbCfg().Opts.PgSSLCertMode || db.oldDBCfg.Opts.PgSSLRootCert != db.cfg.StorDbCfg().Opts.PgSSLRootCert) } + +// StateChan returns signaling channel of specific state +func (db *StorDBService) StateChan(stateID string) chan struct{} { + return db.stateDeps.StateChan(stateID) +} diff --git a/services/thresholds.go b/services/thresholds.go index cbda12c2c..ab5814700 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -36,7 +36,8 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, internalThresholdSChan chan birpc.ClientConnector, - anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { + anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &ThresholdService{ connChan: internalThresholdSChan, cfg: cfg, @@ -47,6 +48,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, connMgr: connMgr, + srvIndexer: srvIndexer, } } @@ -67,6 +69,9 @@ type ThresholdService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -148,3 +153,8 @@ func (thrs *ThresholdService) ServiceName() string { func (thrs *ThresholdService) ShouldRun() bool { return thrs.cfg.ThresholdSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (thrs *ThresholdService) StateChan(stateID string) chan struct{} { + return thrs.stateDeps.StateChan(stateID) +} diff --git a/services/tpes.go b/services/tpes.go index c38e04960..e2a345931 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -34,13 +34,15 @@ import ( // NewTPeService is the constructor for the TpeService func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService, - clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) servmanager.Service { + clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &TPeService{ - cfg: cfg, - srvDep: srvDep, - dm: dm, - connMgr: connMgr, - clSChan: clSChan, + cfg: cfg, + srvDep: srvDep, + dm: dm, + connMgr: connMgr, + clSChan: clSChan, + srvIndexer: srvIndexer, } } @@ -59,6 +61,9 @@ type TPeService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start @@ -107,3 +112,8 @@ func (ts *TPeService) ServiceName() string { func (ts *TPeService) ShouldRun() bool { return ts.cfg.TpeSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (ts *TPeService) StateChan(stateID string) chan struct{} { + return ts.stateDeps.StateChan(stateID) +} diff --git a/services/trends.go b/services/trends.go index 5db1725cd..293693cf4 100644 --- a/services/trends.go +++ b/services/trends.go @@ -36,7 +36,8 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS, internalTrendSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anzChan chan *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup, + srvIndexer *servmanager.ServiceIndexer) servmanager.Service { return &TrendService{ connChan: internalTrendSChan, cfg: cfg, @@ -47,6 +48,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, anzChan: anzChan, srvDep: srvDep, filterSChan: filterSChan, + srvIndexer: srvIndexer, } } @@ -66,6 +68,9 @@ type TrendService struct { connMgr *engine.ConnManager cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup + + srvIndexer *servmanager.ServiceIndexer // access directly services from here + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the sercive start @@ -148,3 +153,8 @@ func (trs *TrendService) ServiceName() string { func (trs *TrendService) ShouldRun() bool { return trs.cfg.TrendSCfg().Enabled } + +// StateChan returns signaling channel of specific state +func (trs *TrendService) StateChan(stateID string) chan struct{} { + return trs.stateDeps.StateChan(stateID) +} diff --git a/services/stateindexer.go b/servmanager/serviceindexer.go similarity index 62% rename from services/stateindexer.go rename to servmanager/serviceindexer.go index 63810f2cc..dc08f4730 100644 --- a/services/stateindexer.go +++ b/servmanager/serviceindexer.go @@ -16,52 +16,50 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package services +package servmanager import ( "sync" - - "github.com/cgrates/cgrates/servmanager" ) -// NewStateIndexer constructs a StateIndexer -func NewStateIndexer() *StateIndexer { - return &StateIndexer{srvS: make(map[string]servmanager.Service)} +// NewServiceIndexer constructs a ServiceIndexer +func NewServiceIndexer() *ServiceIndexer { + return &ServiceIndexer{srvS: make(map[string]Service)} } -// StateIndexer implements service indexing in a thread safe way -type StateIndexer struct { +// ServiceIndexer implements service indexing in a thread safe way +type ServiceIndexer struct { mux sync.RWMutex - srvS map[string]servmanager.Service // services indexed by ID + srvS map[string]Service // services indexed by ID } // GetService returns one service or nil -func (sI *StateIndexer) GetService(srvID string) servmanager.Service { +func (sI *ServiceIndexer) GetService(srvID string) Service { sI.mux.RLock() defer sI.mux.RUnlock() return sI.srvS[srvID] } // AddService adds a service based on it's id to the index -func (sI *StateIndexer) AddService(srvID string, srv servmanager.Service) { +func (sI *ServiceIndexer) AddService(srvID string, srv Service) { sI.mux.Lock() sI.srvS[srvID] = srv sI.mux.Unlock() } -// RemService will remove a service based on it's ID -func (sI *StateIndexer) RemService(srvID string) { +// RemoveService will remove a service based on it's ID +func (sI *ServiceIndexer) RemoveService(srvID string) { sI.mux.Lock() defer sI.mux.Unlock() delete(sI.srvS, srvID) } // GetServices returns the list of services indexed -func (sI *StateIndexer) GetServices() []servmanager.Service { +func (sI *ServiceIndexer) GetServices() []Service { sI.mux.RLock() defer sI.mux.RUnlock() - srvs := make([]servmanager.Service, 0, len(sI.srvS)) + srvs := make([]Service, 0, len(sI.srvS)) for _, s := range sI.srvS { srvs = append(srvs, s) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 96d370838..d1c777a7a 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -30,7 +30,7 @@ import ( // NewServiceManager returns a service manager func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, - cfg *config.CGRConfig, services []Service) (sM *ServiceManager) { + cfg *config.CGRConfig, srvIndxr *ServiceIndexer, services []Service) (sM *ServiceManager) { sM = &ServiceManager{ cfg: cfg, subsystems: make(map[string]Service), @@ -48,6 +48,8 @@ type ServiceManager struct { cfg *config.CGRConfig subsystems map[string]Service // active subsystems managed by SM + serviceIndexer *ServiceIndexer // index here the services for accessing them by their IDs + shdWg *sync.WaitGroup // list of shutdown items connMgr *engine.ConnManager rldChan <-chan string // reload signals come over this channelc @@ -68,6 +70,18 @@ func (srvMngr *ServiceManager) StartServices(ctx *context.Context, shtDwn contex }(service) } } + for _, service := range srvMngr.serviceIndexer.GetServices() { + if service.ShouldRun() && !service.IsRunning() { + srvMngr.shdWg.Add(1) + go func(srv Service) { + if err := srv.Start(ctx, shtDwn); err != nil && + err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine + utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srv.ServiceName(), err)) + shtDwn() + } + }(service) + } + } // startServer() } @@ -165,6 +179,8 @@ type Service interface { ShouldRun() bool // ServiceName returns the service name ServiceName() string + // StateChan returns the channel for specific state subscription + StateChan(stateID string) chan struct{} } // ArgsServiceID are passed to Start/Stop/Status RPC methods diff --git a/utils/consts.go b/utils/consts.go index 2d987b92d..8bfa1159c 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2828,6 +2828,10 @@ const ( ToNearestTowardZero = "*toNearestTowardZero" ) +const ( + StateServiceUP = "SERVICE_UP" +) + func buildCacheInstRevPrefixes() { CachePrefixToInstance = make(map[string]string) for k, v := range CacheInstanceToPrefix { diff --git a/utils/errors.go b/utils/errors.go index 432e9aacd..9ebb3694b 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -249,3 +249,8 @@ func ErrPathNotReachable(path string) error { func NewSTIRError(reason string) error { return fmt.Errorf("%s: %s", MetaSTIRAuthenticate, reason) } + +// NewServiceTimeoutError is called when waiting +func NewServiceStateTimeoutError(sourceID, srvID, stateID string) error { + return fmt.Errorf("%s: service <%s> state <%s> timeout", sourceID, srvID, stateID) +}