From c68440aaecc3b4582ead976e01bd464628f9280e Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 6 Nov 2019 18:14:54 +0100 Subject: [PATCH] Scheduler - adding filter support for action plan --- engine/action_plan.go | 28 ++++++++++++++++++++++++++++ scheduler/scheduler.go | 27 ++++++++++++++++++++++++--- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/engine/action_plan.go b/engine/action_plan.go index bf58de039..e1220ef96 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -45,6 +45,27 @@ type ActionTiming struct { stCache time.Time // cached time of the next start } +// Tasks converts an ActionTiming into multiple Tasks +func (at *ActionTiming) Tasks() (tsks []*Task) { + if len(at.accountIDs) == 0 { + return []*Task{ + &Task{ + Uuid: at.Uuid, + ActionsID: at.ActionsID, + }} + } + tsks = make([]*Task, len(at.accountIDs)) + i := 0 + for acntID := range at.accountIDs { + tsks[i] = &Task{ + Uuid: at.Uuid, + ActionsID: at.ActionsID, + AccountID: acntID, + } + } + return +} + type ActionPlan struct { Id string // informative purpose only AccountIDs utils.StringMap @@ -251,6 +272,13 @@ func (at *ActionTiming) SetAccountIDs(accIDs utils.StringMap) { at.accountIDs = accIDs } +func (at *ActionTiming) RemoveAccountID(acntID string) (found bool) { + if _, found = at.accountIDs[acntID]; found { + delete(at.accountIDs, acntID) + } + return +} + func (at *ActionTiming) GetAccountIDs() utils.StringMap { return at.accountIDs } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index fedb5f4bd..728ad4d06 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -153,13 +153,24 @@ func (s *Scheduler) loadActionPlans() { utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing task <%s> on account <%s>", utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters[1:], task.ActionsID, task.AccountID)) + if err := s.dm.DataDB().PushTask(task); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>", + utils.SchedulerS, task.ActionsID, err.Error())) + } continue } else if !pass { + if err := s.dm.DataDB().PushTask(task); err != nil { // put the task back so it can be processed by another scheduler + utils.Logger.Warning( + fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>", + utils.SchedulerS, task.ActionsID, err.Error())) + } continue } limit <- true go func() { - utils.Logger.Info(fmt.Sprintf("<%s> executing task %s on account %s", utils.SchedulerS, task.ActionsID, task.AccountID)) + utils.Logger.Info(fmt.Sprintf("<%s> executing task %s on account %s", + utils.SchedulerS, task.ActionsID, task.AccountID)) task.Execute() <-limit }() @@ -182,7 +193,7 @@ func (s *Scheduler) loadActionPlans() { continue } if at.IsASAP() { - continue + continue // should be already executed as task } now := time.Now() if at.GetNextStartTime(now).Before(now) { @@ -191,8 +202,18 @@ func (s *Scheduler) loadActionPlans() { } at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts at.SetActionPlanID(actionPlan.Id) + for _, task := range at.Tasks() { + if pass, err := s.fltrS.Pass(s.cfg.GeneralCfg().DefaultTenant, + s.cfg.SchedulerCfg().Filters, task); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing action <%s> on account <%s>", + utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters[1:], task.ActionsID, task.AccountID)) + at.RemoveAccountID(task.AccountID) + } else if !pass { + at.RemoveAccountID(task.AccountID) + } + } s.queue = append(s.queue, at) - } } sort.Sort(s.queue)