diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 2c8afa271..4b5521317 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -137,11 +137,15 @@ func (s *Scheduler) Reload() { s.restart() } -func (s *Scheduler) loadActionPlans() { - s.Lock() - defer s.Unlock() +// loadTasks loads the tasks +// this will push the tasks that did not match +// the filters before exiting the function +func (s *Scheduler) loadTasks() { // limit the number of concurrent tasks limit := make(chan struct{}, 10) + // if some task don't mach the filter save them in this slice + // in oreder to push them back when finish executing them + var unexecutedTasks []*engine.Task // execute existing tasks for { task, err := s.dm.DataDB().PopTask() @@ -149,22 +153,17 @@ func (s *Scheduler) loadActionPlans() { break } if pass, err := s.fltrS.Pass(s.cfg.GeneralCfg().DefaultTenant, - s.cfg.SchedulerCfg().Filters, task); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing task <%s> on account <%s>", - utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters, task.ActionsID, task.AccountID)) - if err := s.dm.DataDB().PushTask(task); err != nil { + s.cfg.SchedulerCfg().Filters, task); err != nil || !pass { + if err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>", - utils.SchedulerS, task.ActionsID, err.Error())) - } - continue - } else if !pass { - if err := s.dm.DataDB().PushTask(task); err != nil { // put the task back so it can be processed by another scheduler - utils.Logger.Warning( - fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>", - utils.SchedulerS, task.ActionsID, err.Error())) + fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing task <%s> on account <%s>", + utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters, task.ActionsID, task.AccountID)) } + // we do not push the task back as this may cause an infinite loop + // push it when the function is done and we stoped the for + // do not use defer here as the functions are exeucted + // from the last one to the first + unexecutedTasks = append(unexecutedTasks, task) continue } limit <- struct{}{} @@ -175,6 +174,19 @@ func (s *Scheduler) loadActionPlans() { <-limit }() } + for _, t := range unexecutedTasks { + if err := s.dm.DataDB().PushTask(t); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>", + utils.SchedulerS, t.ActionsID, err.Error())) + } + } +} + +func (s *Scheduler) loadActionPlans() { + s.Lock() + defer s.Unlock() + s.loadTasks() actionPlans, err := s.dm.GetAllActionPlans() if err != nil && err != utils.ErrNotFound { diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 49cf9a318..bc5912641 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -305,8 +305,8 @@ func (srvMngr *ServiceManager) ShutdownServices(timeout time.Duration) { } c := make(chan struct{}) go func() { - defer close(c) wg.Wait() + close(c) }() select { case <-c: