diff --git a/config/config_defaults.go b/config/config_defaults.go index e1bf1cf17..6f142918d 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -368,6 +368,7 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts the EventReader service: "sessions_conns": ["*internal"], // RPC Connections IDs "ees_conns": [], // connection for routing processed and invalid messages through EEs + "concurrent_events":1, // number of events to processed concurrently "partial_cache_ttl": "1s", // the duration to cache partial records when not pairing "readers": [ { diff --git a/config/erscfg.go b/config/erscfg.go index 94d04f05c..5b1a744dd 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -27,11 +27,12 @@ import ( // ERsCfg the config for ERs type ERsCfg struct { - Enabled bool - SessionSConns []string - EEsConns []string - Readers []*EventReaderCfg - PartialCacheTTL time.Duration + Enabled bool + SessionSConns []string + EEsConns []string + Concurrent_Events int + Readers []*EventReaderCfg + PartialCacheTTL time.Duration } func (erS *ERsCfg) loadFromJSONCfg(jsnCfg *ERsJsonCfg, msgTemplates map[string][]*FCTemplate, sep string, dfltRdrCfg *EventReaderCfg, separator string) (err error) { @@ -41,6 +42,14 @@ func (erS *ERsCfg) loadFromJSONCfg(jsnCfg *ERsJsonCfg, msgTemplates map[string][ if jsnCfg.Enabled != nil { erS.Enabled = *jsnCfg.Enabled } + + if jsnCfg.Concurrent_events != nil { + erS.Concurrent_Events = *jsnCfg.Concurrent_events + if erS.Concurrent_Events < 1 { + erS.Concurrent_Events = 1 + } + } + if jsnCfg.Sessions_conns != nil { erS.SessionSConns = make([]string, 0, len(*jsnCfg.Sessions_conns)) for _, fID := range *jsnCfg.Sessions_conns { @@ -108,11 +117,12 @@ func (erS *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, msgTempla // Clone returns a deep copy of ERsCfg func (erS *ERsCfg) Clone() (cln *ERsCfg) { cln = &ERsCfg{ - Enabled: erS.Enabled, - SessionSConns: slices.Clone(erS.SessionSConns), - EEsConns: slices.Clone(erS.EEsConns), - Readers: make([]*EventReaderCfg, len(erS.Readers)), - PartialCacheTTL: erS.PartialCacheTTL, + Enabled: erS.Enabled, + SessionSConns: slices.Clone(erS.SessionSConns), + EEsConns: slices.Clone(erS.EEsConns), + Readers: make([]*EventReaderCfg, len(erS.Readers)), + Concurrent_Events: erS.Concurrent_Events, + PartialCacheTTL: erS.PartialCacheTTL, } for idx, rdr := range erS.Readers { cln.Readers[idx] = rdr.Clone() @@ -124,6 +134,7 @@ func (erS *ERsCfg) Clone() (cln *ERsCfg) { func (erS *ERsCfg) AsMapInterface(separator string) (initialMP map[string]any) { initialMP = map[string]any{ utils.EnabledCfg: erS.Enabled, + utils.ConcurrentEvents: erS.Concurrent_Events, utils.PartialCacheTTLCfg: "0", } if erS.PartialCacheTTL != 0 { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 732636bb8..5ccbf0adb 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -206,6 +206,7 @@ type ERsJsonCfg struct { Enabled *bool Sessions_conns *[]string Ees_conns *[]string + Concurrent_events *int Readers *[]*EventReaderJsonCfg Partial_cache_ttl *string } diff --git a/ers/ers.go b/ers/ers.go index e8b0a0411..4c4727e24 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -47,15 +47,16 @@ type erEvent struct { // NewERService instantiates the ERService func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (ers *ERService) { ers = &ERService{ - cfg: cfg, - rdrs: make(map[string]EventReader), - rdrPaths: make(map[string]string), - stopLsn: make(map[string]chan struct{}), - rdrEvents: make(chan *erEvent), - partialEvents: make(chan *erEvent), - rdrErr: make(chan error), - filterS: filterS, - connMgr: connMgr, + cfg: cfg, + rdrs: make(map[string]EventReader), + rdrPaths: make(map[string]string), + stopLsn: make(map[string]chan struct{}), + rdrEvents: make(chan *erEvent), + partialEvents: make(chan *erEvent), + rdrErr: make(chan error), + concurrent_events: make(chan struct{}, cfg.ERsCfg().Concurrent_Events), + filterS: filterS, + connMgr: connMgr, } ers.partialCache = ltcache.NewCache(ltcache.UnlimitedCaching, cfg.ERsCfg().PartialCacheTTL, false, ers.onEvicted) return @@ -64,13 +65,14 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engin // ERService is managing the EventReaders type ERService struct { sync.RWMutex - cfg *config.CGRConfig - rdrs map[string]EventReader // map[rdrID]EventReader - rdrPaths map[string]string // used for reloads in case of path changes - stopLsn map[string]chan struct{} // map[rdrID] chan struct{} - rdrEvents chan *erEvent // receive here the events from readers - partialEvents chan *erEvent // receive here the partial events from readers - rdrErr chan error // receive here errors which should stop the app + cfg *config.CGRConfig + rdrs map[string]EventReader // map[rdrID]EventReader + rdrPaths map[string]string // used for reloads in case of path changes + stopLsn map[string]chan struct{} // map[rdrID] chan struct{} + rdrEvents chan *erEvent // receive here the events from readers + partialEvents chan *erEvent // receive here the partial events from readers + rdrErr chan error // receive here errors which should stop the app + concurrent_events chan struct{} filterS *engine.FilterS connMgr *engine.ConnManager @@ -100,18 +102,21 @@ func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) error { erS.closeAllRdrs() return nil case erEv := <-erS.rdrEvents: - err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> reading event: <%s> from reader: <%s> got error: <%v>", - utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err)) - } - if err = erS.exportEvent(erEv, err != nil); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> exporting event: <%s> from reader: <%s> got error: <%v>", - utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err)) - } - + erS.concurrent_events <- struct{}{} + go func() { + err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading event: <%s> from reader: <%s> got error: <%v>", + utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err)) + } + if err = erS.exportEvent(erEv, err != nil); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> exporting event: <%s> from reader: <%s> got error: <%v>", + utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err)) + } + <-erS.concurrent_events + }() case pEv := <-erS.partialEvents: err := erS.processPartialEvent(pEv.cgrEvent, pEv.rdrCfg) if err != nil { diff --git a/utils/consts.go b/utils/consts.go index 28ee4c9d4..5432f3f98 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2449,6 +2449,7 @@ const ( const ( IDCfg = "id" CacheCfg = "cache" + ConcurrentEvents = "concurrent_events" FieldSepCfg = "field_separator" RunDelayCfg = "run_delay" SourcePathCfg = "source_path"