diff --git a/ers/libers.go b/ers/libers.go index 3b085ad51..94fe4906a 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -23,6 +23,7 @@ import ( "os" "sort" "strings" + "sync" "time" "github.com/cgrates/cgrates/agents" @@ -106,22 +107,26 @@ func mergePartialEvents(cgrEvs []*utils.CGREvent, cfg *config.EventReaderCfg, fl return } -// processReaderDir finds all entries within dirPath, filters only the ones whose name -// ends with the specified suffix and executes function f on them. +// processReaderDir finds all entries within dirPath, filters only the ones +// whose name ends with the specified suffix and executes function f on them. +// It waits for all operations to complete before returning. func processReaderDir(dirPath, suffix string, f func(fn string) error) { filesInDir, err := os.ReadDir(dirPath) if err != nil { utils.Logger.Notice(fmt.Sprintf( "<%s> encountered error while reading entries from directory %s: %v", utils.ERs, dirPath, err)) - // There is no need to return, as os.ReadDir can still return entries - // even if an error was encountered. Logging it should suffice + // os.ReadDir may return partial results even on error, so we continue processing. } + var wg sync.WaitGroup for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), suffix) { // hardcoded file extension for csv event reader - continue // used in order to filter the files from directory + if !strings.HasSuffix(file.Name(), suffix) { + // Ignore any entries that don't end in the specified suffix. + continue } + wg.Add(1) go func(fileName string) { + defer wg.Done() if err := f(fileName); err != nil { utils.Logger.Warning(fmt.Sprintf( "<%s> processing file %s, error: %v", @@ -129,4 +134,5 @@ func processReaderDir(dirPath, suffix string, f func(fn string) error) { } }(file.Name()) } + wg.Wait() }