mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Covered utils.go to 100%
This commit is contained in:
committed by
Dan Christian Bogos
parent
2a92607cf5
commit
00fdb5f580
@@ -27,8 +27,13 @@ import (
|
||||
// WatchDir sets up a watcher via inotify to be triggered on new files
|
||||
// sysID is the subsystem ID, f will be triggered on match
|
||||
func WatchDir(dirPath string, f func(itmPath, itmID string) error, sysID string, stopWatching chan struct{}) (err error) {
|
||||
return watchDir(dirPath, f, sysID, stopWatching, fsnotify.NewWatcher)
|
||||
}
|
||||
|
||||
func watchDir(dirPath string, f func(itmPath, itmID string) error, sysID string,
|
||||
stopWatching chan struct{}, newWatcher func() (*fsnotify.Watcher, error)) (err error) {
|
||||
var watcher *fsnotify.Watcher
|
||||
if watcher, err = fsnotify.NewWatcher(); err != nil {
|
||||
if watcher, err = newWatcher(); err != nil {
|
||||
return
|
||||
}
|
||||
if err = watcher.Add(dirPath); err != nil {
|
||||
@@ -36,29 +41,32 @@ func WatchDir(dirPath string, f func(itmPath, itmID string) error, sysID string,
|
||||
return
|
||||
}
|
||||
Logger.Info(fmt.Sprintf("<%s> monitoring <%s> for file moves.", sysID, dirPath))
|
||||
go func() { // read async
|
||||
defer watcher.Close()
|
||||
for {
|
||||
select {
|
||||
case <-stopWatching:
|
||||
Logger.Info(fmt.Sprintf("<%s> stop watching path <%s>", sysID, dirPath))
|
||||
return
|
||||
case ev := <-watcher.Events:
|
||||
if ev.Op&fsnotify.Create == fsnotify.Create {
|
||||
go func() { //Enable async processing here so we can simultaneously process files
|
||||
if err = f(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil {
|
||||
Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>",
|
||||
sysID, ev.Name, err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
case err = <-watcher.Errors:
|
||||
Logger.Err(
|
||||
fmt.Sprintf("<%s> watching path <%s>, error: <%s>, exiting!",
|
||||
sysID, dirPath, err.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
go watch(dirPath, sysID, f, watcher, stopWatching) // read async
|
||||
return
|
||||
}
|
||||
|
||||
func watch(dirPath, sysID string, f func(itmPath, itmID string) error,
|
||||
watcher *fsnotify.Watcher, stopWatching chan struct{}) (err error) {
|
||||
defer watcher.Close()
|
||||
for {
|
||||
select {
|
||||
case <-stopWatching:
|
||||
Logger.Info(fmt.Sprintf("<%s> stop watching path <%s>", sysID, dirPath))
|
||||
return
|
||||
case ev := <-watcher.Events:
|
||||
if ev.Op&fsnotify.Create == fsnotify.Create {
|
||||
go func() { //Enable async processing here so we can simultaneously process files
|
||||
if err = f(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil {
|
||||
Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>",
|
||||
sysID, ev.Name, err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
case err = <-watcher.Errors:
|
||||
Logger.Err(
|
||||
fmt.Sprintf("<%s> watching path <%s>, error: <%s>, exiting!",
|
||||
sysID, dirPath, err.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
@@ -19,14 +21,21 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"gopkg.in/fsnotify.v1"
|
||||
)
|
||||
|
||||
var (
|
||||
testsWatchDir = []func(t *testing.T){
|
||||
testWatchDir,
|
||||
testWatchDirNoError,
|
||||
testWatchWatcherError,
|
||||
testWatchWatcherEvents,
|
||||
testWatchDirValidPath,
|
||||
testWatchDirInvalidPath,
|
||||
testWatchNewWatcherError,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -36,27 +45,89 @@ func TestFileIT(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testWatchDir(t *testing.T) {
|
||||
func testWatchWatcherError(t *testing.T) {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
watcher.Events = make(chan fsnotify.Event, 1)
|
||||
watcher.Errors = make(chan error, 1)
|
||||
|
||||
chanErr := fmt.Errorf("")
|
||||
watcher.Errors <- chanErr
|
||||
stopWatching := make(chan struct{}, 1)
|
||||
close(stopWatching)
|
||||
flPath := "/tmp/testWatchDir"
|
||||
if err := watch(EmptyString, EmptyString, nil, watcher, stopWatching); err != chanErr {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testWatchWatcherEvents(t *testing.T) {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
watcher.Events = make(chan fsnotify.Event, 1)
|
||||
watcher.Errors = make(chan error, 1)
|
||||
|
||||
watcher.Events <- fsnotify.Event{
|
||||
Name: "/tmp/file.txt",
|
||||
Op: fsnotify.Create,
|
||||
}
|
||||
stopWatching := make(chan struct{}, 1)
|
||||
f := func(itmPath, itmID string) error {
|
||||
close(stopWatching)
|
||||
if itmPath != "/tmp" || itmID != "file.txt" {
|
||||
t.Errorf("Invalid directory or file")
|
||||
}
|
||||
return fmt.Errorf("Can't match path")
|
||||
}
|
||||
expected := "Can't match path"
|
||||
if err := watch(EmptyString, EmptyString, f, watcher, stopWatching); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expected %+v, received %+v", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func testWatchDirValidPath(t *testing.T) {
|
||||
flPath := "/tmp/testWatchDirValidPath/"
|
||||
if err := os.MkdirAll(flPath, 0777); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := WatchDir(flPath, nil, "randomID", stopWatching); err != nil {
|
||||
|
||||
newFile, err := os.Create(path.Join(flPath, "file.txt"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
newFile.Close()
|
||||
|
||||
stopWatching := make(chan struct{}, 1)
|
||||
if err := WatchDir(path.Join(flPath, "file.txt"), nil, EmptyString, stopWatching); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if err := os.RemoveAll(flPath); err != nil {
|
||||
t.Fatal(err)
|
||||
if err := os.Remove(path.Join(flPath, "file.txt")); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := os.Remove(flPath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testWatchDirNoError(t *testing.T) {
|
||||
func testWatchDirInvalidPath(t *testing.T) {
|
||||
flPath := "tmp/testWatchDirInvalidPath"
|
||||
stopWatching := make(chan struct{}, 1)
|
||||
flPath := "/tmp/inexistentDir"
|
||||
expectedErr := "no such file or directory"
|
||||
if err := WatchDir(flPath, nil, "randomID", stopWatching); err == nil || err.Error() != expectedErr {
|
||||
t.Errorf("Expected %+v, received %+v", expectedErr, err)
|
||||
expected := "no such file or directory"
|
||||
if err := WatchDir(flPath, nil, EmptyString, stopWatching); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expected %+v, received %+v", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
func testWatchNewWatcherError(t *testing.T) {
|
||||
newWatcher := func() (*fsnotify.Watcher, error) {
|
||||
return nil, fmt.Errorf("Invalid watcher")
|
||||
}
|
||||
stopWatching := make(chan struct{}, 1)
|
||||
expected := "Invalid watcher"
|
||||
if err := watchDir(EmptyString, nil, EmptyString, stopWatching, newWatcher); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expected %+v, received %+v", expected, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,23 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
|
||||
@@ -218,3 +218,16 @@ func TestSetCloneEmpty(t *testing.T) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expected, received)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetOne(t *testing.T) {
|
||||
set := StringSet{
|
||||
"test1": struct{}{},
|
||||
"test2": struct{}{},
|
||||
}
|
||||
value := set.GetOne()
|
||||
expected := "test1"
|
||||
if value != expected {
|
||||
t.Errorf("Expected %+v, received %+v", expected, value)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user