Remove redundant dir parameter from processFile methods

Applies to both file readers and loader (for loader, the blank statement
was used anyway).

It's redundant because for file readers, the rdr.sourceDir value was
always passed as the parameter when it was already part of the method's
object.

Parameter had to also be removed from the WatchDir function and the
functions it depends on.
This commit is contained in:
ionutboangiu
2024-09-20 19:28:24 +03:00
committed by Dan Christian Bogos
parent 7998ed6a0f
commit e1adb674b8
11 changed files with 33 additions and 33 deletions

View File

@@ -106,12 +106,12 @@ func (rdr *CSVFileER) Serve() (err error) {
}
// processFile is called for each file in a directory and dispatches erEvents from it
func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
func (rdr *CSVFileER) processFile(fName string) (err error) {
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
defer func() { rdr.conReqs <- processFile }()
}
absPath := path.Join(fPath, fName)
absPath := path.Join(rdr.sourceDir, fName)
utils.Logger.Info(
fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath))
var file *os.File

View File

@@ -114,12 +114,12 @@ func (rdr *FWVFileER) Serve() (err error) {
}
// processFile is called for each file in a directory and dispatches erEvents from it
func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
func (rdr *FWVFileER) processFile(fName string) (err error) {
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
defer func() { rdr.conReqs <- processFile }()
}
absPath := path.Join(fPath, fName)
absPath := path.Join(rdr.sourceDir, fName)
utils.Logger.Info(
fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath))
var file *os.File

View File

@@ -107,12 +107,12 @@ func (rdr *JSONFileER) Serve() (err error) {
}
// processFile is called for each file in a directory and dispatches erEvents from it
func (rdr *JSONFileER) processFile(fPath, fName string) (err error) {
func (rdr *JSONFileER) processFile(fName string) (err error) {
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
defer func() { rdr.conReqs <- processFile }()
}
absPath := path.Join(fPath, fName)
absPath := path.Join(rdr.sourceDir, fName)
utils.Logger.Info(
fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath))
var file *os.File

View File

@@ -283,8 +283,8 @@ func TestFileJSONProcessFile(t *testing.T) {
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
expected := "open : no such file or directory"
err2 := rdr.(*JSONFileER).processFile("", "")
expected := "open /var/spool/cgrates/ers/in: no such file or directory"
err2 := rdr.(*JSONFileER).processFile("")
if err2 == nil || err2.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err2)
}

View File

