Updated ApierV1 service to reload the ResponderS field when needed

This commit is contained in:
Trial97
2021-02-04 09:38:05 +02:00
committed by Dan Christian Bogos
parent a020edfc5c
commit 8e7dfa9132
4 changed files with 63 additions and 13 deletions

View File

@@ -51,7 +51,8 @@ type APIerSv1 struct {
FilterS *engine.FilterS //Used for CDR Exporter
ConnMgr *engine.ConnManager
StorDBChan chan engine.StorDB
StorDBChan chan engine.StorDB
ResponderChan chan *engine.Responder
}
// Call implements rpcclient.ClientConnector interface for internal RPC
@@ -1519,6 +1520,8 @@ func (apierSv1 *APIerSv1) ListenAndServe(stopChan chan struct{}) {
}
apierSv1.CdrDb = stordb
apierSv1.StorDb = stordb
case resp := <-apierSv1.ResponderChan:
apierSv1.Responder = resp
}
}
}

View File

@@ -62,7 +62,8 @@ func TestApiersReload(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz, srvDep)
apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, new(ResponderService),
rspd := NewResponderService(cfg, server, make(chan rpcclient.ClientConnector, 1), shdChan, anz, srvDep)
apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, rspd,
make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan rpcclient.ClientConnector, 1), anz, srvDep)

View File

@@ -94,6 +94,9 @@ func (apiService *APIerSv1Service) Start() (err error) {
apiService.stopChan = make(chan struct{})
apiService.storDB.RegisterSyncChan(storDBChan)
stordb := <-storDBChan
respChan := make(chan *engine.Responder, 1)
apiService.responderService.RegisterSyncChan(apiService.ServiceName(), respChan)
apiService.Lock()
defer apiService.Unlock()
@@ -102,11 +105,13 @@ func (apiService *APIerSv1Service) Start() (err error) {
CdrDb: stordb,
StorDb: stordb,
Config: apiService.cfg,
Responder: apiService.responderService.GetResponder(),
SchedulerService: apiService.schedService,
FilterS: filterS,
ConnMgr: apiService.connMgr,
StorDBChan: storDBChan,
Responder: apiService.responderService.GetResponder(), // if already started use it
ResponderChan: respChan, // if not wait in listenAndServe
}
go apiService.api.ListenAndServe(apiService.stopChan)
@@ -136,6 +141,7 @@ func (apiService *APIerSv1Service) Shutdown() (err error) {
close(apiService.stopChan)
apiService.api = nil
<-apiService.connChan
apiService.responderService.UnregisterSyncChan(apiService.ServiceName())
apiService.Unlock()
return
}

View File

@@ -34,12 +34,13 @@ func NewResponderService(cfg *config.CGRConfig, server *cores.Server,
shdChan *utils.SyncedChan, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *ResponderService {
return &ResponderService{
connChan: internalRALsChan,
cfg: cfg,
server: server,
shdChan: shdChan,
anz: anz,
srvDep: srvDep,
connChan: internalRALsChan,
cfg: cfg,
server: server,
shdChan: shdChan,
anz: anz,
srvDep: srvDep,
syncChans: make(map[string]chan *engine.Responder),
}
}
@@ -51,10 +52,11 @@ type ResponderService struct {
server *cores.Server
shdChan *utils.SyncedChan
resp *engine.Responder
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
resp *engine.Responder
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
syncChans map[string]chan *engine.Responder
}
// Start should handle the sercive start
@@ -76,6 +78,7 @@ func (resp *ResponderService) Start() (err error) {
}
resp.connChan <- resp.anz.GetInternalCodec(resp.resp, utils.ResponderS) // Rater done
resp.sync()
return
}
@@ -92,6 +95,9 @@ func (resp *ResponderService) Shutdown() (err error) {
resp.Lock()
resp.resp = nil
<-resp.connChan
for _, c := range resp.syncChans {
c <- nil // just tell the services that responder is nil
}
resp.Unlock()
return
}
@@ -100,6 +106,10 @@ func (resp *ResponderService) Shutdown() (err error) {
func (resp *ResponderService) IsRunning() bool {
resp.RLock()
defer resp.RUnlock()
return resp.isRunning()
}
func (resp *ResponderService) isRunning() bool {
return resp != nil && resp.resp != nil
}
@@ -119,3 +129,33 @@ func (resp *ResponderService) GetResponder() *engine.Responder {
func (resp *ResponderService) ShouldRun() bool {
return resp.cfg.RalsCfg().Enabled
}
// RegisterSyncChan used by dependent subsystems to register a chanel to reload only the responder(thread safe)
func (resp *ResponderService) RegisterSyncChan(srv string, c chan *engine.Responder) {
resp.Lock()
resp.syncChans[srv] = c
if resp.isRunning() {
c <- resp.resp
}
resp.Unlock()
}
// UnregisterSyncChan used by dependent subsystems to unregister a chanel
func (resp *ResponderService) UnregisterSyncChan(srv string) {
resp.Lock()
c, has := resp.syncChans[srv]
if has {
close(c)
delete(resp.syncChans, srv)
}
resp.Unlock()
}
// sync sends the responder over syncChansv (not thrad safe)
func (resp *ResponderService) sync() {
if resp.isRunning() {
for _, c := range resp.syncChans {
c <- resp.resp
}
}
}