From 9d228d80bfa8cd49a3b8bd4c1320445246c3ed5e Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 14 Dec 2015 22:30:36 +0200 Subject: [PATCH 01/15] restructured action plan --- engine/action_plan.go | 118 +++++++++++++----------------------- engine/action_trigger.go | 41 ------------- engine/actions_test.go | 109 +++++++++++++++++---------------- engine/calldesc_test.go | 110 +++++++++++++++++++-------------- engine/loader_csv_test.go | 41 +++++++++---- engine/storage_interface.go | 8 +-- engine/storage_map.go | 18 +++--- engine/storage_mongo.go | 20 +++--- engine/storage_mongo_tp.go | 4 +- engine/storage_redis.go | 16 ++--- engine/storage_sql.go | 2 +- engine/tp_reader.go | 72 +++++++++++----------- 12 files changed, 262 insertions(+), 297 deletions(-) diff --git a/engine/action_plan.go b/engine/action_plan.go index 6bdea2c4c..231c9568e 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -21,7 +21,6 @@ package engine import ( "fmt" "sort" - "strconv" "time" "github.com/cgrates/cgrates/utils" @@ -32,20 +31,31 @@ const ( FORMAT = "2006-1-2 15:04:05 MST" ) -type ActionPlan struct { - Uuid string // uniquely identify the timing - Id string // informative purpose only - AccountIds []string +type ActionTiming struct { + Uuid string Timing *RateInterval - Weight float64 ActionsId string + Weight float64 actions Actions + accountIDs map[string]struct{} stCache time.Time // cached time of the next start } -type ActionPlans []*ActionPlan +type ActionPlan struct { + Uuid string // uniquely identify the timing + Id string // informative purpose only + AccountIDs map[string]struct{} + ActionTimings []*ActionTiming +} -func (at *ActionPlan) GetNextStartTime(now time.Time) (t time.Time) { +func (apl *ActionPlan) RemoveAccountID(accID string) (found bool) { + if _, found = apl.AccountIDs[accID]; found { + delete(apl.AccountIDs, accID) + } + return +} + +func (at *ActionTiming) GetNextStartTime(now time.Time) (t time.Time) { if !at.stCache.IsZero() { return at.stCache } @@ -68,7 +78,7 @@ func (at *ActionPlan) GetNextStartTime(now time.Time) (t time.Time) { } // To be deleted after the above solution proves reliable -func (at *ActionPlan) GetNextStartTimeOld(now time.Time) (t time.Time) { +func (at *ActionTiming) GetNextStartTimeOld(now time.Time) (t time.Time) { if !at.stCache.IsZero() { return at.stCache } @@ -218,15 +228,15 @@ YEARS: return } -func (at *ActionPlan) ResetStartTimeCache() { +func (at *ActionTiming) ResetStartTimeCache() { at.stCache = time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC) } -func (at *ActionPlan) SetActions(as Actions) { +func (at *ActionTiming) SetActions(as Actions) { at.actions = as } -func (at *ActionPlan) getActions() (as []*Action, err error) { +func (at *ActionTiming) getActions() (as []*Action, err error) { if at.actions == nil { at.actions, err = ratingStorage.GetActions(at.ActionsId, false) } @@ -234,8 +244,8 @@ func (at *ActionPlan) getActions() (as []*Action, err error) { return at.actions, err } -func (at *ActionPlan) Execute() (err error) { - if len(at.AccountIds) == 0 { // nothing to do if no accounts set +func (at *ActionTiming) Execute() (err error) { + if len(at.accountIDs) == 0 { // nothing to do if no accounts set return } at.ResetStartTimeCache() @@ -244,8 +254,8 @@ func (at *ActionPlan) Execute() (err error) { utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsId, err)) return } - _, err = Guardian.Guard(func() (interface{}, error) { - for _, accId := range at.AccountIds { + for accId, _ := range at.accountIDs { + _, err = Guardian.Guard(func() (interface{}, error) { ub, err := accountingStorage.GetAccount(accId) if err != nil { utils.Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", accId)) @@ -261,45 +271,7 @@ func (at *ActionPlan) Execute() (err error) { if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) && parseErr == nil && !expDate.IsZero() { a.Balance.ExpirationDate = expDate } - // handle remove action - if a.ActionType == REMOVE_ACCOUNT { - if err := accountingStorage.RemoveAccount(accId); err != nil { - utils.Logger.Err(fmt.Sprintf("Could not remove account Id: %s: %v", accId, err)) - transactionFailed = true - break - } - // clean the account id from all action plans - allATs, err := ratingStorage.GetAllActionPlans() - if err != nil && err != utils.ErrNotFound { - utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accId, err)) - transactionFailed = true - break - } - for key, ats := range allATs { - changed := false - for _, at := range ats { - for i := 0; i < len(at.AccountIds); i++ { - if at.AccountIds[i] == accId { - // delete without preserving order - at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1] - at.AccountIds = at.AccountIds[:len(at.AccountIds)-1] - i-- - changed = true - } - } - } - if changed { - // save action plan - ratingStorage.SetActionPlans(key, ats) - // cache - ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) - } - } - toBeSaved = false - continue // do not go to getActionFunc - // TODO: maybe we should break here as the account is gone - // will leave continue for now as the next action can create another acount - } + actionFunction, exists := getActionFunc(a.ActionType) if !exists { // do not allow the action plan to be rescheduled @@ -318,18 +290,18 @@ func (at *ActionPlan) Execute() (err error) { if !transactionFailed && toBeSaved { accountingStorage.SetAccount(ub) } - } - return 0, nil - }, 0, at.AccountIds...) + return 0, nil + }, 0, accId) + } if err != nil { utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err)) return err } - storageLogger.LogActionPlan(utils.SCHED_SOURCE, at, aac) + storageLogger.LogActionTiming(utils.SCHED_SOURCE, at, aac) return } -func (at *ActionPlan) IsASAP() bool { +func (at *ActionTiming) IsASAP() bool { if at.Timing == nil { return false } @@ -337,17 +309,17 @@ func (at *ActionPlan) IsASAP() bool { } // Structure to store actions according to weight -type ActionPlanPriotityList []*ActionPlan +type ActionTimingPriorityList []*ActionTiming -func (atpl ActionPlanPriotityList) Len() int { +func (atpl ActionTimingPriorityList) Len() int { return len(atpl) } -func (atpl ActionPlanPriotityList) Swap(i, j int) { +func (atpl ActionTimingPriorityList) Swap(i, j int) { atpl[i], atpl[j] = atpl[j], atpl[i] } -func (atpl ActionPlanPriotityList) Less(i, j int) bool { +func (atpl ActionTimingPriorityList) Less(i, j int) bool { if atpl[i].GetNextStartTime(time.Now()).Equal(atpl[j].GetNextStartTime(time.Now())) { // higher weights earlyer in the list return atpl[i].Weight > atpl[j].Weight @@ -355,20 +327,16 @@ func (atpl ActionPlanPriotityList) Less(i, j int) bool { return atpl[i].GetNextStartTime(time.Now()).Before(atpl[j].GetNextStartTime(time.Now())) } -func (atpl ActionPlanPriotityList) Sort() { +func (atpl ActionTimingPriorityList) Sort() { sort.Sort(atpl) } -func (at *ActionPlan) String_DISABLED() string { - return at.Id + " " + at.GetNextStartTime(time.Now()).String() + ",w: " + strconv.FormatFloat(at.Weight, 'f', -1, 64) -} - // Helper to remove ActionPlan members based on specific filters, empty data means no always match -func RemActionPlan(ats ActionPlans, actionTimingId, accountId string) ActionPlans { - for idx, at := range ats { - if len(actionTimingId) != 0 && at.Uuid != actionTimingId { // No Match for ActionPlanId, no need to move further - continue - } +/*func RemActionPlan(apl ActionPlan, actionTimingId, accountId string) ActionPlan { + if len(actionTimingId) != 0 && apl.Uuid != actionTimingId { // No Match for ActionPlanId, no need to move further + continue + } + for idx, ats := range apl.ActionTimings { if len(accountId) == 0 { // No account defined, considered match for complete removal if len(ats) == 1 { // Removing last item, by init empty return make([]*ActionPlan, 0) @@ -392,4 +360,4 @@ func RemActionPlan(ats ActionPlans, actionTimingId, accountId string) ActionPlan } } return ats -} +}*/ diff --git a/engine/action_trigger.go b/engine/action_trigger.go index e6de7d1e3..c7bec094b 100644 --- a/engine/action_trigger.go +++ b/engine/action_trigger.go @@ -79,47 +79,6 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro a.Balance = &Balance{} } a.Balance.ExpirationDate, _ = utils.ParseDate(a.ExpirationString) - // handle remove action - if a.ActionType == REMOVE_ACCOUNT { - accId := ub.Id - if err := accountingStorage.RemoveAccount(accId); err != nil { - utils.Logger.Err(fmt.Sprintf("Could not remove account Id: %s: %v", accId, err)) - transactionFailed = true - break - } - // clean the account id from all action plans - allATs, err := ratingStorage.GetAllActionPlans() - if err != nil && err != utils.ErrNotFound { - utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accId, err)) - transactionFailed = true - break - } - for key, ats := range allATs { - changed := false - for _, at := range ats { - for i := 0; i < len(at.AccountIds); i++ { - if at.AccountIds[i] == accId { - // delete without preserving order - at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1] - at.AccountIds = at.AccountIds[:len(at.AccountIds)-1] - i -= 1 - changed = true - } - } - } - if changed { - // save action plan - ratingStorage.SetActionPlans(key, ats) - // cache - ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) - } - } - toBeSaved = false - continue // do not go to getActionFunc - // TODO: maybe we should break here as the account is gone - // will leave continue for now as the next action can create another acount - } - actionFunction, exists := getActionFunc(a.ActionType) if !exists { utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType)) diff --git a/engine/actions_test.go b/engine/actions_test.go index 88c3bcbb3..b96003200 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -38,7 +38,7 @@ var ( ) func TestActionTimingAlways(t *testing.T) { - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}} st := at.GetNextStartTime(referenceDate) y, m, d := referenceDate.Date() expected := time.Date(y, m, d, 0, 0, 0, 0, time.Local).AddDate(0, 0, 1) @@ -48,7 +48,7 @@ func TestActionTimingAlways(t *testing.T) { } func TestActionPlanNothing(t *testing.T) { - at := &ActionPlan{} + at := &ActionTiming{} st := at.GetNextStartTime(referenceDate) expected := time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC) if !st.Equal(expected) { @@ -57,7 +57,7 @@ func TestActionPlanNothing(t *testing.T) { } func TestActionTimingMidnight(t *testing.T) { - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{StartTime: "00:00:00"}}} y, m, d := referenceDate.Date() now := time.Date(y, m, d, 0, 0, 1, 0, time.Local) st := at.GetNextStartTime(now) @@ -68,7 +68,7 @@ func TestActionTimingMidnight(t *testing.T) { } func TestActionPlanOnlyHour(t *testing.T) { - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: "10:01:00"}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{StartTime: "10:01:00"}}} st := at.GetNextStartTime(referenceDate) y, m, d := now.Date() @@ -82,7 +82,7 @@ func TestActionPlanOnlyHour(t *testing.T) { } func TestActionPlanHourYear(t *testing.T) { - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{2022}, StartTime: "10:01:00"}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{2022}, StartTime: "10:01:00"}}} st := at.GetNextStartTime(referenceDate) expected := time.Date(2022, 1, 1, 10, 1, 0, 0, time.Local) if !st.Equal(expected) { @@ -91,7 +91,7 @@ func TestActionPlanHourYear(t *testing.T) { } func TestActionPlanOnlyWeekdays(t *testing.T) { - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{WeekDays: []time.Weekday{time.Monday}}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{WeekDays: []time.Weekday{time.Monday}}}} st := at.GetNextStartTime(referenceDate) y, m, d := now.Date() @@ -112,7 +112,7 @@ func TestActionPlanOnlyWeekdays(t *testing.T) { } func TestActionPlanHourWeekdays(t *testing.T) { - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{WeekDays: []time.Weekday{time.Monday}, StartTime: "10:01:00"}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{WeekDays: []time.Weekday{time.Monday}, StartTime: "10:01:00"}}} st := at.GetNextStartTime(referenceDate) y, m, d := now.Date() @@ -135,7 +135,7 @@ func TestActionPlanOnlyMonthdays(t *testing.T) { y, m, d := now.Date() tomorrow := time.Date(y, m, d, 0, 0, 0, 0, time.Local).AddDate(0, 0, 1) - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{MonthDays: utils.MonthDays{1, 25, 2, tomorrow.Day()}}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{MonthDays: utils.MonthDays{1, 25, 2, tomorrow.Day()}}}} st := at.GetNextStartTime(referenceDate) expected := tomorrow if !st.Equal(expected) { @@ -151,7 +151,7 @@ func TestActionPlanHourMonthdays(t *testing.T) { if now.After(testTime) { y, m, d = tomorrow.Date() } - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{MonthDays: utils.MonthDays{now.Day(), tomorrow.Day()}, StartTime: "10:01:00"}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{MonthDays: utils.MonthDays{now.Day(), tomorrow.Day()}, StartTime: "10:01:00"}}} st := at.GetNextStartTime(referenceDate) expected := time.Date(y, m, d, 10, 1, 0, 0, time.Local) if !st.Equal(expected) { @@ -163,7 +163,7 @@ func TestActionPlanOnlyMonths(t *testing.T) { y, m, _ := now.Date() nextMonth := time.Date(y, m, 1, 0, 0, 0, 0, time.Local).AddDate(0, 1, 0) - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Months: utils.Months{time.February, time.May, nextMonth.Month()}}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Months: utils.Months{time.February, time.May, nextMonth.Month()}}}} st := at.GetNextStartTime(referenceDate) expected := time.Date(nextMonth.Year(), nextMonth.Month(), 1, 0, 0, 0, 0, time.Local) if !st.Equal(expected) { @@ -186,7 +186,7 @@ func TestActionPlanHourMonths(t *testing.T) { y = nextMonth.Year() } - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{ + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{ Months: utils.Months{now.Month(), nextMonth.Month()}, StartTime: "10:01:00"}}} st := at.GetNextStartTime(referenceDate) @@ -216,7 +216,7 @@ func TestActionPlanHourMonthdaysMonths(t *testing.T) { month = nextMonth.Month() } } - at := &ActionPlan{Timing: &RateInterval{ + at := &ActionTiming{Timing: &RateInterval{ Timing: &RITiming{ Months: utils.Months{now.Month(), nextMonth.Month()}, MonthDays: utils.MonthDays{now.Day(), tomorrow.Day()}, @@ -234,7 +234,7 @@ func TestActionPlanFirstOfTheMonth(t *testing.T) { y, m, _ := now.Date() nextMonth := time.Date(y, m, 1, 0, 0, 0, 0, time.Local).AddDate(0, 1, 0) - at := &ActionPlan{Timing: &RateInterval{ + at := &ActionTiming{Timing: &RateInterval{ Timing: &RITiming{ MonthDays: utils.MonthDays{1}, }, @@ -249,7 +249,7 @@ func TestActionPlanFirstOfTheMonth(t *testing.T) { func TestActionPlanOnlyYears(t *testing.T) { y, _, _ := referenceDate.Date() nextYear := time.Date(y, 1, 1, 0, 0, 0, 0, time.Local).AddDate(1, 0, 0) - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{now.Year(), nextYear.Year()}}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{now.Year(), nextYear.Year()}}}} st := at.GetNextStartTime(referenceDate) expected := nextYear if !st.Equal(expected) { @@ -258,7 +258,7 @@ func TestActionPlanOnlyYears(t *testing.T) { } func TestActionPlanPast(t *testing.T) { - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{2023}}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{2023}}}} st := at.GetNextStartTime(referenceDate) expected := time.Date(2023, 1, 1, 0, 0, 0, 0, time.Local) if !st.Equal(expected) { @@ -267,7 +267,7 @@ func TestActionPlanPast(t *testing.T) { } func TestActionPlanHourYears(t *testing.T) { - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{referenceDate.Year(), referenceDate.Year() + 1}, StartTime: "10:01:00"}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{Years: utils.Years{referenceDate.Year(), referenceDate.Year() + 1}, StartTime: "10:01:00"}}} st := at.GetNextStartTime(referenceDate) expected := time.Date(referenceDate.Year(), 1, 1, 10, 1, 0, 0, time.Local) if referenceDate.After(expected) { @@ -292,7 +292,7 @@ func TestActionPlanHourMonthdaysYear(t *testing.T) { expected = tomorrow } } - at := &ActionPlan{Timing: &RateInterval{ + at := &ActionTiming{Timing: &RateInterval{ Timing: &RITiming{ Years: utils.Years{now.Year(), nextYear.Year()}, MonthDays: utils.MonthDays{now.Day(), tomorrow.Day()}, @@ -332,7 +332,7 @@ func TestActionPlanHourMonthdaysMonthYear(t *testing.T) { year = nextYear.Year() } } - at := &ActionPlan{Timing: &RateInterval{ + at := &ActionTiming{Timing: &RateInterval{ Timing: &RITiming{ Years: utils.Years{now.Year(), nextYear.Year()}, Months: utils.Months{now.Month(), nextMonth.Month()}, @@ -350,7 +350,7 @@ func TestActionPlanHourMonthdaysMonthYear(t *testing.T) { func TestActionPlanFirstOfTheYear(t *testing.T) { y, _, _ := now.Date() nextYear := time.Date(y, 1, 1, 0, 0, 0, 0, time.Local).AddDate(1, 0, 0) - at := &ActionPlan{Timing: &RateInterval{ + at := &ActionTiming{Timing: &RateInterval{ Timing: &RITiming{ Years: utils.Years{nextYear.Year()}, Months: utils.Months{time.January}, @@ -371,7 +371,7 @@ func TestActionPlanFirstMonthOfTheYear(t *testing.T) { if referenceDate.After(expected) { expected = expected.AddDate(1, 0, 0) } - at := &ActionPlan{Timing: &RateInterval{ + at := &ActionTiming{Timing: &RateInterval{ Timing: &RITiming{ Months: utils.Months{time.January}, }, @@ -388,7 +388,7 @@ func TestActionPlanFirstMonthOfTheYearSecondDay(t *testing.T) { if referenceDate.After(expected) { expected = expected.AddDate(1, 0, 0) } - at := &ActionPlan{Timing: &RateInterval{ + at := &ActionTiming{Timing: &RateInterval{ Timing: &RITiming{ Months: utils.Months{time.January}, MonthDays: utils.MonthDays{2}, @@ -401,7 +401,7 @@ func TestActionPlanFirstMonthOfTheYearSecondDay(t *testing.T) { } func TestActionPlanCheckForASAP(t *testing.T) { - at := &ActionPlan{Timing: &RateInterval{Timing: &RITiming{StartTime: utils.ASAP}}} + at := &ActionTiming{Timing: &RateInterval{Timing: &RITiming{StartTime: utils.ASAP}}} if !at.IsASAP() { t.Errorf("%v should be asap!", at) } @@ -413,7 +413,7 @@ func TestActionPlanLogFunction(t *testing.T) { BalanceType: "test", Balance: &Balance{Value: 1.1}, } - at := &ActionPlan{ + at := &ActionTiming{ actions: []*Action{a}, } err := at.Execute() @@ -428,8 +428,8 @@ func TestActionPlanFunctionNotAvailable(t *testing.T) { BalanceType: "test", Balance: &Balance{Value: 1.1}, } - at := &ActionPlan{ - AccountIds: []string{"cgrates.org:dy"}, + at := &ActionTiming{ + accountIDs: map[string]struct{}{"cgrates.org:dy": struct{}{}}, Timing: &RateInterval{}, actions: []*Action{a}, } @@ -439,8 +439,8 @@ func TestActionPlanFunctionNotAvailable(t *testing.T) { } } -func TestActionPlanPriotityListSortByWeight(t *testing.T) { - at1 := &ActionPlan{Timing: &RateInterval{ +func TestActionTimingPriorityListSortByWeight(t *testing.T) { + at1 := &ActionTiming{Timing: &RateInterval{ Timing: &RITiming{ Years: utils.Years{2020}, Months: utils.Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December}, @@ -449,7 +449,7 @@ func TestActionPlanPriotityListSortByWeight(t *testing.T) { }, Weight: 20, }} - at2 := &ActionPlan{Timing: &RateInterval{ + at2 := &ActionTiming{Timing: &RateInterval{ Timing: &RITiming{ Years: utils.Years{2020}, Months: utils.Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December}, @@ -458,7 +458,7 @@ func TestActionPlanPriotityListSortByWeight(t *testing.T) { }, Weight: 10, }} - var atpl ActionPlanPriotityList + var atpl ActionTimingPriorityList atpl = append(atpl, at2, at1) atpl.Sort() if atpl[0] != at1 || atpl[1] != at2 { @@ -466,8 +466,8 @@ func TestActionPlanPriotityListSortByWeight(t *testing.T) { } } -func TestActionPlanPriotityListWeight(t *testing.T) { - at1 := &ActionPlan{ +func TestActionTimingPriorityListWeight(t *testing.T) { + at1 := &ActionTiming{ Timing: &RateInterval{ Timing: &RITiming{ Months: utils.Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December}, @@ -477,7 +477,7 @@ func TestActionPlanPriotityListWeight(t *testing.T) { }, Weight: 20, } - at2 := &ActionPlan{ + at2 := &ActionTiming{ Timing: &RateInterval{ Timing: &RITiming{ Months: utils.Months{time.January, time.February, time.March, time.April, time.May, time.June, time.July, time.August, time.September, time.October, time.November, time.December}, @@ -487,7 +487,7 @@ func TestActionPlanPriotityListWeight(t *testing.T) { }, Weight: 10, } - var atpl ActionPlanPriotityList + var atpl ActionTimingPriorityList atpl = append(atpl, at2, at1) atpl.Sort() if atpl[0] != at1 || atpl[1] != at2 { @@ -495,17 +495,18 @@ func TestActionPlanPriotityListWeight(t *testing.T) { } } +/* func TestActionPlansRemoveMember(t *testing.T) { at1 := &ActionPlan{ Uuid: "some uuid", Id: "test", - AccountIds: []string{"one", "two", "three"}, + AccountIDs: []string{"one", "two", "three"}, ActionsId: "TEST_ACTIONS", } at2 := &ActionPlan{ Uuid: "some uuid22", Id: "test2", - AccountIds: []string{"three", "four"}, + AccountIDs: []string{"three", "four"}, ActionsId: "TEST_ACTIONS2", } ats := ActionPlans{at1, at2} @@ -522,7 +523,7 @@ func TestActionPlansRemoveMember(t *testing.T) { if ats2 = RemActionPlan(ats2, "", ""); len(ats2) != 0 { t.Error("Should have no members anymore", ats2) } -} +}*/ func TestActionTriggerMatchNil(t *testing.T) { at := &ActionTrigger{ @@ -1056,10 +1057,8 @@ func TestActionPlanLogging(t *testing.T) { Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 60 * time.Second}}, }, } - at := &ActionPlan{ - Uuid: "some uuid", - Id: "test", - AccountIds: []string{"one", "two", "three"}, + at := &ActionTiming{ + accountIDs: map[string]struct{}{"one": struct{}{}, "two": struct{}{}, "three": struct{}{}}, Timing: i, Weight: 10.0, ActionsId: "TEST_ACTIONS", @@ -1068,7 +1067,7 @@ func TestActionPlanLogging(t *testing.T) { if err != nil { t.Error("Error getting actions for the action trigger: ", err) } - storageLogger.LogActionPlan(utils.SCHED_SOURCE, at, as) + storageLogger.LogActionTiming(utils.SCHED_SOURCE, at, as) //expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0" var key string atMap, _ := ratingStorage.GetAllActionPlans() @@ -1104,8 +1103,8 @@ func TestRemoveAction(t *testing.T) { ActionType: REMOVE_ACCOUNT, } - at := &ActionPlan{ - AccountIds: []string{"cgrates.org:remo"}, + at := &ActionTiming{ + accountIDs: map[string]struct{}{"cgrates.org:remo": struct{}{}}, actions: Actions{a}, } at.Execute() @@ -1123,8 +1122,8 @@ func TestTopupAction(t *testing.T) { Balance: &Balance{Value: 25, DestinationIds: utils.NewStringMap("RET"), Directions: utils.NewStringMap(utils.OUT), Weight: 20}, } - at := &ActionPlan{ - AccountIds: []string{"vdf:minu"}, + at := &ActionTiming{ + accountIDs: map[string]struct{}{"vdf:minu": struct{}{}}, actions: Actions{a}, } @@ -1145,8 +1144,8 @@ func TestTopupActionLoaded(t *testing.T) { Balance: &Balance{Value: 25, DestinationIds: utils.NewStringMap("RET"), Directions: utils.NewStringMap(utils.OUT), Weight: 20}, } - at := &ActionPlan{ - AccountIds: []string{"vdf:minitsboy"}, + at := &ActionTiming{ + accountIDs: map[string]struct{}{"vdf:minitsboy": struct{}{}}, actions: Actions{a}, } @@ -1297,8 +1296,8 @@ func TestActionTransactionFuncType(t *testing.T) { if err != nil { t.Error("Error setting account: ", err) } - at := &ActionPlan{ - AccountIds: []string{"cgrates.org:trans"}, + at := &ActionTiming{ + accountIDs: map[string]struct{}{"cgrates.org:trans": struct{}{}}, Timing: &RateInterval{}, actions: []*Action{ &Action{ @@ -1335,8 +1334,8 @@ func TestActionTransactionBalanceType(t *testing.T) { if err != nil { t.Error("Error setting account: ", err) } - at := &ActionPlan{ - AccountIds: []string{"cgrates.org:trans"}, + at := &ActionTiming{ + accountIDs: map[string]struct{}{"cgrates.org:trans": struct{}{}}, Timing: &RateInterval{}, actions: []*Action{ &Action{ @@ -1373,8 +1372,8 @@ func TestActionWithExpireWithoutExpire(t *testing.T) { if err != nil { t.Error("Error setting account: ", err) } - at := &ActionPlan{ - AccountIds: []string{"cgrates.org:exp"}, + at := &ActionTiming{ + accountIDs: map[string]struct{}{"cgrates.org:exp": struct{}{}}, Timing: &RateInterval{}, actions: []*Action{ &Action{ @@ -1428,8 +1427,8 @@ func TestActionRemoveBalance(t *testing.T) { if err != nil { t.Error("Error setting account: ", err) } - at := &ActionPlan{ - AccountIds: []string{"cgrates.org:rembal"}, + at := &ActionTiming{ + accountIDs: map[string]struct{}{"cgrates.org:rembal": struct{}{}}, Timing: &RateInterval{}, actions: []*Action{ &Action{ diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index 2e8744fcd..b33807f38 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -487,8 +487,9 @@ func TestMaxSessionTimeWithAccount(t *testing.T) { } func TestMaxSessionTimeWithMaxRate(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } //acc, _ := accountingStorage.GetAccount("cgrates.org:12345") @@ -513,8 +514,9 @@ func TestMaxSessionTimeWithMaxRate(t *testing.T) { } func TestMaxSessionTimeWithMaxCost(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -536,8 +538,9 @@ func TestMaxSessionTimeWithMaxCost(t *testing.T) { } func TestGetCostWithMaxCost(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -558,8 +561,9 @@ func TestGetCostWithMaxCost(t *testing.T) { } } func TestGetCostRoundingIssue(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -582,8 +586,9 @@ func TestGetCostRoundingIssue(t *testing.T) { } func TestGetCostRatingInfoOnZeroTime(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -609,8 +614,9 @@ func TestGetCostRatingInfoOnZeroTime(t *testing.T) { } func TestDebitRatingInfoOnZeroTime(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -637,8 +643,9 @@ func TestDebitRatingInfoOnZeroTime(t *testing.T) { } func TestMaxDebitRatingInfoOnZeroTime(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -664,8 +671,9 @@ func TestMaxDebitRatingInfoOnZeroTime(t *testing.T) { } func TestMaxDebitUnknowDest(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -686,8 +694,9 @@ func TestMaxDebitUnknowDest(t *testing.T) { } func TestGetCostMaxDebitRoundingIssue(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -718,8 +727,9 @@ func TestGetCostMaxDebitRoundingIssue(t *testing.T) { } func TestMaxSessionTimeWithMaxCostFree(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -741,8 +751,9 @@ func TestMaxSessionTimeWithMaxCostFree(t *testing.T) { } func TestMaxDebitWithMaxCostFree(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -764,8 +775,9 @@ func TestMaxDebitWithMaxCostFree(t *testing.T) { } func TestGetCostWithMaxCostFree(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd := &CallDescriptor{ @@ -818,12 +830,14 @@ func TestMaxSessionTimeWithAccountAlias(t *testing.T) { } func TestMaxSessionTimeWithAccountShared(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP_SHARED0_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED0_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } - ap, _ = ratingStorage.GetActionPlans("TOPUP_SHARED10_AT", false) - for _, at := range ap { + ap, _ = ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } @@ -857,12 +871,14 @@ func TestMaxSessionTimeWithAccountShared(t *testing.T) { } func TestMaxDebitWithAccountShared(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP_SHARED0_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED0_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } - ap, _ = ratingStorage.GetActionPlans("TOPUP_SHARED10_AT", false) - for _, at := range ap { + ap, _ = ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } @@ -1077,8 +1093,9 @@ func TestMaxSesionTimeLongerThanMoney(t *testing.T) { } func TestDebitFromShareAndNormal(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP_SHARED10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP_SHARED10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } @@ -1105,8 +1122,9 @@ func TestDebitFromShareAndNormal(t *testing.T) { } func TestDebitFromEmptyShare(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP_EMPTY_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP_EMPTY_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } @@ -1133,8 +1151,9 @@ func TestDebitFromEmptyShare(t *testing.T) { } func TestDebitNegatve(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("POST_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("POST_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } @@ -1172,8 +1191,9 @@ func TestDebitNegatve(t *testing.T) { } func TestMaxDebitZeroDefinedRate(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd1 := &CallDescriptor{ @@ -1200,8 +1220,9 @@ func TestMaxDebitZeroDefinedRate(t *testing.T) { } func TestMaxDebitZeroDefinedRateOnlyMinutes(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd1 := &CallDescriptor{ @@ -1228,8 +1249,9 @@ func TestMaxDebitZeroDefinedRateOnlyMinutes(t *testing.T) { } func TestMaxDebitConsumesMinutes(t *testing.T) { - ap, _ := ratingStorage.GetActionPlans("TOPUP10_AT", false) - for _, at := range ap { + ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs at.Execute() } cd1 := &CallDescriptor{ diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index eb6a1bdfa..e01df4be7 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -985,25 +985,42 @@ func TestLoadActionTimings(t *testing.T) { if len(csvr.actionPlans) != 6 { t.Error("Failed to load action timings: ", len(csvr.actionPlans)) } - atm := csvr.actionPlans["MORE_MINUTES"][0] + atm := csvr.actionPlans["MORE_MINUTES"] expected := &ActionPlan{ Uuid: atm.Uuid, Id: "MORE_MINUTES", - AccountIds: []string{"vdf:minitsboy"}, - Timing: &RateInterval{ - Timing: &RITiming{ - Years: utils.Years{2012}, - Months: utils.Months{}, - MonthDays: utils.MonthDays{}, - WeekDays: utils.WeekDays{}, - StartTime: utils.ASAP, + AccountIDs: map[string]struct{}{"vdf:minitsboy": struct{}{}}, + ActionTimings: []*ActionTiming{ + &ActionTiming{ + Timing: &RateInterval{ + Timing: &RITiming{ + Years: utils.Years{2012}, + Months: utils.Months{}, + MonthDays: utils.MonthDays{}, + WeekDays: utils.WeekDays{}, + StartTime: utils.ASAP, + }, + }, + Weight: 10, + ActionsId: "MINI", + }, + &ActionTiming{ + Timing: &RateInterval{ + Timing: &RITiming{ + Years: utils.Years{2012}, + Months: utils.Months{}, + MonthDays: utils.MonthDays{}, + WeekDays: utils.WeekDays{}, + StartTime: utils.ASAP, + }, + }, + Weight: 10, + ActionsId: "SHARED", }, }, - Weight: 10, - ActionsId: "MINI", } if !reflect.DeepEqual(atm, expected) { - t.Errorf("Error loading action timing:\n%+v", atm) + t.Errorf("Error loading action timing:\n%+v", atm.ActionTimings[1].Timing) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 8c51db8ea..916cd511c 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -63,9 +63,9 @@ type RatingStorage interface { SetSharedGroup(*SharedGroup) error GetActionTriggers(string) (ActionTriggers, error) SetActionTriggers(string, ActionTriggers) error - GetActionPlans(string, bool) (ActionPlans, error) - SetActionPlans(string, ActionPlans) error - GetAllActionPlans() (map[string]ActionPlans, error) + GetActionPlan(string, bool) (*ActionPlan, error) + SetActionPlan(string, *ActionPlan) error + GetAllActionPlans() (map[string]*ActionPlan, error) } type AccountingStorage interface { @@ -106,7 +106,7 @@ type LogStorage interface { Storage //GetAllActionTimingsLogs() (map[string]ActionsTimings, error) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) error - LogActionPlan(source string, at *ActionPlan, as Actions) error + LogActionTiming(source string, at *ActionTiming, as Actions) error } type LoadStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index adf785a69..776ffcaff 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -178,7 +178,7 @@ func (ms *MapStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, actK } if strings.HasPrefix(k, utils.ACTION_PLAN_PREFIX) { cache2go.RemKey(k) - if _, err := ms.GetActionPlans(k[len(utils.ACTION_PLAN_PREFIX):], true); err != nil { + if _, err := ms.GetActionPlan(k[len(utils.ACTION_PLAN_PREFIX):], true); err != nil { cache2go.RollbackTransaction() return err } @@ -645,11 +645,11 @@ func (ms *MapStorage) SetActionTriggers(key string, atrs ActionTriggers) (err er return } -func (ms *MapStorage) GetActionPlans(key string, skipCache bool) (ats ActionPlans, err error) { +func (ms *MapStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan, err error) { key = utils.ACTION_PLAN_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { - return x.(ActionPlans), nil + return x.(*ActionPlan), nil } else { return nil, err } @@ -663,8 +663,8 @@ func (ms *MapStorage) GetActionPlans(key string, skipCache bool) (ats ActionPlan return } -func (ms *MapStorage) SetActionPlans(key string, ats ActionPlans) (err error) { - if len(ats) == 0 { +func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan) (err error) { + if len(ats.ActionTimings) == 0 { // delete the key delete(ms.dict, utils.ACTION_PLAN_PREFIX+key) cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) @@ -675,15 +675,15 @@ func (ms *MapStorage) SetActionPlans(key string, ats ActionPlans) (err error) { return } -func (ms *MapStorage) GetAllActionPlans() (ats map[string]ActionPlans, err error) { +func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { apls, err := cache2go.GetAllEntries(utils.ACTION_PLAN_PREFIX) if err != nil { return nil, err } - ats = make(map[string]ActionPlans, len(apls)) + ats = make(map[string]*ActionPlan, len(apls)) for key, value := range apls { - apl := value.(ActionPlans) + apl := value.(*ActionPlan) ats[key] = apl } @@ -774,7 +774,7 @@ func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, a return } -func (ms *MapStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) { +func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { mat, err := ms.ms.Marshal(at) if err != nil { return diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index 5ea101224..510442356 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -451,7 +451,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } for _, key := range aplKeys { cache2go.RemKey(key) - if _, err = ms.GetActionPlans(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil { + if _, err = ms.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil { cache2go.RollbackTransaction() return err } @@ -1026,17 +1026,17 @@ func (ms *MongoStorage) SetActionTriggers(key string, atrs ActionTriggers) (err return err } -func (ms *MongoStorage) GetActionPlans(key string, skipCache bool) (ats ActionPlans, err error) { +func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan, err error) { if !skipCache { if x, err := cache2go.Get(utils.ACTION_PLAN_PREFIX + key); err == nil { - return x.(ActionPlans), nil + return x.(*ActionPlan), nil } else { return nil, err } } var kv struct { Key string - Value ActionPlans + Value *ActionPlan } err = ms.db.C(colApl).Find(bson.M{"key": key}).One(&kv) if err == nil { @@ -1046,8 +1046,8 @@ func (ms *MongoStorage) GetActionPlans(key string, skipCache bool) (ats ActionPl return } -func (ms *MongoStorage) SetActionPlans(key string, ats ActionPlans) error { - if len(ats) == 0 { +func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error { + if len(ats.ActionTimings) == 0 { cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) err := ms.db.C(colApl).Remove(bson.M{"key": key}) if err != mgo.ErrNotFound { @@ -1057,20 +1057,20 @@ func (ms *MongoStorage) SetActionPlans(key string, ats ActionPlans) error { } _, err := ms.db.C(colApl).Upsert(bson.M{"key": key}, &struct { Key string - Value ActionPlans + Value *ActionPlan }{Key: key, Value: ats}) return err } -func (ms *MongoStorage) GetAllActionPlans() (ats map[string]ActionPlans, err error) { +func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { apls, err := cache2go.GetAllEntries(utils.ACTION_PLAN_PREFIX) if err != nil { return nil, err } - ats = make(map[string]ActionPlans, len(apls)) + ats = make(map[string]*ActionPlan, len(apls)) for key, value := range apls { - apl := value.(ActionPlans) + apl := value.(*ActionPlan) ats[key] = apl } diff --git a/engine/storage_mongo_tp.go b/engine/storage_mongo_tp.go index 9a30c5287..7a1eda6b4 100644 --- a/engine/storage_mongo_tp.go +++ b/engine/storage_mongo_tp.go @@ -689,9 +689,9 @@ func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, }{ubId, at, as, time.Now(), source}) } -func (ms *MongoStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) { +func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { return ms.db.C(colLogApl).Insert(&struct { - ActionPlan *ActionPlan + ActionPlan *ActionTiming Actions Actions LogTime time.Time Source string diff --git a/engine/storage_redis.go b/engine/storage_redis.go index d93a59d14..e8bb6d493 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -282,7 +282,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } for _, key := range aplKeys { cache2go.RemKey(key) - if _, err = rs.GetActionPlans(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil { + if _, err = rs.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil { cache2go.RollbackTransaction() return err } @@ -893,11 +893,11 @@ func (rs *RedisStorage) SetActionTriggers(key string, atrs ActionTriggers) (err return conn.Cmd("SET", utils.ACTION_TRIGGER_PREFIX+key, result).Err } -func (rs *RedisStorage) GetActionPlans(key string, skipCache bool) (ats ActionPlans, err error) { +func (rs *RedisStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan, err error) { key = utils.ACTION_PLAN_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { - return x.(ActionPlans), nil + return x.(*ActionPlan), nil } else { return nil, err } @@ -911,8 +911,8 @@ func (rs *RedisStorage) GetActionPlans(key string, skipCache bool) (ats ActionPl return } -func (rs *RedisStorage) SetActionPlans(key string, ats ActionPlans) (err error) { - if len(ats) == 0 { +func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan) (err error) { + if len(ats.ActionTimings) == 0 { // delete the key err = rs.db.Cmd("DEL", utils.ACTION_PLAN_PREFIX+key).Err cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) @@ -925,15 +925,15 @@ func (rs *RedisStorage) SetActionPlans(key string, ats ActionPlans) (err error) return rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, result).Err } -func (rs *RedisStorage) GetAllActionPlans() (ats map[string]ActionPlans, err error) { +func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { apls, err := cache2go.GetAllEntries(utils.ACTION_PLAN_PREFIX) if err != nil { return nil, err } - ats = make(map[string]ActionPlans, len(apls)) + ats = make(map[string]*ActionPlan, len(apls)) for key, value := range apls { - apl := value.(ActionPlans) + apl := value.(*ActionPlan) ats[key] = apl } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 9e550d066..2c0336e90 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -592,7 +592,7 @@ func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (*CallCost, func (self *SQLStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { return } -func (self *SQLStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) { +func (self *SQLStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { return } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index a44248459..912d1a629 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -19,7 +19,7 @@ type TpReader struct { accountingStorage AccountingStorage lr LoadReader actions map[string][]*Action - actionPlans map[string][]*ActionPlan + actionPlans map[string]*ActionPlan actionsTriggers map[string]ActionTriggers accountActions map[string]*Account dirtyRpAliases []*TenantRatingSubject // used to clean aliases that might have changed @@ -73,7 +73,7 @@ func NewTpReader(rs RatingStorage, as AccountingStorage, lr LoadReader, tpid, ti func (tpr *TpReader) Init() { tpr.actions = make(map[string][]*Action) - tpr.actionPlans = make(map[string][]*ActionPlan) + tpr.actionPlans = make(map[string]*ActionPlan) tpr.actionsTriggers = make(map[string]ActionTriggers) tpr.rates = make(map[string]*utils.TPRate) tpr.destinations = make(map[string]*Destination) @@ -575,9 +575,14 @@ func (tpr *TpReader) LoadActionPlans() (err error) { if !exists { return fmt.Errorf("[ActionPlans] Could not load the timing for tag: %v", at.TimingId) } - actPln := &ActionPlan{ - Uuid: utils.GenUUID(), - Id: atId, + var actPln *ActionPlan + if actPln, exists = tpr.actionPlans[atId]; !exists { + actPln = &ActionPlan{ + Uuid: utils.GenUUID(), + Id: atId, + } + } + actPln.ActionTimings = append(actPln.ActionTimings, &ActionTiming{ Weight: at.Weight, Timing: &RateInterval{ Timing: &RITiming{ @@ -589,8 +594,9 @@ func (tpr *TpReader) LoadActionPlans() (err error) { }, }, ActionsId: at.ActionsId, - } - tpr.actionPlans[atId] = append(tpr.actionPlans[atId], actPln) + }) + + tpr.actionPlans[atId] = actPln } } @@ -660,11 +666,10 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error // action timings if accountAction.ActionPlanId != "" { // get old userBalanceIds - var exitingAccountIds []string - existingActionPlans, err := tpr.ratingStorage.GetActionPlans(accountAction.ActionPlanId, true) - if err == nil && len(existingActionPlans) > 0 { - // all action timings from a specific tag shuld have the same list of user balances from the first one - exitingAccountIds = existingActionPlans[0].AccountIds + var exitingAccountIds map[string]struct{} + existingActionPlan, err := tpr.ratingStorage.GetActionPlan(accountAction.ActionPlanId, true) + if err == nil && existingActionPlan != nil { + exitingAccountIds = existingActionPlan.AccountIDs } tpap, err := tpr.lr.GetTpActionPlans(tpr.tpid, accountAction.ActionPlanId) @@ -677,7 +682,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error if err != nil { return err } - var actionTimings []*ActionPlan + var actionPlan *ActionPlan ats := aps[accountAction.ActionPlanId] for _, at := range ats { // Check action exists before saving it inside actionTiming key @@ -703,9 +708,13 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error } else { t = tpr.timings[at.TimingId] // *asap } - actPln := &ActionPlan{ - Uuid: utils.GenUUID(), - Id: accountAction.ActionPlanId, + if actionPlan == nil { + actionPlan = &ActionPlan{ + Uuid: utils.GenUUID(), + Id: accountAction.ActionPlanId, + } + } + actionPlan.ActionTimings = append(actionPlan.ActionTimings, &ActionTiming{ Weight: at.Weight, Timing: &RateInterval{ Timing: &RITiming{ @@ -716,25 +725,15 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error }, }, ActionsId: at.ActionsId, - } + }) // collect action ids from timings - actionsIds = append(actionsIds, actPln.ActionsId) - //add user balance id if no already in - found := false - for _, ubId := range exitingAccountIds { - if ubId == id { - found = true - break - } - } - if !found { - actPln.AccountIds = append(exitingAccountIds, id) - } - actionTimings = append(actionTimings, actPln) + actionsIds = append(actionsIds, at.ActionsId) + exitingAccountIds[id] = struct{}{} + actionPlan.AccountIDs = exitingAccountIds } - // write action triggers - err = tpr.ratingStorage.SetActionPlans(accountAction.ActionPlanId, actionTimings) + // write action plan + err = tpr.ratingStorage.SetActionPlan(accountAction.ActionPlanId, actionPlan) if err != nil { return errors.New(err.Error() + " (SetActionPlan): " + accountAction.ActionPlanId) } @@ -883,14 +882,15 @@ func (tpr *TpReader) LoadAccountActions() (err error) { } ub.InitCounters() tpr.accountActions[aa.KeyId()] = ub - aTimings, exists := tpr.actionPlans[aa.ActionPlanId] + actionPlan, exists := tpr.actionPlans[aa.ActionPlanId] if !exists { log.Printf("could not get action plan for tag %v", aa.ActionPlanId) // must not continue here } - for _, at := range aTimings { - at.AccountIds = append(at.AccountIds, aa.KeyId()) + if actionPlan.AccountIDs == nil { + actionPlan.AccountIDs = make(map[string]struct{}) } + actionPlan.AccountIDs[aa.KeyId()] = struct{}{} } return nil } @@ -1319,7 +1319,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Action Plans:") } for k, ats := range tpr.actionPlans { - err = tpr.ratingStorage.SetActionPlans(k, ats) + err = tpr.ratingStorage.SetActionPlan(k, ats) if err != nil { return err } From 65516e201e4d41bb545d442ea45229948ef3cf7c Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 17 Dec 2015 19:10:38 +0200 Subject: [PATCH 02/15] using tasks and action plans with no accounts --- engine/action_plan.go | 70 +++++++++++++++++++++++++++---------- engine/action_trigger.go | 6 +--- engine/actions_test.go | 4 +-- engine/loader_csv_test.go | 4 +-- engine/storage_interface.go | 2 ++ engine/storage_map.go | 26 ++++++++++++-- engine/storage_mongo.go | 24 +++++++++++++ engine/storage_redis.go | 21 +++++++++-- engine/storage_test.go | 35 +++++++++++++++++++ engine/tp_reader.go | 23 +++++++++--- scheduler/scheduler.go | 69 +++++++++++++++++------------------- utils/consts.go | 1 + utils/map.go | 17 +++++++++ 13 files changed, 230 insertions(+), 72 deletions(-) diff --git a/engine/action_plan.go b/engine/action_plan.go index 231c9568e..e5914c79c 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -34,11 +34,17 @@ const ( type ActionTiming struct { Uuid string Timing *RateInterval - ActionsId string + ActionsID string Weight float64 actions Actions - accountIDs map[string]struct{} - stCache time.Time // cached time of the next start + accountIDs map[string]struct{} // copy of action plans accounts + stCache time.Time // cached time of the next start +} + +type Task struct { + Uuid string + AccountID string + ActionsID string } type ActionPlan struct { @@ -55,6 +61,14 @@ func (apl *ActionPlan) RemoveAccountID(accID string) (found bool) { return } +func (t *Task) Execute() error { + return (&ActionTiming{ + Uuid: t.Uuid, + ActionsID: t.ActionsID, + accountIDs: map[string]struct{}{t.AccountID: struct{}{}}, + }).Execute() +} + func (at *ActionTiming) GetNextStartTime(now time.Time) (t time.Time) { if !at.stCache.IsZero() { return at.stCache @@ -236,37 +250,37 @@ func (at *ActionTiming) SetActions(as Actions) { at.actions = as } +func (at *ActionTiming) SetAccountIDs(accIDs map[string]struct{}) { + at.accountIDs = accIDs +} + func (at *ActionTiming) getActions() (as []*Action, err error) { if at.actions == nil { - at.actions, err = ratingStorage.GetActions(at.ActionsId, false) + at.actions, err = ratingStorage.GetActions(at.ActionsID, false) } at.actions.Sort() return at.actions, err } func (at *ActionTiming) Execute() (err error) { - if len(at.accountIDs) == 0 { // nothing to do if no accounts set - return - } at.ResetStartTimeCache() aac, err := at.getActions() if err != nil { - utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsId, err)) + utils.Logger.Err(fmt.Sprintf("Failed to get actions for %s: %s", at.ActionsID, err)) return } - for accId, _ := range at.accountIDs { + for accID, _ := range at.accountIDs { _, err = Guardian.Guard(func() (interface{}, error) { - ub, err := accountingStorage.GetAccount(accId) + ub, err := accountingStorage.GetAccount(accID) if err != nil { - utils.Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", accId)) + utils.Logger.Warning(fmt.Sprintf("Could not get account id: %s. Skipping!", accID)) return 0, err } transactionFailed := false - toBeSaved := true for _, a := range aac { if ub.Disabled && a.ActionType != ENABLE_ACCOUNT { continue // disabled acocunts are not removed from action plan - //return 0, fmt.Errorf("Account %s is disabled", accId) + //return 0, fmt.Errorf("Account %s is disabled", accID) } if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) && parseErr == nil && !expDate.IsZero() { a.Balance.ExpirationDate = expDate @@ -285,13 +299,33 @@ func (at *ActionTiming) Execute() (err error) { transactionFailed = true break } - toBeSaved = true } - if !transactionFailed && toBeSaved { + if !transactionFailed { accountingStorage.SetAccount(ub) } return 0, nil - }, 0, accId) + }, 0, accID) + } + if len(at.accountIDs) == 0 { // action timing executing without accounts + for _, a := range aac { + + if expDate, parseErr := utils.ParseDate(a.ExpirationString); (a.Balance == nil || a.Balance.ExpirationDate.IsZero()) && + parseErr == nil && !expDate.IsZero() { + a.Balance.ExpirationDate = expDate + } + + actionFunction, exists := getActionFunc(a.ActionType) + if !exists { + // do not allow the action plan to be rescheduled + at.Timing = nil + utils.Logger.Err(fmt.Sprintf("Function type %v not available, aborting execution!", a.ActionType)) + break + } + if err := actionFunction(nil, nil, a, aac); err != nil { + utils.Logger.Err(fmt.Sprintf("Error executing action %s: %v!", a.ActionType, err)) + break + } + } } if err != nil { utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err)) @@ -344,8 +378,8 @@ func (atpl ActionTimingPriorityList) Sort() { ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] continue } - for iAcc, accId := range at.AccountIds { - if accId == accountId { + for iAcc, accID := range at.AccountIds { + if accID == accountId { if len(at.AccountIds) == 1 { // Only one balance, remove complete at if len(ats) == 1 { // Removing last item, by init empty return make([]*ActionPlan, 0) diff --git a/engine/action_trigger.go b/engine/action_trigger.go index c7bec094b..9c3d16178 100644 --- a/engine/action_trigger.go +++ b/engine/action_trigger.go @@ -73,7 +73,6 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro } at.Executed = true transactionFailed := false - toBeSaved := true for _, a := range aac { if a.Balance == nil { a.Balance = &Balance{} @@ -91,16 +90,13 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro transactionFailed = false break } - toBeSaved = true } if transactionFailed || at.Recurrent { at.Executed = false } if !transactionFailed && ub != nil { storageLogger.LogActionTrigger(ub.Id, utils.RATER_SOURCE, at, aac) - if toBeSaved { - accountingStorage.SetAccount(ub) - } + accountingStorage.SetAccount(ub) } return } diff --git a/engine/actions_test.go b/engine/actions_test.go index b96003200..75b6ef9b9 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -1061,9 +1061,9 @@ func TestActionPlanLogging(t *testing.T) { accountIDs: map[string]struct{}{"one": struct{}{}, "two": struct{}{}, "three": struct{}{}}, Timing: i, Weight: 10.0, - ActionsId: "TEST_ACTIONS", + ActionsID: "TEST_ACTIONS", } - as, err := ratingStorage.GetActions(at.ActionsId, false) + as, err := ratingStorage.GetActions(at.ActionsID, false) if err != nil { t.Error("Error getting actions for the action trigger: ", err) } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index e01df4be7..e0e24f13f 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -1002,7 +1002,7 @@ func TestLoadActionTimings(t *testing.T) { }, }, Weight: 10, - ActionsId: "MINI", + ActionsID: "MINI", }, &ActionTiming{ Timing: &RateInterval{ @@ -1015,7 +1015,7 @@ func TestLoadActionTimings(t *testing.T) { }, }, Weight: 10, - ActionsId: "SHARED", + ActionsID: "SHARED", }, }, } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 916cd511c..ee67ad681 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -66,6 +66,8 @@ type RatingStorage interface { GetActionPlan(string, bool) (*ActionPlan, error) SetActionPlan(string, *ActionPlan) error GetAllActionPlans() (map[string]*ActionPlan, error) + PushTask(*Task) error + PopTask() (*Task, error) } type AccountingStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index 776ffcaff..a362e1e01 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -33,8 +33,9 @@ import ( ) type MapStorage struct { - dict map[string][]byte - ms Marshaler + dict map[string][]byte + tasks [][]byte + ms Marshaler } func NewMapStorage() (*MapStorage, error) { @@ -690,6 +691,27 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error return } +func (ms *MapStorage) PushTask(t *Task) error { + result, err := ms.ms.Marshal(t) + if err != nil { + return err + } + ms.tasks = append(ms.tasks, result) + return nil +} + +func (ms *MapStorage) PopTask() (t *Task, err error) { + if len(ms.tasks) > 0 { + var values []byte + values, ms.tasks = ms.tasks[0], ms.tasks[1:] + t = &Task{} + err = ms.ms.Unmarshal(values, t) + } else { + err = utils.ErrNotFound + } + return +} + func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) { key = utils.DERIVEDCHARGERS_PREFIX + key if !skipCache { diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index 510442356..3f33bdf61 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -34,6 +34,7 @@ const ( colDst = "destinations" colAct = "actions" colApl = "actionplans" + colTsk = "tasks" colAtr = "actiontriggers" colRpl = "ratingplans" colRpf = "ratingprofiles" @@ -1043,10 +1044,13 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl ats = kv.Value cache2go.Cache(utils.ACTION_PLAN_PREFIX+key, ats) } + ats.AccountIDs = utils.YesDots(ats.AccountIDs) return } func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error { + // clean dots from account ids map + ats.AccountIDs = utils.NoDots(ats.AccountIDs) if len(ats.ActionTimings) == 0 { cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) err := ms.db.C(colApl).Remove(bson.M{"key": key}) @@ -1071,12 +1075,32 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err ats = make(map[string]*ActionPlan, len(apls)) for key, value := range apls { apl := value.(*ActionPlan) + apl.AccountIDs = utils.YesDots(apl.AccountIDs) ats[key] = apl } return } +func (ms *MongoStorage) PushTask(t *Task) error { + return ms.db.C(colTsk).Insert(bson.M{"_id": bson.NewObjectId(), "task": t}) +} + +func (ms *MongoStorage) PopTask() (t *Task, err error) { + v := struct { + ID bson.ObjectId `bson:"_id"` + Task *Task + }{} + if err = ms.db.C(colTsk).Find(nil).One(&v); err == nil { + if remErr := ms.db.C(colTsk).Remove(bson.M{"_id": v.ID}); remErr != nil { + return nil, remErr + } + t = v.Task + } + + return +} + func (ms *MongoStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) { if !skipCache { if x, err := cache2go.Get(utils.DERIVEDCHARGERS_PREFIX + key); err == nil { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index e8bb6d493..4f252548b 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -886,7 +886,7 @@ func (rs *RedisStorage) SetActionTriggers(key string, atrs ActionTriggers) (err // delete the key return conn.Cmd("DEL", utils.ACTION_TRIGGER_PREFIX+key).Err } - result, err := rs.ms.Marshal(&atrs) + result, err := rs.ms.Marshal(atrs) if err != nil { return err } @@ -940,6 +940,23 @@ func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } +func (rs *RedisStorage) PushTask(t *Task) error { + result, err := rs.ms.Marshal(t) + if err != nil { + return err + } + return rs.db.Cmd("RPUSH", utils.TASKS_KEY, result).Err +} + +func (rs *RedisStorage) PopTask() (t *Task, err error) { + var values []byte + if values, err = rs.db.Cmd("LPOP", utils.TASKS_KEY).Bytes(); err == nil { + t = &Task{} + err = rs.ms.Unmarshal(values, t) + } + return +} + func (rs *RedisStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) { key = utils.DERIVEDCHARGERS_PREFIX + key if !skipCache { @@ -1037,7 +1054,7 @@ func (rs *RedisStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, return rs.db.Cmd("SET", utils.LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v*%v", ubId, string(mat), string(mas)))).Err } -func (rs *RedisStorage) LogActionPlan(source string, at *ActionPlan, as Actions) (err error) { +func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { mat, err := rs.ms.Marshal(at) if err != nil { return diff --git a/engine/storage_test.go b/engine/storage_test.go index cd5cf9144..375979fa9 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -272,6 +272,41 @@ func TestDifferentUuid(t *testing.T) { } } +func TestStorageTask(t *testing.T) { + // clean previous unused tasks + for i := 0; i < 16; i++ { + ratingStorage.PopTask() + } + + if err := ratingStorage.PushTask(&Task{Uuid: "1"}); err != nil { + t.Error("Error pushing task: ", err) + } + if err := ratingStorage.PushTask(&Task{Uuid: "2"}); err != nil { + t.Error("Error pushing task: ", err) + } + if err := ratingStorage.PushTask(&Task{Uuid: "3"}); err != nil { + t.Error("Error pushing task: ", err) + } + if err := ratingStorage.PushTask(&Task{Uuid: "4"}); err != nil { + t.Error("Error pushing task: ", err) + } + if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "1" { + t.Error("Error poping task: ", task, err) + } + if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "2" { + t.Error("Error poping task: ", task, err) + } + if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "3" { + t.Error("Error poping task: ", task, err) + } + if task, err := ratingStorage.PopTask(); err != nil && task.Uuid != "4" { + t.Error("Error poping task: ", task, err) + } + if task, err := ratingStorage.PopTask(); err == nil && task != nil { + t.Errorf("Error poping task %+v, %v: ", task, err) + } +} + /************************** Benchmarks *****************************/ func GetUB() *Account { diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 912d1a629..abafa2811 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -593,7 +593,7 @@ func (tpr *TpReader) LoadActionPlans() (err error) { StartTime: t.StartTime, }, }, - ActionsId: at.ActionsId, + ActionsID: at.ActionsId, }) tpr.actionPlans[atId] = actPln @@ -724,7 +724,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error StartTime: t.StartTime, }, }, - ActionsId: at.ActionsId, + ActionsID: at.ActionsId, }) // collect action ids from timings actionsIds = append(actionsIds, at.ActionsId) @@ -873,7 +873,6 @@ func (tpr *TpReader) LoadAccountActions() (err error) { return fmt.Errorf("could not get action triggers for tag %s", aa.ActionTriggersId) } } - ub := &Account{ Id: aa.KeyId(), ActionTriggers: aTriggers, @@ -1318,8 +1317,22 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { if verbose { log.Print("Action Plans:") } - for k, ats := range tpr.actionPlans { - err = tpr.ratingStorage.SetActionPlan(k, ats) + for k, ap := range tpr.actionPlans { + for _, at := range ap.ActionTimings { + if at.IsASAP() { + for accID := range ap.AccountIDs { + t := &Task{ + Uuid: utils.GenUUID(), + AccountID: accID, + ActionsID: at.ActionsID, + } + if err = tpr.ratingStorage.PushTask(t); err != nil { + return err + } + } + } + } + err = tpr.ratingStorage.SetActionPlan(k, ap) if err != nil { return err } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5b29216ec..fecc15608 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -29,7 +29,7 @@ import ( ) type Scheduler struct { - queue engine.ActionPlanPriotityList + queue engine.ActionTimingPriorityList timer *time.Timer restartLoop chan bool sync.Mutex @@ -56,7 +56,7 @@ func (s *Scheduler) Loop() { utils.Logger.Info(fmt.Sprintf(" Scheduler queue length: %v", len(s.queue))) s.Lock() a0 := s.queue[0] - utils.Logger.Info(fmt.Sprintf(" Action: %s", a0.Id)) + utils.Logger.Info(fmt.Sprintf(" Action: %s", a0.ActionsID)) now := time.Now() start := a0.GetNextStartTime(now) if start.Equal(now) || start.Before(now) { @@ -77,12 +77,12 @@ func (s *Scheduler) Loop() { } else { s.Unlock() d := a0.GetNextStartTime(now).Sub(now) - utils.Logger.Info(fmt.Sprintf(" Time to next action (%s): %v", a0.Id, d)) + utils.Logger.Info(fmt.Sprintf(" Time to next action (%s): %v", a0.ActionsID, d)) s.timer = time.NewTimer(d) select { case <-s.timer.C: // timer has expired - utils.Logger.Info(fmt.Sprintf(" Time for action on %v", a0.Id)) + utils.Logger.Info(fmt.Sprintf(" Time for action on %v", a0.ActionsID)) case <-s.restartLoop: // nothing to do, just continue the loop } @@ -120,6 +120,21 @@ func (s *Scheduler) Reload(protect bool) { } func (s *Scheduler) loadActionPlans() { + // limit the number of concurrent tasks + var limit = make(chan bool, 10) + // execute existing tasks + for { + task, err := s.storage.PopTask() + if err != nil || task == nil { + break + } + limit <- true + go func() { + task.Execute() + <-limit + }() + } + actionPlans, err := s.storage.GetAllActionPlans() if err != nil && err != utils.ErrNotFound { utils.Logger.Warning(fmt.Sprintf(" Cannot get action plans: %v", err)) @@ -128,43 +143,25 @@ func (s *Scheduler) loadActionPlans() { // recreate the queue s.Lock() defer s.Unlock() - s.queue = engine.ActionPlanPriotityList{} - for key, aps := range actionPlans { - toBeSaved := false - isAsap := false - var newApls []*engine.ActionPlan // will remove the one time runs from the database - for _, ap := range aps { - if ap.Timing == nil { - utils.Logger.Warning(fmt.Sprintf(" Nil timing on action plan: %+v, discarding!", ap)) + s.queue = engine.ActionTimingPriorityList{} + for _, actionPlan := range actionPlans { + for _, at := range actionPlan.ActionTimings { + if at.Timing == nil { + utils.Logger.Warning(fmt.Sprintf(" Nil timing on action plan: %+v, discarding!", at)) continue } - if len(ap.AccountIds) == 0 { // no accounts just ignore + if at.IsASAP() { continue } - isAsap = ap.IsASAP() - toBeSaved = toBeSaved || isAsap - if isAsap { - utils.Logger.Info(fmt.Sprintf(" Time for one time action on %v", key)) - ap.Execute() - ap.AccountIds = make([]string, 0) - } else { - now := time.Now() - if ap.GetNextStartTime(now).Before(now) { - // the task is obsolete, do not add it to the queue - continue - } - s.queue = append(s.queue, ap) + now := time.Now() + if at.GetNextStartTime(now).Before(now) { + // the task is obsolete, do not add it to the queue + continue } - // save even asap action plans with empty account id list - newApls = append(newApls, ap) - } - if toBeSaved { - engine.Guardian.Guard(func() (interface{}, error) { - s.storage.SetActionPlans(key, newApls) - s.storage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) - return 0, nil - }, 0, utils.ACTION_PLAN_PREFIX) + at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts + s.queue = append(s.queue, at) + } } sort.Sort(s.queue) @@ -180,6 +177,6 @@ func (s *Scheduler) restart() { } } -func (s *Scheduler) GetQueue() engine.ActionPlanPriotityList { +func (s *Scheduler) GetQueue() engine.ActionTimingPriorityList { return s.queue } diff --git a/utils/consts.go b/utils/consts.go index 38e56d530..492ced815 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -169,6 +169,7 @@ const ( ASR = "ASR" ACD = "ACD" FILTER_REGEXP_TPL = "$1$2$3$4$5" + TASKS_KEY = "tasks" ACTION_PLAN_PREFIX = "apl_" ACTION_TRIGGER_PREFIX = "atr_" RATING_PLAN_PREFIX = "rpl_" diff --git a/utils/map.go b/utils/map.go index 5a71ca506..90bce4cd4 100644 --- a/utils/map.go +++ b/utils/map.go @@ -143,3 +143,20 @@ func (sm StringMap) GetOne() string { } return "" } + +func NoDots(m map[string]struct{}) map[string]struct{} { + return MapKeysReplace(m, ".", ".") +} + +func YesDots(m map[string]struct{}) map[string]struct{} { + return MapKeysReplace(m, ".", ".") +} + +func MapKeysReplace(m map[string]struct{}, old, new string) map[string]struct{} { + for key, val := range m { + delete(m, key) + key = strings.Replace(key, old, new, -1) + m[key] = val + } + return m +} From a250fc668f4ce754142d6ea1fd1ccb52c8f1a7b4 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 10:26:41 +0200 Subject: [PATCH 03/15] updated apis --- apier/v1/accounts.go | 256 +++++++++++++++++++++++++++------ apier/v1/apier.go | 180 ++--------------------- apier/v1/scheduler.go | 10 +- console/actionplan_get.go | 2 +- engine/action_plan.go | 59 +++----- engine/loader_csv_test.go | 1 - engine/tp_reader.go | 6 +- general_tests/acntacts_test.go | 10 +- scheduler/scheduler.go | 35 +---- utils/apitpdata.go | 6 +- 10 files changed, 265 insertions(+), 300 deletions(-) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 08640f219..39acaf5ae 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -45,17 +45,24 @@ func (self *ApierV1) GetAccountActionPlan(attrs AttrAcntAction, reply *[]*Accoun if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(strings.Join(missing, ","), "") } - accountATs := make([]*AccountActionTiming, 0) - allATs, err := self.RatingDb.GetAllActionPlans() + accountATs := make([]*AccountActionTiming, 0) // needs to be initialized if remains empty + allAPs, err := self.RatingDb.GetAllActionPlans() if err != nil { return utils.NewErrServerError(err) } - for _, ats := range allATs { - for _, at := range ats { - if utils.IsSliceMember(at.AccountIds, utils.AccountKey(attrs.Tenant, attrs.Account)) { - accountATs = append(accountATs, &AccountActionTiming{Uuid: at.Uuid, ActionPlanId: at.Id, ActionsId: at.ActionsId, NextExecTime: at.GetNextStartTime(time.Now())}) + accID := utils.AccountKey(attrs.Tenant, attrs.Account) + for _, ap := range allAPs { + if _, exists := ap.AccountIDs[accID]; exists { + for _, at := range ap.ActionTimings { + accountATs = append(accountATs, &AccountActionTiming{ + ActionPlanId: ap.Id, + Uuid: at.Uuid, + ActionsId: at.ActionsID, + NextExecTime: at.GetNextStartTime(time.Now()), + }) } } + } *reply = accountATs return nil @@ -80,22 +87,41 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e } } _, err := engine.Guardian.Guard(func() (interface{}, error) { - ats, err := self.RatingDb.GetActionPlans(attrs.ActionPlanId, false) + ap, err := self.RatingDb.GetActionPlan(attrs.ActionPlanId, false) if err != nil { return 0, err - } else if len(ats) == 0 { + } else if ap == nil { return 0, utils.ErrNotFound } - ats = engine.RemActionPlan(ats, attrs.ActionTimingId, utils.AccountKey(attrs.Tenant, attrs.Account)) - if err := self.RatingDb.SetActionPlans(attrs.ActionPlanId, ats); err != nil { - return 0, err + + if attrs.ActionPlanId != "" { // delete the entire action plan + ap.ActionTimings = nil // will delete the action plan + return 0, self.RatingDb.SetActionPlan(ap.Id, ap) } - if len(ats) > 0 { // update cache - self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + attrs.ActionPlanId}}) + + if attrs.ActionTimingId != "" { // delete only a action timing from action plan + for i, at := range ap.ActionTimings { + if at.Uuid == attrs.ActionTimingId { + ap.ActionTimings[i] = ap.ActionTimings[len(ap.ActionTimings)-1] + ap.ActionTimings = ap.ActionTimings[:len(ap.ActionTimings)-1] + break + } + } + return 0, self.RatingDb.SetActionPlan(ap.Id, ap) } + + if attrs.Tenant != "" && attrs.Account != "" { + accID := utils.AccountKey(attrs.Tenant, attrs.Account) + delete(ap.AccountIDs, accID) + return 0, self.RatingDb.SetActionPlan(ap.Id, ap) + } + + // update cache + self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + attrs.ActionPlanId}}) return 0, nil }, 0, utils.ACTION_PLAN_PREFIX) if err != nil { + *reply = err.Error() return utils.NewErrServerError(err) } if attrs.ReloadScheduler && self.Sched != nil { @@ -130,9 +156,9 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - accId := utils.AccountKey(attrs.Tenant, attrs.Account) + accID := utils.AccountKey(attrs.Tenant, attrs.Account) _, err := engine.Guardian.Guard(func() (interface{}, error) { - ub, err := self.AccountDb.GetAccount(accId) + ub, err := self.AccountDb.GetAccount(accID) if err != nil { return 0, err } @@ -152,7 +178,7 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r return 0, err } return 0, nil - }, 0, accId) + }, 0, accID) if err != nil { return utils.NewErrServerError(err) } @@ -166,30 +192,28 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error return utils.NewErrMandatoryIeMissing(missing...) } var schedulerReloadNeeded = false - accId := utils.AccountKey(attr.Tenant, attr.Account) + accID := utils.AccountKey(attr.Tenant, attr.Account) var ub *engine.Account _, err := engine.Guardian.Guard(func() (interface{}, error) { - if bal, _ := self.AccountDb.GetAccount(accId); bal != nil { + if bal, _ := self.AccountDb.GetAccount(accID); bal != nil { ub = bal } else { // Not found in db, create it here ub = &engine.Account{ - Id: accId, + Id: accID, } } if len(attr.ActionPlanId) != 0 { _, err := engine.Guardian.Guard(func() (interface{}, error) { - var ats engine.ActionPlans + var ap *engine.ActionPlan var err error - ats, err = self.RatingDb.GetActionPlans(attr.ActionPlanId, false) + ap, err = self.RatingDb.GetActionPlan(attr.ActionPlanId, false) if err != nil { return 0, err } - for _, at := range ats { - at.AccountIds = append(at.AccountIds, accId) - } - if len(ats) != 0 { + if _, exists := ap.AccountIDs[accID]; !exists { + ap.AccountIDs[accID] = struct{}{} schedulerReloadNeeded = true - if err := self.RatingDb.SetActionPlans(attr.ActionPlanId, ats); err != nil { + if err := self.RatingDb.SetActionPlan(attr.ActionPlanId, ap); err != nil { return 0, err } // update cache @@ -221,11 +245,11 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error return 0, err } return 0, nil - }, 0, accId) + }, 0, accID) if err != nil { return utils.NewErrServerError(err) } - if schedulerReloadNeeded { + if attr.ReloadScheduler && schedulerReloadNeeded { // reload scheduler if self.Sched != nil { self.Sched.Reload(true) @@ -239,32 +263,20 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string) if missing := utils.MissingStructFields(&attr, []string{"Tenant", "Account"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - accountId := utils.AccountKey(attr.Tenant, attr.Account) + accID := utils.AccountKey(attr.Tenant, attr.Account) var schedulerReloadNeeded bool _, err := engine.Guardian.Guard(func() (interface{}, error) { // remove it from all action plans - allATs, err := self.RatingDb.GetAllActionPlans() + allAPs, err := self.RatingDb.GetAllActionPlans() if err != nil && err != utils.ErrNotFound { return 0, err } - for key, ats := range allATs { - changed := false - for _, at := range ats { - for i := 0; i < len(at.AccountIds); i++ { - if at.AccountIds[i] == accountId { - // delete without preserving order - at.AccountIds[i] = at.AccountIds[len(at.AccountIds)-1] - at.AccountIds = at.AccountIds[:len(at.AccountIds)-1] - i-- - changed = true - } - } - } - if changed { + for key, ap := range allAPs { + if _, exists := ap.AccountIDs[accID]; !exists { schedulerReloadNeeded = true _, err := engine.Guardian.Guard(func() (interface{}, error) { // save action plan - self.RatingDb.SetActionPlans(key, ats) + self.RatingDb.SetActionPlan(key, ap) // cache self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) return 0, nil @@ -274,11 +286,11 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string) } } } - if err := self.AccountDb.RemoveAccount(accountId); err != nil { + if err := self.AccountDb.RemoveAccount(accID); err != nil { return 0, err } return 0, nil - }, 0, accountId) + }, 0, accID) // FIXME: remove from all actionplans? if err != nil { return utils.NewErrServerError(err) @@ -344,3 +356,153 @@ func (self *ApierV1) GetAccount(attr *utils.AttrGetAccount, reply *interface{}) *reply = userBalance.AsOldStructure() return nil } + +type AttrAddBalance struct { + Tenant string + Account string + BalanceUuid string + BalanceId string + BalanceType string + Directions string + Value float64 + ExpiryTime string + RatingSubject string + Categories string + DestinationIds string + Weight float64 + SharedGroups string + Overwrite bool // When true it will reset if the balance is already there + Disabled bool +} + +func (self *ApierV1) AddBalance(attr *AttrAddBalance, reply *string) error { + expTime, err := utils.ParseTimeDetectLayout(attr.ExpiryTime, self.Config.DefaultTimezone) + if err != nil { + *reply = err.Error() + return err + } + accID := utils.AccountKey(attr.Tenant, attr.Account) + if _, err := self.AccountDb.GetAccount(accID); err != nil { + // create account if not exists + account := &engine.Account{ + Id: accID, + } + if err := self.AccountDb.SetAccount(account); err != nil { + *reply = err.Error() + return err + } + } + at := &engine.ActionTiming{} + at.SetAccountIDs(map[string]struct{}{accID: struct{}{}}) + + aType := engine.DEBIT + // reverse the sign as it is a debit + attr.Value = -attr.Value + + if attr.Overwrite { + aType = engine.DEBIT_RESET + } + at.SetActions(engine.Actions{ + &engine.Action{ + ActionType: aType, + BalanceType: attr.BalanceType, + Balance: &engine.Balance{ + Uuid: attr.BalanceUuid, + Id: attr.BalanceId, + Value: attr.Value, + ExpirationDate: expTime, + RatingSubject: attr.RatingSubject, + Directions: utils.ParseStringMap(attr.Directions), + DestinationIds: utils.ParseStringMap(attr.DestinationIds), + Categories: utils.ParseStringMap(attr.Categories), + Weight: attr.Weight, + SharedGroups: utils.ParseStringMap(attr.SharedGroups), + Disabled: attr.Disabled, + }, + }, + }) + if err := at.Execute(); err != nil { + *reply = err.Error() + return err + } + *reply = OK + return nil +} + +func (self *ApierV1) EnableDisableBalance(attr *AttrAddBalance, reply *string) error { + expTime, err := utils.ParseDate(attr.ExpiryTime) + if err != nil { + *reply = err.Error() + return err + } + accID := utils.ConcatenatedKey(attr.Tenant, attr.Account) + if _, err := self.AccountDb.GetAccount(accID); err != nil { + return utils.ErrNotFound + } + at := &engine.ActionTiming{} + at.SetAccountIDs(map[string]struct{}{accID: struct{}{}}) + + at.SetActions(engine.Actions{ + &engine.Action{ + ActionType: engine.ENABLE_DISABLE_BALANCE, + BalanceType: attr.BalanceType, + Balance: &engine.Balance{ + Uuid: attr.BalanceUuid, + Id: attr.BalanceId, + Value: attr.Value, + ExpirationDate: expTime, + RatingSubject: attr.RatingSubject, + Directions: utils.ParseStringMap(attr.Directions), + DestinationIds: utils.ParseStringMap(attr.DestinationIds), + Weight: attr.Weight, + SharedGroups: utils.ParseStringMap(attr.SharedGroups), + Disabled: attr.Disabled, + }, + }, + }) + if err := at.Execute(); err != nil { + *reply = err.Error() + return err + } + *reply = OK + return nil +} + +func (self *ApierV1) RemoveBalances(attr *AttrAddBalance, reply *string) error { + expTime, err := utils.ParseDate(attr.ExpiryTime) + if err != nil { + *reply = err.Error() + return err + } + accID := utils.AccountKey(attr.Tenant, attr.Account) + if _, err := self.AccountDb.GetAccount(accID); err != nil { + return utils.ErrNotFound + } + + at := &engine.ActionTiming{} + at.SetAccountIDs(map[string]struct{}{accID: struct{}{}}) + at.SetActions(engine.Actions{ + &engine.Action{ + ActionType: engine.REMOVE_BALANCE, + BalanceType: attr.BalanceType, + Balance: &engine.Balance{ + Uuid: attr.BalanceUuid, + Id: attr.BalanceId, + Value: attr.Value, + ExpirationDate: expTime, + RatingSubject: attr.RatingSubject, + Directions: utils.ParseStringMap(attr.Directions), + DestinationIds: utils.ParseStringMap(attr.DestinationIds), + Weight: attr.Weight, + SharedGroups: utils.ParseStringMap(attr.SharedGroups), + Disabled: attr.Disabled, + }, + }, + }) + if err := at.Execute(); err != nil { + *reply = err.Error() + return err + } + *reply = OK + return nil +} diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 7b58735d8..e1e4e7bfb 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -102,162 +102,12 @@ func (self *ApierV1) GetRatingPlan(rplnId string, reply *engine.RatingPlan) erro return nil } -type AttrAddBalance struct { - Tenant string - Account string - BalanceUuid string - BalanceId string - BalanceType string - Directions string - Value float64 - ExpiryTime string - RatingSubject string - Categories string - DestinationIds string - Weight float64 - SharedGroups string - Overwrite bool // When true it will reset if the balance is already there - Disabled bool -} - -func (self *ApierV1) AddBalance(attr *AttrAddBalance, reply *string) error { - expTime, err := utils.ParseTimeDetectLayout(attr.ExpiryTime, self.Config.DefaultTimezone) - if err != nil { - *reply = err.Error() - return err - } - tag := utils.ConcatenatedKey(attr.Tenant, attr.Account) - if _, err := self.AccountDb.GetAccount(tag); err != nil { - // create account if not exists - account := &engine.Account{ - Id: tag, - } - if err := self.AccountDb.SetAccount(account); err != nil { - *reply = err.Error() - return err - } - } - at := &engine.ActionPlan{ - AccountIds: []string{tag}, - } - aType := engine.DEBIT - // reverse the sign as it is a debit - attr.Value = -attr.Value - - if attr.Overwrite { - aType = engine.DEBIT_RESET - } - at.SetActions(engine.Actions{ - &engine.Action{ - ActionType: aType, - BalanceType: attr.BalanceType, - Balance: &engine.Balance{ - Uuid: attr.BalanceUuid, - Id: attr.BalanceId, - Value: attr.Value, - ExpirationDate: expTime, - RatingSubject: attr.RatingSubject, - Directions: utils.ParseStringMap(attr.Directions), - DestinationIds: utils.ParseStringMap(attr.DestinationIds), - Categories: utils.ParseStringMap(attr.Categories), - Weight: attr.Weight, - SharedGroups: utils.ParseStringMap(attr.SharedGroups), - Disabled: attr.Disabled, - }, - }, - }) - if err := at.Execute(); err != nil { - *reply = err.Error() - return err - } - *reply = OK - return nil -} - -func (self *ApierV1) EnableDisableBalance(attr *AttrAddBalance, reply *string) error { - expTime, err := utils.ParseDate(attr.ExpiryTime) - if err != nil { - *reply = err.Error() - return err - } - tag := utils.ConcatenatedKey(attr.Tenant, attr.Account) - if _, err := self.AccountDb.GetAccount(tag); err != nil { - return utils.ErrNotFound - } - at := &engine.ActionPlan{ - AccountIds: []string{tag}, - } - at.SetActions(engine.Actions{ - &engine.Action{ - ActionType: engine.ENABLE_DISABLE_BALANCE, - BalanceType: attr.BalanceType, - Balance: &engine.Balance{ - Uuid: attr.BalanceUuid, - Id: attr.BalanceId, - Value: attr.Value, - ExpirationDate: expTime, - RatingSubject: attr.RatingSubject, - Directions: utils.ParseStringMap(attr.Directions), - DestinationIds: utils.ParseStringMap(attr.DestinationIds), - Weight: attr.Weight, - SharedGroups: utils.ParseStringMap(attr.SharedGroups), - Disabled: attr.Disabled, - }, - }, - }) - if err := at.Execute(); err != nil { - *reply = err.Error() - return err - } - *reply = OK - return nil -} - -func (self *ApierV1) RemoveBalances(attr *AttrAddBalance, reply *string) error { - expTime, err := utils.ParseDate(attr.ExpiryTime) - if err != nil { - *reply = err.Error() - return err - } - accId := utils.ConcatenatedKey(attr.Tenant, attr.Account) - if _, err := self.AccountDb.GetAccount(accId); err != nil { - return utils.ErrNotFound - } - at := &engine.ActionPlan{ - AccountIds: []string{accId}, - } - at.SetActions(engine.Actions{ - &engine.Action{ - ActionType: engine.REMOVE_BALANCE, - BalanceType: attr.BalanceType, - Balance: &engine.Balance{ - Uuid: attr.BalanceUuid, - Id: attr.BalanceId, - Value: attr.Value, - ExpirationDate: expTime, - RatingSubject: attr.RatingSubject, - Directions: utils.ParseStringMap(attr.Directions), - DestinationIds: utils.ParseStringMap(attr.DestinationIds), - Weight: attr.Weight, - SharedGroups: utils.ParseStringMap(attr.SharedGroups), - Disabled: attr.Disabled, - }, - }, - }) - if err := at.Execute(); err != nil { - *reply = err.Error() - return err - } - *reply = OK - return nil -} - func (self *ApierV1) ExecuteAction(attr *utils.AttrExecuteAction, reply *string) error { - accId := utils.AccountKey(attr.Tenant, attr.Account) - at := &engine.ActionPlan{ - AccountIds: []string{accId}, - ActionsId: attr.ActionsId, + accID := utils.AccountKey(attr.Tenant, attr.Account) + at := &engine.ActionTiming{ + ActionsID: attr.ActionsId, } + at.SetAccountIDs(map[string]struct{}{accID: struct{}{}}) if err := at.Execute(); err != nil { *reply = err.Error() return err @@ -745,8 +595,10 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error return utils.ErrExists } } - storeAtms := make(engine.ActionPlans, len(attrs.ActionPlan)) - for idx, apiAtm := range attrs.ActionPlan { + ap := &engine.ActionPlan{ + Id: attrs.Id, + } + for _, apiAtm := range attrs.ActionPlan { if exists, err := self.RatingDb.HasData(utils.ACTION_PREFIX, apiAtm.ActionsId); err != nil { return utils.NewErrServerError(err) } else if !exists { @@ -758,16 +610,14 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error timing.MonthDays.Parse(apiAtm.MonthDays, ";") timing.WeekDays.Parse(apiAtm.WeekDays, ";") timing.StartTime = apiAtm.Time - at := &engine.ActionPlan{ + ap.ActionTimings = append(ap.ActionTimings, &engine.ActionTiming{ Uuid: utils.GenUUID(), - Id: attrs.Id, Weight: apiAtm.Weight, Timing: &engine.RateInterval{Timing: timing}, - ActionsId: apiAtm.ActionsId, - } - storeAtms[idx] = at + ActionsID: apiAtm.ActionsId, + }) } - if err := self.RatingDb.SetActionPlans(attrs.Id, storeAtms); err != nil { + if err := self.RatingDb.SetActionPlan(ap.Id, ap); err != nil { return utils.NewErrServerError(err) } self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + attrs.Id}}) @@ -785,8 +635,8 @@ type AttrGetActionPlan struct { Id string } -func (self *ApierV1) GetActionPlan(attr AttrGetActionPlan, reply *[]engine.ActionPlans) error { - var result []engine.ActionPlans +func (self *ApierV1) GetActionPlan(attr AttrGetActionPlan, reply *[]*engine.ActionPlan) error { + var result []*engine.ActionPlan if attr.Id == "" || attr.Id == "*" { aplsMap, err := self.RatingDb.GetAllActionPlans() if err != nil { @@ -796,7 +646,7 @@ func (self *ApierV1) GetActionPlan(attr AttrGetActionPlan, reply *[]engine.Actio result = append(result, apls) } } else { - apls, err := self.RatingDb.GetActionPlans(attr.Id, false) + apls, err := self.RatingDb.GetActionPlan(attr.Id, false) if err != nil { return err } diff --git a/apier/v1/scheduler.go b/apier/v1/scheduler.go index ab4d0b9c4..a91701204 100644 --- a/apier/v1/scheduler.go +++ b/apier/v1/scheduler.go @@ -104,9 +104,9 @@ type AttrsGetScheduledActions struct { } type ScheduledActions struct { - NextRunTime time.Time - Accounts int - ActionsId, ActionPlanId, ActionPlanUuid string + NextRunTime time.Time + Accounts int + ActionsId, ActionPlanId, ActionTimingUuid string } func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply *[]*ScheduledActions) error { @@ -116,7 +116,7 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply * schedActions := make([]*ScheduledActions, 0) // needs to be initialized if remains empty scheduledActions := self.Sched.GetQueue() for _, qActions := range scheduledActions { - sas := &ScheduledActions{ActionsId: qActions.ActionsId, ActionPlanId: qActions.Id, ActionPlanUuid: qActions.Uuid, Accounts: len(qActions.AccountIds)} + sas := &ScheduledActions{ActionsId: qActions.ActionsID, ActionPlanId: qActions.GetActionPlanID(), ActionTimingUuid: qActions.Uuid, Accounts: len(qActions.GetAccountIDs())} if attrs.SearchTerm != "" && !(strings.Contains(sas.ActionPlanId, attrs.SearchTerm) || strings.Contains(sas.ActionsId, attrs.SearchTerm)) { @@ -132,7 +132,7 @@ func (self *ApierV1) GetScheduledActions(attrs AttrsGetScheduledActions, reply * // filter on account if attrs.Tenant != "" || attrs.Account != "" { found := false - for _, accID := range qActions.AccountIds { + for accID := range qActions.GetAccountIDs() { split := strings.Split(accID, utils.CONCATENATED_KEY_SEP) if len(split) != 2 { continue // malformed account id diff --git a/console/actionplan_get.go b/console/actionplan_get.go index af17a0aed..23b4b64a9 100644 --- a/console/actionplan_get.go +++ b/console/actionplan_get.go @@ -61,6 +61,6 @@ func (self *CmdGetActionPlan) PostprocessRpcParams() error { } func (self *CmdGetActionPlan) RpcResult() interface{} { - s := make([]*engine.ActionPlans, 0) + s := make([]*engine.ActionPlan, 0) return &s } diff --git a/engine/action_plan.go b/engine/action_plan.go index e5914c79c..c1e854cfa 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -32,13 +32,14 @@ const ( ) type ActionTiming struct { - Uuid string - Timing *RateInterval - ActionsID string - Weight float64 - actions Actions - accountIDs map[string]struct{} // copy of action plans accounts - stCache time.Time // cached time of the next start + Uuid string + Timing *RateInterval + ActionsID string + Weight float64 + actions Actions + accountIDs map[string]struct{} // copy of action plans accounts + actionPlanID string // the id of the belonging action plan (info only) + stCache time.Time // cached time of the next start } type Task struct { @@ -48,7 +49,6 @@ type Task struct { } type ActionPlan struct { - Uuid string // uniquely identify the timing Id string // informative purpose only AccountIDs map[string]struct{} ActionTimings []*ActionTiming @@ -254,6 +254,18 @@ func (at *ActionTiming) SetAccountIDs(accIDs map[string]struct{}) { at.accountIDs = accIDs } +func (at *ActionTiming) GetAccountIDs() map[string]struct{} { + return at.accountIDs +} + +func (at *ActionTiming) SetActionPlanID(id string) { + at.actionPlanID = id +} + +func (at *ActionTiming) GetActionPlanID() string { + return at.actionPlanID +} + func (at *ActionTiming) getActions() (as []*Action, err error) { if at.actions == nil { at.actions, err = ratingStorage.GetActions(at.ActionsID, false) @@ -364,34 +376,3 @@ func (atpl ActionTimingPriorityList) Less(i, j int) bool { func (atpl ActionTimingPriorityList) Sort() { sort.Sort(atpl) } - -// Helper to remove ActionPlan members based on specific filters, empty data means no always match -/*func RemActionPlan(apl ActionPlan, actionTimingId, accountId string) ActionPlan { - if len(actionTimingId) != 0 && apl.Uuid != actionTimingId { // No Match for ActionPlanId, no need to move further - continue - } - for idx, ats := range apl.ActionTimings { - if len(accountId) == 0 { // No account defined, considered match for complete removal - if len(ats) == 1 { // Removing last item, by init empty - return make([]*ActionPlan, 0) - } - ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] - continue - } - for iAcc, accID := range at.AccountIds { - if accID == accountId { - if len(at.AccountIds) == 1 { // Only one balance, remove complete at - if len(ats) == 1 { // Removing last item, by init empty - return make([]*ActionPlan, 0) - } - ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] - } else { - at.AccountIds[iAcc], at.AccountIds = at.AccountIds[len(at.AccountIds)-1], at.AccountIds[:len(at.AccountIds)-1] - } - // only remove the first one matching - break - } - } - } - return ats -}*/ diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index e0e24f13f..805cd1c46 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -987,7 +987,6 @@ func TestLoadActionTimings(t *testing.T) { } atm := csvr.actionPlans["MORE_MINUTES"] expected := &ActionPlan{ - Uuid: atm.Uuid, Id: "MORE_MINUTES", AccountIDs: map[string]struct{}{"vdf:minitsboy": struct{}{}}, ActionTimings: []*ActionTiming{ diff --git a/engine/tp_reader.go b/engine/tp_reader.go index abafa2811..4981d38ad 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -578,8 +578,7 @@ func (tpr *TpReader) LoadActionPlans() (err error) { var actPln *ActionPlan if actPln, exists = tpr.actionPlans[atId]; !exists { actPln = &ActionPlan{ - Uuid: utils.GenUUID(), - Id: atId, + Id: atId, } } actPln.ActionTimings = append(actPln.ActionTimings, &ActionTiming{ @@ -710,8 +709,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error } if actionPlan == nil { actionPlan = &ActionPlan{ - Uuid: utils.GenUUID(), - Id: accountAction.ActionPlanId, + Id: accountAction.ActionPlanId, } } actionPlan.ActionTimings = append(actionPlan.ActionTimings, &ActionTiming{ diff --git a/general_tests/acntacts_test.go b/general_tests/acntacts_test.go index 780560a59..4bfcd6ddd 100644 --- a/general_tests/acntacts_test.go +++ b/general_tests/acntacts_test.go @@ -74,10 +74,10 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,false,10` func TestAcntActsDisableAcnt(t *testing.T) { acnt1Tag := "cgrates.org:1" - at := &engine.ActionPlan{ - AccountIds: []string{acnt1Tag}, - ActionsId: "DISABLE_ACNT", + at := &engine.ActionTiming{ + ActionsID: "DISABLE_ACNT", } + at.SetAccountIDs(map[string]struct{}{acnt1Tag: struct{}{}}) if err := at.Execute(); err != nil { t.Error(err) } @@ -92,9 +92,9 @@ func TestAcntActsDisableAcnt(t *testing.T) { func TestAcntActsEnableAcnt(t *testing.T) { acnt1Tag := "cgrates.org:1" at := &engine.ActionPlan{ - AccountIds: []string{acnt1Tag}, - ActionsId: "ENABLE_ACNT", + ActionsID: "ENABLE_ACNT", } + at.SetAccountIDs(map[string]struct{}{acnt1Tag: struct{}{}}) if err := at.Execute(); err != nil { t.Error(err) } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index fecc15608..45583931f 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -34,8 +34,6 @@ type Scheduler struct { restartLoop chan bool sync.Mutex storage engine.RatingStorage - waitingReload bool - loopChecker chan int schedulerStarted bool } @@ -43,7 +41,6 @@ func NewScheduler(storage engine.RatingStorage) *Scheduler { return &Scheduler{ restartLoop: make(chan bool), storage: storage, - loopChecker: make(chan int), } } @@ -91,35 +88,13 @@ func (s *Scheduler) Loop() { } func (s *Scheduler) Reload(protect bool) { - s.Lock() - defer s.Unlock() - - if protect { - if s.waitingReload { - s.loopChecker <- 1 - } - s.waitingReload = true - go func() { - t := time.NewTicker(100 * time.Millisecond) // wait for loops before start - select { - case <-s.loopChecker: - t.Stop() // cancel reload - case <-t.C: - s.loadActionPlans() - s.restart() - t.Stop() - s.waitingReload = false - } - }() - } else { - go func() { - s.loadActionPlans() - s.restart() - }() - } + s.loadActionPlans() + s.restart() } func (s *Scheduler) loadActionPlans() { + s.Lock() + defer s.Unlock() // limit the number of concurrent tasks var limit = make(chan bool, 10) // execute existing tasks @@ -141,8 +116,6 @@ func (s *Scheduler) loadActionPlans() { } utils.Logger.Info(fmt.Sprintf(" processing %d action plans", len(actionPlans))) // recreate the queue - s.Lock() - defer s.Unlock() s.queue = engine.ActionTimingPriorityList{} for _, actionPlan := range actionPlans { for _, at := range actionPlan.ActionTimings { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index d528fc739..65892955f 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1099,11 +1099,13 @@ type AttrSetAccount struct { ActionTriggersId string AllowNegative *bool Disabled *bool + ReloadScheduler bool } type AttrRemoveAccount struct { - Tenant string - Account string + Tenant string + Account string + ReloadScheduler bool } type AttrGetSMASessions struct { From 177e2d7ebb3765269932bdb878f20a09a9a6545a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 11:08:21 +0200 Subject: [PATCH 04/15] initialize accounts map on load --- engine/tp_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 4981d38ad..23b8e4fbc 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -665,7 +665,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error // action timings if accountAction.ActionPlanId != "" { // get old userBalanceIds - var exitingAccountIds map[string]struct{} + exitingAccountIds := make(map[string]struct{}) existingActionPlan, err := tpr.ratingStorage.GetActionPlan(accountAction.ActionPlanId, true) if err == nil && existingActionPlan != nil { exitingAccountIds = existingActionPlan.AccountIDs From 9783a00b6dc6984403179596e6b33ed8c3e58c10 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 11:13:31 +0200 Subject: [PATCH 05/15] fix set account api --- apier/v1/accounts.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 39acaf5ae..2b69a62ba 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -211,6 +211,9 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error return 0, err } if _, exists := ap.AccountIDs[accID]; !exists { + if ap.AccountIDs == nil { + ap.AccountIDs = make(map[string]struct{}) + } ap.AccountIDs[accID] = struct{}{} schedulerReloadNeeded = true if err := self.RatingDb.SetActionPlan(attr.ActionPlanId, ap); err != nil { From 75a5c6ddd97877751e5d03bef2c9f7a496af4330 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 11:18:45 +0200 Subject: [PATCH 06/15] fix general tests --- general_tests/acntacts_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/general_tests/acntacts_test.go b/general_tests/acntacts_test.go index 4bfcd6ddd..d61797377 100644 --- a/general_tests/acntacts_test.go +++ b/general_tests/acntacts_test.go @@ -91,7 +91,7 @@ func TestAcntActsDisableAcnt(t *testing.T) { func TestAcntActsEnableAcnt(t *testing.T) { acnt1Tag := "cgrates.org:1" - at := &engine.ActionPlan{ + at := &engine.ActionTiming{ ActionsID: "ENABLE_ACNT", } at.SetAccountIDs(map[string]struct{}{acnt1Tag: struct{}{}}) From 785e7e9386a6193f0c37bc8f650d6499e577917a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 14:31:44 +0200 Subject: [PATCH 07/15] fixes and updated migrator --- apier/v1/apier_local_test.go | 1 - cmd/cgr-loader/migrator_rc8.go | 50 ++++++++++++++++++++++++++++++++-- engine/tp_reader.go | 2 ++ scheduler/scheduler.go | 1 + 4 files changed, 50 insertions(+), 4 deletions(-) diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 2ec712d3f..12144cfbf 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -1134,7 +1134,6 @@ func TestApierGetAccount(t *testing.T) { if !*testLocal { return } - time.Sleep(100 * time.Millisecond) // give scheduler time to react var reply *engine.Account attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} if err := rater.Call("ApierV2.GetAccount", attrs, &reply); err != nil { diff --git a/cmd/cgr-loader/migrator_rc8.go b/cmd/cgr-loader/migrator_rc8.go index 6eab368ac..672167a5f 100644 --- a/cmd/cgr-loader/migrator_rc8.go +++ b/cmd/cgr-loader/migrator_rc8.go @@ -125,6 +125,26 @@ type Action struct { Balance *Balance } +type ActionPlan struct { + Uuid string // uniquely identify the timing + Id string // informative purpose only + AccountIds []string + Timing *engine.RateInterval + Weight float64 + ActionsId string + actions Actions + stCache time.Time // cached time of the next start +} + +func (at *ActionPlan) IsASAP() bool { + if at.Timing == nil { + return false + } + return at.Timing.Timing.StartTime == utils.ASAP +} + +type ActionPlans []*ActionPlan + func (mig MigratorRC8) migrateAccounts() error { keys, err := mig.db.Cmd("KEYS", OLD_ACCOUNT_PREFIX+"*").List() if err != nil { @@ -431,10 +451,10 @@ func (mig MigratorRC8) migrateActionPlans() error { if err != nil { return err } - aplsMap := make(map[string]engine.ActionPlans, len(keys)) + aplsMap := make(map[string]ActionPlans, len(keys)) for _, key := range keys { log.Printf("Migrating action plans: %s...", key) - var apls engine.ActionPlans + var apls ActionPlans var values []byte if values, err = mig.db.Cmd("GET", key).Bytes(); err == nil { if err := mig.ms.Unmarshal(values, &apls); err != nil { @@ -456,7 +476,31 @@ func (mig MigratorRC8) migrateActionPlans() error { aplsMap[key] = apls } // write data back - for key, apl := range aplsMap { + newAplMap := make(map[string]*engine.ActionPlan) + for key, apls := range aplsMap { + for _, apl := range apls { + newApl, exists := newAplMap[key] + if !exists { + newApl = &engine.ActionPlan{ + Id: apl.Id, + AccountIDs: make(map[string]struct{}), + } + newAplMap[key] = newApl + } + if !apl.IsASAP() { + for _, accID := range apl.AccountIds { + newApl.AccountIDs[accID] = struct{}{} + } + } + newApl.ActionTimings = append(newApl.ActionTimings, &engine.ActionTiming{ + Uuid: utils.GenUUID(), + Timing: apl.Timing, + ActionsID: apl.ActionsId, + Weight: apl.Weight, + }) + } + } + for key, apl := range newAplMap { result, err := mig.ms.Marshal(apl) if err != nil { return err diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 23b8e4fbc..d5d4fc4a5 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -582,6 +582,7 @@ func (tpr *TpReader) LoadActionPlans() (err error) { } } actPln.ActionTimings = append(actPln.ActionTimings, &ActionTiming{ + Uuid: utils.GenUUID(), Weight: at.Weight, Timing: &RateInterval{ Timing: &RITiming{ @@ -713,6 +714,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error } } actionPlan.ActionTimings = append(actionPlan.ActionTimings, &ActionTiming{ + Uuid: utils.GenUUID(), Weight: at.Weight, Timing: &RateInterval{ Timing: &RITiming{ diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 45583931f..ac5e35d63 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -133,6 +133,7 @@ func (s *Scheduler) loadActionPlans() { continue } at.SetAccountIDs(actionPlan.AccountIDs) // copy the accounts + at.SetActionPlanID(actionPlan.Id) s.queue = append(s.queue, at) } From e626d585a5782c696c7231784e3a174074aa6b7a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 15:24:22 +0200 Subject: [PATCH 08/15] remove account action and compression --- cmd/cgr-loader/migrator_rc8.go | 8 +++++- engine/action.go | 48 ++++++++++++++++++++++++++++++++++ engine/action_plan.go | 6 ++++- engine/action_trigger.go | 6 ++++- engine/loader_csv_test.go | 4 ++- engine/storage_redis.go | 19 +++++++++++--- 6 files changed, 84 insertions(+), 7 deletions(-) diff --git a/cmd/cgr-loader/migrator_rc8.go b/cmd/cgr-loader/migrator_rc8.go index 672167a5f..fcddea1f7 100644 --- a/cmd/cgr-loader/migrator_rc8.go +++ b/cmd/cgr-loader/migrator_rc8.go @@ -1,6 +1,8 @@ package main import ( + "bytes" + "compress/zlib" "fmt" "log" "strings" @@ -505,7 +507,11 @@ func (mig MigratorRC8) migrateActionPlans() error { if err != nil { return err } - if err = mig.db.Cmd("SET", key, result).Err; err != nil { + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + if err = mig.db.Cmd("SET", key, b.Bytes()).Err; err != nil { return err } } diff --git a/engine/action.go b/engine/action.go index a3f6a640a..f3d605ec8 100644 --- a/engine/action.go +++ b/engine/action.go @@ -129,6 +129,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { return mailAsync, true case SET_DDESTINATIONS: return setddestinations, true + case REMOVE_ACCOUNT: + return removeAccount, true case REMOVE_BALANCE: return removeBalance, true } @@ -528,6 +530,52 @@ func setddestinations(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actio return nil } +func removeAccount(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error { + var accID string + if ub != nil { + accID = ub.Id + } else { + accountInfo := struct { + Tenant string + Account string + }{} + if a.ExtraParameters != "" { + if err := json.Unmarshal([]byte(a.ExtraParameters), &accountInfo); err != nil { + return err + } + } + accID = utils.AccountKey(accountInfo.Tenant, accountInfo.Account) + } + if accID == "" { + return utils.ErrInvalidKey + } + if err := accountingStorage.RemoveAccount(accID); err != nil { + utils.Logger.Err(fmt.Sprintf("Could not remove account Id: %s: %v", accID, err)) + return err + } + // clean the account id from all action plans + allAPs, err := ratingStorage.GetAllActionPlans() + if err != nil && err != utils.ErrNotFound { + utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accID, err)) + return err + } + for key, ap := range allAPs { + if _, exists := ap.AccountIDs[accID]; !exists { + _, err := Guardian.Guard(func() (interface{}, error) { + // save action plan + ratingStorage.SetActionPlan(key, ap) + // cache + ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) + return 0, nil + }, 0, utils.ACTION_PLAN_PREFIX) + if err != nil { + return err + } + } + } + return nil +} + func removeBalance(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error { if _, exists := ub.BalanceMap[a.BalanceType]; !exists { return utils.ErrNotFound diff --git a/engine/action_plan.go b/engine/action_plan.go index c1e854cfa..1acea3887 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -289,6 +289,7 @@ func (at *ActionTiming) Execute() (err error) { return 0, err } transactionFailed := false + removeAccountActionFound := false for _, a := range aac { if ub.Disabled && a.ActionType != ENABLE_ACCOUNT { continue // disabled acocunts are not removed from action plan @@ -311,8 +312,11 @@ func (at *ActionTiming) Execute() (err error) { transactionFailed = true break } + if a.ActionType == REMOVE_ACCOUNT { + removeAccountActionFound = true + } } - if !transactionFailed { + if !transactionFailed && !removeAccountActionFound { accountingStorage.SetAccount(ub) } return 0, nil diff --git a/engine/action_trigger.go b/engine/action_trigger.go index 9c3d16178..286736c81 100644 --- a/engine/action_trigger.go +++ b/engine/action_trigger.go @@ -73,6 +73,7 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro } at.Executed = true transactionFailed := false + removeAccountActionFound := false for _, a := range aac { if a.Balance == nil { a.Balance = &Balance{} @@ -90,11 +91,14 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro transactionFailed = false break } + if a.ActionType == REMOVE_ACCOUNT { + removeAccountActionFound = true + } } if transactionFailed || at.Recurrent { at.Executed = false } - if !transactionFailed && ub != nil { + if !transactionFailed && ub != nil && !removeAccountActionFound { storageLogger.LogActionTrigger(ub.Id, utils.RATER_SOURCE, at, aac) accountingStorage.SetAccount(ub) } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index 805cd1c46..ab922c9e0 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -991,6 +991,7 @@ func TestLoadActionTimings(t *testing.T) { AccountIDs: map[string]struct{}{"vdf:minitsboy": struct{}{}}, ActionTimings: []*ActionTiming{ &ActionTiming{ + Uuid: atm.ActionTimings[0].Uuid, Timing: &RateInterval{ Timing: &RITiming{ Years: utils.Years{2012}, @@ -1004,6 +1005,7 @@ func TestLoadActionTimings(t *testing.T) { ActionsID: "MINI", }, &ActionTiming{ + Uuid: atm.ActionTimings[1].Uuid, Timing: &RateInterval{ Timing: &RITiming{ Years: utils.Years{2012}, @@ -1019,7 +1021,7 @@ func TestLoadActionTimings(t *testing.T) { }, } if !reflect.DeepEqual(atm, expected) { - t.Errorf("Error loading action timing:\n%+v", atm.ActionTimings[1].Timing) + t.Errorf("Error loading action timing:\n%+v", atm.ActionTimings[1]) } } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 4f252548b..8a857d8a8 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -903,9 +903,18 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { - err = rs.ms.Unmarshal(values, &ats) + b := bytes.NewBuffer(values) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + err = rs.ms.Unmarshal(out, &ats) cache2go.Cache(key, ats) } return @@ -922,7 +931,11 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan) (err error) { if err != nil { return err } - return rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, result).Err + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + return rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, b.Bytes()).Err } func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { From 37c60afeed3edb4e8a97da50c75473675a3b0320 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 15:51:45 +0200 Subject: [PATCH 09/15] integration tests fixes --- engine/loader_local_test.go | 3 +++ engine/storage_redis.go | 12 ++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/engine/loader_local_test.go b/engine/loader_local_test.go index 4c338d503..9af365122 100644 --- a/engine/loader_local_test.go +++ b/engine/loader_local_test.go @@ -395,6 +395,9 @@ func TestMatchLoadCsvWithStorRating(t *testing.T) { for _, key := range keysCsv { var refVal []byte for idx, rs := range []*RedisStorage{rsCsv, rsStor, rsApier} { + if key == utils.TASKS_KEY { + continue + } qVal, err := rs.db.Cmd("GET", key).Bytes() if err != nil { t.Fatalf("Run: %d, could not retrieve key %s, error: %s", idx, key, err.Error()) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 8a857d8a8..990ca2cde 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -904,7 +904,7 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl } var values []byte if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { - b := bytes.NewBuffer(values) + /*b := bytes.NewBuffer(values) r, err := zlib.NewReader(b) if err != nil { return nil, err @@ -913,8 +913,8 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl if err != nil { return nil, err } - r.Close() - err = rs.ms.Unmarshal(out, &ats) + r.Close()*/ + err = rs.ms.Unmarshal(values, &ats) cache2go.Cache(key, ats) } return @@ -932,10 +932,10 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan) (err error) { return err } var b bytes.Buffer - w := zlib.NewWriter(&b) + /*w := zlib.NewWriter(&b) w.Write(result) - w.Close() - return rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, b.Bytes()).Err + w.Close()*/ + return rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, result).Err } func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { From ef5c6c78a7f7bf41ac71e1fd8def17da73162e5c Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 16:21:27 +0200 Subject: [PATCH 10/15] action plan interation tests --- engine/loader_local_test.go | 3 ++- engine/storage_redis.go | 12 ++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/engine/loader_local_test.go b/engine/loader_local_test.go index 9af365122..0320f6840 100644 --- a/engine/loader_local_test.go +++ b/engine/loader_local_test.go @@ -21,6 +21,7 @@ package engine import ( "flag" "path" + "strings" "testing" "github.com/cgrates/cgrates/config" @@ -395,7 +396,7 @@ func TestMatchLoadCsvWithStorRating(t *testing.T) { for _, key := range keysCsv { var refVal []byte for idx, rs := range []*RedisStorage{rsCsv, rsStor, rsApier} { - if key == utils.TASKS_KEY { + if key == utils.TASKS_KEY || strings.HasPrefix(key, utils.ACTION_PLAN_PREFIX) { // action plans are not consistent continue } qVal, err := rs.db.Cmd("GET", key).Bytes() diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 990ca2cde..8a857d8a8 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -904,7 +904,7 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl } var values []byte if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { - /*b := bytes.NewBuffer(values) + b := bytes.NewBuffer(values) r, err := zlib.NewReader(b) if err != nil { return nil, err @@ -913,8 +913,8 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl if err != nil { return nil, err } - r.Close()*/ - err = rs.ms.Unmarshal(values, &ats) + r.Close() + err = rs.ms.Unmarshal(out, &ats) cache2go.Cache(key, ats) } return @@ -932,10 +932,10 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan) (err error) { return err } var b bytes.Buffer - /*w := zlib.NewWriter(&b) + w := zlib.NewWriter(&b) w.Write(result) - w.Close()*/ - return rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, result).Err + w.Close() + return rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, b.Bytes()).Err } func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { From c18d2ce2baf5e25251612d4a9ac20e0c0b6d06a3 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 17:12:36 +0200 Subject: [PATCH 11/15] minor fixes --- apier/v1/apier_local_test.go | 5 ++--- cmd/cgr-loader/migrator_rc8.go | 4 +++- scheduler/scheduler.go | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 12144cfbf..794445581 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -740,7 +740,6 @@ func TestApierSetRatingProfile(t *testing.T) { if err := rater.Call("ApierV1.SetRatingProfile", rpf, &reply); err != nil { t.Error("Unexpected result on duplication: ", err.Error()) } - time.Sleep(10 * time.Millisecond) // Give time for cache reload // Make sure rates were loaded for account dan // Test here ResponderGetCost tStart, _ := utils.ParseDate("2013-08-07T17:30:00Z") @@ -1074,7 +1073,7 @@ func TestApierSetAccount(t *testing.T) { return } reply := "" - attrs := &utils.AttrSetAccount{Tenant: "cgrates.org", Account: "dan7", ActionPlanId: "ATMS_1"} + attrs := &utils.AttrSetAccount{Tenant: "cgrates.org", Account: "dan7", ActionPlanId: "ATMS_1", ReloadScheduler: true} if err := rater.Call("ApierV1.SetAccount", attrs, &reply); err != nil { t.Error("Got error on ApierV1.SetAccount: ", err.Error()) } else if reply != "OK" { @@ -1174,7 +1173,7 @@ func TestApierTriggersExecute(t *testing.T) { return } reply := "" - attrs := &utils.AttrSetAccount{Tenant: "cgrates.org", Account: "dan8"} + attrs := &utils.AttrSetAccount{Tenant: "cgrates.org", Account: "dan8", ReloadScheduler: true} if err := rater.Call("ApierV1.SetAccount", attrs, &reply); err != nil { t.Error("Got error on ApierV1.SetAccount: ", err.Error()) } else if reply != "OK" { diff --git a/cmd/cgr-loader/migrator_rc8.go b/cmd/cgr-loader/migrator_rc8.go index fcddea1f7..5f5b29671 100644 --- a/cmd/cgr-loader/migrator_rc8.go +++ b/cmd/cgr-loader/migrator_rc8.go @@ -491,7 +491,9 @@ func (mig MigratorRC8) migrateActionPlans() error { } if !apl.IsASAP() { for _, accID := range apl.AccountIds { - newApl.AccountIDs[accID] = struct{}{} + if _, exists := newApl.AccountIDs[accID]; !exists { + newApl.AccountIDs[accID] = struct{}{} + } } } newApl.ActionTimings = append(newApl.ActionTimings, &engine.ActionTiming{ diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ac5e35d63..30afa0814 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -105,6 +105,7 @@ func (s *Scheduler) loadActionPlans() { } limit <- true go func() { + utils.Logger.Info(fmt.Sprintf(" executing task %s on account %s", task.ActionsID, task.AccountID)) task.Execute() <-limit }() From 115fe8a1ade317c9ed867b156636b8ab8a0e92fc Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 17:48:03 +0200 Subject: [PATCH 12/15] write tasks on loadaccountactions --- apier/v1/apier.go | 2 +- engine/tp_reader.go | 15 +++++++++++++++ scheduler/scheduler.go | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index e1e4e7bfb..140c6eb1c 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -806,7 +806,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str } // ToDo: Get the action keys loaded by dbReader so we reload only these in cache // Need to do it before scheduler otherwise actions to run will be unknown - if err := self.RatingDb.CacheRatingPrefixes(utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.SHARED_GROUP_PREFIX); err != nil { + if err := self.RatingDb.CacheRatingPrefixes(utils.DERIVEDCHARGERS_PREFIX, utils.ACTION_PREFIX, utils.SHARED_GROUP_PREFIX, utils.ACTION_PLAN_PREFIX); err != nil { return err } if self.Sched != nil { diff --git a/engine/tp_reader.go b/engine/tp_reader.go index d5d4fc4a5..8012e7ad0 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -732,6 +732,21 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error actionPlan.AccountIDs = exitingAccountIds } + // write tasks + for _, at := range actionPlan.ActionTimings { + if at.IsASAP() { + for accID := range actionPlan.AccountIDs { + t := &Task{ + Uuid: utils.GenUUID(), + AccountID: accID, + ActionsID: at.ActionsID, + } + if err = tpr.ratingStorage.PushTask(t); err != nil { + return err + } + } + } + } // write action plan err = tpr.ratingStorage.SetActionPlan(accountAction.ActionPlanId, actionPlan) if err != nil { diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 30afa0814..e8c6896a9 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -96,7 +96,7 @@ func (s *Scheduler) loadActionPlans() { s.Lock() defer s.Unlock() // limit the number of concurrent tasks - var limit = make(chan bool, 10) + limit := make(chan bool, 10) // execute existing tasks for { task, err := s.storage.PopTask() From cfe6fd67a06a4e81cc337c1d17015887f65c29fe Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 18:04:09 +0200 Subject: [PATCH 13/15] updated dependencies --- glide.lock | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/glide.lock b/glide.lock index 032aebf5d..97ec30d38 100644 --- a/glide.lock +++ b/glide.lock @@ -1,9 +1,8 @@ hash: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 -updated: 2015-12-16T09:32:54.975724404+02:00 +updated: 2015-12-18T18:02:12.669823411+02:00 imports: - name: github.com/cenkalti/hub version: 57d753b5f4856e77b3cf8ecce78c97215a7d324d - repo: https://github.com/cenkalti/hub - name: github.com/cenkalti/rpc2 version: 2d1be381ce47537e9e076b2b76dc70933162e4e9 - name: github.com/cgrates/fsock @@ -29,10 +28,9 @@ imports: - name: github.com/gorhill/cronexpr version: a557574d6c024ed6e36acc8b610f5f211c91568a - name: github.com/jinzhu/gorm - version: c6a22c50962028255a718f22fe7e8959e8c67884 + version: d209be3138acbe304daffee637bc495499c1e70e - name: github.com/jinzhu/inflection version: 3272df6c21d04180007eb3349844c89a3856bc25 - repo: https://github.com/jinzhu/inflection - name: github.com/kr/pty version: f7ee69f31298ecbe5d2b349c711e2547a617d398 - name: github.com/lib/pq @@ -43,7 +41,7 @@ imports: - /pool - redis - name: github.com/peterh/liner - version: 4d47685ab2fd2dbb46c66b831344d558bc4be5b9 + version: 3f1c20449d1836aa4cbe38731b96f95cdf89634d - name: github.com/ugorji/go version: cd43bdd6be4b5675a0d1e75c4af55ee1dc0d9c5e subpackages: @@ -51,18 +49,17 @@ imports: - name: golang.org/x/crypto version: f18420efc3b4f8e9f3d51f6bd2476e92c46260e9 - name: golang.org/x/net - version: 548f7bf20c8aae87fecd9aa09cc89065451e6271 + version: 28273ec927bee3bea305f112fc28ceee575ea893 subpackages: - /websocket - name: golang.org/x/text - version: 92ca7bbb695e2e9675f1d731fa85760f95d2c0df + version: cf4986612c83df6c55578ba198316d1684a9a287 - name: gopkg.in/fsnotify.v1 - version: 2cdd39bd6129c6a49c74fb07fb9d77ba1271c572 + version: 508915b7500b6e42a87132e4afeb4729cebc7cbb - name: gopkg.in/mgo.v2 version: e30de8ac9ae3b30df7065f766c71f88bba7d4e49 subpackages: - bson - name: gopkg.in/tomb.v2 version: 14b3d72120e8d10ea6e6b7f87f7175734b1faab8 - repo: https://gopkg.in/tomb.v2 devImports: [] From 5eb652adb383c1c79c831a315ddc0c0f65c12115 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 18:30:45 +0200 Subject: [PATCH 14/15] mongo do not sanitize nil map --- engine/storage_mongo.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index 3f33bdf61..2febd4cc1 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -1044,13 +1044,17 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl ats = kv.Value cache2go.Cache(utils.ACTION_PLAN_PREFIX+key, ats) } - ats.AccountIDs = utils.YesDots(ats.AccountIDs) + if ats.AccountIDs != nil { + ats.AccountIDs = utils.YesDots(ats.AccountIDs) + } return } func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error { // clean dots from account ids map - ats.AccountIDs = utils.NoDots(ats.AccountIDs) + if ats.AccountIDs != nil { + ats.AccountIDs = utils.NoDots(ats.AccountIDs) + } if len(ats.ActionTimings) == 0 { cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) err := ms.db.C(colApl).Remove(bson.M{"key": key}) From c561db9c82f2ed54e5ae731db6837bfec24530bf Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 18 Dec 2015 18:53:32 +0200 Subject: [PATCH 15/15] more mongo nil checking --- engine/storage_mongo.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index 2febd4cc1..bd930f3f2 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -1044,7 +1044,7 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl ats = kv.Value cache2go.Cache(utils.ACTION_PLAN_PREFIX+key, ats) } - if ats.AccountIDs != nil { + if ats != nil && ats.AccountIDs != nil { ats.AccountIDs = utils.YesDots(ats.AccountIDs) } return @@ -1052,7 +1052,7 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error { // clean dots from account ids map - if ats.AccountIDs != nil { + if ats != nil && ats.AccountIDs != nil { ats.AccountIDs = utils.NoDots(ats.AccountIDs) } if len(ats.ActionTimings) == 0 {