EEs with filterS and asynchronous exporters

This commit is contained in:
DanB
2020-05-06 19:51:31 +02:00
parent 8991b48d8c
commit 36431268c5
5 changed files with 58 additions and 14 deletions

View File

@@ -412,6 +412,7 @@ func main() {
internalAPIerSv1Chan := make(chan rpcclient.ClientConnector, 1)
internalAPIerSv2Chan := make(chan rpcclient.ClientConnector, 1)
internalLoaderSChan := make(chan rpcclient.ClientConnector, 1)
internalEEsChan := make(chan rpcclient.ClientConnector, 1)
// initialize the connManager before creating the DMService
// because we need to pass the connection to it
@@ -435,6 +436,7 @@ func main() {
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig): internalConfigChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): internalCoreSv1Chan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan,
})

View File

@@ -26,6 +26,7 @@ import (
)
type EventExporter interface {
ID() string // return the exporter identificator
ExportEvent(cgrEv *utils.CGREvent) (err error) // called on each event to be exported
OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate
}

View File

@@ -69,7 +69,7 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (
cfgRld <- rld
utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.",
utils.EventExporterService))
eeS.initCache(eeS.cfg.EEsCfg().Cache)
eeS.setupCache(eeS.cfg.EEsCfg().Cache)
}
}
return
@@ -78,18 +78,21 @@ func (eeS *EEService) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (
// Shutdown is called to shutdown the service
func (eeS *EEService) Shutdown() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterService))
eeS.initCache(nil) // cleanup exporters
eeS.setupCache(nil) // cleanup exporters
return
}
// initCache deals with cleanup and initialization of the cache of EventExporters
func (eeS *EEService) initCache(chCfgs map[string]*config.CacheParamCfg) {
// setupCache deals with cleanup and initialization of the cache of EventExporters
func (eeS *EEService) setupCache(chCfgs map[string]*config.CacheParamCfg) {
eeS.eesMux.Lock()
for chID, ch := range eeS.eesChs { // cleanup
ch.Clear()
delete(eeS.eesChs, chID)
}
for chID, chCfg := range chCfgs { // init
if chCfg.Limit == 0 { // cache is disabled, will not create
continue
}
eeS.eesChs[chID] = ltcache.NewCache(chCfg.Limit,
chCfg.TTL, chCfg.StaticTTL, onCacheEvicted)
}
@@ -122,7 +125,24 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error)
eeS.cfg.RLocks(config.EEsJson)
defer eeS.cfg.RUnlocks(config.EEsJson)
var wg sync.WaitGroup
var withErr bool
for cfgIdx, eeCfg := range eeS.cfg.EEsCfg().Exporters {
if len(eeCfg.Filters) != 0 {
cgrDp := config.NewNavigableMap(map[string]interface{}{
utils.MetaReq: cgrEv.Event,
})
tnt := cgrEv.Tenant
if eeTnt, errTnt := eeCfg.Tenant.ParseEvent(cgrEv.Event); errTnt == nil && eeTnt != utils.EmptyString {
tnt = eeTnt
}
if pass, errPass := eeS.filterS.Pass(tnt,
eeCfg.Filters, cgrDp); errPass != nil || !pass {
continue // does not pass the filters, ignore the exporter
}
}
if eeCfg.Flags.GetBool(utils.MetaAttributes) {
if err = eeS.attrSProcessEvent(
cgrEv,
@@ -134,6 +154,7 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error)
return
}
}
eeS.eesMux.RLock()
eeCache, hasCache := eeS.eesChs[eeCfg.Type]
eeS.eesMux.RUnlock()
@@ -152,9 +173,28 @@ func (eeS *EEService) V1ProcessEvent(cgrEv *utils.CGREventWithOpts) (err error)
eeCache.Set(eeCfg.ID, ee, nil)
}
}
if err = ee.ExportEvent(cgrEv.CGREvent); err != nil {
return
if eeCfg.Synchronous {
wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing
}
go func(evict, sync bool) {
if err := ee.ExportEvent(cgrEv.CGREvent); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, error: <%s>",
utils.EventExporterService, ee.ID(), err.Error()))
withErr = true
}
if evict {
ee.OnEvicted("", nil) // so we can close ie the file
}
if sync {
wg.Done()
}
}(!isCached, eeCfg.Synchronous)
}
wg.Wait()
if withErr {
err = utils.ErrPartiallyExecuted
}
return
}

View File

@@ -31,6 +31,7 @@ func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int) (fCsv *FileCSVee, err er
// FileCSVee implements EventExporter interface for .csv files
type FileCSVee struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
}
@@ -40,8 +41,13 @@ func (fCsv *FileCSVee) init() (err error) {
return
}
// ID returns the identificator of this exporter
func (fCsv *FileCSVee) ID() string {
return fCsv.id
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (fCsv *FileCSVee) OnEvicted(itmID string, value interface{}) {
func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
return
}

View File

@@ -344,21 +344,16 @@ func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) {
func (srvMngr *ServiceManager) startService(srviceName string) {
srv := srvMngr.GetService(srviceName)
if err := srv.Start(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, srviceName, err))
utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srviceName, err))
srvMngr.engineShutdown <- true
}
}
// GetService returns the named service
func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service) {
var has bool
srvMngr.RLock()
srv, has = srvMngr.subsystems[subsystem]
srv = srvMngr.subsystems[subsystem]
srvMngr.RUnlock()
if !has { // this should not happen (check the added services)
panic(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
utils.ServiceManager, subsystem)) // because this is not dinamic this should not happen
}
return
}