From d09ae04f2ac0055bf3b6fcd1954d63d7443beffa Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sat, 30 Jan 2016 18:45:54 +0200 Subject: [PATCH] fix for removing account from action plan --- apier/v1/accounts.go | 81 ++++++++++++++++++++++++---------- apier/v1/apier.go | 2 +- apier/v1/apier_local_test.go | 5 +++ apier/v2/accounts.go | 8 +++- engine/action.go | 28 +++++++----- engine/storage_interface.go | 2 +- engine/storage_map.go | 18 ++++---- engine/storage_mongo_datadb.go | 18 ++++---- engine/storage_redis.go | 18 ++++---- engine/tp_reader.go | 4 +- 10 files changed, 120 insertions(+), 64 deletions(-) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 15f660133..e00faee96 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -95,7 +95,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e if attrs.ActionPlanId != "" { // delete the entire action plan ap.ActionTimings = nil // will delete the action plan - return 0, self.RatingDb.SetActionPlan(ap.Id, ap) + return 0, self.RatingDb.SetActionPlan(ap.Id, ap, true) } if attrs.ActionTimingId != "" { // delete only a action timing from action plan @@ -106,13 +106,13 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e break } } - return 0, self.RatingDb.SetActionPlan(ap.Id, ap) + return 0, self.RatingDb.SetActionPlan(ap.Id, ap, true) } if attrs.Tenant != "" && attrs.Account != "" { accID := utils.AccountKey(attrs.Tenant, attrs.Account) delete(ap.AccountIDs, accID) - return 0, self.RatingDb.SetActionPlan(ap.Id, ap) + return 0, self.RatingDb.SetActionPlan(ap.Id, ap, true) } // update cache @@ -148,8 +148,30 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error } if len(attr.ActionPlanId) != 0 { _, err := engine.Guardian.Guard(func() (interface{}, error) { + // clean previous action plans + actionPlansMap, err := self.RatingDb.GetAllActionPlans() + if err != nil { + return 0, err + } + var dirtyAps []string + for actionPlanID, ap := range actionPlansMap { + if actionPlanID == attr.ActionPlanId { + // don't remove it if it's the current one + continue + } + if _, exists := ap.AccountIDs[accID]; exists { + delete(ap.AccountIDs, accID) + dirtyAps = append(dirtyAps, utils.ACTION_PLAN_PREFIX+actionPlanID) + } + } + + if len(dirtyAps) > 0 { + // update cache + self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: dirtyAps}) + schedulerReloadNeeded = true + } + var ap *engine.ActionPlan - var err error ap, err = self.RatingDb.GetActionPlan(attr.ActionPlanId, false) if err != nil { return 0, err @@ -173,7 +195,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) error } } } - if err := self.RatingDb.SetActionPlan(attr.ActionPlanId, ap); err != nil { + if err := self.RatingDb.SetActionPlan(attr.ActionPlanId, ap, false); err != nil { return 0, err } // update cache @@ -223,39 +245,50 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string) if missing := utils.MissingStructFields(&attr, []string{"Tenant", "Account"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } + dirtyActionPlans := make(map[string]*engine.ActionPlan) accID := utils.AccountKey(attr.Tenant, attr.Account) - var schedulerReloadNeeded bool _, err := engine.Guardian.Guard(func() (interface{}, error) { // remove it from all action plans - allAPs, err := self.RatingDb.GetAllActionPlans() - if err != nil && err != utils.ErrNotFound { - return 0, err - } - for key, ap := range allAPs { - if _, exists := ap.AccountIDs[accID]; !exists { - schedulerReloadNeeded = true - _, err := engine.Guardian.Guard(func() (interface{}, error) { - // save action plan - self.RatingDb.SetActionPlan(key, ap) - // cache - self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) - return 0, nil - }, 0, utils.ACTION_PLAN_PREFIX) - if err != nil { - return 0, err + _, err := engine.Guardian.Guard(func() (interface{}, error) { + actionPlansMap, err := self.RatingDb.GetAllActionPlans() + if err != nil { + return 0, err + } + + for actionPlanID, ap := range actionPlansMap { + if _, exists := ap.AccountIDs[accID]; exists { + delete(ap.AccountIDs, accID) + dirtyActionPlans[actionPlanID] = ap } } + + var actionPlansCacheIds []string + for actionPlanID, ap := range dirtyActionPlans { + if err := self.RatingDb.SetActionPlan(actionPlanID, ap, true); err != nil { + return 0, err + } + actionPlansCacheIds = append(actionPlansCacheIds, utils.ACTION_PLAN_PREFIX+actionPlanID) + } + if len(actionPlansCacheIds) > 0 { + // update cache + self.RatingDb.CacheRatingPrefixValues(map[string][]string{ + utils.ACTION_PLAN_PREFIX: actionPlansCacheIds}) + } + return 0, nil + }, 0, utils.ACTION_PLAN_PREFIX) + if err != nil { + return 0, err } + if err := self.AccountDb.RemoveAccount(accID); err != nil { return 0, err } return 0, nil }, 0, accID) - // FIXME: remove from all actionplans? if err != nil { return utils.NewErrServerError(err) } - if schedulerReloadNeeded { + if attr.ReloadScheduler && len(dirtyActionPlans) > 0 { // reload scheduler if self.Sched != nil { self.Sched.Reload(true) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index dcdfeda45..2b4f4b576 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -624,7 +624,7 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) error ActionsID: apiAtm.ActionsId, }) } - if err := self.RatingDb.SetActionPlan(ap.Id, ap); err != nil { + if err := self.RatingDb.SetActionPlan(ap.Id, ap, true); err != nil { return utils.NewErrServerError(err) } self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + attrs.Id}}) diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 264dedbfd..e78886dee 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -1057,6 +1057,11 @@ func TestApierRemAccountActionTriggers(t *testing.T) { } var rmReply string rmReq := AttrRemoveAccountActionTriggers{Tenant: "cgrates.org", Account: "dan2", UniqueID: reply[0].UniqueID} + if err := rater.Call("ApierV1.ResetAccountActionTriggers", rmReq, &rmReply); err != nil { + t.Error("Got error on ApierV1.ResetActionTiming: ", err.Error()) + } else if rmReply != OK { + t.Error("Unexpected answer received", rmReply) + } if err := rater.Call("ApierV1.RemoveAccountActionTriggers", rmReq, &rmReply); err != nil { t.Error("Got error on ApierV1.RemoveActionTiming: ", err.Error()) } else if rmReply != OK { diff --git a/apier/v2/accounts.go b/apier/v2/accounts.go index b852b212d..e0e0f11b9 100644 --- a/apier/v2/accounts.go +++ b/apier/v2/accounts.go @@ -148,12 +148,16 @@ func (self *ApierV2) SetAccount(attr AttrSetAccount, reply *string) error { } } } + var actionPlansCacheIds []string for actionPlanID, ap := range dirtyActionPlans { - if err := self.RatingDb.SetActionPlan(actionPlanID, ap); err != nil { + if err := self.RatingDb.SetActionPlan(actionPlanID, ap, true); err != nil { return 0, err } + actionPlansCacheIds = append(actionPlansCacheIds, utils.ACTION_PLAN_PREFIX+actionPlanID) + } + if len(actionPlansCacheIds) > 0 { // update cache - self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + actionPlanID}}) + self.RatingDb.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: actionPlansCacheIds}) } return 0, nil }, 0, utils.ACTION_PLAN_PREFIX) diff --git a/engine/action.go b/engine/action.go index eac9cb299..0134cbc3c 100644 --- a/engine/action.go +++ b/engine/action.go @@ -559,20 +559,28 @@ func removeAccountAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Ac utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accID, err)) return err } - for key, ap := range allAPs { - if _, exists := ap.AccountIDs[accID]; !exists { - _, err := Guardian.Guard(func() (interface{}, error) { + var dirtyAps []string + _, err = Guardian.Guard(func() (interface{}, error) { + for key, ap := range allAPs { + if _, exists := ap.AccountIDs[accID]; !exists { // save action plan - ratingStorage.SetActionPlan(key, ap) - // cache - ratingStorage.CacheRatingPrefixValues(map[string][]string{utils.ACTION_PLAN_PREFIX: []string{utils.ACTION_PLAN_PREFIX + key}}) - return 0, nil - }, 0, utils.ACTION_PLAN_PREFIX) - if err != nil { - return err + delete(ap.AccountIDs, key) + ratingStorage.SetActionPlan(key, ap, true) + dirtyAps = append(dirtyAps, utils.ACTION_PLAN_PREFIX+key) } } + if len(dirtyAps) > 0 { + // cache + ratingStorage.CacheRatingPrefixValues(map[string][]string{ + utils.ACTION_PLAN_PREFIX: dirtyAps}) + } + return 0, nil + + }, 0, utils.ACTION_PLAN_PREFIX) + if err != nil { + return err } + // TODO: no scheduler reload? return nil } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index fd5781dc5..8d082ae14 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -64,7 +64,7 @@ type RatingStorage interface { GetActionTriggers(string) (ActionTriggers, error) SetActionTriggers(string, ActionTriggers) error GetActionPlan(string, bool) (*ActionPlan, error) - SetActionPlan(string, *ActionPlan) error + SetActionPlan(string, *ActionPlan, bool) error GetAllActionPlans() (map[string]*ActionPlan, error) PushTask(*Task) error PopTask() (*Task, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 84f5547cf..d1e788745 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -664,20 +664,22 @@ func (ms *MapStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan return } -func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan) (err error) { +func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool) (err error) { if len(ats.ActionTimings) == 0 { // delete the key delete(ms.dict, utils.ACTION_PLAN_PREFIX+key) cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) return } - // get existing action plan to merge the account ids - if existingAts, _ := ms.GetActionPlan(key, true); existingAts != nil { - if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { - ats.AccountIDs = make(utils.StringMap) - } - for accID := range existingAts.AccountIDs { - ats.AccountIDs[accID] = true + if !overwrite { + // get existing action plan to merge the account ids + if existingAts, _ := ms.GetActionPlan(key, true); existingAts != nil { + if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { + ats.AccountIDs = make(utils.StringMap) + } + for accID := range existingAts.AccountIDs { + ats.AccountIDs[accID] = true + } } } result, err := ms.ms.Marshal(&ats) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 32dd40197..85cd4f7ef 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1156,7 +1156,7 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl return } -func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error { +func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool) error { // clean dots from account ids map if len(ats.ActionTimings) == 0 { cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) @@ -1166,13 +1166,15 @@ func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan) error { } return nil } - // get existing action plan to merge the account ids - if existingAts, _ := ms.GetActionPlan(key, true); existingAts != nil { - if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { - ats.AccountIDs = make(utils.StringMap) - } - for accID := range existingAts.AccountIDs { - ats.AccountIDs[accID] = true + if !overwrite { + // get existing action plan to merge the account ids + if existingAts, _ := ms.GetActionPlan(key, true); existingAts != nil { + if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { + ats.AccountIDs = make(utils.StringMap) + } + for accID := range existingAts.AccountIDs { + ats.AccountIDs[accID] = true + } } } result, err := ms.ms.Marshal(ats) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 9ea09ccfc..9c9fe6a51 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -920,20 +920,22 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl return } -func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan) (err error) { +func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool) (err error) { if len(ats.ActionTimings) == 0 { // delete the key err = rs.db.Cmd("DEL", utils.ACTION_PLAN_PREFIX+key).Err cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) return err } - // get existing action plan to merge the account ids - if existingAts, _ := rs.GetActionPlan(key, true); existingAts != nil { - if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { - ats.AccountIDs = make(utils.StringMap) - } - for accID := range existingAts.AccountIDs { - ats.AccountIDs[accID] = true + if !overwrite { + // get existing action plan to merge the account ids + if existingAts, _ := rs.GetActionPlan(key, true); existingAts != nil { + if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { + ats.AccountIDs = make(utils.StringMap) + } + for accID := range existingAts.AccountIDs { + ats.AccountIDs[accID] = true + } } } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 9b019074e..444a9bc57 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -760,7 +760,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *TpAccountAction) error } } // write action plan - err = tpr.ratingStorage.SetActionPlan(accountAction.ActionPlanId, actionPlan) + err = tpr.ratingStorage.SetActionPlan(accountAction.ActionPlanId, actionPlan, false) if err != nil { return errors.New(err.Error() + " (SetActionPlan): " + accountAction.ActionPlanId) } @@ -1399,7 +1399,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { } } } - err = tpr.ratingStorage.SetActionPlan(k, ap) + err = tpr.ratingStorage.SetActionPlan(k, ap, false) if err != nil { return err }