diff --git a/ers/filecsv.go b/ers/filecsv.go index fc836262a..dd4b1a3c1 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -43,14 +43,20 @@ func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int, if strings.HasSuffix(srcPath, utils.Slash) { srcPath = srcPath[:len(srcPath)-1] } - return &CSVFileER{ + csvEr := &CSVFileER{ cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, rdrDir: srcPath, rdrEvents: rdrEvents, rdrError: rdrErr, - rdrExit: rdrExit}, nil + rdrExit: rdrExit, + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} + var processFile struct{} + for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { + csvEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop + } + return csvEr, nil } // CSVFileER implements EventReader interface for .csv files diff --git a/ers/filefwv.go b/ers/filefwv.go index 92cd38030..2aede3bed 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -42,14 +42,20 @@ func NewFWVFileERER(cfg *config.CGRConfig, cfgIdx int, if strings.HasSuffix(srcPath, utils.Slash) { srcPath = srcPath[:len(srcPath)-1] } - return &FWVFileER{ + fwvER := &FWVFileER{ cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, rdrDir: srcPath, rdrEvents: rdrEvents, rdrError: rdrErr, - rdrExit: rdrExit}, nil + rdrExit: rdrExit, + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} + var processFile struct{} + for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { + fwvER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop + } + return fwvER, nil } // XMLFileER implements EventReader interface for .xml files diff --git a/ers/filexml.go b/ers/filexml.go index fbde7a2c9..b926948bf 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -43,14 +43,20 @@ func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int, if strings.HasSuffix(srcPath, utils.Slash) { srcPath = srcPath[:len(srcPath)-1] } - return &XMLFileER{ + xmlER := &XMLFileER{ cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, rdrDir: srcPath, rdrEvents: rdrEvents, rdrError: rdrErr, - rdrExit: rdrExit}, nil + rdrExit: rdrExit, + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} + var processFile struct{} + for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { + xmlER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop + } + return xmlER, nil } // XMLFileER implements EventReader interface for .xml files diff --git a/ers/flatstore.go b/ers/flatstore.go index 95877542b..e42f8fabc 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -55,10 +55,14 @@ func NewFlatstoreER(cfg *config.CGRConfig, cfgIdx int, rdrEvents: rdrEvents, rdrError: rdrErr, rdrExit: rdrExit, + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs), + } + var processFile struct{} + for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { + flatER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop } flatER.cache = ltcache.NewCache(ltcache.UnlimitedCaching, cfg.ERsCfg().Readers[cfgIdx].PartialRecordCache, false, flatER.dumpToFile) return flatER, err - } // FlatstoreER implements EventReader interface for Flatstore CDR diff --git a/ers/partial_csv.go b/ers/partial_csv.go index 211ab0b0a..3622aa0e3 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -55,7 +55,8 @@ func NewPartialCSVFileER(cfg *config.CGRConfig, cfgIdx int, rdrDir: srcPath, rdrEvents: rdrEvents, rdrError: rdrErr, - rdrExit: rdrExit} + rdrExit: rdrExit, + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} var function func(itmID string, value interface{}) if cfg.ERsCfg().Readers[cfgIdx].PartialCacheExpiryAction == utils.MetaDumpToFile { @@ -63,6 +64,10 @@ func NewPartialCSVFileER(cfg *config.CGRConfig, cfgIdx int, } else { function = pCSVFileER.postCDR } + var processFile struct{} + for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { + pCSVFileER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop + } pCSVFileER.cache = ltcache.NewCache(ltcache.UnlimitedCaching, cfg.ERsCfg().Readers[cfgIdx].PartialRecordCache, false, function) return pCSVFileER, nil } diff --git a/ers/readers_test.go b/ers/readers_test.go index f430226a0..5e7416d6a 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -51,14 +51,25 @@ func TestNewCsvReader(t *testing.T) { if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers)) } - expected, err := NewCSVFileER(cfg, 1, nil, nil, fltr, nil) - if err != nil { - t.Errorf("Expecting: , received: <%+v>", err) - } + exp := &CSVFileER{ + cgrCfg: cfg, + cfgIdx: 1, + fltrS: fltr, + rdrDir: cfg.ERsCfg().Readers[1].SourcePath, + rdrEvents: nil, + rdrError: nil, + rdrExit: nil, + conReqs: nil} + var expected EventReader = exp if rcv, err := NewEventReader(cfg, 1, nil, nil, fltr, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) - } else if !reflect.DeepEqual(expected, rcv) { - t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv) + } else { + // because we use function make to init the channel when we create the EventReader reflect.DeepEqual + // says it doesn't match + rcv.(*CSVFileER).conReqs = nil + if !reflect.DeepEqual(expected, rcv) { + t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv) + } } }