Add WaitGroup to ensure safe file processing in ERs

For cases when run_delay > 0.
This commit is contained in:
ionutboangiu
2025-01-21 20:05:50 +02:00
committed by Dan Christian Bogos
parent 815e4476bf
commit c171937c3d

View File

@@ -23,6 +23,7 @@ import (
"os"
"sort"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/agents"
@@ -106,22 +107,26 @@ func mergePartialEvents(cgrEvs []*utils.CGREvent, cfg *config.EventReaderCfg, fl
return
}
// processReaderDir finds all entries within dirPath, filters only the ones whose name
// ends with the specified suffix and executes function f on them.
// processReaderDir finds all entries within dirPath, filters only the ones
// whose name ends with the specified suffix and executes function f on them.
// It waits for all operations to complete before returning.
func processReaderDir(dirPath, suffix string, f func(fn string) error) {
filesInDir, err := os.ReadDir(dirPath)
if err != nil {
utils.Logger.Notice(fmt.Sprintf(
"<%s> encountered error while reading entries from directory %s: %v",
utils.ERs, dirPath, err))
// There is no need to return, as os.ReadDir can still return entries
// even if an error was encountered. Logging it should suffice
// os.ReadDir may return partial results even on error, so we continue processing.
}
var wg sync.WaitGroup
for _, file := range filesInDir {
if !strings.HasSuffix(file.Name(), suffix) { // hardcoded file extension for csv event reader
continue // used in order to filter the files from directory
if !strings.HasSuffix(file.Name(), suffix) {
// Ignore any entries that don't end in the specified suffix.
continue
}
wg.Add(1)
go func(fileName string) {
defer wg.Done()
if err := f(fileName); err != nil {
utils.Logger.Warning(fmt.Sprintf(
"<%s> processing file %s, error: %v",
@@ -129,4 +134,5 @@ func processReaderDir(dirPath, suffix string, f func(fn string) error) {
}
}(file.Name())
}
wg.Wait()
}