Extract common dir processing logic to a func

It will be reused for all file readers.
Rename rdrDir field to dir (redundant prefix).
This commit is contained in:
ionutboangiu
2024-04-02 10:04:07 +03:00
committed by Dan Christian Bogos
parent 9c004b069d
commit 85864ccaa3
12 changed files with 91 additions and 112 deletions

View File

@@ -354,7 +354,7 @@ func TestERsListenAndServeCfgRldChan5(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: nil,
rdrDir: "",
dir: "",
rdrEvents: nil,
rdrError: nil,
rdrExit: nil,

View File

@@ -44,7 +44,7 @@ func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrDir: srcPath,
dir: srcPath,
rdrEvents: rdrEvents,
partialEvents: partialEvents,
rdrError: rdrErr,
@@ -62,7 +62,7 @@ type CSVFileER struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
rdrDir string
dir string
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
rdrError chan error
@@ -84,23 +84,11 @@ func (rdr *CSVFileER) serveDefault() {
tm.Stop()
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring path <%s>",
utils.ERs, rdr.rdrDir))
utils.ERs, rdr.dir))
return
case <-tm.C:
}
filesInDir, _ := os.ReadDir(rdr.rdrDir)
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing file %s, error: %s",
utils.ERs, fileName, err.Error()))
}
}(file.Name())
}
processReaderDir(rdr.dir, utils.CSVSuffix, rdr.processFile)
tm.Reset(rdr.Config().RunDelay)
}
}
@@ -110,7 +98,7 @@ func (rdr *CSVFileER) Serve() (err error) {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
return
case time.Duration(-1):
return utils.WatchDir(rdr.rdrDir, rdr.processFile,
return utils.WatchDir(rdr.dir, rdr.processFile,
utils.ERs, rdr.rdrExit)
default:
go rdr.serveDefault()

View File

@@ -395,7 +395,7 @@ func TestFileCSVProcessEvent(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
dir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -532,7 +532,7 @@ func TestFileCSVProcessEventError(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
dir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -565,7 +565,7 @@ func TestFileCSVProcessEventError2(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
dir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -609,7 +609,7 @@ func TestFileCSVProcessEventError3(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
dir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -642,7 +642,7 @@ func TestFileCSVDirErr(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
dir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -662,7 +662,7 @@ func TestFileCSV(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
dir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -682,7 +682,7 @@ func TestFileCSVServeDefault(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
dir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -719,7 +719,7 @@ func TestFileCSVExit(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ers/out/",
dir: "/tmp/ers/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),

View File

@@ -45,7 +45,7 @@ func NewFWVFileER(cfg *config.CGRConfig, cfgIdx int,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrDir: srcPath,
dir: srcPath,
rdrEvents: rdrEvents,
partialEvents: partialEvents,
rdrError: rdrErr,
@@ -64,7 +64,7 @@ type FWVFileER struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
rdrDir string
dir string
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
rdrError chan error
@@ -92,23 +92,11 @@ func (rdr *FWVFileER) serveDefault() {
tm.Stop()
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring path <%s>",
utils.ERs, rdr.rdrDir))
utils.ERs, rdr.dir))
return
case <-tm.C:
}
filesInDir, _ := os.ReadDir(rdr.rdrDir)
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), utils.FWVSuffix) { // hardcoded file extension for xml event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing file %s, error: %s",
utils.ERs, fileName, err.Error()))
}
}(file.Name())
}
processReaderDir(rdr.dir, utils.FWVSuffix, rdr.processFile)
tm.Reset(rdr.Config().RunDelay)
}
}
@@ -118,7 +106,7 @@ func (rdr *FWVFileER) Serve() (err error) {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
return
case time.Duration(-1):
return utils.WatchDir(rdr.rdrDir, rdr.processFile,
return utils.WatchDir(rdr.dir, rdr.processFile,
utils.ERs, rdr.rdrExit)
default:
go rdr.serveDefault()

View File

@@ -211,7 +211,7 @@ func TestNewFWVFileER(t *testing.T) {
expected := &FWVFileER{
cgrCfg: cfg,
cfgIdx: cfgIdx,
rdrDir: "",
dir: "",
}
cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs = 1
result, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil, nil)
@@ -283,7 +283,7 @@ func TestFileFWVProcessEvent(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -348,7 +348,7 @@ func TestFileFWV(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -383,7 +383,7 @@ func TestFileFWVServeDefault(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -419,7 +419,7 @@ func TestFileFWVExit(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -443,7 +443,7 @@ func TestFileFWVProcessTrailer(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -503,7 +503,7 @@ func TestFileFWVProcessTrailerError1(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -539,7 +539,7 @@ func TestFileFWVProcessTrailerError2(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -586,7 +586,7 @@ func TestFileFWVProcessTrailerError3(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -617,7 +617,7 @@ func TestFileFWVCreateHeaderMap(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -666,7 +666,7 @@ func TestFileFWVCreateHeaderMapError1(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -692,7 +692,7 @@ func TestFileFWVCreateHeaderMapError2(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/fwvErs/out",
dir: "/tmp/fwvErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),

View File

@@ -46,7 +46,7 @@ func NewJSONFileER(cfg *config.CGRConfig, cfgIdx int,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrDir: srcPath,
dir: srcPath,
rdrEvents: rdrEvents,
partialEvents: partialEvents,
rdrError: rdrErr,
@@ -65,7 +65,7 @@ type JSONFileER struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
rdrDir string
dir string
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
rdrError chan error
@@ -86,23 +86,11 @@ func (rdr *JSONFileER) serveDefault() {
tm.Stop()
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring path <%s>",
utils.ERs, rdr.rdrDir))
utils.ERs, rdr.dir))
return
case <-tm.C:
}
filesInDir, _ := os.ReadDir(rdr.rdrDir)
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), utils.JSNSuffix) { // hardcoded file extension for json event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing file %s, error: %s",
utils.ERs, fileName, err.Error()))
}
}(file.Name())
}
processReaderDir(rdr.dir, utils.JSNSuffix, rdr.processFile)
tm.Reset(rdr.Config().RunDelay)
}
}
@@ -112,7 +100,7 @@ func (rdr *JSONFileER) Serve() (err error) {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
return
case time.Duration(-1):
return utils.WatchDir(rdr.rdrDir, rdr.processFile,
return utils.WatchDir(rdr.dir, rdr.processFile,
utils.ERs, rdr.rdrExit)
default:
go rdr.serveDefault()

View File

@@ -340,7 +340,7 @@ func TestFileJSONProcessEvent(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
dir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -393,7 +393,7 @@ func TestFileJSONProcessEventReadError(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
dir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -441,7 +441,7 @@ func TestFileJSONProcessEventError2(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
dir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -500,7 +500,7 @@ func TestFileJSONProcessEventError3(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
dir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -533,7 +533,7 @@ func TestFileJSON(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
dir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -552,7 +552,7 @@ func TestFileJSONServeDefault(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
dir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -588,7 +588,7 @@ func TestFileJSONExit(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/ErsJSON/out/",
dir: "/tmp/ErsJSON/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),

View File

@@ -46,7 +46,7 @@ func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int,
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrDir: srcPath,
dir: srcPath,
rdrEvents: rdrEvents,
partialEvents: partialEvents,
rdrError: rdrErr,
@@ -65,7 +65,7 @@ type XMLFileER struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
rdrDir string
dir string
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
rdrError chan error
@@ -82,7 +82,7 @@ func (rdr *XMLFileER) Serve() (err error) {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
return
case time.Duration(-1):
return utils.WatchDir(rdr.rdrDir, rdr.processFile,
return utils.WatchDir(rdr.dir, rdr.processFile,
utils.ERs, rdr.rdrExit)
default:
go func() {
@@ -94,23 +94,11 @@ func (rdr *XMLFileER) Serve() (err error) {
tm.Stop()
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring path <%s>",
utils.ERs, rdr.rdrDir))
utils.ERs, rdr.dir))
return
case <-tm.C:
}
filesInDir, _ := os.ReadDir(rdr.rdrDir)
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), utils.XMLSuffix) { // hardcoded file extension for xml event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing file %s, error: %s",
utils.ERs, fileName, err.Error()))
}
}(file.Name())
}
processReaderDir(rdr.dir, utils.XMLSuffix, rdr.processFile)
tm.Reset(rdr.Config().RunDelay)
}
}()

View File

@@ -349,7 +349,7 @@ func TestNewXMLFileER(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/xmlErs/out",
dir: "/tmp/xmlErs/out",
rdrEvents: nil,
rdrError: nil,
rdrExit: nil,
@@ -402,7 +402,7 @@ func TestFileXMLProcessEvent(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/xmlErs/out",
dir: "/tmp/xmlErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -458,7 +458,7 @@ func TestFileXMLProcessEventError1(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/xmlErs/out/",
dir: "/tmp/xmlErs/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -504,7 +504,7 @@ func TestFileXMLProcessEVentError2(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/xmlErs/out/",
dir: "/tmp/xmlErs/out/",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -554,7 +554,7 @@ func TestFileXMLProcessEventParseError(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/xmlErs/out",
dir: "/tmp/xmlErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -579,14 +579,14 @@ func TestFileXML(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/xmlErs/out",
dir: "/tmp/xmlErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
err := os.MkdirAll(eR.rdrDir, 0777)
err := os.MkdirAll(eR.dir, 0777)
if err != nil {
t.Error(err)
}
@@ -604,7 +604,7 @@ func TestFileXML(t *testing.T) {
eR.Config().Fields[0].ComputePath()
for i := 1; i < 4; i++ {
if _, err := os.Create(path.Join(eR.rdrDir, fmt.Sprintf("file%d.xml", i))); err != nil {
if _, err := os.Create(path.Join(eR.dir, fmt.Sprintf("file%d.xml", i))); err != nil {
t.Error(err)
}
}
@@ -618,7 +618,7 @@ func TestFileXML(t *testing.T) {
if err := eR.Serve(); err != nil {
t.Error(err)
}
os.Create(path.Join(eR.rdrDir, "file1.txt"))
os.Create(path.Join(eR.dir, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
@@ -632,14 +632,14 @@ func TestFileXMLError(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/xmlErsError/out",
dir: "/tmp/xmlErsError/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.conReqs <- struct{}{}
err := os.MkdirAll(eR.rdrDir, 0777)
err := os.MkdirAll(eR.dir, 0777)
if err != nil {
t.Error(err)
}
@@ -657,11 +657,11 @@ func TestFileXMLError(t *testing.T) {
eR.Config().Fields[0].ComputePath()
for i := 1; i < 4; i++ {
if _, err := os.Create(path.Join(eR.rdrDir, fmt.Sprintf("file%d.xml", i))); err != nil {
if _, err := os.Create(path.Join(eR.dir, fmt.Sprintf("file%d.xml", i))); err != nil {
t.Error(err)
}
}
os.Create(path.Join(eR.rdrDir, "file1.txt"))
os.Create(path.Join(eR.dir, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
@@ -675,7 +675,7 @@ func TestFileXMLExit(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
rdrDir: "/tmp/xmlErs/out",
dir: "/tmp/xmlErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),

View File

@@ -32,7 +32,7 @@ func TestERSNewXMLFileER(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: nil,
rdrDir: "/var/spool/cgrates/ers/in",
dir: "/var/spool/cgrates/ers/in",
rdrEvents: nil,
rdrError: nil,
rdrExit: nil,

View File

@@ -20,7 +20,9 @@ package ers
import (
"fmt"
"os"
"sort"
"strings"
"time"
"github.com/cgrates/cgrates/agents"
@@ -103,3 +105,28 @@ func mergePartialEvents(cgrEvs []*utils.CGREvent, cfg *config.EventReaderCfg, fl
}
return
}
// processReaderDir finds all entries within dirPath, filters only the ones whose name
// ends with the specified suffix and executes function f on them.
func processReaderDir(dirPath, suffix string, f func(dir, fn string) error) {
filesInDir, err := os.ReadDir(dirPath)
if err != nil {
utils.Logger.Notice(fmt.Sprintf(
"<%s> encountered error while reading entries from directory %s: %v",
utils.ERs, dirPath, err))
// There is no need to return, as os.ReadDir can still return entries
// even if an error was encountered. Logging it should suffice
}
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), suffix) { // hardcoded file extension for csv event reader
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := f(dirPath, fileName); err != nil {
utils.Logger.Warning(fmt.Sprintf(
"<%s> processing file %s, error: %v",
utils.ERs, fileName, err))
}
}(file.Name())
}
}

View File

@@ -55,7 +55,7 @@ func TestNewCsvReader(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 1,
fltrS: fltr,
rdrDir: cfg.ERsCfg().Readers[1].SourcePath,
dir: cfg.ERsCfg().Readers[1].SourcePath,
rdrEvents: nil,
rdrError: nil,
rdrExit: nil,