Adding serviceIndexer and StateDependencies

This commit is contained in:
DanB
2024-11-28 14:56:03 +01:00
parent 23e5f0e387
commit 6b241ee35b
45 changed files with 571 additions and 227 deletions

View File

@@ -59,7 +59,7 @@ func runCGREngine(fs []string) (err error) {
if cfg, err = services.InitConfigFromPath(ctx, *flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil || *flags.CheckConfig {
return
}
cgr := services.NewCGREngine(cfg, []servmanager.Service{})
cgr := services.NewCGREngine(cfg, servmanager.NewServiceIndexer(), []servmanager.Service{})
defer cgr.Stop(*flags.PidFile)
if err = cgr.Run(ctx, cancel, flags, vers,

View File

@@ -39,7 +39,8 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS,
internalChan chan birpc.ClientConnector,
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &AccountService{
connChan: internalChan,
cfg: cfg,
@@ -51,6 +52,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
}
}
@@ -73,6 +75,9 @@ type AccountService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -80,7 +85,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
if acts.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
acts.stateDeps = NewStateDependencies([]string{utils.StateServiceUP})
acts.cl = <-acts.clSChan
acts.clSChan <- acts.cl
if err = acts.cacheS.WaitToPrecache(ctx,
@@ -154,3 +159,8 @@ func (acts *AccountService) ServiceName() string {
func (acts *AccountService) ShouldRun() bool {
return acts.cfg.AccountSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (acts *AccountService) StateChan(stateID string) chan struct{} {
return acts.stateDeps.StateChan(stateID)
}

View File

@@ -39,7 +39,8 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ActionService{
connChan: internalChan,
connMgr: connMgr,
@@ -51,6 +52,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
}
}
@@ -74,6 +76,9 @@ type ActionService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -153,3 +158,8 @@ func (acts *ActionService) ServiceName() string {
func (acts *ActionService) ShouldRun() bool {
return acts.cfg.ActionSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (acts *ActionService) StateChan(stateID string) chan struct{} {
return acts.stateDeps.StateChan(stateID)
}

View File

@@ -37,7 +37,8 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
internalAPIerSv1Chan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &AdminSv1Service{
connChan: internalAPIerSv1Chan,
cfg: cfg,
@@ -48,6 +49,7 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
connMgr: connMgr,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -69,6 +71,9 @@ type AdminSv1Service struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -153,3 +158,8 @@ func (apiService *AdminSv1Service) ServiceName() string {
func (apiService *AdminSv1Service) ShouldRun() bool {
return apiService.cfg.AdminSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (apiService *AdminSv1Service) StateChan(stateID string) chan struct{} {
return apiService.stateDeps.StateChan(stateID)
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
@@ -36,7 +37,8 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.Com
filterSChan chan *engine.FilterS,
internalAnalyzerSChan chan birpc.ClientConnector,
anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *AnalyzerService {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *AnalyzerService {
return &AnalyzerService{
connChan: internalAnalyzerSChan,
cfg: cfg,
@@ -44,6 +46,7 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.Com
filterSChan: filterSChan,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -62,6 +65,10 @@ type AnalyzerService struct {
connChan chan birpc.ClientConnector
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -165,3 +172,8 @@ func (anz *AnalyzerService) GetInternalCodec(c birpc.ClientConnector, to string)
}
return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to)
}
// StateChan returns signaling channel of specific state
func (anz *AnalyzerService) StateChan(stateID string) chan struct{} {
return anz.stateDeps.StateChan(stateID)
}

View File

@@ -34,11 +34,13 @@ import (
// NewAsteriskAgent returns the Asterisk Agent
func NewAsteriskAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &AsteriskAgent{
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -51,6 +53,9 @@ type AsteriskAgent struct {
smas []*agents.AsteriskAgent
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -113,3 +118,8 @@ func (ast *AsteriskAgent) ServiceName() string {
func (ast *AsteriskAgent) ShouldRun() bool {
return ast.cfg.AsteriskAgentCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (ast *AsteriskAgent) StateChan(stateID string) chan struct{} {
return ast.stateDeps.StateChan(stateID)
}

View File

@@ -37,17 +37,19 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
anzChan chan *AnalyzerService, dspS *DispatcherService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup, sIndxr *servmanager.ServiceIndexer) servmanager.Service {
return &AttributeService{
connChan: internalChan,
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
clSChan: clSChan,
anzChan: anzChan,
srvDep: srvDep,
dspS: dspS,
connChan: internalChan,
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
clSChan: clSChan,
anzChan: anzChan,
srvDep: srvDep,
dspS: dspS,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
serviceIndexer: sIndxr,
}
}
@@ -69,6 +71,9 @@ type AttributeService struct {
connChan chan birpc.ClientConnector // publish the internal Subsystem when available
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
serviceIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies
}
// Start should handle the service start
@@ -76,7 +81,11 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
if attrS.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
if utils.StructChanTimeout(
attrS.serviceIndexer.GetService(utils.CommonListenerS).StateChan(utils.StateServiceUP),
attrS.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.AttributeS, utils.CommonListenerS, utils.StateServiceUP)
}
attrS.cl = <-attrS.clSChan
attrS.clSChan <- attrS.cl
if err = attrS.cacheS.WaitToPrecache(ctx,
@@ -120,6 +129,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
}
}()
attrS.connChan <- anz.GetInternalCodec(srv, utils.AttributeS)
close(attrS.stateDeps.StateChan(utils.StateServiceUP)) // inform listeners about the service reaching UP state
return
}
@@ -157,3 +167,8 @@ func (attrS *AttributeService) ServiceName() string {
func (attrS *AttributeService) ShouldRun() bool {
return attrS.cfg.AttributeSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (attrS *AttributeService) StateChan(stateID string) chan struct{} {
return attrS.stateDeps.StateChan(stateID)
}

View File

@@ -27,6 +27,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
@@ -35,17 +36,19 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
anzChan chan *AnalyzerService, // dspS *DispatcherService,
cores *CoreService,
srvDep map[string]*sync.WaitGroup) *CacheService {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *CacheService {
return &CacheService{
cfg: cfg,
srvDep: srvDep,
anzChan: anzChan,
cores: cores,
clSChan: clSChan,
dm: dm,
connMgr: connMgr,
rpc: internalChan,
cacheCh: make(chan *engine.CacheS, 1),
cfg: cfg,
srvDep: srvDep,
anzChan: anzChan,
cores: cores,
clSChan: clSChan,
dm: dm,
connMgr: connMgr,
rpc: internalChan,
cacheCh: make(chan *engine.CacheS, 1),
srvIndexer: srvIndexer,
}
}
@@ -63,6 +66,9 @@ type CacheService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -144,3 +150,8 @@ func (cS *CacheService) WaitToPrecache(ctx *context.Context, cacheIDs ...string)
}
return
}
// StateChan returns signaling channel of specific state
func (cS *CacheService) StateChan(stateID string) chan struct{} {
return cS.stateDeps.StateChan(stateID)
}

View File

@@ -38,7 +38,8 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
storDB *StorDBService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, internalCDRServerChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &CDRService{
connChan: internalCDRServerChan,
cfg: cfg,
@@ -49,6 +50,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
connMgr: connMgr,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -70,6 +72,9 @@ type CDRService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -147,3 +152,8 @@ func (cs *CDRService) ServiceName() string {
func (cs *CDRService) ShouldRun() bool {
return cs.cfg.CdrsCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (cs *CDRService) StateChan(stateID string) chan struct{} {
return cs.stateDeps.StateChan(stateID)
}

View File

@@ -39,7 +39,7 @@ import (
"github.com/cgrates/rpcclient"
)
func NewCGREngine(cfg *config.CGRConfig,
func NewCGREngine(cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer,
services []servmanager.Service) *CGREngine {
cM := engine.NewConnManager(cfg)
caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy)
@@ -49,7 +49,7 @@ func NewCGREngine(cfg *config.CGRConfig,
cM: cM,
caps: caps, // caps is used to limit RPC CPS
shdWg: shdWg, // wait for shutdown
srvManager: servmanager.NewServiceManager(shdWg, cM, cfg, services),
srvManager: servmanager.NewServiceManager(shdWg, cM, cfg, sIdxr, services),
srvDep: map[string]*sync.WaitGroup{
utils.AccountS: new(sync.WaitGroup),
utils.ActionS: new(sync.WaitGroup),
@@ -194,73 +194,78 @@ func (cgr *CGREngine) InitServices(setVersions bool) {
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, iEFsCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs), utils.ErSv1, iERsCh)
cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep)
cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep)
cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep)
cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.clsCh, cgr.srvDep)
// ServiceIndexer will share service references to all services
srvIdxr := servmanager.NewServiceIndexer()
cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep, srvIdxr)
cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep, srvIdxr)
cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep, srvIdxr)
cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.clsCh, cgr.srvDep, srvIdxr)
cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.clsCh,
cgr.iFilterSCh, iAnalyzerSCh, cgr.anzCh, cgr.srvDep)
cgr.iFilterSCh, iAnalyzerSCh, cgr.anzCh, cgr.srvDep, srvIdxr)
cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.clsCh, iCoreSv1Ch, cgr.anzCh, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep)
cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.clsCh, iCoreSv1Ch, cgr.anzCh,
cgr.cpuPrfF, cgr.shdWg, cgr.srvDep, srvIdxr)
cgr.cacheS = NewCacheService(cgr.cfg, cgr.dmS, cgr.cM,
cgr.clsCh, iCacheSCh, cgr.anzCh, cgr.coreS,
cgr.srvDep)
cgr.srvDep, srvIdxr)
dspS := NewDispatcherService(cgr.cfg, cgr.dmS, cgr.cacheS,
cgr.iFilterSCh, cgr.clsCh, cgr.iDispatcherSCh, cgr.cM,
cgr.anzCh, cgr.srvDep)
cgr.anzCh, cgr.srvDep, srvIdxr)
cgr.ldrs = NewLoaderService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh,
iLoaderSCh, cgr.cM, cgr.anzCh, cgr.srvDep)
iLoaderSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr)
cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.clsCh, cgr.srvDep)
cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.clsCh, cgr.srvDep, srvIdxr)
cgr.srvManager.AddServices(cgr.gvS, cgr.cls, cgr.coreS, cgr.cacheS,
cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs,
NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh,
iAdminSCh, cgr.cM, cgr.anzCh, cgr.srvDep),
NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, iSessionSCh, cgr.cM, cgr.anzCh, cgr.srvDep),
iAdminSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr),
NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.clsCh, iSessionSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr),
NewAttributeService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh, iAttributeSCh,
cgr.anzCh, dspS, cgr.srvDep),
cgr.anzCh, dspS, cgr.srvDep, srvIdxr),
NewChargerService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iChargerSCh, cgr.cM, cgr.anzCh, cgr.srvDep),
iChargerSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr),
NewRouteService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iRouteSCh, cgr.cM, cgr.anzCh, cgr.srvDep),
iRouteSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr),
NewResourceService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iResourceSCh, cgr.cM, cgr.anzCh, cgr.srvDep),
iResourceSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr),
NewTrendService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iTrendSCh, cgr.cM, cgr.anzCh, cgr.srvDep),
iTrendSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr),
NewRankingService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iRankingSCh, cgr.cM, cgr.anzCh, cgr.srvDep),
iRankingSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr),
NewThresholdService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh,
cgr.cM, cgr.clsCh, iThresholdSCh, cgr.anzCh, cgr.srvDep),
cgr.cM, cgr.clsCh, iThresholdSCh, cgr.anzCh, cgr.srvDep, srvIdxr),
NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.clsCh,
iStatSCh, cgr.cM, cgr.anzCh, cgr.srvDep),
iStatSCh, cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr),
NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iERsCh, cgr.anzCh, cgr.srvDep),
NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep),
NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload
NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload
NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep), // partial reload
NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep), // no reload
NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iERsCh, cgr.anzCh, cgr.srvDep, srvIdxr),
NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep, srvIdxr),
NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr),
NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr),
NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep, srvIdxr),
NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr), // partial reload
NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep, srvIdxr), // partial reload
NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep, srvIdxr), // partial reload
NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.clsCh, cgr.cM, cgr.srvDep, srvIdxr), // no reload
NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep, srvIdxr),
NewEventExporterService(cgr.cfg, cgr.iFilterSCh,
cgr.cM, cgr.clsCh, iEEsCh, cgr.anzCh, cgr.srvDep),
cgr.cM, cgr.clsCh, iEEsCh, cgr.anzCh, cgr.srvDep, srvIdxr),
NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.clsCh, iCDRServerCh,
cgr.cM, cgr.anzCh, cgr.srvDep),
cgr.cM, cgr.anzCh, cgr.srvDep, srvIdxr),
NewRegistrarCService(cgr.cfg, cgr.cM, cgr.srvDep),
NewRegistrarCService(cgr.cfg, cgr.cM, cgr.srvDep, srvIdxr),
NewRateService(cgr.cfg, cgr.cacheS, cgr.iFilterSCh, cgr.dmS,
cgr.clsCh, iRateSCh, cgr.anzCh, cgr.srvDep),
NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iActionSCh, cgr.anzCh, cgr.srvDep),
NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.clsCh, iAccountSCh, cgr.anzCh, cgr.srvDep),
NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.clsCh, cgr.srvDep),
cgr.clsCh, iRateSCh, cgr.anzCh, cgr.srvDep, srvIdxr),
NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM,
cgr.clsCh, iActionSCh, cgr.anzCh, cgr.srvDep, srvIdxr),
NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh,
cgr.cM, cgr.clsCh, iAccountSCh, cgr.anzCh, cgr.srvDep, srvIdxr),
NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.clsCh, cgr.srvDep, srvIdxr),
)
}

