diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index fc5bada55..2721a571e 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -38,7 +38,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/servmanager" @@ -140,39 +139,6 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne } } -// startERs handles starting of the EventReader Service -func startERs(sSChan, dspSChan chan rpcclient.RpcClientConnection, - filterSChan chan *engine.FilterS, - cfgRld chan struct{}, exitChan chan bool) { - var err error - - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) - filterS := <-filterSChan - filterSChan <- filterS - // overwrite the session service channel with dispatcher one - if cfg.DispatcherSCfg().Enabled { - sSChan = dspSChan - } - var sS rpcclient.RpcClientConnection - if sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ERsCfg().SessionSConns, sSChan, false); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", - utils.ERs, utils.SessionS, err.Error())) - exitChan <- true - return - } - // build the service - erS := ers.NewERService(cfg, filterS, sS, exitChan) - if err = erS.ListenAndServe(cfgRld); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) - } - exitChan <- true -} - func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { var err error var sS rpcclient.RpcClientConnection @@ -1032,7 +998,7 @@ func main() { apiv2, _ := srvManager.GetService(utils.ApierV2) resp, _ := srvManager.GetService(utils.ResponderS) smg := services.NewSessionService() - srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, rals, smg) + srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, rals, smg, services.NewEventReaderService()) internalAttributeSChan := attrS.GetIntenternalChan() internalChargerSChan := chrS.GetIntenternalChan() internalThresholdSChan := tS.GetIntenternalChan() @@ -1084,10 +1050,6 @@ func main() { // Start CDRC components if necessary go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) - if cfg.ERsCfg().Enabled { - go startERs(internalSMGChan, internalDispatcherSChan, - filterSChan, cfg.GetReloadChan(config.ERsJson), exitChan) - } // Start FreeSWITCHAgent if cfg.FsAgentCfg().Enabled { go startFsAgent(internalSMGChan, internalDispatcherSChan, exitChan) diff --git a/ers/ers.go b/ers/ers.go index f6fc4f7c6..27821fa5a 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -37,7 +37,7 @@ type erEvent struct { // NewERService instantiates the ERService func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, - sS rpcclient.RpcClientConnection, exitChan chan bool) *ERService { + sS rpcclient.RpcClientConnection, stopChan chan struct{}) *ERService { return &ERService{ cfg: cfg, rdrs: make(map[string]EventReader), @@ -47,7 +47,7 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, rdrErr: make(chan error), filterS: filterS, sS: sS, - exitChan: exitChan, + stopChan: stopChan, } } @@ -63,7 +63,7 @@ type ERService struct { filterS *engine.FilterS sS rpcclient.RpcClientConnection // connection towards SessionS - exitChan chan bool + stopChan chan struct{} } // ListenAndServe keeps the service alive @@ -84,8 +84,7 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { fmt.Sprintf("<%s> running reader got error: <%s>", utils.ERs, err.Error())) return - case e := <-erS.exitChan: - erS.exitChan <- e // put back for the others listening for shutdown request + case <-erS.stopChan: return case erEv := <-erS.rdrEvents: if err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg); err != nil { @@ -95,7 +94,6 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { } } } - return } // addReader will add a new reader to the service @@ -115,7 +113,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) { func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { for { select { - case <-erS.exitChan: + case <-erS.stopChan: return case <-cfgRldChan: cfgIDs := make(map[string]int) @@ -150,7 +148,7 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { utils.Logger.Crit( fmt.Sprintf("<%s> adding reader <%s> got error: <%s>", utils.ERs, id, err.Error())) - erS.exitChan <- true + erS.rdrErr <- err } } erS.Unlock() diff --git a/services/ers.go b/services/ers.go new file mode 100644 index 000000000..6304a6b40 --- /dev/null +++ b/services/ers.go @@ -0,0 +1,116 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/ers" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewEventReaderService returns the EventReader Service +func NewEventReaderService() servmanager.Service { + return &EventReaderService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), + rldChan: make(chan struct{}, 1), + } +} + +// EventReaderService implements Service interface +type EventReaderService struct { + sync.RWMutex + ers *ers.ERService + rldChan chan struct{} + stopChan chan struct{} + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (erS *EventReaderService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if erS.IsRunning() { + return fmt.Errorf("service aleady running") + } + + erS.Lock() + defer erS.Unlock() + + // remake tht stop chan + erS.stopChan = make(chan struct{}, 1) + + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) + var sS rpcclient.RpcClientConnection + if sS, err = sp.GetConnection(utils.SessionS, sp.GetConfig().ERsCfg().SessionSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> failed connecting to <%s>, error: <%s>", + utils.ERs, utils.SessionS, err.Error())) + return + } + // build the service + erS.ers = ers.NewERService(sp.GetConfig(), sp.GetFilterS(), sS, erS.stopChan) + go func(erS *ers.ERService, rldChan chan struct{}) { + if err = erS.ListenAndServe(rldChan); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) + } + sp.GetExitChan() <- true + }(erS.ers, erS.rldChan) + return +} + +// GetIntenternalChan returns the internal connection chanel +func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return erS.connChan +} + +// Reload handles the change of config +func (erS *EventReaderService) Reload(sp servmanager.ServiceProvider) (err error) { + erS.Lock() + erS.rldChan <- struct{}{} + erS.Unlock() + return +} + +// Shutdown stops the service +func (erS *EventReaderService) Shutdown() (err error) { + erS.Lock() + close(erS.stopChan) + erS.ers = nil + <-erS.connChan + erS.Unlock() + return +} + +// GetRPCInterface returns the interface to register for server +func (erS *EventReaderService) GetRPCInterface() interface{} { + return erS.ers +} + +// IsRunning returns if the service is running +func (erS *EventReaderService) IsRunning() bool { + erS.RLock() + defer erS.RUnlock() + return erS != nil && erS.ers != nil +} + +// ServiceName returns the service name +func (erS *EventReaderService) ServiceName() string { + return utils.ERs +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index bdab4206a..0eadffbd5 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -277,6 +277,9 @@ func (srvMngr *ServiceManager) StartServices() (err error) { if srvMngr.GetConfig().SessionSCfg().Enabled { go srvMngr.startService(utils.SessionS) } + if srvMngr.GetConfig().ERsCfg().Enabled { + go srvMngr.startService(utils.ERs) + } // startServer() return } @@ -340,6 +343,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.SessionS, srvMngr.GetConfig().SessionSCfg().Enabled); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson): + if err = srvMngr.reloadService(utils.ERs, srvMngr.GetConfig().ERsCfg().Enabled); err != nil { + return + } } // handle RPC server }