From 708a3f8db5ef4c769371c59cec64a1b9c6ae722d Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 25 Jan 2016 22:02:33 +0200 Subject: [PATCH] replay past scheduled actions --- apier/v1/scheduler.go | 86 ++++++++++++++++++++ console/scheduler_execute.go | 63 ++++++++++++++ data/tariffplans/tutorial/AccountActions.csv | 1 + data/tariffplans/tutorial/ActionPlans.csv | 1 + data/tariffplans/tutorial/Actions.csv | 1 + data/tariffplans/tutorial/Timings.csv | 1 + engine/action_plan.go | 21 ++++- scheduler/scheduler.go | 3 +- 8 files changed, 174 insertions(+), 3 deletions(-) create mode 100644 console/scheduler_execute.go diff --git a/apier/v1/scheduler.go b/apier/v1/scheduler.go index a91701204..99ecf443b 100644 --- a/apier/v1/scheduler.go +++ b/apier/v1/scheduler.go @@ -20,9 +20,12 @@ package v1 import ( "errors" + "fmt" + "sort" "strings" "time" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -168,3 +171,86 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply * *reply = schedActions return nil } + +type AttrsExecuteScheduledActions struct { + ActionPlanID string + TimeStart, TimeEnd time.Time // replay the action timings between the two dates +} + +func (self *ApierV1) ExecuteScheduledActions(attr AttrsExecuteScheduledActions, reply *string) error { + if attr.ActionPlanID != "" { // execute by ActionPlanID + apl, err := self.RatingDb.GetActionPlan(attr.ActionPlanID, false) + if err != nil { + *reply = err.Error() + return err + } + if apl != nil { + // order by weight + engine.ActionTimingWeightOnlyPriorityList(apl.ActionTimings).Sort() + for _, at := range apl.ActionTimings { + if at.IsASAP() { + continue + } + + at.SetAccountIDs(apl.AccountIDs) // copy the accounts + at.SetActionPlanID(apl.Id) + go at.Execute() + utils.Logger.Info(fmt.Sprintf(" Executing action %s ", at.ActionsID)) + } + } + } + if !attr.TimeStart.IsZero() && !attr.TimeEnd.IsZero() { // execute between two dates + actionPlans, err := self.RatingDb.GetAllActionPlans() + if err != nil && err != utils.ErrNotFound { + err := fmt.Errorf("cannot get action plans: %v", err) + *reply = err.Error() + return err + } + + // recreate the queue + queue := engine.ActionTimingPriorityList{} + for _, actionPlan := range actionPlans { + for _, at := range actionPlan.ActionTimings { + if at.Timing == nil { + continue + } + if at.IsASAP() { + continue + } + if at.GetNextStartTime(attr.TimeStart).Before(attr.TimeStart) { + // the task is obsolete, do not add it to the queue + continue + } + at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts + at.SetActionPlanID(actionPlan.Id) + at.ResetStartTimeCache() + queue = append(queue, at) + } + } + sort.Sort(queue) + // start playback execution loop + current := attr.TimeStart + for len(queue) > 0 && current.Before(attr.TimeEnd) { + a0 := queue[0] + current = a0.GetNextStartTime(current) + if current.Before(attr.TimeEnd) || current.Equal(attr.TimeEnd) { + utils.Logger.Info(fmt.Sprintf(" Executing action %s for time %v", a0.ActionsID, current)) + go a0.Execute() + // if after execute the next start time is in the past then + // do not add it to the queue + a0.ResetStartTimeCache() + current = current.Add(time.Second) + start := a0.GetNextStartTime(current) + if start.Before(current) || start.After(attr.TimeEnd) { + queue = queue[1:] + } else { + queue = append(queue, a0) + queue = queue[1:] + sort.Sort(queue) + } + } + } + } + *reply = utils.OK + return nil +} diff --git a/console/scheduler_execute.go b/console/scheduler_execute.go new file mode 100644 index 000000000..172d3e833 --- /dev/null +++ b/console/scheduler_execute.go @@ -0,0 +1,63 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "github.com/cgrates/cgrates/apier/v1" + +func init() { + c := &CmdExecuteScheduledActions{ + name: "scheduler_execute", + rpcMethod: "ApierV1.ExecuteScheduledActions", + rpcParams: &v1.AttrsExecuteScheduledActions{}, + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +// Commander implementation +type CmdExecuteScheduledActions struct { + name string + rpcMethod string + rpcParams *v1.AttrsExecuteScheduledActions + *CommandExecuter +} + +func (self *CmdExecuteScheduledActions) Name() string { + return self.name +} + +func (self *CmdExecuteScheduledActions) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdExecuteScheduledActions) RpcParams(reset bool) interface{} { + if reset || self.rpcParams == nil { + self.rpcParams = &v1.AttrsExecuteScheduledActions{} + } + return self.rpcParams +} + +func (self *CmdExecuteScheduledActions) PostprocessRpcParams() error { + return nil +} + +func (self *CmdExecuteScheduledActions) RpcResult() interface{} { + var s string + return &s +} diff --git a/data/tariffplans/tutorial/AccountActions.csv b/data/tariffplans/tutorial/AccountActions.csv index 97bcd5f64..bfdad1dd0 100644 --- a/data/tariffplans/tutorial/AccountActions.csv +++ b/data/tariffplans/tutorial/AccountActions.csv @@ -4,3 +4,4 @@ cgrates.org,1002,PACKAGE_10,STANDARD_TRIGGERS,, cgrates.org,1003,PACKAGE_10,STANDARD_TRIGGERS,, cgrates.org,1004,PACKAGE_10,STANDARD_TRIGGERS,, cgrates.org,1007,USE_SHARED_A,STANDARD_TRIGGERS,, +cgrates.org,1009,TEST_EXE,,, diff --git a/data/tariffplans/tutorial/ActionPlans.csv b/data/tariffplans/tutorial/ActionPlans.csv index 3d59fe3e9..ec5a53cce 100644 --- a/data/tariffplans/tutorial/ActionPlans.csv +++ b/data/tariffplans/tutorial/ActionPlans.csv @@ -6,3 +6,4 @@ USE_SHARED_A,SHARED_A_0,*asap,10 PACKAGE_1001,TOPUP_RST_5,*asap,10 PACKAGE_1001,TOPUP_RST_SHARED_5,*asap,10 PACKAGE_1001,TOPUP_120_DST1003,*asap,10 +TEST_EXE,TOPUP_EXE,DAILY,10 \ No newline at end of file diff --git a/data/tariffplans/tutorial/Actions.csv b/data/tariffplans/tutorial/Actions.csv index 93a3faa1f..690285fc3 100644 --- a/data/tariffplans/tutorial/Actions.csv +++ b/data/tariffplans/tutorial/Actions.csv @@ -4,6 +4,7 @@ TOPUP_RST_5,*topup_reset,,,,*monetary,*out,,*any,,,*unlimited,,5,20,false,false, TOPUP_RST_5,*topup_reset,,,,*voice,*out,,DST_1002,SPECIAL_1002,,*unlimited,,90,20,false,false,10 TOPUP_120_DST1003,*topup_reset,,,,*voice,*out,,DST_1003,,,*unlimited,,120,20,false,false,10 TOPUP_RST_SHARED_5,*topup,,,,*monetary,*out,,*any,,SHARED_A,*unlimited,,5,10,false,false,10 +TOPUP_EXE,*topup,,,,*monetary,*out,,*any,,,*unlimited,,5,10,false,false,10 SHARED_A_0,*topup_reset,,,,*monetary,*out,,*any,,SHARED_A,*unlimited,,0,10,false,false,10 LOG_WARNING,*log,,,,,,,,,,,,,,false,false,10 DISABLE_AND_LOG,*log,,,,,,,,,,,,,,false,false,10 diff --git a/data/tariffplans/tutorial/Timings.csv b/data/tariffplans/tutorial/Timings.csv index 59fbab0a9..d957c710f 100644 --- a/data/tariffplans/tutorial/Timings.csv +++ b/data/tariffplans/tutorial/Timings.csv @@ -5,3 +5,4 @@ PEAK,*any,*any,*any,1;2;3;4;5,08:00:00 OFFPEAK_MORNING,*any,*any,*any,1;2;3;4;5,00:00:00 OFFPEAK_EVENING,*any,*any,*any,1;2;3;4;5,19:00:00 OFFPEAK_WEEKEND,*any,*any,*any,6;7,00:00:00 +DAILY,*any,*any,*any,*any,00:00:00 \ No newline at end of file diff --git a/engine/action_plan.go b/engine/action_plan.go index 23aa88f04..a633c8386 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -368,7 +368,7 @@ func (at *ActionTiming) IsASAP() bool { return at.Timing.Timing.StartTime == utils.ASAP } -// Structure to store actions according to weight +// Structure to store actions according to execution time and weight type ActionTimingPriorityList []*ActionTiming func (atpl ActionTimingPriorityList) Len() int { @@ -390,3 +390,22 @@ func (atpl ActionTimingPriorityList) Less(i, j int) bool { func (atpl ActionTimingPriorityList) Sort() { sort.Sort(atpl) } + +// Structure to store actions according to weight +type ActionTimingWeightOnlyPriorityList []*ActionTiming + +func (atpl ActionTimingWeightOnlyPriorityList) Len() int { + return len(atpl) +} + +func (atpl ActionTimingWeightOnlyPriorityList) Swap(i, j int) { + atpl[i], atpl[j] = atpl[j], atpl[i] +} + +func (atpl ActionTimingWeightOnlyPriorityList) Less(i, j int) bool { + return atpl[i].Weight > atpl[j].Weight +} + +func (atpl ActionTimingWeightOnlyPriorityList) Sort() { + sort.Sort(atpl) +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index e8c6896a9..2cfb53b8d 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -79,7 +79,7 @@ func (s *Scheduler) Loop() { select { case <-s.timer.C: // timer has expired - utils.Logger.Info(fmt.Sprintf(" Time for action on %v", a0.ActionsID)) + utils.Logger.Info(fmt.Sprintf(" Time for action on %s", a0.ActionsID)) case <-s.restartLoop: // nothing to do, just continue the loop } @@ -127,7 +127,6 @@ func (s *Scheduler) loadActionPlans() { if at.IsASAP() { continue } - now := time.Now() if at.GetNextStartTime(now).Before(now) { // the task is obsolete, do not add it to the queue