diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ba34a5820..43bf384d6 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -412,6 +412,7 @@ func main() { internalAPIerSv1Chan := make(chan rpcclient.ClientConnector, 1) internalAPIerSv2Chan := make(chan rpcclient.ClientConnector, 1) internalLoaderSChan := make(chan rpcclient.ClientConnector, 1) + internalEEsChan := make(chan rpcclient.ClientConnector, 1) // initialize the connManager before creating the DMService // because we need to pass the connection to it @@ -435,6 +436,7 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig): internalConfigChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): internalCoreSv1Chan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, }) diff --git a/ees/ee.go b/ees/ee.go index 3ca76024c..e91b75957 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -26,6 +26,7 @@ import ( ) type EventExporter interface { + ID() string // return the exporter identificator ExportEvent(cgrEv *utils.CGREvent) (err error) // called on each event to be exported OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate } diff --git a/ees/ees.go b/ees/ees.go index 13c54ef87..24267276f 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -69,7 +69,7 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) ( cfgRld <- rld utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.", utils.EventExporterService)) - eeS.initCache(eeS.cfg.EEsCfg().Cache) + eeS.setupCache(eeS.cfg.EEsCfg().Cache) } } return @@ -78,18 +78,21 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) ( // Shutdown is called to shutdown the service func (eeS *EEService) Shutdown() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterService)) - eeS.initCache(nil) // cleanup exporters + eeS.setupCache(nil) // cleanup exporters return } -// initCache deals with cleanup and initialization of the cache of EventExporters -func (eeS *EEService) initCache(chCfgs map[string]*config.CacheParamCfg) { +// setupCache deals with cleanup and initialization of the cache of EventExporters +func (eeS *EEService) setupCache(chCfgs map[string]*config.CacheParamCfg) { eeS.eesMux.Lock() for chID, ch := range eeS.eesChs { // cleanup ch.Clear() delete(eeS.eesChs, chID) } for chID, chCfg := range chCfgs { // init + if chCfg.Limit == 0 { // cache is disabled, will not create + continue + } eeS.eesChs[chID] = ltcache.NewCache(chCfg.Limit, chCfg.TTL, chCfg.StaticTTL, onCacheEvicted) } @@ -122,7 +125,24 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) eeS.cfg.RLocks(config.EEsJson) defer eeS.cfg.RUnlocks(config.EEsJson) + var wg sync.WaitGroup + var withErr bool for cfgIdx, eeCfg := range eeS.cfg.EEsCfg().Exporters { + + if len(eeCfg.Filters) != 0 { + cgrDp := config.NewNavigableMap(map[string]interface{}{ + utils.MetaReq: cgrEv.Event, + }) + tnt := cgrEv.Tenant + if eeTnt, errTnt := eeCfg.Tenant.ParseEvent(cgrEv.Event); errTnt == nil && eeTnt != utils.EmptyString { + tnt = eeTnt + } + if pass, errPass := eeS.filterS.Pass(tnt, + eeCfg.Filters, cgrDp); errPass != nil || !pass { + continue // does not pass the filters, ignore the exporter + } + } + if eeCfg.Flags.GetBool(utils.MetaAttributes) { if err = eeS.attrSProcessEvent( cgrEv, @@ -134,6 +154,7 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) return } } + eeS.eesMux.RLock() eeCache, hasCache := eeS.eesChs[eeCfg.Type] eeS.eesMux.RUnlock() @@ -152,9 +173,28 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) eeCache.Set(eeCfg.ID, ee, nil) } } - if err = ee.ExportEvent(cgrEv.CGREvent); err != nil { - return + if eeCfg.Synchronous { + wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing } + go func(evict, sync bool) { + if err := ee.ExportEvent(cgrEv.CGREvent); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> with id <%s>, error: <%s>", + utils.EventExporterService, ee.ID(), err.Error())) + withErr = true + } + if evict { + ee.OnEvicted("", nil) // so we can close ie the file + } + if sync { + wg.Done() + } + }(!isCached, eeCfg.Synchronous) } + wg.Wait() + if withErr { + err = utils.ErrPartiallyExecuted + } + return } diff --git a/ees/filecsv.go b/ees/filecsv.go index cbaeef725..24e7802a6 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -31,6 +31,7 @@ func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int) (fCsv *FileCSVee, err er // FileCSVee implements EventExporter interface for .csv files type FileCSVee struct { + id string cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers } @@ -40,8 +41,13 @@ func (fCsv *FileCSVee) init() (err error) { return } +// ID returns the identificator of this exporter +func (fCsv *FileCSVee) ID() string { + return fCsv.id +} + // OnEvicted implements EventExporter, doing the cleanup before exit -func (fCsv *FileCSVee) OnEvicted(itmID string, value interface{}) { +func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) { return } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index a1ed9824c..c4ec4bee7 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -344,21 +344,16 @@ func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) { func (srvMngr *ServiceManager) startService(srviceName string) { srv := srvMngr.GetService(srviceName) if err := srv.Start(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, srviceName, err)) + utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srviceName, err)) srvMngr.engineShutdown <- true } } // GetService returns the named service func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service) { - var has bool srvMngr.RLock() - srv, has = srvMngr.subsystems[subsystem] + srv = srvMngr.subsystems[subsystem] srvMngr.RUnlock() - if !has { // this should not happen (check the added services) - panic(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", - utils.ServiceManager, subsystem)) // because this is not dinamic this should not happen - } return }