mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
changed concurrentEvents naming in ers
This commit is contained in:
committed by
Dan Christian Bogos
parent
21d65b3810
commit
66940db0b0
40
ers/ers.go
40
ers/ers.go
@@ -47,16 +47,16 @@ type erEvent struct {
|
||||
// NewERService instantiates the ERService
|
||||
func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (ers *ERService) {
|
||||
ers = &ERService{
|
||||
cfg: cfg,
|
||||
rdrs: make(map[string]EventReader),
|
||||
rdrPaths: make(map[string]string),
|
||||
stopLsn: make(map[string]chan struct{}),
|
||||
rdrEvents: make(chan *erEvent),
|
||||
partialEvents: make(chan *erEvent),
|
||||
rdrErr: make(chan error),
|
||||
concurrent_events: make(chan struct{}, cfg.ERsCfg().Concurrent_Events),
|
||||
filterS: filterS,
|
||||
connMgr: connMgr,
|
||||
cfg: cfg,
|
||||
rdrs: make(map[string]EventReader),
|
||||
rdrPaths: make(map[string]string),
|
||||
stopLsn: make(map[string]chan struct{}),
|
||||
rdrEvents: make(chan *erEvent),
|
||||
partialEvents: make(chan *erEvent),
|
||||
rdrErr: make(chan error),
|
||||
concurrentEvents: make(chan struct{}, cfg.ERsCfg().ConcurrentEvents),
|
||||
filterS: filterS,
|
||||
connMgr: connMgr,
|
||||
}
|
||||
ers.partialCache = ltcache.NewCache(ltcache.UnlimitedCaching, cfg.ERsCfg().PartialCacheTTL, false, ers.onEvicted)
|
||||
return
|
||||
@@ -65,14 +65,14 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engin
|
||||
// ERService is managing the EventReaders
|
||||
type ERService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
rdrs map[string]EventReader // map[rdrID]EventReader
|
||||
rdrPaths map[string]string // used for reloads in case of path changes
|
||||
stopLsn map[string]chan struct{} // map[rdrID] chan struct{}
|
||||
rdrEvents chan *erEvent // receive here the events from readers
|
||||
partialEvents chan *erEvent // receive here the partial events from readers
|
||||
rdrErr chan error // receive here errors which should stop the app
|
||||
concurrent_events chan struct{}
|
||||
cfg *config.CGRConfig
|
||||
rdrs map[string]EventReader // map[rdrID]EventReader
|
||||
rdrPaths map[string]string // used for reloads in case of path changes
|
||||
stopLsn map[string]chan struct{} // map[rdrID] chan struct{}
|
||||
rdrEvents chan *erEvent // receive here the events from readers
|
||||
partialEvents chan *erEvent // receive here the partial events from readers
|
||||
rdrErr chan error // receive here errors which should stop the app
|
||||
concurrentEvents chan struct{}
|
||||
|
||||
filterS *engine.FilterS
|
||||
connMgr *engine.ConnManager
|
||||
@@ -102,7 +102,7 @@ func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) error {
|
||||
erS.closeAllRdrs()
|
||||
return nil
|
||||
case erEv := <-erS.rdrEvents:
|
||||
erS.concurrent_events <- struct{}{}
|
||||
erS.concurrentEvents <- struct{}{}
|
||||
go func() {
|
||||
err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg)
|
||||
if err != nil {
|
||||
@@ -115,7 +115,7 @@ func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) error {
|
||||
fmt.Sprintf("<%s> exporting event: <%s> from reader: <%s> got error: <%v>",
|
||||
utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err))
|
||||
}
|
||||
<-erS.concurrent_events
|
||||
<-erS.concurrentEvents
|
||||
}()
|
||||
case pEv := <-erS.partialEvents:
|
||||
err := erS.processPartialEvent(pEv.cgrEvent, pEv.rdrCfg)
|
||||
|
||||
Reference in New Issue
Block a user