diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 3363f7a84..a5525973e 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -707,9 +707,10 @@ func main() { attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), internalDispatcherSChan, exitChan) srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals, - rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg) + rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg, + services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), + ) /* - services.NewEventReaderService(), services.NewDNSAgent(), services.NewFreeswitchAgent(), services.NewKamailioAgent(), @@ -719,9 +720,6 @@ func main() { services.NewHTTPAgent(), // no reload */ - /* - internalSMGChan := smg.GetIntenternalChan() - */ srvManager.StartServices() // Start FilterS diff --git a/services/ers.go b/services/ers.go index 7b77ff119..cac0b0405 100644 --- a/services/ers.go +++ b/services/ers.go @@ -22,6 +22,8 @@ import ( "fmt" "sync" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -29,22 +31,35 @@ import ( ) // NewEventReaderService returns the EventReader Service -func NewEventReaderService() servmanager.Service { +func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, + sSChan, dispatcherChan chan rpcclient.RpcClientConnection, + exitChan chan bool) servmanager.Service { return &EventReaderService{ - rldChan: make(chan struct{}, 1), + rldChan: make(chan struct{}, 1), + cfg: cfg, + filterSChan: filterSChan, + sSChan: sSChan, + dispatcherChan: dispatcherChan, + exitChan: exitChan, } } // EventReaderService implements Service interface type EventReaderService struct { sync.RWMutex + cfg *config.CGRConfig + filterSChan chan *engine.FilterS + sSChan chan rpcclient.RpcClientConnection + dispatcherChan chan rpcclient.RpcClientConnection + exitChan chan bool + ers *ers.ERService rldChan chan struct{} stopChan chan struct{} } // Start should handle the sercive start -func (erS *EventReaderService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { +func (erS *EventReaderService) Start() (err error) { if erS.IsRunning() { return fmt.Errorf("service aleady running") } @@ -52,22 +67,25 @@ func (erS *EventReaderService) Start(sp servmanager.ServiceProvider, waitCache b erS.Lock() defer erS.Unlock() + filterS := <-erS.filterSChan + erS.filterSChan <- filterS + // remake tht stop chan erS.stopChan = make(chan struct{}, 1) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) var sS rpcclient.RpcClientConnection - if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().ERsCfg().SessionSConns); err != nil { + if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", utils.ERs, utils.SessionS, err.Error())) return } // build the service - erS.ers = ers.NewERService(sp.GetConfig(), sp.GetFilterS(), sS, erS.stopChan) - go func(erS *ers.ERService, rldChan chan struct{}) { - if err := erS.ListenAndServe(rldChan); err != nil { + erS.ers = ers.NewERService(erS.cfg, filterS, sS, erS.stopChan) + go func(ers *ers.ERService, rldChan chan struct{}) { + if err := ers.ListenAndServe(rldChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) - sp.GetExitChan() <- true + erS.exitChan <- true } }(erS.ers, erS.rldChan) return @@ -79,9 +97,9 @@ func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClie } // Reload handles the change of config -func (erS *EventReaderService) Reload(sp servmanager.ServiceProvider) (err error) { +func (erS *EventReaderService) Reload() (err error) { var sS rpcclient.RpcClientConnection - if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().ERsCfg().SessionSConns); err != nil { + if sS, err = NewConnection(erS.cfg, erS.sSChan, erS.dispatcherChan, erS.cfg.ERsCfg().SessionSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", utils.ERs, utils.SessionS, err.Error())) return @@ -102,11 +120,6 @@ func (erS *EventReaderService) Shutdown() (err error) { return } -// GetRPCInterface returns the interface to register for server -func (erS *EventReaderService) GetRPCInterface() interface{} { - return erS.ers -} - // IsRunning returns if the service is running func (erS *EventReaderService) IsRunning() bool { erS.RLock() @@ -118,3 +131,8 @@ func (erS *EventReaderService) IsRunning() bool { func (erS *EventReaderService) ServiceName() string { return utils.ERs } + +// ShouldRun returns if the service should be running +func (erS *EventReaderService) ShouldRun() bool { + return erS.cfg.ERsCfg().Enabled +} diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 734562e81..fb8d5d39a 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -57,8 +57,10 @@ func TestEventReaderSReload(t *testing.T) { /*loadStorage*/ nil, filterSChan, server, nil, engineShutdown) srvMngr.SetCacheS(chS) - attrS := NewEventReaderService() - srvMngr.AddService(attrS, NewSessionService()) + sS := NewSessionService(cfg, nil, server, nil, + nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown) + attrS := NewEventReaderService(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown) + srvMngr.AddServices(attrS, sS) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index b90e686e5..aa262960f 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -270,10 +270,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.GetConfig().SessionSCfg().Enabled { go srvMngr.startService(utils.SessionS) + } + if srvMngr.GetConfig().ERsCfg().Enabled { + go srvMngr.startService(utils.ERs) } /* - if srvMngr.GetConfig().ERsCfg().Enabled { - go srvMngr.startService(utils.ERs) - } if srvMngr.GetConfig().DNSAgentCfg().Enabled { go srvMngr.startService(utils.DNSAgent) } @@ -371,11 +371,11 @@ func (srvMngr *ServiceManager) handleReload() { case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson): if err = srvMngr.reloadService(utils.SessionS); err != nil { return + } + case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson): + if err = srvMngr.reloadService(utils.ERs); err != nil { + return } /* - case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson): - if err = srvMngr.reloadService(utils.ERs); err != nil { - return - } case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJson): if err = srvMngr.reloadService(utils.DNSAgent); err != nil { return