diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index e2cb0ab5a..0e83a4b8f 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -24,11 +24,11 @@ import ( "github.com/cgrates/cgrates/mediator" "github.com/cgrates/cgrates/rater" "io/ioutil" + "log" "net/http" ) var ( - Logger = rater.Logger cfg *config.CGRConfig // Share the configuration with the rest of the package storage rater.DataStorage medi *mediator.Mediator @@ -37,10 +37,11 @@ var ( func cdrHandler(w http.ResponseWriter, r *http.Request) { body, _ := ioutil.ReadAll(r.Body) if fsCdr, err := new(FSCdr).New(body); err == nil { - storage.SetCdr(fsCdr) - medi.MediateCdrFromDB(fsCdr.GetAccount(), storage) + log.Printf("CDR: %v", fsCdr) + //storage.SetCdr(fsCdr) + //medi.MediateCdrFromDB(fsCdr.GetAccount(), storage) } else { - Logger.Err(fmt.Sprintf("Could not create CDR entry: %v", err)) + rater.Logger.Err(fmt.Sprintf("Could not create CDR entry: %v", err)) } } diff --git a/inotify/inotify_linux.go b/inotify/inotify_linux.go deleted file mode 100644 index f989a9224..000000000 --- a/inotify/inotify_linux.go +++ /dev/null @@ -1,298 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -/* -Package inotify implements a wrapper for the Linux inotify system. - -Example: - watcher, err := inotify.NewWatcher() - if err != nil { - log.Fatal(err) - } - err = watcher.Watch("/tmp") - if err != nil { - log.Fatal(err) - } - for { - select { - case ev := <-watcher.Event: - log.Println("event:", ev) - case err := <-watcher.Error: - log.Println("error:", err) - } - } - -*/ -package inotify - -import ( - "errors" - "fmt" - "os" - "strings" - "sync" - "syscall" - "unsafe" -) - -type Event struct { - Mask uint32 // Mask of events - Cookie uint32 // Unique cookie associating related events (for rename(2)) - Name string // File name (optional) -} - -type watch struct { - wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall) - flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags) -} - -type Watcher struct { - mu sync.Mutex - fd int // File descriptor (as returned by the inotify_init() syscall) - watches map[string]*watch // Map of inotify watches (key: path) - paths map[int]string // Map of watched paths (key: watch descriptor) - Error chan error // Errors are sent on this channel - Event chan *Event // Events are returned on this channel - done chan bool // Channel for sending a "quit message" to the reader goroutine - isClosed bool // Set to true when Close() is first called -} - -// NewWatcher creates and returns a new inotify instance using inotify_init(2) -func NewWatcher() (*Watcher, error) { - fd, errno := syscall.InotifyInit() - if fd == -1 { - return nil, os.NewSyscallError("inotify_init", errno) - } - w := &Watcher{ - fd: fd, - watches: make(map[string]*watch), - paths: make(map[int]string), - Event: make(chan *Event), - Error: make(chan error), - done: make(chan bool, 1), - } - - go w.readEvents() - return w, nil -} - -// Close closes an inotify watcher instance -// It sends a message to the reader goroutine to quit and removes all watches -// associated with the inotify instance -func (w *Watcher) Close() error { - if w.isClosed { - return nil - } - w.isClosed = true - - // Send "quit" message to the reader goroutine - w.done <- true - for path := range w.watches { - w.RemoveWatch(path) - } - - return nil -} - -// AddWatch adds path to the watched file set. -// The flags are interpreted as described in inotify_add_watch(2). -func (w *Watcher) AddWatch(path string, flags uint32) error { - if w.isClosed { - return errors.New("inotify instance already closed") - } - - watchEntry, found := w.watches[path] - if found { - watchEntry.flags |= flags - flags |= syscall.IN_MASK_ADD - } - - w.mu.Lock() // synchronize with readEvents goroutine - - wd, err := syscall.InotifyAddWatch(w.fd, path, flags) - if err != nil { - w.mu.Unlock() - return &os.PathError{ - Op: "inotify_add_watch", - Path: path, - Err: err, - } - } - - if !found { - w.watches[path] = &watch{wd: uint32(wd), flags: flags} - w.paths[wd] = path - } - w.mu.Unlock() - return nil -} - -// Watch adds path to the watched file set, watching all events. -func (w *Watcher) Watch(path string) error { - return w.AddWatch(path, IN_ALL_EVENTS) -} - -// RemoveWatch removes path from the watched file set. -func (w *Watcher) RemoveWatch(path string) error { - watch, ok := w.watches[path] - if !ok { - return errors.New(fmt.Sprintf("can't remove non-existent inotify watch for: %s", path)) - } - success, errno := syscall.InotifyRmWatch(w.fd, watch.wd) - if success == -1 { - return os.NewSyscallError("inotify_rm_watch", errno) - } - delete(w.watches, path) - return nil -} - -// readEvents reads from the inotify file descriptor, converts the -// received events into Event objects and sends them via the Event channel -func (w *Watcher) readEvents() { - var buf [syscall.SizeofInotifyEvent * 4096]byte - - for { - n, err := syscall.Read(w.fd, buf[0:]) - // See if there is a message on the "done" channel - var done bool - select { - case done = <-w.done: - default: - } - - // If EOF or a "done" message is received - if n == 0 || done { - err := syscall.Close(w.fd) - if err != nil { - w.Error <- os.NewSyscallError("close", err) - } - close(w.Event) - close(w.Error) - return - } - if n < 0 { - w.Error <- os.NewSyscallError("read", err) - continue - } - if n < syscall.SizeofInotifyEvent { - w.Error <- errors.New("inotify: short read in readEvents()") - continue - } - - var offset uint32 = 0 - // We don't know how many events we just read into the buffer - // While the offset points to at least one whole event... - for offset <= uint32(n-syscall.SizeofInotifyEvent) { - // Point "raw" to the event in the buffer - raw := (*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset])) - event := new(Event) - event.Mask = uint32(raw.Mask) - event.Cookie = uint32(raw.Cookie) - nameLen := uint32(raw.Len) - // If the event happened to the watched directory or the watched file, the kernel - // doesn't append the filename to the event, but we would like to always fill the - // the "Name" field with a valid filename. We retrieve the path of the watch from - // the "paths" map. - w.mu.Lock() - event.Name = w.paths[int(raw.Wd)] - w.mu.Unlock() - if nameLen > 0 { - // Point "bytes" at the first byte of the filename - bytes := (*[syscall.PathMax]byte)(unsafe.Pointer(&buf[offset+syscall.SizeofInotifyEvent])) - // The filename is padded with NUL bytes. TrimRight() gets rid of those. - event.Name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000") - } - // Send the event on the events channel - w.Event <- event - - // Move to the next event in the buffer - offset += syscall.SizeofInotifyEvent + nameLen - } - } -} - -// String formats the event e in the form -// "filename: 0xEventMask = IN_ACCESS|IN_ATTRIB_|..." -func (e *Event) String() string { - var events string = "" - - m := e.Mask - for _, b := range eventBits { - if m&b.Value != 0 { - m &^= b.Value - events += "|" + b.Name - } - } - - if m != 0 { - events += fmt.Sprintf("|%#x", m) - } - if len(events) > 0 { - events = " == " + events[1:] - } - - return fmt.Sprintf("%q: %#x%s", e.Name, e.Mask, events) -} - -const ( - // Options for inotify_init() are not exported - // IN_CLOEXEC uint32 = syscall.IN_CLOEXEC - // IN_NONBLOCK uint32 = syscall.IN_NONBLOCK - - // Options for AddWatch - IN_DONT_FOLLOW uint32 = syscall.IN_DONT_FOLLOW - IN_ONESHOT uint32 = syscall.IN_ONESHOT - IN_ONLYDIR uint32 = syscall.IN_ONLYDIR - - // The "IN_MASK_ADD" option is not exported, as AddWatch - // adds it automatically, if there is already a watch for the given path - // IN_MASK_ADD uint32 = syscall.IN_MASK_ADD - - // Events - IN_ACCESS uint32 = syscall.IN_ACCESS - IN_ALL_EVENTS uint32 = syscall.IN_ALL_EVENTS - IN_ATTRIB uint32 = syscall.IN_ATTRIB - IN_CLOSE uint32 = syscall.IN_CLOSE - IN_CLOSE_NOWRITE uint32 = syscall.IN_CLOSE_NOWRITE - IN_CLOSE_WRITE uint32 = syscall.IN_CLOSE_WRITE - IN_CREATE uint32 = syscall.IN_CREATE - IN_DELETE uint32 = syscall.IN_DELETE - IN_DELETE_SELF uint32 = syscall.IN_DELETE_SELF - IN_MODIFY uint32 = syscall.IN_MODIFY - IN_MOVE uint32 = syscall.IN_MOVE - IN_MOVED_FROM uint32 = syscall.IN_MOVED_FROM - IN_MOVED_TO uint32 = syscall.IN_MOVED_TO - IN_MOVE_SELF uint32 = syscall.IN_MOVE_SELF - IN_OPEN uint32 = syscall.IN_OPEN - - // Special events - IN_ISDIR uint32 = syscall.IN_ISDIR - IN_IGNORED uint32 = syscall.IN_IGNORED - IN_Q_OVERFLOW uint32 = syscall.IN_Q_OVERFLOW - IN_UNMOUNT uint32 = syscall.IN_UNMOUNT -) - -var eventBits = []struct { - Value uint32 - Name string -}{ - {IN_ACCESS, "IN_ACCESS"}, - {IN_ATTRIB, "IN_ATTRIB"}, - {IN_CLOSE, "IN_CLOSE"}, - {IN_CLOSE_NOWRITE, "IN_CLOSE_NOWRITE"}, - {IN_CLOSE_WRITE, "IN_CLOSE_WRITE"}, - {IN_CREATE, "IN_CREATE"}, - {IN_DELETE, "IN_DELETE"}, - {IN_DELETE_SELF, "IN_DELETE_SELF"}, - {IN_MODIFY, "IN_MODIFY"}, - {IN_MOVE, "IN_MOVE"}, - {IN_MOVED_FROM, "IN_MOVED_FROM"}, - {IN_MOVED_TO, "IN_MOVED_TO"}, - {IN_MOVE_SELF, "IN_MOVE_SELF"}, - {IN_OPEN, "IN_OPEN"}, - {IN_ISDIR, "IN_ISDIR"}, - {IN_IGNORED, "IN_IGNORED"}, - {IN_Q_OVERFLOW, "IN_Q_OVERFLOW"}, - {IN_UNMOUNT, "IN_UNMOUNT"}, -} diff --git a/inotify/inotify_linux_test.go b/inotify/inotify_linux_test.go deleted file mode 100644 index 1685b772e..000000000 --- a/inotify/inotify_linux_test.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// +build linux - -package inotify - -import ( - "io/ioutil" - "os" - "sync/atomic" - "testing" - "time" -) - -func TestInotifyEvents(t *testing.T) { - // Create an inotify watcher instance and initialize it - watcher, err := NewWatcher() - if err != nil { - t.Fatalf("NewWatcher failed: %s", err) - } - - dir, err := ioutil.TempDir("", "inotify") - if err != nil { - t.Fatalf("TempDir failed: %s", err) - } - defer os.RemoveAll(dir) - - // Add a watch for "_test" - err = watcher.Watch(dir) - if err != nil { - t.Fatalf("Watch failed: %s", err) - } - - // Receive errors on the error channel on a separate goroutine - go func() { - for err := range watcher.Error { - t.Fatalf("error received: %s", err) - } - }() - - testFile := dir + "/TestInotifyEvents.testfile" - - // Receive events on the event channel on a separate goroutine - eventstream := watcher.Event - var eventsReceived int32 = 0 - done := make(chan bool) - go func() { - for event := range eventstream { - // Only count relevant events - if event.Name == testFile { - atomic.AddInt32(&eventsReceived, 1) - t.Logf("event received: %s", event) - } else { - t.Logf("unexpected event received: %s", event) - } - } - done <- true - }() - - // Create a file - // This should add at least one event to the inotify event queue - _, err = os.OpenFile(testFile, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - t.Fatalf("creating test file: %s", err) - } - - // We expect this event to be received almost immediately, but let's wait 1 s to be sure - time.Sleep(1 * time.Second) - if atomic.AddInt32(&eventsReceived, 0) == 0 { - t.Fatal("inotify event hasn't been received after 1 second") - } - - // Try closing the inotify instance - t.Log("calling Close()") - watcher.Close() - t.Log("waiting for the event channel to become closed...") - select { - case <-done: - t.Log("event channel closed") - case <-time.After(1 * time.Second): - t.Fatal("event stream was not closed after 1 second") - } -} - -func TestInotifyClose(t *testing.T) { - watcher, _ := NewWatcher() - watcher.Close() - - done := make(chan bool) - go func() { - watcher.Close() - done <- true - }() - - select { - case <-done: - case <-time.After(50 * time.Millisecond): - t.Fatal("double Close() test failed: second Close() call didn't return") - } - - err := watcher.Watch(os.TempDir()) - if err == nil { - t.Fatal("expected error on Watch() after Close(), got nil") - } -} diff --git a/mediator/mediator.go b/mediator/mediator.go index 49c04d686..ea01deb7a 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -23,8 +23,8 @@ import ( "encoding/csv" "flag" "fmt" - "github.com/cgrates/cgrates/inotify" "github.com/cgrates/cgrates/rater" + "github.com/howeyc/fsnotify" "os" "path" "strconv" @@ -109,10 +109,11 @@ func (m *Mediator) validateIndexses() bool { // Watch the specified folder for file moves and parse the files on events func (m *Mediator) TrackCDRFiles(cdrPath string) (err error) { - watcher, err := inotify.NewWatcher() + watcher, err := fsnotify.NewWatcher() if err != nil { return } + defer watcher.Close() err = watcher.Watch(cdrPath) if err != nil { return @@ -121,7 +122,7 @@ func (m *Mediator) TrackCDRFiles(cdrPath string) (err error) { for { select { case ev := <-watcher.Event: - if ev.Mask&inotify.IN_MOVED_TO != 0 { + if ev.IsRename() { rater.Logger.Info(fmt.Sprintf("Parsing: %v", ev.Name)) err = m.parseCSV(ev.Name) if err != nil { diff --git a/rater/calldesc.go b/rater/calldesc.go index 62e7d1acd..2430ac4e2 100644 --- a/rater/calldesc.go +++ b/rater/calldesc.go @@ -31,6 +31,7 @@ import ( func init() { var err error Logger, err = syslog.New(syslog.LOG_INFO, "CGRateS") + Logger = new(utils.StdLogger) if err != nil { Logger = new(utils.StdLogger) } diff --git a/test.sh b/test.sh index 06be8cda1..c08cc8ad8 100755 --- a/test.sh +++ b/test.sh @@ -4,7 +4,6 @@ go test -i github.com/cgrates/cgrates/rater go test -i github.com/cgrates/cgrates/sessionmanager go test -i github.com/cgrates/cgrates/config go test -i github.com/cgrates/cgrates/cmd/cgr-rater -go test -i github.com/cgrates/cgrates/inotify go test -i github.com/cgrates/cgrates/mediator go test -i github.com/cgrates/fsock go test -i github.com/cgrates/cgrates/cdrs @@ -18,8 +17,6 @@ go test github.com/cgrates/cgrates/config cfg=$? go test github.com/cgrates/cgrates/cmd/cgr-rater cr=$? -go test github.com/cgrates/cgrates/inotify -it=$? go test github.com/cgrates/cgrates/mediator md=$? go test github.com/cgrates/cgrates/cdrs @@ -29,4 +26,4 @@ ut=$? go test github.com/cgrates//fsock fs=$? -exit $ts && $sm && $cfg && $bl && $cr && $it && $md && $cdr && $fs && $ut +exit $ts && $sm && $cfg && $bl && $cr && $md && $cdr && $fs && $ut diff --git a/update_external_libs.sh b/update_external_libs.sh index 7bd0e6ee7..8ca0e8d6c 100755 --- a/update_external_libs.sh +++ b/update_external_libs.sh @@ -9,3 +9,4 @@ go get -v -u github.com/cgrates/fsock go get -u -v github.com/go-sql-driver/mysql go get -u -v github.com/garyburd/redigo/redis go get -u -v menteslibres.net/gosexy/redis +go get -u -v github.com/howeyc/fsnotify