From 3092380899450813e967648cdaafc77f39df6aa0 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 3 May 2020 20:48:37 +0200 Subject: [PATCH] EEs configuration and initial ProcessEvent --- config/config.go | 13 +++++++-- config/config_json.go | 1 + config/eescfg.go | 44 +++++++++++++++++++++++++++++++ ees/ee.go | 12 +++++++-- ees/ees.go | 61 +++++++++++++++++++++++++++++++++++++------ 5 files changed, 119 insertions(+), 12 deletions(-) create mode 100644 config/eescfg.go diff --git a/config/config.go b/config/config.go index 6d5f9e111..f128a4b9c 100755 --- a/config/config.go +++ b/config/config.go @@ -179,6 +179,7 @@ func NewDefaultCGRConfig() (cfg *CGRConfig, err error) { cfg.loaderCfg = make(LoaderSCfgs, 0) cfg.apier = new(ApierCfg) cfg.ersCfg = new(ERsCfg) + cfg.eesCfg = new(EEsCfg) cfg.ConfigReloads = make(map[string]chan struct{}) cfg.ConfigReloads[utils.CDRE] = make(chan struct{}, 1) @@ -290,8 +291,9 @@ type CGRConfig struct { migratorCgrCfg *MigratorCgrCfg // MigratorCgr config mailerCfg *MailerCfg // Mailer config analyzerSCfg *AnalyzerSCfg // AnalyzerS config - apier *ApierCfg - ersCfg *ERsCfg + apier *ApierCfg // APIer config + ersCfg *ERsCfg // EventReader config + eesCfg *EEsCfg // EventExporter config } var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes, @@ -958,6 +960,13 @@ func (cfg *CGRConfig) ERsCfg() *ERsCfg { return cfg.ersCfg } +// EEsCfg reads the EventExporter configuration +func (cfg *CGRConfig) EEsCfg() *EEsCfg { + cfg.lks[EEsJson].RLock() + defer cfg.lks[EEsJson].RUnlock() + return cfg.eesCfg +} + // RPCConns reads the RPCConns configuration func (cfg *CGRConfig) RPCConns() map[string]*RPCConn { cfg.lks[RPCConnsJsonName].RLock() diff --git a/config/config_json.go b/config/config_json.go index da4fac823..2619cbb92 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -58,6 +58,7 @@ const ( ApierS = "apiers" DNSAgentJson = "dns_agent" ERsJson = "ers" + EEsJson = "ees" RPCConnsJsonName = "rpc_conns" ) diff --git a/config/eescfg.go b/config/eescfg.go new file mode 100644 index 000000000..3a3a06e93 --- /dev/null +++ b/config/eescfg.go @@ -0,0 +1,44 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +import ( + "github.com/cgrates/cgrates/utils" +) + +type EEsCfg struct { + Enabled bool + AttributeSConns []string + Exporters []*EventExporterCfg +} + +type EventExporterCfg struct { + ID string + Type string + ExportPath string + Tenant RSRParsers + Timezone string + Filters []string + Flags utils.FlagsWithParams + AttributeSCtx string // context to use when querying AttributeS + Synchronous bool + Attempts int + FieldSep rune + Fields []*FCTemplate +} diff --git a/ees/ee.go b/ees/ee.go index 72f05527d..384be4fb8 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -18,8 +18,16 @@ along with this program. If not, see package ees -import "github.com/cgrates/cgrates/utils" +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) type EventExporter interface { - ExportEvent(cgrEv *utils.CGREvent) + ExportEvent(cgrEv *utils.CGREvent) (err error) +} + +// NewEventExporter produces exporters +func NewEventExporter(eeCfg *config.EventExporterCfg) (ee EventExporter, err error) { + return } diff --git a/ees/ees.go b/ees/ees.go index 8253be3b2..f3767ef6c 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -29,7 +29,7 @@ import ( // NewERService instantiates the EEService func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS, - stopChan chan struct{}, connMgr *engine.ConnManager) *EEService { + connMgr *engine.ConnManager) *EEService { return &EEService{ cfg: cfg, filterS: filterS, @@ -40,21 +40,34 @@ func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS, // EEService is managing the EventExporters type EEService struct { - sync.RWMutex cfg *config.CGRConfig filterS *engine.FilterS - connMgr *engine.ConnManager - ees map[string]EventExporter // map[eeType]EventExporter + + ees map[string]EventExporter // map[eeType]EventExporterID + eesMux sync.RWMutex // protects the ees } // ListenAndServe keeps the service alive -func (eeS *EEService) ListenAndServe(exitChan chan bool) (err error) { +func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>", utils.CoreS, utils.EventExporterService)) - e := <-exitChan - eeS.Shutdown() - exitChan <- e // put back for the others listening for shutdown request + for { + select { + case e := <-exitChan: // global exit + eeS.Shutdown() + exitChan <- e // put back for the others listening for shutdown request + break + case rld := <-cfgRld: // configuration was reloaded, destroy the cache + cfgRld <- rld + utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.", + utils.EventExporterService)) + eeS.eesMux.Lock() + eeS.ees = make(map[string]EventExporter) + eeS.eesMux.Unlock() + + } + } return } @@ -66,5 +79,37 @@ func (eeS *EEService) Shutdown() (err error) { // ProcessEvent will be called each time a new event is received from readers func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREvent) (err error) { + /* + var rplyEv AttrSProcessEventReply + attrArgs := &engine.AttrArgsProcessEvent{ + Context: utils.StringPointer(utils.FirstNonEmpty( + utils.IfaceAsString(cgrEv.Opts[utils.Context]), + utils.MetaCDRs)), + CGREvent: cgrEv.CGREvent, + ArgDispatcher: cgrEv.ArgDispatcher, + } + if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().AttributeSConns, nil, + utils.AttributeSv1ProcessEvent, + attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { + cgrEv.CGREvent = rplyEv.CGREvent + cgrEv.Opts = rplyEv.Opts + } else if err.Error() == utils.ErrNotFound.Error() { + err = nil // cancel ErrNotFound + } + */ + eeS.eesMux.RLock() + defer eeS.eesMux.RUnlock() + for _, eeCfg := range eeS.cfg.EEsCfg().Exporters { + ee, has := eeS.ees[eeCfg.ID] + if !has { + if ee, err = NewEventExporter(eeCfg); err != nil { + return + } + eeS.ees[eeCfg.ID] = ee + } + if err = ee.ExportEvent(cgrEv); err != nil { + return + } + } return }