mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
prevent data race in file watcher goroutine
This commit is contained in:
committed by
Dan Christian Bogos
parent
535f2e554e
commit
6968983dd2
@@ -45,28 +45,30 @@ func watchDir(dirPath string, f func(itmID string) error, sysID string,
|
||||
return
|
||||
}
|
||||
|
||||
// watch monitors a directory for file creation events and processes them asynchronously.
|
||||
func watch(dirPath, sysID string, f func(itmID string) error,
|
||||
watcher *fsnotify.Watcher, stopWatching chan struct{}) (err error) {
|
||||
watcher *fsnotify.Watcher, stopWatching chan struct{}) error {
|
||||
defer watcher.Close()
|
||||
for {
|
||||
select {
|
||||
case <-stopWatching:
|
||||
Logger.Info(fmt.Sprintf("<%s> stop watching path <%s>", sysID, dirPath))
|
||||
return
|
||||
return nil
|
||||
case ev := <-watcher.Events:
|
||||
if ev.Op&fsnotify.Create == fsnotify.Create {
|
||||
go func() { //Enable async processing here so we can simultaneously process files
|
||||
if err = f(filepath.Base(ev.Name)); err != nil {
|
||||
// Process files asynchronously to prevent blocking the watcher.
|
||||
go func() {
|
||||
if err := f(filepath.Base(ev.Name)); err != nil {
|
||||
Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>",
|
||||
sysID, ev.Name, err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
case err = <-watcher.Errors:
|
||||
Logger.Err(
|
||||
fmt.Sprintf("<%s> watching path <%s>, error: <%s>, exiting!",
|
||||
case err := <-watcher.Errors:
|
||||
Logger.Err(fmt.Sprintf(
|
||||
"<%s> watching path <%s>, error: <%s>, exiting!",
|
||||
sysID, dirPath, err.Error()))
|
||||
return
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,9 +22,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
@@ -63,6 +66,14 @@ func testWatchWatcherError(t *testing.T) {
|
||||
}
|
||||
|
||||
func testWatchWatcherEvents(t *testing.T) {
|
||||
Logger.SetLogLevel(4)
|
||||
Logger.SetSyslog(nil)
|
||||
buf := new(bytes.Buffer)
|
||||
log.SetOutput(buf)
|
||||
t.Cleanup(func() {
|
||||
Logger.SetLogLevel(0)
|
||||
log.SetOutput(os.Stderr)
|
||||
})
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -82,9 +93,12 @@ func testWatchWatcherEvents(t *testing.T) {
|
||||
}
|
||||
return fmt.Errorf("Can't match path")
|
||||
}
|
||||
expected := "Can't match path"
|
||||
if err := watch(EmptyString, EmptyString, f, watcher, stopWatching); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expected %+v, received %+v", expected, err)
|
||||
if err := watch(EmptyString, EmptyString, f, watcher, stopWatching); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
expLog := `[WARNING] <> processing path </tmp/file.txt>, error: <Can't match path>`
|
||||
if !strings.Contains(buf.String(), expLog) {
|
||||
t.Errorf("expected %q to be present, received:\n%s", expLog, buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user