diff --git a/utils/file.go b/utils/file.go
index 573beb3be..86636fcf7 100644
--- a/utils/file.go
+++ b/utils/file.go
@@ -27,8 +27,13 @@ import (
// WatchDir sets up a watcher via inotify to be triggered on new files
// sysID is the subsystem ID, f will be triggered on match
func WatchDir(dirPath string, f func(itmPath, itmID string) error, sysID string, stopWatching chan struct{}) (err error) {
+ return watchDir(dirPath, f, sysID, stopWatching, fsnotify.NewWatcher)
+}
+
+func watchDir(dirPath string, f func(itmPath, itmID string) error, sysID string,
+ stopWatching chan struct{}, newWatcher func() (*fsnotify.Watcher, error)) (err error) {
var watcher *fsnotify.Watcher
- if watcher, err = fsnotify.NewWatcher(); err != nil {
+ if watcher, err = newWatcher(); err != nil {
return
}
if err = watcher.Add(dirPath); err != nil {
@@ -36,29 +41,32 @@ func WatchDir(dirPath string, f func(itmPath, itmID string) error, sysID string,
return
}
Logger.Info(fmt.Sprintf("<%s> monitoring <%s> for file moves.", sysID, dirPath))
- go func() { // read async
- defer watcher.Close()
- for {
- select {
- case <-stopWatching:
- Logger.Info(fmt.Sprintf("<%s> stop watching path <%s>", sysID, dirPath))
- return
- case ev := <-watcher.Events:
- if ev.Op&fsnotify.Create == fsnotify.Create {
- go func() { //Enable async processing here so we can simultaneously process files
- if err = f(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil {
- Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>",
- sysID, ev.Name, err.Error()))
- }
- }()
- }
- case err = <-watcher.Errors:
- Logger.Err(
- fmt.Sprintf("<%s> watching path <%s>, error: <%s>, exiting!",
- sysID, dirPath, err.Error()))
- return
- }
- }
- }()
+ go watch(dirPath, sysID, f, watcher, stopWatching) // read async
return
}
+
+func watch(dirPath, sysID string, f func(itmPath, itmID string) error,
+ watcher *fsnotify.Watcher, stopWatching chan struct{}) (err error) {
+ defer watcher.Close()
+ for {
+ select {
+ case <-stopWatching:
+ Logger.Info(fmt.Sprintf("<%s> stop watching path <%s>", sysID, dirPath))
+ return
+ case ev := <-watcher.Events:
+ if ev.Op&fsnotify.Create == fsnotify.Create {
+ go func() { //Enable async processing here so we can simultaneously process files
+ if err = f(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil {
+ Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>",
+ sysID, ev.Name, err.Error()))
+ }
+ }()
+ }
+ case err = <-watcher.Errors:
+ Logger.Err(
+ fmt.Sprintf("<%s> watching path <%s>, error: <%s>, exiting!",
+ sysID, dirPath, err.Error()))
+ return
+ }
+ }
+}
diff --git a/utils/file_it_test.go b/utils/file_it_test.go
index 925cabff2..9891156cd 100644
--- a/utils/file_it_test.go
+++ b/utils/file_it_test.go
@@ -1,3 +1,5 @@
+// +build integration
+
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
@@ -19,14 +21,21 @@ along with this program. If not, see
package utils
import (
+ "fmt"
"os"
+ "path"
"testing"
+
+ "gopkg.in/fsnotify.v1"
)
var (
testsWatchDir = []func(t *testing.T){
- testWatchDir,
- testWatchDirNoError,
+ testWatchWatcherError,
+ testWatchWatcherEvents,
+ testWatchDirValidPath,
+ testWatchDirInvalidPath,
+ testWatchNewWatcherError,
}
)
@@ -36,27 +45,89 @@ func TestFileIT(t *testing.T) {
}
}
-func testWatchDir(t *testing.T) {
+func testWatchWatcherError(t *testing.T) {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ t.Fatal(err)
+ }
+ watcher.Events = make(chan fsnotify.Event, 1)
+ watcher.Errors = make(chan error, 1)
+
+ chanErr := fmt.Errorf("")
+ watcher.Errors <- chanErr
stopWatching := make(chan struct{}, 1)
- close(stopWatching)
- flPath := "/tmp/testWatchDir"
+ if err := watch(EmptyString, EmptyString, nil, watcher, stopWatching); err != chanErr {
+ t.Error(err)
+ }
+}
+
+func testWatchWatcherEvents(t *testing.T) {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ t.Fatal(err)
+ }
+ watcher.Events = make(chan fsnotify.Event, 1)
+ watcher.Errors = make(chan error, 1)
+
+ watcher.Events <- fsnotify.Event{
+ Name: "/tmp/file.txt",
+ Op: fsnotify.Create,
+ }
+ stopWatching := make(chan struct{}, 1)
+ f := func(itmPath, itmID string) error {
+ close(stopWatching)
+ if itmPath != "/tmp" || itmID != "file.txt" {
+ t.Errorf("Invalid directory or file")
+ }
+ return fmt.Errorf("Can't match path")
+ }
+ expected := "Can't match path"
+ if err := watch(EmptyString, EmptyString, f, watcher, stopWatching); err == nil || err.Error() != expected {
+ t.Errorf("Expected %+v, received %+v", expected, err)
+ }
+}
+
+func testWatchDirValidPath(t *testing.T) {
+ flPath := "/tmp/testWatchDirValidPath/"
if err := os.MkdirAll(flPath, 0777); err != nil {
t.Error(err)
}
- if err := WatchDir(flPath, nil, "randomID", stopWatching); err != nil {
+
+ newFile, err := os.Create(path.Join(flPath, "file.txt"))
+ if err != nil {
+ t.Error(err)
+ }
+ newFile.Close()
+
+ stopWatching := make(chan struct{}, 1)
+ if err := WatchDir(path.Join(flPath, "file.txt"), nil, EmptyString, stopWatching); err != nil {
t.Error(err)
}
- if err := os.RemoveAll(flPath); err != nil {
- t.Fatal(err)
+ if err := os.Remove(path.Join(flPath, "file.txt")); err != nil {
+ t.Error(err)
+ }
+ if err := os.Remove(flPath); err != nil {
+ t.Error(err)
}
}
-func testWatchDirNoError(t *testing.T) {
+func testWatchDirInvalidPath(t *testing.T) {
+ flPath := "tmp/testWatchDirInvalidPath"
stopWatching := make(chan struct{}, 1)
- flPath := "/tmp/inexistentDir"
- expectedErr := "no such file or directory"
- if err := WatchDir(flPath, nil, "randomID", stopWatching); err == nil || err.Error() != expectedErr {
- t.Errorf("Expected %+v, received %+v", expectedErr, err)
+ expected := "no such file or directory"
+ if err := WatchDir(flPath, nil, EmptyString, stopWatching); err == nil || err.Error() != expected {
+ t.Errorf("Expected %+v, received %+v", expected, err)
+ }
+}
+
+func testWatchNewWatcherError(t *testing.T) {
+ newWatcher := func() (*fsnotify.Watcher, error) {
+ return nil, fmt.Errorf("Invalid watcher")
+ }
+ stopWatching := make(chan struct{}, 1)
+ expected := "Invalid watcher"
+ if err := watchDir(EmptyString, nil, EmptyString, stopWatching, newWatcher); err == nil || err.Error() != expected {
+ t.Errorf("Expected %+v, received %+v", expected, err)
}
}
diff --git a/utils/logger_it_test.go b/utils/logger_it_test.go
index d01b3040a..1b4a33357 100644
--- a/utils/logger_it_test.go
+++ b/utils/logger_it_test.go
@@ -1,5 +1,23 @@
// +build integration
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
package utils
import (
diff --git a/utils/set_test.go b/utils/set_test.go
index f3be6cd9f..172fe603a 100644
--- a/utils/set_test.go
+++ b/utils/set_test.go
@@ -218,3 +218,16 @@ func TestSetCloneEmpty(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", expected, received)
}
}
+
+func TestGetOne(t *testing.T) {
+ set := StringSet{
+ "test1": struct{}{},
+ "test2": struct{}{},
+ }
+ value := set.GetOne()
+ expected := "test1"
+ if value != expected {
+ t.Errorf("Expected %+v, received %+v", expected, value)
+ }
+
+}