diff --git a/ers/filecsv.go b/ers/filecsv.go index 33feabc42..8b140e5cf 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -107,12 +107,12 @@ func (rdr *CSVFileER) Serve() (err error) { } // processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { +func (rdr *CSVFileER) processFile(fName string) (err error) { if cap(rdr.conReqs) != 0 { // 0 goes for no limit processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(fPath, fName) + absPath := path.Join(rdr.dir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) var file *os.File diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index c73d847bd..241ecfd71 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -395,7 +395,7 @@ func TestFileCSVProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ers/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -505,7 +505,7 @@ func TestFileCSVProcessEvent(t *testing.T) { } fname := "file1.csv" - if err := eR.processFile(filePath, fname); err != nil { + if err := eR.processFile(fname); err != nil { t.Error(err) } select { @@ -532,7 +532,7 @@ func TestFileCSVProcessEventError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ers/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -540,7 +540,7 @@ func TestFileCSVProcessEventError(t *testing.T) { } eR.conReqs <- struct{}{} errExpect := "open /tmp/TestFileCSVProcessEvent/file1.csv: no such file or directory" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } } @@ -565,7 +565,7 @@ func TestFileCSVProcessEventError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ers/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -578,7 +578,7 @@ func TestFileCSVProcessEventError2(t *testing.T) { } errExpect := "unsupported type: <>" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { @@ -609,7 +609,7 @@ func TestFileCSVProcessEventError3(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ers/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -620,14 +620,14 @@ func TestFileCSVProcessEventError3(t *testing.T) { // eR.Config().Filters = []string{"Filter1"} errExpect := "NOT_FOUND:Filter1" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } // eR.Config().Filters = []string{"*exists:~*req..Account:"} errExpect = "Invalid fieldPath [ Account]" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { diff --git a/ers/filefwv.go b/ers/filefwv.go index 2bb598797..0cfcc81cc 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -115,12 +115,12 @@ func (rdr *FWVFileER) Serve() (err error) { } // processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { +func (rdr *FWVFileER) processFile(fName string) (err error) { if cap(rdr.conReqs) != 0 { // 0 goes for no limit processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(fPath, fName) + absPath := path.Join(rdr.dir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) var file *os.File diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index a5774a158..7e33cbe62 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -283,7 +283,7 @@ func TestFileFWVProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -304,7 +304,7 @@ func TestFileFWVProcessEvent(t *testing.T) { }, } eR.Config().Fields[0].ComputePath() - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { diff --git a/ers/filejson.go b/ers/filejson.go index ff174122e..b0ffee8c6 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -109,12 +109,12 @@ func (rdr *JSONFileER) Serve() (err error) { } // processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *JSONFileER) processFile(fPath, fName string) (err error) { +func (rdr *JSONFileER) processFile(fName string) (err error) { if cap(rdr.conReqs) != 0 { // 0 goes for no limit processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(fPath, fName) + absPath := path.Join(rdr.dir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) var file *os.File diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index 244ffafb2..2ba1a2209 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -340,7 +340,7 @@ func TestFileJSONProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ErsJSON/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -366,7 +366,7 @@ func TestFileJSONProcessEvent(t *testing.T) { // expEvent := &utils.CGREvent{} eR.conReqs <- struct{}{} fname := "file1.json" - if err := eR.processFile(filePath, fname); err != nil { + if err := eR.processFile(fname); err != nil { t.Error(err) } select { @@ -393,7 +393,7 @@ func TestFileJSONProcessEventReadError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ErsJSON/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -401,7 +401,7 @@ func TestFileJSONProcessEventReadError(t *testing.T) { } eR.conReqs <- struct{}{} errExpect := "open /tmp/TestFileJSONProcessEvent/file2.json: no such file or directory" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } } @@ -441,7 +441,7 @@ func TestFileJSONProcessEventError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ErsJSON/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -454,7 +454,7 @@ func TestFileJSONProcessEventError2(t *testing.T) { } fname := "file1.json" errExpect := "unsupported type: <>" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { @@ -500,7 +500,7 @@ func TestFileJSONProcessEventError3(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ErsJSON/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -512,13 +512,13 @@ func TestFileJSONProcessEventError3(t *testing.T) { // eR.Config().Filters = []string{"Filter1"} errExpect := "NOT_FOUND:Filter1" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } // eR.Config().Filters = []string{"*exists:~*req..Account:"} - if err := eR.processFile(filePath, fname); err != nil { + if err := eR.processFile(fname); err != nil { t.Error(err) } if err := os.RemoveAll(filePath); err != nil { diff --git a/ers/filexml.go b/ers/filexml.go index 4fa9900e6..09a363bb7 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -127,12 +127,12 @@ func (rdr *XMLFileER) Serve() (err error) { */ // processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *XMLFileER) processFile(fPath, fName string) error { +func (rdr *XMLFileER) processFile(fName string) error { if cap(rdr.conReqs) != 0 { // 0 goes for no limit processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(fPath, fName) + absPath := path.Join(rdr.dir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) file, err := os.Open(absPath) diff --git a/ers/filexml_it_test.go b/ers/filexml_it_test.go index 71ad0ed0b..930021a03 100644 --- a/ers/filexml_it_test.go +++ b/ers/filexml_it_test.go @@ -402,7 +402,7 @@ func TestFileXMLProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/xmlErs/out", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -424,7 +424,7 @@ func TestFileXMLProcessEvent(t *testing.T) { eR.conReqs <- struct{}{} fileName := "file1.xml" - if err := eR.processFile(filePath, fileName); err != nil { + if err := eR.processFile(fileName); err != nil { t.Error(err) } expEvent := &utils.CGREvent{ @@ -458,7 +458,7 @@ func TestFileXMLProcessEventError1(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/xmlErs/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -466,7 +466,7 @@ func TestFileXMLProcessEventError1(t *testing.T) { } eR.conReqs <- struct{}{} errExpect := "open /tmp/TestFileXMLProcessEvent/file1.xml: no such file or directory" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } } @@ -504,7 +504,7 @@ func TestFileXMLProcessEVentError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/xmlErs/out/", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -520,14 +520,14 @@ func TestFileXMLProcessEVentError2(t *testing.T) { // eR.Config().Filters = []string{"Filter1"} errExpect := "NOT_FOUND:Filter1" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } // eR.Config().Filters = []string{"*exists:~*req..Account:"} errExpect = "rename /tmp/TestFileXMLProcessEvent/file1.xml /var/spool/cgrates/ers/out/file1.xml: no such file or directory" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { @@ -554,7 +554,7 @@ func TestFileXMLProcessEventParseError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/xmlErs/out", + dir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -564,7 +564,7 @@ func TestFileXMLProcessEventParseError(t *testing.T) { fileName := "file1.xml" errExpect := "XML syntax error on line 2: unexpected EOF" - if err := eR.processFile(filePath, fileName); err == nil || err.Error() != errExpect { + if err := eR.processFile(fileName); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { diff --git a/ers/libers.go b/ers/libers.go index ff9c7d79e..3b085ad51 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -108,7 +108,7 @@ func mergePartialEvents(cgrEvs []*utils.CGREvent, cfg *config.EventReaderCfg, fl // processReaderDir finds all entries within dirPath, filters only the ones whose name // ends with the specified suffix and executes function f on them. -func processReaderDir(dirPath, suffix string, f func(dir, fn string) error) { +func processReaderDir(dirPath, suffix string, f func(fn string) error) { filesInDir, err := os.ReadDir(dirPath) if err != nil { utils.Logger.Notice(fmt.Sprintf( @@ -122,7 +122,7 @@ func processReaderDir(dirPath, suffix string, f func(dir, fn string) error) { continue // used in order to filter the files from directory } go func(fileName string) { - if err := f(dirPath, fileName); err != nil { + if err := f(fileName); err != nil { utils.Logger.Warning(fmt.Sprintf( "<%s> processing file %s, error: %v", utils.ERs, fileName, err)) diff --git a/loaders/loader.go b/loaders/loader.go index f0fc64af7..d3066aaaf 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -848,7 +848,7 @@ func (ldr *Loader) handleFolder(stopChan chan struct{}) { } } -func (ldr *Loader) processFile(_, itmID string) (err error) { +func (ldr *Loader) processFile(itmID string) (err error) { loaderType := ldr.getLdrType(itmID) if len(loaderType) == 0 { return diff --git a/loaders/loader_it_test.go b/loaders/loader_it_test.go index be43ac48e..9f1458b3b 100644 --- a/loaders/loader_it_test.go +++ b/loaders/loader_it_test.go @@ -455,7 +455,7 @@ cgrates.org,NewRes1 } //loader file is empty (loaderType will be empty) - if err := ldr.processFile("unusedValue", utils.ResourcesCsv); err != nil { + if err := ldr.processFile(utils.ResourcesCsv); err != nil { t.Error(err) } @@ -482,7 +482,7 @@ cgrates.org,NewRes1 } //successfully processed the file - if err := ldr.processFile("unusedValue", utils.ResourcesCsv); err != nil { + if err := ldr.processFile(utils.ResourcesCsv); err != nil { t.Error(err) } @@ -509,7 +509,7 @@ cgrates.org,NewRes1 //cannot move file when tpOutDir is empty ldr.tpOutDir = utils.EmptyString - if err := ldr.processFile("unusedValue", utils.ResourcesCsv); err != nil { + if err := ldr.processFile(utils.ResourcesCsv); err != nil { t.Error(err) } @@ -569,7 +569,7 @@ cgrates.org,NewRes1 }, } - if err := ldr.processFile("unusedValue", "inexistent.csv"); err != nil { + if err := ldr.processFile("inexistent.csv"); err != nil { t.Error(err) } @@ -615,7 +615,7 @@ cgrates.org,NewRes1 //unable to lock the folder, because lockFileName is missing expected := "open /tmp/test/.cgr.lck: no such file or directory" - if err := ldr.processFile("unusedValue", utils.ResourcesCsv); err == nil || err.Error() != expected { + if err := ldr.processFile(utils.ResourcesCsv); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } @@ -655,7 +655,7 @@ cgrates.org,NewRes1 //unable to lock the folder, because lockFileName is missing expected := "open /tmp/testProcessFileUnableToOpen/resources: no such file or directory" - if err := ldr.processFile("unusedValue", `resources`); err == nil || err.Error() != expected { + if err := ldr.processFile(`resources`); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } @@ -721,7 +721,7 @@ cgrates.org,NewRes1 file.Close() expected := "rename /tmp/testProcessFileLockFolder/Resources.csv INEXISTING_FILE/Resources.csv: no such file or directory" - if err := ldr.processFile("unusedValue", utils.ResourcesCsv); err == nil || err.Error() != expected { + if err := ldr.processFile(utils.ResourcesCsv); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } diff --git a/utils/file.go b/utils/file.go index 96a5e5b38..14e8cceea 100644 --- a/utils/file.go +++ b/utils/file.go @@ -26,11 +26,11 @@ import ( // WatchDir sets up a watcher via inotify to be triggered on new files // sysID is the subsystem ID, f will be triggered on match -func WatchDir(dirPath string, f func(itmPath, itmID string) error, sysID string, stopWatching chan struct{}) (err error) { +func WatchDir(dirPath string, f func(itmID string) error, sysID string, stopWatching chan struct{}) (err error) { return watchDir(dirPath, f, sysID, stopWatching, fsnotify.NewWatcher) } -func watchDir(dirPath string, f func(itmPath, itmID string) error, sysID string, +func watchDir(dirPath string, f func(itmID string) error, sysID string, stopWatching chan struct{}, newWatcher func() (*fsnotify.Watcher, error)) (err error) { var watcher *fsnotify.Watcher if watcher, err = newWatcher(); err != nil { @@ -45,7 +45,7 @@ func watchDir(dirPath string, f func(itmPath, itmID string) error, sysID string, return } -func watch(dirPath, sysID string, f func(itmPath, itmID string) error, +func watch(dirPath, sysID string, f func(itmID string) error, watcher *fsnotify.Watcher, stopWatching chan struct{}) (err error) { defer watcher.Close() for { @@ -56,7 +56,7 @@ func watch(dirPath, sysID string, f func(itmPath, itmID string) error, case ev := <-watcher.Events: if ev.Op&fsnotify.Create == fsnotify.Create { go func() { //Enable async processing here so we can simultaneously process files - if err = f(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil { + if err = f(filepath.Base(ev.Name)); err != nil { Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>", sysID, ev.Name, err.Error())) } diff --git a/utils/file_it_test.go b/utils/file_it_test.go index 39ccbcbc2..58b4033ab 100644 --- a/utils/file_it_test.go +++ b/utils/file_it_test.go @@ -75,9 +75,9 @@ func testWatchWatcherEvents(t *testing.T) { Op: fsnotify.Create, } stopWatching := make(chan struct{}, 1) - f := func(itmPath, itmID string) error { + f := func(itmID string) error { close(stopWatching) - if itmPath != "/tmp" || itmID != "file.txt" { + if itmID != "file.txt" { t.Errorf("Invalid directory or file") } return fmt.Errorf("Can't match path")