@@ -128,12 +128,12 @@ func (rdr *XMLFileER) Serve() (err error) {
*/
// processFile is called for each file in a directory and dispatches erEvents from it
func (rdr *XMLFileER) processFile(fPath, fName string) error {
func (rdr *XMLFileER) processFile(fName string) error {
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
defer func() { rdr.conReqs <- processFile }()
}
absPath := path.Join(fPath, fName)
absPath := path.Join(rdr.sourceDir, fName)
utils.Logger.Info(
fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath))
file, err := os.Open(absPath)

View File

@@ -419,7 +419,7 @@ func TestFileXMLProcessEvent(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: "/tmp/xmlErs/out",
sourceDir: filePath,
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -441,7 +441,7 @@ func TestFileXMLProcessEvent(t *testing.T) {
eR.conReqs <- struct{}{}
fileName := "file1.xml"
if err := eR.processFile(filePath, fileName); err != nil {
if err := eR.processFile(fileName); err != nil {
t.Error(err)
}
expEvent := &utils.CGREvent{
@@ -474,7 +474,7 @@ func TestFileXMLProcessEventError1(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: "/tmp/xmlErs/out/",
sourceDir: filePath,
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -482,7 +482,7 @@ func TestFileXMLProcessEventError1(t *testing.T) {
}
eR.conReqs <- struct{}{}
errExpect := "open /tmp/TestFileXMLProcessEvent/file1.xml: no such file or directory"
if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
@@ -520,7 +520,7 @@ func TestFileXMLProcessEVentError2(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: "/tmp/xmlErs/out/",
sourceDir: filePath,
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -536,14 +536,14 @@ func TestFileXMLProcessEVentError2(t *testing.T) {
//
eR.Config().Filters = []string{"Filter1"}
errExpect := "NOT_FOUND:Filter1"
if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
//
eR.Config().Filters = []string{"*exists:~*req..Account:"}
errExpect = "rename /tmp/TestFileXMLProcessEvent/file1.xml /var/spool/cgrates/ers/out/file1.xml: no such file or directory"
if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
if err := os.RemoveAll(filePath); err != nil {
@@ -583,7 +583,7 @@ func TestFileXMLProcessEVentError3(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: "/tmp/xmlErs/out/",
sourceDir: filePath,
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -609,7 +609,7 @@ func TestFileXMLProcessEVentError3(t *testing.T) {
eR.Config().Fields[0].ComputePath()
errExpect := "Empty source value for fieldID: <OriginID>"
eR.processFile(filePath, fname)
eR.processFile(fname)
if !strings.Contains(buf.String(), errExpect) {
t.Errorf("Expected to contain %s", errExpect)
}
@@ -637,7 +637,7 @@ func TestFileXMLProcessEventParseError(t *testing.T) {
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: "/tmp/xmlErs/out",
sourceDir: filePath,
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
@@ -647,7 +647,7 @@ func TestFileXMLProcessEventParseError(t *testing.T) {
fileName := "file1.xml"
errExpect := "XML syntax error on line 2: unexpected EOF"
if err := eR.processFile(filePath, fileName); err == nil || err.Error() != errExpect {
if err := eR.processFile(fileName); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
if err := os.RemoveAll(filePath); err != nil {

View File

@@ -106,7 +106,7 @@ func mergePartialEvents(cgrEvs []*utils.CGREvent, cfg *config.EventReaderCfg, fl
// processReaderDir finds all entries within dirPath, filters only the ones whose name
// ends with the specified suffix and executes function f on them.
func processReaderDir(dirPath, suffix string, f func(dir, fn string) error) {
func processReaderDir(dirPath, suffix string, f func(fn string) error) {
filesInDir, err := os.ReadDir(dirPath)
if err != nil {
utils.Logger.Notice(fmt.Sprintf(
@@ -120,7 +120,7 @@ func processReaderDir(dirPath, suffix string, f func(dir, fn string) error) {
continue // used in order to filter the files from directory
}
go func(fileName string) {
if err := f(dirPath, fileName); err != nil {
if err := f(fileName); err != nil {
utils.Logger.Warning(fmt.Sprintf(
"<%s> processing file %s, error: %v",
utils.ERs, fileName, err))

View File

@@ -317,7 +317,7 @@ func (l *loader) getCfg(fileName string) (cfg *config.LoaderDataType) {
return nil
}
func (l *loader) processIFile(_, fileName string) (err error) {
func (l *loader) processIFile(fileName string) (err error) {
cfg := l.getCfg(fileName)
if cfg == nil {
if pathIn := path.Join(l.ldrCfg.TpInDir, fileName); !l.IsLockFile(pathIn) && len(l.ldrCfg.TpOutDir) != 0 {

View File

@@ -977,7 +977,7 @@ func TestLoaderProcessIFile(t *testing.T) {
},
}, dm, cache, fS, cM, nil)
expErrMsg := fmt.Sprintf(`rename %s/Chargers.csv %s/Chargers.csv: no such file or directory`, tmpIn, tmpOut)
if err := ld.processIFile(utils.EmptyString, utils.ChargersCsv); err == nil || err.Error() != expErrMsg {
if err := ld.processIFile(utils.ChargersCsv); err == nil || err.Error() != expErrMsg {
t.Errorf("Expected: %v, received: %v", expErrMsg, err)
}
@@ -994,7 +994,7 @@ func TestLoaderProcessIFile(t *testing.T) {
if err := f.Close(); err != nil {
t.Fatal(err)
}
if err := ld.processIFile(utils.EmptyString, utils.AttributesCsv); err != nil {
if err := ld.processIFile(utils.AttributesCsv); err != nil {
t.Fatal(err)
}
if prf, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != nil {
@@ -1016,7 +1016,7 @@ func TestLoaderProcessIFile(t *testing.T) {
}
ld.Locker = mockLock{}
if err := ld.processIFile(utils.EmptyString, utils.AttributesCsv); err != utils.ErrExists {
if err := ld.processIFile(utils.AttributesCsv); err != utils.ErrExists {
t.Fatal(err)
}
}

View File

@@ -26,11 +26,11 @@ import (
// WatchDir sets up a watcher via inotify to be triggered on new files
// sysID is the subsystem ID, f will be triggered on match
func WatchDir(dirPath string, f func(itmPath, itmID string) error, sysID string, stopWatching chan struct{}) (err error) {
func WatchDir(dirPath string, f func(itmID string) error, sysID string, stopWatching chan struct{}) (err error) {
return watchDir(dirPath, f, sysID, stopWatching, fsnotify.NewWatcher)
}
func watchDir(dirPath string, f func(itmPath, itmID string) error, sysID string,
func watchDir(dirPath string, f func(itmID string) error, sysID string,
stopWatching chan struct{}, newWatcher func() (*fsnotify.Watcher, error)) (err error) {
var watcher *fsnotify.Watcher
if watcher, err = newWatcher(); err != nil {
@@ -45,7 +45,7 @@ func watchDir(dirPath string, f func(itmPath, itmID string) error, sysID string,
return
}
func watch(dirPath, sysID string, f func(itmPath, itmID string) error,
func watch(dirPath, sysID string, f func(itmID string) error,
watcher *fsnotify.Watcher, stopWatching chan struct{}) (err error) {
defer watcher.Close()
for {
@@ -56,7 +56,7 @@ func watch(dirPath, sysID string, f func(itmPath, itmID string) error,
case ev := <-watcher.Events:
if ev.Op&fsnotify.Create == fsnotify.Create {
go func() { //Enable async processing here so we can simultaneously process files
if err = f(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil {
if err = f(filepath.Base(ev.Name)); err != nil {
Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>",
sysID, ev.Name, err.Error()))
}

View File

@@ -75,9 +75,9 @@ func testWatchWatcherEvents(t *testing.T) {
Op: fsnotify.Create,
}
stopWatching := make(chan struct{}, 1)
f := func(itmPath, itmID string) error {
f := func(itmID string) error {
close(stopWatching)
if itmPath != "/tmp" || itmID != "file.txt" {
if itmID != "file.txt" {
t.Errorf("Invalid directory or file")
}
return fmt.Errorf("Can't match path")