mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-21 07:08:45 +05:00
Register the CDRServer from cdrs instead of the one from services
Also renamed services.CDRServer to CDRService to prevent such mistakes from happening in the future.
This commit is contained in:
committed by
Dan Christian Bogos
parent
00a94d092f
commit
8ee919319b
@@ -40,7 +40,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
|
||||
server *cores.Server, internalCDRServerChan chan birpc.ClientConnector,
|
||||
connMgr *engine.ConnManager, anz *AnalyzerService,
|
||||
srvDep map[string]*sync.WaitGroup) servmanager.Service {
|
||||
return &CDRServer{
|
||||
return &CDRService{
|
||||
connChan: internalCDRServerChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
@@ -53,8 +53,8 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
|
||||
}
|
||||
}
|
||||
|
||||
// CDRServer implements Service interface
|
||||
type CDRServer struct {
|
||||
// CDRService implements Service interface
|
||||
type CDRService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
@@ -73,54 +73,54 @@ type CDRServer struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (cdrS *CDRServer) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
|
||||
if cdrS.IsRunning() {
|
||||
func (cdrSrv *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
|
||||
if cdrSrv.IsRunning() {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs))
|
||||
|
||||
var filterS *engine.FilterS
|
||||
if filterS, err = waitForFilterS(ctx, cdrS.filterSChan); err != nil {
|
||||
if filterS, err = waitForFilterS(ctx, cdrSrv.filterSChan); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var datadb *engine.DataManager
|
||||
if datadb, err = cdrS.dm.WaitForDM(ctx); err != nil {
|
||||
if datadb, err = cdrSrv.dm.WaitForDM(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
storDBChan := make(chan engine.StorDB, 1)
|
||||
cdrS.stopChan = make(chan struct{})
|
||||
cdrS.storDB.RegisterSyncChan(storDBChan)
|
||||
cdrSrv.stopChan = make(chan struct{})
|
||||
cdrSrv.storDB.RegisterSyncChan(storDBChan)
|
||||
|
||||
cdrS.Lock()
|
||||
defer cdrS.Unlock()
|
||||
cdrSrv.Lock()
|
||||
defer cdrSrv.Unlock()
|
||||
|
||||
cdrS.cdrS = cdrs.NewCDRServer(cdrS.cfg, datadb, filterS, cdrS.connMgr, storDBChan)
|
||||
go cdrS.cdrS.ListenAndServe(cdrS.stopChan)
|
||||
cdrSrv.cdrS = cdrs.NewCDRServer(cdrSrv.cfg, datadb, filterS, cdrSrv.connMgr, storDBChan)
|
||||
go cdrSrv.cdrS.ListenAndServe(cdrSrv.stopChan)
|
||||
runtime.Gosched()
|
||||
utils.Logger.Info("Registering CDRS RPC service.")
|
||||
srv, err := birpc.NewServiceWithMethodsRename(cdrS, utils.CDRsV1, true, func(oldFn string) (newFn string) {
|
||||
srv, err := birpc.NewServiceWithMethodsRename(cdrSrv.cdrS, utils.CDRsV1, true, func(oldFn string) (newFn string) {
|
||||
return strings.TrimPrefix(oldFn, utils.V1Prfx)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !cdrS.cfg.DispatcherSCfg().Enabled {
|
||||
cdrS.server.RpcRegister(srv)
|
||||
if !cdrSrv.cfg.DispatcherSCfg().Enabled {
|
||||
cdrSrv.server.RpcRegister(srv)
|
||||
}
|
||||
cdrS.connChan <- cdrS.anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational
|
||||
cdrSrv.connChan <- cdrSrv.anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational
|
||||
return
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (cdrService *CDRServer) Reload(*context.Context, context.CancelFunc) (err error) {
|
||||
func (cdrService *CDRService) Reload(*context.Context, context.CancelFunc) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (cdrService *CDRServer) Shutdown() (err error) {
|
||||
func (cdrService *CDRService) Shutdown() (err error) {
|
||||
cdrService.Lock()
|
||||
close(cdrService.stopChan)
|
||||
cdrService.cdrS = nil
|
||||
@@ -131,18 +131,18 @@ func (cdrService *CDRServer) Shutdown() (err error) {
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (cdrService *CDRServer) IsRunning() bool {
|
||||
func (cdrService *CDRService) IsRunning() bool {
|
||||
cdrService.RLock()
|
||||
defer cdrService.RUnlock()
|
||||
return cdrService != nil && cdrService.cdrS != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (cdrService *CDRServer) ServiceName() string {
|
||||
func (cdrService *CDRService) ServiceName() string {
|
||||
return utils.CDRServer
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (cdrService *CDRServer) ShouldRun() bool {
|
||||
func (cdrService *CDRService) ShouldRun() bool {
|
||||
return cdrService.cfg.CdrsCfg().Enabled
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ func TestCdrsCoverage(t *testing.T) {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
//populates cdrS2 with something in order to call the close funct
|
||||
cdrS2 := &CDRServer{
|
||||
cdrS2 := &CDRService{
|
||||
RWMutex: sync.RWMutex{},
|
||||
cfg: cfg,
|
||||
dm: db,
|
||||
|
||||
Reference in New Issue
Block a user