diff --git a/cmd/cgr-loader/actions.go b/cmd/cgr-loader/actions.go index e15c7a138..89fd683c7 100644 --- a/cmd/cgr-loader/actions.go +++ b/cmd/cgr-loader/actions.go @@ -121,6 +121,7 @@ func loadActionTimings() { } for _, t := range ts { at := ×pans.ActionTiming{ + Tag: record[2], Timing: ×pans.Interval{ Months: t.Months, MonthDays: t.MonthDays, diff --git a/cmd/cgr-loader/helpers.go b/cmd/cgr-loader/helpers.go index c628c2735..ea7ede0bd 100644 --- a/cmd/cgr-loader/helpers.go +++ b/cmd/cgr-loader/helpers.go @@ -23,6 +23,7 @@ import ( "log" "strconv" "time" + "strings" ) type Rate struct { @@ -74,7 +75,7 @@ func NewTiming(timeingInfo ...string) (rt *Timing) { rt.MonthDays.Parse(timeingInfo[1], ";") rt.WeekDays.Parse(timeingInfo[2], ";") if timeingInfo[3] == "*now" { - rt.StartTime = time.Now().Format("00:00:00") + rt.StartTime = strings.Split(time.Now().Format(time.Stamp), " ")[2] } else { rt.StartTime = timeingInfo[3] } diff --git a/cmd/cgr-scheduler/cgr-scheduler.go b/cmd/cgr-scheduler/cgr-scheduler.go index 0c00de108..b4bae27eb 100644 --- a/cmd/cgr-scheduler/cgr-scheduler.go +++ b/cmd/cgr-scheduler/cgr-scheduler.go @@ -34,7 +34,10 @@ var ( redisserver = flag.String("redisserver", "tcp:127.0.0.1:6379", "redis server address (tcp:127.0.0.1:6379)") redisdb = flag.Int("rdb", 10, "redis database number (10)") redispass = flag.String("pass", "", "redis database password") + storage timespans.StorageGetter timer *time.Timer + restartLoop = make(chan byte) + s = scheduler{} ) /* @@ -63,14 +66,24 @@ func (s scheduler) loop() { a0 := s.queue[0] now := time.Now() if a0.GetNextStartTime().Equal(now) || a0.GetNextStartTime().Before(now) { + log.Printf("%v - %v", a0.Tag, a0.Timing) + log.Print(a0.GetNextStartTime(), now) go a0.Execute() s.queue = append(s.queue, a0) s.queue = s.queue[1:] sort.Sort(s.queue) } else { d := a0.GetNextStartTime().Sub(now) + log.Printf("Timer set to wait for %v", d) timer = time.NewTimer(d) - <-timer.C + select { + case <-timer.C: + // timer has expired + log.Printf("Time for action on %v", s.queue[0]) + case <-restartLoop: + // nothing to do, just continue the loop + } + } } } @@ -78,32 +91,46 @@ func (s scheduler) loop() { // Listens for the HUP system signal and gracefuly reloads the timers from database. func stopSingnalHandler() { log.Print("Handling HUP signal...") - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGHUP) - sig := <-c + for { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGHUP) + sig := <-c - log.Printf("Caught signal %v, reloading action timings.\n", sig) - loadActionTimings() + log.Printf("Caught signal %v, reloading action timings.\n", sig) + loadActionTimings() + // check the tip of the queue for new actions + restartLoop <- 1 + timer.Stop() + } } -func loadActionTimings() (actionTimings []*timespans.ActionTiming, err error) { - storage, err := timespans.NewRedisStorage(*redisserver, *redisdb) - defer storage.Close() +func loadActionTimings() { + actionTimings, err := storage.GetAllActionTimings() if err != nil { - log.Fatalf("Could not open database connection: %v", err) + log.Fatalf("Cannot get action timings:", err) } - actionTimings, err = storage.GetAllActionTimings() - return + // recreate the queue + s.queue = actiontimingqueue{} + for _, at := range actionTimings { + if at.IsOneTimeRun() { + go at.Execute() + continue + } + s.queue = append(s.queue, at) + } + sort.Sort(s.queue) } func main() { flag.Parse() - actionTimings, err := loadActionTimings() + var err error + storage, err = timespans.NewRedisStorage(*redisserver, *redisdb) if err != nil { - log.Fatalf("Cannot get action timings:", err) + log.Fatalf("Could not open database connection: %v", err) } - s := scheduler{} - s.queue = append(s.queue, actionTimings...) + defer storage.Close() + timespans.SetStorageGetter(storage) + loadActionTimings() go stopSingnalHandler() s.loop() } diff --git a/timespans/actions.go b/timespans/actions.go index b063d7cd2..c45b83110 100644 --- a/timespans/actions.go +++ b/timespans/actions.go @@ -141,6 +141,7 @@ type ActionTrigger struct { } type ActionTiming struct { + Tag string // informative purpos only UserBalanceIds []string Timing *Interval ActionsId string @@ -192,7 +193,7 @@ func (at *ActionTiming) GetNextStartTime() (t time.Time) { for _, j := range []int{0, 1, 2, 3, 4, 5, 6} { t = time.Date(t.Year(), t.Month(), t.Day()+j, t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), t.Location()) for _, wd := range i.WeekDays { - if t.Weekday() == wd { + if t.Weekday() == wd && (t.Equal(now) || t.After(now)) { return } } @@ -206,7 +207,7 @@ func (at *ActionTiming) GetNextStartTime() (t time.Time) { d = i.MonthDays[0] if x < len(i.MonthDays) { if i.MonthDays[x] == now.Day() { - if now.Before(t) { + if t.Equal(now) || t.After(now) { h, m, s := t.Clock() t = time.Date(now.Year(), now.Month(), now.Day(), h, m, s, 0, time.Local) goto MONTHS @@ -229,7 +230,7 @@ MONTHS: m = i.Months[0] if x < len(i.Months) { if i.Months[x] == now.Month() { - if now.Before(t) { + if t.Equal(now) || t.After(now) { h, m, s := t.Clock() t = time.Date(now.Year(), now.Month(), t.Day(), h, m, s, 0, time.Local) return @@ -272,3 +273,11 @@ func (at *ActionTiming) Execute() (err error) { } return } + +func (at *ActionTiming) IsOneTimeRun() bool { + i := at.Timing + if i == nil { + return true + } + return len(i.Months) == 0 && len(i.MonthDays) == 0 && len(i.WeekDays) == 0 +} diff --git a/timespans/actions_test.go b/timespans/actions_test.go index cba21957d..5cd52e915 100644 --- a/timespans/actions_test.go +++ b/timespans/actions_test.go @@ -162,7 +162,55 @@ func TestActionTimingHourMonthdaysMonths(t *testing.T) { } } -func TestLogFunction(t *testing.T) { +func TestActionTimingFisrtOfTheMonth(t *testing.T) { + now := time.Now() + y, m, _ := now.Date() + nextMonth := time.Date(y, m+1, 1, 0, 0, 0, 0, time.Local) + at := &ActionTiming{Timing: &Interval{ + Months: Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December}, + MonthDays: MonthDays{1}, + StartTime: "00:00:00", + }} + st := at.GetNextStartTime() + expected := nextMonth + if !st.Equal(expected) { + t.Errorf("Expected %v was %v", expected, st) + } +} + +func TestActionTimingIsOneTimeRunNoInterval(t *testing.T) { + at := &ActionTiming{} + if !at.IsOneTimeRun() { + t.Errorf("%v should be one time run!", at) + } +} + +func TestActionTimingIsOneTimeRunNothing(t *testing.T) { + at := &ActionTiming{Timing: &Interval{}} + if !at.IsOneTimeRun() { + t.Errorf("%v should be one time run!", at) + } +} + +func TestActionTimingIsOneTimeRunStartTime(t *testing.T) { + at := &ActionTiming{Timing: &Interval{ + StartTime: "00:00:00", + }} + if !at.IsOneTimeRun() { + t.Errorf("%v should be one time run!", at) + } +} + +func TestActionTimingIsOneTimeRunWeekDay(t *testing.T) { + at := &ActionTiming{Timing: &Interval{ + WeekDays: WeekDays{time.Monday}, + }} + if at.IsOneTimeRun() { + t.Errorf("%v should NOT be one time run!", at) + } +} + +func TestActionTimingLogFunction(t *testing.T) { a := &Action{ ActionType: "LOG", BalanceId: "test",