mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Updated scheduler task handling
This commit is contained in:
committed by
Dan Christian Bogos
parent
b90ff99846
commit
b0208c29f0
@@ -137,11 +137,15 @@ func (s *Scheduler) Reload() {
|
||||
s.restart()
|
||||
}
|
||||
|
||||
func (s *Scheduler) loadActionPlans() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
// loadTasks loads the tasks
|
||||
// this will push the tasks that did not match
|
||||
// the filters before exiting the function
|
||||
func (s *Scheduler) loadTasks() {
|
||||
// limit the number of concurrent tasks
|
||||
limit := make(chan struct{}, 10)
|
||||
// if some task don't mach the filter save them in this slice
|
||||
// in oreder to push them back when finish executing them
|
||||
var unexecutedTasks []*engine.Task
|
||||
// execute existing tasks
|
||||
for {
|
||||
task, err := s.dm.DataDB().PopTask()
|
||||
@@ -149,22 +153,17 @@ func (s *Scheduler) loadActionPlans() {
|
||||
break
|
||||
}
|
||||
if pass, err := s.fltrS.Pass(s.cfg.GeneralCfg().DefaultTenant,
|
||||
s.cfg.SchedulerCfg().Filters, task); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing task <%s> on account <%s>",
|
||||
utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters, task.ActionsID, task.AccountID))
|
||||
if err := s.dm.DataDB().PushTask(task); err != nil {
|
||||
s.cfg.SchedulerCfg().Filters, task); err != nil || !pass {
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>",
|
||||
utils.SchedulerS, task.ActionsID, err.Error()))
|
||||
}
|
||||
continue
|
||||
} else if !pass {
|
||||
if err := s.dm.DataDB().PushTask(task); err != nil { // put the task back so it can be processed by another scheduler
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>",
|
||||
utils.SchedulerS, task.ActionsID, err.Error()))
|
||||
fmt.Sprintf("<%s> error: <%s> querying filters for path: <%+v>, not executing task <%s> on account <%s>",
|
||||
utils.SchedulerS, err.Error(), s.cfg.SchedulerCfg().Filters, task.ActionsID, task.AccountID))
|
||||
}
|
||||
// we do not push the task back as this may cause an infinite loop
|
||||
// push it when the function is done and we stoped the for
|
||||
// do not use defer here as the functions are exeucted
|
||||
// from the last one to the first
|
||||
unexecutedTasks = append(unexecutedTasks, task)
|
||||
continue
|
||||
}
|
||||
limit <- struct{}{}
|
||||
@@ -175,6 +174,19 @@ func (s *Scheduler) loadActionPlans() {
|
||||
<-limit
|
||||
}()
|
||||
}
|
||||
for _, t := range unexecutedTasks {
|
||||
if err := s.dm.DataDB().PushTask(t); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed pushing task <%s> back to DataDB, err <%s>",
|
||||
utils.SchedulerS, t.ActionsID, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) loadActionPlans() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.loadTasks()
|
||||
|
||||
actionPlans, err := s.dm.GetAllActionPlans()
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
|
||||
@@ -305,8 +305,8 @@ func (srvMngr *ServiceManager) ShutdownServices(timeout time.Duration) {
|
||||
}
|
||||
c := make(chan struct{})
|
||||
go func() {
|
||||
defer close(c)
|
||||
wg.Wait()
|
||||
close(c)
|
||||
}()
|
||||
select {
|
||||
case <-c:
|
||||
|
||||
Reference in New Issue
Block a user