mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 12:49:54 +05:00
execute one time run events, more tests, better scheduler
This commit is contained in:
@@ -121,6 +121,7 @@ func loadActionTimings() {
|
||||
}
|
||||
for _, t := range ts {
|
||||
at := ×pans.ActionTiming{
|
||||
Tag: record[2],
|
||||
Timing: ×pans.Interval{
|
||||
Months: t.Months,
|
||||
MonthDays: t.MonthDays,
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Rate struct {
|
||||
@@ -74,7 +75,7 @@ func NewTiming(timeingInfo ...string) (rt *Timing) {
|
||||
rt.MonthDays.Parse(timeingInfo[1], ";")
|
||||
rt.WeekDays.Parse(timeingInfo[2], ";")
|
||||
if timeingInfo[3] == "*now" {
|
||||
rt.StartTime = time.Now().Format("00:00:00")
|
||||
rt.StartTime = strings.Split(time.Now().Format(time.Stamp), " ")[2]
|
||||
} else {
|
||||
rt.StartTime = timeingInfo[3]
|
||||
}
|
||||
|
||||
@@ -34,7 +34,10 @@ var (
|
||||
redisserver = flag.String("redisserver", "tcp:127.0.0.1:6379", "redis server address (tcp:127.0.0.1:6379)")
|
||||
redisdb = flag.Int("rdb", 10, "redis database number (10)")
|
||||
redispass = flag.String("pass", "", "redis database password")
|
||||
storage timespans.StorageGetter
|
||||
timer *time.Timer
|
||||
restartLoop = make(chan byte)
|
||||
s = scheduler{}
|
||||
)
|
||||
|
||||
/*
|
||||
@@ -63,14 +66,24 @@ func (s scheduler) loop() {
|
||||
a0 := s.queue[0]
|
||||
now := time.Now()
|
||||
if a0.GetNextStartTime().Equal(now) || a0.GetNextStartTime().Before(now) {
|
||||
log.Printf("%v - %v", a0.Tag, a0.Timing)
|
||||
log.Print(a0.GetNextStartTime(), now)
|
||||
go a0.Execute()
|
||||
s.queue = append(s.queue, a0)
|
||||
s.queue = s.queue[1:]
|
||||
sort.Sort(s.queue)
|
||||
} else {
|
||||
d := a0.GetNextStartTime().Sub(now)
|
||||
log.Printf("Timer set to wait for %v", d)
|
||||
timer = time.NewTimer(d)
|
||||
<-timer.C
|
||||
select {
|
||||
case <-timer.C:
|
||||
// timer has expired
|
||||
log.Printf("Time for action on %v", s.queue[0])
|
||||
case <-restartLoop:
|
||||
// nothing to do, just continue the loop
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -78,32 +91,46 @@ func (s scheduler) loop() {
|
||||
// Listens for the HUP system signal and gracefuly reloads the timers from database.
|
||||
func stopSingnalHandler() {
|
||||
log.Print("Handling HUP signal...")
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGHUP)
|
||||
sig := <-c
|
||||
for {
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGHUP)
|
||||
sig := <-c
|
||||
|
||||
log.Printf("Caught signal %v, reloading action timings.\n", sig)
|
||||
loadActionTimings()
|
||||
log.Printf("Caught signal %v, reloading action timings.\n", sig)
|
||||
loadActionTimings()
|
||||
// check the tip of the queue for new actions
|
||||
restartLoop <- 1
|
||||
timer.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func loadActionTimings() (actionTimings []*timespans.ActionTiming, err error) {
|
||||
storage, err := timespans.NewRedisStorage(*redisserver, *redisdb)
|
||||
defer storage.Close()
|
||||
func loadActionTimings() {
|
||||
actionTimings, err := storage.GetAllActionTimings()
|
||||
if err != nil {
|
||||
log.Fatalf("Could not open database connection: %v", err)
|
||||
log.Fatalf("Cannot get action timings:", err)
|
||||
}
|
||||
actionTimings, err = storage.GetAllActionTimings()
|
||||
return
|
||||
// recreate the queue
|
||||
s.queue = actiontimingqueue{}
|
||||
for _, at := range actionTimings {
|
||||
if at.IsOneTimeRun() {
|
||||
go at.Execute()
|
||||
continue
|
||||
}
|
||||
s.queue = append(s.queue, at)
|
||||
}
|
||||
sort.Sort(s.queue)
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
actionTimings, err := loadActionTimings()
|
||||
var err error
|
||||
storage, err = timespans.NewRedisStorage(*redisserver, *redisdb)
|
||||
if err != nil {
|
||||
log.Fatalf("Cannot get action timings:", err)
|
||||
log.Fatalf("Could not open database connection: %v", err)
|
||||
}
|
||||
s := scheduler{}
|
||||
s.queue = append(s.queue, actionTimings...)
|
||||
defer storage.Close()
|
||||
timespans.SetStorageGetter(storage)
|
||||
loadActionTimings()
|
||||
go stopSingnalHandler()
|
||||
s.loop()
|
||||
}
|
||||
|
||||
@@ -141,6 +141,7 @@ type ActionTrigger struct {
|
||||
}
|
||||
|
||||
type ActionTiming struct {
|
||||
Tag string // informative purpos only
|
||||
UserBalanceIds []string
|
||||
Timing *Interval
|
||||
ActionsId string
|
||||
@@ -192,7 +193,7 @@ func (at *ActionTiming) GetNextStartTime() (t time.Time) {
|
||||
for _, j := range []int{0, 1, 2, 3, 4, 5, 6} {
|
||||
t = time.Date(t.Year(), t.Month(), t.Day()+j, t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), t.Location())
|
||||
for _, wd := range i.WeekDays {
|
||||
if t.Weekday() == wd {
|
||||
if t.Weekday() == wd && (t.Equal(now) || t.After(now)) {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -206,7 +207,7 @@ func (at *ActionTiming) GetNextStartTime() (t time.Time) {
|
||||
d = i.MonthDays[0]
|
||||
if x < len(i.MonthDays) {
|
||||
if i.MonthDays[x] == now.Day() {
|
||||
if now.Before(t) {
|
||||
if t.Equal(now) || t.After(now) {
|
||||
h, m, s := t.Clock()
|
||||
t = time.Date(now.Year(), now.Month(), now.Day(), h, m, s, 0, time.Local)
|
||||
goto MONTHS
|
||||
@@ -229,7 +230,7 @@ MONTHS:
|
||||
m = i.Months[0]
|
||||
if x < len(i.Months) {
|
||||
if i.Months[x] == now.Month() {
|
||||
if now.Before(t) {
|
||||
if t.Equal(now) || t.After(now) {
|
||||
h, m, s := t.Clock()
|
||||
t = time.Date(now.Year(), now.Month(), t.Day(), h, m, s, 0, time.Local)
|
||||
return
|
||||
@@ -272,3 +273,11 @@ func (at *ActionTiming) Execute() (err error) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (at *ActionTiming) IsOneTimeRun() bool {
|
||||
i := at.Timing
|
||||
if i == nil {
|
||||
return true
|
||||
}
|
||||
return len(i.Months) == 0 && len(i.MonthDays) == 0 && len(i.WeekDays) == 0
|
||||
}
|
||||
|
||||
@@ -162,7 +162,55 @@ func TestActionTimingHourMonthdaysMonths(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogFunction(t *testing.T) {
|
||||
func TestActionTimingFisrtOfTheMonth(t *testing.T) {
|
||||
now := time.Now()
|
||||
y, m, _ := now.Date()
|
||||
nextMonth := time.Date(y, m+1, 1, 0, 0, 0, 0, time.Local)
|
||||
at := &ActionTiming{Timing: &Interval{
|
||||
Months: Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December},
|
||||
MonthDays: MonthDays{1},
|
||||
StartTime: "00:00:00",
|
||||
}}
|
||||
st := at.GetNextStartTime()
|
||||
expected := nextMonth
|
||||
if !st.Equal(expected) {
|
||||
t.Errorf("Expected %v was %v", expected, st)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionTimingIsOneTimeRunNoInterval(t *testing.T) {
|
||||
at := &ActionTiming{}
|
||||
if !at.IsOneTimeRun() {
|
||||
t.Errorf("%v should be one time run!", at)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionTimingIsOneTimeRunNothing(t *testing.T) {
|
||||
at := &ActionTiming{Timing: &Interval{}}
|
||||
if !at.IsOneTimeRun() {
|
||||
t.Errorf("%v should be one time run!", at)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionTimingIsOneTimeRunStartTime(t *testing.T) {
|
||||
at := &ActionTiming{Timing: &Interval{
|
||||
StartTime: "00:00:00",
|
||||
}}
|
||||
if !at.IsOneTimeRun() {
|
||||
t.Errorf("%v should be one time run!", at)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionTimingIsOneTimeRunWeekDay(t *testing.T) {
|
||||
at := &ActionTiming{Timing: &Interval{
|
||||
WeekDays: WeekDays{time.Monday},
|
||||
}}
|
||||
if at.IsOneTimeRun() {
|
||||
t.Errorf("%v should NOT be one time run!", at)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionTimingLogFunction(t *testing.T) {
|
||||
a := &Action{
|
||||
ActionType: "LOG",
|
||||
BalanceId: "test",
|
||||
|
||||
Reference in New Issue
Block a user