diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 256a4cea1..bf35dfd1f 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -51,7 +51,8 @@ type APIerSv1 struct { FilterS *engine.FilterS //Used for CDR Exporter ConnMgr *engine.ConnManager - StorDBChan chan engine.StorDB + StorDBChan chan engine.StorDB + ResponderChan chan *engine.Responder } // Call implements rpcclient.ClientConnector interface for internal RPC @@ -1519,6 +1520,8 @@ func (apierSv1 *APIerSv1) ListenAndServe(stopChan chan struct{}) { } apierSv1.CdrDb = stordb apierSv1.StorDb = stordb + case resp := <-apierSv1.ResponderChan: + apierSv1.Responder = resp } } } diff --git a/services/apiers_it_test.go b/services/apiers_it_test.go index 1e1043f77..a42280adc 100644 --- a/services/apiers_it_test.go +++ b/services/apiers_it_test.go @@ -62,7 +62,8 @@ func TestApiersReload(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz, srvDep) - apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, new(ResponderService), + rspd := NewResponderService(cfg, server, make(chan rpcclient.ClientConnector, 1), shdChan, anz, srvDep) + apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, rspd, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep) apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan rpcclient.ClientConnector, 1), anz, srvDep) diff --git a/services/apierv1.go b/services/apierv1.go index 1713593f6..bb631b669 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -94,6 +94,9 @@ func (apiService *APIerSv1Service) Start() (err error) { apiService.stopChan = make(chan struct{}) apiService.storDB.RegisterSyncChan(storDBChan) stordb := <-storDBChan + + respChan := make(chan *engine.Responder, 1) + apiService.responderService.RegisterSyncChan(apiService.ServiceName(), respChan) apiService.Lock() defer apiService.Unlock() @@ -102,11 +105,13 @@ func (apiService *APIerSv1Service) Start() (err error) { CdrDb: stordb, StorDb: stordb, Config: apiService.cfg, - Responder: apiService.responderService.GetResponder(), SchedulerService: apiService.schedService, FilterS: filterS, ConnMgr: apiService.connMgr, StorDBChan: storDBChan, + + Responder: apiService.responderService.GetResponder(), // if already started use it + ResponderChan: respChan, // if not wait in listenAndServe } go apiService.api.ListenAndServe(apiService.stopChan) @@ -136,6 +141,7 @@ func (apiService *APIerSv1Service) Shutdown() (err error) { close(apiService.stopChan) apiService.api = nil <-apiService.connChan + apiService.responderService.UnregisterSyncChan(apiService.ServiceName()) apiService.Unlock() return } diff --git a/services/responders.go b/services/responders.go index f23e729b2..fa1674c59 100644 --- a/services/responders.go +++ b/services/responders.go @@ -34,12 +34,13 @@ func NewResponderService(cfg *config.CGRConfig, server *cores.Server, shdChan *utils.SyncedChan, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) *ResponderService { return &ResponderService{ - connChan: internalRALsChan, - cfg: cfg, - server: server, - shdChan: shdChan, - anz: anz, - srvDep: srvDep, + connChan: internalRALsChan, + cfg: cfg, + server: server, + shdChan: shdChan, + anz: anz, + srvDep: srvDep, + syncChans: make(map[string]chan *engine.Responder), } } @@ -51,10 +52,11 @@ type ResponderService struct { server *cores.Server shdChan *utils.SyncedChan - resp *engine.Responder - connChan chan rpcclient.ClientConnector - anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + resp *engine.Responder + connChan chan rpcclient.ClientConnector + anz *AnalyzerService + srvDep map[string]*sync.WaitGroup + syncChans map[string]chan *engine.Responder } // Start should handle the sercive start @@ -76,6 +78,7 @@ func (resp *ResponderService) Start() (err error) { } resp.connChan <- resp.anz.GetInternalCodec(resp.resp, utils.ResponderS) // Rater done + resp.sync() return } @@ -92,6 +95,9 @@ func (resp *ResponderService) Shutdown() (err error) { resp.Lock() resp.resp = nil <-resp.connChan + for _, c := range resp.syncChans { + c <- nil // just tell the services that responder is nil + } resp.Unlock() return } @@ -100,6 +106,10 @@ func (resp *ResponderService) Shutdown() (err error) { func (resp *ResponderService) IsRunning() bool { resp.RLock() defer resp.RUnlock() + return resp.isRunning() +} + +func (resp *ResponderService) isRunning() bool { return resp != nil && resp.resp != nil } @@ -119,3 +129,33 @@ func (resp *ResponderService) GetResponder() *engine.Responder { func (resp *ResponderService) ShouldRun() bool { return resp.cfg.RalsCfg().Enabled } + +// RegisterSyncChan used by dependent subsystems to register a chanel to reload only the responder(thread safe) +func (resp *ResponderService) RegisterSyncChan(srv string, c chan *engine.Responder) { + resp.Lock() + resp.syncChans[srv] = c + if resp.isRunning() { + c <- resp.resp + } + resp.Unlock() +} + +// UnregisterSyncChan used by dependent subsystems to unregister a chanel +func (resp *ResponderService) UnregisterSyncChan(srv string) { + resp.Lock() + c, has := resp.syncChans[srv] + if has { + close(c) + delete(resp.syncChans, srv) + } + resp.Unlock() +} + +// sync sends the responder over syncChansv (not thrad safe) +func (resp *ResponderService) sync() { + if resp.isRunning() { + for _, c := range resp.syncChans { + c <- resp.resp + } + } +}