mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
make sure next action time is refreshed
This commit is contained in:
@@ -218,7 +218,7 @@ YEARS:
|
||||
return
|
||||
}
|
||||
|
||||
func (at *ActionPlan) resetStartTimeCache() {
|
||||
func (at *ActionPlan) ResetStartTimeCache() {
|
||||
at.stCache = time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
}
|
||||
|
||||
@@ -238,7 +238,7 @@ func (at *ActionPlan) Execute() (err error) {
|
||||
if len(at.AccountIds) == 0 { // nothing to do if no accounts set
|
||||
return
|
||||
}
|
||||
at.resetStartTimeCache()
|
||||
at.ResetStartTimeCache()
|
||||
aac, err := at.getActions()
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsId, err))
|
||||
@@ -283,7 +283,7 @@ func (at *ActionPlan) Execute() (err error) {
|
||||
// delete without preserving order
|
||||
at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1]
|
||||
at.AccountIds = at.AccountIds[:len(at.AccountIds)-1]
|
||||
i -= 1
|
||||
i--
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,17 @@ func TestActionPlanNothing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionTimingMidnight(t *testing.T) {
|
||||
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}}
|
||||
y, m, d := referenceDate.Date()
|
||||
now := time.Date(y, m, d, 0, 0, 1, 0, time.Local)
|
||||
st := at.GetNextStartTime(now)
|
||||
expected := time.Date(y, m, d, 0, 0, 0, 0, time.Local).AddDate(0, 0, 1)
|
||||
if !st.Equal(expected) {
|
||||
t.Errorf("Expected %v was %v", expected, st)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionPlanOnlyHour(t *testing.T) {
|
||||
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "10:01:00"}}}
|
||||
st := at.GetNextStartTime(referenceDate)
|
||||
|
||||
@@ -46,13 +46,15 @@ func (s *Scheduler) Loop() {
|
||||
}
|
||||
s.Lock()
|
||||
a0 := s.queue[0]
|
||||
//utils.Logger.Info(fmt.Sprintf("Scheduler qeue length: %v", len(s.qeue)))
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Action: %s", a0.Id))
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Scheduler queue length: %v", len(s.queue)))
|
||||
now := time.Now()
|
||||
start := a0.GetNextStartTime(now)
|
||||
if start.Equal(now) || start.Before(now) {
|
||||
go a0.Execute()
|
||||
// if after execute the next start time is in the past then
|
||||
// do not add it to the queue
|
||||
a0.ResetStartTimeCache()
|
||||
now = time.Now().Add(time.Second)
|
||||
start = a0.GetNextStartTime(now)
|
||||
if start.Before(now) {
|
||||
@@ -71,7 +73,7 @@ func (s *Scheduler) Loop() {
|
||||
select {
|
||||
case <-s.timer.C:
|
||||
// timer has expired
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0))
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0.Id))
|
||||
case <-s.restartLoop:
|
||||
// nothing to do, just continue the loop
|
||||
}
|
||||
@@ -87,6 +89,7 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> processing %d action plans", len(actionPlans)))
|
||||
// recreate the queue
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.queue = engine.ActionPlanPriotityList{}
|
||||
for key, aps := range actionPlans {
|
||||
toBeSaved := false
|
||||
@@ -128,7 +131,6 @@ func (s *Scheduler) LoadActionPlans(storage engine.RatingStorage) {
|
||||
}
|
||||
sort.Sort(s.queue)
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> queued %d action plans", len(s.queue)))
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *Scheduler) Restart() {
|
||||
|
||||
Reference in New Issue
Block a user