From b497cf22814e2235ad5a88ae25d8d7346b909996 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 8 May 2020 18:02:46 +0200 Subject: [PATCH] EventExporterService implementation --- cmd/cgr-engine/cgr-engine.go | 12 +++- ees/ees.go | 35 +++++----- services/attributes.go | 4 +- services/ees.go | 124 +++++++++++++++++++++++++++++++++++ services/stats.go | 11 ++-- utils/consts.go | 2 +- 6 files changed, 163 insertions(+), 25 deletions(-) create mode 100644 services/ees.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 55287b325..eeb3269ce 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -114,7 +114,8 @@ func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalRsChan, internalStatSChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, - internalLoaderSChan, internalRALsv1Chan, internalCacheSChan chan rpcclient.ClientConnector, + internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, + internalEEsChan chan rpcclient.ClientConnector, exitChan chan bool) { if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests @@ -144,6 +145,8 @@ func startRpc(server *utils.Server, internalRaterChan, internalRALsv1Chan <- ralS case chS := <-internalCacheSChan: // added in order to start the RPC before precaching is done internalCacheSChan <- chS + case eeS := <-internalEEsChan: + internalEEsChan <- eeS } } else { select { @@ -518,6 +521,8 @@ func main() { services.NewDiameterAgent(cfg, filterSChan, exitChan, connManager), // partial reload services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload ldrs, anz, dspS, dmService, storDBService, + services.NewEventExporterService(cfg, filterSChan, + connManager, server, exitChan, internalEEsChan), ) srvManager.StartServices() // Start FilterS @@ -526,7 +531,7 @@ func main() { initServiceManagerV1(internalServeManagerChan, srvManager, server) - // init internalRPCSet because we can have double connections in rpc_conns and one of it could be *internal + // init internalRPCSet to share internal connections among the engine engine.IntRPC = engine.NewRPCClientSet() engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.APIerSv1, apiSv1.GetIntenternalChan()) @@ -557,7 +562,8 @@ func main() { reS.GetIntenternalChan(), stS.GetIntenternalChan(), attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(), routeS.GetIntenternalChan(), smg.GetIntenternalChan(), anz.GetIntenternalChan(), - dspS.GetIntenternalChan(), ldrs.GetIntenternalChan(), rals.GetIntenternalChan(), internalCacheSChan, exitChan) + dspS.GetIntenternalChan(), ldrs.GetIntenternalChan(), rals.GetIntenternalChan(), + internalCacheSChan, internalEEsChan, exitChan) <-exitChan if *cpuProfDir != "" { // wait to end cpuProfiling diff --git a/ees/ees.go b/ees/ees.go index 24267276f..9624ad681 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -34,10 +34,10 @@ func onCacheEvicted(itmID string, value interface{}) { ee.OnEvicted(itmID, value) } -// NewERService instantiates the EEService -func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS, - connMgr *engine.ConnManager) *EEService { - return &EEService{ +// NewERService instantiates the EventExporterS +func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS, + connMgr *engine.ConnManager) *EventExporterS { + return &EventExporterS{ cfg: cfg, filterS: filterS, connMgr: connMgr, @@ -45,8 +45,8 @@ func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS, } } -// EEService is managing the EventExporters -type EEService struct { +// EventExporterS is managing the EventExporters +type EventExporterS struct { cfg *config.CGRConfig filterS *engine.FilterS connMgr *engine.ConnManager @@ -56,9 +56,9 @@ type EEService struct { } // ListenAndServe keeps the service alive -func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) { +func (eeS *EventExporterS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>", - utils.CoreS, utils.EventExporterService)) + utils.CoreS, utils.EventExporterS)) for { select { case e := <-exitChan: // global exit @@ -68,7 +68,7 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) ( case rld := <-cfgRld: // configuration was reloaded, destroy the cache cfgRld <- rld utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.", - utils.EventExporterService)) + utils.EventExporterS)) eeS.setupCache(eeS.cfg.EEsCfg().Cache) } } @@ -76,14 +76,19 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) ( } // Shutdown is called to shutdown the service -func (eeS *EEService) Shutdown() (err error) { - utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterService)) +func (eeS *EventExporterS) Shutdown() (err error) { + utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterS)) eeS.setupCache(nil) // cleanup exporters return } +// Call implements rpcclient.ClientConnector interface for internal RPC +func (eeS *EventExporterS) Call(serviceMethod string, args interface{}, reply interface{}) error { + return utils.RPCCall(eeS, serviceMethod, args, reply) +} + // setupCache deals with cleanup and initialization of the cache of EventExporters -func (eeS *EEService) setupCache(chCfgs map[string]*config.CacheParamCfg) { +func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) { eeS.eesMux.Lock() for chID, ch := range eeS.eesChs { // cleanup ch.Clear() @@ -99,7 +104,7 @@ func (eeS *EEService) setupCache(chCfgs map[string]*config.CacheParamCfg) { eeS.eesMux.Unlock() } -func (eeS *EEService) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs []string, ctx string) (err error) { +func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs []string, ctx string) (err error) { var rplyEv engine.AttrSProcessEventReply attrArgs := &engine.AttrArgsProcessEvent{ AttributeIDs: attrIDs, @@ -121,7 +126,7 @@ func (eeS *EEService) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs [ } // ProcessEvent will be called each time a new event is received from readers -func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) { +func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) { eeS.cfg.RLocks(config.EEsJson) defer eeS.cfg.RUnlocks(config.EEsJson) @@ -180,7 +185,7 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) if err := ee.ExportEvent(cgrEv.CGREvent); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> with id <%s>, error: <%s>", - utils.EventExporterService, ee.ID(), err.Error())) + utils.EventExporterS, ee.ID(), err.Error())) withErr = true } if evict { diff --git a/services/attributes.go b/services/attributes.go index 1d1828b59..6c09855be 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -54,8 +54,8 @@ type AttributeService struct { server *utils.Server attrS *engine.AttributeService - rpc *v1.AttributeSv1 - connChan chan rpcclient.ClientConnector + rpc *v1.AttributeSv1 // useful on restart + connChan chan rpcclient.ClientConnector // publish the internal Subsystem when available } // Start should handle the sercive start diff --git a/services/ees.go b/services/ees.go new file mode 100644 index 000000000..2cf5704c0 --- /dev/null +++ b/services/ees.go @@ -0,0 +1,124 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/ees" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewEventExporterService constructs EventExporterService +func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, + connMgr *engine.ConnManager, server *utils.Server, exitChan chan bool, + intConnChan chan rpcclient.ClientConnector) servmanager.Service { + return &EventExporterService{ + cfg: cfg, + filterSChan: filterSChan, + connMgr: connMgr, + server: server, + exitChan: exitChan, + intConnChan: intConnChan, + rldChan: make(chan struct{}), + } +} + +// EventExporterService is the service structure for EventExporterS +type EventExporterService struct { + sync.RWMutex + + cfg *config.CGRConfig + filterSChan chan *engine.FilterS + connMgr *engine.ConnManager + server *utils.Server + exitChan chan bool + intConnChan chan rpcclient.ClientConnector + rldChan chan struct{} + + eeS *ees.EventExporterS +} + +// GetIntenternalChan is deprecated and it will be removed shortly +func (es *EventExporterService) GetIntenternalChan() (conn chan rpcclient.ClientConnector) { + panic("deprecated method") +} + +// IsRunning returns if the service is running +func (es *EventExporterService) IsRunning() bool { + es.RLock() + defer es.RUnlock() + return es != nil && es.eeS != nil +} + +// ServiceName returns the service name +func (es *EventExporterService) ServiceName() string { + return utils.EventExporterS +} + +// ShouldRun returns if the service should be running +func (es *EventExporterService) ShouldRun() (should bool) { + es.cfg.RLocks(config.EEsJson) + should = es.cfg.EEsCfg().Enabled + es.cfg.RUnlocks(config.EEsJson) + return +} + +// Reload handles the change of config +func (es *EventExporterService) Reload() (err error) { + es.rldChan <- struct{}{} + return // for the momment nothing to reload +} + +// Shutdown stops the service +func (es *EventExporterService) Shutdown() (err error) { + es.Lock() + defer es.Unlock() + if err = es.eeS.Shutdown(); err != nil { + return + } + es.eeS = nil + <-es.intConnChan + return +} + +// Start should handle the service start +func (es *EventExporterService) Start() (err error) { + if es.IsRunning() { + return fmt.Errorf("service aleady running") + } + + fltrS := <-es.filterSChan + es.filterSChan <- fltrS + + es.Lock() + es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) + es.Unlock() + if err != nil { + + return + } + es.intConnChan <- es.eeS + return es.eeS.ListenAndServe(es.exitChan, es.rldChan) +} diff --git a/services/stats.go b/services/stats.go index a9e2edfa3..66e40bf70 100644 --- a/services/stats.go +++ b/services/stats.go @@ -78,12 +78,15 @@ func (sts *StatService) Start() (err error) { sts.Lock() defer sts.Unlock() - sts.sts, err = engine.NewStatService(datadb, sts.cfg, filterS, sts.connMgr) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) + if sts.sts, err = engine.NewStatService(datadb, + sts.cfg, filterS, sts.connMgr); err != nil { + utils.Logger.Crit( + fmt.Sprintf(" Could not init, error: %s", + err.Error())) return } - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS)) + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", + utils.CoreS, utils.StatS)) sts.sts.StartLoop() sts.rpc = v1.NewStatSv1(sts.sts) if !sts.cfg.DispatcherSCfg().Enabled { diff --git a/utils/consts.go b/utils/consts.go index b224142f1..029a7d586 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -720,7 +720,7 @@ const ( Count = "Count" ProfileID = "ProfileID" SortedRoutes = "SortedRoutes" - EventExporterService = "EventExporterService" + EventExporterS = "EventExporterS" MetaMonthly = "*monthly" MetaYearly = "*yearly" MetaDaily = "*daily"