Scheduler actSuccessStats, actFailedStats with update mechanism and tests

This commit is contained in:
DanB
2017-01-22 20:31:07 +01:00
parent 6bc64c73ce
commit f5314bb83e
9 changed files with 115 additions and 66 deletions

View File

@@ -470,7 +470,7 @@ func (self *ApierV1) modifyBalance(aType string, attr *AttrAddBalance, reply *st
a.Balance.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(*attr.TimingIds))
}
at.SetActions(engine.Actions{a})
if err := at.Execute(); err != nil {
if err := at.Execute(nil, nil); err != nil {
return err
}
*reply = OK
@@ -540,7 +540,7 @@ func (self *ApierV1) SetBalance(attr *utils.AttrSetBalance, reply *string) error
a.Balance.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(*attr.TimingIds))
}
at.SetActions(engine.Actions{a})
if err := at.Execute(); err != nil {
if err := at.Execute(nil, nil); err != nil {
*reply = err.Error()
return err
}
@@ -600,7 +600,7 @@ func (self *ApierV1) RemoveBalances(attr *utils.AttrSetBalance, reply *string) e
a.Balance.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(*attr.TimingIds))
}
at.SetActions(engine.Actions{a})
if err := at.Execute(); err != nil {
if err := at.Execute(nil, nil); err != nil {
*reply = err.Error()
return err
}

View File

@@ -181,7 +181,7 @@ func (self *ApierV1) ExecuteAction(attr *utils.AttrExecuteAction, reply *string)
if attr.Tenant != "" && attr.Account != "" {
at.SetAccountIDs(utils.StringMap{utils.AccountKey(attr.Tenant, attr.Account): true})
}
if err := at.Execute(); err != nil {
if err := at.Execute(nil, nil); err != nil {
*reply = err.Error()
return err
}

View File

@@ -194,7 +194,7 @@ func (self *ApierV1) ExecuteScheduledActions(attr AttrsExecuteScheduledActions,
at.SetAccountIDs(apl.AccountIDs) // copy the accounts
at.SetActionPlanID(apl.Id)
err := at.Execute()
err := at.Execute(nil, nil)
if err != nil {
*reply = err.Error()
return err
@@ -239,7 +239,7 @@ func (self *ApierV1) ExecuteScheduledActions(attr AttrsExecuteScheduledActions,
current = a0.GetNextStartTime(current)
if current.Before(attr.TimeEnd) || current.Equal(attr.TimeEnd) {
utils.Logger.Info(fmt.Sprintf("<Replay Scheduler> Executing action %s for time %v", a0.ActionsID, current))
err := a0.Execute()
err := a0.Execute(nil, nil)
if err != nil {
*reply = err.Error()
return err

View File

@@ -21,13 +21,12 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/structmatcher"
"github.com/cgrates/cgrates/utils"
"strings"
)
/*

View File

@@ -74,7 +74,7 @@ func (t *Task) Execute() error {
Uuid: t.Uuid,
ActionsID: t.ActionsID,
accountIDs: utils.StringMap{t.AccountID: true},
}).Execute()
}).Execute(nil, nil)
}
func (at *ActionTiming) GetNextStartTime(now time.Time) (t time.Time) {
@@ -282,7 +282,9 @@ func (at *ActionTiming) getActions() (as []*Action, err error) {
return at.actions, err
}
func (at *ActionTiming) Execute() (err error) {
// Execute will execute all actions in an action plan
// Reports on success/fail via channel if != nil
func (at *ActionTiming) Execute(successActions, failedActions chan *Action) (err error) {
at.ResetStartTimeCache()
aac, err := at.getActions()
if err != nil {
@@ -331,8 +333,14 @@ func (at *ActionTiming) Execute() (err error) {
if err := actionFunction(acc, nil, a, aac); err != nil {
utils.Logger.Err(fmt.Sprintf("Error executing action %s: %v!", a.ActionType, err))
transactionFailed = true
if failedActions != nil {
go func() { failedActions <- a }()
}
break
}
if successActions != nil {
go func() { successActions <- a }()
}
if a.ActionType == REMOVE_ACCOUNT {
removeAccountActionFound = true
}
@@ -356,12 +364,21 @@ func (at *ActionTiming) Execute() (err error) {
// do not allow the action plan to be rescheduled
at.Timing = nil
utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
if failedActions != nil {
go func() { failedActions <- a }()
}
break
}
if err := actionFunction(nil, nil, a, aac); err != nil {
utils.Logger.Err(fmt.Sprintf("Error executing accountless action %s: %v!", a.ActionType, err))
if failedActions != nil {
go func() { failedActions <- a }()
}
break
}
if successActions != nil {
go func() { successActions <- a }()
}
}
}
if err != nil {

View File

@@ -419,7 +419,7 @@ func TestActionPlanLogFunction(t *testing.T) {
at := &ActionTiming{
actions: []*Action{a},
}
err := at.Execute()
err := at.Execute(nil, nil)
if err != nil {
t.Errorf("Could not execute LOG action: %v", err)
}
@@ -438,7 +438,7 @@ func TestActionPlanFunctionNotAvailable(t *testing.T) {
Timing: &RateInterval{},
actions: []*Action{a},
}
err := at.Execute()
err := at.Execute(nil, nil)
if err != nil {
t.Errorf("Faild to detect wrong function type: %v", err)
}
@@ -577,7 +577,7 @@ func TestActionPlansRemoveMember(t *testing.T) {
actions: actions,
}
if err = at.Execute(); err != nil {
if err = at.Execute(nil, nil); err != nil {
t.Errorf("Execute Action: %v", err)
}
@@ -1214,7 +1214,7 @@ func TestRemoveAction(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:remo": true},
actions: Actions{a},
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:remo")
if err == nil || afterUb != nil {
t.Error("error removing account: ", err, afterUb)
@@ -1233,7 +1233,7 @@ func TestTopupAction(t *testing.T) {
actions: Actions{a},
}
at.Execute()
at.Execute(nil, nil)
afterUb, _ := accountingStorage.GetAccount("vdf:minu")
initialValue := initialUb.BalanceMap[utils.MONETARY].GetTotalValue()
afterValue := afterUb.BalanceMap[utils.MONETARY].GetTotalValue()
@@ -1254,7 +1254,7 @@ func TestTopupActionLoaded(t *testing.T) {
actions: Actions{a},
}
at.Execute()
at.Execute(nil, nil)
afterUb, _ := accountingStorage.GetAccount("vdf:minitsboy")
initialValue := initialUb.BalanceMap[utils.MONETARY].GetTotalValue()
afterValue := afterUb.BalanceMap[utils.MONETARY].GetTotalValue()
@@ -1422,7 +1422,7 @@ func TestActionTransactionFuncType(t *testing.T) {
},
},
}
err = at.Execute()
err = at.Execute(nil, nil)
acc, err := accountingStorage.GetAccount("cgrates.org:trans")
if err != nil || acc == nil {
t.Error("Error getting account: ", acc, err)
@@ -1458,7 +1458,7 @@ func TestActionTransactionBalanceType(t *testing.T) {
},
},
}
err = at.Execute()
err = at.Execute(nil, nil)
acc, err := accountingStorage.GetAccount("cgrates.org:trans")
if err != nil || acc == nil {
t.Error("Error getting account: ", acc, err)
@@ -1494,7 +1494,7 @@ func TestActionTransactionBalanceNotType(t *testing.T) {
},
},
}
err = at.Execute()
err = at.Execute(nil, nil)
acc, err := accountingStorage.GetAccount("cgrates.org:trans")
if err != nil || acc == nil {
t.Error("Error getting account: ", acc, err)
@@ -1537,7 +1537,7 @@ func TestActionWithExpireWithoutExpire(t *testing.T) {
},
},
}
err = at.Execute()
err = at.Execute(nil, nil)
acc, err := accountingStorage.GetAccount("cgrates.org:exp")
if err != nil || acc == nil {
t.Errorf("Error getting account: %+v: %v", acc, err)
@@ -1584,7 +1584,7 @@ func TestActionRemoveBalance(t *testing.T) {
},
},
}
err = at.Execute()
err = at.Execute(nil, nil)
acc, err := accountingStorage.GetAccount("cgrates.org:rembal")
if err != nil || acc == nil {
t.Errorf("Error getting account: %+v: %v", acc, err)
@@ -1633,7 +1633,7 @@ func TestActionTransferMonetaryDefault(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:trans": true},
actions: Actions{a},
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:trans")
if err != nil {
@@ -1694,7 +1694,7 @@ func TestActionTransferMonetaryDefaultFilter(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:trans": true},
actions: Actions{a},
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:trans")
if err != nil {
@@ -1760,7 +1760,7 @@ func TestActionConditionalTopup(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:cond": true},
actions: Actions{a},
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:cond")
if err != nil {
@@ -1824,7 +1824,7 @@ func TestActionConditionalTopupNoMatch(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:cond": true},
actions: Actions{a},
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:cond")
if err != nil {
@@ -1888,7 +1888,7 @@ func TestActionConditionalTopupExistingBalance(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:cond": true},
actions: Actions{a},
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:cond")
if err != nil {
@@ -2038,7 +2038,7 @@ func TestActionConditionalDisabledIfNegative(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:af": true},
actions: Actions{a1, a2, a3, a4, a5},
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:af")
if err != nil {
@@ -2109,7 +2109,7 @@ func TestActionSetBalance(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:setb": true},
actions: Actions{a},
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:setb")
if err != nil {
@@ -2146,7 +2146,7 @@ func TestActionExpirationTime(t *testing.T) {
actions: a,
}
for rep := 0; rep < 5; rep++ {
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:expo")
if err != nil ||
len(afterUb.BalanceMap[utils.VOICE]) != rep+1 {
@@ -2169,7 +2169,7 @@ func TestActionExpNoExp(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:expnoexp": true},
actions: exp,
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:expnoexp")
if err != nil ||
len(afterUb.BalanceMap[utils.VOICE]) != 2 {
@@ -2222,7 +2222,7 @@ func TestActionCdrlogBalanceValue(t *testing.T) {
},
},
}
err = at.Execute()
err = at.Execute(nil, nil)
acc, err := accountingStorage.GetAccount("cgrates.org:bv")
if err != nil || acc == nil {
t.Error("Error getting account: ", acc, err)
@@ -2310,7 +2310,7 @@ func TestValueFormulaDebit(t *testing.T) {
accountIDs: utils.StringMap{"cgrates.org:vf": true},
ActionsID: "VF",
}
at.Execute()
at.Execute(nil, nil)
afterUb, err := accountingStorage.GetAccount("cgrates.org:vf")
// not an exact value, depends of month
v := afterUb.BalanceMap[utils.MONETARY].GetTotalValue()

View File

@@ -560,7 +560,7 @@ func TestMaxSessionTimeWithMaxRate(t *testing.T) {
//log.Print(ap)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
//acc, _ := accountingStorage.GetAccount("cgrates.org:12345")
//log.Print("ACC: ", utils.ToIJSON(acc))
@@ -587,7 +587,7 @@ func TestMaxSessionTimeWithMaxCost(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -611,7 +611,7 @@ func TestGetMaxSessiontWithBlocker(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("BLOCK_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
acc, err := accountingStorage.GetAccount("cgrates.org:block")
if err != nil {
@@ -662,7 +662,7 @@ func TestGetMaxSessiontWithBlockerEmpty(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("BLOCK_EMPTY_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
acc, err := accountingStorage.GetAccount("cgrates.org:block_empty")
if err != nil {
@@ -713,7 +713,7 @@ func TestGetCostWithMaxCost(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -737,7 +737,7 @@ func TestGetCostRoundingIssue(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -762,7 +762,7 @@ func TestGetCostRatingInfoOnZeroTime(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -790,7 +790,7 @@ func TestDebitRatingInfoOnZeroTime(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -819,7 +819,7 @@ func TestMaxDebitRatingInfoOnZeroTime(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -847,7 +847,7 @@ func TestMaxDebitUnknowDest(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -870,7 +870,7 @@ func TestMaxDebitRoundingIssue(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -905,7 +905,7 @@ func TestDebitRoundingRefund(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -940,7 +940,7 @@ func TestMaxSessionTimeWithMaxCostFree(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -964,7 +964,7 @@ func TestMaxDebitWithMaxCostFree(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -988,7 +988,7 @@ func TestGetCostWithMaxCostFree(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
Direction: "*out",
@@ -1043,12 +1043,12 @@ func TestMaxSessionTimeWithAccountShared(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED0_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
ap, _ = ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd0 := &CallDescriptor{
@@ -1084,12 +1084,12 @@ func TestMaxDebitWithAccountShared(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED0_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
ap, _ = ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
@@ -1307,7 +1307,7 @@ func TestDebitFromShareAndNormal(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
@@ -1336,7 +1336,7 @@ func TestDebitFromEmptyShare(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP_EMPTY_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
@@ -1365,7 +1365,7 @@ func TestDebitNegatve(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("POST_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd := &CallDescriptor{
@@ -1405,7 +1405,7 @@ func TestMaxDebitZeroDefinedRate(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd1 := &CallDescriptor{
Direction: "*out",
@@ -1435,7 +1435,7 @@ func TestMaxDebitForceDuration(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd1 := &CallDescriptor{
Direction: "*out",
@@ -1460,7 +1460,7 @@ func TestMaxDebitZeroDefinedRateOnlyMinutes(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd1 := &CallDescriptor{
Direction: "*out",
@@ -1489,7 +1489,7 @@ func TestMaxDebitConsumesMinutes(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
at.Execute(nil, nil)
}
cd1 := &CallDescriptor{
Direction: "*out",

View File

@@ -83,7 +83,7 @@ func TestAcntActsDisableAcnt(t *testing.T) {
ActionsID: "DISABLE_ACNT",
}
at.SetAccountIDs(utils.StringMap{acnt1Tag: true})
if err := at.Execute(); err != nil {
if err := at.Execute(nil, nil); err != nil {
t.Error(err)
}
expectAcnt := &engine.Account{ID: "cgrates.org:1", Disabled: true}
@@ -100,7 +100,7 @@ func TestAcntActsEnableAcnt(t *testing.T) {
ActionsID: "ENABLE_ACNT",
}
at.SetAccountIDs(utils.StringMap{acnt1Tag: true})
if err := at.Execute(); err != nil {
if err := at.Execute(nil, nil); err != nil {
t.Error(err)
}
expectAcnt := &engine.Account{ID: "cgrates.org:1", Disabled: false}

View File

@@ -29,11 +29,15 @@ import (
type Scheduler struct {
sync.RWMutex
queue engine.ActionTimingPriorityList
timer *time.Timer
restartLoop chan bool
storage engine.RatingStorage
schedulerStarted bool
queue engine.ActionTimingPriorityList
timer *time.Timer
restartLoop chan bool
storage engine.RatingStorage
schedulerStarted bool
actStatsInterval time.Duration // How long time to keep the stats in memory
actSucessChan, actFailedChan chan *engine.Action // ActionPlan will pass actions via these channels
aSMux, aFMux sync.RWMutex // protect schedStats
actSuccessStats, actFailedStats map[string]map[time.Time]bool // keep here stats regarding executed actions, map[actionType]map[execTime]bool
}
func NewScheduler(storage engine.RatingStorage) *Scheduler {
@@ -45,6 +49,35 @@ func NewScheduler(storage engine.RatingStorage) *Scheduler {
return s
}
func (s *Scheduler) updateActStats(act *engine.Action, isFailed bool) {
mux := s.aSMux
statsMp := s.actSuccessStats
if isFailed {
mux = s.aFMux
statsMp = s.actFailedStats
}
now := time.Now()
mux.Lock()
for aType := range statsMp {
for t := range statsMp[aType] {
if now.Sub(t) > s.actStatsInterval {
delete(statsMp[aType], t)
if len(statsMp[aType]) == 0 {
delete(statsMp, aType)
}
}
}
}
if act == nil {
return
}
if _, hasIt := statsMp[act.ActionType]; !hasIt {
statsMp[act.ActionType] = make(map[time.Time]bool)
}
statsMp[act.ActionType][now] = true
mux.Unlock()
}
func (s *Scheduler) Loop() {
s.schedulerStarted = true
for {
@@ -61,7 +94,7 @@ func (s *Scheduler) Loop() {
now := time.Now()
start := a0.GetNextStartTime(now)
if start.Equal(now) || start.Before(now) {
go a0.Execute()
go a0.Execute(s.actSucessChan, s.actFailedChan)
// if after execute the next start time is in the past then
// do not add it to the queue
a0.ResetStartTimeCache()