From 85864ccaa33afa7fbd1ab42c14c62322a5dea57c Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 2 Apr 2024 10:04:07 +0300 Subject: [PATCH] Extract common dir processing logic to a func It will be reused for all file readers. Rename rdrDir field to dir (redundant prefix). --- ers/ers_it_test.go | 2 +- ers/filecsv.go | 22 +++++----------------- ers/filecsv_it_test.go | 16 ++++++++-------- ers/filefwv.go | 22 +++++----------------- ers/filefwv_it_test.go | 24 ++++++++++++------------ ers/filejson.go | 22 +++++----------------- ers/filejson_it_test.go | 14 +++++++------- ers/filexml.go | 22 +++++----------------- ers/filexml_it_test.go | 28 ++++++++++++++-------------- ers/filexml_test.go | 2 +- ers/libers.go | 27 +++++++++++++++++++++++++++ ers/readers_test.go | 2 +- 12 files changed, 91 insertions(+), 112 deletions(-) diff --git a/ers/ers_it_test.go b/ers/ers_it_test.go index 1c50300ce..36a9c0658 100644 --- a/ers/ers_it_test.go +++ b/ers/ers_it_test.go @@ -354,7 +354,7 @@ func TestERsListenAndServeCfgRldChan5(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: nil, - rdrDir: "", + dir: "", rdrEvents: nil, rdrError: nil, rdrExit: nil, diff --git a/ers/filecsv.go b/ers/filecsv.go index 0987f264b..33feabc42 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -44,7 +44,7 @@ func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, - rdrDir: srcPath, + dir: srcPath, rdrEvents: rdrEvents, partialEvents: partialEvents, rdrError: rdrErr, @@ -62,7 +62,7 @@ type CSVFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - rdrDir string + dir string rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error @@ -84,23 +84,11 @@ func (rdr *CSVFileER) serveDefault() { tm.Stop() utils.Logger.Info( fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) + utils.ERs, rdr.dir)) return case <-tm.C: } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } + processReaderDir(rdr.dir, utils.CSVSuffix, rdr.processFile) tm.Reset(rdr.Config().RunDelay) } } @@ -110,7 +98,7 @@ func (rdr *CSVFileER) Serve() (err error) { case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): - return utils.WatchDir(rdr.rdrDir, rdr.processFile, + return utils.WatchDir(rdr.dir, rdr.processFile, utils.ERs, rdr.rdrExit) default: go rdr.serveDefault() diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index ca5f75515..c73d847bd 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -395,7 +395,7 @@ func TestFileCSVProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ers/out/", + dir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -532,7 +532,7 @@ func TestFileCSVProcessEventError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ers/out/", + dir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -565,7 +565,7 @@ func TestFileCSVProcessEventError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ers/out/", + dir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -609,7 +609,7 @@ func TestFileCSVProcessEventError3(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ers/out/", + dir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -642,7 +642,7 @@ func TestFileCSVDirErr(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ers/out/", + dir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -662,7 +662,7 @@ func TestFileCSV(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ers/out/", + dir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -682,7 +682,7 @@ func TestFileCSVServeDefault(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ers/out/", + dir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -719,7 +719,7 @@ func TestFileCSVExit(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ers/out/", + dir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), diff --git a/ers/filefwv.go b/ers/filefwv.go index 95377e8aa..2bb598797 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -45,7 +45,7 @@ func NewFWVFileER(cfg *config.CGRConfig, cfgIdx int, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, - rdrDir: srcPath, + dir: srcPath, rdrEvents: rdrEvents, partialEvents: partialEvents, rdrError: rdrErr, @@ -64,7 +64,7 @@ type FWVFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - rdrDir string + dir string rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error @@ -92,23 +92,11 @@ func (rdr *FWVFileER) serveDefault() { tm.Stop() utils.Logger.Info( fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) + utils.ERs, rdr.dir)) return case <-tm.C: } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.FWVSuffix) { // hardcoded file extension for xml event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } + processReaderDir(rdr.dir, utils.FWVSuffix, rdr.processFile) tm.Reset(rdr.Config().RunDelay) } } @@ -118,7 +106,7 @@ func (rdr *FWVFileER) Serve() (err error) { case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): - return utils.WatchDir(rdr.rdrDir, rdr.processFile, + return utils.WatchDir(rdr.dir, rdr.processFile, utils.ERs, rdr.rdrExit) default: go rdr.serveDefault() diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index df431a04b..a5774a158 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -211,7 +211,7 @@ func TestNewFWVFileER(t *testing.T) { expected := &FWVFileER{ cgrCfg: cfg, cfgIdx: cfgIdx, - rdrDir: "", + dir: "", } cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs = 1 result, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) @@ -283,7 +283,7 @@ func TestFileFWVProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -348,7 +348,7 @@ func TestFileFWV(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -383,7 +383,7 @@ func TestFileFWVServeDefault(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -419,7 +419,7 @@ func TestFileFWVExit(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -443,7 +443,7 @@ func TestFileFWVProcessTrailer(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -503,7 +503,7 @@ func TestFileFWVProcessTrailerError1(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -539,7 +539,7 @@ func TestFileFWVProcessTrailerError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -586,7 +586,7 @@ func TestFileFWVProcessTrailerError3(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -617,7 +617,7 @@ func TestFileFWVCreateHeaderMap(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -666,7 +666,7 @@ func TestFileFWVCreateHeaderMapError1(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -692,7 +692,7 @@ func TestFileFWVCreateHeaderMapError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/fwvErs/out", + dir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), diff --git a/ers/filejson.go b/ers/filejson.go index bd1c34e58..ff174122e 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -46,7 +46,7 @@ func NewJSONFileER(cfg *config.CGRConfig, cfgIdx int, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, - rdrDir: srcPath, + dir: srcPath, rdrEvents: rdrEvents, partialEvents: partialEvents, rdrError: rdrErr, @@ -65,7 +65,7 @@ type JSONFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - rdrDir string + dir string rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error @@ -86,23 +86,11 @@ func (rdr *JSONFileER) serveDefault() { tm.Stop() utils.Logger.Info( fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) + utils.ERs, rdr.dir)) return case <-tm.C: } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.JSNSuffix) { // hardcoded file extension for json event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } + processReaderDir(rdr.dir, utils.JSNSuffix, rdr.processFile) tm.Reset(rdr.Config().RunDelay) } } @@ -112,7 +100,7 @@ func (rdr *JSONFileER) Serve() (err error) { case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): - return utils.WatchDir(rdr.rdrDir, rdr.processFile, + return utils.WatchDir(rdr.dir, rdr.processFile, utils.ERs, rdr.rdrExit) default: go rdr.serveDefault() diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index 53987b798..244ffafb2 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -340,7 +340,7 @@ func TestFileJSONProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ErsJSON/out/", + dir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -393,7 +393,7 @@ func TestFileJSONProcessEventReadError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ErsJSON/out/", + dir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -441,7 +441,7 @@ func TestFileJSONProcessEventError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ErsJSON/out/", + dir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -500,7 +500,7 @@ func TestFileJSONProcessEventError3(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ErsJSON/out/", + dir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -533,7 +533,7 @@ func TestFileJSON(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ErsJSON/out/", + dir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -552,7 +552,7 @@ func TestFileJSONServeDefault(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ErsJSON/out/", + dir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -588,7 +588,7 @@ func TestFileJSONExit(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/ErsJSON/out/", + dir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), diff --git a/ers/filexml.go b/ers/filexml.go index d12d5ed6c..4fa9900e6 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -46,7 +46,7 @@ func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, - rdrDir: srcPath, + dir: srcPath, rdrEvents: rdrEvents, partialEvents: partialEvents, rdrError: rdrErr, @@ -65,7 +65,7 @@ type XMLFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - rdrDir string + dir string rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error @@ -82,7 +82,7 @@ func (rdr *XMLFileER) Serve() (err error) { case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): - return utils.WatchDir(rdr.rdrDir, rdr.processFile, + return utils.WatchDir(rdr.dir, rdr.processFile, utils.ERs, rdr.rdrExit) default: go func() { @@ -94,23 +94,11 @@ func (rdr *XMLFileER) Serve() (err error) { tm.Stop() utils.Logger.Info( fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) + utils.ERs, rdr.dir)) return case <-tm.C: } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.XMLSuffix) { // hardcoded file extension for xml event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } + processReaderDir(rdr.dir, utils.XMLSuffix, rdr.processFile) tm.Reset(rdr.Config().RunDelay) } }() diff --git a/ers/filexml_it_test.go b/ers/filexml_it_test.go index b9eb37719..71ad0ed0b 100644 --- a/ers/filexml_it_test.go +++ b/ers/filexml_it_test.go @@ -349,7 +349,7 @@ func TestNewXMLFileER(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/xmlErs/out", + dir: "/tmp/xmlErs/out", rdrEvents: nil, rdrError: nil, rdrExit: nil, @@ -402,7 +402,7 @@ func TestFileXMLProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/xmlErs/out", + dir: "/tmp/xmlErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -458,7 +458,7 @@ func TestFileXMLProcessEventError1(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/xmlErs/out/", + dir: "/tmp/xmlErs/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -504,7 +504,7 @@ func TestFileXMLProcessEVentError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/xmlErs/out/", + dir: "/tmp/xmlErs/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -554,7 +554,7 @@ func TestFileXMLProcessEventParseError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/xmlErs/out", + dir: "/tmp/xmlErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -579,14 +579,14 @@ func TestFileXML(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/xmlErs/out", + dir: "/tmp/xmlErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } eR.conReqs <- struct{}{} - err := os.MkdirAll(eR.rdrDir, 0777) + err := os.MkdirAll(eR.dir, 0777) if err != nil { t.Error(err) } @@ -604,7 +604,7 @@ func TestFileXML(t *testing.T) { eR.Config().Fields[0].ComputePath() for i := 1; i < 4; i++ { - if _, err := os.Create(path.Join(eR.rdrDir, fmt.Sprintf("file%d.xml", i))); err != nil { + if _, err := os.Create(path.Join(eR.dir, fmt.Sprintf("file%d.xml", i))); err != nil { t.Error(err) } } @@ -618,7 +618,7 @@ func TestFileXML(t *testing.T) { if err := eR.Serve(); err != nil { t.Error(err) } - os.Create(path.Join(eR.rdrDir, "file1.txt")) + os.Create(path.Join(eR.dir, "file1.txt")) eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) @@ -632,14 +632,14 @@ func TestFileXMLError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/xmlErsError/out", + dir: "/tmp/xmlErsError/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } eR.conReqs <- struct{}{} - err := os.MkdirAll(eR.rdrDir, 0777) + err := os.MkdirAll(eR.dir, 0777) if err != nil { t.Error(err) } @@ -657,11 +657,11 @@ func TestFileXMLError(t *testing.T) { eR.Config().Fields[0].ComputePath() for i := 1; i < 4; i++ { - if _, err := os.Create(path.Join(eR.rdrDir, fmt.Sprintf("file%d.xml", i))); err != nil { + if _, err := os.Create(path.Join(eR.dir, fmt.Sprintf("file%d.xml", i))); err != nil { t.Error(err) } } - os.Create(path.Join(eR.rdrDir, "file1.txt")) + os.Create(path.Join(eR.dir, "file1.txt")) eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) @@ -675,7 +675,7 @@ func TestFileXMLExit(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - rdrDir: "/tmp/xmlErs/out", + dir: "/tmp/xmlErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), diff --git a/ers/filexml_test.go b/ers/filexml_test.go index 3368604e0..42a883b37 100644 --- a/ers/filexml_test.go +++ b/ers/filexml_test.go @@ -32,7 +32,7 @@ func TestERSNewXMLFileER(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: nil, - rdrDir: "/var/spool/cgrates/ers/in", + dir: "/var/spool/cgrates/ers/in", rdrEvents: nil, rdrError: nil, rdrExit: nil, diff --git a/ers/libers.go b/ers/libers.go index 7b32b0517..ff9c7d79e 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -20,7 +20,9 @@ package ers import ( "fmt" + "os" "sort" + "strings" "time" "github.com/cgrates/cgrates/agents" @@ -103,3 +105,28 @@ func mergePartialEvents(cgrEvs []*utils.CGREvent, cfg *config.EventReaderCfg, fl } return } + +// processReaderDir finds all entries within dirPath, filters only the ones whose name +// ends with the specified suffix and executes function f on them. +func processReaderDir(dirPath, suffix string, f func(dir, fn string) error) { + filesInDir, err := os.ReadDir(dirPath) + if err != nil { + utils.Logger.Notice(fmt.Sprintf( + "<%s> encountered error while reading entries from directory %s: %v", + utils.ERs, dirPath, err)) + // There is no need to return, as os.ReadDir can still return entries + // even if an error was encountered. Logging it should suffice + } + for _, file := range filesInDir { + if !strings.HasSuffix(file.Name(), suffix) { // hardcoded file extension for csv event reader + continue // used in order to filter the files from directory + } + go func(fileName string) { + if err := f(dirPath, fileName); err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> processing file %s, error: %v", + utils.ERs, fileName, err)) + } + }(file.Name()) + } +} diff --git a/ers/readers_test.go b/ers/readers_test.go index 1182ac540..0d68a2aaa 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -55,7 +55,7 @@ func TestNewCsvReader(t *testing.T) { cgrCfg: cfg, cfgIdx: 1, fltrS: fltr, - rdrDir: cfg.ERsCfg().Readers[1].SourcePath, + dir: cfg.ERsCfg().Readers[1].SourcePath, rdrEvents: nil, rdrError: nil, rdrExit: nil,