mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
replay past scheduled actions
This commit is contained in:
@@ -20,9 +20,12 @@ package v1
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -168,3 +171,86 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *
|
||||
*reply = schedActions
|
||||
return nil
|
||||
}
|
||||
|
||||
type AttrsExecuteScheduledActions struct {
|
||||
ActionPlanID string
|
||||
TimeStart, TimeEnd time.Time // replay the action timings between the two dates
|
||||
}
|
||||
|
||||
func (self *ApierV1) ExecuteScheduledActions(attr AttrsExecuteScheduledActions, reply *string) error {
|
||||
if attr.ActionPlanID != "" { // execute by ActionPlanID
|
||||
apl, err := self.RatingDb.GetActionPlan(attr.ActionPlanID, false)
|
||||
if err != nil {
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
if apl != nil {
|
||||
// order by weight
|
||||
engine.ActionTimingWeightOnlyPriorityList(apl.ActionTimings).Sort()
|
||||
for _, at := range apl.ActionTimings {
|
||||
if at.IsASAP() {
|
||||
continue
|
||||
}
|
||||
|
||||
at.SetAccountIDs(apl.AccountIDs) // copy the accounts
|
||||
at.SetActionPlanID(apl.Id)
|
||||
go at.Execute()
|
||||
utils.Logger.Info(fmt.Sprintf("<Force Scheduler> Executing action %s ", at.ActionsID))
|
||||
}
|
||||
}
|
||||
}
|
||||
if !attr.TimeStart.IsZero() && !attr.TimeEnd.IsZero() { // execute between two dates
|
||||
actionPlans, err := self.RatingDb.GetAllActionPlans()
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
err := fmt.Errorf("cannot get action plans: %v", err)
|
||||
*reply = err.Error()
|
||||
return err
|
||||
}
|
||||
|
||||
// recreate the queue
|
||||
queue := engine.ActionTimingPriorityList{}
|
||||
for _, actionPlan := range actionPlans {
|
||||
for _, at := range actionPlan.ActionTimings {
|
||||
if at.Timing == nil {
|
||||
continue
|
||||
}
|
||||
if at.IsASAP() {
|
||||
continue
|
||||
}
|
||||
if at.GetNextStartTime(attr.TimeStart).Before(attr.TimeStart) {
|
||||
// the task is obsolete, do not add it to the queue
|
||||
continue
|
||||
}
|
||||
at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts
|
||||
at.SetActionPlanID(actionPlan.Id)
|
||||
at.ResetStartTimeCache()
|
||||
queue = append(queue, at)
|
||||
}
|
||||
}
|
||||
sort.Sort(queue)
|
||||
// start playback execution loop
|
||||
current := attr.TimeStart
|
||||
for len(queue) > 0 && current.Before(attr.TimeEnd) {
|
||||
a0 := queue[0]
|
||||
current = a0.GetNextStartTime(current)
|
||||
if current.Before(attr.TimeEnd) || current.Equal(attr.TimeEnd) {
|
||||
utils.Logger.Info(fmt.Sprintf("<Replay Scheduler> Executing action %s for time %v", a0.ActionsID, current))
|
||||
go a0.Execute()
|
||||
// if after execute the next start time is in the past then
|
||||
// do not add it to the queue
|
||||
a0.ResetStartTimeCache()
|
||||
current = current.Add(time.Second)
|
||||
start := a0.GetNextStartTime(current)
|
||||
if start.Before(current) || start.After(attr.TimeEnd) {
|
||||
queue = queue[1:]
|
||||
} else {
|
||||
queue = append(queue, a0)
|
||||
queue = queue[1:]
|
||||
sort.Sort(queue)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
63
console/scheduler_execute.go
Normal file
63
console/scheduler_execute.go
Normal file
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2012-2015 ITsysCOM
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package console
|
||||
|
||||
import "github.com/cgrates/cgrates/apier/v1"
|
||||
|
||||
func init() {
|
||||
c := &CmdExecuteScheduledActions{
|
||||
name: "scheduler_execute",
|
||||
rpcMethod: "ApierV1.ExecuteScheduledActions",
|
||||
rpcParams: &v1.AttrsExecuteScheduledActions{},
|
||||
}
|
||||
commands[c.Name()] = c
|
||||
c.CommandExecuter = &CommandExecuter{c}
|
||||
}
|
||||
|
||||
// Commander implementation
|
||||
type CmdExecuteScheduledActions struct {
|
||||
name string
|
||||
rpcMethod string
|
||||
rpcParams *v1.AttrsExecuteScheduledActions
|
||||
*CommandExecuter
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) Name() string {
|
||||
return self.name
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) RpcMethod() string {
|
||||
return self.rpcMethod
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) RpcParams(reset bool) interface{} {
|
||||
if reset || self.rpcParams == nil {
|
||||
self.rpcParams = &v1.AttrsExecuteScheduledActions{}
|
||||
}
|
||||
return self.rpcParams
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) PostprocessRpcParams() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CmdExecuteScheduledActions) RpcResult() interface{} {
|
||||
var s string
|
||||
return &s
|
||||
}
|
||||
@@ -4,3 +4,4 @@ cgrates.org,1002,PACKAGE_10,STANDARD_TRIGGERS,,
|
||||
cgrates.org,1003,PACKAGE_10,STANDARD_TRIGGERS,,
|
||||
cgrates.org,1004,PACKAGE_10,STANDARD_TRIGGERS,,
|
||||
cgrates.org,1007,USE_SHARED_A,STANDARD_TRIGGERS,,
|
||||
cgrates.org,1009,TEST_EXE,,,
|
||||
|
||||
|
@@ -6,3 +6,4 @@ USE_SHARED_A,SHARED_A_0,*asap,10
|
||||
PACKAGE_1001,TOPUP_RST_5,*asap,10
|
||||
PACKAGE_1001,TOPUP_RST_SHARED_5,*asap,10
|
||||
PACKAGE_1001,TOPUP_120_DST1003,*asap,10
|
||||
TEST_EXE,TOPUP_EXE,DAILY,10
|
||||
|
@@ -4,6 +4,7 @@ TOPUP_RST_5,*topup_reset,,,,*monetary,*out,,*any,,,*unlimited,,5,20,false,false,
|
||||
TOPUP_RST_5,*topup_reset,,,,*voice,*out,,DST_1002,SPECIAL_1002,,*unlimited,,90,20,false,false,10
|
||||
TOPUP_120_DST1003,*topup_reset,,,,*voice,*out,,DST_1003,,,*unlimited,,120,20,false,false,10
|
||||
TOPUP_RST_SHARED_5,*topup,,,,*monetary,*out,,*any,,SHARED_A,*unlimited,,5,10,false,false,10
|
||||
TOPUP_EXE,*topup,,,,*monetary,*out,,*any,,,*unlimited,,5,10,false,false,10
|
||||
SHARED_A_0,*topup_reset,,,,*monetary,*out,,*any,,SHARED_A,*unlimited,,0,10,false,false,10
|
||||
LOG_WARNING,*log,,,,,,,,,,,,,,false,false,10
|
||||
DISABLE_AND_LOG,*log,,,,,,,,,,,,,,false,false,10
|
||||
|
||||
|
@@ -5,3 +5,4 @@ PEAK,*any,*any,*any,1;2;3;4;5,08:00:00
|
||||
OFFPEAK_MORNING,*any,*any,*any,1;2;3;4;5,00:00:00
|
||||
OFFPEAK_EVENING,*any,*any,*any,1;2;3;4;5,19:00:00
|
||||
OFFPEAK_WEEKEND,*any,*any,*any,6;7,00:00:00
|
||||
DAILY,*any,*any,*any,*any,00:00:00
|
||||
|
@@ -368,7 +368,7 @@ func (at *ActionTiming) IsASAP() bool {
|
||||
return at.Timing.Timing.StartTime == utils.ASAP
|
||||
}
|
||||
|
||||
// Structure to store actions according to weight
|
||||
// Structure to store actions according to execution time and weight
|
||||
type ActionTimingPriorityList []*ActionTiming
|
||||
|
||||
func (atpl ActionTimingPriorityList) Len() int {
|
||||
@@ -390,3 +390,22 @@ func (atpl ActionTimingPriorityList) Less(i, j int) bool {
|
||||
func (atpl ActionTimingPriorityList) Sort() {
|
||||
sort.Sort(atpl)
|
||||
}
|
||||
|
||||
// Structure to store actions according to weight
|
||||
type ActionTimingWeightOnlyPriorityList []*ActionTiming
|
||||
|
||||
func (atpl ActionTimingWeightOnlyPriorityList) Len() int {
|
||||
return len(atpl)
|
||||
}
|
||||
|
||||
func (atpl ActionTimingWeightOnlyPriorityList) Swap(i, j int) {
|
||||
atpl[i], atpl[j] = atpl[j], atpl[i]
|
||||
}
|
||||
|
||||
func (atpl ActionTimingWeightOnlyPriorityList) Less(i, j int) bool {
|
||||
return atpl[i].Weight > atpl[j].Weight
|
||||
}
|
||||
|
||||
func (atpl ActionTimingWeightOnlyPriorityList) Sort() {
|
||||
sort.Sort(atpl)
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ func (s *Scheduler) Loop() {
|
||||
select {
|
||||
case <-s.timer.C:
|
||||
// timer has expired
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0.ActionsID))
|
||||
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %s", a0.ActionsID))
|
||||
case <-s.restartLoop:
|
||||
// nothing to do, just continue the loop
|
||||
}
|
||||
@@ -127,7 +127,6 @@ func (s *Scheduler) loadActionPlans() {
|
||||
if at.IsASAP() {
|
||||
continue
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if at.GetNextStartTime(now).Before(now) {
|
||||
// the task is obsolete, do not add it to the queue
|
||||
|
||||
Reference in New Issue
Block a user