Scheduler - adding filter support for action plan

This commit is contained in:
DanB
2019-11-06 18:14:54 +01:00
parent 04cc20c1d2
commit c68440aaec
2 changed files with 52 additions and 3 deletions

View File

@@ -45,6 +45,27 @@ type ActionTiming struct {
stCache time.Time // cached time of the next start
}
// Tasks converts an ActionTiming into multiple Tasks
func (at *ActionTiming) Tasks() (tsks []*Task) {
if len(at.accountIDs) == 0 {
return []*Task{
&Task{
Uuid: at.Uuid,
ActionsID: at.ActionsID,
}}
}
tsks = make([]*Task, len(at.accountIDs))
i := 0
for acntID := range at.accountIDs {
tsks[i] = &Task{
Uuid: at.Uuid,
ActionsID: at.ActionsID,
AccountID: acntID,
}
}
return
}
type ActionPlan struct {
Id string // informative purpose only
AccountIDs utils.StringMap
@@ -251,6 +272,13 @@ func (at *ActionTiming) SetAccountIDs(accIDs utils.StringMap) {
at.accountIDs = accIDs
}
func (at *ActionTiming) RemoveAccountID(acntID string) (found bool) {
if _, found = at.accountIDs[acntID]; found {
delete(at.accountIDs, acntID)
}
return
}
func (at *ActionTiming) GetAccountIDs() utils.StringMap {
return at.accountIDs
}

View File

@@ -153,13 +153,24 @@ func (s *Scheduler) loadActionPlans() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing task <%s> on account <%s>",
utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters[1:], task.ActionsID, task.AccountID))
if err := s.dm.DataDB().PushTask(task); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>",
utils.SchedulerS, task.ActionsID, err.Error()))
}
continue
} else if !pass {
if err := s.dm.DataDB().PushTask(task); err != nil { // put the task back so it can be processed by another scheduler
utils.Logger.Warning(
fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>",
utils.SchedulerS, task.ActionsID, err.Error()))
}
continue
}
limit <- true
go func() {
utils.Logger.Info(fmt.Sprintf("<%s> executing task %s on account %s", utils.SchedulerS, task.ActionsID, task.AccountID))
utils.Logger.Info(fmt.Sprintf("<%s> executing task %s on account %s",
utils.SchedulerS, task.ActionsID, task.AccountID))
task.Execute()
<-limit
}()
@@ -182,7 +193,7 @@ func (s *Scheduler) loadActionPlans() {
continue
}
if at.IsASAP() {
continue
continue // should be already executed as task
}
now := time.Now()
if at.GetNextStartTime(now).Before(now) {
@@ -191,8 +202,18 @@ func (s *Scheduler) loadActionPlans() {
}
at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts
at.SetActionPlanID(actionPlan.Id)
for _, task := range at.Tasks() {
if pass, err := s.fltrS.Pass(s.cfg.GeneralCfg().DefaultTenant,
s.cfg.SchedulerCfg().Filters, task); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing action <%s> on account <%s>",
utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters[1:], task.ActionsID, task.AccountID))
at.RemoveAccountID(task.AccountID)
} else if !pass {
at.RemoveAccountID(task.AccountID)
}
}
s.queue = append(s.queue, at)
}
}
sort.Sort(s.queue)