This commit is contained in:
DanB
2015-12-18 19:02:59 +01:00
27 changed files with 900 additions and 664 deletions

View File

@@ -45,17 +45,24 @@ func (self *ApierV1) GetAccountActionPlan(attrs AttrAcntAction, reply *[]*Accoun
if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(strings.Join(missing, ","), "")
}
accountATs := make([]*AccountActionTiming, 0)
allATs, err := self.RatingDb.GetAllActionPlans()
accountATs := make([]*AccountActionTiming, 0) // needs to be initialized if remains empty
allAPs, err := self.RatingDb.GetAllActionPlans()
if err != nil {
return utils.NewErrServerError(err)
}
for _, ats := range allATs {
for _, at := range ats {
if utils.IsSliceMember(at.AccountIds, utils.AccountKey(attrs.Tenant, attrs.Account)) {
accountATs = append(accountATs, &AccountActionTiming{Uuid: at.Uuid, ActionPlanId: at.Id, ActionsId: at.ActionsId, NextExecTime: at.GetNextStartTime(time.Now())})
accID := utils.AccountKey(attrs.Tenant, attrs.Account)
for _, ap := range allAPs {
if _, exists := ap.AccountIDs[accID]; exists {
for _, at := range ap.ActionTimings {
accountATs = append(accountATs, &AccountActionTiming{
ActionPlanId: ap.Id,
Uuid: at.Uuid,
ActionsId: at.ActionsID,
NextExecTime: at.GetNextStartTime(time.Now()),
})
}
}
}
*reply = accountATs
return nil
@@ -80,22 +87,41 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e
}
}
_, err := engine.Guardian.Guard(func() (interface{}, error) {
ats, err := self.RatingDb.GetActionPlans(attrs.ActionPlanId, false)
ap, err := self.RatingDb.GetActionPlan(attrs.ActionPlanId, false)
if err != nil {
return 0, err
} else if len(ats) == 0 {
} else if ap == nil {
return 0, utils.ErrNotFound
}
ats = engine.RemActionPlan(ats, attrs.ActionTimingId, utils.AccountKey(attrs.Tenant, attrs.Account))
if err := self.RatingDb.SetActionPlans(attrs.ActionPlanId, ats); err != nil {
return 0, err
if attrs.ActionPlanId != "" { // delete the entire action plan
ap.ActionTimings = nil // will delete the action plan
return 0, self.RatingDb.SetActionPlan(ap.Id, ap)
}
if len(ats) > 0 { // update cache
self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + attrs.ActionPlanId}})
if attrs.ActionTimingId != "" { // delete only a action timing from action plan
for i, at := range ap.ActionTimings {
if at.Uuid == attrs.ActionTimingId {
ap.ActionTimings[i] = ap.ActionTimings[len(ap.ActionTimings)-1]
ap.ActionTimings = ap.ActionTimings[:len(ap.ActionTimings)-1]
break
}
}
return 0, self.RatingDb.SetActionPlan(ap.Id, ap)
}
if attrs.Tenant != "" && attrs.Account != "" {
accID := utils.AccountKey(attrs.Tenant, attrs.Account)
delete(ap.AccountIDs, accID)
return 0, self.RatingDb.SetActionPlan(ap.Id, ap)
}
// update cache
self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + attrs.ActionPlanId}})
return 0, nil
}, 0, utils.ACTION_PLAN_PREFIX)
if err != nil {
*reply = err.Error()
return utils.NewErrServerError(err)
}
if attrs.ReloadScheduler && self.Sched != nil {
@@ -130,9 +156,9 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r
if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
accId := utils.AccountKey(attrs.Tenant, attrs.Account)
accID := utils.AccountKey(attrs.Tenant, attrs.Account)
_, err := engine.Guardian.Guard(func() (interface{}, error) {
ub, err := self.AccountDb.GetAccount(accId)
ub, err := self.AccountDb.GetAccount(accID)
if err != nil {
return 0, err
}
@@ -152,7 +178,7 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r
return 0, err
}
return 0, nil
}, 0, accId)
}, 0, accID)
if err != nil {
return utils.NewErrServerError(err)
}
@@ -166,30 +192,31 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error
return utils.NewErrMandatoryIeMissing(missing...)
}
var schedulerReloadNeeded = false
accId := utils.AccountKey(attr.Tenant, attr.Account)
accID := utils.AccountKey(attr.Tenant, attr.Account)
var ub *engine.Account
_, err := engine.Guardian.Guard(func() (interface{}, error) {
if bal, _ := self.AccountDb.GetAccount(accId); bal != nil {
if bal, _ := self.AccountDb.GetAccount(accID); bal != nil {
ub = bal
} else { // Not found in db, create it here
ub = &engine.Account{
Id: accId,
Id: accID,
}
}
if len(attr.ActionPlanId) != 0 {
_, err := engine.Guardian.Guard(func() (interface{}, error) {
var ats engine.ActionPlans
var ap *engine.ActionPlan
var err error
ats, err = self.RatingDb.GetActionPlans(attr.ActionPlanId, false)
ap, err = self.RatingDb.GetActionPlan(attr.ActionPlanId, false)
if err != nil {
return 0, err
}
for _, at := range ats {
at.AccountIds = append(at.AccountIds, accId)
}
if len(ats) != 0 {
if _, exists := ap.AccountIDs[accID]; !exists {
if ap.AccountIDs == nil {
ap.AccountIDs = make(map[string]struct{})
}
ap.AccountIDs[accID] = struct{}{}
schedulerReloadNeeded = true
if err := self.RatingDb.SetActionPlans(attr.ActionPlanId, ats); err != nil {
if err := self.RatingDb.SetActionPlan(attr.ActionPlanId, ap); err != nil {
return 0, err
}
// update cache
@@ -221,11 +248,11 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error
return 0, err
}
return 0, nil
}, 0, accId)
}, 0, accID)
if err != nil {
return utils.NewErrServerError(err)
}
if schedulerReloadNeeded {
if attr.ReloadScheduler && schedulerReloadNeeded {
// reload scheduler
if self.Sched != nil {
self.Sched.Reload(true)
@@ -239,32 +266,20 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
if missing := utils.MissingStructFields(&attr, []string{"Tenant", "Account"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
accountId := utils.AccountKey(attr.Tenant, attr.Account)
accID := utils.AccountKey(attr.Tenant, attr.Account)
var schedulerReloadNeeded bool
_, err := engine.Guardian.Guard(func() (interface{}, error) {
// remove it from all action plans
allATs, err := self.RatingDb.GetAllActionPlans()
allAPs, err := self.RatingDb.GetAllActionPlans()
if err != nil && err != utils.ErrNotFound {
return 0, err
}
for key, ats := range allATs {
changed := false
for _, at := range ats {
for i := 0; i < len(at.AccountIds); i++ {
if at.AccountIds[i] == accountId {
// delete without preserving order
at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1]
at.AccountIds = at.AccountIds[:len(at.AccountIds)-1]
i--
changed = true
}
}
}
if changed {
for key, ap := range allAPs {
if _, exists := ap.AccountIDs[accID]; !exists {
schedulerReloadNeeded = true
_, err := engine.Guardian.Guard(func() (interface{}, error) {
// save action plan
self.RatingDb.SetActionPlans(key, ats)
self.RatingDb.SetActionPlan(key, ap)
// cache
self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
return 0, nil
@@ -274,11 +289,11 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string)
}
}
}
if err := self.AccountDb.RemoveAccount(accountId); err != nil {
if err := self.AccountDb.RemoveAccount(accID); err != nil {
return 0, err
}
return 0, nil
}, 0, accountId)
}, 0, accID)
// FIXME: remove from all actionplans?
if err != nil {
return utils.NewErrServerError(err)
@@ -344,3 +359,153 @@ func (self *ApierV1) GetAccount(attr *utils.AttrGetAccount, reply *interface{})
*reply = userBalance.AsOldStructure()
return nil
}
type AttrAddBalance struct {
Tenant string
Account string
BalanceUuid string
BalanceId string
BalanceType string
Directions string
Value float64
ExpiryTime string
RatingSubject string
Categories string
DestinationIds string
Weight float64
SharedGroups string
Overwrite bool // When true it will reset if the balance is already there
Disabled bool
}
func (self *ApierV1) AddBalance(attr *AttrAddBalance, reply *string) error {
expTime, err := utils.ParseTimeDetectLayout(attr.ExpiryTime, self.Config.DefaultTimezone)
if err != nil {
*reply = err.Error()
return err
}
accID := utils.AccountKey(attr.Tenant, attr.Account)
if _, err := self.AccountDb.GetAccount(accID); err != nil {
// create account if not exists
account := &engine.Account{
Id: accID,
}
if err := self.AccountDb.SetAccount(account); err != nil {
*reply = err.Error()
return err
}
}
at := &engine.ActionTiming{}
at.SetAccountIDs(map[string]struct{}{accID: struct{}{}})
aType := engine.DEBIT
// reverse the sign as it is a debit
attr.Value = -attr.Value
if attr.Overwrite {
aType = engine.DEBIT_RESET
}
at.SetActions(engine.Actions{
&engine.Action{
ActionType: aType,
BalanceType: attr.BalanceType,
Balance: &engine.Balance{
Uuid: attr.BalanceUuid,
Id: attr.BalanceId,
Value: attr.Value,
ExpirationDate: expTime,
RatingSubject: attr.RatingSubject,
Directions: utils.ParseStringMap(attr.Directions),
DestinationIds: utils.ParseStringMap(attr.DestinationIds),
Categories: utils.ParseStringMap(attr.Categories),
Weight: attr.Weight,
SharedGroups: utils.ParseStringMap(attr.SharedGroups),
Disabled: attr.Disabled,
},
},
})
if err := at.Execute(); err != nil {
*reply = err.Error()
return err
}
*reply = OK
return nil
}
func (self *ApierV1) EnableDisableBalance(attr *AttrAddBalance, reply *string) error {
expTime, err := utils.ParseDate(attr.ExpiryTime)
if err != nil {
*reply = err.Error()
return err
}
accID := utils.ConcatenatedKey(attr.Tenant, attr.Account)
if _, err := self.AccountDb.GetAccount(accID); err != nil {
return utils.ErrNotFound
}
at := &engine.ActionTiming{}
at.SetAccountIDs(map[string]struct{}{accID: struct{}{}})
at.SetActions(engine.Actions{
&engine.Action{
ActionType: engine.ENABLE_DISABLE_BALANCE,
BalanceType: attr.BalanceType,
Balance: &engine.Balance{
Uuid: attr.BalanceUuid,
Id: attr.BalanceId,
Value: attr.Value,
ExpirationDate: expTime,
RatingSubject: attr.RatingSubject,
Directions: utils.ParseStringMap(attr.Directions),
DestinationIds: utils.ParseStringMap(attr.DestinationIds),
Weight: attr.Weight,
SharedGroups: utils.ParseStringMap(attr.SharedGroups),
Disabled: attr.Disabled,
},
},
})
if err := at.Execute(); err != nil {
*reply = err.Error()
return err
}
*reply = OK
return nil
}
func (self *ApierV1) RemoveBalances(attr *AttrAddBalance, reply *string) error {
expTime, err := utils.ParseDate(attr.ExpiryTime)
if err != nil {
*reply = err.Error()
return err
}
accID := utils.AccountKey(attr.Tenant, attr.Account)
if _, err := self.AccountDb.GetAccount(accID); err != nil {
return utils.ErrNotFound
}
at := &engine.ActionTiming{}
at.SetAccountIDs(map[string]struct{}{accID: struct{}{}})
at.SetActions(engine.Actions{
&engine.Action{
ActionType: engine.REMOVE_BALANCE,
BalanceType: attr.BalanceType,
Balance: &engine.Balance{
Uuid: attr.BalanceUuid,
Id: attr.BalanceId,
Value: attr.Value,
ExpirationDate: expTime,
RatingSubject: attr.RatingSubject,
Directions: utils.ParseStringMap(attr.Directions),
DestinationIds: utils.ParseStringMap(attr.DestinationIds),
Weight: attr.Weight,
SharedGroups: utils.ParseStringMap(attr.SharedGroups),
Disabled: attr.Disabled,
},
},
})
if err := at.Execute(); err != nil {
*reply = err.Error()
return err
}
*reply = OK
return nil
}

View File

@@ -102,162 +102,12 @@ func (self *ApierV1) GetRatingPlan(rplnId string, reply *engine.RatingPlan) erro
return nil
}
type AttrAddBalance struct {
Tenant string
Account string
BalanceUuid string
BalanceId string
BalanceType string
Directions string
Value float64
ExpiryTime string
RatingSubject string
Categories string
DestinationIds string
Weight float64
SharedGroups string
Overwrite bool // When true it will reset if the balance is already there
Disabled bool
}
func (self *ApierV1) AddBalance(attr *AttrAddBalance, reply *string) error {
expTime, err := utils.ParseTimeDetectLayout(attr.ExpiryTime, self.Config.DefaultTimezone)
if err != nil {
*reply = err.Error()
return err
}
tag := utils.ConcatenatedKey(attr.Tenant, attr.Account)
if _, err := self.AccountDb.GetAccount(tag); err != nil {
// create account if not exists
account := &engine.Account{
Id: tag,
}
if err := self.AccountDb.SetAccount(account); err != nil {
*reply = err.Error()
return err
}
}
at := &engine.ActionPlan{
AccountIds: []string{tag},
}
aType := engine.DEBIT
// reverse the sign as it is a debit
attr.Value = -attr.Value
if attr.Overwrite {
aType = engine.DEBIT_RESET
}
at.SetActions(engine.Actions{
&engine.Action{
ActionType: aType,
BalanceType: attr.BalanceType,
Balance: &engine.Balance{
Uuid: attr.BalanceUuid,
Id: attr.BalanceId,
Value: attr.Value,
ExpirationDate: expTime,
RatingSubject: attr.RatingSubject,
Directions: utils.ParseStringMap(attr.Directions),
DestinationIds: utils.ParseStringMap(attr.DestinationIds),
Categories: utils.ParseStringMap(attr.Categories),
Weight: attr.Weight,
SharedGroups: utils.ParseStringMap(attr.SharedGroups),
Disabled: attr.Disabled,
},
},
})
if err := at.Execute(); err != nil {
*reply = err.Error()
return err
}
*reply = OK
return nil
}
func (self *ApierV1) EnableDisableBalance(attr *AttrAddBalance, reply *string) error {
expTime, err := utils.ParseDate(attr.ExpiryTime)
if err != nil {
*reply = err.Error()
return err
}
tag := utils.ConcatenatedKey(attr.Tenant, attr.Account)
if _, err := self.AccountDb.GetAccount(tag); err != nil {
return utils.ErrNotFound
}
at := &engine.ActionPlan{
AccountIds: []string{tag},
}
at.SetActions(engine.Actions{
&engine.Action{
ActionType: engine.ENABLE_DISABLE_BALANCE,
BalanceType: attr.BalanceType,
Balance: &engine.Balance{
Uuid: attr.BalanceUuid,
Id: attr.BalanceId,
Value: attr.Value,
ExpirationDate: expTime,
RatingSubject: attr.RatingSubject,
Directions: utils.ParseStringMap(attr.Directions),
DestinationIds: utils.ParseStringMap(attr.DestinationIds),
Weight: attr.Weight,
SharedGroups: utils.ParseStringMap(attr.SharedGroups),
Disabled: attr.Disabled,
},
},
})
if err := at.Execute(); err != nil {
*reply = err.Error()
return err
}
*reply = OK
return nil
}
func (self *ApierV1) RemoveBalances(attr *AttrAddBalance, reply *string) error {
expTime, err := utils.ParseDate(attr.ExpiryTime)
if err != nil {
*reply = err.Error()
return err
}
accId := utils.ConcatenatedKey(attr.Tenant, attr.Account)
if _, err := self.AccountDb.GetAccount(accId); err != nil {
return utils.ErrNotFound
}
at := &engine.ActionPlan{
AccountIds: []string{accId},
}
at.SetActions(engine.Actions{
&engine.Action{
ActionType: engine.REMOVE_BALANCE,
BalanceType: attr.BalanceType,
Balance: &engine.Balance{
Uuid: attr.BalanceUuid,
Id: attr.BalanceId,
Value: attr.Value,
ExpirationDate: expTime,
RatingSubject: attr.RatingSubject,
Directions: utils.ParseStringMap(attr.Directions),
DestinationIds: utils.ParseStringMap(attr.DestinationIds),
Weight: attr.Weight,
SharedGroups: utils.ParseStringMap(attr.SharedGroups),
Disabled: attr.Disabled,
},
},
})
if err := at.Execute(); err != nil {
*reply = err.Error()
return err
}
*reply = OK
return nil
}
func (self *ApierV1) ExecuteAction(attr *utils.AttrExecuteAction, reply *string) error {
accId := utils.AccountKey(attr.Tenant, attr.Account)
at := &engine.ActionPlan{
AccountIds: []string{accId},
ActionsId: attr.ActionsId,
accID := utils.AccountKey(attr.Tenant, attr.Account)
at := &engine.ActionTiming{
ActionsID: attr.ActionsId,
}
at.SetAccountIDs(map[string]struct{}{accID: struct{}{}})
if err := at.Execute(); err != nil {
*reply = err.Error()
return err
@@ -745,8 +595,10 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error
return utils.ErrExists
}
}
storeAtms := make(engine.ActionPlans, len(attrs.ActionPlan))
for idx, apiAtm := range attrs.ActionPlan {
ap := &engine.ActionPlan{
Id: attrs.Id,
}
for _, apiAtm := range attrs.ActionPlan {
if exists, err := self.RatingDb.HasData(utils.ACTION_PREFIX, apiAtm.ActionsId); err != nil {
return utils.NewErrServerError(err)
} else if !exists {
@@ -758,16 +610,14 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error
timing.MonthDays.Parse(apiAtm.MonthDays, ";")
timing.WeekDays.Parse(apiAtm.WeekDays, ";")
timing.StartTime = apiAtm.Time
at := &engine.ActionPlan{
ap.ActionTimings = append(ap.ActionTimings, &engine.ActionTiming{
Uuid: utils.GenUUID(),
Id: attrs.Id,
Weight: apiAtm.Weight,
Timing: &engine.RateInterval{Timing: timing},
ActionsId: apiAtm.ActionsId,
}
storeAtms[idx] = at
ActionsID: apiAtm.ActionsId,
})
}
if err := self.RatingDb.SetActionPlans(attrs.Id, storeAtms); err != nil {
if err := self.RatingDb.SetActionPlan(ap.Id, ap); err != nil {
return utils.NewErrServerError(err)
}
self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + attrs.Id}})
@@ -785,8 +635,8 @@ type AttrGetActionPlan struct {
Id string
}
func (self *ApierV1) GetActionPlan(attr AttrGetActionPlan, reply *[]engine.ActionPlans) error {
var result []engine.ActionPlans
func (self *ApierV1) GetActionPlan(attr AttrGetActionPlan, reply *[]*engine.ActionPlan) error {
var result []*engine.ActionPlan
if attr.Id == "" || attr.Id == "*" {
aplsMap, err := self.RatingDb.GetAllActionPlans()
if err != nil {
@@ -796,7 +646,7 @@ func (self *ApierV1) GetActionPlan(attr AttrGetActionPlan, reply *[]engine.Actio
result = append(result, apls)
}
} else {
apls, err := self.RatingDb.GetActionPlans(attr.Id, false)
apls, err := self.RatingDb.GetActionPlan(attr.Id, false)
if err != nil {
return err
}
@@ -956,7 +806,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str
}
// ToDo: Get the action keys loaded by dbReader so we reload only these in cache
// Need to do it before scheduler otherwise actions to run will be unknown
if err := self.RatingDb.CacheRatingPrefixes(utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.SHARED_GROUP_PREFIX); err != nil {
if err := self.RatingDb.CacheRatingPrefixes(utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.SHARED_GROUP_PREFIX, utils.ACTION_PLAN_PREFIX); err != nil {
return err
}
if self.Sched != nil {

View File

@@ -740,7 +740,6 @@ func TestApierSetRatingProfile(t *testing.T) {
if err := rater.Call("ApierV1.SetRatingProfile", rpf, &reply); err != nil {
t.Error("Unexpected result on duplication: ", err.Error())
}
time.Sleep(10 * time.Millisecond) // Give time for cache reload
// Make sure rates were loaded for account dan
// Test here ResponderGetCost
tStart, _ := utils.ParseDate("2013-08-07T17:30:00Z")
@@ -1074,7 +1073,7 @@ func TestApierSetAccount(t *testing.T) {
return
}
reply := ""
attrs := &utils.AttrSetAccount{Tenant: "cgrates.org", Account: "dan7", ActionPlanId: "ATMS_1"}
attrs := &utils.AttrSetAccount{Tenant: "cgrates.org", Account: "dan7", ActionPlanId: "ATMS_1", ReloadScheduler: true}
if err := rater.Call("ApierV1.SetAccount", attrs, &reply); err != nil {
t.Error("Got error on ApierV1.SetAccount: ", err.Error())
} else if reply != "OK" {
@@ -1134,7 +1133,6 @@ func TestApierGetAccount(t *testing.T) {
if !*testLocal {
return
}
time.Sleep(100 * time.Millisecond) // give scheduler time to react
var reply *engine.Account
attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"}
if err := rater.Call("ApierV2.GetAccount", attrs, &reply); err != nil {
@@ -1175,7 +1173,7 @@ func TestApierTriggersExecute(t *testing.T) {
return
}
reply := ""
attrs := &utils.AttrSetAccount{Tenant: "cgrates.org", Account: "dan8"}
attrs := &utils.AttrSetAccount{Tenant: "cgrates.org", Account: "dan8", ReloadScheduler: true}
if err := rater.Call("ApierV1.SetAccount", attrs, &reply); err != nil {
t.Error("Got error on ApierV1.SetAccount: ", err.Error())
} else if reply != "OK" {

View File

@@ -104,9 +104,9 @@ type AttrsGetScheduledActions struct {
}
type ScheduledActions struct {
NextRunTime time.Time
Accounts int
ActionsId, ActionPlanId, ActionPlanUuid string
NextRunTime time.Time
Accounts int
ActionsId, ActionPlanId, ActionTimingUuid string
}
func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *[]*ScheduledActions) error {
@@ -116,7 +116,7 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *
schedActions := make([]*ScheduledActions, 0) // needs to be initialized if remains empty
scheduledActions := self.Sched.GetQueue()
for _, qActions := range scheduledActions {
sas := &ScheduledActions{ActionsId: qActions.ActionsId, ActionPlanId: qActions.Id, ActionPlanUuid: qActions.Uuid, Accounts: len(qActions.AccountIds)}
sas := &ScheduledActions{ActionsId: qActions.ActionsID, ActionPlanId: qActions.GetActionPlanID(), ActionTimingUuid: qActions.Uuid, Accounts: len(qActions.GetAccountIDs())}
if attrs.SearchTerm != "" &&
!(strings.Contains(sas.ActionPlanId, attrs.SearchTerm) ||
strings.Contains(sas.ActionsId, attrs.SearchTerm)) {
@@ -132,7 +132,7 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *
// filter on account
if attrs.Tenant != "" || attrs.Account != "" {
found := false
for _, accID := range qActions.AccountIds {
for accID := range qActions.GetAccountIDs() {
split := strings.Split(accID, utils.CONCATENATED_KEY_SEP)
if len(split) != 2 {
continue // malformed account id

View File

@@ -1,6 +1,8 @@
package main
import (
"bytes"
"compress/zlib"
"fmt"
"log"
"strings"
@@ -125,6 +127,26 @@ type Action struct {
Balance *Balance
}
type ActionPlan struct {
Uuid string // uniquely identify the timing
Id string // informative purpose only
AccountIds []string
Timing *engine.RateInterval
Weight float64
ActionsId string
actions Actions
stCache time.Time // cached time of the next start
}
func (at *ActionPlan) IsASAP() bool {
if at.Timing == nil {
return false
}
return at.Timing.Timing.StartTime == utils.ASAP
}
type ActionPlans []*ActionPlan
func (mig MigratorRC8) migrateAccounts() error {
keys, err := mig.db.Cmd("KEYS", OLD_ACCOUNT_PREFIX+"*").List()
if err != nil {
@@ -431,10 +453,10 @@ func (mig MigratorRC8) migrateActionPlans() error {
if err != nil {
return err
}
aplsMap := make(map[string]engine.ActionPlans, len(keys))
aplsMap := make(map[string]ActionPlans, len(keys))
for _, key := range keys {
log.Printf("Migrating action plans: %s...", key)
var apls engine.ActionPlans
var apls ActionPlans
var values []byte
if values, err = mig.db.Cmd("GET", key).Bytes(); err == nil {
if err := mig.ms.Unmarshal(values, &apls); err != nil {
@@ -456,12 +478,42 @@ func (mig MigratorRC8) migrateActionPlans() error {
aplsMap[key] = apls
}
// write data back
for key, apl := range aplsMap {
newAplMap := make(map[string]*engine.ActionPlan)
for key, apls := range aplsMap {
for _, apl := range apls {
newApl, exists := newAplMap[key]
if !exists {
newApl = &engine.ActionPlan{
Id: apl.Id,
AccountIDs: make(map[string]struct{}),
}
newAplMap[key] = newApl
}
if !apl.IsASAP() {
for _, accID := range apl.AccountIds {
if _, exists := newApl.AccountIDs[accID]; !exists {
newApl.AccountIDs[accID] = struct{}{}
}
}
}
newApl.ActionTimings = append(newApl.ActionTimings, &engine.ActionTiming{
Uuid: utils.GenUUID(),
Timing: apl.Timing,
ActionsID: apl.ActionsId,
Weight: apl.Weight,
})
}
}
for key, apl := range newAplMap {
result, err := mig.ms.Marshal(apl)
if err != nil {
return err
}
if err = mig.db.Cmd("SET", key, result).Err; err != nil {
var b bytes.Buffer
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
if err = mig.db.Cmd("SET", key, b.Bytes()).Err; err != nil {
return err
}
}

View File

@@ -61,6 +61,6 @@ func (self *CmdGetActionPlan) PostprocessRpcParams() error {
}
func (self *CmdGetActionPlan) RpcResult() interface{} {
s := make([]*engine.ActionPlans, 0)
s := make([]*engine.ActionPlan, 0)
return &s
}

View File

@@ -129,6 +129,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
return mailAsync, true
case SET_DDESTINATIONS:
return setddestinations, true
case REMOVE_ACCOUNT:
return removeAccount, true
case REMOVE_BALANCE:
return removeBalance, true
}
@@ -528,6 +530,52 @@ func setddestinations(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actio
return nil
}
func removeAccount(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
var accID string
if ub != nil {
accID = ub.Id
} else {
accountInfo := struct {
Tenant string
Account string
}{}
if a.ExtraParameters != "" {
if err := json.Unmarshal([]byte(a.ExtraParameters), &accountInfo); err != nil {
return err
}
}
accID = utils.AccountKey(accountInfo.Tenant, accountInfo.Account)
}
if accID == "" {
return utils.ErrInvalidKey
}
if err := accountingStorage.RemoveAccount(accID); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not remove account Id: %s: %v", accID, err))
return err
}
// clean the account id from all action plans
allAPs, err := ratingStorage.GetAllActionPlans()
if err != nil && err != utils.ErrNotFound {
utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accID, err))
return err
}
for key, ap := range allAPs {
if _, exists := ap.AccountIDs[accID]; !exists {
_, err := Guardian.Guard(func() (interface{}, error) {
// save action plan
ratingStorage.SetActionPlan(key, ap)
// cache
ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
return 0, nil
}, 0, utils.ACTION_PLAN_PREFIX)
if err != nil {
return err
}
}
}
return nil
}
func removeBalance(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
if _, exists := ub.BalanceMap[a.BalanceType]; !exists {
return utils.ErrNotFound

View File

@@ -21,7 +21,6 @@ package engine
import (
"fmt"
"sort"
"strconv"
"time"
"github.com/cgrates/cgrates/utils"
@@ -32,20 +31,45 @@ const (
FORMAT = "2006-1-2 15:04:05 MST"
)
type ActionPlan struct {
Uuid string // uniquely identify the timing
Id string // informative purpose only
AccountIds []string
Timing *RateInterval
Weight float64
ActionsId string
actions Actions
stCache time.Time // cached time of the next start
type ActionTiming struct {
Uuid string
Timing *RateInterval
ActionsID string
Weight float64
actions Actions
accountIDs map[string]struct{} // copy of action plans accounts
actionPlanID string // the id of the belonging action plan (info only)
stCache time.Time // cached time of the next start
}
type ActionPlans []*ActionPlan
type Task struct {
Uuid string
AccountID string
ActionsID string
}
func (at *ActionPlan) GetNextStartTime(now time.Time) (t time.Time) {
type ActionPlan struct {
Id string // informative purpose only
AccountIDs map[string]struct{}
ActionTimings []*ActionTiming
}
func (apl *ActionPlan) RemoveAccountID(accID string) (found bool) {
if _, found = apl.AccountIDs[accID]; found {
delete(apl.AccountIDs, accID)
}
return
}
func (t *Task) Execute() error {
return (&ActionTiming{
Uuid: t.Uuid,
ActionsID: t.ActionsID,
accountIDs: map[string]struct{}{t.AccountID: struct{}{}},
}).Execute()
}
func (at *ActionTiming) GetNextStartTime(now time.Time) (t time.Time) {
if !at.stCache.IsZero() {
return at.stCache
}
@@ -68,7 +92,7 @@ func (at *ActionPlan) GetNextStartTime(now time.Time) (t time.Time) {
}
// To be deleted after the above solution proves reliable
func (at *ActionPlan) GetNextStartTimeOld(now time.Time) (t time.Time) {
func (at *ActionTiming) GetNextStartTimeOld(now time.Time) (t time.Time) {
if !at.stCache.IsZero() {
return at.stCache
}
@@ -218,88 +242,63 @@ YEARS:
return
}
func (at *ActionPlan) ResetStartTimeCache() {
func (at *ActionTiming) ResetStartTimeCache() {
at.stCache = time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC)
}
func (at *ActionPlan) SetActions(as Actions) {
func (at *ActionTiming) SetActions(as Actions) {
at.actions = as
}
func (at *ActionPlan) getActions() (as []*Action, err error) {
func (at *ActionTiming) SetAccountIDs(accIDs map[string]struct{}) {
at.accountIDs = accIDs
}
func (at *ActionTiming) GetAccountIDs() map[string]struct{} {
return at.accountIDs
}
func (at *ActionTiming) SetActionPlanID(id string) {
at.actionPlanID = id
}
func (at *ActionTiming) GetActionPlanID() string {
return at.actionPlanID
}
func (at *ActionTiming) getActions() (as []*Action, err error) {
if at.actions == nil {
at.actions, err = ratingStorage.GetActions(at.ActionsId, false)
at.actions, err = ratingStorage.GetActions(at.ActionsID, false)
}
at.actions.Sort()
return at.actions, err
}
func (at *ActionPlan) Execute() (err error) {
if len(at.AccountIds) == 0 { // nothing to do if no accounts set
return
}
func (at *ActionTiming) Execute() (err error) {
at.ResetStartTimeCache()
aac, err := at.getActions()
if err != nil {
utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsId, err))
utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsID, err))
return
}
_, err = Guardian.Guard(func() (interface{}, error) {
for _, accId := range at.AccountIds {
ub, err := accountingStorage.GetAccount(accId)
for accID, _ := range at.accountIDs {
_, err = Guardian.Guard(func() (interface{}, error) {
ub, err := accountingStorage.GetAccount(accID)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", accId))
utils.Logger.Warning(fmt.Sprintf("Could not get account id: %s. Skipping!", accID))
return 0, err
}
transactionFailed := false
toBeSaved := true
removeAccountActionFound := false
for _, a := range aac {
if ub.Disabled && a.ActionType != ENABLE_ACCOUNT {
continue // disabled acocunts are not removed from action plan
//return 0, fmt.Errorf("Account %s is disabled", accId)
//return 0, fmt.Errorf("Account %s is disabled", accID)
}
if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) && parseErr == nil && !expDate.IsZero() {
a.Balance.ExpirationDate = expDate
}
// handle remove action
if a.ActionType == REMOVE_ACCOUNT {
if err := accountingStorage.RemoveAccount(accId); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not remove account Id: %s: %v", accId, err))
transactionFailed = true
break
}
// clean the account id from all action plans
allATs, err := ratingStorage.GetAllActionPlans()
if err != nil && err != utils.ErrNotFound {
utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accId, err))
transactionFailed = true
break
}
for key, ats := range allATs {
changed := false
for _, at := range ats {
for i := 0; i < len(at.AccountIds); i++ {
if at.AccountIds[i] == accId {
// delete without preserving order
at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1]
at.AccountIds = at.AccountIds[:len(at.AccountIds)-1]
i--
changed = true
}
}
}
if changed {
// save action plan
ratingStorage.SetActionPlans(key, ats)
// cache
ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
}
}
toBeSaved = false
continue // do not go to getActionFunc
// TODO: maybe we should break here as the account is gone
// will leave continue for now as the next action can create another acount
}
actionFunction, exists := getActionFunc(a.ActionType)
if !exists {
// do not allow the action plan to be rescheduled
@@ -313,23 +312,46 @@ func (at *ActionPlan) Execute() (err error) {
transactionFailed = true
break
}
toBeSaved = true
if a.ActionType == REMOVE_ACCOUNT {
removeAccountActionFound = true
}
}
if !transactionFailed && toBeSaved {
if !transactionFailed && !removeAccountActionFound {
accountingStorage.SetAccount(ub)
}
return 0, nil
}, 0, accID)
}
if len(at.accountIDs) == 0 { // action timing executing without accounts
for _, a := range aac {
if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) &&
parseErr == nil && !expDate.IsZero() {
a.Balance.ExpirationDate = expDate
}
actionFunction, exists := getActionFunc(a.ActionType)
if !exists {
// do not allow the action plan to be rescheduled
at.Timing = nil
utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
break
}
if err := actionFunction(nil, nil, a, aac); err != nil {
utils.Logger.Err(fmt.Sprintf("Error executing action %s: %v!", a.ActionType, err))
break
}
}
return 0, nil
}, 0, at.AccountIds...)
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err))
return err
}
storageLogger.LogActionPlan(utils.SCHED_SOURCE, at, aac)
storageLogger.LogActionTiming(utils.SCHED_SOURCE, at, aac)
return
}
func (at *ActionPlan) IsASAP() bool {
func (at *ActionTiming) IsASAP() bool {
if at.Timing == nil {
return false
}
@@ -337,17 +359,17 @@ func (at *ActionPlan) IsASAP() bool {
}
// Structure to store actions according to weight
type ActionPlanPriotityList []*ActionPlan
type ActionTimingPriorityList []*ActionTiming
func (atpl ActionPlanPriotityList) Len() int {
func (atpl ActionTimingPriorityList) Len() int {
return len(atpl)
}
func (atpl ActionPlanPriotityList) Swap(i, j int) {
func (atpl ActionTimingPriorityList) Swap(i, j int) {
atpl[i], atpl[j] = atpl[j], atpl[i]
}
func (atpl ActionPlanPriotityList) Less(i, j int) bool {
func (atpl ActionTimingPriorityList) Less(i, j int) bool {
if atpl[i].GetNextStartTime(time.Now()).Equal(atpl[j].GetNextStartTime(time.Now())) {
// higher weights earlyer in the list
return atpl[i].Weight > atpl[j].Weight
@@ -355,41 +377,6 @@ func (atpl ActionPlanPriotityList) Less(i, j int) bool {
return atpl[i].GetNextStartTime(time.Now()).Before(atpl[j].GetNextStartTime(time.Now()))
}
func (atpl ActionPlanPriotityList) Sort() {
func (atpl ActionTimingPriorityList) Sort() {
sort.Sort(atpl)
}
func (at *ActionPlan) String_DISABLED() string {
return at.Id + " " + at.GetNextStartTime(time.Now()).String() + ",w: " + strconv.FormatFloat(at.Weight, 'f', -1, 64)
}
// Helper to remove ActionPlan members based on specific filters, empty data means no always match
func RemActionPlan(ats ActionPlans, actionTimingId, accountId string) ActionPlans {
for idx, at := range ats {
if len(actionTimingId) != 0 && at.Uuid != actionTimingId { // No Match for ActionPlanId, no need to move further
continue
}
if len(accountId) == 0 { // No account defined, considered match for complete removal
if len(ats) == 1 { // Removing last item, by init empty
return make([]*ActionPlan, 0)
}
ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1]
continue
}
for iAcc, accId := range at.AccountIds {
if accId == accountId {
if len(at.AccountIds) == 1 { // Only one balance, remove complete at
if len(ats) == 1 { // Removing last item, by init empty
return make([]*ActionPlan, 0)
}
ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1]
} else {
at.AccountIds[iAcc], at.AccountIds = at.AccountIds[len(at.AccountIds)-1], at.AccountIds[:len(at.AccountIds)-1]
}
// only remove the first one matching
break
}
}
}
return ats
}

View File

@@ -73,53 +73,12 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro
}
at.Executed = true
transactionFailed := false
toBeSaved := true
removeAccountActionFound := false
for _, a := range aac {
if a.Balance == nil {
a.Balance = &Balance{}
}
a.Balance.ExpirationDate, _ = utils.ParseDate(a.ExpirationString)
// handle remove action
if a.ActionType == REMOVE_ACCOUNT {
accId := ub.Id
if err := accountingStorage.RemoveAccount(accId); err != nil {
utils.Logger.Err(fmt.Sprintf("Could not remove account Id: %s: %v", accId, err))
transactionFailed = true
break
}
// clean the account id from all action plans
allATs, err := ratingStorage.GetAllActionPlans()
if err != nil && err != utils.ErrNotFound {
utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accId, err))
transactionFailed = true
break
}
for key, ats := range allATs {
changed := false
for _, at := range ats {
for i := 0; i < len(at.AccountIds); i++ {
if at.AccountIds[i] == accId {
// delete without preserving order
at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1]
at.AccountIds = at.AccountIds[:len(at.AccountIds)-1]
i -= 1
changed = true
}
}
}
if changed {
// save action plan
ratingStorage.SetActionPlans(key, ats)
// cache
ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
}
}
toBeSaved = false
continue // do not go to getActionFunc
// TODO: maybe we should break here as the account is gone
// will leave continue for now as the next action can create another acount
}
actionFunction, exists := getActionFunc(a.ActionType)
if !exists {
utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType))
@@ -132,16 +91,16 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro
transactionFailed = false
break
}
toBeSaved = true
if a.ActionType == REMOVE_ACCOUNT {
removeAccountActionFound = true
}
}
if transactionFailed || at.Recurrent {
at.Executed = false
}
if !transactionFailed && ub != nil {
if !transactionFailed && ub != nil && !removeAccountActionFound {
storageLogger.LogActionTrigger(ub.Id, utils.RATER_SOURCE, at, aac)
if toBeSaved {
accountingStorage.SetAccount(ub)
}
accountingStorage.SetAccount(ub)
}
return
}

View File

@@ -38,7 +38,7 @@ var (
)
func TestActionTimingAlways(t *testing.T) {
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}}
st := at.GetNextStartTime(referenceDate)
y, m, d := referenceDate.Date()
expected := time.Date(y, m, d, 0, 0, 0, 0, time.Local).AddDate(0, 0, 1)
@@ -48,7 +48,7 @@ func TestActionTimingAlways(t *testing.T) {
}
func TestActionPlanNothing(t *testing.T) {
at := &ActionPlan{}
at := &ActionTiming{}
st := at.GetNextStartTime(referenceDate)
expected := time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC)
if !st.Equal(expected) {
@@ -57,7 +57,7 @@ func TestActionPlanNothing(t *testing.T) {
}
func TestActionTimingMidnight(t *testing.T) {
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}}
y, m, d := referenceDate.Date()
now := time.Date(y, m, d, 0, 0, 1, 0, time.Local)
st := at.GetNextStartTime(now)
@@ -68,7 +68,7 @@ func TestActionTimingMidnight(t *testing.T) {
}
func TestActionPlanOnlyHour(t *testing.T) {
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "10:01:00"}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{StartTime: "10:01:00"}}}
st := at.GetNextStartTime(referenceDate)
y, m, d := now.Date()
@@ -82,7 +82,7 @@ func TestActionPlanOnlyHour(t *testing.T) {
}
func TestActionPlanHourYear(t *testing.T) {
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{2022}, StartTime: "10:01:00"}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{2022}, StartTime: "10:01:00"}}}
st := at.GetNextStartTime(referenceDate)
expected := time.Date(2022, 1, 1, 10, 1, 0, 0, time.Local)
if !st.Equal(expected) {
@@ -91,7 +91,7 @@ func TestActionPlanHourYear(t *testing.T) {
}
func TestActionPlanOnlyWeekdays(t *testing.T) {
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{WeekDays: []time.Weekday{time.Monday}}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{WeekDays: []time.Weekday{time.Monday}}}}
st := at.GetNextStartTime(referenceDate)
y, m, d := now.Date()
@@ -112,7 +112,7 @@ func TestActionPlanOnlyWeekdays(t *testing.T) {
}
func TestActionPlanHourWeekdays(t *testing.T) {
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{WeekDays: []time.Weekday{time.Monday}, StartTime: "10:01:00"}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{WeekDays: []time.Weekday{time.Monday}, StartTime: "10:01:00"}}}
st := at.GetNextStartTime(referenceDate)
y, m, d := now.Date()
@@ -135,7 +135,7 @@ func TestActionPlanOnlyMonthdays(t *testing.T) {
y, m, d := now.Date()
tomorrow := time.Date(y, m, d, 0, 0, 0, 0, time.Local).AddDate(0, 0, 1)
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{MonthDays: utils.MonthDays{1, 25, 2, tomorrow.Day()}}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{MonthDays: utils.MonthDays{1, 25, 2, tomorrow.Day()}}}}
st := at.GetNextStartTime(referenceDate)
expected := tomorrow
if !st.Equal(expected) {
@@ -151,7 +151,7 @@ func TestActionPlanHourMonthdays(t *testing.T) {
if now.After(testTime) {
y, m, d = tomorrow.Date()
}
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{MonthDays: utils.MonthDays{now.Day(), tomorrow.Day()}, StartTime: "10:01:00"}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{MonthDays: utils.MonthDays{now.Day(), tomorrow.Day()}, StartTime: "10:01:00"}}}
st := at.GetNextStartTime(referenceDate)
expected := time.Date(y, m, d, 10, 1, 0, 0, time.Local)
if !st.Equal(expected) {
@@ -163,7 +163,7 @@ func TestActionPlanOnlyMonths(t *testing.T) {
y, m, _ := now.Date()
nextMonth := time.Date(y, m, 1, 0, 0, 0, 0, time.Local).AddDate(0, 1, 0)
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Months: utils.Months{time.February, time.May, nextMonth.Month()}}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Months: utils.Months{time.February, time.May, nextMonth.Month()}}}}
st := at.GetNextStartTime(referenceDate)
expected := time.Date(nextMonth.Year(), nextMonth.Month(), 1, 0, 0, 0, 0, time.Local)
if !st.Equal(expected) {
@@ -186,7 +186,7 @@ func TestActionPlanHourMonths(t *testing.T) {
y = nextMonth.Year()
}
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{
Months: utils.Months{now.Month(), nextMonth.Month()},
StartTime: "10:01:00"}}}
st := at.GetNextStartTime(referenceDate)
@@ -216,7 +216,7 @@ func TestActionPlanHourMonthdaysMonths(t *testing.T) {
month = nextMonth.Month()
}
}
at := &ActionPlan{Timing: &RateInterval{
at := &ActionTiming{Timing: &RateInterval{
Timing: &RITiming{
Months: utils.Months{now.Month(), nextMonth.Month()},
MonthDays: utils.MonthDays{now.Day(), tomorrow.Day()},
@@ -234,7 +234,7 @@ func TestActionPlanFirstOfTheMonth(t *testing.T) {
y, m, _ := now.Date()
nextMonth := time.Date(y, m, 1, 0, 0, 0, 0, time.Local).AddDate(0, 1, 0)
at := &ActionPlan{Timing: &RateInterval{
at := &ActionTiming{Timing: &RateInterval{
Timing: &RITiming{
MonthDays: utils.MonthDays{1},
},
@@ -249,7 +249,7 @@ func TestActionPlanFirstOfTheMonth(t *testing.T) {
func TestActionPlanOnlyYears(t *testing.T) {
y, _, _ := referenceDate.Date()
nextYear := time.Date(y, 1, 1, 0, 0, 0, 0, time.Local).AddDate(1, 0, 0)
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{now.Year(), nextYear.Year()}}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{now.Year(), nextYear.Year()}}}}
st := at.GetNextStartTime(referenceDate)
expected := nextYear
if !st.Equal(expected) {
@@ -258,7 +258,7 @@ func TestActionPlanOnlyYears(t *testing.T) {
}
func TestActionPlanPast(t *testing.T) {
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{2023}}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{2023}}}}
st := at.GetNextStartTime(referenceDate)
expected := time.Date(2023, 1, 1, 0, 0, 0, 0, time.Local)
if !st.Equal(expected) {
@@ -267,7 +267,7 @@ func TestActionPlanPast(t *testing.T) {
}
func TestActionPlanHourYears(t *testing.T) {
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{referenceDate.Year(), referenceDate.Year() + 1}, StartTime: "10:01:00"}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{referenceDate.Year(), referenceDate.Year() + 1}, StartTime: "10:01:00"}}}
st := at.GetNextStartTime(referenceDate)
expected := time.Date(referenceDate.Year(), 1, 1, 10, 1, 0, 0, time.Local)
if referenceDate.After(expected) {
@@ -292,7 +292,7 @@ func TestActionPlanHourMonthdaysYear(t *testing.T) {
expected = tomorrow
}
}
at := &ActionPlan{Timing: &RateInterval{
at := &ActionTiming{Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{now.Year(), nextYear.Year()},
MonthDays: utils.MonthDays{now.Day(), tomorrow.Day()},
@@ -332,7 +332,7 @@ func TestActionPlanHourMonthdaysMonthYear(t *testing.T) {
year = nextYear.Year()
}
}
at := &ActionPlan{Timing: &RateInterval{
at := &ActionTiming{Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{now.Year(), nextYear.Year()},
Months: utils.Months{now.Month(), nextMonth.Month()},
@@ -350,7 +350,7 @@ func TestActionPlanHourMonthdaysMonthYear(t *testing.T) {
func TestActionPlanFirstOfTheYear(t *testing.T) {
y, _, _ := now.Date()
nextYear := time.Date(y, 1, 1, 0, 0, 0, 0, time.Local).AddDate(1, 0, 0)
at := &ActionPlan{Timing: &RateInterval{
at := &ActionTiming{Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{nextYear.Year()},
Months: utils.Months{time.January},
@@ -371,7 +371,7 @@ func TestActionPlanFirstMonthOfTheYear(t *testing.T) {
if referenceDate.After(expected) {
expected = expected.AddDate(1, 0, 0)
}
at := &ActionPlan{Timing: &RateInterval{
at := &ActionTiming{Timing: &RateInterval{
Timing: &RITiming{
Months: utils.Months{time.January},
},
@@ -388,7 +388,7 @@ func TestActionPlanFirstMonthOfTheYearSecondDay(t *testing.T) {
if referenceDate.After(expected) {
expected = expected.AddDate(1, 0, 0)
}
at := &ActionPlan{Timing: &RateInterval{
at := &ActionTiming{Timing: &RateInterval{
Timing: &RITiming{
Months: utils.Months{time.January},
MonthDays: utils.MonthDays{2},
@@ -401,7 +401,7 @@ func TestActionPlanFirstMonthOfTheYearSecondDay(t *testing.T) {
}
func TestActionPlanCheckForASAP(t *testing.T) {
at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: utils.ASAP}}}
at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{StartTime: utils.ASAP}}}
if !at.IsASAP() {
t.Errorf("%v should be asap!", at)
}
@@ -413,7 +413,7 @@ func TestActionPlanLogFunction(t *testing.T) {
BalanceType: "test",
Balance: &Balance{Value: 1.1},
}
at := &ActionPlan{
at := &ActionTiming{
actions: []*Action{a},
}
err := at.Execute()
@@ -428,8 +428,8 @@ func TestActionPlanFunctionNotAvailable(t *testing.T) {
BalanceType: "test",
Balance: &Balance{Value: 1.1},
}
at := &ActionPlan{
AccountIds: []string{"cgrates.org:dy"},
at := &ActionTiming{
accountIDs: map[string]struct{}{"cgrates.org:dy": struct{}{}},
Timing: &RateInterval{},
actions: []*Action{a},
}
@@ -439,8 +439,8 @@ func TestActionPlanFunctionNotAvailable(t *testing.T) {
}
}
func TestActionPlanPriotityListSortByWeight(t *testing.T) {
at1 := &ActionPlan{Timing: &RateInterval{
func TestActionTimingPriorityListSortByWeight(t *testing.T) {
at1 := &ActionTiming{Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{2020},
Months: utils.Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December},
@@ -449,7 +449,7 @@ func TestActionPlanPriotityListSortByWeight(t *testing.T) {
},
Weight: 20,
}}
at2 := &ActionPlan{Timing: &RateInterval{
at2 := &ActionTiming{Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{2020},
Months: utils.Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December},
@@ -458,7 +458,7 @@ func TestActionPlanPriotityListSortByWeight(t *testing.T) {
},
Weight: 10,
}}
var atpl ActionPlanPriotityList
var atpl ActionTimingPriorityList
atpl = append(atpl, at2, at1)
atpl.Sort()
if atpl[0] != at1 || atpl[1] != at2 {
@@ -466,8 +466,8 @@ func TestActionPlanPriotityListSortByWeight(t *testing.T) {
}
}
func TestActionPlanPriotityListWeight(t *testing.T) {
at1 := &ActionPlan{
func TestActionTimingPriorityListWeight(t *testing.T) {
at1 := &ActionTiming{
Timing: &RateInterval{
Timing: &RITiming{
Months: utils.Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December},
@@ -477,7 +477,7 @@ func TestActionPlanPriotityListWeight(t *testing.T) {
},
Weight: 20,
}
at2 := &ActionPlan{
at2 := &ActionTiming{
Timing: &RateInterval{
Timing: &RITiming{
Months: utils.Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December},
@@ -487,7 +487,7 @@ func TestActionPlanPriotityListWeight(t *testing.T) {
},
Weight: 10,
}
var atpl ActionPlanPriotityList
var atpl ActionTimingPriorityList
atpl = append(atpl, at2, at1)
atpl.Sort()
if atpl[0] != at1 || atpl[1] != at2 {
@@ -495,17 +495,18 @@ func TestActionPlanPriotityListWeight(t *testing.T) {
}
}
/*
func TestActionPlansRemoveMember(t *testing.T) {
at1 := &ActionPlan{
Uuid: "some uuid",
Id: "test",
AccountIds: []string{"one", "two", "three"},
AccountIDs: []string{"one", "two", "three"},
ActionsId: "TEST_ACTIONS",
}
at2 := &ActionPlan{
Uuid: "some uuid22",
Id: "test2",
AccountIds: []string{"three", "four"},
AccountIDs: []string{"three", "four"},
ActionsId: "TEST_ACTIONS2",
}
ats := ActionPlans{at1, at2}
@@ -522,7 +523,7 @@ func TestActionPlansRemoveMember(t *testing.T) {
if ats2 = RemActionPlan(ats2, "", ""); len(ats2) != 0 {
t.Error("Should have no members anymore", ats2)
}
}
}*/
func TestActionTriggerMatchNil(t *testing.T) {
at := &ActionTrigger{
@@ -1056,19 +1057,17 @@ func TestActionPlanLogging(t *testing.T) {
Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 60 * time.Second}},
},
}
at := &ActionPlan{
Uuid: "some uuid",
Id: "test",
AccountIds: []string{"one", "two", "three"},
at := &ActionTiming{
accountIDs: map[string]struct{}{"one": struct{}{}, "two": struct{}{}, "three": struct{}{}},
Timing: i,
Weight: 10.0,
ActionsId: "TEST_ACTIONS",
ActionsID: "TEST_ACTIONS",
}
as, err := ratingStorage.GetActions(at.ActionsId, false)
as, err := ratingStorage.GetActions(at.ActionsID, false)
if err != nil {
t.Error("Error getting actions for the action trigger: ", err)
}
storageLogger.LogActionPlan(utils.SCHED_SOURCE, at, as)
storageLogger.LogActionTiming(utils.SCHED_SOURCE, at, as)
//expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0"
var key string
atMap, _ := ratingStorage.GetAllActionPlans()
@@ -1104,8 +1103,8 @@ func TestRemoveAction(t *testing.T) {
ActionType: REMOVE_ACCOUNT,
}
at := &ActionPlan{
AccountIds: []string{"cgrates.org:remo"},
at := &ActionTiming{
accountIDs: map[string]struct{}{"cgrates.org:remo": struct{}{}},
actions: Actions{a},
}
at.Execute()
@@ -1123,8 +1122,8 @@ func TestTopupAction(t *testing.T) {
Balance: &Balance{Value: 25, DestinationIds: utils.NewStringMap("RET"), Directions: utils.NewStringMap(utils.OUT), Weight: 20},
}
at := &ActionPlan{
AccountIds: []string{"vdf:minu"},
at := &ActionTiming{
accountIDs: map[string]struct{}{"vdf:minu": struct{}{}},
actions: Actions{a},
}
@@ -1145,8 +1144,8 @@ func TestTopupActionLoaded(t *testing.T) {
Balance: &Balance{Value: 25, DestinationIds: utils.NewStringMap("RET"), Directions: utils.NewStringMap(utils.OUT), Weight: 20},
}
at := &ActionPlan{
AccountIds: []string{"vdf:minitsboy"},
at := &ActionTiming{
accountIDs: map[string]struct{}{"vdf:minitsboy": struct{}{}},
actions: Actions{a},
}
@@ -1297,8 +1296,8 @@ func TestActionTransactionFuncType(t *testing.T) {
if err != nil {
t.Error("Error setting account: ", err)
}
at := &ActionPlan{
AccountIds: []string{"cgrates.org:trans"},
at := &ActionTiming{
accountIDs: map[string]struct{}{"cgrates.org:trans": struct{}{}},
Timing: &RateInterval{},
actions: []*Action{
&Action{
@@ -1335,8 +1334,8 @@ func TestActionTransactionBalanceType(t *testing.T) {
if err != nil {
t.Error("Error setting account: ", err)
}
at := &ActionPlan{
AccountIds: []string{"cgrates.org:trans"},
at := &ActionTiming{
accountIDs: map[string]struct{}{"cgrates.org:trans": struct{}{}},
Timing: &RateInterval{},
actions: []*Action{
&Action{
@@ -1373,8 +1372,8 @@ func TestActionWithExpireWithoutExpire(t *testing.T) {
if err != nil {
t.Error("Error setting account: ", err)
}
at := &ActionPlan{
AccountIds: []string{"cgrates.org:exp"},
at := &ActionTiming{
accountIDs: map[string]struct{}{"cgrates.org:exp": struct{}{}},
Timing: &RateInterval{},
actions: []*Action{
&Action{
@@ -1428,8 +1427,8 @@ func TestActionRemoveBalance(t *testing.T) {
if err != nil {
t.Error("Error setting account: ", err)
}
at := &ActionPlan{
AccountIds: []string{"cgrates.org:rembal"},
at := &ActionTiming{
accountIDs: map[string]struct{}{"cgrates.org:rembal": struct{}{}},
Timing: &RateInterval{},
actions: []*Action{
&Action{

View File

@@ -487,8 +487,9 @@ func TestMaxSessionTimeWithAccount(t *testing.T) {
}
func TestMaxSessionTimeWithMaxRate(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
//acc, _ := accountingStorage.GetAccount("cgrates.org:12345")
@@ -513,8 +514,9 @@ func TestMaxSessionTimeWithMaxRate(t *testing.T) {
}
func TestMaxSessionTimeWithMaxCost(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -536,8 +538,9 @@ func TestMaxSessionTimeWithMaxCost(t *testing.T) {
}
func TestGetCostWithMaxCost(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -558,8 +561,9 @@ func TestGetCostWithMaxCost(t *testing.T) {
}
}
func TestGetCostRoundingIssue(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -582,8 +586,9 @@ func TestGetCostRoundingIssue(t *testing.T) {
}
func TestGetCostRatingInfoOnZeroTime(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -609,8 +614,9 @@ func TestGetCostRatingInfoOnZeroTime(t *testing.T) {
}
func TestDebitRatingInfoOnZeroTime(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -637,8 +643,9 @@ func TestDebitRatingInfoOnZeroTime(t *testing.T) {
}
func TestMaxDebitRatingInfoOnZeroTime(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -664,8 +671,9 @@ func TestMaxDebitRatingInfoOnZeroTime(t *testing.T) {
}
func TestMaxDebitUnknowDest(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -686,8 +694,9 @@ func TestMaxDebitUnknowDest(t *testing.T) {
}
func TestGetCostMaxDebitRoundingIssue(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -718,8 +727,9 @@ func TestGetCostMaxDebitRoundingIssue(t *testing.T) {
}
func TestMaxSessionTimeWithMaxCostFree(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -741,8 +751,9 @@ func TestMaxSessionTimeWithMaxCostFree(t *testing.T) {
}
func TestMaxDebitWithMaxCostFree(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -764,8 +775,9 @@ func TestMaxDebitWithMaxCostFree(t *testing.T) {
}
func TestGetCostWithMaxCostFree(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd := &CallDescriptor{
@@ -818,12 +830,14 @@ func TestMaxSessionTimeWithAccountAlias(t *testing.T) {
}
func TestMaxSessionTimeWithAccountShared(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP_SHARED0_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED0_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
ap, _ = ratingStorage.GetActionPlans("TOPUP_SHARED10_AT", false)
for _, at := range ap {
ap, _ = ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
@@ -857,12 +871,14 @@ func TestMaxSessionTimeWithAccountShared(t *testing.T) {
}
func TestMaxDebitWithAccountShared(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP_SHARED0_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED0_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
ap, _ = ratingStorage.GetActionPlans("TOPUP_SHARED10_AT", false)
for _, at := range ap {
ap, _ = ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
@@ -1077,8 +1093,9 @@ func TestMaxSesionTimeLongerThanMoney(t *testing.T) {
}
func TestDebitFromShareAndNormal(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP_SHARED10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
@@ -1105,8 +1122,9 @@ func TestDebitFromShareAndNormal(t *testing.T) {
}
func TestDebitFromEmptyShare(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP_EMPTY_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP_EMPTY_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
@@ -1133,8 +1151,9 @@ func TestDebitFromEmptyShare(t *testing.T) {
}
func TestDebitNegatve(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("POST_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("POST_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
@@ -1172,8 +1191,9 @@ func TestDebitNegatve(t *testing.T) {
}
func TestMaxDebitZeroDefinedRate(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd1 := &CallDescriptor{
@@ -1200,8 +1220,9 @@ func TestMaxDebitZeroDefinedRate(t *testing.T) {
}
func TestMaxDebitZeroDefinedRateOnlyMinutes(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd1 := &CallDescriptor{
@@ -1228,8 +1249,9 @@ func TestMaxDebitZeroDefinedRateOnlyMinutes(t *testing.T) {
}
func TestMaxDebitConsumesMinutes(t *testing.T) {
ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false)
for _, at := range ap {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
cd1 := &CallDescriptor{

View File

@@ -985,25 +985,43 @@ func TestLoadActionTimings(t *testing.T) {
if len(csvr.actionPlans) != 6 {
t.Error("Failed to load action timings: ", len(csvr.actionPlans))
}
atm := csvr.actionPlans["MORE_MINUTES"][0]
atm := csvr.actionPlans["MORE_MINUTES"]
expected := &ActionPlan{
Uuid: atm.Uuid,
Id: "MORE_MINUTES",
AccountIds: []string{"vdf:minitsboy"},
Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{2012},
Months: utils.Months{},
MonthDays: utils.MonthDays{},
WeekDays: utils.WeekDays{},
StartTime: utils.ASAP,
AccountIDs: map[string]struct{}{"vdf:minitsboy": struct{}{}},
ActionTimings: []*ActionTiming{
&ActionTiming{
Uuid: atm.ActionTimings[0].Uuid,
Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{2012},
Months: utils.Months{},
MonthDays: utils.MonthDays{},
WeekDays: utils.WeekDays{},
StartTime: utils.ASAP,
},
},
Weight: 10,
ActionsID: "MINI",
},
&ActionTiming{
Uuid: atm.ActionTimings[1].Uuid,
Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{2012},
Months: utils.Months{},
MonthDays: utils.MonthDays{},
WeekDays: utils.WeekDays{},
StartTime: utils.ASAP,
},
},
Weight: 10,
ActionsID: "SHARED",
},
},
Weight: 10,
ActionsId: "MINI",
}
if !reflect.DeepEqual(atm, expected) {
t.Errorf("Error loading action timing:\n%+v", atm)
t.Errorf("Error loading action timing:\n%+v", atm.ActionTimings[1])
}
}

View File

@@ -21,6 +21,7 @@ package engine
import (
"flag"
"path"
"strings"
"testing"
"github.com/cgrates/cgrates/config"
@@ -395,6 +396,9 @@ func TestMatchLoadCsvWithStorRating(t *testing.T) {
for _, key := range keysCsv {
var refVal []byte
for idx, rs := range []*RedisStorage{rsCsv, rsStor, rsApier} {
if key == utils.TASKS_KEY || strings.HasPrefix(key, utils.ACTION_PLAN_PREFIX) { // action plans are not consistent
continue
}
qVal, err := rs.db.Cmd("GET", key).Bytes()
if err != nil {
t.Fatalf("Run: %d, could not retrieve key %s, error: %s", idx, key, err.Error())

View File

@@ -63,9 +63,11 @@ type RatingStorage interface {
SetSharedGroup(*SharedGroup) error
GetActionTriggers(string) (ActionTriggers, error)
SetActionTriggers(string, ActionTriggers) error
GetActionPlans(string, bool) (ActionPlans, error)
SetActionPlans(string, ActionPlans) error
GetAllActionPlans() (map[string]ActionPlans, error)
GetActionPlan(string, bool) (*ActionPlan, error)
SetActionPlan(string, *ActionPlan) error
GetAllActionPlans() (map[string]*ActionPlan, error)
PushTask(*Task) error
PopTask() (*Task, error)
}
type AccountingStorage interface {
@@ -106,7 +108,7 @@ type LogStorage interface {
Storage
//GetAllActionTimingsLogs() (map[string]ActionsTimings, error)
LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) error
LogActionPlan(source string, at *ActionPlan, as Actions) error
LogActionTiming(source string, at *ActionTiming, as Actions) error
}
type LoadStorage interface {

View File

@@ -33,8 +33,9 @@ import (
)
type MapStorage struct {
dict map[string][]byte
ms Marshaler
dict map[string][]byte
tasks [][]byte
ms Marshaler
}
func NewMapStorage() (*MapStorage, error) {
@@ -178,7 +179,7 @@ func (ms *MapStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, actK
}
if strings.HasPrefix(k, utils.ACTION_PLAN_PREFIX) {
cache2go.RemKey(k)
if _, err := ms.GetActionPlans(k[len(utils.ACTION_PLAN_PREFIX):], true); err != nil {
if _, err := ms.GetActionPlan(k[len(utils.ACTION_PLAN_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
}
@@ -645,11 +646,11 @@ func (ms *MapStorage) SetActionTriggers(key string, atrs ActionTriggers) (err er
return
}
func (ms *MapStorage) GetActionPlans(key string, skipCache bool) (ats ActionPlans, err error) {
func (ms *MapStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan, err error) {
key = utils.ACTION_PLAN_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
return x.(ActionPlans), nil
return x.(*ActionPlan), nil
} else {
return nil, err
}
@@ -663,8 +664,8 @@ func (ms *MapStorage) GetActionPlans(key string, skipCache bool) (ats ActionPlan
return
}
func (ms *MapStorage) SetActionPlans(key string, ats ActionPlans) (err error) {
if len(ats) == 0 {
func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan) (err error) {
if len(ats.ActionTimings) == 0 {
// delete the key
delete(ms.dict, utils.ACTION_PLAN_PREFIX+key)
cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key)
@@ -675,21 +676,42 @@ func (ms *MapStorage) SetActionPlans(key string, ats ActionPlans) (err error) {
return
}
func (ms *MapStorage) GetAllActionPlans() (ats map[string]ActionPlans, err error) {
func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) {
apls, err := cache2go.GetAllEntries(utils.ACTION_PLAN_PREFIX)
if err != nil {
return nil, err
}
ats = make(map[string]ActionPlans, len(apls))
ats = make(map[string]*ActionPlan, len(apls))
for key, value := range apls {
apl := value.(ActionPlans)
apl := value.(*ActionPlan)
ats[key] = apl
}
return
}
func (ms *MapStorage) PushTask(t *Task) error {
result, err := ms.ms.Marshal(t)
if err != nil {
return err
}
ms.tasks = append(ms.tasks, result)
return nil
}
func (ms *MapStorage) PopTask() (t *Task, err error) {
if len(ms.tasks) > 0 {
var values []byte
values, ms.tasks = ms.tasks[0], ms.tasks[1:]
t = &Task{}
err = ms.ms.Unmarshal(values, t)
} else {
err = utils.ErrNotFound
}
return
}
func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) {
key = utils.DERIVEDCHARGERS_PREFIX + key
if !skipCache {
@@ -774,7 +796,7 @@ func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, a
return
}
func (ms *MapStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) {
func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
mat, err := ms.ms.Marshal(at)
if err != nil {
return

View File

@@ -34,6 +34,7 @@ const (
colDst = "destinations"
colAct = "actions"
colApl = "actionplans"
colTsk = "tasks"
colAtr = "actiontriggers"
colRpl = "ratingplans"
colRpf = "ratingprofiles"
@@ -451,7 +452,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
for _, key := range aplKeys {
cache2go.RemKey(key)
if _, err = ms.GetActionPlans(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil {
if _, err = ms.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
}
@@ -1026,28 +1027,35 @@ func (ms *MongoStorage) SetActionTriggers(key string, atrs ActionTriggers) (err
return err
}
func (ms *MongoStorage) GetActionPlans(key string, skipCache bool) (ats ActionPlans, err error) {
func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan, err error) {
if !skipCache {
if x, err := cache2go.Get(utils.ACTION_PLAN_PREFIX + key); err == nil {
return x.(ActionPlans), nil
return x.(*ActionPlan), nil
} else {
return nil, err
}
}
var kv struct {
Key string
Value ActionPlans
Value *ActionPlan
}
err = ms.db.C(colApl).Find(bson.M{"key": key}).One(&kv)
if err == nil {
ats = kv.Value
cache2go.Cache(utils.ACTION_PLAN_PREFIX+key, ats)
}
if ats != nil && ats.AccountIDs != nil {
ats.AccountIDs = utils.YesDots(ats.AccountIDs)
}
return
}
func (ms *MongoStorage) SetActionPlans(key string, ats ActionPlans) error {
if len(ats) == 0 {
func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error {
// clean dots from account ids map
if ats != nil && ats.AccountIDs != nil {
ats.AccountIDs = utils.NoDots(ats.AccountIDs)
}
if len(ats.ActionTimings) == 0 {
cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key)
err := ms.db.C(colApl).Remove(bson.M{"key": key})
if err != mgo.ErrNotFound {
@@ -1057,26 +1065,46 @@ func (ms *MongoStorage) SetActionPlans(key string, ats ActionPlans) error {
}
_, err := ms.db.C(colApl).Upsert(bson.M{"key": key}, &struct {
Key string
Value ActionPlans
Value *ActionPlan
}{Key: key, Value: ats})
return err
}
func (ms *MongoStorage) GetAllActionPlans() (ats map[string]ActionPlans, err error) {
func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) {
apls, err := cache2go.GetAllEntries(utils.ACTION_PLAN_PREFIX)
if err != nil {
return nil, err
}
ats = make(map[string]ActionPlans, len(apls))
ats = make(map[string]*ActionPlan, len(apls))
for key, value := range apls {
apl := value.(ActionPlans)
apl := value.(*ActionPlan)
apl.AccountIDs = utils.YesDots(apl.AccountIDs)
ats[key] = apl
}
return
}
func (ms *MongoStorage) PushTask(t *Task) error {
return ms.db.C(colTsk).Insert(bson.M{"_id": bson.NewObjectId(), "task": t})
}
func (ms *MongoStorage) PopTask() (t *Task, err error) {
v := struct {
ID bson.ObjectId `bson:"_id"`
Task *Task
}{}
if err = ms.db.C(colTsk).Find(nil).One(&v); err == nil {
if remErr := ms.db.C(colTsk).Remove(bson.M{"_id": v.ID}); remErr != nil {
return nil, remErr
}
t = v.Task
}
return
}
func (ms *MongoStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) {
if !skipCache {
if x, err := cache2go.Get(utils.DERIVEDCHARGERS_PREFIX + key); err == nil {

View File

@@ -689,9 +689,9 @@ func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger,
}{ubId, at, as, time.Now(), source})
}
func (ms *MongoStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) {
func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
return ms.db.C(colLogApl).Insert(&struct {
ActionPlan *ActionPlan
ActionPlan *ActionTiming
Actions Actions
LogTime time.Time
Source string

View File

@@ -282,7 +282,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
for _, key := range aplKeys {
cache2go.RemKey(key)
if _, err = rs.GetActionPlans(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil {
if _, err = rs.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
}
@@ -886,33 +886,42 @@ func (rs *RedisStorage) SetActionTriggers(key string, atrs ActionTriggers) (err
// delete the key
return conn.Cmd("DEL", utils.ACTION_TRIGGER_PREFIX+key).Err
}
result, err := rs.ms.Marshal(&atrs)
result, err := rs.ms.Marshal(atrs)
if err != nil {
return err
}
return conn.Cmd("SET", utils.ACTION_TRIGGER_PREFIX+key, result).Err
}
func (rs *RedisStorage) GetActionPlans(key string, skipCache bool) (ats ActionPlans, err error) {
func (rs *RedisStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan, err error) {
key = utils.ACTION_PLAN_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
return x.(ActionPlans), nil
return x.(*ActionPlan), nil
} else {
return nil, err
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &ats)
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
err = rs.ms.Unmarshal(out, &ats)
cache2go.Cache(key, ats)
}
return
}
func (rs *RedisStorage) SetActionPlans(key string, ats ActionPlans) (err error) {
if len(ats) == 0 {
func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan) (err error) {
if len(ats.ActionTimings) == 0 {
// delete the key
err = rs.db.Cmd("DEL", utils.ACTION_PLAN_PREFIX+key).Err
cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key)
@@ -922,24 +931,45 @@ func (rs *RedisStorage) SetActionPlans(key string, ats ActionPlans) (err error)
if err != nil {
return err
}
return rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, result).Err
var b bytes.Buffer
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
return rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, b.Bytes()).Err
}
func (rs *RedisStorage) GetAllActionPlans() (ats map[string]ActionPlans, err error) {
func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) {
apls, err := cache2go.GetAllEntries(utils.ACTION_PLAN_PREFIX)
if err != nil {
return nil, err
}
ats = make(map[string]ActionPlans, len(apls))
ats = make(map[string]*ActionPlan, len(apls))
for key, value := range apls {
apl := value.(ActionPlans)
apl := value.(*ActionPlan)
ats[key] = apl
}
return
}
func (rs *RedisStorage) PushTask(t *Task) error {
result, err := rs.ms.Marshal(t)
if err != nil {
return err
}
return rs.db.Cmd("RPUSH", utils.TASKS_KEY, result).Err
}
func (rs *RedisStorage) PopTask() (t *Task, err error) {
var values []byte
if values, err = rs.db.Cmd("LPOP", utils.TASKS_KEY).Bytes(); err == nil {
t = &Task{}
err = rs.ms.Unmarshal(values, t)
}
return
}
func (rs *RedisStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) {
key = utils.DERIVEDCHARGERS_PREFIX + key
if !skipCache {
@@ -1037,7 +1067,7 @@ func (rs *RedisStorage) LogActionTrigger(ubId, source string, at *ActionTrigger,
return rs.db.Cmd("SET", utils.LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v*%v", ubId, string(mat), string(mas)))).Err
}
func (rs *RedisStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) {
func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
mat, err := rs.ms.Marshal(at)
if err != nil {
return

View File

@@ -592,7 +592,7 @@ func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (*CallCost,
func (self *SQLStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) {
return
}
func (self *SQLStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) {
func (self *SQLStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
return
}

View File

@@ -272,6 +272,41 @@ func TestDifferentUuid(t *testing.T) {
}
}
func TestStorageTask(t *testing.T) {
// clean previous unused tasks
for i := 0; i < 16; i++ {
ratingStorage.PopTask()
}
if err := ratingStorage.PushTask(&Task{Uuid: "1"}); err != nil {
t.Error("Error pushing task: ", err)
}
if err := ratingStorage.PushTask(&Task{Uuid: "2"}); err != nil {
t.Error("Error pushing task: ", err)
}
if err := ratingStorage.PushTask(&Task{Uuid: "3"}); err != nil {
t.Error("Error pushing task: ", err)
}
if err := ratingStorage.PushTask(&Task{Uuid: "4"}); err != nil {
t.Error("Error pushing task: ", err)
}
if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "1" {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "2" {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "3" {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "4" {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err == nil && task != nil {
t.Errorf("Error poping task %+v, %v: ", task, err)
}
}
/************************** Benchmarks *****************************/
func GetUB() *Account {

View File

@@ -19,7 +19,7 @@ type TpReader struct {
accountingStorage AccountingStorage
lr LoadReader
actions map[string][]*Action
actionPlans map[string][]*ActionPlan
actionPlans map[string]*ActionPlan
actionsTriggers map[string]ActionTriggers
accountActions map[string]*Account
dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed
@@ -73,7 +73,7 @@ func NewTpReader(rs RatingStorage, as AccountingStorage, lr LoadReader, tpid, ti
func (tpr *TpReader) Init() {
tpr.actions = make(map[string][]*Action)
tpr.actionPlans = make(map[string][]*ActionPlan)
tpr.actionPlans = make(map[string]*ActionPlan)
tpr.actionsTriggers = make(map[string]ActionTriggers)
tpr.rates = make(map[string]*utils.TPRate)
tpr.destinations = make(map[string]*Destination)
@@ -575,9 +575,14 @@ func (tpr *TpReader) LoadActionPlans() (err error) {
if !exists {
return fmt.Errorf("[ActionPlans] Could not load the timing for tag: %v", at.TimingId)
}
actPln := &ActionPlan{
var actPln *ActionPlan
if actPln, exists = tpr.actionPlans[atId]; !exists {
actPln = &ActionPlan{
Id: atId,
}
}
actPln.ActionTimings = append(actPln.ActionTimings, &ActionTiming{
Uuid: utils.GenUUID(),
Id: atId,
Weight: at.Weight,
Timing: &RateInterval{
Timing: &RITiming{
@@ -588,9 +593,10 @@ func (tpr *TpReader) LoadActionPlans() (err error) {
StartTime: t.StartTime,
},
},
ActionsId: at.ActionsId,
}
tpr.actionPlans[atId] = append(tpr.actionPlans[atId], actPln)
ActionsID: at.ActionsId,
})
tpr.actionPlans[atId] = actPln
}
}
@@ -660,11 +666,10 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error
// action timings
if accountAction.ActionPlanId != "" {
// get old userBalanceIds
var exitingAccountIds []string
existingActionPlans, err := tpr.ratingStorage.GetActionPlans(accountAction.ActionPlanId, true)
if err == nil && len(existingActionPlans) > 0 {
// all action timings from a specific tag shuld have the same list of user balances from the first one
exitingAccountIds = existingActionPlans[0].AccountIds
exitingAccountIds := make(map[string]struct{})
existingActionPlan, err := tpr.ratingStorage.GetActionPlan(accountAction.ActionPlanId, true)
if err == nil && existingActionPlan != nil {
exitingAccountIds = existingActionPlan.AccountIDs
}
tpap, err := tpr.lr.GetTpActionPlans(tpr.tpid, accountAction.ActionPlanId)
@@ -677,7 +682,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error
if err != nil {
return err
}
var actionTimings []*ActionPlan
var actionPlan *ActionPlan
ats := aps[accountAction.ActionPlanId]
for _, at := range ats {
// Check action exists before saving it inside actionTiming key
@@ -703,9 +708,13 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error
} else {
t = tpr.timings[at.TimingId] // *asap
}
actPln := &ActionPlan{
if actionPlan == nil {
actionPlan = &ActionPlan{
Id: accountAction.ActionPlanId,
}
}
actionPlan.ActionTimings = append(actionPlan.ActionTimings, &ActionTiming{
Uuid: utils.GenUUID(),
Id: accountAction.ActionPlanId,
Weight: at.Weight,
Timing: &RateInterval{
Timing: &RITiming{
@@ -715,26 +724,31 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error
StartTime: t.StartTime,
},
},
ActionsId: at.ActionsId,
}
ActionsID: at.ActionsId,
})
// collect action ids from timings
actionsIds = append(actionsIds, actPln.ActionsId)
//add user balance id if no already in
found := false
for _, ubId := range exitingAccountIds {
if ubId == id {
found = true
break
}
}
if !found {
actPln.AccountIds = append(exitingAccountIds, id)
}
actionTimings = append(actionTimings, actPln)
actionsIds = append(actionsIds, at.ActionsId)
exitingAccountIds[id] = struct{}{}
actionPlan.AccountIDs = exitingAccountIds
}
// write action triggers
err = tpr.ratingStorage.SetActionPlans(accountAction.ActionPlanId, actionTimings)
// write tasks
for _, at := range actionPlan.ActionTimings {
if at.IsASAP() {
for accID := range actionPlan.AccountIDs {
t := &Task{
Uuid: utils.GenUUID(),
AccountID: accID,
ActionsID: at.ActionsID,
}
if err = tpr.ratingStorage.PushTask(t); err != nil {
return err
}
}
}
}
// write action plan
err = tpr.ratingStorage.SetActionPlan(accountAction.ActionPlanId, actionPlan)
if err != nil {
return errors.New(err.Error() + " (SetActionPlan): " + accountAction.ActionPlanId)
}
@@ -874,7 +888,6 @@ func (tpr *TpReader) LoadAccountActions() (err error) {
return fmt.Errorf("could not get action triggers for tag %s", aa.ActionTriggersId)
}
}
ub := &Account{
Id: aa.KeyId(),
ActionTriggers: aTriggers,
@@ -883,14 +896,15 @@ func (tpr *TpReader) LoadAccountActions() (err error) {
}
ub.InitCounters()
tpr.accountActions[aa.KeyId()] = ub
aTimings, exists := tpr.actionPlans[aa.ActionPlanId]
actionPlan, exists := tpr.actionPlans[aa.ActionPlanId]
if !exists {
log.Printf("could not get action plan for tag %v", aa.ActionPlanId)
// must not continue here
}
for _, at := range aTimings {
at.AccountIds = append(at.AccountIds, aa.KeyId())
if actionPlan.AccountIDs == nil {
actionPlan.AccountIDs = make(map[string]struct{})
}
actionPlan.AccountIDs[aa.KeyId()] = struct{}{}
}
return nil
}
@@ -1318,8 +1332,22 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) {
if verbose {
log.Print("Action Plans:")
}
for k, ats := range tpr.actionPlans {
err = tpr.ratingStorage.SetActionPlans(k, ats)
for k, ap := range tpr.actionPlans {
for _, at := range ap.ActionTimings {
if at.IsASAP() {
for accID := range ap.AccountIDs {
t := &Task{
Uuid: utils.GenUUID(),
AccountID: accID,
ActionsID: at.ActionsID,
}
if err = tpr.ratingStorage.PushTask(t); err != nil {
return err
}
}
}
}
err = tpr.ratingStorage.SetActionPlan(k, ap)
if err != nil {
return err
}

View File

@@ -74,10 +74,10 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,false,10`
func TestAcntActsDisableAcnt(t *testing.T) {
acnt1Tag := "cgrates.org:1"
at := &engine.ActionPlan{
AccountIds: []string{acnt1Tag},
ActionsId: "DISABLE_ACNT",
at := &engine.ActionTiming{
ActionsID: "DISABLE_ACNT",
}
at.SetAccountIDs(map[string]struct{}{acnt1Tag: struct{}{}})
if err := at.Execute(); err != nil {
t.Error(err)
}
@@ -91,10 +91,10 @@ func TestAcntActsDisableAcnt(t *testing.T) {
func TestAcntActsEnableAcnt(t *testing.T) {
acnt1Tag := "cgrates.org:1"
at := &engine.ActionPlan{
AccountIds: []string{acnt1Tag},
ActionsId: "ENABLE_ACNT",
at := &engine.ActionTiming{
ActionsID: "ENABLE_ACNT",
}
at.SetAccountIDs(map[string]struct{}{acnt1Tag: struct{}{}})
if err := at.Execute(); err != nil {
t.Error(err)
}

15
glide.lock generated
View File

@@ -1,9 +1,8 @@
hash: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
updated: 2015-12-16T09:32:54.975724404+02:00
updated: 2015-12-18T18:02:12.669823411+02:00
imports:
- name: github.com/cenkalti/hub
version: 57d753b5f4856e77b3cf8ecce78c97215a7d324d
repo: https://github.com/cenkalti/hub
- name: github.com/cenkalti/rpc2
version: 2d1be381ce47537e9e076b2b76dc70933162e4e9
- name: github.com/cgrates/fsock
@@ -29,10 +28,9 @@ imports:
- name: github.com/gorhill/cronexpr
version: a557574d6c024ed6e36acc8b610f5f211c91568a
- name: github.com/jinzhu/gorm
version: c6a22c50962028255a718f22fe7e8959e8c67884
version: d209be3138acbe304daffee637bc495499c1e70e
- name: github.com/jinzhu/inflection
version: 3272df6c21d04180007eb3349844c89a3856bc25
repo: https://github.com/jinzhu/inflection
- name: github.com/kr/pty
version: f7ee69f31298ecbe5d2b349c711e2547a617d398
- name: github.com/lib/pq
@@ -43,7 +41,7 @@ imports:
- /pool
- redis
- name: github.com/peterh/liner
version: 4d47685ab2fd2dbb46c66b831344d558bc4be5b9
version: 3f1c20449d1836aa4cbe38731b96f95cdf89634d
- name: github.com/ugorji/go
version: cd43bdd6be4b5675a0d1e75c4af55ee1dc0d9c5e
subpackages:
@@ -51,18 +49,17 @@ imports:
- name: golang.org/x/crypto
version: f18420efc3b4f8e9f3d51f6bd2476e92c46260e9
- name: golang.org/x/net
version: 548f7bf20c8aae87fecd9aa09cc89065451e6271
version: 28273ec927bee3bea305f112fc28ceee575ea893
subpackages:
- /websocket
- name: golang.org/x/text
version: 92ca7bbb695e2e9675f1d731fa85760f95d2c0df
version: cf4986612c83df6c55578ba198316d1684a9a287
- name: gopkg.in/fsnotify.v1
version: 2cdd39bd6129c6a49c74fb07fb9d77ba1271c572
version: 508915b7500b6e42a87132e4afeb4729cebc7cbb
- name: gopkg.in/mgo.v2
version: e30de8ac9ae3b30df7065f766c71f88bba7d4e49
subpackages:
- bson
- name: gopkg.in/tomb.v2
version: 14b3d72120e8d10ea6e6b7f87f7175734b1faab8
repo: https://gopkg.in/tomb.v2
devImports: []

View File

@@ -29,13 +29,11 @@ import (
)
type Scheduler struct {
queue engine.ActionPlanPriotityList
queue engine.ActionTimingPriorityList
timer *time.Timer
restartLoop chan bool
sync.Mutex
storage engine.RatingStorage
waitingReload bool
loopChecker chan int
schedulerStarted bool
}
@@ -43,7 +41,6 @@ func NewScheduler(storage engine.RatingStorage) *Scheduler {
return &Scheduler{
restartLoop: make(chan bool),
storage: storage,
loopChecker: make(chan int),
}
}
@@ -56,7 +53,7 @@ func (s *Scheduler) Loop() {
utils.Logger.Info(fmt.Sprintf("<Scheduler> Scheduler queue length: %v", len(s.queue)))
s.Lock()
a0 := s.queue[0]
utils.Logger.Info(fmt.Sprintf("<Scheduler> Action: %s", a0.Id))
utils.Logger.Info(fmt.Sprintf("<Scheduler> Action: %s", a0.ActionsID))
now := time.Now()
start := a0.GetNextStartTime(now)
if start.Equal(now) || start.Before(now) {
@@ -77,12 +74,12 @@ func (s *Scheduler) Loop() {
} else {
s.Unlock()
d := a0.GetNextStartTime(now).Sub(now)
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time to next action (%s): %v", a0.Id, d))
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time to next action (%s): %v", a0.ActionsID, d))
s.timer = time.NewTimer(d)
select {
case <-s.timer.C:
// timer has expired
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0.Id))
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for action on %v", a0.ActionsID))
case <-s.restartLoop:
// nothing to do, just continue the loop
}
@@ -91,80 +88,55 @@ func (s *Scheduler) Loop() {
}
func (s *Scheduler) Reload(protect bool) {
s.Lock()
defer s.Unlock()
if protect {
if s.waitingReload {
s.loopChecker <- 1
}
s.waitingReload = true
go func() {
t := time.NewTicker(100 * time.Millisecond) // wait for loops before start
select {
case <-s.loopChecker:
t.Stop() // cancel reload
case <-t.C:
s.loadActionPlans()
s.restart()
t.Stop()
s.waitingReload = false
}
}()
} else {
go func() {
s.loadActionPlans()
s.restart()
}()
}
s.loadActionPlans()
s.restart()
}
func (s *Scheduler) loadActionPlans() {
s.Lock()
defer s.Unlock()
// limit the number of concurrent tasks
limit := make(chan bool, 10)
// execute existing tasks
for {
task, err := s.storage.PopTask()
if err != nil || task == nil {
break
}
limit <- true
go func() {
utils.Logger.Info(fmt.Sprintf("<Scheduler> executing task %s on account %s", task.ActionsID, task.AccountID))
task.Execute()
<-limit
}()
}
actionPlans, err := s.storage.GetAllActionPlans()
if err != nil && err != utils.ErrNotFound {
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Cannot get action plans: %v", err))
}
utils.Logger.Info(fmt.Sprintf("<Scheduler> processing %d action plans", len(actionPlans)))
// recreate the queue
s.Lock()
defer s.Unlock()
s.queue = engine.ActionPlanPriotityList{}
for key, aps := range actionPlans {
toBeSaved := false
isAsap := false
var newApls []*engine.ActionPlan // will remove the one time runs from the database
for _, ap := range aps {
if ap.Timing == nil {
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Nil timing on action plan: %+v, discarding!", ap))
s.queue = engine.ActionTimingPriorityList{}
for _, actionPlan := range actionPlans {
for _, at := range actionPlan.ActionTimings {
if at.Timing == nil {
utils.Logger.Warning(fmt.Sprintf("<Scheduler> Nil timing on action plan: %+v, discarding!", at))
continue
}
if len(ap.AccountIds) == 0 { // no accounts just ignore
if at.IsASAP() {
continue
}
isAsap = ap.IsASAP()
toBeSaved = toBeSaved || isAsap
if isAsap {
utils.Logger.Info(fmt.Sprintf("<Scheduler> Time for one time action on %v", key))
ap.Execute()
ap.AccountIds = make([]string, 0)
} else {
now := time.Now()
if ap.GetNextStartTime(now).Before(now) {
// the task is obsolete, do not add it to the queue
continue
}
s.queue = append(s.queue, ap)
now := time.Now()
if at.GetNextStartTime(now).Before(now) {
// the task is obsolete, do not add it to the queue
continue
}
// save even asap action plans with empty account id list
newApls = append(newApls, ap)
}
if toBeSaved {
engine.Guardian.Guard(func() (interface{}, error) {
s.storage.SetActionPlans(key, newApls)
s.storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}})
return 0, nil
}, 0, utils.ACTION_PLAN_PREFIX)
at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts
at.SetActionPlanID(actionPlan.Id)
s.queue = append(s.queue, at)
}
}
sort.Sort(s.queue)
@@ -180,6 +152,6 @@ func (s *Scheduler) restart() {
}
}
func (s *Scheduler) GetQueue() engine.ActionPlanPriotityList {
func (s *Scheduler) GetQueue() engine.ActionTimingPriorityList {
return s.queue
}

View File

@@ -1099,11 +1099,13 @@ type AttrSetAccount struct {
ActionTriggersId string
AllowNegative *bool
Disabled *bool
ReloadScheduler bool
}
type AttrRemoveAccount struct {
Tenant string
Account string
Tenant string
Account string
ReloadScheduler bool
}
type AttrGetSMASessions struct {

View File

@@ -169,6 +169,7 @@ const (
ASR = "ASR"
ACD = "ACD"
FILTER_REGEXP_TPL = "$1$2$3$4$5"
TASKS_KEY = "tasks"
ACTION_PLAN_PREFIX = "apl_"
ACTION_TRIGGER_PREFIX = "atr_"
RATING_PLAN_PREFIX = "rpl_"

View File

@@ -143,3 +143,20 @@ func (sm StringMap) GetOne() string {
}
return ""
}
func NoDots(m map[string]struct{}) map[string]struct{} {
return MapKeysReplace(m, ".", "")
}
func YesDots(m map[string]struct{}) map[string]struct{} {
return MapKeysReplace(m, "", ".")
}
func MapKeysReplace(m map[string]struct{}, old, new string) map[string]struct{} {
for key, val := range m {
delete(m, key)
key = strings.Replace(key, old, new, -1)
m[key] = val
}
return m
}