View File

@@ -36,7 +36,8 @@ import (
func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
internalChargerSChan chan birpc.ClientConnector, connMgr *engine.ConnManager,
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ChargerService{
connChan: internalChargerSChan,
cfg: cfg,
@@ -47,6 +48,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
connMgr: connMgr,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -67,6 +69,9 @@ type ChargerService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -140,3 +145,8 @@ func (chrS *ChargerService) ServiceName() string {
func (chrS *ChargerService) ShouldRun() bool {
return chrS.cfg.ChargerSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (chrS *ChargerService) StateChan(stateID string) chan struct{} {
return chrS.stateDeps.StateChan(stateID)
}

View File

@@ -26,16 +26,20 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/registrarc"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewCommonListenerService instantiates a new CommonListenerService.
func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *CommonListenerService {
func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps,
clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *CommonListenerService {
return &CommonListenerService{
cfg: cfg,
caps: caps,
clSChan: clSChan,
srvDep: srvDep,
cfg: cfg,
caps: caps,
clSChan: clSChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -49,6 +53,9 @@ type CommonListenerService struct {
caps *engine.Caps
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
@@ -99,3 +106,8 @@ func (cl *CommonListenerService) ServiceName() string {
func (cl *CommonListenerService) ShouldRun() bool {
return true
}
// StateChan returns signaling channel of specific state
func (cl *CommonListenerService) StateChan(stateID string) chan struct{} {
return cl.stateDeps.StateChan(stateID)
}

View File

@@ -29,6 +29,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
@@ -36,17 +37,19 @@ import (
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *commonlisteners.CommonListenerS,
internalCoreSChan chan birpc.ClientConnector, anzChan chan *AnalyzerService,
fileCPU *os.File, shdWg *sync.WaitGroup,
srvDep map[string]*sync.WaitGroup) *CoreService {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *CoreService {
return &CoreService{
shdWg: shdWg,
connChan: internalCoreSChan,
cfg: cfg,
caps: caps,
fileCPU: fileCPU,
clSChan: clSChan,
anzChan: anzChan,
srvDep: srvDep,
csCh: make(chan *cores.CoreS, 1),
shdWg: shdWg,
connChan: internalCoreSChan,
cfg: cfg,
caps: caps,
fileCPU: fileCPU,
clSChan: clSChan,
anzChan: anzChan,
srvDep: srvDep,
csCh: make(chan *cores.CoreS, 1),
srvIndexer: srvIndexer,
}
}
@@ -68,6 +71,9 @@ type CoreService struct {
connChan chan birpc.ClientConnector
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -150,3 +156,8 @@ func (cS *CoreService) WaitForCoreS(ctx *context.Context) (cs *cores.CoreS, err
}
return
}
// StateChan returns signaling channel of specific state
func (cS *CoreService) StateChan(stateID string) chan struct{} {
return cS.stateDeps.StateChan(stateID)
}

View File

@@ -25,18 +25,21 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewDataDBService returns the DataDB Service
func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVersions bool,
srvDep map[string]*sync.WaitGroup) *DataDBService {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *DataDBService {
return &DataDBService{
cfg: cfg,
dbchan: make(chan *engine.DataManager, 1),
connMgr: connMgr,
setVersions: setVersions,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -52,6 +55,9 @@ type DataDBService struct {
setVersions bool
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
@@ -189,3 +195,8 @@ func (db *DataDBService) WaitForDM(ctx *context.Context) (datadb *engine.DataMan
}
return
}
// StateChan returns signaling channel of specific state
func (db *DataDBService) StateChan(stateID string) chan struct{} {
return db.stateDeps.StateChan(stateID)
}

View File

@@ -33,13 +33,15 @@ import (
// NewDiameterAgent returns the Diameter Agent
func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, caps *engine.Caps,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &DiameterAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
caps: caps,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -58,6 +60,9 @@ type DiameterAgent struct {
laddr string
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -137,3 +142,8 @@ func (da *DiameterAgent) ServiceName() string {
func (da *DiameterAgent) ShouldRun() bool {
return da.cfg.DiameterAgentCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (da *DiameterAgent) StateChan(stateID string) chan struct{} {
return da.stateDeps.StateChan(stateID)
}

View File

@@ -27,6 +27,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
@@ -35,7 +36,8 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *DispatcherService {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *DispatcherService {
return &DispatcherService{
connChan: internalChan,
cfg: cfg,
@@ -47,6 +49,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
srvsReload: make(map[string]chan struct{}),
srvIndexer: srvIndexer,
}
}
@@ -68,6 +71,9 @@ type DispatcherService struct {
cfg *config.CGRConfig
srvsReload map[string]chan struct{}
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -180,3 +186,8 @@ func (dspS *DispatcherService) sync() {
c <- struct{}{}
}
}
// StateChan returns signaling channel of specific state
func (dspS *DispatcherService) StateChan(stateID string) chan struct{} {
return dspS.stateDeps.StateChan(stateID)
}

View File

@@ -33,12 +33,14 @@ import (
// NewDNSAgent returns the DNS Agent
func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &DNSAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -53,6 +55,9 @@ type DNSAgent struct {
dns *agents.DNSAgent
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -142,3 +147,8 @@ func (dns *DNSAgent) ServiceName() string {
func (dns *DNSAgent) ShouldRun() bool {
return dns.cfg.DNSAgentCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (dns *DNSAgent) StateChan(stateID string) chan struct{} {
return dns.stateDeps.StateChan(stateID)
}

View File

@@ -35,7 +35,8 @@ import (
// NewEventExporterService constructs EventExporterService
func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, clSChan chan *commonlisteners.CommonListenerS, intConnChan chan birpc.ClientConnector,
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &EventExporterService{
cfg: cfg,
filterSChan: filterSChan,
@@ -44,6 +45,7 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil
intConnChan: intConnChan,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -62,6 +64,9 @@ type EventExporterService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// ServiceName returns the service name
@@ -134,3 +139,8 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
es.intConnChan <- anz.GetInternalCodec(srv, utils.EEs)
return nil
}
// StateChan returns signaling channel of specific state
func (es *EventExporterService) StateChan(stateID string) chan struct{} {
return es.stateDeps.StateChan(stateID)
}

View File

@@ -29,6 +29,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
@@ -47,18 +48,24 @@ type ExportFailoverService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// NewExportFailoverService is the constructor for the TpeService
func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
intConnChan chan birpc.ClientConnector,
clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *ExportFailoverService {
clSChan chan *commonlisteners.CommonListenerS,
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *ExportFailoverService {
return &ExportFailoverService{
cfg: cfg,
clSChan: clSChan,
connMgr: connMgr,
intConnChan: intConnChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -110,3 +117,8 @@ func (efServ *ExportFailoverService) ShouldRun() bool {
func (efServ *ExportFailoverService) ServiceName() string {
return utils.EFs
}
// StateChan returns signaling channel of specific state
func (efServ *ExportFailoverService) StateChan(stateID string) chan struct{} {
return efServ.stateDeps.StateChan(stateID)
}

View File

@@ -40,7 +40,8 @@ func NewEventReaderService(
clSChan chan *commonlisteners.CommonListenerS,
intConn chan birpc.ClientConnector,
anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &EventReaderService{
rldChan: make(chan struct{}, 1),
cfg: cfg,
@@ -50,6 +51,7 @@ func NewEventReaderService(
intConn: intConn,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -70,6 +72,9 @@ type EventReaderService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -152,3 +157,8 @@ func (erS *EventReaderService) ServiceName() string {
func (erS *EventReaderService) ShouldRun() bool {
return erS.cfg.ERsCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (erS *EventReaderService) StateChan(stateID string) chan struct{} {
return erS.stateDeps.StateChan(stateID)
}

View File

@@ -34,11 +34,13 @@ import (
// NewFreeswitchAgent returns the Freeswitch Agent
func NewFreeswitchAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &FreeswitchAgent{
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -50,6 +52,9 @@ type FreeswitchAgent struct {
fS *agents.FSsessions
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -112,3 +117,8 @@ func (fS *FreeswitchAgent) ServiceName() string {
func (fS *FreeswitchAgent) ShouldRun() bool {
return fS.cfg.FsAgentCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (fS *FreeswitchAgent) StateChan(stateID string) chan struct{} {
return fS.stateDeps.StateChan(stateID)
}

View File

@@ -25,15 +25,18 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewGlobalVarS .
func NewGlobalVarS(cfg *config.CGRConfig,
srvDep map[string]*sync.WaitGroup) *GlobalVarS {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *GlobalVarS {
return &GlobalVarS{
cfg: cfg,
srvDep: srvDep,
cfg: cfg,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -41,6 +44,9 @@ func NewGlobalVarS(cfg *config.CGRConfig,
type GlobalVarS struct {
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -82,3 +88,8 @@ func (gv *GlobalVarS) ServiceName() string {
func (gv *GlobalVarS) ShouldRun() bool {
return true
}
// StateChan returns signaling channel of specific state
func (gv *GlobalVarS) StateChan(stateID string) chan struct{} {
return gv.stateDeps.StateChan(stateID)
}

View File

@@ -34,13 +34,15 @@ import (
// NewHTTPAgent returns the HTTP Agent
func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &HTTPAgent{
cfg: cfg,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -60,6 +62,9 @@ type HTTPAgent struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -117,3 +122,8 @@ func (ha *HTTPAgent) ServiceName() string {
func (ha *HTTPAgent) ShouldRun() bool {
return len(ha.cfg.HTTPAgentCfg()) != 0
}
// StateChan returns signaling channel of specific state
func (ha *HTTPAgent) StateChan(stateID string) chan struct{} {
return ha.stateDeps.StateChan(stateID)
}

View File

@@ -35,13 +35,15 @@ import (
// NewJanusAgent returns the Janus Agent
func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &JanusAgent{
cfg: cfg,
filterSChan: filterSChan,
clSChan: clSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -61,6 +63,9 @@ type JanusAgent struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should jandle the sercive start
@@ -129,3 +134,8 @@ func (ja *JanusAgent) ServiceName() string {
func (ja *JanusAgent) ShouldRun() bool {
return ja.cfg.JanusAgentCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (ja *JanusAgent) StateChan(stateID string) chan struct{} {
return ja.stateDeps.StateChan(stateID)
}

View File

@@ -35,11 +35,13 @@ import (
// NewKamailioAgent returns the Kamailio Agent
func NewKamailioAgent(cfg *config.CGRConfig,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &KamailioAgent{
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -51,6 +53,9 @@ type KamailioAgent struct {
kam *agents.KamailioAgent
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -118,3 +123,8 @@ func (kam *KamailioAgent) ServiceName() string {
func (kam *KamailioAgent) ShouldRun() bool {
return kam.cfg.KamAgentCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (kam *KamailioAgent) StateChan(stateID string) chan struct{} {
return kam.stateDeps.StateChan(stateID)
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
@@ -36,7 +37,8 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS, clSChan chan *commonlisteners.CommonListenerS,
internalLoaderSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *LoaderService {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *LoaderService {
return &LoaderService{
connChan: internalLoaderSChan,
cfg: cfg,
@@ -47,6 +49,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
stopChan: make(chan struct{}),
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -67,6 +70,9 @@ type LoaderService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -167,3 +173,8 @@ func (ldrs *LoaderService) GetLoaderS() *loaders.LoaderS {
func (ldrs *LoaderService) GetRPCChan() chan birpc.ClientConnector {
return ldrs.connChan
}
// StateChan returns signaling channel of specific state
func (ldrs *LoaderService) StateChan(stateID string) chan struct{} {
return ldrs.stateDeps.StateChan(stateID)
}

View File

@@ -33,12 +33,14 @@ import (
// NewRadiusAgent returns the Radius Agent
func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RadiusAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -56,6 +58,9 @@ type RadiusAgent struct {
lnet string
lauth string
lacct string
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -136,3 +141,8 @@ func (rad *RadiusAgent) ServiceName() string {
func (rad *RadiusAgent) ShouldRun() bool {
return rad.cfg.RadiusAgentCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (rad *RadiusAgent) StateChan(stateID string) chan struct{} {
return rad.stateDeps.StateChan(stateID)
}

View File

@@ -37,7 +37,8 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, internalRankingSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RankingService{
connChan: internalRankingSChan,
cfg: cfg,
@@ -48,6 +49,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
connMgr: connMgr,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -67,6 +69,9 @@ type RankingService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -151,3 +156,8 @@ func (ran *RankingService) ServiceName() string {
func (ran *RankingService) ShouldRun() bool {
return ran.cfg.RankingSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (ran *RankingService) StateChan(stateID string) chan struct{} {
return ran.stateDeps.StateChan(stateID)
}

View File

@@ -36,7 +36,8 @@ func NewRateService(cfg *config.CGRConfig,
cacheS *CacheService, filterSChan chan *engine.FilterS,
dmS *DataDBService, clSChan chan *commonlisteners.CommonListenerS,
intConnChan chan birpc.ClientConnector, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RateService{
cfg: cfg,
cacheS: cacheS,
@@ -47,6 +48,7 @@ func NewRateService(cfg *config.CGRConfig,
rldChan: make(chan struct{}),
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -68,6 +70,9 @@ type RateService struct {
intConnChan chan birpc.ClientConnector
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// ServiceName returns the service name
@@ -148,3 +153,8 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
rs.intConnChan <- anz.GetInternalCodec(srv, utils.RateS)
return
}
// StateChan returns signaling channel of specific state
func (rs *RateService) StateChan(stateID string) chan struct{} {
return rs.stateDeps.StateChan(stateID)
}

View File

@@ -31,11 +31,13 @@ import (
// NewRegistrarCService returns the Dispatcher Service
func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RegistrarCService{
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
cfg: cfg,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -50,6 +52,9 @@ type RegistrarCService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -102,3 +107,8 @@ func (dspS *RegistrarCService) ShouldRun() bool {
return len(dspS.cfg.RegistrarCCfg().RPC.RegistrarSConns) != 0 ||
len(dspS.cfg.RegistrarCCfg().Dispatchers.RegistrarSConns) != 0
}
// StateChan returns signaling channel of specific state
func (dspS *RegistrarCService) StateChan(stateID string) chan struct{} {
return dspS.stateDeps.StateChan(stateID)
}

View File

@@ -36,7 +36,8 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, internalResourceSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ResourceService{
connChan: internalResourceSChan,
cfg: cfg,
@@ -47,6 +48,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
connMgr: connMgr,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -67,6 +69,9 @@ type ResourceService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -147,3 +152,8 @@ func (reS *ResourceService) ServiceName() string {
func (reS *ResourceService) ShouldRun() bool {
return reS.cfg.ResourceSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (reS *ResourceService) StateChan(stateID string) chan struct{} {
return reS.stateDeps.StateChan(stateID)
}

View File

@@ -37,7 +37,8 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, internalRouteSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &RouteService{
connChan: internalRouteSChan,
cfg: cfg,
@@ -48,6 +49,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
connMgr: connMgr,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -68,6 +70,9 @@ type RouteService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -142,3 +147,8 @@ func (routeS *RouteService) ServiceName() string {
func (routeS *RouteService) ShouldRun() bool {
return routeS.cfg.RouteSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (routeS *RouteService) StateChan(stateID string) chan struct{} {
return routeS.stateDeps.StateChan(stateID)
}

View File

@@ -1,69 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"sync"
"github.com/cgrates/cgrates/servmanager"
)
// NewServiceIndexer constructs a ServiceIndexer
func NewServiceIndexer() *ServiceIndexer {
return &ServiceIndexer{srvS: make(map[string]servmanager.Service)}
}
// ServiceIndexer implements servmanager.Service indexing in a thread safe way
type ServiceIndexer struct {
mux sync.RWMutex
srvS map[string]servmanager.Service // servmanager.Services indexed by ID
}
// Getservmanager.Service returns one servmanager.Service or nil
func (sI *ServiceIndexer) GetService(srvID string) servmanager.Service {
sI.mux.RLock()
defer sI.mux.RUnlock()
return sI.srvS[srvID]
}
// Addservmanager.Service adds a servmanager.Service based on it's id to the index
func (sI *ServiceIndexer) AddService(srvID string, srv servmanager.Service) {
sI.mux.Lock()
sI.srvS[srvID] = srv
sI.mux.Unlock()
}
// Remservmanager.Service will remove a servmanager.Service based on it's ID
func (sI *ServiceIndexer) RemService(srvID string) {
sI.mux.Lock()
defer sI.mux.Unlock()
delete(sI.srvS, srvID)
}
// Getservmanager.Services returns the list of servmanager.Services indexed
func (sI *ServiceIndexer) GetServices() []servmanager.Service {
sI.mux.RLock()
defer sI.mux.RUnlock()
srvs := make([]servmanager.Service, 0, len(sI.srvS))
for _, s := range sI.srvS {
srvs = append(srvs, s)
}
return srvs
}

View File

@@ -38,7 +38,8 @@ import (
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &SessionService{
connChan: internalChan,
cfg: cfg,
@@ -48,6 +49,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha
connMgr: connMgr,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -69,6 +71,9 @@ type SessionService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -172,3 +177,8 @@ func (smg *SessionService) ServiceName() string {
func (smg *SessionService) ShouldRun() bool {
return smg.cfg.SessionSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (smg *SessionService) StateChan(stateID string) chan struct{} {
return smg.stateDeps.StateChan(stateID)
}

View File

@@ -33,12 +33,14 @@ import (
// NewSIPAgent returns the sip Agent
func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &SIPAgent{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -53,6 +55,9 @@ type SIPAgent struct {
srvDep map[string]*sync.WaitGroup
oldListen string
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -125,3 +130,8 @@ func (sip *SIPAgent) ServiceName() string {
func (sip *SIPAgent) ShouldRun() bool {
return sip.cfg.SIPAgentCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (sip *SIPAgent) StateChan(stateID string) chan struct{} {
return sip.stateDeps.StateChan(stateID)
}

View File

@@ -22,28 +22,25 @@ import (
"sync"
)
// newStateDependencies constructs a stateDependencies struct
func newStateDependencies() *stateDependencies {
return &stateDependencies{stateDeps: make(map[string]chan struct{})}
// NewStateDependencies constructs a StateDependencies struct
func NewStateDependencies(servStates []string) (stDeps *StateDependencies) {
stDeps = &StateDependencies{stateDeps: make(map[string]chan struct{})}
for _, stateID := range servStates {
stDeps.stateDeps[stateID] = make(chan struct{})
}
return
}
// stateDependencies enhances a service with state dependencies management
type stateDependencies struct {
// StateDependencies enhances a service with state dependencies management
type StateDependencies struct {
stateDeps map[string]chan struct{} // listeners for various states of the service
stateDepsMux sync.RWMutex // protects stateDeps
}
// RegisterStateDependency will be called by a service interested by specific stateID of the service
func (sDs *stateDependencies) RegisterStateDependency(stateID string) (retChan chan struct{}) {
func (sDs *StateDependencies) StateChan(stateID string) (retChan chan struct{}) {
sDs.stateDepsMux.RLock()
retChan = sDs.stateDeps[stateID]
sDs.stateDepsMux.RUnlock()
if retChan != nil {
return
}
sDs.stateDepsMux.Lock()
defer sDs.stateDepsMux.Unlock()
retChan = make(chan struct{})
sDs.stateDeps[stateID] = retChan
return
}

View File

@@ -36,7 +36,8 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, internalStatSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &StatService{
connChan: internalStatSChan,
cfg: cfg,
@@ -47,6 +48,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
connMgr: connMgr,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -67,6 +69,9 @@ type StatService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -149,3 +154,8 @@ func (sts *StatService) ServiceName() string {
func (sts *StatService) ShouldRun() bool {
return sts.cfg.StatSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (sts *StatService) StateChan(stateID string) chan struct{} {
return sts.stateDeps.StateChan(stateID)
}

View File

@@ -25,16 +25,19 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewStorDBService returns the StorDB Service
func NewStorDBService(cfg *config.CGRConfig, setVersions bool,
srvDep map[string]*sync.WaitGroup) *StorDBService {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) *StorDBService {
return &StorDBService{
cfg: cfg,
setVersions: setVersions,
srvDep: srvDep,
srvIndexer: srvIndexer,
}
}
@@ -49,6 +52,9 @@ type StorDBService struct {
setVersions bool
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -203,3 +209,8 @@ func (db *StorDBService) needsConnectionReload() bool {
db.oldDBCfg.Opts.PgSSLCertMode != db.cfg.StorDbCfg().Opts.PgSSLCertMode ||
db.oldDBCfg.Opts.PgSSLRootCert != db.cfg.StorDbCfg().Opts.PgSSLRootCert)
}
// StateChan returns signaling channel of specific state
func (db *StorDBService) StateChan(stateID string) chan struct{} {
return db.stateDeps.StateChan(stateID)
}

View File

@@ -36,7 +36,8 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
clSChan chan *commonlisteners.CommonListenerS, internalThresholdSChan chan birpc.ClientConnector,
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
anzChan chan *AnalyzerService, srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &ThresholdService{
connChan: internalThresholdSChan,
cfg: cfg,
@@ -47,6 +48,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
connMgr: connMgr,
srvIndexer: srvIndexer,
}
}
@@ -67,6 +69,9 @@ type ThresholdService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -148,3 +153,8 @@ func (thrs *ThresholdService) ServiceName() string {
func (thrs *ThresholdService) ShouldRun() bool {
return thrs.cfg.ThresholdSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (thrs *ThresholdService) StateChan(stateID string) chan struct{} {
return thrs.stateDeps.StateChan(stateID)
}

View File

@@ -34,13 +34,15 @@ import (
// NewTPeService is the constructor for the TpeService
func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService,
clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) servmanager.Service {
clSChan chan *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &TPeService{
cfg: cfg,
srvDep: srvDep,
dm: dm,
connMgr: connMgr,
clSChan: clSChan,
cfg: cfg,
srvDep: srvDep,
dm: dm,
connMgr: connMgr,
clSChan: clSChan,
srvIndexer: srvIndexer,
}
}
@@ -59,6 +61,9 @@ type TPeService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
@@ -107,3 +112,8 @@ func (ts *TPeService) ServiceName() string {
func (ts *TPeService) ShouldRun() bool {
return ts.cfg.TpeSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (ts *TPeService) StateChan(stateID string) chan struct{} {
return ts.stateDeps.StateChan(stateID)
}

View File

@@ -36,7 +36,8 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
clSChan chan *commonlisteners.CommonListenerS, internalTrendSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anzChan chan *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup,
srvIndexer *servmanager.ServiceIndexer) servmanager.Service {
return &TrendService{
connChan: internalTrendSChan,
cfg: cfg,
@@ -47,6 +48,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
filterSChan: filterSChan,
srvIndexer: srvIndexer,
}
}
@@ -66,6 +68,9 @@ type TrendService struct {
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the sercive start
@@ -148,3 +153,8 @@ func (trs *TrendService) ServiceName() string {
func (trs *TrendService) ShouldRun() bool {
return trs.cfg.TrendSCfg().Enabled
}
// StateChan returns signaling channel of specific state
func (trs *TrendService) StateChan(stateID string) chan struct{} {
return trs.stateDeps.StateChan(stateID)
}

View File

@@ -16,52 +16,50 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
package servmanager
import (
"sync"
"github.com/cgrates/cgrates/servmanager"
)
// NewStateIndexer constructs a StateIndexer
func NewStateIndexer() *StateIndexer {
return &StateIndexer{srvS: make(map[string]servmanager.Service)}
// NewServiceIndexer constructs a ServiceIndexer
func NewServiceIndexer() *ServiceIndexer {
return &ServiceIndexer{srvS: make(map[string]Service)}
}
// StateIndexer implements service indexing in a thread safe way
type StateIndexer struct {
// ServiceIndexer implements service indexing in a thread safe way
type ServiceIndexer struct {
mux sync.RWMutex
srvS map[string]servmanager.Service // services indexed by ID
srvS map[string]Service // services indexed by ID
}
// GetService returns one service or nil
func (sI *StateIndexer) GetService(srvID string) servmanager.Service {
func (sI *ServiceIndexer) GetService(srvID string) Service {
sI.mux.RLock()
defer sI.mux.RUnlock()
return sI.srvS[srvID]
}
// AddService adds a service based on it's id to the index
func (sI *StateIndexer) AddService(srvID string, srv servmanager.Service) {
func (sI *ServiceIndexer) AddService(srvID string, srv Service) {
sI.mux.Lock()
sI.srvS[srvID] = srv
sI.mux.Unlock()
}
// RemService will remove a service based on it's ID
func (sI *StateIndexer) RemService(srvID string) {
// RemoveService will remove a service based on it's ID
func (sI *ServiceIndexer) RemoveService(srvID string) {
sI.mux.Lock()
defer sI.mux.Unlock()
delete(sI.srvS, srvID)
}
// GetServices returns the list of services indexed
func (sI *StateIndexer) GetServices() []servmanager.Service {
func (sI *ServiceIndexer) GetServices() []Service {
sI.mux.RLock()
defer sI.mux.RUnlock()
srvs := make([]servmanager.Service, 0, len(sI.srvS))
srvs := make([]Service, 0, len(sI.srvS))
for _, s := range sI.srvS {
srvs = append(srvs, s)
}

View File

@@ -30,7 +30,7 @@ import (
// NewServiceManager returns a service manager
func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager,
cfg *config.CGRConfig, services []Service) (sM *ServiceManager) {
cfg *config.CGRConfig, srvIndxr *ServiceIndexer, services []Service) (sM *ServiceManager) {
sM = &ServiceManager{
cfg: cfg,
subsystems: make(map[string]Service),
@@ -48,6 +48,8 @@ type ServiceManager struct {
cfg *config.CGRConfig
subsystems map[string]Service // active subsystems managed by SM
serviceIndexer *ServiceIndexer // index here the services for accessing them by their IDs
shdWg *sync.WaitGroup // list of shutdown items
connMgr *engine.ConnManager
rldChan <-chan string // reload signals come over this channelc
@@ -68,6 +70,18 @@ func (srvMngr *ServiceManager) StartServices(ctx *context.Context, shtDwn contex
}(service)
}
}
for _, service := range srvMngr.serviceIndexer.GetServices() {
if service.ShouldRun() && !service.IsRunning() {
srvMngr.shdWg.Add(1)
go func(srv Service) {
if err := srv.Start(ctx, shtDwn); err != nil &&
err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine
utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srv.ServiceName(), err))
shtDwn()
}
}(service)
}
}
// startServer()
}
@@ -165,6 +179,8 @@ type Service interface {
ShouldRun() bool
// ServiceName returns the service name
ServiceName() string
// StateChan returns the channel for specific state subscription
StateChan(stateID string) chan struct{}
}
// ArgsServiceID are passed to Start/Stop/Status RPC methods

View File

@@ -2828,6 +2828,10 @@ const (
ToNearestTowardZero = "*toNearestTowardZero"
)
const (
StateServiceUP = "SERVICE_UP"
)
func buildCacheInstRevPrefixes() {
CachePrefixToInstance = make(map[string]string)
for k, v := range CacheInstanceToPrefix {

View File

@@ -249,3 +249,8 @@ func ErrPathNotReachable(path string) error {
func NewSTIRError(reason string) error {
return fmt.Errorf("%s: %s", MetaSTIRAuthenticate, reason)
}
// NewServiceTimeoutError is called when waiting
func NewServiceStateTimeoutError(sourceID, srvID, stateID string) error {
return fmt.Errorf("%s: service <%s> state <%s> timeout", sourceID, srvID, stateID)
}