added concurrent event processing for ers

This commit is contained in:
gezimbll
2024-03-13 05:35:37 -04:00
committed by Dan Christian Bogos
parent 51c3e15f1e
commit 21d65b3810
5 changed files with 57 additions and 38 deletions

View File

@@ -368,6 +368,7 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts the EventReader service: <true|false>
"sessions_conns": ["*internal"], // RPC Connections IDs
"ees_conns": [], // connection for routing processed and invalid messages through EEs
"concurrent_events":1, // number of events to processed concurrently
"partial_cache_ttl": "1s", // the duration to cache partial records when not pairing
"readers": [
{

View File

@@ -27,11 +27,12 @@ import (
// ERsCfg the config for ERs
type ERsCfg struct {
Enabled bool
SessionSConns []string
EEsConns []string
Readers []*EventReaderCfg
PartialCacheTTL time.Duration
Enabled bool
SessionSConns []string
EEsConns []string
Concurrent_Events int
Readers []*EventReaderCfg
PartialCacheTTL time.Duration
}
func (erS *ERsCfg) loadFromJSONCfg(jsnCfg *ERsJsonCfg, msgTemplates map[string][]*FCTemplate, sep string, dfltRdrCfg *EventReaderCfg, separator string) (err error) {
@@ -41,6 +42,14 @@ func (erS *ERsCfg) loadFromJSONCfg(jsnCfg *ERsJsonCfg, msgTemplates map[string][
if jsnCfg.Enabled != nil {
erS.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Concurrent_events != nil {
erS.Concurrent_Events = *jsnCfg.Concurrent_events
if erS.Concurrent_Events < 1 {
erS.Concurrent_Events = 1
}
}
if jsnCfg.Sessions_conns != nil {
erS.SessionSConns = make([]string, 0, len(*jsnCfg.Sessions_conns))
for _, fID := range *jsnCfg.Sessions_conns {
@@ -108,11 +117,12 @@ func (erS *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, msgTempla
// Clone returns a deep copy of ERsCfg
func (erS *ERsCfg) Clone() (cln *ERsCfg) {
cln = &ERsCfg{
Enabled: erS.Enabled,
SessionSConns: slices.Clone(erS.SessionSConns),
EEsConns: slices.Clone(erS.EEsConns),
Readers: make([]*EventReaderCfg, len(erS.Readers)),
PartialCacheTTL: erS.PartialCacheTTL,
Enabled: erS.Enabled,
SessionSConns: slices.Clone(erS.SessionSConns),
EEsConns: slices.Clone(erS.EEsConns),
Readers: make([]*EventReaderCfg, len(erS.Readers)),
Concurrent_Events: erS.Concurrent_Events,
PartialCacheTTL: erS.PartialCacheTTL,
}
for idx, rdr := range erS.Readers {
cln.Readers[idx] = rdr.Clone()
@@ -124,6 +134,7 @@ func (erS *ERsCfg) Clone() (cln *ERsCfg) {
func (erS *ERsCfg) AsMapInterface(separator string) (initialMP map[string]any) {
initialMP = map[string]any{
utils.EnabledCfg: erS.Enabled,
utils.ConcurrentEvents: erS.Concurrent_Events,
utils.PartialCacheTTLCfg: "0",
}
if erS.PartialCacheTTL != 0 {

View File

@@ -206,6 +206,7 @@ type ERsJsonCfg struct {
Enabled *bool
Sessions_conns *[]string
Ees_conns *[]string
Concurrent_events *int
Readers *[]*EventReaderJsonCfg
Partial_cache_ttl *string
}

View File

@@ -47,15 +47,16 @@ type erEvent struct {
// NewERService instantiates the ERService
func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (ers *ERService) {
ers = &ERService{
cfg: cfg,
rdrs: make(map[string]EventReader),
rdrPaths: make(map[string]string),
stopLsn: make(map[string]chan struct{}),
rdrEvents: make(chan *erEvent),
partialEvents: make(chan *erEvent),
rdrErr: make(chan error),
filterS: filterS,
connMgr: connMgr,
cfg: cfg,
rdrs: make(map[string]EventReader),
rdrPaths: make(map[string]string),
stopLsn: make(map[string]chan struct{}),
rdrEvents: make(chan *erEvent),
partialEvents: make(chan *erEvent),
rdrErr: make(chan error),
concurrent_events: make(chan struct{}, cfg.ERsCfg().Concurrent_Events),
filterS: filterS,
connMgr: connMgr,
}
ers.partialCache = ltcache.NewCache(ltcache.UnlimitedCaching, cfg.ERsCfg().PartialCacheTTL, false, ers.onEvicted)
return
@@ -64,13 +65,14 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engin
// ERService is managing the EventReaders
type ERService struct {
sync.RWMutex
cfg *config.CGRConfig
rdrs map[string]EventReader // map[rdrID]EventReader
rdrPaths map[string]string // used for reloads in case of path changes
stopLsn map[string]chan struct{} // map[rdrID] chan struct{}
rdrEvents chan *erEvent // receive here the events from readers
partialEvents chan *erEvent // receive here the partial events from readers
rdrErr chan error // receive here errors which should stop the app
cfg *config.CGRConfig
rdrs map[string]EventReader // map[rdrID]EventReader
rdrPaths map[string]string // used for reloads in case of path changes
stopLsn map[string]chan struct{} // map[rdrID] chan struct{}
rdrEvents chan *erEvent // receive here the events from readers
partialEvents chan *erEvent // receive here the partial events from readers
rdrErr chan error // receive here errors which should stop the app
concurrent_events chan struct{}
filterS *engine.FilterS
connMgr *engine.ConnManager
@@ -100,18 +102,21 @@ func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) error {
erS.closeAllRdrs()
return nil
case erEv := <-erS.rdrEvents:
err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> reading event: <%s> from reader: <%s> got error: <%v>",
utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err))
}
if err = erS.exportEvent(erEv, err != nil); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> exporting event: <%s> from reader: <%s> got error: <%v>",
utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err))
}
erS.concurrent_events <- struct{}{}
go func() {
err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> reading event: <%s> from reader: <%s> got error: <%v>",
utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err))
}
if err = erS.exportEvent(erEv, err != nil); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> exporting event: <%s> from reader: <%s> got error: <%v>",
utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err))
}
<-erS.concurrent_events
}()
case pEv := <-erS.partialEvents:
err := erS.processPartialEvent(pEv.cgrEvent, pEv.rdrCfg)
if err != nil {

View File

@@ -2449,6 +2449,7 @@ const (
const (
IDCfg = "id"
CacheCfg = "cache"
ConcurrentEvents = "concurrent_events"
FieldSepCfg = "field_separator"
RunDelayCfg = "run_delay"
SourcePathCfg = "source_path"