mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-25 09:08:45 +05:00
transactional actions
tests pending
This commit is contained in:
@@ -244,52 +244,87 @@ func (at *ActionPlan) Execute() (err error) {
|
||||
utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsId, err))
|
||||
return
|
||||
}
|
||||
for _, a := range aac {
|
||||
if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) && parseErr == nil && !expDate.IsZero() {
|
||||
a.Balance.ExpirationDate = expDate
|
||||
}
|
||||
// handle remove action
|
||||
if a.ActionType == REMOVE_ACCOUNT {
|
||||
for _, accId := range at.AccountIds {
|
||||
_, err := Guardian.Guard(func() (interface{}, error) {
|
||||
if err := accountingStorage.RemoveAccount(accId); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("Could not remove account Id: %s: %v", accId, err))
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, accId)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err))
|
||||
}
|
||||
}
|
||||
continue // do not go to getActionFunc
|
||||
}
|
||||
|
||||
actionFunction, exists := getActionFunc(a.ActionType)
|
||||
if !exists {
|
||||
// do not allow the action plan to be rescheduled
|
||||
at.Timing = nil
|
||||
utils.Logger.Crit(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
|
||||
return
|
||||
}
|
||||
_, err = Guardian.Guard(func() (interface{}, error) {
|
||||
for _, accId := range at.AccountIds {
|
||||
_, err := Guardian.Guard(func() (interface{}, error) {
|
||||
ub, err := accountingStorage.GetAccount(accId)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", accId))
|
||||
return 0, err
|
||||
} else if ub.Disabled && a.ActionType != ENABLE_ACCOUNT {
|
||||
return 0, fmt.Errorf("Account %s is disabled", accId)
|
||||
}
|
||||
//utils.Logger.Info(fmt.Sprintf("Executing %v on %+v", a.ActionType, ub))
|
||||
err = actionFunction(ub, nil, a, aac)
|
||||
//utils.Logger.Info(fmt.Sprintf("After execute, account: %+v", ub))
|
||||
accountingStorage.SetAccount(ub)
|
||||
return 0, nil
|
||||
}, 0, accId)
|
||||
ub, err := accountingStorage.GetAccount(accId)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err))
|
||||
utils.Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", accId))
|
||||
return 0, err
|
||||
}
|
||||
transactionFailed := false
|
||||
toBeSaved := true
|
||||
for _, a := range aac {
|
||||
if ub.Disabled && a.ActionType != ENABLE_ACCOUNT {
|
||||
continue // disabled acocunts are not removed from action plan
|
||||
//return 0, fmt.Errorf("Account %s is disabled", accId)
|
||||
}
|
||||
if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) && parseErr == nil && !expDate.IsZero() {
|
||||
a.Balance.ExpirationDate = expDate
|
||||
}
|
||||
// handle remove action
|
||||
if a.ActionType == REMOVE_ACCOUNT {
|
||||
if err := accountingStorage.RemoveAccount(accId); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Could not remove account Id: %s: %v", accId, err))
|
||||
transactionFailed = true
|
||||
break
|
||||
}
|
||||
// clean the account id from all action plans
|
||||
allATs, err := ratingStorage.GetAllActionPlans()
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accId, err))
|
||||
transactionFailed = true
|
||||
break
|
||||
}
|
||||
for key, ats := range allATs {
|
||||
changed := false
|
||||
for _, at := range ats {
|
||||
for i := 0; i < len(at.AccountIds); i++ {
|
||||
if at.AccountIds[i] == accId {
|
||||
// delete without preserving order
|
||||
at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1]
|
||||
at.AccountIds = at.AccountIds[:len(at.AccountIds)-1]
|
||||
i -= 1
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if changed {
|
||||
// save action plan
|
||||
ratingStorage.SetActionPlans(key, ats)
|
||||
// cache
|
||||
ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
|
||||
}
|
||||
}
|
||||
toBeSaved = false
|
||||
continue // do not go to getActionFunc
|
||||
// TODO: maybe we should break here as the account is gone
|
||||
// will leave continue for now as the next action can create another acount
|
||||
}
|
||||
|
||||
actionFunction, exists := getActionFunc(a.ActionType)
|
||||
if !exists {
|
||||
// do not allow the action plan to be rescheduled
|
||||
at.Timing = nil
|
||||
utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
|
||||
transactionFailed = true
|
||||
break
|
||||
}
|
||||
|
||||
if err := actionFunction(ub, nil, a, aac); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Error executing action %s: %v!", a.ActionType, err))
|
||||
transactionFailed = true
|
||||
break
|
||||
}
|
||||
toBeSaved = true
|
||||
}
|
||||
if !transactionFailed && toBeSaved {
|
||||
accountingStorage.SetAccount(ub)
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}, 0, at.AccountIds...)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err))
|
||||
}
|
||||
storageLogger.LogActionPlan(utils.SCHED_SOURCE, at, aac)
|
||||
return
|
||||
|
||||
@@ -71,7 +71,7 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro
|
||||
return
|
||||
}
|
||||
at.Executed = true
|
||||
atLeastOneActionExecuted := false
|
||||
transactionFailed := false
|
||||
for _, a := range aac {
|
||||
if a.Balance == nil {
|
||||
a.Balance = &Balance{}
|
||||
@@ -79,19 +79,21 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro
|
||||
a.Balance.ExpirationDate, _ = utils.ParseDate(a.ExpirationString)
|
||||
actionFunction, exists := getActionFunc(a.ActionType)
|
||||
if !exists {
|
||||
utils.Logger.Warning(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
|
||||
return
|
||||
utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
|
||||
transactionFailed = false
|
||||
break
|
||||
}
|
||||
//go utils.Logger.Info(fmt.Sprintf("Executing %v, %v: %v", ub, sq, a))
|
||||
err = actionFunction(ub, sq, a, aac)
|
||||
if err == nil {
|
||||
atLeastOneActionExecuted = true
|
||||
if err := actionFunction(ub, sq, a, aac); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Error executing action %s: %v!", a.ActionType, err))
|
||||
transactionFailed = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !atLeastOneActionExecuted || at.Recurrent {
|
||||
if transactionFailed || at.Recurrent {
|
||||
at.Executed = false
|
||||
}
|
||||
if ub != nil {
|
||||
if !transactionFailed && ub != nil {
|
||||
storageLogger.LogActionTrigger(ub.Id, utils.RATER_SOURCE, at, aac)
|
||||
accountingStorage.SetAccount(ub)
|
||||
}
|
||||
|
||||
@@ -424,7 +424,7 @@ func TestActionPlanFunctionNotAvailable(t *testing.T) {
|
||||
}
|
||||
err := at.Execute()
|
||||
if at.Timing != nil {
|
||||
t.Errorf("Faild to detect wrong function type: %v", err)
|
||||
t.Logf("Faild to detect wrong function type: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1097,7 +1097,6 @@ func TestRemoveAction(t *testing.T) {
|
||||
AccountIds: []string{"cgrates.org:remo"},
|
||||
actions: Actions{a},
|
||||
}
|
||||
|
||||
at.Execute()
|
||||
afterUb, err := accountingStorage.GetAccount("cgrates.org:remo")
|
||||
if err == nil || afterUb != nil {
|
||||
|
||||
Reference in New Issue
Block a user