diff --git a/services/accounts.go b/services/accounts.go index 3c1ced45e..eae646159 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -37,7 +37,7 @@ import ( // NewAccountService returns the Account Service func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, server *commonlisteners.CommonListenerS, + connMgr *engine.ConnManager, cls *CommonListenerService, internalChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &AccountService{ @@ -47,7 +47,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, cacheS: cacheS, filterSChan: filterSChan, connMgr: connMgr, - server: server, + cls: cls, anz: anz, srvDep: srvDep, rldChan: make(chan struct{}, 1), @@ -57,19 +57,21 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, // AccountService implements Service interface type AccountService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService cacheS *CacheService + anz *AnalyzerService filterSChan chan *engine.FilterS - connMgr *engine.ConnManager - server *commonlisteners.CommonListenerS + + acts *accounts.AccountS + cl *commonlisteners.CommonListenerS rldChan chan struct{} stopChan chan struct{} - - acts *accounts.AccountS connChan chan birpc.ClientConnector // publish the internal Subsystem when available - anz *AnalyzerService + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -79,6 +81,10 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e return utils.ErrServiceAlreadyRunning } + acts.cl, err = acts.cls.WaitForCLS(ctx) + if err != nil { + return err + } if err = acts.cacheS.WaitToPrecache(ctx, utils.CacheAccounts, utils.CacheAccountsFilterIndexes); err != nil { @@ -110,7 +116,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e } if !acts.cfg.DispatcherSCfg().Enabled { - acts.server.RpcRegister(srv) + acts.cl.RpcRegister(srv) } acts.connChan <- acts.anz.GetInternalCodec(srv, utils.AccountS) @@ -131,7 +137,7 @@ func (acts *AccountService) Shutdown() (err error) { acts.acts = nil <-acts.connChan acts.Unlock() - acts.server.RpcUnregisterName(utils.AccountSv1) + acts.cl.RpcUnregisterName(utils.AccountSv1) return } @@ -139,7 +145,7 @@ func (acts *AccountService) Shutdown() (err error) { func (acts *AccountService) IsRunning() bool { acts.RLock() defer acts.RUnlock() - return acts != nil && acts.acts != nil + return acts.acts != nil } // ServiceName returns the service name diff --git a/services/accounts_test.go b/services/accounts_test.go index d8645b7c5..461929114 100644 --- a/services/accounts_test.go +++ b/services/accounts_test.go @@ -57,7 +57,7 @@ func TestAccountSCoverage(t *testing.T) { dm: db, cacheS: chS, filterSChan: filterSChan, - server: cls, + cls: cls, rldChan: testChan, stopChan: make(chan struct{}, 1), connChan: actRPC, diff --git a/services/actions.go b/services/actions.go index 7e690c39a..691f23dbc 100644 --- a/services/actions.go +++ b/services/actions.go @@ -38,7 +38,7 @@ import ( func NewActionService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + cls *CommonListenerService, internalChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &ActionService{ connChan: internalChan, @@ -47,7 +47,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - server: server, + cls: cls, anz: anz, srvDep: srvDep, rldChan: make(chan struct{}, 1), @@ -57,19 +57,22 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService, // ActionService implements Service interface type ActionService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService + anz *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS - connMgr *engine.ConnManager - server *commonlisteners.CommonListenerS + + acts *actions.ActionS + cl *commonlisteners.CommonListenerS rldChan chan struct{} stopChan chan struct{} - acts *actions.ActionS connChan chan birpc.ClientConnector // publish the internal Subsystem when available - anz *AnalyzerService + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -79,6 +82,9 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er return utils.ErrServiceAlreadyRunning } + if acts.cl, err = acts.cls.WaitForCLS(ctx); err != nil { + return err + } if err = acts.cacheS.WaitToPrecache(ctx, utils.CacheActionProfiles, utils.CacheActionProfilesFilterIndexes); err != nil { @@ -109,7 +115,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er } // srv, _ := birpc.NewService(apis.NewActionSv1(acts.acts), "", false) if !acts.cfg.DispatcherSCfg().Enabled { - acts.server.RpcRegister(srv) + acts.cl.RpcRegister(srv) } acts.connChan <- acts.anz.GetInternalCodec(srv, utils.ActionS) return @@ -129,7 +135,7 @@ func (acts *ActionService) Shutdown() (err error) { acts.acts.Shutdown() acts.acts = nil <-acts.connChan - acts.server.RpcUnregisterName(utils.ActionSv1) + acts.cl.RpcUnregisterName(utils.ActionSv1) return } @@ -137,7 +143,7 @@ func (acts *ActionService) Shutdown() (err error) { func (acts *ActionService) IsRunning() bool { acts.RLock() defer acts.RUnlock() - return acts != nil && acts.acts != nil + return acts.acts != nil } // ServiceName returns the service name diff --git a/services/actions_test.go b/services/actions_test.go index 4b4e795f3..8f98b81a9 100644 --- a/services/actions_test.go +++ b/services/actions_test.go @@ -57,7 +57,7 @@ func TestActionSCoverage(t *testing.T) { dm: db, cacheS: chS, filterSChan: filterSChan, - server: cls, + cls: cls, rldChan: testChan, stopChan: make(chan struct{}, 1), connChan: actRPC, diff --git a/services/adminsv1.go b/services/adminsv1.go index 340cf4be1..dc2e95f6c 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -34,7 +34,7 @@ import ( // NewAPIerSv1Service returns the APIerSv1 Service func NewAdminSv1Service(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, - filterSChan chan *engine.FilterS, server *commonlisteners.CommonListenerS, + filterSChan chan *engine.FilterS, cls *CommonListenerService, internalAPIerSv1Chan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { @@ -44,30 +44,31 @@ func NewAdminSv1Service(cfg *config.CGRConfig, dm: dm, storDB: storDB, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, } } -// APIerSv1Service implements Service interface +// AdminSv1Service implements Service interface type AdminSv1Service struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService storDB *StorDBService + anz *AnalyzerService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - api *apis.AdminSv1 - connChan chan birpc.ClientConnector + api *apis.AdminSv1 + cl *commonlisteners.CommonListenerS stopChan chan struct{} - - anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + connChan chan birpc.ClientConnector + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start @@ -77,6 +78,9 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF return utils.ErrServiceAlreadyRunning } + if apiService.cl, err = apiService.cls.WaitForCLS(ctx); err != nil { + return err + } var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil { return @@ -105,11 +109,11 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF if !apiService.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - apiService.server.RpcRegister(s) + apiService.cl.RpcRegister(s) } rpl, _ := engine.NewService(apis.NewReplicatorSv1(datadb, apiService.api)) for _, s := range rpl { - apiService.server.RpcRegister(s) + apiService.cl.RpcRegister(s) } } @@ -130,7 +134,7 @@ func (apiService *AdminSv1Service) Shutdown() (err error) { // close(apiService.stopChan) apiService.api = nil <-apiService.connChan - apiService.server.RpcUnregisterName(utils.AdminSv1) + apiService.cl.RpcUnregisterName(utils.AdminSv1) apiService.Unlock() return } diff --git a/services/analyzers.go b/services/analyzers.go index 43e96a3a4..27250397f 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -32,14 +32,14 @@ import ( ) // NewAnalyzerService returns the Analyzer Service -func NewAnalyzerService(cfg *config.CGRConfig, server *commonlisteners.CommonListenerS, +func NewAnalyzerService(cfg *config.CGRConfig, clSrv *CommonListenerService, filterSChan chan *engine.FilterS, internalAnalyzerSChan chan birpc.ClientConnector, srvDep map[string]*sync.WaitGroup) *AnalyzerService { return &AnalyzerService{ connChan: internalAnalyzerSChan, cfg: cfg, - server: server, + cls: clSrv, filterSChan: filterSChan, srvDep: srvDep, } @@ -48,17 +48,18 @@ func NewAnalyzerService(cfg *config.CGRConfig, server *commonlisteners.CommonLis // AnalyzerService implements Service interface type AnalyzerService struct { sync.RWMutex - cfg *config.CGRConfig - server *commonlisteners.CommonListenerS + + cls *CommonListenerService filterSChan chan *engine.FilterS - ctx *context.Context - cancelFunc context.CancelFunc - anz *analyzers.AnalyzerS - started chan struct{} + anz *analyzers.AnalyzerS + cl *commonlisteners.CommonListenerS - connChan chan birpc.ClientConnector - srvDep map[string]*sync.WaitGroup + started chan struct{} + cancelFunc context.CancelFunc + connChan chan birpc.ClientConnector + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start @@ -67,6 +68,10 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun return utils.ErrServiceAlreadyRunning } + if anz.cl, err = anz.cls.WaitForCLS(ctx); err != nil { + return + } + anz.Lock() defer anz.Unlock() anz.started = make(chan struct{}) @@ -75,14 +80,15 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun return } close(anz.started) - anz.ctx, anz.cancelFunc = context.WithCancel(ctx) + anzCtx, cancel := context.WithCancel(ctx) + anz.cancelFunc = cancel go func(a *analyzers.AnalyzerS) { - if err := a.ListenAndServe(anz.ctx); err != nil { + if err := a.ListenAndServe(anzCtx); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error())) shtDwn() } }(anz.anz) - anz.server.SetAnalyzer(anz.anz) + anz.cl.SetAnalyzer(anz.anz) go anz.start(ctx) return } @@ -104,7 +110,7 @@ func (anz *AnalyzerService) start(ctx *context.Context) { // srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false) if !anz.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - anz.server.RpcRegister(s) + anz.cl.RpcRegister(s) } } anz.Unlock() @@ -120,7 +126,7 @@ func (anz *AnalyzerService) Reload(*context.Context, context.CancelFunc) (err er func (anz *AnalyzerService) Shutdown() (err error) { anz.Lock() anz.cancelFunc() - anz.server.SetAnalyzer(nil) + anz.cl.SetAnalyzer(nil) anz.anz.Shutdown() anz.anz = nil @@ -131,7 +137,7 @@ func (anz *AnalyzerService) Shutdown() (err error) { anz.started = nil <-anz.connChan anz.Unlock() - anz.server.RpcUnregisterName(utils.AnalyzerSv1) + anz.cl.RpcUnregisterName(utils.AnalyzerSv1) return } diff --git a/services/analyzers_test.go b/services/analyzers_test.go index 481e77c3b..f588c3932 100644 --- a/services/analyzers_test.go +++ b/services/analyzers_test.go @@ -46,7 +46,7 @@ func TestAnalyzerCoverage(t *testing.T) { anz2 := &AnalyzerService{ RWMutex: sync.RWMutex{}, cfg: cfg, - server: cls, + cls: cls, filterSChan: filterSChan, connChan: connChan, srvDep: srvDep, diff --git a/services/asteriskagent.go b/services/asteriskagent.go index c01eb7d64..097551ef0 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -101,7 +101,7 @@ func (ast *AsteriskAgent) shutdown() { func (ast *AsteriskAgent) IsRunning() bool { ast.RLock() defer ast.RUnlock() - return ast != nil && ast.smas != nil + return ast.smas != nil } // ServiceName returns the service name diff --git a/services/attributes.go b/services/attributes.go index 8be5153e1..898048c9a 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -35,7 +35,7 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + cls *CommonListenerService, internalChan chan birpc.ClientConnector, anz *AnalyzerService, dspS *DispatcherService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &AttributeService{ @@ -44,7 +44,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - server: server, + cls: cls, anz: anz, srvDep: srvDep, dspS: dspS, @@ -54,17 +54,20 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, // AttributeService implements Service interface type AttributeService struct { sync.RWMutex - cfg *config.CGRConfig - dm *DataDBService - cacheS *CacheService - filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - attrS *engine.AttributeS - rpc *apis.AttributeSv1 // useful on restart + cls *CommonListenerService + dm *DataDBService + anz *AnalyzerService + cacheS *CacheService + dspS *DispatcherService + filterSChan chan *engine.FilterS + + attrS *engine.AttributeS + cl *commonlisteners.CommonListenerS + rpc *apis.AttributeSv1 // useful on restart + connChan chan birpc.ClientConnector // publish the internal Subsystem when available - anz *AnalyzerService - dspS *DispatcherService + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -74,6 +77,9 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) return utils.ErrServiceAlreadyRunning } + if attrS.cl, err = attrS.cls.WaitForCLS(ctx); err != nil { + return err + } if err = attrS.cacheS.WaitToPrecache(ctx, utils.CacheAttributeProfiles, utils.CacheAttributeFilterIndexes); err != nil { @@ -100,7 +106,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) // srv, _ := birpc.NewService(attrS.rpc, "", false) if !attrS.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - attrS.server.RpcRegister(s) + attrS.cl.RpcRegister(s) } } dspShtdChan := attrS.dspS.RegisterShutdownChan(attrS.ServiceName()) @@ -110,7 +116,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) return } if attrS.IsRunning() { - attrS.server.RpcRegister(srv) + attrS.cl.RpcRegister(srv) } } @@ -131,7 +137,7 @@ func (attrS *AttributeService) Shutdown() (err error) { attrS.attrS = nil attrS.rpc = nil <-attrS.connChan - attrS.server.RpcUnregisterName(utils.AttributeSv1) + attrS.cl.RpcUnregisterName(utils.AttributeSv1) attrS.dspS.UnregisterShutdownChan(attrS.ServiceName()) attrS.Unlock() return diff --git a/services/attributes_test.go b/services/attributes_test.go index c04e0fa8e..dc23fdd6b 100644 --- a/services/attributes_test.go +++ b/services/attributes_test.go @@ -51,7 +51,7 @@ func TestAttributeSCoverage(t *testing.T) { dm: db, cacheS: chS, filterSChan: filterSChan, - server: cls, + cls: cls, anz: anz, srvDep: srvDep, dspS: &DispatcherService{srvsReload: map[string]chan struct{}{}}, diff --git a/services/caches.go b/services/caches.go index 8574345b0..0fc1a7a8e 100644 --- a/services/caches.go +++ b/services/caches.go @@ -32,7 +32,7 @@ import ( // NewCacheService . func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager, - server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + cls *CommonListenerService, internalChan chan birpc.ClientConnector, anz *AnalyzerService, // dspS *DispatcherService, cores *CoreService, srvDep map[string]*sync.WaitGroup) *CacheService { @@ -41,7 +41,7 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C srvDep: srvDep, anz: anz, cores: cores, - server: server, + cls: cls, dm: dm, connMgr: connMgr, rpc: internalChan, @@ -51,20 +51,26 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C // CacheService implements Agent interface type CacheService struct { - cfg *config.CGRConfig - anz *AnalyzerService - cores *CoreService - server *commonlisteners.CommonListenerS - dm *DataDBService - connMgr *engine.ConnManager - rpc chan birpc.ClientConnector - srvDep map[string]*sync.WaitGroup + anz *AnalyzerService + cores *CoreService + cls *CommonListenerService + dm *DataDBService + + cl *commonlisteners.CommonListenerS cacheCh chan *engine.CacheS + rpc chan birpc.ClientConnector + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (err error) { + + if cS.cl, err = cS.cls.WaitForCLS(ctx); err != nil { + return err + } var dm *engine.DataManager if dm, err = cS.dm.WaitForDM(ctx); err != nil { return @@ -85,7 +91,7 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e // srv, _ := birpc.NewService(apis.NewCacheSv1(engine.Cache), "", false) if !cS.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - cS.server.RpcRegister(s) + cS.cl.RpcRegister(s) } } cS.rpc <- cS.anz.GetInternalCodec(srv, utils.CacheS) @@ -99,7 +105,7 @@ func (cS *CacheService) Reload(*context.Context, context.CancelFunc) (_ error) { // Shutdown stops the service func (cS *CacheService) Shutdown() (_ error) { - cS.server.RpcUnregisterName(utils.CacheSv1) + cS.cl.RpcUnregisterName(utils.CacheSv1) return } diff --git a/services/cdrs.go b/services/cdrs.go index a253241c1..4fe7cd2e7 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -36,7 +36,7 @@ import ( // NewCDRServer returns the CDR Server func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, internalCDRServerChan chan birpc.ClientConnector, + cls *CommonListenerService, internalCDRServerChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &CDRService{ @@ -45,7 +45,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, dm: dm, storDB: storDB, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -55,93 +55,97 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, // CDRService implements Service interface type CDRService struct { sync.RWMutex - cfg *config.CGRConfig - dm *DataDBService - storDB *StorDBService + cls *CommonListenerService + dm *DataDBService + storDB *StorDBService + anz *AnalyzerService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - cdrS *cdrs.CDRServer + cdrS *cdrs.CDRServer + cl *commonlisteners.CommonListenerS + connChan chan birpc.ClientConnector - connMgr *engine.ConnManager - stopChan chan struct{} - anz *AnalyzerService + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start -func (cdrSrv *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { - if cdrSrv.IsRunning() { +func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { + if cs.IsRunning() { return utils.ErrServiceAlreadyRunning } utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs)) + if cs.cl, err = cs.cls.WaitForCLS(ctx); err != nil { + return err + } var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, cdrSrv.filterSChan); err != nil { + if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil { return } var datadb *engine.DataManager - if datadb, err = cdrSrv.dm.WaitForDM(ctx); err != nil { + if datadb, err = cs.dm.WaitForDM(ctx); err != nil { return } - if err = cdrSrv.anz.WaitForAnalyzerS(ctx); err != nil { + if err = cs.anz.WaitForAnalyzerS(ctx); err != nil { return } storDBChan := make(chan engine.StorDB, 1) - cdrSrv.stopChan = make(chan struct{}) - cdrSrv.storDB.RegisterSyncChan(storDBChan) + cs.stopChan = make(chan struct{}) + cs.storDB.RegisterSyncChan(storDBChan) - cdrSrv.Lock() - defer cdrSrv.Unlock() + cs.Lock() + defer cs.Unlock() - cdrSrv.cdrS = cdrs.NewCDRServer(cdrSrv.cfg, datadb, filterS, cdrSrv.connMgr, storDBChan) - go cdrSrv.cdrS.ListenAndServe(cdrSrv.stopChan) + cs.cdrS = cdrs.NewCDRServer(cs.cfg, datadb, filterS, cs.connMgr, storDBChan) + go cs.cdrS.ListenAndServe(cs.stopChan) runtime.Gosched() utils.Logger.Info("Registering CDRS RPC service.") - srv, err := engine.NewServiceWithPing(cdrSrv.cdrS, utils.CDRsV1, utils.V1Prfx) + srv, err := engine.NewServiceWithPing(cs.cdrS, utils.CDRsV1, utils.V1Prfx) if err != nil { return err } - if !cdrSrv.cfg.DispatcherSCfg().Enabled { - cdrSrv.server.RpcRegister(srv) + if !cs.cfg.DispatcherSCfg().Enabled { + cs.cl.RpcRegister(srv) } - cdrSrv.connChan <- cdrSrv.anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational + cs.connChan <- cs.anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational return } // Reload handles the change of config -func (cdrService *CDRService) Reload(*context.Context, context.CancelFunc) (err error) { +func (cs *CDRService) Reload(*context.Context, context.CancelFunc) (err error) { return } // Shutdown stops the service -func (cdrService *CDRService) Shutdown() (err error) { - cdrService.Lock() - close(cdrService.stopChan) - cdrService.cdrS = nil - <-cdrService.connChan - cdrService.Unlock() - cdrService.server.RpcUnregisterName(utils.CDRsV1) +func (cs *CDRService) Shutdown() (err error) { + cs.Lock() + close(cs.stopChan) + cs.cdrS = nil + <-cs.connChan + cs.Unlock() + cs.cl.RpcUnregisterName(utils.CDRsV1) return } // IsRunning returns if the service is running -func (cdrService *CDRService) IsRunning() bool { - cdrService.RLock() - defer cdrService.RUnlock() - return cdrService != nil && cdrService.cdrS != nil +func (cs *CDRService) IsRunning() bool { + cs.RLock() + defer cs.RUnlock() + return cs.cdrS != nil } // ServiceName returns the service name -func (cdrService *CDRService) ServiceName() string { +func (cs *CDRService) ServiceName() string { return utils.CDRServer } // ShouldRun returns if the service should be running -func (cdrService *CDRService) ShouldRun() bool { - return cdrService.cfg.CdrsCfg().Enabled +func (cs *CDRService) ShouldRun() bool { + return cs.cfg.CdrsCfg().Enabled } diff --git a/services/cdrs_test.go b/services/cdrs_test.go index 4ac6d0dd7..a93401831 100644 --- a/services/cdrs_test.go +++ b/services/cdrs_test.go @@ -54,7 +54,7 @@ func TestCdrsCoverage(t *testing.T) { dm: db, storDB: stordb, filterSChan: filterSChan, - server: cls, + cls: cls, connChan: make(chan birpc.ClientConnector, 1), connMgr: nil, stopChan: make(chan struct{}, 1), diff --git a/services/cgr-engine.go b/services/cgr-engine.go index be744b649..ca0266d27 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -29,12 +29,10 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/registrarc" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" @@ -50,7 +48,6 @@ func NewCGREngine(cfg *config.CGRConfig) *CGREngine { caps: caps, // caps is used to limit RPC CPS shdWg: shdWg, // wait for shutdown srvManager: servmanager.NewServiceManager(shdWg, cM, cfg), - cls: commonlisteners.NewCommonListenerS(caps), srvDep: map[string]*sync.WaitGroup{ utils.AccountS: new(sync.WaitGroup), utils.ActionS: new(sync.WaitGroup), @@ -98,7 +95,6 @@ type CGREngine struct { srvDep map[string]*sync.WaitGroup shdWg *sync.WaitGroup cM *engine.ConnManager - cls *commonlisteners.CommonListenerS caps *engine.Caps cpuPrfF *os.File @@ -107,6 +103,7 @@ type CGREngine struct { gvS *GlobalVarS dmS *DataDBService sdbS *StorDBService + cls *CommonListenerService anzS *AnalyzerService coreS *CoreService cacheS *CacheService @@ -133,13 +130,6 @@ func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefi } func (cgr *CGREngine) InitServices(setVersions bool) { - if len(cgr.cfg.HTTPCfg().RegistrarSURL) != 0 { - cgr.cls.RegisterHTTPFunc(cgr.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) - } - if cgr.cfg.ConfigSCfg().Enabled { - cgr.cls.RegisterHTTPFunc(cgr.cfg.ConfigSCfg().URL, config.HandlerConfigS) - } - // init the channel here because we need to pass them to connManager cgr.iServeManagerCh = make(chan birpc.ClientConnector, 1) cgr.iConfigCh = make(chan birpc.ClientConnector, 1) @@ -201,14 +191,15 @@ func (cgr *CGREngine) InitServices(setVersions bool) { cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep) cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep) cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep) + cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.srvDep) cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.cls, - cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep) // init AnalyzerS + cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep) - cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.cls, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep) // init CoreSv1 + cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.cls, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep) cgr.cacheS = NewCacheService(cgr.cfg, cgr.dmS, cgr.cM, cgr.cls, iCacheSCh, cgr.anzS, cgr.coreS, - cgr.srvDep) // init CacheS + cgr.srvDep) dspS := NewDispatcherService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cls, cgr.iDispatcherSCh, cgr.cM, @@ -219,7 +210,7 @@ func (cgr *CGREngine) InitServices(setVersions bool) { cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.cls, cgr.srvDep) - cgr.srvManager.AddServices(cgr.gvS, cgr.coreS, cgr.cacheS, + cgr.srvManager.AddServices(cgr.gvS, cgr.cls, cgr.coreS, cgr.cacheS, cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs, NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls, iAdminSCh, cgr.cM, cgr.anzS, cgr.srvDep), @@ -257,7 +248,7 @@ func (cgr *CGREngine) InitServices(setVersions bool) { NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls, iCDRServerCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewRegistrarCService(cgr.cfg, cgr.cls, cgr.cM, cgr.anzS, cgr.srvDep), + NewRegistrarCService(cgr.cfg, cgr.cM, cgr.anzS, cgr.srvDep), NewRateService(cgr.cfg, cgr.cacheS, cgr.iFilterSCh, cgr.dmS, cgr.cls, iRateSCh, cgr.anzS, cgr.srvDep), @@ -279,6 +270,13 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu cgr.shdWg.Done() return } + if cgr.cls.ShouldRun() { + cgr.shdWg.Add(1) + if err = cgr.cls.Start(ctx, shtDw); err != nil { + cgr.shdWg.Done() + return + } + } if cgr.efs.ShouldRun() { // efs checking first because of loggers cgr.shdWg.Add(1) if err = cgr.efs.Start(ctx, shtDw); err != nil { @@ -323,9 +321,9 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu go cgrStartFilterService(ctx, cgr.iFilterSCh, cgr.cacheS.GetCacheSChan(), cgr.cM, cgr.cfg, cgr.dmS) - cgrInitServiceManagerV1(cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.cls, cgr.anzS) - cgrInitGuardianSv1(cgr.iGuardianSCh, cgr.cfg, cgr.cls, cgr.anzS) // init GuardianSv1 - cgrInitConfigSv1(cgr.iConfigCh, cgr.cfg, cgr.cls, cgr.anzS) + cgrInitServiceManagerV1(ctx, cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.cls, cgr.anzS) + cgrInitGuardianSv1(ctx, cgr.iGuardianSCh, cgr.cfg, cgr.cls, cgr.anzS) + cgrInitConfigSv1(ctx, cgr.iConfigCh, cgr.cfg, cgr.cls, cgr.anzS) if preload != utils.EmptyString { if err = cgrRunPreload(ctx, cgr.cfg, preload, cgr.ldrs); err != nil { diff --git a/services/chargers.go b/services/chargers.go index 132b94e52..3f363547c 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -34,7 +34,7 @@ import ( // NewChargerService returns the Charger Service func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, - cacheS *CacheService, filterSChan chan *engine.FilterS, server *commonlisteners.CommonListenerS, + cacheS *CacheService, filterSChan chan *engine.FilterS, cls *CommonListenerService, internalChargerSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &ChargerService{ @@ -43,7 +43,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -53,16 +53,19 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, // ChargerService implements Service interface type ChargerService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService cacheS *CacheService + anz *AnalyzerService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - chrS *engine.ChargerS + chrS *engine.ChargerS + cl *commonlisteners.CommonListenerS + connChan chan birpc.ClientConnector - anz *AnalyzerService + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -72,6 +75,9 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e return utils.ErrServiceAlreadyRunning } + if chrS.cl, err = chrS.cls.WaitForCLS(ctx); err != nil { + return err + } if err = chrS.cacheS.WaitToPrecache(ctx, utils.CacheChargerProfiles, utils.CacheChargerFilterIndexes); err != nil { @@ -97,7 +103,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e // srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false) if !chrS.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - chrS.server.RpcRegister(s) + chrS.cl.RpcRegister(s) } } chrS.connChan <- chrS.anz.GetInternalCodec(srv, utils.ChargerS) @@ -116,7 +122,7 @@ func (chrS *ChargerService) Shutdown() (err error) { chrS.chrS.Shutdown() chrS.chrS = nil <-chrS.connChan - chrS.server.RpcUnregisterName(utils.ChargerSv1) + chrS.cl.RpcUnregisterName(utils.ChargerSv1) return } @@ -124,7 +130,7 @@ func (chrS *ChargerService) Shutdown() (err error) { func (chrS *ChargerService) IsRunning() bool { chrS.RLock() defer chrS.RUnlock() - return chrS != nil && chrS.chrS != nil + return chrS.chrS != nil } // ServiceName returns the service name diff --git a/services/chargers_test.go b/services/chargers_test.go index 77a8aa3dc..1f875e76c 100644 --- a/services/chargers_test.go +++ b/services/chargers_test.go @@ -53,7 +53,7 @@ func TestChargerSCoverage(t *testing.T) { dm: db, cacheS: chS, filterSChan: filterSChan, - server: cls, + cls: cls, connMgr: nil, anz: anz, srvDep: srvDep, diff --git a/services/commonlisteners.go b/services/commonlisteners.go new file mode 100644 index 000000000..21349ddf2 --- /dev/null +++ b/services/commonlisteners.go @@ -0,0 +1,116 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/commonlisteners" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/registrarc" + "github.com/cgrates/cgrates/utils" +) + +// NewCommonListenerService instantiates a new CommonListenerService. +func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) *CommonListenerService { + return &CommonListenerService{ + cfg: cfg, + caps: caps, + clsCh: make(chan *commonlisteners.CommonListenerS, 1), + srvDep: srvDep, + } +} + +// CommonListenerService implements Service interface. +type CommonListenerService struct { + mu sync.RWMutex + + cls *commonlisteners.CommonListenerS + + clsCh chan *commonlisteners.CommonListenerS + caps *engine.Caps + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup +} + +// Start handles the service start. +func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) error { + if cl.IsRunning() { + return utils.ErrServiceAlreadyRunning + } + cl.mu.Lock() + defer cl.mu.Unlock() + cl.cls = commonlisteners.NewCommonListenerS(cl.caps) + cl.clsCh <- cl.cls + if len(cl.cfg.HTTPCfg().RegistrarSURL) != 0 { + cl.cls.RegisterHTTPFunc(cl.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) + } + if cl.cfg.ConfigSCfg().Enabled { + cl.cls.RegisterHTTPFunc(cl.cfg.ConfigSCfg().URL, config.HandlerConfigS) + } + return nil +} + +// Reload handles the config changes. +func (cl *CommonListenerService) Reload(*context.Context, context.CancelFunc) error { + return nil +} + +// Shutdown stops the service. +func (cl *CommonListenerService) Shutdown() error { + cl.mu.Lock() + defer cl.mu.Unlock() + cl.cls = nil + <-cl.clsCh + return nil +} + +// IsRunning returns whether the service is running or not. +func (cl *CommonListenerService) IsRunning() bool { + cl.mu.RLock() + defer cl.mu.RUnlock() + return cl.cls != nil +} + +// ServiceName returns the service name +func (cl *CommonListenerService) ServiceName() string { + return utils.CommonListenerS +} + +// ShouldRun returns if the service should be running. +func (cl *CommonListenerService) ShouldRun() bool { + return true +} + +// WaitForCLS waits for the CommonListenerS structure to be initialized. +func (cl *CommonListenerService) WaitForCLS(ctx *context.Context) (*commonlisteners.CommonListenerS, error) { + cl.mu.RLock() + clsCh := cl.clsCh + cl.mu.RUnlock() + var cls *commonlisteners.CommonListenerS + select { + case <-ctx.Done(): + return nil, ctx.Err() + case cls = <-clsCh: + clsCh <- cls + } + return cls, nil +} diff --git a/services/cores.go b/services/cores.go index ab4180bd8..b0d263ec0 100644 --- a/services/cores.go +++ b/services/cores.go @@ -33,7 +33,7 @@ import ( ) // NewCoreService returns the Core Service -func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *commonlisteners.CommonListenerS, +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, cls *CommonListenerService, internalCoreSChan chan birpc.ClientConnector, anz *AnalyzerService, fileCPU *os.File, shdWg *sync.WaitGroup, srvDep map[string]*sync.WaitGroup) *CoreService { @@ -43,7 +43,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *commonlist cfg: cfg, caps: caps, fileCPU: fileCPU, - server: server, + cls: cls, anz: anz, srvDep: srvDep, csCh: make(chan *cores.CoreS, 1), @@ -52,18 +52,22 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *commonlist // CoreService implements Service interface type CoreService struct { - mu sync.RWMutex - cfg *config.CGRConfig - server *commonlisteners.CommonListenerS + mu sync.RWMutex + + anz *AnalyzerService + cls *CommonListenerService + + cS *cores.CoreS + cl *commonlisteners.CommonListenerS + + fileCPU *os.File caps *engine.Caps + csCh chan *cores.CoreS stopChan chan struct{} shdWg *sync.WaitGroup - fileCPU *os.File - cS *cores.CoreS connChan chan birpc.ClientConnector - anz *AnalyzerService + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup - csCh chan *cores.CoreS } // Start should handle the service start @@ -72,6 +76,11 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err return utils.ErrServiceAlreadyRunning } + var err error + cS.cl, err = cS.cls.WaitForCLS(ctx) + if err != nil { + return err + } if err := cS.anz.WaitForAnalyzerS(ctx); err != nil { return err } @@ -88,7 +97,7 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err } if !cS.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - cS.server.RpcRegister(s) + cS.cl.RpcRegister(s) } } cS.connChan <- cS.anz.GetInternalCodec(srv, utils.CoreS) @@ -111,7 +120,7 @@ func (cS *CoreService) Shutdown() error { cS.cS = nil <-cS.connChan <-cS.csCh - cS.server.RpcUnregisterName(utils.CoreSv1) + cS.cl.RpcUnregisterName(utils.CoreSv1) return nil } diff --git a/services/dispatchers.go b/services/dispatchers.go index e574a14b9..0bf5aff26 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -33,7 +33,7 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + cls *CommonListenerService, internalChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) *DispatcherService { return &DispatcherService{ @@ -42,7 +42,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -53,19 +53,21 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, // DispatcherService implements Service interface type DispatcherService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService + anz *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - dspS *dispatchers.DispatcherService - connChan chan birpc.ClientConnector - anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + dspS *dispatchers.DispatcherService + cl *commonlisteners.CommonListenerS + connChan chan birpc.ClientConnector + connMgr *engine.ConnManager + cfg *config.CGRConfig srvsReload map[string]chan struct{} + srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start @@ -74,6 +76,9 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) return utils.ErrServiceAlreadyRunning } utils.Logger.Info("Starting CGRateS DispatcherS service.") + if dspS.cl, err = dspS.cls.WaitForCLS(ctx); err != nil { + return err + } if err = dspS.cacheS.WaitToPrecache(ctx, utils.CacheDispatcherProfiles, utils.CacheDispatcherHosts, @@ -102,7 +107,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) srv, _ := engine.NewDispatcherService(dspS.dspS) // srv, _ := birpc.NewService(apis.NewDispatcherSv1(dspS.dspS), "", false) for _, s := range srv { - dspS.server.RpcRegister(s) + dspS.cl.RpcRegister(s) } dspS.connMgr.EnableDispatcher(srv) // for the moment we dispable Apier through dispatcher @@ -125,8 +130,8 @@ func (dspS *DispatcherService) Shutdown() (err error) { dspS.dspS.Shutdown() dspS.dspS = nil <-dspS.connChan - dspS.server.RpcUnregisterName(utils.DispatcherSv1) - dspS.server.RpcUnregisterName(utils.AttributeSv1) + dspS.cl.RpcUnregisterName(utils.DispatcherSv1) + dspS.cl.RpcUnregisterName(utils.AttributeSv1) dspS.unregisterAllDispatchedSubsystems() dspS.connMgr.DisableDispatcher() @@ -138,7 +143,7 @@ func (dspS *DispatcherService) Shutdown() (err error) { func (dspS *DispatcherService) IsRunning() bool { dspS.RLock() defer dspS.RUnlock() - return dspS != nil && dspS.dspS != nil + return dspS.dspS != nil } // ServiceName returns the service name @@ -152,7 +157,7 @@ func (dspS *DispatcherService) ShouldRun() bool { } func (dspS *DispatcherService) unregisterAllDispatchedSubsystems() { - dspS.server.RpcUnregisterName(utils.AttributeSv1) + dspS.cl.RpcUnregisterName(utils.AttributeSv1) } func (dspS *DispatcherService) RegisterShutdownChan(subsys string) (c chan struct{}) { diff --git a/services/dispatchers_test.go b/services/dispatchers_test.go index 4b0e86239..cadea5de0 100644 --- a/services/dispatchers_test.go +++ b/services/dispatchers_test.go @@ -53,7 +53,7 @@ func TestDispatcherSCoverage(t *testing.T) { dm: db, cacheS: chS, filterSChan: filterSChan, - server: cls, + cls: cls, connMgr: srv.connMgr, connChan: make(chan birpc.ClientConnector, 1), anz: anz, diff --git a/services/ees.go b/services/ees.go index fb136a8b6..6656d3cdb 100644 --- a/services/ees.go +++ b/services/ees.go @@ -34,13 +34,13 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, server *commonlisteners.CommonListenerS, intConnChan chan birpc.ClientConnector, + connMgr *engine.ConnManager, cls *CommonListenerService, intConnChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &EventExporterService{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, - server: server, + cls: cls, intConnChan: intConnChan, anz: anz, srvDep: srvDep, @@ -51,15 +51,17 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil type EventExporterService struct { mu sync.RWMutex - cfg *config.CGRConfig + cls *CommonListenerService + anz *AnalyzerService filterSChan chan *engine.FilterS - connMgr *engine.ConnManager - server *commonlisteners.CommonListenerS - intConnChan chan birpc.ClientConnector - eeS *ees.EeS - anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + eeS *ees.EeS + cl *commonlisteners.CommonListenerS + + intConnChan chan birpc.ClientConnector + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // ServiceName returns the service name @@ -76,7 +78,7 @@ func (es *EventExporterService) ShouldRun() (should bool) { func (es *EventExporterService) IsRunning() bool { es.mu.RLock() defer es.mu.RUnlock() - return es != nil && es.eeS != nil + return es.eeS != nil } // Reload handles the change of config @@ -95,7 +97,7 @@ func (es *EventExporterService) Shutdown() error { es.eeS.ClearExporterCache() es.eeS = nil <-es.intConnChan - es.server.RpcUnregisterName(utils.EeSv1) + es.cl.RpcUnregisterName(utils.EeSv1) return nil } @@ -105,6 +107,10 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc return utils.ErrServiceAlreadyRunning } + var err error + if es.cl, err = es.cls.WaitForCLS(ctx); err != nil { + return err + } fltrS, err := waitForFilterS(ctx, es.filterSChan) if err != nil { return err @@ -126,7 +132,7 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc srv, _ := engine.NewServiceWithPing(es.eeS, utils.EeSv1, utils.V1Prfx) // srv, _ := birpc.NewService(es.rpc, "", false) if !es.cfg.DispatcherSCfg().Enabled { - es.server.RpcRegister(srv) + es.cl.RpcRegister(srv) } es.intConnChan <- es.anz.GetInternalCodec(srv, utils.EEs) return nil diff --git a/services/ees_test.go b/services/ees_test.go index 9026f2adb..ba6a08f20 100644 --- a/services/ees_test.go +++ b/services/ees_test.go @@ -48,7 +48,7 @@ func TestEventExporterSCoverage(t *testing.T) { cfg: cfg, filterSChan: filterSChan, connMgr: engine.NewConnManager(cfg), - server: cls, + cls: cls, intConnChan: make(chan birpc.ClientConnector, 1), anz: anz, srvDep: srvDep, diff --git a/services/efs.go b/services/efs.go index 1b1eb90cd..23cc77a94 100644 --- a/services/efs.go +++ b/services/efs.go @@ -36,24 +36,26 @@ import ( type ExportFailoverService struct { sync.Mutex - cfg *config.CGRConfig - connMgr *engine.ConnManager - server *commonlisteners.CommonListenerS - srv *birpc.Service - intConnChan chan birpc.ClientConnector - stopChan chan struct{} + cls *CommonListenerService - efS *efs.EfS - srvDep map[string]*sync.WaitGroup + efS *efs.EfS + cl *commonlisteners.CommonListenerS + srv *birpc.Service + + stopChan chan struct{} + intConnChan chan birpc.ClientConnector + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // NewExportFailoverService is the constructor for the TpeService func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager, intConnChan chan birpc.ClientConnector, - server *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *ExportFailoverService { + cls *CommonListenerService, srvDep map[string]*sync.WaitGroup) *ExportFailoverService { return &ExportFailoverService{ cfg: cfg, - server: server, + cls: cls, connMgr: connMgr, intConnChan: intConnChan, srvDep: srvDep, @@ -65,12 +67,15 @@ func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.Cance if efServ.IsRunning() { return utils.ErrServiceAlreadyRunning } + if efServ.cl, err = efServ.cls.WaitForCLS(ctx); err != nil { + return err + } efServ.Lock() efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EFs)) efServ.stopChan = make(chan struct{}) efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx) - efServ.server.RpcRegister(efServ.srv) + efServ.cl.RpcRegister(efServ.srv) efServ.Unlock() return } @@ -93,9 +98,8 @@ func (efServ *ExportFailoverService) Shutdown() (err error) { // IsRunning returns if the service is running func (efServ *ExportFailoverService) IsRunning() bool { efServ.Lock() - run := efServ != nil && efServ.efS != nil - efServ.Unlock() - return run + defer efServ.Unlock() + return efServ.efS != nil } // ShouldRun returns if the service should be running diff --git a/services/ers.go b/services/ers.go index ca9248073..524b67faf 100644 --- a/services/ers.go +++ b/services/ers.go @@ -37,7 +37,7 @@ func NewEventReaderService( cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - server *commonlisteners.CommonListenerS, + cls *CommonListenerService, intConn chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { @@ -46,7 +46,7 @@ func NewEventReaderService( cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, - server: server, + cls: cls, intConn: intConn, anz: anz, srvDep: srvDep, @@ -56,16 +56,19 @@ func NewEventReaderService( // EventReaderService implements Service interface type EventReaderService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService + anz *AnalyzerService filterSChan chan *engine.FilterS - ers *ers.ERService + ers *ers.ERService + cl *commonlisteners.CommonListenerS + rldChan chan struct{} stopChan chan struct{} - connMgr *engine.ConnManager - server *commonlisteners.CommonListenerS intConn chan birpc.ClientConnector - anz *AnalyzerService + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -75,6 +78,9 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel return utils.ErrServiceAlreadyRunning } + if erS.cl, err = erS.cls.WaitForCLS(ctx); err != nil { + return err + } var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil { return @@ -100,7 +106,7 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel return err } if !erS.cfg.DispatcherSCfg().Enabled { - erS.server.RpcRegister(srv) + erS.cl.RpcRegister(srv) } erS.intConn <- erS.anz.GetInternalCodec(srv, utils.ERs) return @@ -128,7 +134,7 @@ func (erS *EventReaderService) Shutdown() (err error) { defer erS.Unlock() close(erS.stopChan) erS.ers = nil - erS.server.RpcUnregisterName(utils.ErSv1) + erS.cl.RpcUnregisterName(utils.ErSv1) return } @@ -136,7 +142,7 @@ func (erS *EventReaderService) Shutdown() (err error) { func (erS *EventReaderService) IsRunning() bool { erS.RLock() defer erS.RUnlock() - return erS != nil && erS.ers != nil + return erS.ers != nil } // ServiceName returns the service name diff --git a/services/ers_test.go b/services/ers_test.go index b83ea23fd..ba40c8c81 100644 --- a/services/ers_test.go +++ b/services/ers_test.go @@ -52,7 +52,7 @@ func TestEventReaderSCoverage(t *testing.T) { rldChan: make(chan struct{}, 1), stopChan: make(chan struct{}, 1), connMgr: nil, - server: cls, + cls: cls, srvDep: srvDep, } if !srv2.IsRunning() { diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index 018208981..ebe78fcea 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -100,7 +100,7 @@ func (fS *FreeswitchAgent) Shutdown() (err error) { func (fS *FreeswitchAgent) IsRunning() bool { fS.RLock() defer fS.RUnlock() - return fS != nil && fS.fS != nil + return fS.fS != nil } // ServiceName returns the service name diff --git a/services/httpagent.go b/services/httpagent.go index 108d7890e..56c59c6bb 100644 --- a/services/httpagent.go +++ b/services/httpagent.go @@ -33,12 +33,12 @@ import ( // NewHTTPAgent returns the HTTP Agent func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, + cls *CommonListenerService, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &HTTPAgent{ cfg: cfg, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, srvDep: srvDep, } @@ -47,14 +47,18 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, // HTTPAgent implements Agent interface type HTTPAgent struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS + + cl *commonlisteners.CommonListenerS // we can realy stop the HTTPAgent so keep a flag // if we registerd the handlers started bool + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -64,6 +68,10 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro return utils.ErrServiceAlreadyRunning } + cl, err := ha.cls.WaitForCLS(ctx) + if err != nil { + return err + } var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ha.filterSChan); err != nil { return @@ -73,7 +81,7 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro ha.started = true utils.Logger.Info(fmt.Sprintf("<%s> successfully started HTTPAgent", utils.HTTPAgent)) for _, agntCfg := range ha.cfg.HTTPAgentCfg() { - ha.server.RegisterHttpHandler(agntCfg.URL, + cl.RegisterHttpHandler(agntCfg.URL, agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, filterS, ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload, agntCfg.ReplyPayload, agntCfg.RequestProcessors)) @@ -99,7 +107,7 @@ func (ha *HTTPAgent) Shutdown() (err error) { func (ha *HTTPAgent) IsRunning() bool { ha.RLock() defer ha.RUnlock() - return ha != nil && ha.started + return ha.started } // ServiceName returns the service name diff --git a/services/httpagent_test.go b/services/httpagent_test.go index 07ffd7418..dacc1794f 100644 --- a/services/httpagent_test.go +++ b/services/httpagent_test.go @@ -46,7 +46,7 @@ func TestHTTPAgentCoverage(t *testing.T) { srv2 := &HTTPAgent{ cfg: cfg, filterSChan: filterSChan, - server: cls, + cls: cls, started: true, connMgr: cM, srvDep: srvDep, diff --git a/services/janus.go b/services/janus.go index 135749da6..7f83bc27b 100644 --- a/services/janus.go +++ b/services/janus.go @@ -25,7 +25,6 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -34,12 +33,12 @@ import ( // NewJanusAgent returns the Janus Agent func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, connMgr *engine.ConnManager, + cls *CommonListenerService, connMgr *engine.ConnManager, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &JanusAgent{ cfg: cfg, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, srvDep: srvDep, } @@ -48,22 +47,32 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, // JanusAgent implements Service interface type JanusAgent struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - jA *agents.JanusAgent + + jA *agents.JanusAgent // we can realy stop the JanusAgent so keep a flag // if we registerd the jandlers started bool + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } // Start should jandle the sercive start func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err error) { - filterS := <-ja.filterSChan - ja.filterSChan <- filterS + + cl, err := ja.cls.WaitForCLS(ctx) + if err != nil { + return err + } + var filterS *engine.FilterS + if filterS, err = waitForFilterS(ctx, ja.filterSChan); err != nil { + return + } ja.Lock() if ja.started { @@ -78,13 +87,13 @@ func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err err return } - ja.server.RegisterHttpHandler("POST "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CreateSession)) - ja.server.RegisterHttpHandler("OPTIONS "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CORSOptions)) - ja.server.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.SessionKeepalive)) - ja.server.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}/", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.CORSOptions)) - ja.server.RegisterHttpHandler(fmt.Sprintf("GET %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.PollSession)) - ja.server.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.AttachPlugin)) - ja.server.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}/{handleID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.HandlePlugin)) + cl.RegisterHttpHandler("POST "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CreateSession)) + cl.RegisterHttpHandler("OPTIONS "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CORSOptions)) + cl.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.SessionKeepalive)) + cl.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}/", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.CORSOptions)) + cl.RegisterHttpHandler(fmt.Sprintf("GET %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.PollSession)) + cl.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.AttachPlugin)) + cl.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}/{handleID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.HandlePlugin)) ja.started = true ja.Unlock() diff --git a/services/kamailioagent.go b/services/kamailioagent.go index 1a4938b30..b24242818 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -106,7 +106,7 @@ func (kam *KamailioAgent) Shutdown() (err error) { func (kam *KamailioAgent) IsRunning() bool { kam.RLock() defer kam.RUnlock() - return kam != nil && kam.kam != nil + return kam.kam != nil } // ServiceName returns the service name diff --git a/services/libcgr-engine.go b/services/libcgr-engine.go index af7d12daf..df9e48a71 100644 --- a/services/libcgr-engine.go +++ b/services/libcgr-engine.go @@ -32,7 +32,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/apis" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/guardian" @@ -184,41 +183,45 @@ func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS } } -func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig, - server *commonlisteners.CommonListenerS, anz *AnalyzerService) { +func cgrInitGuardianSv1(ctx *context.Context, iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig, + cls *CommonListenerService, anz *AnalyzerService) { + cl, _ := cls.WaitForCLS(ctx) srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true) if !cfg.DispatcherSCfg().Enabled { for _, s := range srv { - server.RpcRegister(s) + cl.RpcRegister(s) } } iGuardianSCh <- anz.GetInternalCodec(srv, utils.GuardianS) } -func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, +func cgrInitServiceManagerV1(ctx *context.Context, iServMngrCh chan birpc.ClientConnector, srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig, - server *commonlisteners.CommonListenerS, anz *AnalyzerService) { + cls *CommonListenerService, anz *AnalyzerService) { + cl, _ := cls.WaitForCLS(ctx) srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false) if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(srv) + cl.RpcRegister(srv) } iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager) } -func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, - cfg *config.CGRConfig, server *commonlisteners.CommonListenerS, anz *AnalyzerService) { +func cgrInitConfigSv1(ctx *context.Context, iConfigCh chan birpc.ClientConnector, + cfg *config.CGRConfig, cls *CommonListenerService, anz *AnalyzerService) { + cl, _ := cls.WaitForCLS(ctx) srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true) // srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false) if !cfg.DispatcherSCfg().Enabled { for _, s := range srv { - server.RpcRegister(s) + cl.RpcRegister(s) } } iConfigCh <- anz.GetInternalCodec(srv, utils.ConfigSv1) } func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, - cfg *config.CGRConfig, server *commonlisteners.CommonListenerS, internalDispatcherSChan chan birpc.ClientConnector) { + cfg *config.CGRConfig, cls *CommonListenerService, internalDispatcherSChan chan birpc.ClientConnector) { + cl, _ := cls.WaitForCLS(ctx) if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this select { case dispatcherS := <-internalDispatcherSChan: @@ -227,7 +230,7 @@ func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, return } } - server.StartServer(ctx, shtdwnEngine, cfg) + cl.StartServer(ctx, shtdwnEngine, cfg) } func waitForFilterS(ctx *context.Context, fsCh chan *engine.FilterS) (filterS *engine.FilterS, err error) { diff --git a/services/loaders.go b/services/loaders.go index d9bfeca50..f5d9e2e61 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -33,7 +33,7 @@ import ( // NewLoaderService returns the Loader Service func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, - filterSChan chan *engine.FilterS, server *commonlisteners.CommonListenerS, + filterSChan chan *engine.FilterS, cls *CommonListenerService, internalLoaderSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) *LoaderService { @@ -42,7 +42,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, cfg: cfg, dm: dm, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, stopChan: make(chan struct{}), anz: anz, @@ -53,16 +53,19 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, // LoaderService implements Service interface type LoaderService struct { sync.RWMutex - cfg *config.CGRConfig - dm *DataDBService - filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - stopChan chan struct{} - ldrs *loaders.LoaderS + cls *CommonListenerService + dm *DataDBService + anz *AnalyzerService + filterSChan chan *engine.FilterS + + ldrs *loaders.LoaderS + cl *commonlisteners.CommonListenerS + + stopChan chan struct{} connChan chan birpc.ClientConnector connMgr *engine.ConnManager - anz *AnalyzerService + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -72,6 +75,9 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er return utils.ErrServiceAlreadyRunning } + if ldrs.cl, err = ldrs.cls.WaitForCLS(ctx); err != nil { + return err + } var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil { return @@ -99,7 +105,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er // srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false) if !ldrs.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - ldrs.server.RpcRegister(s) + ldrs.cl.RpcRegister(s) } } ldrs.connChan <- ldrs.anz.GetInternalCodec(srv, utils.LoaderS) @@ -132,7 +138,7 @@ func (ldrs *LoaderService) Shutdown() (_ error) { ldrs.ldrs = nil close(ldrs.stopChan) <-ldrs.connChan - ldrs.server.RpcUnregisterName(utils.LoaderSv1) + ldrs.cl.RpcUnregisterName(utils.LoaderSv1) ldrs.Unlock() return } @@ -141,7 +147,7 @@ func (ldrs *LoaderService) Shutdown() (_ error) { func (ldrs *LoaderService) IsRunning() bool { ldrs.RLock() defer ldrs.RUnlock() - return ldrs != nil && ldrs.ldrs != nil && ldrs.ldrs.Enabled() + return ldrs.ldrs != nil && ldrs.ldrs.Enabled() } // ServiceName returns the service name diff --git a/services/radiusagent.go b/services/radiusagent.go index 14c90c8ce..13ad500ee 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -124,7 +124,7 @@ func (rad *RadiusAgent) shutdown() { func (rad *RadiusAgent) IsRunning() bool { rad.RLock() defer rad.RUnlock() - return rad != nil && rad.rad != nil + return rad.rad != nil } // ServiceName returns the service name diff --git a/services/rankings.go b/services/rankings.go index e2ddb6a84..e05697f5b 100644 --- a/services/rankings.go +++ b/services/rankings.go @@ -35,7 +35,7 @@ import ( // NewRankingService returns the RankingS Service func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, internalRankingSChan chan birpc.ClientConnector, + cls *CommonListenerService, internalRankingSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &RankingService{ @@ -44,7 +44,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -53,16 +53,20 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService, type RankingService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService + anz *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - connChan chan birpc.ClientConnector - anz *AnalyzerService - ran *engine.RankingS - srvDep map[string]*sync.WaitGroup + + ran *engine.RankingS + cl *commonlisteners.CommonListenerS + + connChan chan birpc.ClientConnector + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start @@ -72,6 +76,9 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er } ran.srvDep[utils.DataDB].Add(1) + if ran.cl, err = ran.cls.WaitForCLS(ctx); err != nil { + return err + } if err = ran.cacheS.WaitToPrecache(ctx, utils.CacheRankingProfiles, utils.CacheRankings, @@ -105,7 +112,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er } if !ran.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - ran.server.RpcRegister(s) + ran.cl.RpcRegister(s) } } ran.connChan <- ran.anz.GetInternalCodec(srv, utils.RankingS) @@ -128,13 +135,13 @@ func (ran *RankingService) Shutdown() (err error) { ran.ran.StopRankingS() ran.ran = nil <-ran.connChan - ran.server.RpcUnregisterName(utils.RankingSv1) + ran.cl.RpcUnregisterName(utils.RankingSv1) return } // IsRunning returns if the service is running func (ran *RankingService) IsRunning() bool { - return ran != nil && ran.ran != nil + return ran.ran != nil } // ServiceName returns the service name diff --git a/services/rates.go b/services/rates.go index 49c057050..8e1533cab 100644 --- a/services/rates.go +++ b/services/rates.go @@ -34,7 +34,7 @@ import ( // NewRateService constructs RateService func NewRateService(cfg *config.CGRConfig, cacheS *CacheService, filterSChan chan *engine.FilterS, - dmS *DataDBService, server *commonlisteners.CommonListenerS, + dmS *DataDBService, cls *CommonListenerService, intConnChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &RateService{ @@ -42,7 +42,7 @@ func NewRateService(cfg *config.CGRConfig, cacheS: cacheS, filterSChan: filterSChan, dmS: dmS, - server: server, + cls: cls, intConnChan: intConnChan, rldChan: make(chan struct{}), anz: anz, @@ -54,18 +54,19 @@ func NewRateService(cfg *config.CGRConfig, type RateService struct { sync.RWMutex - cfg *config.CGRConfig - filterSChan chan *engine.FilterS + cls *CommonListenerService + anz *AnalyzerService dmS *DataDBService cacheS *CacheService - server *commonlisteners.CommonListenerS + filterSChan chan *engine.FilterS - rldChan chan struct{} - stopChan chan struct{} + rateS *rates.RateS + cl *commonlisteners.CommonListenerS - rateS *rates.RateS + rldChan chan struct{} + stopChan chan struct{} intConnChan chan birpc.ClientConnector - anz *AnalyzerService + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -100,7 +101,7 @@ func (rs *RateService) Shutdown() (err error) { rs.rateS.Shutdown() //we don't verify the error because shutdown never returns an err rs.rateS = nil <-rs.intConnChan - rs.server.RpcUnregisterName(utils.RateSv1) + rs.cl.RpcUnregisterName(utils.RateSv1) return } @@ -110,6 +111,9 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er return utils.ErrServiceAlreadyRunning } + if rs.cl, err = rs.cls.WaitForCLS(ctx); err != nil { + return err + } if err = rs.cacheS.WaitToPrecache(ctx, utils.CacheRateProfiles, utils.CacheRateProfilesFilterIndexes, @@ -141,7 +145,7 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er } // srv, _ := birpc.NewService(apis.NewRateSv1(rs.rateS), "", false) if !rs.cfg.DispatcherSCfg().Enabled { - rs.server.RpcRegister(srv) + rs.cl.RpcRegister(srv) } rs.intConnChan <- rs.anz.GetInternalCodec(srv, utils.RateS) return diff --git a/services/rates_test.go b/services/rates_test.go index 6fc6cc00a..dabb87d68 100644 --- a/services/rates_test.go +++ b/services/rates_test.go @@ -52,7 +52,7 @@ func TestRateSCoverage(t *testing.T) { filterSChan: filterSChan, dmS: db, cacheS: chS, - server: cls, + cls: cls, stopChan: make(chan struct{}), intConnChan: make(chan birpc.ClientConnector, 1), anz: anz, diff --git a/services/registrarc.go b/services/registrarc.go index 50cd1347f..c4660daf9 100644 --- a/services/registrarc.go +++ b/services/registrarc.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/registrarc" @@ -31,12 +30,10 @@ import ( ) // NewRegistrarCService returns the Dispatcher Service -func NewRegistrarCService(cfg *config.CGRConfig, server *commonlisteners.CommonListenerS, - connMgr *engine.ConnManager, anz *AnalyzerService, +func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &RegistrarCService{ cfg: cfg, - server: server, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -46,15 +43,16 @@ func NewRegistrarCService(cfg *config.CGRConfig, server *commonlisteners.CommonL // RegistrarCService implements Service interface type RegistrarCService struct { sync.RWMutex - cfg *config.CGRConfig - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager + + anz *AnalyzerService + + dspS *registrarc.RegistrarCService + stopChan chan struct{} rldChan chan struct{} - - dspS *registrarc.RegistrarCService - anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start @@ -94,7 +92,7 @@ func (dspS *RegistrarCService) Shutdown() (err error) { func (dspS *RegistrarCService) IsRunning() bool { dspS.RLock() defer dspS.RUnlock() - return dspS != nil && dspS.dspS != nil + return dspS.dspS != nil } // ServiceName returns the service name diff --git a/services/resources.go b/services/resources.go index 4ca197e0f..4e08b4615 100644 --- a/services/resources.go +++ b/services/resources.go @@ -34,7 +34,7 @@ import ( // NewResourceService returns the Resource Service func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, internalResourceSChan chan birpc.ClientConnector, + cls *CommonListenerService, internalResourceSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &ResourceService{ @@ -43,7 +43,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -53,16 +53,19 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, // ResourceService implements Service interface type ResourceService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService + anz *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - reS *engine.ResourceS + reS *engine.ResourceS + cl *commonlisteners.CommonListenerS + connChan chan birpc.ClientConnector connMgr *engine.ConnManager - anz *AnalyzerService + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -73,6 +76,9 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e } reS.srvDep[utils.DataDB].Add(1) + if reS.cl, err = reS.cls.WaitForCLS(ctx); err != nil { + return err + } if err = reS.cacheS.WaitToPrecache(ctx, utils.CacheResourceProfiles, utils.CacheResources, @@ -100,7 +106,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e // srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false) if !reS.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - reS.server.RpcRegister(s) + reS.cl.RpcRegister(s) } } reS.connChan <- reS.anz.GetInternalCodec(srv, utils.ResourceS) @@ -123,7 +129,7 @@ func (reS *ResourceService) Shutdown() (err error) { reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error reS.reS = nil <-reS.connChan - reS.server.RpcUnregisterName(utils.ResourceSv1) + reS.cl.RpcUnregisterName(utils.ResourceSv1) return } @@ -131,7 +137,7 @@ func (reS *ResourceService) Shutdown() (err error) { func (reS *ResourceService) IsRunning() bool { reS.RLock() defer reS.RUnlock() - return reS != nil && reS.reS != nil + return reS.reS != nil } // ServiceName returns the service name diff --git a/services/resources_test.go b/services/resources_test.go index 29b3fe630..a8a70f4bc 100644 --- a/services/resources_test.go +++ b/services/resources_test.go @@ -51,7 +51,7 @@ func TestResourceSCoverage(t *testing.T) { dm: db, cacheS: chS, filterSChan: filterSChan, - server: cls, + cls: cls, connChan: make(chan birpc.ClientConnector, 1), connMgr: nil, anz: anz, diff --git a/services/routes.go b/services/routes.go index 7518277de..f2d6195cb 100644 --- a/services/routes.go +++ b/services/routes.go @@ -35,7 +35,7 @@ import ( // NewRouteService returns the Route Service func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, internalRouteSChan chan birpc.ClientConnector, + cls *CommonListenerService, internalRouteSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &RouteService{ @@ -44,7 +44,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -54,16 +54,19 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, // RouteService implements Service interface type RouteService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService + anz *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - routeS *engine.RouteS + routeS *engine.RouteS + cl *commonlisteners.CommonListenerS + connChan chan birpc.ClientConnector - anz *AnalyzerService + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -73,6 +76,9 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e return utils.ErrServiceAlreadyRunning } + if routeS.cl, err = routeS.cls.WaitForCLS(ctx); err != nil { + return err + } if err = routeS.cacheS.WaitToPrecache(ctx, utils.CacheRouteProfiles, utils.CacheRouteFilterIndexes); err != nil { @@ -99,7 +105,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e // srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false) if !routeS.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - routeS.server.RpcRegister(s) + routeS.cl.RpcRegister(s) } } routeS.connChan <- routeS.anz.GetInternalCodec(srv, utils.RouteS) @@ -118,7 +124,7 @@ func (routeS *RouteService) Shutdown() (err error) { routeS.routeS.Shutdown() //we don't verify the error because shutdown never returns an error routeS.routeS = nil <-routeS.connChan - routeS.server.RpcUnregisterName(utils.RouteSv1) + routeS.cl.RpcUnregisterName(utils.RouteSv1) return } @@ -126,7 +132,7 @@ func (routeS *RouteService) Shutdown() (err error) { func (routeS *RouteService) IsRunning() bool { routeS.RLock() defer routeS.RUnlock() - return routeS != nil && routeS.routeS != nil + return routeS.routeS != nil } // ServiceName returns the service name diff --git a/services/routes_test.go b/services/routes_test.go index 5f7c7c1c5..cbcae6512 100644 --- a/services/routes_test.go +++ b/services/routes_test.go @@ -51,7 +51,7 @@ func TestSupplierSCoverage(t *testing.T) { dm: db, cacheS: chS, filterSChan: filterSChan, - server: cls, + cls: cls, connMgr: nil, routeS: &engine.RouteS{}, // rpc: nil, diff --git a/services/sessions.go b/services/sessions.go index 1584a4e46..e9acd0d5f 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -36,7 +36,7 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector, + cls *CommonListenerService, internalChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &SessionService{ @@ -44,7 +44,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha cfg: cfg, dm: dm, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -54,19 +54,20 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha // SessionService implements Service interface type SessionService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService + anz *AnalyzerService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - stopChan chan struct{} - sm *sessions.SessionS - connChan chan birpc.ClientConnector + sm *sessions.SessionS + cl *commonlisteners.CommonListenerS - // in order to stop the bircp server if necesary - bircpEnabled bool + bircpEnabled bool // to stop birpc server if needed + stopChan chan struct{} + connChan chan birpc.ClientConnector connMgr *engine.ConnManager - anz *AnalyzerService + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -76,6 +77,9 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) return utils.ErrServiceAlreadyRunning } + if smg.cl, err = smg.cls.WaitForCLS(ctx); err != nil { + return err + } var filterS *engine.FilterS if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil { return @@ -102,7 +106,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) // srv, _ := birpc.NewService(apis.NewSessionSv1(smg.sm), utils.EmptyString, false) // methods with multiple options if !smg.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - smg.server.RpcRegister(s) + smg.cl.RpcRegister(s) } } smg.connChan <- smg.anz.GetInternalCodec(srv, utils.SessionS) @@ -110,7 +114,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) if smg.cfg.SessionSCfg().ListenBijson != utils.EmptyString { smg.bircpEnabled = true for n, s := range srv { - smg.server.BiRPCRegisterName(n, s) + smg.cl.BiRPCRegisterName(n, s) } // run this in it's own goroutine go smg.start(shtDw) @@ -119,7 +123,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) } func (smg *SessionService) start(shtDw context.CancelFunc) (err error) { - if err := smg.server.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson, + if err := smg.cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson, smg.cfg.SessionSCfg().ListenBigob, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err)) smg.Lock() @@ -144,12 +148,12 @@ func (smg *SessionService) Shutdown() (err error) { return } if smg.bircpEnabled { - smg.server.StopBiRPC() + smg.cl.StopBiRPC() smg.bircpEnabled = false } smg.sm = nil <-smg.connChan - smg.server.RpcUnregisterName(utils.SessionSv1) + smg.cl.RpcUnregisterName(utils.SessionSv1) // smg.server.BiRPCUnregisterName(utils.SessionSv1) return } @@ -158,7 +162,7 @@ func (smg *SessionService) Shutdown() (err error) { func (smg *SessionService) IsRunning() bool { smg.RLock() defer smg.RUnlock() - return smg != nil && smg.sm != nil + return smg.sm != nil } // ServiceName returns the service name diff --git a/services/sessions_test.go b/services/sessions_test.go index 392da1922..8fd259e9c 100644 --- a/services/sessions_test.go +++ b/services/sessions_test.go @@ -59,7 +59,7 @@ func TestSessionSCoverage(t *testing.T) { srv2 := SessionService{ cfg: cfg, dm: db, - server: cls, + cls: cls, connChan: make(chan birpc.ClientConnector, 1), connMgr: nil, anz: anz, diff --git a/services/sipagent.go b/services/sipagent.go index b7f38b787..96f35fdf6 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -113,7 +113,7 @@ func (sip *SIPAgent) Shutdown() (err error) { func (sip *SIPAgent) IsRunning() bool { sip.RLock() defer sip.RUnlock() - return sip != nil && sip.sip != nil + return sip.sip != nil } // ServiceName returns the service name diff --git a/services/stats.go b/services/stats.go index ee08127f2..ac5eb9520 100644 --- a/services/stats.go +++ b/services/stats.go @@ -34,7 +34,7 @@ import ( // NewStatService returns the Stat Service func NewStatService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, internalStatSChan chan birpc.ClientConnector, + cls *CommonListenerService, internalStatSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &StatService{ @@ -43,7 +43,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -53,16 +53,19 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, // StatService implements Service interface type StatService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService + anz *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - sts *engine.StatS + sts *engine.StatS + cl *commonlisteners.CommonListenerS + connChan chan birpc.ClientConnector - anz *AnalyzerService + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -73,6 +76,9 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e } sts.srvDep[utils.DataDB].Add(1) + if sts.cl, err = sts.cls.WaitForCLS(ctx); err != nil { + return err + } if err = sts.cacheS.WaitToPrecache(ctx, utils.CacheStatQueueProfiles, utils.CacheStatQueues, @@ -102,7 +108,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e // srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false) if !sts.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - sts.server.RpcRegister(s) + sts.cl.RpcRegister(s) } } sts.connChan <- sts.anz.GetInternalCodec(srv, utils.StatS) @@ -125,7 +131,7 @@ func (sts *StatService) Shutdown() (err error) { sts.sts.Shutdown(context.TODO()) sts.sts = nil <-sts.connChan - sts.server.RpcUnregisterName(utils.StatSv1) + sts.cl.RpcUnregisterName(utils.StatSv1) return } @@ -133,7 +139,7 @@ func (sts *StatService) Shutdown() (err error) { func (sts *StatService) IsRunning() bool { sts.RLock() defer sts.RUnlock() - return sts != nil && sts.sts != nil + return sts.sts != nil } // ServiceName returns the service name diff --git a/services/stats_test.go b/services/stats_test.go index 91d34f92c..14666ad52 100644 --- a/services/stats_test.go +++ b/services/stats_test.go @@ -51,7 +51,7 @@ func TestStatSCoverage(t *testing.T) { dm: db, cacheS: chS, filterSChan: filterSChan, - server: cls, + cls: cls, connMgr: nil, sts: &engine.StatS{}, connChan: make(chan birpc.ClientConnector, 1), diff --git a/services/stordb.go b/services/stordb.go index defca89bf..4376cba4b 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -153,7 +153,7 @@ func (db *StorDBService) IsRunning() bool { // isRunning returns if the service is running (not thread safe) func (db *StorDBService) isRunning() bool { - return db != nil && db.db != nil + return db.db != nil } // ServiceName returns the service name diff --git a/services/thresholds.go b/services/thresholds.go index 61a5ad8f7..da11f4924 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -35,7 +35,7 @@ import ( func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, - server *commonlisteners.CommonListenerS, internalThresholdSChan chan birpc.ClientConnector, + cls *CommonListenerService, internalThresholdSChan chan birpc.ClientConnector, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &ThresholdService{ connChan: internalThresholdSChan, @@ -43,7 +43,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, dm: dm, cacheS: cacheS, filterSChan: filterSChan, - server: server, + cls: cls, anz: anz, srvDep: srvDep, connMgr: connMgr, @@ -53,16 +53,19 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, // ThresholdService implements Service interface type ThresholdService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService + anz *AnalyzerService cacheS *CacheService filterSChan chan *engine.FilterS - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - thrs *engine.ThresholdS + thrs *engine.ThresholdS + cl *commonlisteners.CommonListenerS + connChan chan birpc.ClientConnector - anz *AnalyzerService + connMgr *engine.ConnManager + cfg *config.CGRConfig srvDep map[string]*sync.WaitGroup } @@ -73,6 +76,9 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) } thrs.srvDep[utils.DataDB].Add(1) + if thrs.cl, err = thrs.cls.WaitForCLS(ctx); err != nil { + return err + } if err = thrs.cacheS.WaitToPrecache(ctx, utils.CacheThresholdProfiles, utils.CacheThresholds, @@ -101,7 +107,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) // srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false) if !thrs.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - thrs.server.RpcRegister(s) + thrs.cl.RpcRegister(s) } } thrs.connChan <- thrs.anz.GetInternalCodec(srv, utils.ThresholdS) @@ -124,7 +130,7 @@ func (thrs *ThresholdService) Shutdown() (_ error) { thrs.thrs.Shutdown(context.TODO()) thrs.thrs = nil <-thrs.connChan - thrs.server.RpcUnregisterName(utils.ThresholdSv1) + thrs.cl.RpcUnregisterName(utils.ThresholdSv1) return } @@ -132,7 +138,7 @@ func (thrs *ThresholdService) Shutdown() (_ error) { func (thrs *ThresholdService) IsRunning() bool { thrs.RLock() defer thrs.RUnlock() - return thrs != nil && thrs.thrs != nil + return thrs.thrs != nil } // ServiceName returns the service name diff --git a/services/thresholds_test.go b/services/thresholds_test.go index 037daa50a..561a35074 100644 --- a/services/thresholds_test.go +++ b/services/thresholds_test.go @@ -50,7 +50,7 @@ func TestThresholdSCoverage(t *testing.T) { dm: db, cacheS: chS, filterSChan: filterSChan, - server: cls, + cls: cls, thrs: thrs1, connChan: make(chan birpc.ClientConnector, 1), anz: anz, diff --git a/services/tpes.go b/services/tpes.go index 13073c63c..bf0f21f66 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -34,13 +34,13 @@ import ( // NewTPeService is the constructor for the TpeService func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService, - server *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) servmanager.Service { + cls *CommonListenerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &TPeService{ cfg: cfg, srvDep: srvDep, dm: dm, connMgr: connMgr, - server: server, + cls: cls, } } @@ -48,58 +48,63 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD type TPeService struct { sync.RWMutex - cfg *config.CGRConfig - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - tpes *tpes.TPeS - dm *DataDBService - srv *birpc.Service - stopChan chan struct{} + cls *CommonListenerService + dm *DataDBService - srvDep map[string]*sync.WaitGroup + tpes *tpes.TPeS + cl *commonlisteners.CommonListenerS + srv *birpc.Service + + stopChan chan struct{} + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // Start should handle the service start -func (tpSrv *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { +func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { + if ts.cl, err = ts.cls.WaitForCLS(ctx); err != nil { + return err + } var datadb *engine.DataManager - if datadb, err = tpSrv.dm.WaitForDM(ctx); err != nil { + if datadb, err = ts.dm.WaitForDM(ctx); err != nil { return } - tpSrv.tpes = tpes.NewTPeS(tpSrv.cfg, datadb, tpSrv.connMgr) + ts.tpes = tpes.NewTPeS(ts.cfg, datadb, ts.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TPeS)) - tpSrv.stopChan = make(chan struct{}) - tpSrv.srv, _ = birpc.NewService(apis.NewTPeSv1(tpSrv.tpes), utils.EmptyString, false) - tpSrv.server.RpcRegister(tpSrv.srv) + ts.stopChan = make(chan struct{}) + ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false) + ts.cl.RpcRegister(ts.srv) return } // Reload handles the change of config -func (tpSrv *TPeService) Reload(*context.Context, context.CancelFunc) (err error) { +func (ts *TPeService) Reload(*context.Context, context.CancelFunc) (err error) { return } // Shutdown stops the service -func (tpSrv *TPeService) Shutdown() (err error) { - tpSrv.srv = nil - close(tpSrv.stopChan) +func (ts *TPeService) Shutdown() (err error) { + ts.srv = nil + close(ts.stopChan) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> subsystem", utils.CoreS, utils.TPeS)) return } // IsRunning returns if the service is running -func (tpSrv *TPeService) IsRunning() bool { - tpSrv.Lock() - defer tpSrv.Unlock() - return tpSrv != nil && tpSrv.tpes != nil +func (ts *TPeService) IsRunning() bool { + ts.Lock() + defer ts.Unlock() + return ts.tpes != nil } // ServiceName returns the service name -func (tpSrv *TPeService) ServiceName() string { +func (ts *TPeService) ServiceName() string { return utils.TPeS } // ShouldRun returns if the service should be running -func (tpSrv *TPeService) ShouldRun() bool { - return tpSrv.cfg.TpeSCfg().Enabled +func (ts *TPeService) ShouldRun() bool { + return ts.cfg.TpeSCfg().Enabled } diff --git a/services/trends.go b/services/trends.go index cd066bad1..45b7381b6 100644 --- a/services/trends.go +++ b/services/trends.go @@ -34,7 +34,7 @@ import ( // NewTrendsService returns the TrendS Service func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, cacheS *CacheService, filterSChan chan *engine.FilterS, - server *commonlisteners.CommonListenerS, internalTrendSChan chan birpc.ClientConnector, + cls *CommonListenerService, internalTrendSChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &TrendService{ @@ -42,7 +42,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, cfg: cfg, dm: dm, cacheS: cacheS, - server: server, + cls: cls, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -51,16 +51,20 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService, type TrendService struct { sync.RWMutex - cfg *config.CGRConfig + + cls *CommonListenerService dm *DataDBService - cacheS *CacheService - server *commonlisteners.CommonListenerS - connMgr *engine.ConnManager - filterSChan chan *engine.FilterS - connChan chan birpc.ClientConnector - trs *engine.TrendS anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + cacheS *CacheService + filterSChan chan *engine.FilterS + + trs *engine.TrendS + cl *commonlisteners.CommonListenerS + + connChan chan birpc.ClientConnector + connMgr *engine.ConnManager + cfg *config.CGRConfig + srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start @@ -70,6 +74,9 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err } trs.srvDep[utils.DataDB].Add(1) + if trs.cl, err = trs.cls.WaitForCLS(ctx); err != nil { + return err + } if err = trs.cacheS.WaitToPrecache(ctx, utils.CacheTrendProfiles, utils.CacheTrends, @@ -101,7 +108,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err } if !trs.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - trs.server.RpcRegister(s) + trs.cl.RpcRegister(s) } } trs.connChan <- trs.anz.GetInternalCodec(srv, utils.Trends) @@ -124,13 +131,13 @@ func (trs *TrendService) Shutdown() (err error) { trs.trs.StopTrendS() trs.trs = nil <-trs.connChan - trs.server.RpcUnregisterName(utils.TrendSv1) + trs.cl.RpcUnregisterName(utils.TrendSv1) return } // IsRunning returns if the service is running func (trs *TrendService) IsRunning() bool { - return trs != nil && trs.trs != nil + return trs.trs != nil } // ServiceName returns the service name diff --git a/services/trends_test.go b/services/trends_test.go index 10a2baa49..9b305f2fa 100644 --- a/services/trends_test.go +++ b/services/trends_test.go @@ -57,8 +57,8 @@ func TestNewTrendService(t *testing.T) { t.Errorf("Expected cacheS to be %v, but got %v", cacheS, trendService.cacheS) } - if trendService.server != server { - t.Errorf("Expected server to be %v, but got %v", server, trendService.server) + if trendService.cls != server { + t.Errorf("Expected server to be %v, but got %v", server, trendService.cls) } if trendService.connChan != internalTrendSChan { t.Errorf("Expected connChan to be %v, but got %v", internalTrendSChan, trendService.connChan) diff --git a/utils/consts.go b/utils/consts.go index 85f6ac92b..398dce1f5 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -990,6 +990,7 @@ const ( CDRServer = "CDRServer" GuardianS = "GuardianS" ServiceManagerS = "ServiceManager" + CommonListenerS = "CommonListenerS" ) // Lower service names