From 480dce20675d57c8c4fa91022ea4b1200f37b236 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 6 May 2020 15:26:27 +0200 Subject: [PATCH] EEs - init and reinit of the events cache --- ees/ees.go | 101 ++++++++++++++++++++++++++++++++----------------- ees/filecsv.go | 8 +++- engine/cdrs.go | 3 +- 3 files changed, 76 insertions(+), 36 deletions(-) diff --git a/ees/ees.go b/ees/ees.go index 7735d6e55..13c54ef87 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -25,8 +25,15 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" ) +// onCacheEvicted is called by ltcache when evicting an item +func onCacheEvicted(itmID string, value interface{}) { + ee := value.(EventExporter) + ee.OnEvicted(itmID, value) +} + // NewERService instantiates the EEService func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) *EEService { @@ -34,7 +41,7 @@ func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS, cfg: cfg, filterS: filterS, connMgr: connMgr, - ees: make(map[string]EventExporter), + eesChs: make(map[string]*ltcache.Cache), } } @@ -44,8 +51,8 @@ type EEService struct { filterS *engine.FilterS connMgr *engine.ConnManager - ees map[string]EventExporter // map[eeType]EventExporterID - eesMux sync.RWMutex // protects the ees + eesChs map[string]*ltcache.Cache // map[eeType]*ltcache.Cache + eesMux sync.RWMutex // protects the eesChs } // ListenAndServe keeps the service alive @@ -62,10 +69,7 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) ( cfgRld <- rld utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.", utils.EventExporterService)) - eeS.eesMux.Lock() - eeS.ees = make(map[string]EventExporter) - eeS.eesMux.Unlock() - + eeS.initCache(eeS.cfg.EEsCfg().Cache) } } return @@ -74,6 +78,42 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) ( // Shutdown is called to shutdown the service func (eeS *EEService) Shutdown() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterService)) + eeS.initCache(nil) // cleanup exporters + return +} + +// initCache deals with cleanup and initialization of the cache of EventExporters +func (eeS *EEService) initCache(chCfgs map[string]*config.CacheParamCfg) { + eeS.eesMux.Lock() + for chID, ch := range eeS.eesChs { // cleanup + ch.Clear() + delete(eeS.eesChs, chID) + } + for chID, chCfg := range chCfgs { // init + eeS.eesChs[chID] = ltcache.NewCache(chCfg.Limit, + chCfg.TTL, chCfg.StaticTTL, onCacheEvicted) + } + eeS.eesMux.Unlock() +} + +func (eeS *EEService) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs []string, ctx string) (err error) { + var rplyEv engine.AttrSProcessEventReply + attrArgs := &engine.AttrArgsProcessEvent{ + AttributeIDs: attrIDs, + Context: utils.StringPointer(ctx), + CGREvent: cgrEv.CGREvent, + ArgDispatcher: cgrEv.ArgDispatcher, + } + if err = eeS.connMgr.Call( + eeS.cfg.EEsCfg().AttributeSConns, nil, + utils.AttributeSv1ProcessEvent, + attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { + cgrEv.CGREvent = rplyEv.CGREvent + cgrEv.Opts = rplyEv.Opts + } else if err != nil && + err.Error() == utils.ErrNotFound.Error() { + err = nil // cancel ErrNotFound + } return } @@ -83,41 +123,34 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) defer eeS.cfg.RUnlocks(config.EEsJson) for cfgIdx, eeCfg := range eeS.cfg.EEsCfg().Exporters { - if eeCfg.Flags.GetBool(utils.MetaAttributes) { - var rplyEv engine.AttrSProcessEventReply - attrArgs := &engine.AttrArgsProcessEvent{ - AttributeIDs: eeCfg.AttributeSIDs, - Context: utils.StringPointer( - utils.FirstNonEmpty( - eeCfg.AttributeSCtx, - utils.IfaceAsString(cgrEv.Opts[utils.Context]), - utils.MetaEEs)), - CGREvent: cgrEv.CGREvent, - ArgDispatcher: cgrEv.ArgDispatcher, - } - if err = eeS.connMgr.Call( - eeS.cfg.EEsCfg().AttributeSConns, nil, - utils.AttributeSv1ProcessEvent, - attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { - cgrEv.CGREvent = rplyEv.CGREvent - cgrEv.Opts = rplyEv.Opts - } else if err != nil { - if err.Error() != utils.ErrNotFound.Error() { - return - } - err = nil // cancel ErrNotFound + if err = eeS.attrSProcessEvent( + cgrEv, + eeCfg.AttributeSIDs, + utils.FirstNonEmpty( + eeCfg.AttributeSCtx, + utils.IfaceAsString(cgrEv.Opts[utils.Context]), + utils.MetaEEs)); err != nil { + return } } - eeS.eesMux.RLock() - ee, has := eeS.ees[eeCfg.ID] + eeCache, hasCache := eeS.eesChs[eeCfg.Type] eeS.eesMux.RUnlock() - if !has { + var isCached bool + var ee EventExporter + if hasCache { + if x, isCached := eeCache.Get(eeCfg.ID); isCached { + ee = x.(EventExporter) + } + } + if !isCached { if ee, err = NewEventExporter(eeS.cfg, cfgIdx); err != nil { return } - eeS.ees[eeCfg.ID] = ee + if hasCache { + eeCache.Set(eeCfg.ID, ee, nil) + } } if err = ee.ExportEvent(cgrEv.CGREvent); err != nil { return diff --git a/ees/filecsv.go b/ees/filecsv.go index c143eb028..cbaeef725 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -25,6 +25,7 @@ import ( func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int) (fCsv *FileCSVee, err error) { fCsv = &FileCSVee{cgrCfg: cgrCfg, cfgIdx: cfgIdx} + err = fCsv.init() return } @@ -34,7 +35,12 @@ type FileCSVee struct { cfgIdx int // index of config instance within ERsCfg.Readers } -// OnEvicted implements EventExporter +// init will create all the necessary dependencies, including opening the file +func (fCsv *FileCSVee) init() (err error) { + return +} + +// OnEvicted implements EventExporter, doing the cleanup before exit func (fCsv *FileCSVee) OnEvicted(itmID string, value interface{}) { return } diff --git a/engine/cdrs.go b/engine/cdrs.go index 387ab8bbc..b3f731d1b 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -358,7 +358,8 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithOpts) (err err attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { cgrEv.CGREvent = rplyEv.CGREvent cgrEv.Opts = rplyEv.Opts - } else if err.Error() == utils.ErrNotFound.Error() { + } else if err != nil && + err.Error() == utils.ErrNotFound.Error() { err = nil // cancel ErrNotFound } return