diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 687ac8480..fa1eff410 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -470,7 +470,7 @@ func (self *ApierV1) modifyBalance(aType string, attr *AttrAddBalance, reply *st a.Balance.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(*attr.TimingIds)) } at.SetActions(engine.Actions{a}) - if err := at.Execute(); err != nil { + if err := at.Execute(nil, nil); err != nil { return err } *reply = OK @@ -540,7 +540,7 @@ func (self *ApierV1) SetBalance(attr *utils.AttrSetBalance, reply *string) error a.Balance.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(*attr.TimingIds)) } at.SetActions(engine.Actions{a}) - if err := at.Execute(); err != nil { + if err := at.Execute(nil, nil); err != nil { *reply = err.Error() return err } @@ -600,7 +600,7 @@ func (self *ApierV1) RemoveBalances(attr *utils.AttrSetBalance, reply *string) e a.Balance.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(*attr.TimingIds)) } at.SetActions(engine.Actions{a}) - if err := at.Execute(); err != nil { + if err := at.Execute(nil, nil); err != nil { *reply = err.Error() return err } diff --git a/apier/v1/apier.go b/apier/v1/apier.go index f48b54eb9..146972255 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -181,7 +181,7 @@ func (self *ApierV1) ExecuteAction(attr *utils.AttrExecuteAction, reply *string) if attr.Tenant != "" && attr.Account != "" { at.SetAccountIDs(utils.StringMap{utils.AccountKey(attr.Tenant, attr.Account): true}) } - if err := at.Execute(); err != nil { + if err := at.Execute(nil, nil); err != nil { *reply = err.Error() return err } diff --git a/apier/v1/scheduler.go b/apier/v1/scheduler.go index d0ee56f66..079672ddd 100644 --- a/apier/v1/scheduler.go +++ b/apier/v1/scheduler.go @@ -194,7 +194,7 @@ func (self *ApierV1) ExecuteScheduledActions(attr AttrsExecuteScheduledActions, at.SetAccountIDs(apl.AccountIDs) // copy the accounts at.SetActionPlanID(apl.Id) - err := at.Execute() + err := at.Execute(nil, nil) if err != nil { *reply = err.Error() return err @@ -239,7 +239,7 @@ func (self *ApierV1) ExecuteScheduledActions(attr AttrsExecuteScheduledActions, current = a0.GetNextStartTime(current) if current.Before(attr.TimeEnd) || current.Equal(attr.TimeEnd) { utils.Logger.Info(fmt.Sprintf(" Executing action %s for time %v", a0.ActionsID, current)) - err := a0.Execute() + err := a0.Execute(nil, nil) if err != nil { *reply = err.Error() return err diff --git a/engine/account.go b/engine/account.go index b8cca9198..1a6b20a8b 100644 --- a/engine/account.go +++ b/engine/account.go @@ -21,13 +21,12 @@ import ( "encoding/json" "errors" "fmt" + "strings" "time" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/structmatcher" "github.com/cgrates/cgrates/utils" - - "strings" ) /* diff --git a/engine/action_plan.go b/engine/action_plan.go index 36b215952..1f96e1dd1 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -74,7 +74,7 @@ func (t *Task) Execute() error { Uuid: t.Uuid, ActionsID: t.ActionsID, accountIDs: utils.StringMap{t.AccountID: true}, - }).Execute() + }).Execute(nil, nil) } func (at *ActionTiming) GetNextStartTime(now time.Time) (t time.Time) { @@ -282,7 +282,9 @@ func (at *ActionTiming) getActions() (as []*Action, err error) { return at.actions, err } -func (at *ActionTiming) Execute() (err error) { +// Execute will execute all actions in an action plan +// Reports on success/fail via channel if != nil +func (at *ActionTiming) Execute(successActions, failedActions chan *Action) (err error) { at.ResetStartTimeCache() aac, err := at.getActions() if err != nil { @@ -331,8 +333,14 @@ func (at *ActionTiming) Execute() (err error) { if err := actionFunction(acc, nil, a, aac); err != nil { utils.Logger.Err(fmt.Sprintf("Error executing action %s: %v!", a.ActionType, err)) transactionFailed = true + if failedActions != nil { + go func() { failedActions <- a }() + } break } + if successActions != nil { + go func() { successActions <- a }() + } if a.ActionType == REMOVE_ACCOUNT { removeAccountActionFound = true } @@ -356,12 +364,21 @@ func (at *ActionTiming) Execute() (err error) { // do not allow the action plan to be rescheduled at.Timing = nil utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType)) + if failedActions != nil { + go func() { failedActions <- a }() + } break } if err := actionFunction(nil, nil, a, aac); err != nil { utils.Logger.Err(fmt.Sprintf("Error executing accountless action %s: %v!", a.ActionType, err)) + if failedActions != nil { + go func() { failedActions <- a }() + } break } + if successActions != nil { + go func() { successActions <- a }() + } } } if err != nil { diff --git a/engine/actions_test.go b/engine/actions_test.go index 118cda4a2..5aa386ad9 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -419,7 +419,7 @@ func TestActionPlanLogFunction(t *testing.T) { at := &ActionTiming{ actions: []*Action{a}, } - err := at.Execute() + err := at.Execute(nil, nil) if err != nil { t.Errorf("Could not execute LOG action: %v", err) } @@ -438,7 +438,7 @@ func TestActionPlanFunctionNotAvailable(t *testing.T) { Timing: &RateInterval{}, actions: []*Action{a}, } - err := at.Execute() + err := at.Execute(nil, nil) if err != nil { t.Errorf("Faild to detect wrong function type: %v", err) } @@ -577,7 +577,7 @@ func TestActionPlansRemoveMember(t *testing.T) { actions: actions, } - if err = at.Execute(); err != nil { + if err = at.Execute(nil, nil); err != nil { t.Errorf("Execute Action: %v", err) } @@ -1214,7 +1214,7 @@ func TestRemoveAction(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:remo": true}, actions: Actions{a}, } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:remo") if err == nil || afterUb != nil { t.Error("error removing account: ", err, afterUb) @@ -1233,7 +1233,7 @@ func TestTopupAction(t *testing.T) { actions: Actions{a}, } - at.Execute() + at.Execute(nil, nil) afterUb, _ := accountingStorage.GetAccount("vdf:minu") initialValue := initialUb.BalanceMap[utils.MONETARY].GetTotalValue() afterValue := afterUb.BalanceMap[utils.MONETARY].GetTotalValue() @@ -1254,7 +1254,7 @@ func TestTopupActionLoaded(t *testing.T) { actions: Actions{a}, } - at.Execute() + at.Execute(nil, nil) afterUb, _ := accountingStorage.GetAccount("vdf:minitsboy") initialValue := initialUb.BalanceMap[utils.MONETARY].GetTotalValue() afterValue := afterUb.BalanceMap[utils.MONETARY].GetTotalValue() @@ -1422,7 +1422,7 @@ func TestActionTransactionFuncType(t *testing.T) { }, }, } - err = at.Execute() + err = at.Execute(nil, nil) acc, err := accountingStorage.GetAccount("cgrates.org:trans") if err != nil || acc == nil { t.Error("Error getting account: ", acc, err) @@ -1458,7 +1458,7 @@ func TestActionTransactionBalanceType(t *testing.T) { }, }, } - err = at.Execute() + err = at.Execute(nil, nil) acc, err := accountingStorage.GetAccount("cgrates.org:trans") if err != nil || acc == nil { t.Error("Error getting account: ", acc, err) @@ -1494,7 +1494,7 @@ func TestActionTransactionBalanceNotType(t *testing.T) { }, }, } - err = at.Execute() + err = at.Execute(nil, nil) acc, err := accountingStorage.GetAccount("cgrates.org:trans") if err != nil || acc == nil { t.Error("Error getting account: ", acc, err) @@ -1537,7 +1537,7 @@ func TestActionWithExpireWithoutExpire(t *testing.T) { }, }, } - err = at.Execute() + err = at.Execute(nil, nil) acc, err := accountingStorage.GetAccount("cgrates.org:exp") if err != nil || acc == nil { t.Errorf("Error getting account: %+v: %v", acc, err) @@ -1584,7 +1584,7 @@ func TestActionRemoveBalance(t *testing.T) { }, }, } - err = at.Execute() + err = at.Execute(nil, nil) acc, err := accountingStorage.GetAccount("cgrates.org:rembal") if err != nil || acc == nil { t.Errorf("Error getting account: %+v: %v", acc, err) @@ -1633,7 +1633,7 @@ func TestActionTransferMonetaryDefault(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:trans": true}, actions: Actions{a}, } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:trans") if err != nil { @@ -1694,7 +1694,7 @@ func TestActionTransferMonetaryDefaultFilter(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:trans": true}, actions: Actions{a}, } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:trans") if err != nil { @@ -1760,7 +1760,7 @@ func TestActionConditionalTopup(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:cond": true}, actions: Actions{a}, } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:cond") if err != nil { @@ -1824,7 +1824,7 @@ func TestActionConditionalTopupNoMatch(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:cond": true}, actions: Actions{a}, } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:cond") if err != nil { @@ -1888,7 +1888,7 @@ func TestActionConditionalTopupExistingBalance(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:cond": true}, actions: Actions{a}, } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:cond") if err != nil { @@ -2038,7 +2038,7 @@ func TestActionConditionalDisabledIfNegative(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:af": true}, actions: Actions{a1, a2, a3, a4, a5}, } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:af") if err != nil { @@ -2109,7 +2109,7 @@ func TestActionSetBalance(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:setb": true}, actions: Actions{a}, } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:setb") if err != nil { @@ -2146,7 +2146,7 @@ func TestActionExpirationTime(t *testing.T) { actions: a, } for rep := 0; rep < 5; rep++ { - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:expo") if err != nil || len(afterUb.BalanceMap[utils.VOICE]) != rep+1 { @@ -2169,7 +2169,7 @@ func TestActionExpNoExp(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:expnoexp": true}, actions: exp, } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:expnoexp") if err != nil || len(afterUb.BalanceMap[utils.VOICE]) != 2 { @@ -2222,7 +2222,7 @@ func TestActionCdrlogBalanceValue(t *testing.T) { }, }, } - err = at.Execute() + err = at.Execute(nil, nil) acc, err := accountingStorage.GetAccount("cgrates.org:bv") if err != nil || acc == nil { t.Error("Error getting account: ", acc, err) @@ -2310,7 +2310,7 @@ func TestValueFormulaDebit(t *testing.T) { accountIDs: utils.StringMap{"cgrates.org:vf": true}, ActionsID: "VF", } - at.Execute() + at.Execute(nil, nil) afterUb, err := accountingStorage.GetAccount("cgrates.org:vf") // not an exact value, depends of month v := afterUb.BalanceMap[utils.MONETARY].GetTotalValue() diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index 9326306d9..8c07fd062 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -560,7 +560,7 @@ func TestMaxSessionTimeWithMaxRate(t *testing.T) { //log.Print(ap) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } //acc, _ := accountingStorage.GetAccount("cgrates.org:12345") //log.Print("ACC: ", utils.ToIJSON(acc)) @@ -587,7 +587,7 @@ func TestMaxSessionTimeWithMaxCost(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -611,7 +611,7 @@ func TestGetMaxSessiontWithBlocker(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("BLOCK_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } acc, err := accountingStorage.GetAccount("cgrates.org:block") if err != nil { @@ -662,7 +662,7 @@ func TestGetMaxSessiontWithBlockerEmpty(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("BLOCK_EMPTY_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } acc, err := accountingStorage.GetAccount("cgrates.org:block_empty") if err != nil { @@ -713,7 +713,7 @@ func TestGetCostWithMaxCost(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -737,7 +737,7 @@ func TestGetCostRoundingIssue(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -762,7 +762,7 @@ func TestGetCostRatingInfoOnZeroTime(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -790,7 +790,7 @@ func TestDebitRatingInfoOnZeroTime(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -819,7 +819,7 @@ func TestMaxDebitRatingInfoOnZeroTime(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -847,7 +847,7 @@ func TestMaxDebitUnknowDest(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -870,7 +870,7 @@ func TestMaxDebitRoundingIssue(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -905,7 +905,7 @@ func TestDebitRoundingRefund(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -940,7 +940,7 @@ func TestMaxSessionTimeWithMaxCostFree(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -964,7 +964,7 @@ func TestMaxDebitWithMaxCostFree(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -988,7 +988,7 @@ func TestGetCostWithMaxCostFree(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ Direction: "*out", @@ -1043,12 +1043,12 @@ func TestMaxSessionTimeWithAccountShared(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED0_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } ap, _ = ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd0 := &CallDescriptor{ @@ -1084,12 +1084,12 @@ func TestMaxDebitWithAccountShared(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED0_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } ap, _ = ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ @@ -1307,7 +1307,7 @@ func TestDebitFromShareAndNormal(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ @@ -1336,7 +1336,7 @@ func TestDebitFromEmptyShare(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP_EMPTY_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ @@ -1365,7 +1365,7 @@ func TestDebitNegatve(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("POST_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd := &CallDescriptor{ @@ -1405,7 +1405,7 @@ func TestMaxDebitZeroDefinedRate(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd1 := &CallDescriptor{ Direction: "*out", @@ -1435,7 +1435,7 @@ func TestMaxDebitForceDuration(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd1 := &CallDescriptor{ Direction: "*out", @@ -1460,7 +1460,7 @@ func TestMaxDebitZeroDefinedRateOnlyMinutes(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd1 := &CallDescriptor{ Direction: "*out", @@ -1489,7 +1489,7 @@ func TestMaxDebitConsumesMinutes(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs - at.Execute() + at.Execute(nil, nil) } cd1 := &CallDescriptor{ Direction: "*out", diff --git a/general_tests/acntacts_test.go b/general_tests/acntacts_test.go index 4f493fee8..0e9a5911e 100644 --- a/general_tests/acntacts_test.go +++ b/general_tests/acntacts_test.go @@ -83,7 +83,7 @@ func TestAcntActsDisableAcnt(t *testing.T) { ActionsID: "DISABLE_ACNT", } at.SetAccountIDs(utils.StringMap{acnt1Tag: true}) - if err := at.Execute(); err != nil { + if err := at.Execute(nil, nil); err != nil { t.Error(err) } expectAcnt := &engine.Account{ID: "cgrates.org:1", Disabled: true} @@ -100,7 +100,7 @@ func TestAcntActsEnableAcnt(t *testing.T) { ActionsID: "ENABLE_ACNT", } at.SetAccountIDs(utils.StringMap{acnt1Tag: true}) - if err := at.Execute(); err != nil { + if err := at.Execute(nil, nil); err != nil { t.Error(err) } expectAcnt := &engine.Account{ID: "cgrates.org:1", Disabled: false} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 7b6059187..374915230 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -29,11 +29,15 @@ import ( type Scheduler struct { sync.RWMutex - queue engine.ActionTimingPriorityList - timer *time.Timer - restartLoop chan bool - storage engine.RatingStorage - schedulerStarted bool + queue engine.ActionTimingPriorityList + timer *time.Timer + restartLoop chan bool + storage engine.RatingStorage + schedulerStarted bool + actStatsInterval time.Duration // How long time to keep the stats in memory + actSucessChan, actFailedChan chan *engine.Action // ActionPlan will pass actions via these channels + aSMux, aFMux sync.RWMutex // protect schedStats + actSuccessStats, actFailedStats map[string]map[time.Time]bool // keep here stats regarding executed actions, map[actionType]map[execTime]bool } func NewScheduler(storage engine.RatingStorage) *Scheduler { @@ -45,6 +49,35 @@ func NewScheduler(storage engine.RatingStorage) *Scheduler { return s } +func (s *Scheduler) updateActStats(act *engine.Action, isFailed bool) { + mux := s.aSMux + statsMp := s.actSuccessStats + if isFailed { + mux = s.aFMux + statsMp = s.actFailedStats + } + now := time.Now() + mux.Lock() + for aType := range statsMp { + for t := range statsMp[aType] { + if now.Sub(t) > s.actStatsInterval { + delete(statsMp[aType], t) + if len(statsMp[aType]) == 0 { + delete(statsMp, aType) + } + } + } + } + if act == nil { + return + } + if _, hasIt := statsMp[act.ActionType]; !hasIt { + statsMp[act.ActionType] = make(map[time.Time]bool) + } + statsMp[act.ActionType][now] = true + mux.Unlock() +} + func (s *Scheduler) Loop() { s.schedulerStarted = true for { @@ -61,7 +94,7 @@ func (s *Scheduler) Loop() { now := time.Now() start := a0.GetNextStartTime(now) if start.Equal(now) || start.Before(now) { - go a0.Execute() + go a0.Execute(s.actSucessChan, s.actFailedChan) // if after execute the next start time is in the past then // do not add it to the queue a0.ResetStartTimeCache()