diff --git a/config/config.go b/config/config.go
index 6d5f9e111..f128a4b9c 100755
--- a/config/config.go
+++ b/config/config.go
@@ -179,6 +179,7 @@ func NewDefaultCGRConfig() (cfg *CGRConfig, err error) {
cfg.loaderCfg = make(LoaderSCfgs, 0)
cfg.apier = new(ApierCfg)
cfg.ersCfg = new(ERsCfg)
+ cfg.eesCfg = new(EEsCfg)
cfg.ConfigReloads = make(map[string]chan struct{})
cfg.ConfigReloads[utils.CDRE] = make(chan struct{}, 1)
@@ -290,8 +291,9 @@ type CGRConfig struct {
migratorCgrCfg *MigratorCgrCfg // MigratorCgr config
mailerCfg *MailerCfg // Mailer config
analyzerSCfg *AnalyzerSCfg // AnalyzerS config
- apier *ApierCfg
- ersCfg *ERsCfg
+ apier *ApierCfg // APIer config
+ ersCfg *ERsCfg // EventReader config
+ eesCfg *EEsCfg // EventExporter config
}
var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes,
@@ -958,6 +960,13 @@ func (cfg *CGRConfig) ERsCfg() *ERsCfg {
return cfg.ersCfg
}
+// EEsCfg reads the EventExporter configuration
+func (cfg *CGRConfig) EEsCfg() *EEsCfg {
+ cfg.lks[EEsJson].RLock()
+ defer cfg.lks[EEsJson].RUnlock()
+ return cfg.eesCfg
+}
+
// RPCConns reads the RPCConns configuration
func (cfg *CGRConfig) RPCConns() map[string]*RPCConn {
cfg.lks[RPCConnsJsonName].RLock()
diff --git a/config/config_json.go b/config/config_json.go
index da4fac823..2619cbb92 100644
--- a/config/config_json.go
+++ b/config/config_json.go
@@ -58,6 +58,7 @@ const (
ApierS = "apiers"
DNSAgentJson = "dns_agent"
ERsJson = "ers"
+ EEsJson = "ees"
RPCConnsJsonName = "rpc_conns"
)
diff --git a/config/eescfg.go b/config/eescfg.go
new file mode 100644
index 000000000..3a3a06e93
--- /dev/null
+++ b/config/eescfg.go
@@ -0,0 +1,44 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package config
+
+import (
+ "github.com/cgrates/cgrates/utils"
+)
+
+type EEsCfg struct {
+ Enabled bool
+ AttributeSConns []string
+ Exporters []*EventExporterCfg
+}
+
+type EventExporterCfg struct {
+ ID string
+ Type string
+ ExportPath string
+ Tenant RSRParsers
+ Timezone string
+ Filters []string
+ Flags utils.FlagsWithParams
+ AttributeSCtx string // context to use when querying AttributeS
+ Synchronous bool
+ Attempts int
+ FieldSep rune
+ Fields []*FCTemplate
+}
diff --git a/ees/ee.go b/ees/ee.go
index 72f05527d..384be4fb8 100644
--- a/ees/ee.go
+++ b/ees/ee.go
@@ -18,8 +18,16 @@ along with this program. If not, see
package ees
-import "github.com/cgrates/cgrates/utils"
+import (
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+)
type EventExporter interface {
- ExportEvent(cgrEv *utils.CGREvent)
+ ExportEvent(cgrEv *utils.CGREvent) (err error)
+}
+
+// NewEventExporter produces exporters
+func NewEventExporter(eeCfg *config.EventExporterCfg) (ee EventExporter, err error) {
+ return
}
diff --git a/ees/ees.go b/ees/ees.go
index 8253be3b2..f3767ef6c 100644
--- a/ees/ees.go
+++ b/ees/ees.go
@@ -29,7 +29,7 @@ import (
// NewERService instantiates the EEService
func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS,
- stopChan chan struct{}, connMgr *engine.ConnManager) *EEService {
+ connMgr *engine.ConnManager) *EEService {
return &EEService{
cfg: cfg,
filterS: filterS,
@@ -40,21 +40,34 @@ func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS,
// EEService is managing the EventExporters
type EEService struct {
- sync.RWMutex
cfg *config.CGRConfig
filterS *engine.FilterS
-
connMgr *engine.ConnManager
- ees map[string]EventExporter // map[eeType]EventExporter
+
+ ees map[string]EventExporter // map[eeType]EventExporterID
+ eesMux sync.RWMutex // protects the ees
}
// ListenAndServe keeps the service alive
-func (eeS *EEService) ListenAndServe(exitChan chan bool) (err error) {
+func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>",
utils.CoreS, utils.EventExporterService))
- e := <-exitChan
- eeS.Shutdown()
- exitChan <- e // put back for the others listening for shutdown request
+ for {
+ select {
+ case e := <-exitChan: // global exit
+ eeS.Shutdown()
+ exitChan <- e // put back for the others listening for shutdown request
+ break
+ case rld := <-cfgRld: // configuration was reloaded, destroy the cache
+ cfgRld <- rld
+ utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.",
+ utils.EventExporterService))
+ eeS.eesMux.Lock()
+ eeS.ees = make(map[string]EventExporter)
+ eeS.eesMux.Unlock()
+
+ }
+ }
return
}
@@ -66,5 +79,37 @@ func (eeS *EEService) Shutdown() (err error) {
// ProcessEvent will be called each time a new event is received from readers
func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREvent) (err error) {
+ /*
+ var rplyEv AttrSProcessEventReply
+ attrArgs := &engine.AttrArgsProcessEvent{
+ Context: utils.StringPointer(utils.FirstNonEmpty(
+ utils.IfaceAsString(cgrEv.Opts[utils.Context]),
+ utils.MetaCDRs)),
+ CGREvent: cgrEv.CGREvent,
+ ArgDispatcher: cgrEv.ArgDispatcher,
+ }
+ if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().AttributeSConns, nil,
+ utils.AttributeSv1ProcessEvent,
+ attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
+ cgrEv.CGREvent = rplyEv.CGREvent
+ cgrEv.Opts = rplyEv.Opts
+ } else if err.Error() == utils.ErrNotFound.Error() {
+ err = nil // cancel ErrNotFound
+ }
+ */
+ eeS.eesMux.RLock()
+ defer eeS.eesMux.RUnlock()
+ for _, eeCfg := range eeS.cfg.EEsCfg().Exporters {
+ ee, has := eeS.ees[eeCfg.ID]
+ if !has {
+ if ee, err = NewEventExporter(eeCfg); err != nil {
+ return
+ }
+ eeS.ees[eeCfg.ID] = ee
+ }
+ if err = ee.ExportEvent(cgrEv); err != nil {
+ return
+ }
+ }
return
}