From 66940db0b0d7a18d766d1f688313c20a0027a59a Mon Sep 17 00:00:00 2001 From: gezimbll Date: Wed, 13 Mar 2024 07:23:40 -0400 Subject: [PATCH] changed concurrentEvents naming in ers --- config/config_defaults.go | 2 +- config/erscfg.go | 32 +++++++++++++++---------------- ers/ers.go | 40 +++++++++++++++++++-------------------- 3 files changed, 37 insertions(+), 37 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 6f142918d..236c8a47c 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -368,7 +368,7 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts the EventReader service: "sessions_conns": ["*internal"], // RPC Connections IDs "ees_conns": [], // connection for routing processed and invalid messages through EEs - "concurrent_events":1, // number of events to processed concurrently + "concurrent_events":1, // number of events to generate concurrently on CGRateS side "partial_cache_ttl": "1s", // the duration to cache partial records when not pairing "readers": [ { diff --git a/config/erscfg.go b/config/erscfg.go index 5b1a744dd..a958a7060 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -27,12 +27,12 @@ import ( // ERsCfg the config for ERs type ERsCfg struct { - Enabled bool - SessionSConns []string - EEsConns []string - Concurrent_Events int - Readers []*EventReaderCfg - PartialCacheTTL time.Duration + Enabled bool + SessionSConns []string + EEsConns []string + ConcurrentEvents int + Readers []*EventReaderCfg + PartialCacheTTL time.Duration } func (erS *ERsCfg) loadFromJSONCfg(jsnCfg *ERsJsonCfg, msgTemplates map[string][]*FCTemplate, sep string, dfltRdrCfg *EventReaderCfg, separator string) (err error) { @@ -44,9 +44,9 @@ func (erS *ERsCfg) loadFromJSONCfg(jsnCfg *ERsJsonCfg, msgTemplates map[string][ } if jsnCfg.Concurrent_events != nil { - erS.Concurrent_Events = *jsnCfg.Concurrent_events - if erS.Concurrent_Events < 1 { - erS.Concurrent_Events = 1 + erS.ConcurrentEvents = *jsnCfg.Concurrent_events + if erS.ConcurrentEvents < 1 { + erS.ConcurrentEvents = 1 } } @@ -117,12 +117,12 @@ func (erS *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, msgTempla // Clone returns a deep copy of ERsCfg func (erS *ERsCfg) Clone() (cln *ERsCfg) { cln = &ERsCfg{ - Enabled: erS.Enabled, - SessionSConns: slices.Clone(erS.SessionSConns), - EEsConns: slices.Clone(erS.EEsConns), - Readers: make([]*EventReaderCfg, len(erS.Readers)), - Concurrent_Events: erS.Concurrent_Events, - PartialCacheTTL: erS.PartialCacheTTL, + Enabled: erS.Enabled, + SessionSConns: slices.Clone(erS.SessionSConns), + EEsConns: slices.Clone(erS.EEsConns), + Readers: make([]*EventReaderCfg, len(erS.Readers)), + ConcurrentEvents: erS.ConcurrentEvents, + PartialCacheTTL: erS.PartialCacheTTL, } for idx, rdr := range erS.Readers { cln.Readers[idx] = rdr.Clone() @@ -134,7 +134,7 @@ func (erS *ERsCfg) Clone() (cln *ERsCfg) { func (erS *ERsCfg) AsMapInterface(separator string) (initialMP map[string]any) { initialMP = map[string]any{ utils.EnabledCfg: erS.Enabled, - utils.ConcurrentEvents: erS.Concurrent_Events, + utils.ConcurrentEvents: erS.ConcurrentEvents, utils.PartialCacheTTLCfg: "0", } if erS.PartialCacheTTL != 0 { diff --git a/ers/ers.go b/ers/ers.go index 4c4727e24..d24e4bf60 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -47,16 +47,16 @@ type erEvent struct { // NewERService instantiates the ERService func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (ers *ERService) { ers = &ERService{ - cfg: cfg, - rdrs: make(map[string]EventReader), - rdrPaths: make(map[string]string), - stopLsn: make(map[string]chan struct{}), - rdrEvents: make(chan *erEvent), - partialEvents: make(chan *erEvent), - rdrErr: make(chan error), - concurrent_events: make(chan struct{}, cfg.ERsCfg().Concurrent_Events), - filterS: filterS, - connMgr: connMgr, + cfg: cfg, + rdrs: make(map[string]EventReader), + rdrPaths: make(map[string]string), + stopLsn: make(map[string]chan struct{}), + rdrEvents: make(chan *erEvent), + partialEvents: make(chan *erEvent), + rdrErr: make(chan error), + concurrentEvents: make(chan struct{}, cfg.ERsCfg().ConcurrentEvents), + filterS: filterS, + connMgr: connMgr, } ers.partialCache = ltcache.NewCache(ltcache.UnlimitedCaching, cfg.ERsCfg().PartialCacheTTL, false, ers.onEvicted) return @@ -65,14 +65,14 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engin // ERService is managing the EventReaders type ERService struct { sync.RWMutex - cfg *config.CGRConfig - rdrs map[string]EventReader // map[rdrID]EventReader - rdrPaths map[string]string // used for reloads in case of path changes - stopLsn map[string]chan struct{} // map[rdrID] chan struct{} - rdrEvents chan *erEvent // receive here the events from readers - partialEvents chan *erEvent // receive here the partial events from readers - rdrErr chan error // receive here errors which should stop the app - concurrent_events chan struct{} + cfg *config.CGRConfig + rdrs map[string]EventReader // map[rdrID]EventReader + rdrPaths map[string]string // used for reloads in case of path changes + stopLsn map[string]chan struct{} // map[rdrID] chan struct{} + rdrEvents chan *erEvent // receive here the events from readers + partialEvents chan *erEvent // receive here the partial events from readers + rdrErr chan error // receive here errors which should stop the app + concurrentEvents chan struct{} filterS *engine.FilterS connMgr *engine.ConnManager @@ -102,7 +102,7 @@ func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) error { erS.closeAllRdrs() return nil case erEv := <-erS.rdrEvents: - erS.concurrent_events <- struct{}{} + erS.concurrentEvents <- struct{}{} go func() { err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg) if err != nil { @@ -115,7 +115,7 @@ func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) error { fmt.Sprintf("<%s> exporting event: <%s> from reader: <%s> got error: <%v>", utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err)) } - <-erS.concurrent_events + <-erS.concurrentEvents }() case pEv := <-erS.partialEvents: err := erS.processPartialEvent(pEv.cgrEvent, pEv.rdrCfg)