diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index 9b506a51f..cc0876582 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -23,6 +23,8 @@ import ( "errors" "flag" "fmt" + "github.com/cgrates/cgrates/mediator" + "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/timespans" "github.com/rif/balancer2go" @@ -98,6 +100,7 @@ var ( bal = balancer2go.NewBalancer() exitChan = make(chan bool) + sched = new(scheduler.Scheduler) ) // this function will reset to zero values the variables that are not present @@ -208,8 +211,8 @@ func startMediator(responder *timespans.Responder, loggerDb timespans.DataStorag } connector = &sessionmanager.RPCClientConnector{client} } - m := &Mediator{connector, loggerDb, mediator_skipdb} - m.parseCSV() + m := &mediator.Mediator{connector, loggerDb, mediator_skipdb} + m.ParseCSV(mediator_cdr_file) } func startSessionManager(responder *timespans.Responder, loggerDb timespans.DataStorage) { @@ -385,9 +388,9 @@ func main() { if scheduler_enabled { timespans.Logger.Info("Starting CGRateS scheduler.") go func() { - loadActionTimings(getter) + sched.LoadActionTimings(getter) go reloadSchedulerSingnalHandler(getter) - sched.loop() + sched.Loop() }() } diff --git a/cmd/cgr-rater/registration.go b/cmd/cgr-rater/registration.go index f21e994d1..ab44e0d0f 100644 --- a/cmd/cgr-rater/registration.go +++ b/cmd/cgr-rater/registration.go @@ -103,9 +103,8 @@ func reloadSchedulerSingnalHandler(getter timespans.DataStorage) { sig := <-c timespans.Logger.Info(fmt.Sprintf("Caught signal %v, reloading action timings.\n", sig)) - loadActionTimings(getter) + sched.LoadActionTimings(getter) // check the tip of the queue for new actions - restartLoop <- 1 - timer.Stop() + sched.Restart() } } diff --git a/cmd/cgr-rater/status_responder.go b/cmd/cgr-rater/status_responder.go index e886532c6..5849cfe03 100644 --- a/cmd/cgr-rater/status_responder.go +++ b/cmd/cgr-rater/status_responder.go @@ -15,6 +15,7 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ + package main import ( diff --git a/inotify/inotify_linux.go b/inotify/inotify_linux.go new file mode 100644 index 000000000..f989a9224 --- /dev/null +++ b/inotify/inotify_linux.go @@ -0,0 +1,298 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +/* +Package inotify implements a wrapper for the Linux inotify system. + +Example: + watcher, err := inotify.NewWatcher() + if err != nil { + log.Fatal(err) + } + err = watcher.Watch("/tmp") + if err != nil { + log.Fatal(err) + } + for { + select { + case ev := <-watcher.Event: + log.Println("event:", ev) + case err := <-watcher.Error: + log.Println("error:", err) + } + } + +*/ +package inotify + +import ( + "errors" + "fmt" + "os" + "strings" + "sync" + "syscall" + "unsafe" +) + +type Event struct { + Mask uint32 // Mask of events + Cookie uint32 // Unique cookie associating related events (for rename(2)) + Name string // File name (optional) +} + +type watch struct { + wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall) + flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags) +} + +type Watcher struct { + mu sync.Mutex + fd int // File descriptor (as returned by the inotify_init() syscall) + watches map[string]*watch // Map of inotify watches (key: path) + paths map[int]string // Map of watched paths (key: watch descriptor) + Error chan error // Errors are sent on this channel + Event chan *Event // Events are returned on this channel + done chan bool // Channel for sending a "quit message" to the reader goroutine + isClosed bool // Set to true when Close() is first called +} + +// NewWatcher creates and returns a new inotify instance using inotify_init(2) +func NewWatcher() (*Watcher, error) { + fd, errno := syscall.InotifyInit() + if fd == -1 { + return nil, os.NewSyscallError("inotify_init", errno) + } + w := &Watcher{ + fd: fd, + watches: make(map[string]*watch), + paths: make(map[int]string), + Event: make(chan *Event), + Error: make(chan error), + done: make(chan bool, 1), + } + + go w.readEvents() + return w, nil +} + +// Close closes an inotify watcher instance +// It sends a message to the reader goroutine to quit and removes all watches +// associated with the inotify instance +func (w *Watcher) Close() error { + if w.isClosed { + return nil + } + w.isClosed = true + + // Send "quit" message to the reader goroutine + w.done <- true + for path := range w.watches { + w.RemoveWatch(path) + } + + return nil +} + +// AddWatch adds path to the watched file set. +// The flags are interpreted as described in inotify_add_watch(2). +func (w *Watcher) AddWatch(path string, flags uint32) error { + if w.isClosed { + return errors.New("inotify instance already closed") + } + + watchEntry, found := w.watches[path] + if found { + watchEntry.flags |= flags + flags |= syscall.IN_MASK_ADD + } + + w.mu.Lock() // synchronize with readEvents goroutine + + wd, err := syscall.InotifyAddWatch(w.fd, path, flags) + if err != nil { + w.mu.Unlock() + return &os.PathError{ + Op: "inotify_add_watch", + Path: path, + Err: err, + } + } + + if !found { + w.watches[path] = &watch{wd: uint32(wd), flags: flags} + w.paths[wd] = path + } + w.mu.Unlock() + return nil +} + +// Watch adds path to the watched file set, watching all events. +func (w *Watcher) Watch(path string) error { + return w.AddWatch(path, IN_ALL_EVENTS) +} + +// RemoveWatch removes path from the watched file set. +func (w *Watcher) RemoveWatch(path string) error { + watch, ok := w.watches[path] + if !ok { + return errors.New(fmt.Sprintf("can't remove non-existent inotify watch for: %s", path)) + } + success, errno := syscall.InotifyRmWatch(w.fd, watch.wd) + if success == -1 { + return os.NewSyscallError("inotify_rm_watch", errno) + } + delete(w.watches, path) + return nil +} + +// readEvents reads from the inotify file descriptor, converts the +// received events into Event objects and sends them via the Event channel +func (w *Watcher) readEvents() { + var buf [syscall.SizeofInotifyEvent * 4096]byte + + for { + n, err := syscall.Read(w.fd, buf[0:]) + // See if there is a message on the "done" channel + var done bool + select { + case done = <-w.done: + default: + } + + // If EOF or a "done" message is received + if n == 0 || done { + err := syscall.Close(w.fd) + if err != nil { + w.Error <- os.NewSyscallError("close", err) + } + close(w.Event) + close(w.Error) + return + } + if n < 0 { + w.Error <- os.NewSyscallError("read", err) + continue + } + if n < syscall.SizeofInotifyEvent { + w.Error <- errors.New("inotify: short read in readEvents()") + continue + } + + var offset uint32 = 0 + // We don't know how many events we just read into the buffer + // While the offset points to at least one whole event... + for offset <= uint32(n-syscall.SizeofInotifyEvent) { + // Point "raw" to the event in the buffer + raw := (*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset])) + event := new(Event) + event.Mask = uint32(raw.Mask) + event.Cookie = uint32(raw.Cookie) + nameLen := uint32(raw.Len) + // If the event happened to the watched directory or the watched file, the kernel + // doesn't append the filename to the event, but we would like to always fill the + // the "Name" field with a valid filename. We retrieve the path of the watch from + // the "paths" map. + w.mu.Lock() + event.Name = w.paths[int(raw.Wd)] + w.mu.Unlock() + if nameLen > 0 { + // Point "bytes" at the first byte of the filename + bytes := (*[syscall.PathMax]byte)(unsafe.Pointer(&buf[offset+syscall.SizeofInotifyEvent])) + // The filename is padded with NUL bytes. TrimRight() gets rid of those. + event.Name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000") + } + // Send the event on the events channel + w.Event <- event + + // Move to the next event in the buffer + offset += syscall.SizeofInotifyEvent + nameLen + } + } +} + +// String formats the event e in the form +// "filename: 0xEventMask = IN_ACCESS|IN_ATTRIB_|..." +func (e *Event) String() string { + var events string = "" + + m := e.Mask + for _, b := range eventBits { + if m&b.Value != 0 { + m &^= b.Value + events += "|" + b.Name + } + } + + if m != 0 { + events += fmt.Sprintf("|%#x", m) + } + if len(events) > 0 { + events = " == " + events[1:] + } + + return fmt.Sprintf("%q: %#x%s", e.Name, e.Mask, events) +} + +const ( + // Options for inotify_init() are not exported + // IN_CLOEXEC uint32 = syscall.IN_CLOEXEC + // IN_NONBLOCK uint32 = syscall.IN_NONBLOCK + + // Options for AddWatch + IN_DONT_FOLLOW uint32 = syscall.IN_DONT_FOLLOW + IN_ONESHOT uint32 = syscall.IN_ONESHOT + IN_ONLYDIR uint32 = syscall.IN_ONLYDIR + + // The "IN_MASK_ADD" option is not exported, as AddWatch + // adds it automatically, if there is already a watch for the given path + // IN_MASK_ADD uint32 = syscall.IN_MASK_ADD + + // Events + IN_ACCESS uint32 = syscall.IN_ACCESS + IN_ALL_EVENTS uint32 = syscall.IN_ALL_EVENTS + IN_ATTRIB uint32 = syscall.IN_ATTRIB + IN_CLOSE uint32 = syscall.IN_CLOSE + IN_CLOSE_NOWRITE uint32 = syscall.IN_CLOSE_NOWRITE + IN_CLOSE_WRITE uint32 = syscall.IN_CLOSE_WRITE + IN_CREATE uint32 = syscall.IN_CREATE + IN_DELETE uint32 = syscall.IN_DELETE + IN_DELETE_SELF uint32 = syscall.IN_DELETE_SELF + IN_MODIFY uint32 = syscall.IN_MODIFY + IN_MOVE uint32 = syscall.IN_MOVE + IN_MOVED_FROM uint32 = syscall.IN_MOVED_FROM + IN_MOVED_TO uint32 = syscall.IN_MOVED_TO + IN_MOVE_SELF uint32 = syscall.IN_MOVE_SELF + IN_OPEN uint32 = syscall.IN_OPEN + + // Special events + IN_ISDIR uint32 = syscall.IN_ISDIR + IN_IGNORED uint32 = syscall.IN_IGNORED + IN_Q_OVERFLOW uint32 = syscall.IN_Q_OVERFLOW + IN_UNMOUNT uint32 = syscall.IN_UNMOUNT +) + +var eventBits = []struct { + Value uint32 + Name string +}{ + {IN_ACCESS, "IN_ACCESS"}, + {IN_ATTRIB, "IN_ATTRIB"}, + {IN_CLOSE, "IN_CLOSE"}, + {IN_CLOSE_NOWRITE, "IN_CLOSE_NOWRITE"}, + {IN_CLOSE_WRITE, "IN_CLOSE_WRITE"}, + {IN_CREATE, "IN_CREATE"}, + {IN_DELETE, "IN_DELETE"}, + {IN_DELETE_SELF, "IN_DELETE_SELF"}, + {IN_MODIFY, "IN_MODIFY"}, + {IN_MOVE, "IN_MOVE"}, + {IN_MOVED_FROM, "IN_MOVED_FROM"}, + {IN_MOVED_TO, "IN_MOVED_TO"}, + {IN_MOVE_SELF, "IN_MOVE_SELF"}, + {IN_OPEN, "IN_OPEN"}, + {IN_ISDIR, "IN_ISDIR"}, + {IN_IGNORED, "IN_IGNORED"}, + {IN_Q_OVERFLOW, "IN_Q_OVERFLOW"}, + {IN_UNMOUNT, "IN_UNMOUNT"}, +} diff --git a/inotify/inotify_linux_test.go b/inotify/inotify_linux_test.go new file mode 100644 index 000000000..1685b772e --- /dev/null +++ b/inotify/inotify_linux_test.go @@ -0,0 +1,107 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build linux + +package inotify + +import ( + "io/ioutil" + "os" + "sync/atomic" + "testing" + "time" +) + +func TestInotifyEvents(t *testing.T) { + // Create an inotify watcher instance and initialize it + watcher, err := NewWatcher() + if err != nil { + t.Fatalf("NewWatcher failed: %s", err) + } + + dir, err := ioutil.TempDir("", "inotify") + if err != nil { + t.Fatalf("TempDir failed: %s", err) + } + defer os.RemoveAll(dir) + + // Add a watch for "_test" + err = watcher.Watch(dir) + if err != nil { + t.Fatalf("Watch failed: %s", err) + } + + // Receive errors on the error channel on a separate goroutine + go func() { + for err := range watcher.Error { + t.Fatalf("error received: %s", err) + } + }() + + testFile := dir + "/TestInotifyEvents.testfile" + + // Receive events on the event channel on a separate goroutine + eventstream := watcher.Event + var eventsReceived int32 = 0 + done := make(chan bool) + go func() { + for event := range eventstream { + // Only count relevant events + if event.Name == testFile { + atomic.AddInt32(&eventsReceived, 1) + t.Logf("event received: %s", event) + } else { + t.Logf("unexpected event received: %s", event) + } + } + done <- true + }() + + // Create a file + // This should add at least one event to the inotify event queue + _, err = os.OpenFile(testFile, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + t.Fatalf("creating test file: %s", err) + } + + // We expect this event to be received almost immediately, but let's wait 1 s to be sure + time.Sleep(1 * time.Second) + if atomic.AddInt32(&eventsReceived, 0) == 0 { + t.Fatal("inotify event hasn't been received after 1 second") + } + + // Try closing the inotify instance + t.Log("calling Close()") + watcher.Close() + t.Log("waiting for the event channel to become closed...") + select { + case <-done: + t.Log("event channel closed") + case <-time.After(1 * time.Second): + t.Fatal("event stream was not closed after 1 second") + } +} + +func TestInotifyClose(t *testing.T) { + watcher, _ := NewWatcher() + watcher.Close() + + done := make(chan bool) + go func() { + watcher.Close() + done <- true + }() + + select { + case <-done: + case <-time.After(50 * time.Millisecond): + t.Fatal("double Close() test failed: second Close() call didn't return") + } + + err := watcher.Watch(os.TempDir()) + if err == nil { + t.Fatal("expected error on Watch() after Close(), got nil") + } +} diff --git a/cmd/cgr-rater/mediator.go b/mediator/mediator.go similarity index 88% rename from cmd/cgr-rater/mediator.go rename to mediator/mediator.go index ec58efb75..ca443baf8 100644 --- a/cmd/cgr-rater/mediator.go +++ b/mediator/mediator.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package main +package mediator import ( "bufio" @@ -31,17 +31,13 @@ import ( type Mediator struct { Connector sessionmanager.Connector - loggerDb timespans.DataStorage + LoggerDb timespans.DataStorage SkipDb bool } -/*func readDbRecord(db *sql.DB, searchedUUID string) (cc *timespans.CallCost, timespansText string, err error) { - -}*/ - -func (m *Mediator) parseCSV() { +func (m *Mediator) ParseCSV(cdrfn string) { flag.Parse() - file, err := os.Open(mediator_cdr_file) + file, err := os.Open(cdrfn) defer file.Close() if err != nil { timespans.Logger.Crit(err.Error()) @@ -67,7 +63,7 @@ func (m *Mediator) parseCSV() { func (m *Mediator) GetCostsFromDB(record []string) (cc *timespans.CallCost, err error) { searchedUUID := record[10] - cc, err = m.loggerDb.GetCallCostLog(searchedUUID) + cc, err = m.LoggerDb.GetCallCostLog(searchedUUID) if err != nil { cc, err = m.GetCostsFromRater(record) } @@ -82,6 +78,7 @@ func (m *Mediator) GetCostsFromRater(record []string) (cc *timespans.CallCost, e t2, _ := time.Parse("2012-05-21 17:48:20", record[6]) cd := timespans.CallDescriptor{ Direction: "OUT", + Account: subject, Tenant: tenant, TOR: "0", Subject: subject, diff --git a/cmd/cgr-rater/scheduler.go b/scheduler/scheduler.go similarity index 75% rename from cmd/cgr-rater/scheduler.go rename to scheduler/scheduler.go index e6a9274dc..44b25c53c 100644 --- a/cmd/cgr-rater/scheduler.go +++ b/scheduler/scheduler.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package main +package scheduler import ( "fmt" @@ -25,38 +25,38 @@ import ( "time" ) -var ( - sched = new(scheduler) +type Scheduler struct { + queue timespans.ActionTimingPriotityList timer *time.Timer - restartLoop = make(chan byte) -) - -type scheduler struct { - queue timespans.ActionTimingPriotityList + restartLoop chan bool } -func (s scheduler) loop() { +func NewScheduler() *Scheduler { + return &Scheduler{restartLoop: make(chan bool)} +} + +func (s *Scheduler) Loop() { for { if len(s.queue) == 0 { - <-restartLoop + <-s.restartLoop } a0 := s.queue[0] now := time.Now() if a0.GetNextStartTime().Equal(now) || a0.GetNextStartTime().Before(now) { timespans.Logger.Debug(fmt.Sprintf("%v - %v", a0.Tag, a0.Timing)) go a0.Execute() - sched.queue = append(s.queue, a0) - sched.queue = s.queue[1:] - sort.Sort(sched.queue) + s.queue = append(s.queue, a0) + s.queue = s.queue[1:] + sort.Sort(s.queue) } else { d := a0.GetNextStartTime().Sub(now) timespans.Logger.Info(fmt.Sprintf("Timer set to wait for %v", d)) - timer = time.NewTimer(d) + s.timer = time.NewTimer(d) select { - case <-timer.C: + case <-s.timer.C: // timer has expired timespans.Logger.Info(fmt.Sprintf("Time for action on %v", s.queue[0])) - case <-restartLoop: + case <-s.restartLoop: // nothing to do, just continue the loop } @@ -64,13 +64,13 @@ func (s scheduler) loop() { } } -func loadActionTimings(storage timespans.DataStorage) { +func (s *Scheduler) LoadActionTimings(storage timespans.DataStorage) { actionTimings, err := storage.GetAllActionTimings() if err != nil { timespans.Logger.Warning(fmt.Sprintf("Cannot get action timings: %v", err)) } // recreate the queue - sched.queue = timespans.ActionTimingPriotityList{} + s.queue = timespans.ActionTimingPriotityList{} for key, ats := range actionTimings { toBeSaved := false isAsap := false @@ -83,7 +83,7 @@ func loadActionTimings(storage timespans.DataStorage) { go at.Execute() // do not append it to the newAts list to be saved } else { - sched.queue = append(sched.queue, at) + s.queue = append(s.queue, at) newAts = append(newAts, at) } } @@ -91,5 +91,10 @@ func loadActionTimings(storage timespans.DataStorage) { storage.SetActionTimings(key, newAts) } } - sort.Sort(sched.queue) + sort.Sort(s.queue) +} + +func (s *Scheduler) Restart() { + s.restartLoop <- true + s.timer.Stop() } diff --git a/test.sh b/test.sh index 3efffb0ae..ebea9d173 100755 --- a/test.sh +++ b/test.sh @@ -6,7 +6,7 @@ go test github.com/cgrates/cgrates/sessionmanager sm=$? go test github.com/cgrates/cgrates/cmd/cgr-rater cr=$? +go test github.com/cgrates/cgrates/inotify +in=$? -exit $ts || $sm || $bl || $cr - - +exit $ts || $sm || $bl || $cr || $in diff --git a/timespans/actions_test.go b/timespans/actions_test.go index 4a5c9e3bb..8b49ed48e 100644 --- a/timespans/actions_test.go +++ b/timespans/actions_test.go @@ -56,7 +56,6 @@ func TestActionTimingStoreRestore(t *testing.T) { t.Errorf("Expected %v was %v", at, o) } } - func TestActionTriggerStoreRestore(t *testing.T) { at := &ActionTrigger{ Id: "some_uuid", @@ -314,6 +313,7 @@ func TestActionTimingHourMonthdaysMonthYear(t *testing.T) { month = nextMonth.Month() } } + nextDay = time.Date(y, month, day, 10, 1, 0, 0, time.Local) year := now.Year() if nextDay.Before(now) { if now.After(testTime) {