diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index 55287b325..eeb3269ce 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -114,7 +114,8 @@ func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalRsChan, internalStatSChan,
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan,
- internalLoaderSChan, internalRALsv1Chan, internalCacheSChan chan rpcclient.ClientConnector,
+ internalLoaderSChan, internalRALsv1Chan, internalCacheSChan,
+ internalEEsChan chan rpcclient.ClientConnector,
exitChan chan bool) {
if !cfg.DispatcherSCfg().Enabled {
select { // Any of the rpc methods will unlock listening to rpc requests
@@ -144,6 +145,8 @@ func startRpc(server *utils.Server, internalRaterChan,
internalRALsv1Chan <- ralS
case chS := <-internalCacheSChan: // added in order to start the RPC before precaching is done
internalCacheSChan <- chS
+ case eeS := <-internalEEsChan:
+ internalEEsChan <- eeS
}
} else {
select {
@@ -518,6 +521,8 @@ func main() {
services.NewDiameterAgent(cfg, filterSChan, exitChan, connManager), // partial reload
services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload
ldrs, anz, dspS, dmService, storDBService,
+ services.NewEventExporterService(cfg, filterSChan,
+ connManager, server, exitChan, internalEEsChan),
)
srvManager.StartServices()
// Start FilterS
@@ -526,7 +531,7 @@ func main() {
initServiceManagerV1(internalServeManagerChan, srvManager, server)
- // init internalRPCSet because we can have double connections in rpc_conns and one of it could be *internal
+ // init internalRPCSet to share internal connections among the engine
engine.IntRPC = engine.NewRPCClientSet()
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, anz.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.APIerSv1, apiSv1.GetIntenternalChan())
@@ -557,7 +562,8 @@ func main() {
reS.GetIntenternalChan(), stS.GetIntenternalChan(),
attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(),
routeS.GetIntenternalChan(), smg.GetIntenternalChan(), anz.GetIntenternalChan(),
- dspS.GetIntenternalChan(), ldrs.GetIntenternalChan(), rals.GetIntenternalChan(), internalCacheSChan, exitChan)
+ dspS.GetIntenternalChan(), ldrs.GetIntenternalChan(), rals.GetIntenternalChan(),
+ internalCacheSChan, internalEEsChan, exitChan)
<-exitChan
if *cpuProfDir != "" { // wait to end cpuProfiling
diff --git a/ees/ees.go b/ees/ees.go
index 24267276f..9624ad681 100644
--- a/ees/ees.go
+++ b/ees/ees.go
@@ -34,10 +34,10 @@ func onCacheEvicted(itmID string, value interface{}) {
ee.OnEvicted(itmID, value)
}
-// NewERService instantiates the EEService
-func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS,
- connMgr *engine.ConnManager) *EEService {
- return &EEService{
+// NewERService instantiates the EventExporterS
+func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS,
+ connMgr *engine.ConnManager) *EventExporterS {
+ return &EventExporterS{
cfg: cfg,
filterS: filterS,
connMgr: connMgr,
@@ -45,8 +45,8 @@ func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS,
}
}
-// EEService is managing the EventExporters
-type EEService struct {
+// EventExporterS is managing the EventExporters
+type EventExporterS struct {
cfg *config.CGRConfig
filterS *engine.FilterS
connMgr *engine.ConnManager
@@ -56,9 +56,9 @@ type EEService struct {
}
// ListenAndServe keeps the service alive
-func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) {
+func (eeS *EventExporterS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>",
- utils.CoreS, utils.EventExporterService))
+ utils.CoreS, utils.EventExporterS))
for {
select {
case e := <-exitChan: // global exit
@@ -68,7 +68,7 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (
case rld := <-cfgRld: // configuration was reloaded, destroy the cache
cfgRld <- rld
utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.",
- utils.EventExporterService))
+ utils.EventExporterS))
eeS.setupCache(eeS.cfg.EEsCfg().Cache)
}
}
@@ -76,14 +76,19 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (
}
// Shutdown is called to shutdown the service
-func (eeS *EEService) Shutdown() (err error) {
- utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterService))
+func (eeS *EventExporterS) Shutdown() (err error) {
+ utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterS))
eeS.setupCache(nil) // cleanup exporters
return
}
+// Call implements rpcclient.ClientConnector interface for internal RPC
+func (eeS *EventExporterS) Call(serviceMethod string, args interface{}, reply interface{}) error {
+ return utils.RPCCall(eeS, serviceMethod, args, reply)
+}
+
// setupCache deals with cleanup and initialization of the cache of EventExporters
-func (eeS *EEService) setupCache(chCfgs map[string]*config.CacheParamCfg) {
+func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) {
eeS.eesMux.Lock()
for chID, ch := range eeS.eesChs { // cleanup
ch.Clear()
@@ -99,7 +104,7 @@ func (eeS *EEService) setupCache(chCfgs map[string]*config.CacheParamCfg) {
eeS.eesMux.Unlock()
}
-func (eeS *EEService) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs []string, ctx string) (err error) {
+func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs []string, ctx string) (err error) {
var rplyEv engine.AttrSProcessEventReply
attrArgs := &engine.AttrArgsProcessEvent{
AttributeIDs: attrIDs,
@@ -121,7 +126,7 @@ func (eeS *EEService) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs [
}
// ProcessEvent will be called each time a new event is received from readers
-func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) {
+func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) {
eeS.cfg.RLocks(config.EEsJson)
defer eeS.cfg.RUnlocks(config.EEsJson)
@@ -180,7 +185,7 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error)
if err := ee.ExportEvent(cgrEv.CGREvent); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, error: <%s>",
- utils.EventExporterService, ee.ID(), err.Error()))
+ utils.EventExporterS, ee.ID(), err.Error()))
withErr = true
}
if evict {
diff --git a/services/attributes.go b/services/attributes.go
index 1d1828b59..6c09855be 100644
--- a/services/attributes.go
+++ b/services/attributes.go
@@ -54,8 +54,8 @@ type AttributeService struct {
server *utils.Server
attrS *engine.AttributeService
- rpc *v1.AttributeSv1
- connChan chan rpcclient.ClientConnector
+ rpc *v1.AttributeSv1 // useful on restart
+ connChan chan rpcclient.ClientConnector // publish the internal Subsystem when available
}
// Start should handle the sercive start
diff --git a/services/ees.go b/services/ees.go
new file mode 100644
index 000000000..2cf5704c0
--- /dev/null
+++ b/services/ees.go
@@ -0,0 +1,124 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package services
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/ees"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/servmanager"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/rpcclient"
+)
+
+// NewEventExporterService constructs EventExporterService
+func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
+ connMgr *engine.ConnManager, server *utils.Server, exitChan chan bool,
+ intConnChan chan rpcclient.ClientConnector) servmanager.Service {
+ return &EventExporterService{
+ cfg: cfg,
+ filterSChan: filterSChan,
+ connMgr: connMgr,
+ server: server,
+ exitChan: exitChan,
+ intConnChan: intConnChan,
+ rldChan: make(chan struct{}),
+ }
+}
+
+// EventExporterService is the service structure for EventExporterS
+type EventExporterService struct {
+ sync.RWMutex
+
+ cfg *config.CGRConfig
+ filterSChan chan *engine.FilterS
+ connMgr *engine.ConnManager
+ server *utils.Server
+ exitChan chan bool
+ intConnChan chan rpcclient.ClientConnector
+ rldChan chan struct{}
+
+ eeS *ees.EventExporterS
+}
+
+// GetIntenternalChan is deprecated and it will be removed shortly
+func (es *EventExporterService) GetIntenternalChan() (conn chan rpcclient.ClientConnector) {
+ panic("deprecated method")
+}
+
+// IsRunning returns if the service is running
+func (es *EventExporterService) IsRunning() bool {
+ es.RLock()
+ defer es.RUnlock()
+ return es != nil && es.eeS != nil
+}
+
+// ServiceName returns the service name
+func (es *EventExporterService) ServiceName() string {
+ return utils.EventExporterS
+}
+
+// ShouldRun returns if the service should be running
+func (es *EventExporterService) ShouldRun() (should bool) {
+ es.cfg.RLocks(config.EEsJson)
+ should = es.cfg.EEsCfg().Enabled
+ es.cfg.RUnlocks(config.EEsJson)
+ return
+}
+
+// Reload handles the change of config
+func (es *EventExporterService) Reload() (err error) {
+ es.rldChan <- struct{}{}
+ return // for the momment nothing to reload
+}
+
+// Shutdown stops the service
+func (es *EventExporterService) Shutdown() (err error) {
+ es.Lock()
+ defer es.Unlock()
+ if err = es.eeS.Shutdown(); err != nil {
+ return
+ }
+ es.eeS = nil
+ <-es.intConnChan
+ return
+}
+
+// Start should handle the service start
+func (es *EventExporterService) Start() (err error) {
+ if es.IsRunning() {
+ return fmt.Errorf("service aleady running")
+ }
+
+ fltrS := <-es.filterSChan
+ es.filterSChan <- fltrS
+
+ es.Lock()
+ es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr)
+ es.Unlock()
+ if err != nil {
+
+ return
+ }
+ es.intConnChan <- es.eeS
+ return es.eeS.ListenAndServe(es.exitChan, es.rldChan)
+}
diff --git a/services/stats.go b/services/stats.go
index a9e2edfa3..66e40bf70 100644
--- a/services/stats.go
+++ b/services/stats.go
@@ -78,12 +78,15 @@ func (sts *StatService) Start() (err error) {
sts.Lock()
defer sts.Unlock()
- sts.sts, err = engine.NewStatService(datadb, sts.cfg, filterS, sts.connMgr)
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error()))
+ if sts.sts, err = engine.NewStatService(datadb,
+ sts.cfg, filterS, sts.connMgr); err != nil {
+ utils.Logger.Crit(
+ fmt.Sprintf(" Could not init, error: %s",
+ err.Error()))
return
}
- utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS))
+ utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
+ utils.CoreS, utils.StatS))
sts.sts.StartLoop()
sts.rpc = v1.NewStatSv1(sts.sts)
if !sts.cfg.DispatcherSCfg().Enabled {
diff --git a/utils/consts.go b/utils/consts.go
index b224142f1..029a7d586 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -720,7 +720,7 @@ const (
Count = "Count"
ProfileID = "ProfileID"
SortedRoutes = "SortedRoutes"
- EventExporterService = "EventExporterService"
+ EventExporterS = "EventExporterS"
MetaMonthly = "*monthly"
MetaYearly = "*yearly"
MetaDaily = "*daily"