EEs - init and reinit of the events cache

This commit is contained in:
DanB
2020-05-06 15:26:27 +02:00
parent 0d285b50e4
commit 480dce2067
3 changed files with 76 additions and 36 deletions

View File

@@ -25,8 +25,15 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
// onCacheEvicted is called by ltcache when evicting an item
func onCacheEvicted(itmID string, value interface{}) {
ee := value.(EventExporter)
ee.OnEvicted(itmID, value)
}
// NewERService instantiates the EEService
func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS,
connMgr *engine.ConnManager) *EEService {
@@ -34,7 +41,7 @@ func NewEEService(cfg *config.CGRConfig, filterS *engine.FilterS,
cfg: cfg,
filterS: filterS,
connMgr: connMgr,
ees: make(map[string]EventExporter),
eesChs: make(map[string]*ltcache.Cache),
}
}
@@ -44,8 +51,8 @@ type EEService struct {
filterS *engine.FilterS
connMgr *engine.ConnManager
ees map[string]EventExporter // map[eeType]EventExporterID
eesMux sync.RWMutex // protects the ees
eesChs map[string]*ltcache.Cache // map[eeType]*ltcache.Cache
eesMux sync.RWMutex // protects the eesChs
}
// ListenAndServe keeps the service alive
@@ -62,10 +69,7 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (
cfgRld <- rld
utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.",
utils.EventExporterService))
eeS.eesMux.Lock()
eeS.ees = make(map[string]EventExporter)
eeS.eesMux.Unlock()
eeS.initCache(eeS.cfg.EEsCfg().Cache)
}
}
return
@@ -74,6 +78,42 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (
// Shutdown is called to shutdown the service
func (eeS *EEService) Shutdown() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterService))
eeS.initCache(nil) // cleanup exporters
return
}
// initCache deals with cleanup and initialization of the cache of EventExporters
func (eeS *EEService) initCache(chCfgs map[string]*config.CacheParamCfg) {
eeS.eesMux.Lock()
for chID, ch := range eeS.eesChs { // cleanup
ch.Clear()
delete(eeS.eesChs, chID)
}
for chID, chCfg := range chCfgs { // init
eeS.eesChs[chID] = ltcache.NewCache(chCfg.Limit,
chCfg.TTL, chCfg.StaticTTL, onCacheEvicted)
}
eeS.eesMux.Unlock()
}
func (eeS *EEService) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attrIDs []string, ctx string) (err error) {
var rplyEv engine.AttrSProcessEventReply
attrArgs := &engine.AttrArgsProcessEvent{
AttributeIDs: attrIDs,
Context: utils.StringPointer(ctx),
CGREvent: cgrEv.CGREvent,
ArgDispatcher: cgrEv.ArgDispatcher,
}
if err = eeS.connMgr.Call(
eeS.cfg.EEsCfg().AttributeSConns, nil,
utils.AttributeSv1ProcessEvent,
attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
cgrEv.CGREvent = rplyEv.CGREvent
cgrEv.Opts = rplyEv.Opts
} else if err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // cancel ErrNotFound
}
return
}
@@ -83,41 +123,34 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error)
defer eeS.cfg.RUnlocks(config.EEsJson)
for cfgIdx, eeCfg := range eeS.cfg.EEsCfg().Exporters {
if eeCfg.Flags.GetBool(utils.MetaAttributes) {
var rplyEv engine.AttrSProcessEventReply
attrArgs := &engine.AttrArgsProcessEvent{
AttributeIDs: eeCfg.AttributeSIDs,
Context: utils.StringPointer(
utils.FirstNonEmpty(
eeCfg.AttributeSCtx,
utils.IfaceAsString(cgrEv.Opts[utils.Context]),
utils.MetaEEs)),
CGREvent: cgrEv.CGREvent,
ArgDispatcher: cgrEv.ArgDispatcher,
}
if err = eeS.connMgr.Call(
eeS.cfg.EEsCfg().AttributeSConns, nil,
utils.AttributeSv1ProcessEvent,
attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
cgrEv.CGREvent = rplyEv.CGREvent
cgrEv.Opts = rplyEv.Opts
} else if err != nil {
if err.Error() != utils.ErrNotFound.Error() {
return
}
err = nil // cancel ErrNotFound
if err = eeS.attrSProcessEvent(
cgrEv,
eeCfg.AttributeSIDs,
utils.FirstNonEmpty(
eeCfg.AttributeSCtx,
utils.IfaceAsString(cgrEv.Opts[utils.Context]),
utils.MetaEEs)); err != nil {
return
}
}
eeS.eesMux.RLock()
ee, has := eeS.ees[eeCfg.ID]
eeCache, hasCache := eeS.eesChs[eeCfg.Type]
eeS.eesMux.RUnlock()
if !has {
var isCached bool
var ee EventExporter
if hasCache {
if x, isCached := eeCache.Get(eeCfg.ID); isCached {
ee = x.(EventExporter)
}
}
if !isCached {
if ee, err = NewEventExporter(eeS.cfg, cfgIdx); err != nil {
return
}
eeS.ees[eeCfg.ID] = ee
if hasCache {
eeCache.Set(eeCfg.ID, ee, nil)
}
}
if err = ee.ExportEvent(cgrEv.CGREvent); err != nil {
return

View File

@@ -25,6 +25,7 @@ import (
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{cgrCfg: cgrCfg, cfgIdx: cfgIdx}
err = fCsv.init()
return
}
@@ -34,7 +35,12 @@ type FileCSVee struct {
cfgIdx int // index of config instance within ERsCfg.Readers
}
// OnEvicted implements EventExporter
// init will create all the necessary dependencies, including opening the file
func (fCsv *FileCSVee) init() (err error) {
return
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (fCsv *FileCSVee) OnEvicted(itmID string, value interface{}) {
return
}

View File

@@ -358,7 +358,8 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithOpts) (err err
attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
cgrEv.CGREvent = rplyEv.CGREvent
cgrEv.Opts = rplyEv.Opts
} else if err.Error() == utils.ErrNotFound.Error() {
} else if err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // cancel ErrNotFound
}
return