diff --git a/ers/filecsv.go b/ers/filecsv.go index 383f27da8..2263747f3 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -106,12 +106,12 @@ func (rdr *CSVFileER) Serve() (err error) { } // processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { +func (rdr *CSVFileER) processFile(fName string) (err error) { if cap(rdr.conReqs) != 0 { // 0 goes for no limit processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(fPath, fName) + absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) var file *os.File diff --git a/ers/filefwv.go b/ers/filefwv.go index 24187dcb4..c31cfa098 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -114,12 +114,12 @@ func (rdr *FWVFileER) Serve() (err error) { } // processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { +func (rdr *FWVFileER) processFile(fName string) (err error) { if cap(rdr.conReqs) != 0 { // 0 goes for no limit processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(fPath, fName) + absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) var file *os.File diff --git a/ers/filejson.go b/ers/filejson.go index 166b9d344..14d50c47f 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -107,12 +107,12 @@ func (rdr *JSONFileER) Serve() (err error) { } // processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *JSONFileER) processFile(fPath, fName string) (err error) { +func (rdr *JSONFileER) processFile(fName string) (err error) { if cap(rdr.conReqs) != 0 { // 0 goes for no limit processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(fPath, fName) + absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) var file *os.File diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index 8507cdfe3..d2e6f67b8 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -283,8 +283,8 @@ func TestFileJSONProcessFile(t *testing.T) { if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } - expected := "open : no such file or directory" - err2 := rdr.(*JSONFileER).processFile("", "") + expected := "open /var/spool/cgrates/ers/in: no such file or directory" + err2 := rdr.(*JSONFileER).processFile("") if err2 == nil || err2.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err2) } diff --git a/ers/filexml.go b/ers/filexml.go index b40b04b96..01526ab82 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -128,12 +128,12 @@ func (rdr *XMLFileER) Serve() (err error) { */ // processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *XMLFileER) processFile(fPath, fName string) error { +func (rdr *XMLFileER) processFile(fName string) error { if cap(rdr.conReqs) != 0 { // 0 goes for no limit processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(fPath, fName) + absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) file, err := os.Open(absPath) diff --git a/ers/filexml_it_test.go b/ers/filexml_it_test.go index a928b4d85..10d6216d2 100644 --- a/ers/filexml_it_test.go +++ b/ers/filexml_it_test.go @@ -419,7 +419,7 @@ func TestFileXMLProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - sourceDir: "/tmp/xmlErs/out", + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -441,7 +441,7 @@ func TestFileXMLProcessEvent(t *testing.T) { eR.conReqs <- struct{}{} fileName := "file1.xml" - if err := eR.processFile(filePath, fileName); err != nil { + if err := eR.processFile(fileName); err != nil { t.Error(err) } expEvent := &utils.CGREvent{ @@ -474,7 +474,7 @@ func TestFileXMLProcessEventError1(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - sourceDir: "/tmp/xmlErs/out/", + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -482,7 +482,7 @@ func TestFileXMLProcessEventError1(t *testing.T) { } eR.conReqs <- struct{}{} errExpect := "open /tmp/TestFileXMLProcessEvent/file1.xml: no such file or directory" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } } @@ -520,7 +520,7 @@ func TestFileXMLProcessEVentError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - sourceDir: "/tmp/xmlErs/out/", + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -536,14 +536,14 @@ func TestFileXMLProcessEVentError2(t *testing.T) { // eR.Config().Filters = []string{"Filter1"} errExpect := "NOT_FOUND:Filter1" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } // eR.Config().Filters = []string{"*exists:~*req..Account:"} errExpect = "rename /tmp/TestFileXMLProcessEvent/file1.xml /var/spool/cgrates/ers/out/file1.xml: no such file or directory" - if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { @@ -583,7 +583,7 @@ func TestFileXMLProcessEVentError3(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - sourceDir: "/tmp/xmlErs/out/", + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -609,7 +609,7 @@ func TestFileXMLProcessEVentError3(t *testing.T) { eR.Config().Fields[0].ComputePath() errExpect := "Empty source value for fieldID: " - eR.processFile(filePath, fname) + eR.processFile(fname) if !strings.Contains(buf.String(), errExpect) { t.Errorf("Expected to contain %s", errExpect) } @@ -637,7 +637,7 @@ func TestFileXMLProcessEventParseError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - sourceDir: "/tmp/xmlErs/out", + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -647,7 +647,7 @@ func TestFileXMLProcessEventParseError(t *testing.T) { fileName := "file1.xml" errExpect := "XML syntax error on line 2: unexpected EOF" - if err := eR.processFile(filePath, fileName); err == nil || err.Error() != errExpect { + if err := eR.processFile(fileName); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { diff --git a/ers/libers.go b/ers/libers.go index 8862a168c..42fab220b 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -106,7 +106,7 @@ func mergePartialEvents(cgrEvs []*utils.CGREvent, cfg *config.EventReaderCfg, fl // processReaderDir finds all entries within dirPath, filters only the ones whose name // ends with the specified suffix and executes function f on them. -func processReaderDir(dirPath, suffix string, f func(dir, fn string) error) { +func processReaderDir(dirPath, suffix string, f func(fn string) error) { filesInDir, err := os.ReadDir(dirPath) if err != nil { utils.Logger.Notice(fmt.Sprintf( @@ -120,7 +120,7 @@ func processReaderDir(dirPath, suffix string, f func(dir, fn string) error) { continue // used in order to filter the files from directory } go func(fileName string) { - if err := f(dirPath, fileName); err != nil { + if err := f(fileName); err != nil { utils.Logger.Warning(fmt.Sprintf( "<%s> processing file %s, error: %v", utils.ERs, fileName, err)) diff --git a/loaders/loader.go b/loaders/loader.go index 1c26abf4a..6d4351e82 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -317,7 +317,7 @@ func (l *loader) getCfg(fileName string) (cfg *config.LoaderDataType) { return nil } -func (l *loader) processIFile(_, fileName string) (err error) { +func (l *loader) processIFile(fileName string) (err error) { cfg := l.getCfg(fileName) if cfg == nil { if pathIn := path.Join(l.ldrCfg.TpInDir, fileName); !l.IsLockFile(pathIn) && len(l.ldrCfg.TpOutDir) != 0 { diff --git a/loaders/loader_test.go b/loaders/loader_test.go index 25928e59f..5e5258e08 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -977,7 +977,7 @@ func TestLoaderProcessIFile(t *testing.T) { }, }, dm, cache, fS, cM, nil) expErrMsg := fmt.Sprintf(`rename %s/Chargers.csv %s/Chargers.csv: no such file or directory`, tmpIn, tmpOut) - if err := ld.processIFile(utils.EmptyString, utils.ChargersCsv); err == nil || err.Error() != expErrMsg { + if err := ld.processIFile(utils.ChargersCsv); err == nil || err.Error() != expErrMsg { t.Errorf("Expected: %v, received: %v", expErrMsg, err) } @@ -994,7 +994,7 @@ func TestLoaderProcessIFile(t *testing.T) { if err := f.Close(); err != nil { t.Fatal(err) } - if err := ld.processIFile(utils.EmptyString, utils.AttributesCsv); err != nil { + if err := ld.processIFile(utils.AttributesCsv); err != nil { t.Fatal(err) } if prf, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != nil { @@ -1016,7 +1016,7 @@ func TestLoaderProcessIFile(t *testing.T) { } ld.Locker = mockLock{} - if err := ld.processIFile(utils.EmptyString, utils.AttributesCsv); err != utils.ErrExists { + if err := ld.processIFile(utils.AttributesCsv); err != utils.ErrExists { t.Fatal(err) } } diff --git a/utils/file.go b/utils/file.go index 96a5e5b38..14e8cceea 100644 --- a/utils/file.go +++ b/utils/file.go @@ -26,11 +26,11 @@ import ( // WatchDir sets up a watcher via inotify to be triggered on new files // sysID is the subsystem ID, f will be triggered on match -func WatchDir(dirPath string, f func(itmPath, itmID string) error, sysID string, stopWatching chan struct{}) (err error) { +func WatchDir(dirPath string, f func(itmID string) error, sysID string, stopWatching chan struct{}) (err error) { return watchDir(dirPath, f, sysID, stopWatching, fsnotify.NewWatcher) } -func watchDir(dirPath string, f func(itmPath, itmID string) error, sysID string, +func watchDir(dirPath string, f func(itmID string) error, sysID string, stopWatching chan struct{}, newWatcher func() (*fsnotify.Watcher, error)) (err error) { var watcher *fsnotify.Watcher if watcher, err = newWatcher(); err != nil { @@ -45,7 +45,7 @@ func watchDir(dirPath string, f func(itmPath, itmID string) error, sysID string, return } -func watch(dirPath, sysID string, f func(itmPath, itmID string) error, +func watch(dirPath, sysID string, f func(itmID string) error, watcher *fsnotify.Watcher, stopWatching chan struct{}) (err error) { defer watcher.Close() for { @@ -56,7 +56,7 @@ func watch(dirPath, sysID string, f func(itmPath, itmID string) error, case ev := <-watcher.Events: if ev.Op&fsnotify.Create == fsnotify.Create { go func() { //Enable async processing here so we can simultaneously process files - if err = f(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil { + if err = f(filepath.Base(ev.Name)); err != nil { Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>", sysID, ev.Name, err.Error())) } diff --git a/utils/file_it_test.go b/utils/file_it_test.go index 39ccbcbc2..58b4033ab 100644 --- a/utils/file_it_test.go +++ b/utils/file_it_test.go @@ -75,9 +75,9 @@ func testWatchWatcherEvents(t *testing.T) { Op: fsnotify.Create, } stopWatching := make(chan struct{}, 1) - f := func(itmPath, itmID string) error { + f := func(itmID string) error { close(stopWatching) - if itmPath != "/tmp" || itmID != "file.txt" { + if itmID != "file.txt" { t.Errorf("Invalid directory or file") } return fmt.Errorf("Can't match path")