Add new CommonListenerService

This commit is contained in:
ionutboangiu
2024-11-09 18:07:18 +02:00
committed by Dan Christian Bogos
parent e78722ae4e
commit 21409fc92e
53 changed files with 670 additions and 406 deletions

View File

@@ -37,7 +37,7 @@ import (
// NewAccountService returns the Account Service
func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, server *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager, cls *CommonListenerService,
internalChan chan birpc.ClientConnector,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &AccountService{
@@ -47,7 +47,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
cacheS: cacheS,
filterSChan: filterSChan,
connMgr: connMgr,
server: server,
cls: cls,
anz: anz,
srvDep: srvDep,
rldChan: make(chan struct{}, 1),
@@ -57,19 +57,21 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
// AccountService implements Service interface
type AccountService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
cacheS *CacheService
anz *AnalyzerService
filterSChan chan *engine.FilterS
connMgr *engine.ConnManager
server *commonlisteners.CommonListenerS
acts *accounts.AccountS
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
acts *accounts.AccountS
connChan chan birpc.ClientConnector // publish the internal Subsystem when available
anz *AnalyzerService
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -79,6 +81,10 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
return utils.ErrServiceAlreadyRunning
}
acts.cl, err = acts.cls.WaitForCLS(ctx)
if err != nil {
return err
}
if err = acts.cacheS.WaitToPrecache(ctx,
utils.CacheAccounts,
utils.CacheAccountsFilterIndexes); err != nil {
@@ -110,7 +116,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
}
if !acts.cfg.DispatcherSCfg().Enabled {
acts.server.RpcRegister(srv)
acts.cl.RpcRegister(srv)
}
acts.connChan <- acts.anz.GetInternalCodec(srv, utils.AccountS)
@@ -131,7 +137,7 @@ func (acts *AccountService) Shutdown() (err error) {
acts.acts = nil
<-acts.connChan
acts.Unlock()
acts.server.RpcUnregisterName(utils.AccountSv1)
acts.cl.RpcUnregisterName(utils.AccountSv1)
return
}
@@ -139,7 +145,7 @@ func (acts *AccountService) Shutdown() (err error) {
func (acts *AccountService) IsRunning() bool {
acts.RLock()
defer acts.RUnlock()
return acts != nil && acts.acts != nil
return acts.acts != nil
}
// ServiceName returns the service name

View File

@@ -57,7 +57,7 @@ func TestAccountSCoverage(t *testing.T) {
dm: db,
cacheS: chS,
filterSChan: filterSChan,
server: cls,
cls: cls,
rldChan: testChan,
stopChan: make(chan struct{}, 1),
connChan: actRPC,

View File

@@ -38,7 +38,7 @@ import (
func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &ActionService{
connChan: internalChan,
@@ -47,7 +47,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
cls: cls,
anz: anz,
srvDep: srvDep,
rldChan: make(chan struct{}, 1),
@@ -57,19 +57,22 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
// ActionService implements Service interface
type ActionService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
filterSChan chan *engine.FilterS
connMgr *engine.ConnManager
server *commonlisteners.CommonListenerS
acts *actions.ActionS
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
acts *actions.ActionS
connChan chan birpc.ClientConnector // publish the internal Subsystem when available
anz *AnalyzerService
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -79,6 +82,9 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
return utils.ErrServiceAlreadyRunning
}
if acts.cl, err = acts.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = acts.cacheS.WaitToPrecache(ctx,
utils.CacheActionProfiles,
utils.CacheActionProfilesFilterIndexes); err != nil {
@@ -109,7 +115,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
}
// srv, _ := birpc.NewService(apis.NewActionSv1(acts.acts), "", false)
if !acts.cfg.DispatcherSCfg().Enabled {
acts.server.RpcRegister(srv)
acts.cl.RpcRegister(srv)
}
acts.connChan <- acts.anz.GetInternalCodec(srv, utils.ActionS)
return
@@ -129,7 +135,7 @@ func (acts *ActionService) Shutdown() (err error) {
acts.acts.Shutdown()
acts.acts = nil
<-acts.connChan
acts.server.RpcUnregisterName(utils.ActionSv1)
acts.cl.RpcUnregisterName(utils.ActionSv1)
return
}
@@ -137,7 +143,7 @@ func (acts *ActionService) Shutdown() (err error) {
func (acts *ActionService) IsRunning() bool {
acts.RLock()
defer acts.RUnlock()
return acts != nil && acts.acts != nil
return acts.acts != nil
}
// ServiceName returns the service name

View File

@@ -57,7 +57,7 @@ func TestActionSCoverage(t *testing.T) {
dm: db,
cacheS: chS,
filterSChan: filterSChan,
server: cls,
cls: cls,
rldChan: testChan,
stopChan: make(chan struct{}, 1),
connChan: actRPC,

View File

@@ -34,7 +34,7 @@ import (
// NewAPIerSv1Service returns the APIerSv1 Service
func NewAdminSv1Service(cfg *config.CGRConfig,
dm *DataDBService, storDB *StorDBService,
filterSChan chan *engine.FilterS, server *commonlisteners.CommonListenerS,
filterSChan chan *engine.FilterS, cls *CommonListenerService,
internalAPIerSv1Chan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
@@ -44,30 +44,31 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
dm: dm,
storDB: storDB,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
}
}
// APIerSv1Service implements Service interface
// AdminSv1Service implements Service interface
type AdminSv1Service struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
storDB *StorDBService
anz *AnalyzerService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
api *apis.AdminSv1
connChan chan birpc.ClientConnector
api *apis.AdminSv1
cl *commonlisteners.CommonListenerS
stopChan chan struct{}
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
connChan chan birpc.ClientConnector
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
@@ -77,6 +78,9 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
return utils.ErrServiceAlreadyRunning
}
if apiService.cl, err = apiService.cls.WaitForCLS(ctx); err != nil {
return err
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, apiService.filterSChan); err != nil {
return
@@ -105,11 +109,11 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
if !apiService.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
apiService.server.RpcRegister(s)
apiService.cl.RpcRegister(s)
}
rpl, _ := engine.NewService(apis.NewReplicatorSv1(datadb, apiService.api))
for _, s := range rpl {
apiService.server.RpcRegister(s)
apiService.cl.RpcRegister(s)
}
}
@@ -130,7 +134,7 @@ func (apiService *AdminSv1Service) Shutdown() (err error) {
// close(apiService.stopChan)
apiService.api = nil
<-apiService.connChan
apiService.server.RpcUnregisterName(utils.AdminSv1)
apiService.cl.RpcUnregisterName(utils.AdminSv1)
apiService.Unlock()
return
}

View File

@@ -32,14 +32,14 @@ import (
)
// NewAnalyzerService returns the Analyzer Service
func NewAnalyzerService(cfg *config.CGRConfig, server *commonlisteners.CommonListenerS,
func NewAnalyzerService(cfg *config.CGRConfig, clSrv *CommonListenerService,
filterSChan chan *engine.FilterS,
internalAnalyzerSChan chan birpc.ClientConnector,
srvDep map[string]*sync.WaitGroup) *AnalyzerService {
return &AnalyzerService{
connChan: internalAnalyzerSChan,
cfg: cfg,
server: server,
cls: clSrv,
filterSChan: filterSChan,
srvDep: srvDep,
}
@@ -48,17 +48,18 @@ func NewAnalyzerService(cfg *config.CGRConfig, server *commonlisteners.CommonLis
// AnalyzerService implements Service interface
type AnalyzerService struct {
sync.RWMutex
cfg *config.CGRConfig
server *commonlisteners.CommonListenerS
cls *CommonListenerService
filterSChan chan *engine.FilterS
ctx *context.Context
cancelFunc context.CancelFunc
anz *analyzers.AnalyzerS
started chan struct{}
anz *analyzers.AnalyzerS
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
srvDep map[string]*sync.WaitGroup
started chan struct{}
cancelFunc context.CancelFunc
connChan chan birpc.ClientConnector
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
@@ -67,6 +68,10 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun
return utils.ErrServiceAlreadyRunning
}
if anz.cl, err = anz.cls.WaitForCLS(ctx); err != nil {
return
}
anz.Lock()
defer anz.Unlock()
anz.started = make(chan struct{})
@@ -75,14 +80,15 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun
return
}
close(anz.started)
anz.ctx, anz.cancelFunc = context.WithCancel(ctx)
anzCtx, cancel := context.WithCancel(ctx)
anz.cancelFunc = cancel
go func(a *analyzers.AnalyzerS) {
if err := a.ListenAndServe(anz.ctx); err != nil {
if err := a.ListenAndServe(anzCtx); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error()))
shtDwn()
}
}(anz.anz)
anz.server.SetAnalyzer(anz.anz)
anz.cl.SetAnalyzer(anz.anz)
go anz.start(ctx)
return
}
@@ -104,7 +110,7 @@ func (anz *AnalyzerService) start(ctx *context.Context) {
// srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false)
if !anz.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
anz.server.RpcRegister(s)
anz.cl.RpcRegister(s)
}
}
anz.Unlock()
@@ -120,7 +126,7 @@ func (anz *AnalyzerService) Reload(*context.Context, context.CancelFunc) (err er
func (anz *AnalyzerService) Shutdown() (err error) {
anz.Lock()
anz.cancelFunc()
anz.server.SetAnalyzer(nil)
anz.cl.SetAnalyzer(nil)
anz.anz.Shutdown()
anz.anz = nil
@@ -131,7 +137,7 @@ func (anz *AnalyzerService) Shutdown() (err error) {
anz.started = nil
<-anz.connChan
anz.Unlock()
anz.server.RpcUnregisterName(utils.AnalyzerSv1)
anz.cl.RpcUnregisterName(utils.AnalyzerSv1)
return
}

View File

@@ -46,7 +46,7 @@ func TestAnalyzerCoverage(t *testing.T) {
anz2 := &AnalyzerService{
RWMutex: sync.RWMutex{},
cfg: cfg,
server: cls,
cls: cls,
filterSChan: filterSChan,
connChan: connChan,
srvDep: srvDep,

View File

@@ -101,7 +101,7 @@ func (ast *AsteriskAgent) shutdown() {
func (ast *AsteriskAgent) IsRunning() bool {
ast.RLock()
defer ast.RUnlock()
return ast != nil && ast.smas != nil
return ast.smas != nil
}
// ServiceName returns the service name

View File

@@ -35,7 +35,7 @@ import (
// NewAttributeService returns the Attribute Service
func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
anz *AnalyzerService, dspS *DispatcherService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &AttributeService{
@@ -44,7 +44,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
cls: cls,
anz: anz,
srvDep: srvDep,
dspS: dspS,
@@ -54,17 +54,20 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
// AttributeService implements Service interface
type AttributeService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *DataDBService
cacheS *CacheService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
attrS *engine.AttributeS
rpc *apis.AttributeSv1 // useful on restart
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
dspS *DispatcherService
filterSChan chan *engine.FilterS
attrS *engine.AttributeS
cl *commonlisteners.CommonListenerS
rpc *apis.AttributeSv1 // useful on restart
connChan chan birpc.ClientConnector // publish the internal Subsystem when available
anz *AnalyzerService
dspS *DispatcherService
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -74,6 +77,9 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
if attrS.cl, err = attrS.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = attrS.cacheS.WaitToPrecache(ctx,
utils.CacheAttributeProfiles,
utils.CacheAttributeFilterIndexes); err != nil {
@@ -100,7 +106,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
// srv, _ := birpc.NewService(attrS.rpc, "", false)
if !attrS.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
attrS.server.RpcRegister(s)
attrS.cl.RpcRegister(s)
}
}
dspShtdChan := attrS.dspS.RegisterShutdownChan(attrS.ServiceName())
@@ -110,7 +116,7 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
return
}
if attrS.IsRunning() {
attrS.server.RpcRegister(srv)
attrS.cl.RpcRegister(srv)
}
}
@@ -131,7 +137,7 @@ func (attrS *AttributeService) Shutdown() (err error) {
attrS.attrS = nil
attrS.rpc = nil
<-attrS.connChan
attrS.server.RpcUnregisterName(utils.AttributeSv1)
attrS.cl.RpcUnregisterName(utils.AttributeSv1)
attrS.dspS.UnregisterShutdownChan(attrS.ServiceName())
attrS.Unlock()
return

View File

@@ -51,7 +51,7 @@ func TestAttributeSCoverage(t *testing.T) {
dm: db,
cacheS: chS,
filterSChan: filterSChan,
server: cls,
cls: cls,
anz: anz,
srvDep: srvDep,
dspS: &DispatcherService{srvsReload: map[string]chan struct{}{}},

View File

@@ -32,7 +32,7 @@ import (
// NewCacheService .
func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager,
server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
anz *AnalyzerService, // dspS *DispatcherService,
cores *CoreService,
srvDep map[string]*sync.WaitGroup) *CacheService {
@@ -41,7 +41,7 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C
srvDep: srvDep,
anz: anz,
cores: cores,
server: server,
cls: cls,
dm: dm,
connMgr: connMgr,
rpc: internalChan,
@@ -51,20 +51,26 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C
// CacheService implements Agent interface
type CacheService struct {
cfg *config.CGRConfig
anz *AnalyzerService
cores *CoreService
server *commonlisteners.CommonListenerS
dm *DataDBService
connMgr *engine.ConnManager
rpc chan birpc.ClientConnector
srvDep map[string]*sync.WaitGroup
anz *AnalyzerService
cores *CoreService
cls *CommonListenerService
dm *DataDBService
cl *commonlisteners.CommonListenerS
cacheCh chan *engine.CacheS
rpc chan birpc.ClientConnector
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (err error) {
if cS.cl, err = cS.cls.WaitForCLS(ctx); err != nil {
return err
}
var dm *engine.DataManager
if dm, err = cS.dm.WaitForDM(ctx); err != nil {
return
@@ -85,7 +91,7 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e
// srv, _ := birpc.NewService(apis.NewCacheSv1(engine.Cache), "", false)
if !cS.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
cS.server.RpcRegister(s)
cS.cl.RpcRegister(s)
}
}
cS.rpc <- cS.anz.GetInternalCodec(srv, utils.CacheS)
@@ -99,7 +105,7 @@ func (cS *CacheService) Reload(*context.Context, context.CancelFunc) (_ error) {
// Shutdown stops the service
func (cS *CacheService) Shutdown() (_ error) {
cS.server.RpcUnregisterName(utils.CacheSv1)
cS.cl.RpcUnregisterName(utils.CacheSv1)
return
}

View File

@@ -36,7 +36,7 @@ import (
// NewCDRServer returns the CDR Server
func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
storDB *StorDBService, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, internalCDRServerChan chan birpc.ClientConnector,
cls *CommonListenerService, internalCDRServerChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &CDRService{
@@ -45,7 +45,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
storDB: storDB,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -55,93 +55,97 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
// CDRService implements Service interface
type CDRService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *DataDBService
storDB *StorDBService
cls *CommonListenerService
dm *DataDBService
storDB *StorDBService
anz *AnalyzerService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
cdrS *cdrs.CDRServer
cdrS *cdrs.CDRServer
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
connMgr *engine.ConnManager
stopChan chan struct{}
anz *AnalyzerService
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
func (cdrSrv *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
if cdrSrv.IsRunning() {
func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
if cs.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs))
if cs.cl, err = cs.cls.WaitForCLS(ctx); err != nil {
return err
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, cdrSrv.filterSChan); err != nil {
if filterS, err = waitForFilterS(ctx, cs.filterSChan); err != nil {
return
}
var datadb *engine.DataManager
if datadb, err = cdrSrv.dm.WaitForDM(ctx); err != nil {
if datadb, err = cs.dm.WaitForDM(ctx); err != nil {
return
}
if err = cdrSrv.anz.WaitForAnalyzerS(ctx); err != nil {
if err = cs.anz.WaitForAnalyzerS(ctx); err != nil {
return
}
storDBChan := make(chan engine.StorDB, 1)
cdrSrv.stopChan = make(chan struct{})
cdrSrv.storDB.RegisterSyncChan(storDBChan)
cs.stopChan = make(chan struct{})
cs.storDB.RegisterSyncChan(storDBChan)
cdrSrv.Lock()
defer cdrSrv.Unlock()
cs.Lock()
defer cs.Unlock()
cdrSrv.cdrS = cdrs.NewCDRServer(cdrSrv.cfg, datadb, filterS, cdrSrv.connMgr, storDBChan)
go cdrSrv.cdrS.ListenAndServe(cdrSrv.stopChan)
cs.cdrS = cdrs.NewCDRServer(cs.cfg, datadb, filterS, cs.connMgr, storDBChan)
go cs.cdrS.ListenAndServe(cs.stopChan)
runtime.Gosched()
utils.Logger.Info("Registering CDRS RPC service.")
srv, err := engine.NewServiceWithPing(cdrSrv.cdrS, utils.CDRsV1, utils.V1Prfx)
srv, err := engine.NewServiceWithPing(cs.cdrS, utils.CDRsV1, utils.V1Prfx)
if err != nil {
return err
}
if !cdrSrv.cfg.DispatcherSCfg().Enabled {
cdrSrv.server.RpcRegister(srv)
if !cs.cfg.DispatcherSCfg().Enabled {
cs.cl.RpcRegister(srv)
}
cdrSrv.connChan <- cdrSrv.anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational
cs.connChan <- cs.anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational
return
}
// Reload handles the change of config
func (cdrService *CDRService) Reload(*context.Context, context.CancelFunc) (err error) {
func (cs *CDRService) Reload(*context.Context, context.CancelFunc) (err error) {
return
}
// Shutdown stops the service
func (cdrService *CDRService) Shutdown() (err error) {
cdrService.Lock()
close(cdrService.stopChan)
cdrService.cdrS = nil
<-cdrService.connChan
cdrService.Unlock()
cdrService.server.RpcUnregisterName(utils.CDRsV1)
func (cs *CDRService) Shutdown() (err error) {
cs.Lock()
close(cs.stopChan)
cs.cdrS = nil
<-cs.connChan
cs.Unlock()
cs.cl.RpcUnregisterName(utils.CDRsV1)
return
}
// IsRunning returns if the service is running
func (cdrService *CDRService) IsRunning() bool {
cdrService.RLock()
defer cdrService.RUnlock()
return cdrService != nil && cdrService.cdrS != nil
func (cs *CDRService) IsRunning() bool {
cs.RLock()
defer cs.RUnlock()
return cs.cdrS != nil
}
// ServiceName returns the service name
func (cdrService *CDRService) ServiceName() string {
func (cs *CDRService) ServiceName() string {
return utils.CDRServer
}
// ShouldRun returns if the service should be running
func (cdrService *CDRService) ShouldRun() bool {
return cdrService.cfg.CdrsCfg().Enabled
func (cs *CDRService) ShouldRun() bool {
return cs.cfg.CdrsCfg().Enabled
}

View File

@@ -54,7 +54,7 @@ func TestCdrsCoverage(t *testing.T) {
dm: db,
storDB: stordb,
filterSChan: filterSChan,
server: cls,
cls: cls,
connChan: make(chan birpc.ClientConnector, 1),
connMgr: nil,
stopChan: make(chan struct{}, 1),

View File

@@ -29,12 +29,10 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/registrarc"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
@@ -50,7 +48,6 @@ func NewCGREngine(cfg *config.CGRConfig) *CGREngine {
caps: caps, // caps is used to limit RPC CPS
shdWg: shdWg, // wait for shutdown
srvManager: servmanager.NewServiceManager(shdWg, cM, cfg),
cls: commonlisteners.NewCommonListenerS(caps),
srvDep: map[string]*sync.WaitGroup{
utils.AccountS: new(sync.WaitGroup),
utils.ActionS: new(sync.WaitGroup),
@@ -98,7 +95,6 @@ type CGREngine struct {
srvDep map[string]*sync.WaitGroup
shdWg *sync.WaitGroup
cM *engine.ConnManager
cls *commonlisteners.CommonListenerS
caps *engine.Caps
cpuPrfF *os.File
@@ -107,6 +103,7 @@ type CGREngine struct {
gvS *GlobalVarS
dmS *DataDBService
sdbS *StorDBService
cls *CommonListenerService
anzS *AnalyzerService
coreS *CoreService
cacheS *CacheService
@@ -133,13 +130,6 @@ func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefi
}
func (cgr *CGREngine) InitServices(setVersions bool) {
if len(cgr.cfg.HTTPCfg().RegistrarSURL) != 0 {
cgr.cls.RegisterHTTPFunc(cgr.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar)
}
if cgr.cfg.ConfigSCfg().Enabled {
cgr.cls.RegisterHTTPFunc(cgr.cfg.ConfigSCfg().URL, config.HandlerConfigS)
}
// init the channel here because we need to pass them to connManager
cgr.iServeManagerCh = make(chan birpc.ClientConnector, 1)
cgr.iConfigCh = make(chan birpc.ClientConnector, 1)
@@ -201,14 +191,15 @@ func (cgr *CGREngine) InitServices(setVersions bool) {
cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep)
cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, setVersions, cgr.srvDep)
cgr.sdbS = NewStorDBService(cgr.cfg, setVersions, cgr.srvDep)
cgr.cls = NewCommonListenerService(cgr.cfg, cgr.caps, cgr.srvDep)
cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.cls,
cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep) // init AnalyzerS
cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep)
cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.cls, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep) // init CoreSv1
cgr.coreS = NewCoreService(cgr.cfg, cgr.caps, cgr.cls, iCoreSv1Ch, cgr.anzS, cgr.cpuPrfF, cgr.shdWg, cgr.srvDep)
cgr.cacheS = NewCacheService(cgr.cfg, cgr.dmS, cgr.cM,
cgr.cls, iCacheSCh, cgr.anzS, cgr.coreS,
cgr.srvDep) // init CacheS
cgr.srvDep)
dspS := NewDispatcherService(cgr.cfg, cgr.dmS, cgr.cacheS,
cgr.iFilterSCh, cgr.cls, cgr.iDispatcherSCh, cgr.cM,
@@ -219,7 +210,7 @@ func (cgr *CGREngine) InitServices(setVersions bool) {
cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.cls, cgr.srvDep)
cgr.srvManager.AddServices(cgr.gvS, cgr.coreS, cgr.cacheS,
cgr.srvManager.AddServices(cgr.gvS, cgr.cls, cgr.coreS, cgr.cacheS,
cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs,
NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls,
iAdminSCh, cgr.cM, cgr.anzS, cgr.srvDep),
@@ -257,7 +248,7 @@ func (cgr *CGREngine) InitServices(setVersions bool) {
NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.cls, iCDRServerCh,
cgr.cM, cgr.anzS, cgr.srvDep),
NewRegistrarCService(cgr.cfg, cgr.cls, cgr.cM, cgr.anzS, cgr.srvDep),
NewRegistrarCService(cgr.cfg, cgr.cM, cgr.anzS, cgr.srvDep),
NewRateService(cgr.cfg, cgr.cacheS, cgr.iFilterSCh, cgr.dmS,
cgr.cls, iRateSCh, cgr.anzS, cgr.srvDep),
@@ -279,6 +270,13 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu
cgr.shdWg.Done()
return
}
if cgr.cls.ShouldRun() {
cgr.shdWg.Add(1)
if err = cgr.cls.Start(ctx, shtDw); err != nil {
cgr.shdWg.Done()
return
}
}
if cgr.efs.ShouldRun() { // efs checking first because of loggers
cgr.shdWg.Add(1)
if err = cgr.efs.Start(ctx, shtDw); err != nil {
@@ -323,9 +321,9 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu
go cgrStartFilterService(ctx, cgr.iFilterSCh, cgr.cacheS.GetCacheSChan(), cgr.cM,
cgr.cfg, cgr.dmS)
cgrInitServiceManagerV1(cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.cls, cgr.anzS)
cgrInitGuardianSv1(cgr.iGuardianSCh, cgr.cfg, cgr.cls, cgr.anzS) // init GuardianSv1
cgrInitConfigSv1(cgr.iConfigCh, cgr.cfg, cgr.cls, cgr.anzS)
cgrInitServiceManagerV1(ctx, cgr.iServeManagerCh, cgr.srvManager, cgr.cfg, cgr.cls, cgr.anzS)
cgrInitGuardianSv1(ctx, cgr.iGuardianSCh, cgr.cfg, cgr.cls, cgr.anzS)
cgrInitConfigSv1(ctx, cgr.iConfigCh, cgr.cfg, cgr.cls, cgr.anzS)
if preload != utils.EmptyString {
if err = cgrRunPreload(ctx, cgr.cfg, preload, cgr.ldrs); err != nil {

View File

@@ -34,7 +34,7 @@ import (
// NewChargerService returns the Charger Service
func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS, server *commonlisteners.CommonListenerS,
cacheS *CacheService, filterSChan chan *engine.FilterS, cls *CommonListenerService,
internalChargerSChan chan birpc.ClientConnector, connMgr *engine.ConnManager,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &ChargerService{
@@ -43,7 +43,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -53,16 +53,19 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
// ChargerService implements Service interface
type ChargerService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
cacheS *CacheService
anz *AnalyzerService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
chrS *engine.ChargerS
chrS *engine.ChargerS
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
anz *AnalyzerService
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -72,6 +75,9 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
return utils.ErrServiceAlreadyRunning
}
if chrS.cl, err = chrS.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = chrS.cacheS.WaitToPrecache(ctx,
utils.CacheChargerProfiles,
utils.CacheChargerFilterIndexes); err != nil {
@@ -97,7 +103,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
// srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false)
if !chrS.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
chrS.server.RpcRegister(s)
chrS.cl.RpcRegister(s)
}
}
chrS.connChan <- chrS.anz.GetInternalCodec(srv, utils.ChargerS)
@@ -116,7 +122,7 @@ func (chrS *ChargerService) Shutdown() (err error) {
chrS.chrS.Shutdown()
chrS.chrS = nil
<-chrS.connChan
chrS.server.RpcUnregisterName(utils.ChargerSv1)
chrS.cl.RpcUnregisterName(utils.ChargerSv1)
return
}
@@ -124,7 +130,7 @@ func (chrS *ChargerService) Shutdown() (err error) {
func (chrS *ChargerService) IsRunning() bool {
chrS.RLock()
defer chrS.RUnlock()
return chrS != nil && chrS.chrS != nil
return chrS.chrS != nil
}
// ServiceName returns the service name

View File

@@ -53,7 +53,7 @@ func TestChargerSCoverage(t *testing.T) {
dm: db,
cacheS: chS,
filterSChan: filterSChan,
server: cls,
cls: cls,
connMgr: nil,
anz: anz,
srvDep: srvDep,

116
services/commonlisteners.go Normal file
View File

@@ -0,0 +1,116 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/registrarc"
"github.com/cgrates/cgrates/utils"
)
// NewCommonListenerService instantiates a new CommonListenerService.
func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) *CommonListenerService {
return &CommonListenerService{
cfg: cfg,
caps: caps,
clsCh: make(chan *commonlisteners.CommonListenerS, 1),
srvDep: srvDep,
}
}
// CommonListenerService implements Service interface.
type CommonListenerService struct {
mu sync.RWMutex
cls *commonlisteners.CommonListenerS
clsCh chan *commonlisteners.CommonListenerS
caps *engine.Caps
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start handles the service start.
func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) error {
if cl.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
cl.mu.Lock()
defer cl.mu.Unlock()
cl.cls = commonlisteners.NewCommonListenerS(cl.caps)
cl.clsCh <- cl.cls
if len(cl.cfg.HTTPCfg().RegistrarSURL) != 0 {
cl.cls.RegisterHTTPFunc(cl.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar)
}
if cl.cfg.ConfigSCfg().Enabled {
cl.cls.RegisterHTTPFunc(cl.cfg.ConfigSCfg().URL, config.HandlerConfigS)
}
return nil
}
// Reload handles the config changes.
func (cl *CommonListenerService) Reload(*context.Context, context.CancelFunc) error {
return nil
}
// Shutdown stops the service.
func (cl *CommonListenerService) Shutdown() error {
cl.mu.Lock()
defer cl.mu.Unlock()
cl.cls = nil
<-cl.clsCh
return nil
}
// IsRunning returns whether the service is running or not.
func (cl *CommonListenerService) IsRunning() bool {
cl.mu.RLock()
defer cl.mu.RUnlock()
return cl.cls != nil
}
// ServiceName returns the service name
func (cl *CommonListenerService) ServiceName() string {
return utils.CommonListenerS
}
// ShouldRun returns if the service should be running.
func (cl *CommonListenerService) ShouldRun() bool {
return true
}
// WaitForCLS waits for the CommonListenerS structure to be initialized.
func (cl *CommonListenerService) WaitForCLS(ctx *context.Context) (*commonlisteners.CommonListenerS, error) {
cl.mu.RLock()
clsCh := cl.clsCh
cl.mu.RUnlock()
var cls *commonlisteners.CommonListenerS
select {
case <-ctx.Done():
return nil, ctx.Err()
case cls = <-clsCh:
clsCh <- cls
}
return cls, nil
}

View File

@@ -33,7 +33,7 @@ import (
)
// NewCoreService returns the Core Service
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *commonlisteners.CommonListenerS,
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, cls *CommonListenerService,
internalCoreSChan chan birpc.ClientConnector, anz *AnalyzerService,
fileCPU *os.File, shdWg *sync.WaitGroup,
srvDep map[string]*sync.WaitGroup) *CoreService {
@@ -43,7 +43,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *commonlist
cfg: cfg,
caps: caps,
fileCPU: fileCPU,
server: server,
cls: cls,
anz: anz,
srvDep: srvDep,
csCh: make(chan *cores.CoreS, 1),
@@ -52,18 +52,22 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *commonlist
// CoreService implements Service interface
type CoreService struct {
mu sync.RWMutex
cfg *config.CGRConfig
server *commonlisteners.CommonListenerS
mu sync.RWMutex
anz *AnalyzerService
cls *CommonListenerService
cS *cores.CoreS
cl *commonlisteners.CommonListenerS
fileCPU *os.File
caps *engine.Caps
csCh chan *cores.CoreS
stopChan chan struct{}
shdWg *sync.WaitGroup
fileCPU *os.File
cS *cores.CoreS
connChan chan birpc.ClientConnector
anz *AnalyzerService
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
csCh chan *cores.CoreS
}
// Start should handle the service start
@@ -72,6 +76,11 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err
return utils.ErrServiceAlreadyRunning
}
var err error
cS.cl, err = cS.cls.WaitForCLS(ctx)
if err != nil {
return err
}
if err := cS.anz.WaitForAnalyzerS(ctx); err != nil {
return err
}
@@ -88,7 +97,7 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err
}
if !cS.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
cS.server.RpcRegister(s)
cS.cl.RpcRegister(s)
}
}
cS.connChan <- cS.anz.GetInternalCodec(srv, utils.CoreS)
@@ -111,7 +120,7 @@ func (cS *CoreService) Shutdown() error {
cS.cS = nil
<-cS.connChan
<-cS.csCh
cS.server.RpcUnregisterName(utils.CoreSv1)
cS.cl.RpcUnregisterName(utils.CoreSv1)
return nil
}

View File

@@ -33,7 +33,7 @@ import (
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *DispatcherService {
return &DispatcherService{
@@ -42,7 +42,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -53,19 +53,21 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
// DispatcherService implements Service interface
type DispatcherService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
dspS *dispatchers.DispatcherService
connChan chan birpc.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
dspS *dispatchers.DispatcherService
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvsReload map[string]chan struct{}
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
@@ -74,6 +76,9 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
utils.Logger.Info("Starting CGRateS DispatcherS service.")
if dspS.cl, err = dspS.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = dspS.cacheS.WaitToPrecache(ctx,
utils.CacheDispatcherProfiles,
utils.CacheDispatcherHosts,
@@ -102,7 +107,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
srv, _ := engine.NewDispatcherService(dspS.dspS)
// srv, _ := birpc.NewService(apis.NewDispatcherSv1(dspS.dspS), "", false)
for _, s := range srv {
dspS.server.RpcRegister(s)
dspS.cl.RpcRegister(s)
}
dspS.connMgr.EnableDispatcher(srv)
// for the moment we dispable Apier through dispatcher
@@ -125,8 +130,8 @@ func (dspS *DispatcherService) Shutdown() (err error) {
dspS.dspS.Shutdown()
dspS.dspS = nil
<-dspS.connChan
dspS.server.RpcUnregisterName(utils.DispatcherSv1)
dspS.server.RpcUnregisterName(utils.AttributeSv1)
dspS.cl.RpcUnregisterName(utils.DispatcherSv1)
dspS.cl.RpcUnregisterName(utils.AttributeSv1)
dspS.unregisterAllDispatchedSubsystems()
dspS.connMgr.DisableDispatcher()
@@ -138,7 +143,7 @@ func (dspS *DispatcherService) Shutdown() (err error) {
func (dspS *DispatcherService) IsRunning() bool {
dspS.RLock()
defer dspS.RUnlock()
return dspS != nil && dspS.dspS != nil
return dspS.dspS != nil
}
// ServiceName returns the service name
@@ -152,7 +157,7 @@ func (dspS *DispatcherService) ShouldRun() bool {
}
func (dspS *DispatcherService) unregisterAllDispatchedSubsystems() {
dspS.server.RpcUnregisterName(utils.AttributeSv1)
dspS.cl.RpcUnregisterName(utils.AttributeSv1)
}
func (dspS *DispatcherService) RegisterShutdownChan(subsys string) (c chan struct{}) {

View File

@@ -53,7 +53,7 @@ func TestDispatcherSCoverage(t *testing.T) {
dm: db,
cacheS: chS,
filterSChan: filterSChan,
server: cls,
cls: cls,
connMgr: srv.connMgr,
connChan: make(chan birpc.ClientConnector, 1),
anz: anz,

View File

@@ -34,13 +34,13 @@ import (
// NewEventExporterService constructs EventExporterService
func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, server *commonlisteners.CommonListenerS, intConnChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, cls *CommonListenerService, intConnChan chan birpc.ClientConnector,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &EventExporterService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
server: server,
cls: cls,
intConnChan: intConnChan,
anz: anz,
srvDep: srvDep,
@@ -51,15 +51,17 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil
type EventExporterService struct {
mu sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
anz *AnalyzerService
filterSChan chan *engine.FilterS
connMgr *engine.ConnManager
server *commonlisteners.CommonListenerS
intConnChan chan birpc.ClientConnector
eeS *ees.EeS
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
eeS *ees.EeS
cl *commonlisteners.CommonListenerS
intConnChan chan birpc.ClientConnector
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// ServiceName returns the service name
@@ -76,7 +78,7 @@ func (es *EventExporterService) ShouldRun() (should bool) {
func (es *EventExporterService) IsRunning() bool {
es.mu.RLock()
defer es.mu.RUnlock()
return es != nil && es.eeS != nil
return es.eeS != nil
}
// Reload handles the change of config
@@ -95,7 +97,7 @@ func (es *EventExporterService) Shutdown() error {
es.eeS.ClearExporterCache()
es.eeS = nil
<-es.intConnChan
es.server.RpcUnregisterName(utils.EeSv1)
es.cl.RpcUnregisterName(utils.EeSv1)
return nil
}
@@ -105,6 +107,10 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
return utils.ErrServiceAlreadyRunning
}
var err error
if es.cl, err = es.cls.WaitForCLS(ctx); err != nil {
return err
}
fltrS, err := waitForFilterS(ctx, es.filterSChan)
if err != nil {
return err
@@ -126,7 +132,7 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
srv, _ := engine.NewServiceWithPing(es.eeS, utils.EeSv1, utils.V1Prfx)
// srv, _ := birpc.NewService(es.rpc, "", false)
if !es.cfg.DispatcherSCfg().Enabled {
es.server.RpcRegister(srv)
es.cl.RpcRegister(srv)
}
es.intConnChan <- es.anz.GetInternalCodec(srv, utils.EEs)
return nil

View File

@@ -48,7 +48,7 @@ func TestEventExporterSCoverage(t *testing.T) {
cfg: cfg,
filterSChan: filterSChan,
connMgr: engine.NewConnManager(cfg),
server: cls,
cls: cls,
intConnChan: make(chan birpc.ClientConnector, 1),
anz: anz,
srvDep: srvDep,

View File

@@ -36,24 +36,26 @@ import (
type ExportFailoverService struct {
sync.Mutex
cfg *config.CGRConfig
connMgr *engine.ConnManager
server *commonlisteners.CommonListenerS
srv *birpc.Service
intConnChan chan birpc.ClientConnector
stopChan chan struct{}
cls *CommonListenerService
efS *efs.EfS
srvDep map[string]*sync.WaitGroup
efS *efs.EfS
cl *commonlisteners.CommonListenerS
srv *birpc.Service
stopChan chan struct{}
intConnChan chan birpc.ClientConnector
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// NewExportFailoverService is the constructor for the TpeService
func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
intConnChan chan birpc.ClientConnector,
server *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) *ExportFailoverService {
cls *CommonListenerService, srvDep map[string]*sync.WaitGroup) *ExportFailoverService {
return &ExportFailoverService{
cfg: cfg,
server: server,
cls: cls,
connMgr: connMgr,
intConnChan: intConnChan,
srvDep: srvDep,
@@ -65,12 +67,15 @@ func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.Cance
if efServ.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
if efServ.cl, err = efServ.cls.WaitForCLS(ctx); err != nil {
return err
}
efServ.Lock()
efServ.efS = efs.NewEfs(efServ.cfg, efServ.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EFs))
efServ.stopChan = make(chan struct{})
efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx)
efServ.server.RpcRegister(efServ.srv)
efServ.cl.RpcRegister(efServ.srv)
efServ.Unlock()
return
}
@@ -93,9 +98,8 @@ func (efServ *ExportFailoverService) Shutdown() (err error) {
// IsRunning returns if the service is running
func (efServ *ExportFailoverService) IsRunning() bool {
efServ.Lock()
run := efServ != nil && efServ.efS != nil
efServ.Unlock()
return run
defer efServ.Unlock()
return efServ.efS != nil
}
// ShouldRun returns if the service should be running

View File

@@ -37,7 +37,7 @@ func NewEventReaderService(
cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
server *commonlisteners.CommonListenerS,
cls *CommonListenerService,
intConn chan birpc.ClientConnector,
anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
@@ -46,7 +46,7 @@ func NewEventReaderService(
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
server: server,
cls: cls,
intConn: intConn,
anz: anz,
srvDep: srvDep,
@@ -56,16 +56,19 @@ func NewEventReaderService(
// EventReaderService implements Service interface
type EventReaderService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
anz *AnalyzerService
filterSChan chan *engine.FilterS
ers *ers.ERService
ers *ers.ERService
cl *commonlisteners.CommonListenerS
rldChan chan struct{}
stopChan chan struct{}
connMgr *engine.ConnManager
server *commonlisteners.CommonListenerS
intConn chan birpc.ClientConnector
anz *AnalyzerService
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -75,6 +78,9 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
return utils.ErrServiceAlreadyRunning
}
if erS.cl, err = erS.cls.WaitForCLS(ctx); err != nil {
return err
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, erS.filterSChan); err != nil {
return
@@ -100,7 +106,7 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
return err
}
if !erS.cfg.DispatcherSCfg().Enabled {
erS.server.RpcRegister(srv)
erS.cl.RpcRegister(srv)
}
erS.intConn <- erS.anz.GetInternalCodec(srv, utils.ERs)
return
@@ -128,7 +134,7 @@ func (erS *EventReaderService) Shutdown() (err error) {
defer erS.Unlock()
close(erS.stopChan)
erS.ers = nil
erS.server.RpcUnregisterName(utils.ErSv1)
erS.cl.RpcUnregisterName(utils.ErSv1)
return
}
@@ -136,7 +142,7 @@ func (erS *EventReaderService) Shutdown() (err error) {
func (erS *EventReaderService) IsRunning() bool {
erS.RLock()
defer erS.RUnlock()
return erS != nil && erS.ers != nil
return erS.ers != nil
}
// ServiceName returns the service name

View File

@@ -52,7 +52,7 @@ func TestEventReaderSCoverage(t *testing.T) {
rldChan: make(chan struct{}, 1),
stopChan: make(chan struct{}, 1),
connMgr: nil,
server: cls,
cls: cls,
srvDep: srvDep,
}
if !srv2.IsRunning() {

View File

@@ -100,7 +100,7 @@ func (fS *FreeswitchAgent) Shutdown() (err error) {
func (fS *FreeswitchAgent) IsRunning() bool {
fS.RLock()
defer fS.RUnlock()
return fS != nil && fS.fS != nil
return fS.fS != nil
}
// ServiceName returns the service name

View File

@@ -33,12 +33,12 @@ import (
// NewHTTPAgent returns the HTTP Agent
func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, connMgr *engine.ConnManager,
cls *CommonListenerService, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &HTTPAgent{
cfg: cfg,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -47,14 +47,18 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
// HTTPAgent implements Agent interface
type HTTPAgent struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
cl *commonlisteners.CommonListenerS
// we can realy stop the HTTPAgent so keep a flag
// if we registerd the handlers
started bool
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -64,6 +68,10 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro
return utils.ErrServiceAlreadyRunning
}
cl, err := ha.cls.WaitForCLS(ctx)
if err != nil {
return err
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ha.filterSChan); err != nil {
return
@@ -73,7 +81,7 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro
ha.started = true
utils.Logger.Info(fmt.Sprintf("<%s> successfully started HTTPAgent", utils.HTTPAgent))
for _, agntCfg := range ha.cfg.HTTPAgentCfg() {
ha.server.RegisterHttpHandler(agntCfg.URL,
cl.RegisterHttpHandler(agntCfg.URL,
agents.NewHTTPAgent(ha.connMgr, agntCfg.SessionSConns, filterS,
ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
@@ -99,7 +107,7 @@ func (ha *HTTPAgent) Shutdown() (err error) {
func (ha *HTTPAgent) IsRunning() bool {
ha.RLock()
defer ha.RUnlock()
return ha != nil && ha.started
return ha.started
}
// ServiceName returns the service name

View File

@@ -46,7 +46,7 @@ func TestHTTPAgentCoverage(t *testing.T) {
srv2 := &HTTPAgent{
cfg: cfg,
filterSChan: filterSChan,
server: cls,
cls: cls,
started: true,
connMgr: cM,
srvDep: srvDep,

View File

@@ -25,7 +25,6 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
@@ -34,12 +33,12 @@ import (
// NewJanusAgent returns the Janus Agent
func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, connMgr *engine.ConnManager,
cls *CommonListenerService, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &JanusAgent{
cfg: cfg,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -48,22 +47,32 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
// JanusAgent implements Service interface
type JanusAgent struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
jA *agents.JanusAgent
jA *agents.JanusAgent
// we can realy stop the JanusAgent so keep a flag
// if we registerd the jandlers
started bool
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start should jandle the sercive start
func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
filterS := <-ja.filterSChan
ja.filterSChan <- filterS
cl, err := ja.cls.WaitForCLS(ctx)
if err != nil {
return err
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ja.filterSChan); err != nil {
return
}
ja.Lock()
if ja.started {
@@ -78,13 +87,13 @@ func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err err
return
}
ja.server.RegisterHttpHandler("POST "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CreateSession))
ja.server.RegisterHttpHandler("OPTIONS "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CORSOptions))
ja.server.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.SessionKeepalive))
ja.server.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}/", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.CORSOptions))
ja.server.RegisterHttpHandler(fmt.Sprintf("GET %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.PollSession))
ja.server.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.AttachPlugin))
ja.server.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}/{handleID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.HandlePlugin))
cl.RegisterHttpHandler("POST "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CreateSession))
cl.RegisterHttpHandler("OPTIONS "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CORSOptions))
cl.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.SessionKeepalive))
cl.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}/", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.CORSOptions))
cl.RegisterHttpHandler(fmt.Sprintf("GET %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.PollSession))
cl.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.AttachPlugin))
cl.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}/{handleID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.HandlePlugin))
ja.started = true
ja.Unlock()

View File

@@ -106,7 +106,7 @@ func (kam *KamailioAgent) Shutdown() (err error) {
func (kam *KamailioAgent) IsRunning() bool {
kam.RLock()
defer kam.RUnlock()
return kam != nil && kam.kam != nil
return kam.kam != nil
}
// ServiceName returns the service name

View File

@@ -32,7 +32,6 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
@@ -184,41 +183,45 @@ func cgrStartFilterService(ctx *context.Context, iFilterSCh chan *engine.FilterS
}
}
func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig,
server *commonlisteners.CommonListenerS, anz *AnalyzerService) {
func cgrInitGuardianSv1(ctx *context.Context, iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig,
cls *CommonListenerService, anz *AnalyzerService) {
cl, _ := cls.WaitForCLS(ctx)
srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
if !cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
server.RpcRegister(s)
cl.RpcRegister(s)
}
}
iGuardianSCh <- anz.GetInternalCodec(srv, utils.GuardianS)
}
func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
func cgrInitServiceManagerV1(ctx *context.Context, iServMngrCh chan birpc.ClientConnector,
srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig,
server *commonlisteners.CommonListenerS, anz *AnalyzerService) {
cls *CommonListenerService, anz *AnalyzerService) {
cl, _ := cls.WaitForCLS(ctx)
srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false)
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(srv)
cl.RpcRegister(srv)
}
iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager)
}
func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector,
cfg *config.CGRConfig, server *commonlisteners.CommonListenerS, anz *AnalyzerService) {
func cgrInitConfigSv1(ctx *context.Context, iConfigCh chan birpc.ClientConnector,
cfg *config.CGRConfig, cls *CommonListenerService, anz *AnalyzerService) {
cl, _ := cls.WaitForCLS(ctx)
srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true)
// srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false)
if !cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
server.RpcRegister(s)
cl.RpcRegister(s)
}
}
iConfigCh <- anz.GetInternalCodec(srv, utils.ConfigSv1)
}
func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
cfg *config.CGRConfig, server *commonlisteners.CommonListenerS, internalDispatcherSChan chan birpc.ClientConnector) {
cfg *config.CGRConfig, cls *CommonListenerService, internalDispatcherSChan chan birpc.ClientConnector) {
cl, _ := cls.WaitForCLS(ctx)
if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this
select {
case dispatcherS := <-internalDispatcherSChan:
@@ -227,7 +230,7 @@ func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
return
}
}
server.StartServer(ctx, shtdwnEngine, cfg)
cl.StartServer(ctx, shtdwnEngine, cfg)
}
func waitForFilterS(ctx *context.Context, fsCh chan *engine.FilterS) (filterS *engine.FilterS, err error) {

View File

@@ -33,7 +33,7 @@ import (
// NewLoaderService returns the Loader Service
func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS, server *commonlisteners.CommonListenerS,
filterSChan chan *engine.FilterS, cls *CommonListenerService,
internalLoaderSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *LoaderService {
@@ -42,7 +42,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
stopChan: make(chan struct{}),
anz: anz,
@@ -53,16 +53,19 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
// LoaderService implements Service interface
type LoaderService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *DataDBService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
stopChan chan struct{}
ldrs *loaders.LoaderS
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
filterSChan chan *engine.FilterS
ldrs *loaders.LoaderS
cl *commonlisteners.CommonListenerS
stopChan chan struct{}
connChan chan birpc.ClientConnector
connMgr *engine.ConnManager
anz *AnalyzerService
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -72,6 +75,9 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
return utils.ErrServiceAlreadyRunning
}
if ldrs.cl, err = ldrs.cls.WaitForCLS(ctx); err != nil {
return err
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, ldrs.filterSChan); err != nil {
return
@@ -99,7 +105,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
// srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false)
if !ldrs.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
ldrs.server.RpcRegister(s)
ldrs.cl.RpcRegister(s)
}
}
ldrs.connChan <- ldrs.anz.GetInternalCodec(srv, utils.LoaderS)
@@ -132,7 +138,7 @@ func (ldrs *LoaderService) Shutdown() (_ error) {
ldrs.ldrs = nil
close(ldrs.stopChan)
<-ldrs.connChan
ldrs.server.RpcUnregisterName(utils.LoaderSv1)
ldrs.cl.RpcUnregisterName(utils.LoaderSv1)
ldrs.Unlock()
return
}
@@ -141,7 +147,7 @@ func (ldrs *LoaderService) Shutdown() (_ error) {
func (ldrs *LoaderService) IsRunning() bool {
ldrs.RLock()
defer ldrs.RUnlock()
return ldrs != nil && ldrs.ldrs != nil && ldrs.ldrs.Enabled()
return ldrs.ldrs != nil && ldrs.ldrs.Enabled()
}
// ServiceName returns the service name

View File

@@ -124,7 +124,7 @@ func (rad *RadiusAgent) shutdown() {
func (rad *RadiusAgent) IsRunning() bool {
rad.RLock()
defer rad.RUnlock()
return rad != nil && rad.rad != nil
return rad.rad != nil
}
// ServiceName returns the service name

View File

@@ -35,7 +35,7 @@ import (
// NewRankingService returns the RankingS Service
func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, internalRankingSChan chan birpc.ClientConnector,
cls *CommonListenerService, internalRankingSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &RankingService{
@@ -44,7 +44,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -53,16 +53,20 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
type RankingService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
connChan chan birpc.ClientConnector
anz *AnalyzerService
ran *engine.RankingS
srvDep map[string]*sync.WaitGroup
ran *engine.RankingS
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
@@ -72,6 +76,9 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
}
ran.srvDep[utils.DataDB].Add(1)
if ran.cl, err = ran.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = ran.cacheS.WaitToPrecache(ctx,
utils.CacheRankingProfiles,
utils.CacheRankings,
@@ -105,7 +112,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
}
if !ran.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
ran.server.RpcRegister(s)
ran.cl.RpcRegister(s)
}
}
ran.connChan <- ran.anz.GetInternalCodec(srv, utils.RankingS)
@@ -128,13 +135,13 @@ func (ran *RankingService) Shutdown() (err error) {
ran.ran.StopRankingS()
ran.ran = nil
<-ran.connChan
ran.server.RpcUnregisterName(utils.RankingSv1)
ran.cl.RpcUnregisterName(utils.RankingSv1)
return
}
// IsRunning returns if the service is running
func (ran *RankingService) IsRunning() bool {
return ran != nil && ran.ran != nil
return ran.ran != nil
}
// ServiceName returns the service name

View File

@@ -34,7 +34,7 @@ import (
// NewRateService constructs RateService
func NewRateService(cfg *config.CGRConfig,
cacheS *CacheService, filterSChan chan *engine.FilterS,
dmS *DataDBService, server *commonlisteners.CommonListenerS,
dmS *DataDBService, cls *CommonListenerService,
intConnChan chan birpc.ClientConnector, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &RateService{
@@ -42,7 +42,7 @@ func NewRateService(cfg *config.CGRConfig,
cacheS: cacheS,
filterSChan: filterSChan,
dmS: dmS,
server: server,
cls: cls,
intConnChan: intConnChan,
rldChan: make(chan struct{}),
anz: anz,
@@ -54,18 +54,19 @@ func NewRateService(cfg *config.CGRConfig,
type RateService struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
cls *CommonListenerService
anz *AnalyzerService
dmS *DataDBService
cacheS *CacheService
server *commonlisteners.CommonListenerS
filterSChan chan *engine.FilterS
rldChan chan struct{}
stopChan chan struct{}
rateS *rates.RateS
cl *commonlisteners.CommonListenerS
rateS *rates.RateS
rldChan chan struct{}
stopChan chan struct{}
intConnChan chan birpc.ClientConnector
anz *AnalyzerService
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -100,7 +101,7 @@ func (rs *RateService) Shutdown() (err error) {
rs.rateS.Shutdown() //we don't verify the error because shutdown never returns an err
rs.rateS = nil
<-rs.intConnChan
rs.server.RpcUnregisterName(utils.RateSv1)
rs.cl.RpcUnregisterName(utils.RateSv1)
return
}
@@ -110,6 +111,9 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
return utils.ErrServiceAlreadyRunning
}
if rs.cl, err = rs.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = rs.cacheS.WaitToPrecache(ctx,
utils.CacheRateProfiles,
utils.CacheRateProfilesFilterIndexes,
@@ -141,7 +145,7 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
}
// srv, _ := birpc.NewService(apis.NewRateSv1(rs.rateS), "", false)
if !rs.cfg.DispatcherSCfg().Enabled {
rs.server.RpcRegister(srv)
rs.cl.RpcRegister(srv)
}
rs.intConnChan <- rs.anz.GetInternalCodec(srv, utils.RateS)
return

View File

@@ -52,7 +52,7 @@ func TestRateSCoverage(t *testing.T) {
filterSChan: filterSChan,
dmS: db,
cacheS: chS,
server: cls,
cls: cls,
stopChan: make(chan struct{}),
intConnChan: make(chan birpc.ClientConnector, 1),
anz: anz,

View File

@@ -22,7 +22,6 @@ import (
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/registrarc"
@@ -31,12 +30,10 @@ import (
)
// NewRegistrarCService returns the Dispatcher Service
func NewRegistrarCService(cfg *config.CGRConfig, server *commonlisteners.CommonListenerS,
connMgr *engine.ConnManager, anz *AnalyzerService,
func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &RegistrarCService{
cfg: cfg,
server: server,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -46,15 +43,16 @@ func NewRegistrarCService(cfg *config.CGRConfig, server *commonlisteners.CommonL
// RegistrarCService implements Service interface
type RegistrarCService struct {
sync.RWMutex
cfg *config.CGRConfig
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
anz *AnalyzerService
dspS *registrarc.RegistrarCService
stopChan chan struct{}
rldChan chan struct{}
dspS *registrarc.RegistrarCService
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
@@ -94,7 +92,7 @@ func (dspS *RegistrarCService) Shutdown() (err error) {
func (dspS *RegistrarCService) IsRunning() bool {
dspS.RLock()
defer dspS.RUnlock()
return dspS != nil && dspS.dspS != nil
return dspS.dspS != nil
}
// ServiceName returns the service name

View File

@@ -34,7 +34,7 @@ import (
// NewResourceService returns the Resource Service
func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, internalResourceSChan chan birpc.ClientConnector,
cls *CommonListenerService, internalResourceSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &ResourceService{
@@ -43,7 +43,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -53,16 +53,19 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
// ResourceService implements Service interface
type ResourceService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
reS *engine.ResourceS
reS *engine.ResourceS
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
connMgr *engine.ConnManager
anz *AnalyzerService
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -73,6 +76,9 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
}
reS.srvDep[utils.DataDB].Add(1)
if reS.cl, err = reS.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = reS.cacheS.WaitToPrecache(ctx,
utils.CacheResourceProfiles,
utils.CacheResources,
@@ -100,7 +106,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
// srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false)
if !reS.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
reS.server.RpcRegister(s)
reS.cl.RpcRegister(s)
}
}
reS.connChan <- reS.anz.GetInternalCodec(srv, utils.ResourceS)
@@ -123,7 +129,7 @@ func (reS *ResourceService) Shutdown() (err error) {
reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error
reS.reS = nil
<-reS.connChan
reS.server.RpcUnregisterName(utils.ResourceSv1)
reS.cl.RpcUnregisterName(utils.ResourceSv1)
return
}
@@ -131,7 +137,7 @@ func (reS *ResourceService) Shutdown() (err error) {
func (reS *ResourceService) IsRunning() bool {
reS.RLock()
defer reS.RUnlock()
return reS != nil && reS.reS != nil
return reS.reS != nil
}
// ServiceName returns the service name

View File

@@ -51,7 +51,7 @@ func TestResourceSCoverage(t *testing.T) {
dm: db,
cacheS: chS,
filterSChan: filterSChan,
server: cls,
cls: cls,
connChan: make(chan birpc.ClientConnector, 1),
connMgr: nil,
anz: anz,

View File

@@ -35,7 +35,7 @@ import (
// NewRouteService returns the Route Service
func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, internalRouteSChan chan birpc.ClientConnector,
cls *CommonListenerService, internalRouteSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &RouteService{
@@ -44,7 +44,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -54,16 +54,19 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
// RouteService implements Service interface
type RouteService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
routeS *engine.RouteS
routeS *engine.RouteS
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
anz *AnalyzerService
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -73,6 +76,9 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
return utils.ErrServiceAlreadyRunning
}
if routeS.cl, err = routeS.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = routeS.cacheS.WaitToPrecache(ctx,
utils.CacheRouteProfiles,
utils.CacheRouteFilterIndexes); err != nil {
@@ -99,7 +105,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
// srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false)
if !routeS.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
routeS.server.RpcRegister(s)
routeS.cl.RpcRegister(s)
}
}
routeS.connChan <- routeS.anz.GetInternalCodec(srv, utils.RouteS)
@@ -118,7 +124,7 @@ func (routeS *RouteService) Shutdown() (err error) {
routeS.routeS.Shutdown() //we don't verify the error because shutdown never returns an error
routeS.routeS = nil
<-routeS.connChan
routeS.server.RpcUnregisterName(utils.RouteSv1)
routeS.cl.RpcUnregisterName(utils.RouteSv1)
return
}
@@ -126,7 +132,7 @@ func (routeS *RouteService) Shutdown() (err error) {
func (routeS *RouteService) IsRunning() bool {
routeS.RLock()
defer routeS.RUnlock()
return routeS != nil && routeS.routeS != nil
return routeS.routeS != nil
}
// ServiceName returns the service name

View File

@@ -51,7 +51,7 @@ func TestSupplierSCoverage(t *testing.T) {
dm: db,
cacheS: chS,
filterSChan: filterSChan,
server: cls,
cls: cls,
connMgr: nil,
routeS: &engine.RouteS{},
// rpc: nil,

View File

@@ -36,7 +36,7 @@ import (
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, internalChan chan birpc.ClientConnector,
cls *CommonListenerService, internalChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &SessionService{
@@ -44,7 +44,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha
cfg: cfg,
dm: dm,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -54,19 +54,20 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha
// SessionService implements Service interface
type SessionService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
stopChan chan struct{}
sm *sessions.SessionS
connChan chan birpc.ClientConnector
sm *sessions.SessionS
cl *commonlisteners.CommonListenerS
// in order to stop the bircp server if necesary
bircpEnabled bool
bircpEnabled bool // to stop birpc server if needed
stopChan chan struct{}
connChan chan birpc.ClientConnector
connMgr *engine.ConnManager
anz *AnalyzerService
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -76,6 +77,9 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
return utils.ErrServiceAlreadyRunning
}
if smg.cl, err = smg.cls.WaitForCLS(ctx); err != nil {
return err
}
var filterS *engine.FilterS
if filterS, err = waitForFilterS(ctx, smg.filterSChan); err != nil {
return
@@ -102,7 +106,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
// srv, _ := birpc.NewService(apis.NewSessionSv1(smg.sm), utils.EmptyString, false) // methods with multiple options
if !smg.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
smg.server.RpcRegister(s)
smg.cl.RpcRegister(s)
}
}
smg.connChan <- smg.anz.GetInternalCodec(srv, utils.SessionS)
@@ -110,7 +114,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
if smg.cfg.SessionSCfg().ListenBijson != utils.EmptyString {
smg.bircpEnabled = true
for n, s := range srv {
smg.server.BiRPCRegisterName(n, s)
smg.cl.BiRPCRegisterName(n, s)
}
// run this in it's own goroutine
go smg.start(shtDw)
@@ -119,7 +123,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
}
func (smg *SessionService) start(shtDw context.CancelFunc) (err error) {
if err := smg.server.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson,
if err := smg.cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson,
smg.cfg.SessionSCfg().ListenBigob, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err))
smg.Lock()
@@ -144,12 +148,12 @@ func (smg *SessionService) Shutdown() (err error) {
return
}
if smg.bircpEnabled {
smg.server.StopBiRPC()
smg.cl.StopBiRPC()
smg.bircpEnabled = false
}
smg.sm = nil
<-smg.connChan
smg.server.RpcUnregisterName(utils.SessionSv1)
smg.cl.RpcUnregisterName(utils.SessionSv1)
// smg.server.BiRPCUnregisterName(utils.SessionSv1)
return
}
@@ -158,7 +162,7 @@ func (smg *SessionService) Shutdown() (err error) {
func (smg *SessionService) IsRunning() bool {
smg.RLock()
defer smg.RUnlock()
return smg != nil && smg.sm != nil
return smg.sm != nil
}
// ServiceName returns the service name

View File

@@ -59,7 +59,7 @@ func TestSessionSCoverage(t *testing.T) {
srv2 := SessionService{
cfg: cfg,
dm: db,
server: cls,
cls: cls,
connChan: make(chan birpc.ClientConnector, 1),
connMgr: nil,
anz: anz,

View File

@@ -113,7 +113,7 @@ func (sip *SIPAgent) Shutdown() (err error) {
func (sip *SIPAgent) IsRunning() bool {
sip.RLock()
defer sip.RUnlock()
return sip != nil && sip.sip != nil
return sip.sip != nil
}
// ServiceName returns the service name

View File

@@ -34,7 +34,7 @@ import (
// NewStatService returns the Stat Service
func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, internalStatSChan chan birpc.ClientConnector,
cls *CommonListenerService, internalStatSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &StatService{
@@ -43,7 +43,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -53,16 +53,19 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
// StatService implements Service interface
type StatService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
sts *engine.StatS
sts *engine.StatS
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
anz *AnalyzerService
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -73,6 +76,9 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
}
sts.srvDep[utils.DataDB].Add(1)
if sts.cl, err = sts.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = sts.cacheS.WaitToPrecache(ctx,
utils.CacheStatQueueProfiles,
utils.CacheStatQueues,
@@ -102,7 +108,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
// srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false)
if !sts.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
sts.server.RpcRegister(s)
sts.cl.RpcRegister(s)
}
}
sts.connChan <- sts.anz.GetInternalCodec(srv, utils.StatS)
@@ -125,7 +131,7 @@ func (sts *StatService) Shutdown() (err error) {
sts.sts.Shutdown(context.TODO())
sts.sts = nil
<-sts.connChan
sts.server.RpcUnregisterName(utils.StatSv1)
sts.cl.RpcUnregisterName(utils.StatSv1)
return
}
@@ -133,7 +139,7 @@ func (sts *StatService) Shutdown() (err error) {
func (sts *StatService) IsRunning() bool {
sts.RLock()
defer sts.RUnlock()
return sts != nil && sts.sts != nil
return sts.sts != nil
}
// ServiceName returns the service name

View File

@@ -51,7 +51,7 @@ func TestStatSCoverage(t *testing.T) {
dm: db,
cacheS: chS,
filterSChan: filterSChan,
server: cls,
cls: cls,
connMgr: nil,
sts: &engine.StatS{},
connChan: make(chan birpc.ClientConnector, 1),

View File

@@ -153,7 +153,7 @@ func (db *StorDBService) IsRunning() bool {
// isRunning returns if the service is running (not thread safe)
func (db *StorDBService) isRunning() bool {
return db != nil && db.db != nil
return db.db != nil
}
// ServiceName returns the service name

View File

@@ -35,7 +35,7 @@ import (
func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
server *commonlisteners.CommonListenerS, internalThresholdSChan chan birpc.ClientConnector,
cls *CommonListenerService, internalThresholdSChan chan birpc.ClientConnector,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &ThresholdService{
connChan: internalThresholdSChan,
@@ -43,7 +43,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
cls: cls,
anz: anz,
srvDep: srvDep,
connMgr: connMgr,
@@ -53,16 +53,19 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
// ThresholdService implements Service interface
type ThresholdService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
anz *AnalyzerService
cacheS *CacheService
filterSChan chan *engine.FilterS
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
thrs *engine.ThresholdS
thrs *engine.ThresholdS
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
anz *AnalyzerService
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
@@ -73,6 +76,9 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
}
thrs.srvDep[utils.DataDB].Add(1)
if thrs.cl, err = thrs.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = thrs.cacheS.WaitToPrecache(ctx,
utils.CacheThresholdProfiles,
utils.CacheThresholds,
@@ -101,7 +107,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
// srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false)
if !thrs.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
thrs.server.RpcRegister(s)
thrs.cl.RpcRegister(s)
}
}
thrs.connChan <- thrs.anz.GetInternalCodec(srv, utils.ThresholdS)
@@ -124,7 +130,7 @@ func (thrs *ThresholdService) Shutdown() (_ error) {
thrs.thrs.Shutdown(context.TODO())
thrs.thrs = nil
<-thrs.connChan
thrs.server.RpcUnregisterName(utils.ThresholdSv1)
thrs.cl.RpcUnregisterName(utils.ThresholdSv1)
return
}
@@ -132,7 +138,7 @@ func (thrs *ThresholdService) Shutdown() (_ error) {
func (thrs *ThresholdService) IsRunning() bool {
thrs.RLock()
defer thrs.RUnlock()
return thrs != nil && thrs.thrs != nil
return thrs.thrs != nil
}
// ServiceName returns the service name

View File

@@ -50,7 +50,7 @@ func TestThresholdSCoverage(t *testing.T) {
dm: db,
cacheS: chS,
filterSChan: filterSChan,
server: cls,
cls: cls,
thrs: thrs1,
connChan: make(chan birpc.ClientConnector, 1),
anz: anz,

View File

@@ -34,13 +34,13 @@ import (
// NewTPeService is the constructor for the TpeService
func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService,
server *commonlisteners.CommonListenerS, srvDep map[string]*sync.WaitGroup) servmanager.Service {
cls *CommonListenerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &TPeService{
cfg: cfg,
srvDep: srvDep,
dm: dm,
connMgr: connMgr,
server: server,
cls: cls,
}
}
@@ -48,58 +48,63 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD
type TPeService struct {
sync.RWMutex
cfg *config.CGRConfig
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
tpes *tpes.TPeS
dm *DataDBService
srv *birpc.Service
stopChan chan struct{}
cls *CommonListenerService
dm *DataDBService
srvDep map[string]*sync.WaitGroup
tpes *tpes.TPeS
cl *commonlisteners.CommonListenerS
srv *birpc.Service
stopChan chan struct{}
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start should handle the service start
func (tpSrv *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
if ts.cl, err = ts.cls.WaitForCLS(ctx); err != nil {
return err
}
var datadb *engine.DataManager
if datadb, err = tpSrv.dm.WaitForDM(ctx); err != nil {
if datadb, err = ts.dm.WaitForDM(ctx); err != nil {
return
}
tpSrv.tpes = tpes.NewTPeS(tpSrv.cfg, datadb, tpSrv.connMgr)
ts.tpes = tpes.NewTPeS(ts.cfg, datadb, ts.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TPeS))
tpSrv.stopChan = make(chan struct{})
tpSrv.srv, _ = birpc.NewService(apis.NewTPeSv1(tpSrv.tpes), utils.EmptyString, false)
tpSrv.server.RpcRegister(tpSrv.srv)
ts.stopChan = make(chan struct{})
ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false)
ts.cl.RpcRegister(ts.srv)
return
}
// Reload handles the change of config
func (tpSrv *TPeService) Reload(*context.Context, context.CancelFunc) (err error) {
func (ts *TPeService) Reload(*context.Context, context.CancelFunc) (err error) {
return
}
// Shutdown stops the service
func (tpSrv *TPeService) Shutdown() (err error) {
tpSrv.srv = nil
close(tpSrv.stopChan)
func (ts *TPeService) Shutdown() (err error) {
ts.srv = nil
close(ts.stopChan)
utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> subsystem", utils.CoreS, utils.TPeS))
return
}
// IsRunning returns if the service is running
func (tpSrv *TPeService) IsRunning() bool {
tpSrv.Lock()
defer tpSrv.Unlock()
return tpSrv != nil && tpSrv.tpes != nil
func (ts *TPeService) IsRunning() bool {
ts.Lock()
defer ts.Unlock()
return ts.tpes != nil
}
// ServiceName returns the service name
func (tpSrv *TPeService) ServiceName() string {
func (ts *TPeService) ServiceName() string {
return utils.TPeS
}
// ShouldRun returns if the service should be running
func (tpSrv *TPeService) ShouldRun() bool {
return tpSrv.cfg.TpeSCfg().Enabled
func (ts *TPeService) ShouldRun() bool {
return ts.cfg.TpeSCfg().Enabled
}

View File

@@ -34,7 +34,7 @@ import (
// NewTrendsService returns the TrendS Service
func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *CacheService, filterSChan chan *engine.FilterS,
server *commonlisteners.CommonListenerS, internalTrendSChan chan birpc.ClientConnector,
cls *CommonListenerService, internalTrendSChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &TrendService{
@@ -42,7 +42,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
cfg: cfg,
dm: dm,
cacheS: cacheS,
server: server,
cls: cls,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -51,16 +51,20 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
type TrendService struct {
sync.RWMutex
cfg *config.CGRConfig
cls *CommonListenerService
dm *DataDBService
cacheS *CacheService
server *commonlisteners.CommonListenerS
connMgr *engine.ConnManager
filterSChan chan *engine.FilterS
connChan chan birpc.ClientConnector
trs *engine.TrendS
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
cacheS *CacheService
filterSChan chan *engine.FilterS
trs *engine.TrendS
cl *commonlisteners.CommonListenerS
connChan chan birpc.ClientConnector
connMgr *engine.ConnManager
cfg *config.CGRConfig
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
@@ -70,6 +74,9 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
}
trs.srvDep[utils.DataDB].Add(1)
if trs.cl, err = trs.cls.WaitForCLS(ctx); err != nil {
return err
}
if err = trs.cacheS.WaitToPrecache(ctx,
utils.CacheTrendProfiles,
utils.CacheTrends,
@@ -101,7 +108,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
}
if !trs.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
trs.server.RpcRegister(s)
trs.cl.RpcRegister(s)
}
}
trs.connChan <- trs.anz.GetInternalCodec(srv, utils.Trends)
@@ -124,13 +131,13 @@ func (trs *TrendService) Shutdown() (err error) {
trs.trs.StopTrendS()
trs.trs = nil
<-trs.connChan
trs.server.RpcUnregisterName(utils.TrendSv1)
trs.cl.RpcUnregisterName(utils.TrendSv1)
return
}
// IsRunning returns if the service is running
func (trs *TrendService) IsRunning() bool {
return trs != nil && trs.trs != nil
return trs.trs != nil
}
// ServiceName returns the service name

View File

@@ -57,8 +57,8 @@ func TestNewTrendService(t *testing.T) {
t.Errorf("Expected cacheS to be %v, but got %v", cacheS, trendService.cacheS)
}
if trendService.server != server {
t.Errorf("Expected server to be %v, but got %v", server, trendService.server)
if trendService.cls != server {
t.Errorf("Expected server to be %v, but got %v", server, trendService.cls)
}
if trendService.connChan != internalTrendSChan {
t.Errorf("Expected connChan to be %v, but got %v", internalTrendSChan, trendService.connChan)

View File

@@ -990,6 +990,7 @@ const (
CDRServer = "CDRServer"
GuardianS = "GuardianS"
ServiceManagerS = "ServiceManager"
CommonListenerS = "CommonListenerS"
)
// Lower service names