From a6aa435a8d2e640a0d08c1ee337cde7142d7a285 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 3 Jun 2021 17:09:01 +0300 Subject: [PATCH] Updated AcionPlans --- apier/v1/accounts.go | 12 +- apier/v1/apier.go | 20 ++- apier/v1/replicator.go | 12 +- apier/v1/scheduler.go | 2 +- apier/v2/accounts.go | 6 +- apier/v2/apierv2_it_test.go | 22 +-- engine/account_test.go | 4 +- engine/action.go | 4 +- engine/action_plan.go | 3 + engine/actions_test.go | 4 +- engine/calldesc_test.go | 76 ++++---- engine/datamanager.go | 191 ++++++++++++++++---- engine/loader_it_test.go | 2 +- engine/onstor_it_test.go | 62 ++++++- engine/storage_interface.go | 16 +- engine/storage_internal_datadb.go | 282 ++++++++---------------------- engine/storage_mongo_datadb.go | 271 +++++++--------------------- engine/storage_redis.go | 203 ++------------------- engine/tpreader.go | 11 +- migrator/action_plan.go | 2 +- migrator/action_plan_it_test.go | 6 +- 21 files changed, 467 insertions(+), 744 deletions(-) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 17f369508..8b17a131c 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -43,13 +43,13 @@ func (api *APIerSv1) GetAccountActionPlan(attrs utils.TenantAccount, reply *[]*A } acntID := utils.ConcatenatedKey(attrs.Tenant, attrs.Account) acntATsIf, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := api.DataManager.GetAccountActionPlans(acntID, false, utils.NonTransactional) + acntAPids, err := api.DataManager.GetAccountActionPlans(acntID, true, true, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return nil, utils.NewErrServerError(err) } var acntAPs []*engine.ActionPlan for _, apID := range acntAPids { - if ap, err := api.DataManager.GetActionPlan(apID, false, utils.NonTransactional); err != nil { + if ap, err := api.DataManager.GetActionPlan(apID, true, true, utils.NonTransactional); err != nil { return nil, err } else if ap != nil { acntAPs = append(acntAPs, ap) @@ -99,7 +99,7 @@ func (api *APIerSv1) RemoveActionTiming(attrs AttrRemoveActionTiming, reply *str var remAcntAPids []string // list of accounts who's indexes need modification _, err = guardian.Guardian.Guard(func() (interface{}, error) { - ap, err := api.DataManager.GetActionPlan(attrs.ActionPlanId, false, utils.NonTransactional) + ap, err := api.DataManager.GetActionPlan(attrs.ActionPlanId, true, true, utils.NonTransactional) if err != nil { return 0, err } else if ap == nil { @@ -199,7 +199,7 @@ func (api *APIerSv1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e } if attr.ActionPlanID != "" { _, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := api.DataManager.GetAccountActionPlans(accID, false, utils.NonTransactional) + acntAPids, err := api.DataManager.GetAccountActionPlans(accID, true, true, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return 0, err } @@ -210,7 +210,7 @@ func (api *APIerSv1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e i++ // increase index since we don't remove from slice continue } - ap, err := api.DataManager.GetActionPlan(apID, false, utils.NonTransactional) + ap, err := api.DataManager.GetActionPlan(apID, true, true, utils.NonTransactional) if err != nil { return 0, err } @@ -219,7 +219,7 @@ func (api *APIerSv1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e acntAPids = append(acntAPids[:i], acntAPids[i+1:]...) // remove the item from the list so we can overwrite the real list } if !utils.IsSliceMember(acntAPids, attr.ActionPlanID) { // Account not yet attached to action plan, do it here - ap, err := api.DataManager.GetActionPlan(attr.ActionPlanID, false, utils.NonTransactional) + ap, err := api.DataManager.GetActionPlan(attr.ActionPlanID, true, true, utils.NonTransactional) if err != nil { return 0, err } diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 24d9c66a8..fddf46e6e 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -107,7 +107,7 @@ func (apiv1 *APIerSv1) GetReverseDestination(prefix string, reply *[]string) (er // ComputeReverseDestinations will rebuild complete reverse destinations data func (apiv1 *APIerSv1) ComputeReverseDestinations(ignr string, reply *string) (err error) { - if err = apiv1.DataManager.DataDB().RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { + if err = apiv1.DataManager.RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { return } *reply = utils.OK @@ -115,12 +115,16 @@ func (apiv1 *APIerSv1) ComputeReverseDestinations(ignr string, reply *string) (e } // ComputeAccountActionPlans will rebuild complete reverse accountActions data -func (apiv1 *APIerSv1) ComputeAccountActionPlans(ignr string, reply *string) (err error) { - if err = apiv1.DataManager.DataDB().RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { +func (apiv1 *APIerSv1) ComputeAccountActionPlans(tnt *utils.TenantWithArgDispatcher, reply *string) (err error) { + if err = apiv1.DataManager.RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { return } - *reply = utils.OK - return + return apiv1.ConnMgr.Call(apiv1.Config.ApierCfg().CachesConns, nil, + utils.CacheSv1Clear, &utils.AttrCacheIDsWithArgDispatcher{ + ArgDispatcher: tnt.ArgDispatcher, + CacheIDs: []string{utils.CacheAccountActionPlans}, + TenantArg: *tnt.TenantArg, + }, reply) } func (apiv1 *APIerSv1) GetSharedGroup(sgId string, reply *engine.SharedGroup) error { @@ -659,7 +663,7 @@ func (apiv1 *APIerSv1) SetActionPlan(attrs AttrSetActionPlan, reply *string) (er } _, err = guardian.Guardian.Guard(func() (interface{}, error) { var prevAccountIDs utils.StringMap - if prevAP, err := apiv1.DataManager.GetActionPlan(attrs.Id, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + if prevAP, err := apiv1.DataManager.GetActionPlan(attrs.Id, true, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { return 0, utils.NewErrServerError(err) } else if err == nil && !attrs.Overwrite { return 0, utils.ErrExists @@ -785,7 +789,7 @@ func (apiv1 *APIerSv1) GetActionPlan(attr AttrGetActionPlan, reply *[]*engine.Ac result = append(result, apls) } } else { - apls, err := apiv1.DataManager.GetActionPlan(attr.ID, false, utils.NonTransactional) + apls, err := apiv1.DataManager.GetActionPlan(attr.ID, true, true, utils.NonTransactional) if err != nil { return err } @@ -801,7 +805,7 @@ func (apiv1 *APIerSv1) RemoveActionPlan(attr AttrGetActionPlan, reply *string) ( } if _, err = guardian.Guardian.Guard(func() (interface{}, error) { var prevAccountIDs utils.StringMap - if prevAP, err := apiv1.DataManager.GetActionPlan(attr.ID, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + if prevAP, err := apiv1.DataManager.GetActionPlan(attr.ID, true, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { return 0, err } else if prevAP != nil { prevAccountIDs = prevAP.AccountIDs diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go index c256cea88..961026064 100644 --- a/apier/v1/replicator.go +++ b/apier/v1/replicator.go @@ -179,7 +179,7 @@ func (rplSv1 *ReplicatorSv1) GetActions(id string, reply *engine.Actions) error //GetActions func (rplSv1 *ReplicatorSv1) GetActionPlan(id string, reply *engine.ActionPlan) error { - if rcv, err := rplSv1.dm.DataDB().GetActionPlanDrv(id, true, utils.NonTransactional); err != nil { + if rcv, err := rplSv1.dm.DataDB().GetActionPlanDrv(id); err != nil { return err } else { *reply = *rcv @@ -199,7 +199,7 @@ func (rplSv1 *ReplicatorSv1) GetAllActionPlans(_ string, reply *map[string]*engi //GetAccountActionPlans func (rplSv1 *ReplicatorSv1) GetAccountActionPlans(id string, reply *[]string) error { - if rcv, err := rplSv1.dm.DataDB().GetAccountActionPlansDrv(id, false, utils.NonTransactional); err != nil { + if rcv, err := rplSv1.dm.DataDB().GetAccountActionPlansDrv(id); err != nil { return err } else { *reply = rcv @@ -487,7 +487,7 @@ func (rplSv1 *ReplicatorSv1) SetDispatcherProfile(dpp *engine.DispatcherProfile, } func (rplSv1 *ReplicatorSv1) SetActionPlan(args *engine.SetActionPlanArg, reply *string) error { - if err := rplSv1.dm.DataDB().SetActionPlanDrv(args.Key, args.Ats, args.Overwrite, utils.NonTransactional); err != nil { + if err := rplSv1.dm.DataDB().SetActionPlanDrv(args.Key, args.Ats); err != nil { return err } *reply = utils.OK @@ -495,7 +495,7 @@ func (rplSv1 *ReplicatorSv1) SetActionPlan(args *engine.SetActionPlanArg, reply } func (rplSv1 *ReplicatorSv1) SetAccountActionPlans(args *engine.SetAccountActionPlansArg, reply *string) error { - if err := rplSv1.dm.DataDB().SetAccountActionPlansDrv(args.AcntID, args.AplIDs, args.Overwrite); err != nil { + if err := rplSv1.dm.DataDB().SetAccountActionPlansDrv(args.AcntID, args.AplIDs); err != nil { return err } *reply = utils.OK @@ -623,7 +623,7 @@ func (rplSv1 *ReplicatorSv1) RemoveActions(id string, reply *string) error { } func (rplSv1 *ReplicatorSv1) RemoveActionPlan(id string, reply *string) error { - if err := rplSv1.dm.DataDB().RemoveActionPlanDrv(id, utils.NonTransactional); err != nil { + if err := rplSv1.dm.DataDB().RemoveActionPlanDrv(id); err != nil { return err } *reply = utils.OK @@ -631,7 +631,7 @@ func (rplSv1 *ReplicatorSv1) RemoveActionPlan(id string, reply *string) error { } func (rplSv1 *ReplicatorSv1) RemAccountActionPlans(args *engine.RemAccountActionPlansArgs, reply *string) error { - if err := rplSv1.dm.DataDB().RemAccountActionPlansDrv(args.AcntID, args.ApIDs); err != nil { + if err := rplSv1.dm.DataDB().RemAccountActionPlansDrv(args.AcntID); err != nil { return err } *reply = utils.OK diff --git a/apier/v1/scheduler.go b/apier/v1/scheduler.go index f31938adf..b3a384a74 100644 --- a/apier/v1/scheduler.go +++ b/apier/v1/scheduler.go @@ -120,7 +120,7 @@ type AttrsExecuteScheduledActions struct { func (self *APIerSv1) ExecuteScheduledActions(attr AttrsExecuteScheduledActions, reply *string) error { if attr.ActionPlanID != "" { // execute by ActionPlanID - apl, err := self.DataManager.GetActionPlan(attr.ActionPlanID, false, utils.NonTransactional) + apl, err := self.DataManager.GetActionPlan(attr.ActionPlanID, true, true, utils.NonTransactional) if err != nil { *reply = err.Error() return err diff --git a/apier/v2/accounts.go b/apier/v2/accounts.go index c9942cc28..f1fbfeb2e 100644 --- a/apier/v2/accounts.go +++ b/apier/v2/accounts.go @@ -119,7 +119,7 @@ func (apiv2 *APIerSv2) SetAccount(attr AttrSetAccount, reply *string) error { } } _, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := apiv2.DataManager.GetAccountActionPlans(accID, false, utils.NonTransactional) + acntAPids, err := apiv2.DataManager.GetAccountActionPlans(accID, true, true, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return 0, err } @@ -131,7 +131,7 @@ func (apiv2 *APIerSv2) SetAccount(attr AttrSetAccount, reply *string) error { nAcntAPids = append(nAcntAPids, apID) continue // not removing the ones where } - ap, err := apiv2.DataManager.GetActionPlan(apID, false, utils.NonTransactional) + ap, err := apiv2.DataManager.GetActionPlan(apID, true, true, utils.NonTransactional) if err != nil { return 0, err } @@ -141,7 +141,7 @@ func (apiv2 *APIerSv2) SetAccount(attr AttrSetAccount, reply *string) error { acntAPids = nAcntAPids } for _, apID := range attr.ActionPlanIDs { - ap, err := apiv2.DataManager.GetActionPlan(apID, false, utils.NonTransactional) + ap, err := apiv2.DataManager.GetActionPlan(apID, true, true, utils.NonTransactional) if err != nil { return 0, err } diff --git a/apier/v2/apierv2_it_test.go b/apier/v2/apierv2_it_test.go index b7021d61c..630c50d35 100644 --- a/apier/v2/apierv2_it_test.go +++ b/apier/v2/apierv2_it_test.go @@ -260,7 +260,7 @@ func testAPIerSv2itSetAccountWithAP(t *testing.T) { {ActionsId: argActs1.ActionsId, Time: fmt.Sprintf("%v:%v:%v", tNow.Hour(), tNow.Minute(), tNow.Second()), // 10:4:12 Weight: 20.0}}} - if _, err := dm.GetActionPlan(argAP1.Id, true, utils.NonTransactional); err == nil || err != utils.ErrNotFound { + if _, err := dm.GetActionPlan(argAP1.Id, false, false, utils.NonTransactional); err == nil || err != utils.ErrNotFound { t.Error(err) } if err := apierRPC.Call(utils.APIerSv1SetActionPlan, argAP1, &reply); err != nil { @@ -274,19 +274,19 @@ func testAPIerSv2itSetAccountWithAP(t *testing.T) { ActionPlanIDs: []string{argAP1.Id}, } acntID := utils.ConcatenatedKey(argSetAcnt1.Tenant, argSetAcnt1.Account) - if _, err := dm.GetAccountActionPlans(acntID, true, utils.NonTransactional); err == nil || err != utils.ErrNotFound { + if _, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err == nil || err != utils.ErrNotFound { t.Error(err) } if err := apierRPC.Call(utils.APIerSv2SetAccount, argSetAcnt1, &reply); err != nil { t.Fatal(err) } - if ap, err := dm.GetActionPlan(argAP1.Id, true, utils.NonTransactional); err != nil { + if ap, err := dm.GetActionPlan(argAP1.Id, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if _, hasIt := ap.AccountIDs[acntID]; !hasIt { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } eAAPids := []string{argAP1.Id} - if aapIDs, err := dm.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if aapIDs, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eAAPids, aapIDs) { t.Errorf("Expecting: %+v, received: %+v", eAAPids, aapIDs) @@ -295,7 +295,7 @@ func testAPIerSv2itSetAccountWithAP(t *testing.T) { argAP2 := &v1.AttrSetActionPlan{Id: "TestAPIerSv2itSetAccountWithAP_AP_2", ActionPlan: []*v1.AttrActionPlan{ {ActionsId: argActs1.ActionsId, MonthDays: "1", Time: "00:00:00", Weight: 20.0}}} - if _, err := dm.GetActionPlan(argAP2.Id, true, utils.NonTransactional); err == nil || err != utils.ErrNotFound { + if _, err := dm.GetActionPlan(argAP2.Id, false, false, utils.NonTransactional); err == nil || err != utils.ErrNotFound { t.Error(err) } if err := apierRPC.Call(utils.APIerSv2SetActionPlan, argAP2, &reply); err != nil { @@ -312,18 +312,18 @@ func testAPIerSv2itSetAccountWithAP(t *testing.T) { if err := apierRPC.Call(utils.APIerSv2SetAccount, argSetAcnt2, &reply); err != nil { t.Fatal(err) } - if ap, err := dm.GetActionPlan(argAP2.Id, true, utils.NonTransactional); err != nil { + if ap, err := dm.GetActionPlan(argAP2.Id, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if _, hasIt := ap.AccountIDs[acntID]; !hasIt { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } - if ap, err := dm.GetActionPlan(argAP1.Id, true, utils.NonTransactional); err != nil { + if ap, err := dm.GetActionPlan(argAP1.Id, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if _, hasIt := ap.AccountIDs[acntID]; !hasIt { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } eAAPids = []string{argAP1.Id, argAP2.Id} - if aapIDs, err := dm.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if aapIDs, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eAAPids, aapIDs) { t.Errorf("Expecting: %+v, received: %+v", eAAPids, aapIDs) @@ -338,18 +338,18 @@ func testAPIerSv2itSetAccountWithAP(t *testing.T) { if err := apierRPC.Call(utils.APIerSv2SetAccount, argSetAcnt2, &reply); err != nil { t.Fatal(err) } - if ap, err := dm.GetActionPlan(argAP1.Id, true, utils.NonTransactional); err != nil { + if ap, err := dm.GetActionPlan(argAP1.Id, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if _, hasIt := ap.AccountIDs[acntID]; hasIt { t.Errorf("ActionPlan does contain the accountID: %+v", ap) } - if ap, err := dm.GetActionPlan(argAP2.Id, true, utils.NonTransactional); err != nil { + if ap, err := dm.GetActionPlan(argAP2.Id, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if _, hasIt := ap.AccountIDs[acntID]; !hasIt { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } eAAPids = []string{argAP2.Id} - if aapIDs, err := dm.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if aapIDs, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eAAPids, aapIDs) { t.Errorf("Expecting: %+v, received: %+v", eAAPids, aapIDs) diff --git a/engine/account_test.go b/engine/account_test.go index 473b83b2e..dfa935182 100644 --- a/engine/account_test.go +++ b/engine/account_test.go @@ -110,8 +110,8 @@ func TestGetSecondsForPrefix(t *testing.T) { DestinationIDs: utils.StringMap{"RET": true}} ub1 := &Account{ID: "CUSTOMER_1:rif", BalanceMap: map[string]Balances{ - utils.VOICE: Balances{b1, b2}, - utils.MONETARY: Balances{&Balance{Value: 200}}}} + utils.VOICE: {b1, b2}, + utils.MONETARY: {&Balance{Value: 200}}}} cd := &CallDescriptor{ Category: "0", Tenant: "vdf", diff --git a/engine/action.go b/engine/action.go index 1446a0637..e8b4ee185 100644 --- a/engine/action.go +++ b/engine/action.go @@ -623,13 +623,13 @@ func removeAccountAction(ub *Account, a *Action, acs Actions, extraData interfac } _, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := dm.GetAccountActionPlans(accID, false, utils.NonTransactional) + acntAPids, err := dm.GetAccountActionPlans(accID, true, true, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accID, err)) return 0, err } for _, apID := range acntAPids { - ap, err := dm.GetActionPlan(apID, false, utils.NonTransactional) + ap, err := dm.GetActionPlan(apID, true, true, utils.NonTransactional) if err != nil { utils.Logger.Err(fmt.Sprintf("Could not retrieve action plan: %s: %v", apID, err)) return 0, err diff --git a/engine/action_plan.go b/engine/action_plan.go index 3da3b2f2b..badfc3c31 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -82,6 +82,9 @@ func (apl *ActionPlan) RemoveAccountID(accID string) (found bool) { // Clone clones *ActionPlan func (apl *ActionPlan) Clone() (interface{}, error) { + if apl == nil { + return nil, nil + } cln := &ActionPlan{ Id: apl.Id, AccountIDs: apl.AccountIDs.Clone(), diff --git a/engine/actions_test.go b/engine/actions_test.go index 343c8993a..d88bd8ac2 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -584,7 +584,7 @@ func TestActionPlansRemoveMember(t *testing.T) { []string{account1.ID}, true); err != nil { t.Error(err) } - dm.GetAccountActionPlans(account1.ID, true, utils.NonTransactional) // FixMe: remove here after finishing testing of map + dm.GetAccountActionPlans(account1.ID, false,false, utils.NonTransactional) // FixMe: remove here after finishing testing of map if err = dm.SetAccountActionPlans(account2.ID, []string{ap2.Id}, false); err != nil { t.Error(err) @@ -613,7 +613,7 @@ func TestActionPlansRemoveMember(t *testing.T) { t.Errorf("Execute Action: %v", err) } - apr, err1 := dm.GetActionPlan(ap1.Id, false, utils.NonTransactional) + apr, err1 := dm.GetActionPlan(ap1.Id, true,true, utils.NonTransactional) if err1 != nil { t.Errorf("Get action plan test: %v", err1) diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index b0249919d..8d5e081c7 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -38,10 +38,10 @@ func init() { func populateDB() { ats := []*Action{ - &Action{ActionType: "*topup", + {ActionType: "*topup", Balance: &BalanceFilter{Type: utils.StringPointer(utils.MONETARY), Value: &utils.ValueFormula{Static: 10}}}, - &Action{ActionType: "*topup", + {ActionType: "*topup", Balance: &BalanceFilter{Type: utils.StringPointer(utils.VOICE), Weight: utils.Float64Pointer(20), Value: &utils.ValueFormula{Static: 10 * float64(time.Second)}, @@ -49,18 +49,18 @@ func populateDB() { } ats1 := []*Action{ - &Action{ActionType: "*topup", + {ActionType: "*topup", Balance: &BalanceFilter{ Type: utils.StringPointer(utils.MONETARY), Value: &utils.ValueFormula{Static: 10}}, Weight: 10}, - &Action{ActionType: "*reset_account", Weight: 20}, + {ActionType: "*reset_account", Weight: 20}, } minu := &Account{ ID: "vdf:minu", BalanceMap: map[string]Balances{ - utils.MONETARY: Balances{&Balance{Value: 50}}, - utils.VOICE: Balances{ + utils.MONETARY: {&Balance{Value: 50}}, + utils.VOICE: { &Balance{Value: 200 * float64(time.Second), DestinationIDs: utils.NewStringMap("NAT"), Weight: 10}, &Balance{Value: 100 * float64(time.Second), @@ -70,7 +70,7 @@ func populateDB() { broker := &Account{ ID: "vdf:broker", BalanceMap: map[string]Balances{ - utils.VOICE: Balances{ + utils.VOICE: { &Balance{Value: 20 * float64(time.Second), DestinationIDs: utils.NewStringMap("NAT"), Weight: 10, RatingSubject: "rif"}, @@ -81,7 +81,7 @@ func populateDB() { luna := &Account{ ID: "vdf:luna", BalanceMap: map[string]Balances{ - utils.MONETARY: Balances{ + utils.MONETARY: { &Balance{Value: 0, Weight: 20}, }}, } @@ -89,14 +89,14 @@ func populateDB() { minitsboy := &Account{ ID: "vdf:minitsboy", BalanceMap: map[string]Balances{ - utils.VOICE: Balances{ + utils.VOICE: { &Balance{Value: 20 * float64(time.Second), DestinationIDs: utils.NewStringMap("NAT"), Weight: 10, RatingSubject: "rif"}, &Balance{Value: 100 * float64(time.Second), DestinationIDs: utils.NewStringMap("RET"), Weight: 20}, }, - utils.MONETARY: Balances{ + utils.MONETARY: { &Balance{Value: 100, Weight: 10}, }, }, @@ -104,14 +104,14 @@ func populateDB() { max := &Account{ ID: "cgrates.org:max", BalanceMap: map[string]Balances{ - utils.MONETARY: Balances{ + utils.MONETARY: { &Balance{Value: 11, Weight: 20}, }}, } money := &Account{ ID: "cgrates.org:money", BalanceMap: map[string]Balances{ - utils.MONETARY: Balances{ + utils.MONETARY: { &Balance{Value: 10000, Weight: 10}, }}, } @@ -685,7 +685,7 @@ func TestMaxSessionTimeWithAccount(t *testing.T) { } func TestMaxSessionTimeWithMaxRate(t *testing.T) { - ap, err := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, err := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) if err != nil { t.FailNow() } @@ -713,7 +713,7 @@ func TestMaxSessionTimeWithMaxRate(t *testing.T) { } func TestMaxSessionTimeWithMaxCost(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -736,7 +736,7 @@ func TestMaxSessionTimeWithMaxCost(t *testing.T) { } func TestGetMaxSessiontWithBlocker(t *testing.T) { - ap, _ := dm.GetActionPlan("BLOCK_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("BLOCK_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -785,7 +785,7 @@ func TestGetMaxSessiontWithBlocker(t *testing.T) { } func TestGetMaxSessiontWithBlockerEmpty(t *testing.T) { - ap, _ := dm.GetActionPlan("BLOCK_EMPTY_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("BLOCK_EMPTY_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -834,7 +834,7 @@ func TestGetMaxSessiontWithBlockerEmpty(t *testing.T) { } func TestGetCostWithMaxCost(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -857,7 +857,7 @@ func TestGetCostWithMaxCost(t *testing.T) { } func TestGetCostRoundingIssue(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -881,7 +881,7 @@ func TestGetCostRoundingIssue(t *testing.T) { } func TestGetCostRatingInfoOnZeroTime(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -908,7 +908,7 @@ func TestGetCostRatingInfoOnZeroTime(t *testing.T) { } func TestDebitRatingInfoOnZeroTime(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -936,7 +936,7 @@ func TestDebitRatingInfoOnZeroTime(t *testing.T) { } func TestMaxDebitRatingInfoOnZeroTime(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -963,7 +963,7 @@ func TestMaxDebitRatingInfoOnZeroTime(t *testing.T) { } func TestMaxDebitUnknowDest(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -985,7 +985,7 @@ func TestMaxDebitUnknowDest(t *testing.T) { } func TestMaxDebitRoundingIssue(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1019,7 +1019,7 @@ func TestMaxDebitRoundingIssue(t *testing.T) { } func TestDebitRoundingRefund(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1053,7 +1053,7 @@ func TestDebitRoundingRefund(t *testing.T) { } func TestMaxSessionTimeWithMaxCostFree(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1076,7 +1076,7 @@ func TestMaxSessionTimeWithMaxCostFree(t *testing.T) { } func TestMaxDebitWithMaxCostFree(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1099,7 +1099,7 @@ func TestMaxDebitWithMaxCostFree(t *testing.T) { } func TestGetCostWithMaxCostFree(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1123,12 +1123,12 @@ func TestGetCostWithMaxCostFree(t *testing.T) { } func TestMaxSessionTimeWithAccountShared(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP_SHARED0_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP_SHARED0_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) } - ap, _ = dm.GetActionPlan("TOPUP_SHARED10_AT", false, utils.NonTransactional) + ap, _ = dm.GetActionPlan("TOPUP_SHARED10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1162,12 +1162,12 @@ func TestMaxSessionTimeWithAccountShared(t *testing.T) { } func TestMaxDebitWithAccountShared(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP_SHARED0_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP_SHARED0_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) } - ap, _ = dm.GetActionPlan("TOPUP_SHARED10_AT", false, utils.NonTransactional) + ap, _ = dm.GetActionPlan("TOPUP_SHARED10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1378,7 +1378,7 @@ func TestMaxSesionTimeLongerThanMoney(t *testing.T) { } func TestDebitFromShareAndNormal(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP_SHARED10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP_SHARED10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1406,7 +1406,7 @@ func TestDebitFromShareAndNormal(t *testing.T) { } func TestDebitFromEmptyShare(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP_EMPTY_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP_EMPTY_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1434,7 +1434,7 @@ func TestDebitFromEmptyShare(t *testing.T) { } func TestDebitNegatve(t *testing.T) { - ap, _ := dm.GetActionPlan("POST_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("POST_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1473,7 +1473,7 @@ func TestDebitNegatve(t *testing.T) { } func TestMaxDebitZeroDefinedRate(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1502,7 +1502,7 @@ func TestMaxDebitZeroDefinedRate(t *testing.T) { } func TestMaxDebitForceDuration(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1526,7 +1526,7 @@ func TestMaxDebitForceDuration(t *testing.T) { } func TestMaxDebitZeroDefinedRateOnlyMinutes(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) @@ -1554,7 +1554,7 @@ func TestMaxDebitZeroDefinedRateOnlyMinutes(t *testing.T) { } func TestMaxDebitConsumesMinutes(t *testing.T) { - ap, _ := dm.GetActionPlan("TOPUP10_AT", false, utils.NonTransactional) + ap, _ := dm.GetActionPlan("TOPUP10_AT", true, true, utils.NonTransactional) for _, at := range ap.ActionTimings { at.accountIDs = ap.AccountIDs at.Execute(nil, nil) diff --git a/engine/datamanager.go b/engine/datamanager.go index e681d80d5..a86c487d6 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -241,9 +241,9 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b case utils.ACTION_PREFIX: _, err = dm.GetActions(dataID, true, utils.NonTransactional) case utils.ACTION_PLAN_PREFIX: - _, err = dm.GetActionPlan(dataID, true, utils.NonTransactional) + _, err = dm.GetActionPlan(dataID, false, true, utils.NonTransactional) case utils.AccountActionPlansPrefix: - _, err = dm.GetAccountActionPlans(dataID, true, utils.NonTransactional) + _, err = dm.GetAccountActionPlans(dataID, false, true, utils.NonTransactional) case utils.ACTION_TRIGGER_PREFIX: _, err = dm.GetActionTriggers(dataID, true, utils.NonTransactional) case utils.SHARED_GROUP_PREFIX: @@ -319,6 +319,54 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b return } +func (dm *DataManager) RebuildReverseForPrefix(prefix string) (err error) { + switch prefix { + case utils.REVERSE_DESTINATION_PREFIX: + if err = dm.dataDB.RemoveKeysForPrefix(prefix); err != nil { + return + } + var keys []string + if keys, err = dm.dataDB.GetKeysForPrefix(utils.DestinationPrefix); err != nil { + return + } + for _, key := range keys { + var dest *Destination + if dest, err = dm.GetDestination(key[len(utils.DestinationPrefix):], false, utils.NonTransactional); err != nil { + return err + } + if err = dm.SetReverseDestination(dest, utils.NonTransactional); err != nil { + return err + } + } + case utils.AccountActionPlansPrefix: + if err = dm.dataDB.RemoveKeysForPrefix(prefix); err != nil { + return + } + var keys []string + if keys, err = dm.dataDB.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX); err != nil { + return + } + accIDs := make(map[string][]string) + for _, key := range keys { + var apl *ActionPlan + if apl, err = dm.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], + true, false, utils.NonTransactional); err != nil { + return + } + for acntID := range apl.AccountIDs { + accIDs[acntID] = append(accIDs[acntID], apl.Id) + } + } + for acntID, apIDs := range accIDs { + if err = dm.SetAccountActionPlans(acntID, apIDs, true); err != nil { + return + } + } + default: + return utils.ErrInvalidKey + } + return +} func (dm *DataManager) GetDestination(key string, skipCache bool, transactionID string) (dest *Destination, err error) { dest, err = dm.dataDB.GetDestinationDrv(key, skipCache, transactionID) if err != nil { @@ -1317,40 +1365,74 @@ func (dm *DataManager) RemoveActions(key, transactionID string) (err error) { return } -func (dm *DataManager) GetActionPlan(key string, skipCache bool, transactionID string) (ats *ActionPlan, err error) { - ats, err = dm.dataDB.GetActionPlanDrv(key, skipCache, transactionID) - if err == utils.ErrNotFound && - config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans].Remote { - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, - utils.ReplicatorSv1GetActionPlan, key, &ats); err == nil { - err = dm.dataDB.SetActionPlanDrv(key, ats, true, utils.NonTransactional) +func (dm *DataManager) GetActionPlan(key string, cacheRead, cacheWrite bool, transactionID string) (ats *ActionPlan, err error) { + if cacheRead { + if x, err := Cache.GetCloned(utils.CacheActionPlans, key); err != nil { + if err != ltcache.ErrNotFound { // Only consider cache if item was found + return nil, err + } + } else if x == nil { // item was placed nil in cache + return nil, utils.ErrNotFound + } else { + return x.(*ActionPlan), nil } } + ats, err = dm.dataDB.GetActionPlanDrv(key) if err != nil { - err = utils.CastRPCErr(err) - return nil, err + if err == utils.ErrNotFound && + config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans].Remote { + if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, + utils.ReplicatorSv1GetActionPlan, key, &ats); err == nil { + err = dm.dataDB.SetActionPlanDrv(key, ats) + if err != nil { + err = utils.CastRPCErr(err) + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheActionPlans, key, nil, nil, + cacheCommit(transactionID), transactionID) + } + return nil, err + } + } + } + } + if cacheWrite { + Cache.Set(utils.CacheActionPlans, key, ats, nil, + cacheCommit(transactionID), transactionID) } return } type SetActionPlanArg struct { - Key string - Ats *ActionPlan - Overwrite bool + Key string + Ats *ActionPlan } func (dm *DataManager) SetActionPlan(key string, ats *ActionPlan, overwrite bool, transactionID string) (err error) { - if err = dm.dataDB.SetActionPlanDrv(key, ats, overwrite, transactionID); err != nil { + if len(ats.ActionTimings) == 0 { // special case to keep the old style + return dm.RemoveActionPlan(key, transactionID) + } + if !overwrite { + // get existing action plan to merge the account ids + if oldAP, _ := dm.GetActionPlan(key, true, true, transactionID); oldAP != nil { + if ats.AccountIDs == nil && len(oldAP.AccountIDs) > 0 { + ats.AccountIDs = make(utils.StringMap) + } + for accID := range oldAP.AccountIDs { + ats.AccountIDs[accID] = true + } + } + } + + if err = dm.dataDB.SetActionPlanDrv(key, ats); err != nil { return } if config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans].Replicate { var reply string if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, utils.ReplicatorSv1SetActionPlan, &SetActionPlanArg{ - Key: key, - Ats: ats, - Overwrite: overwrite, + Key: key, + Ats: ats, }, &reply); err != nil { err = utils.CastRPCErr(err) return @@ -1375,7 +1457,7 @@ func (dm *DataManager) GetAllActionPlans() (ats map[string]*ActionPlan, err erro } func (dm *DataManager) RemoveActionPlan(key string, transactionID string) (err error) { - if err = dm.dataDB.RemoveActionPlanDrv(key, transactionID); err != nil { + if err = dm.dataDB.RemoveActionPlanDrv(key); err != nil { return } if config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans].Replicate { @@ -1385,40 +1467,66 @@ func (dm *DataManager) RemoveActionPlan(key string, transactionID string) (err e } return } -func (dm *DataManager) GetAccountActionPlans(acntID string, - skipCache bool, transactionID string) (apIDs []string, err error) { - apIDs, err = dm.dataDB.GetAccountActionPlansDrv(acntID, skipCache, transactionID) +func (dm *DataManager) GetAccountActionPlans(acntID string, cacheRead, cacheWrite bool, transactionID string) (apIDs []string, err error) { + if cacheRead { + if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.([]string), nil + } + } + apIDs, err = dm.dataDB.GetAccountActionPlansDrv(acntID) if ((err == nil && len(apIDs) == 0) || err == utils.ErrNotFound) && config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans].Remote { if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, utils.ReplicatorSv1GetAccountActionPlans, acntID, &apIDs); err == nil { - err = dm.dataDB.SetAccountActionPlansDrv(acntID, apIDs, true) + err = dm.dataDB.SetAccountActionPlansDrv(acntID, apIDs) + } + if err != nil { + err = utils.CastRPCErr(err) + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, + cacheCommit(transactionID), transactionID) + } + return nil, err } } - if err != nil { - err = utils.CastRPCErr(err) - return nil, err + if cacheWrite { + Cache.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil, + cacheCommit(transactionID), transactionID) } return } type SetAccountActionPlansArg struct { - AcntID string - AplIDs []string - Overwrite bool + AcntID string + AplIDs []string } func (dm *DataManager) SetAccountActionPlans(acntID string, aPlIDs []string, overwrite bool) (err error) { - if err = dm.dataDB.SetAccountActionPlansDrv(acntID, aPlIDs, overwrite); err != nil { + if !overwrite { + var oldaPlIDs []string + if oldaPlIDs, err = dm.GetAccountActionPlans(acntID, + true, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + return + } + for _, oldAPid := range oldaPlIDs { + if !utils.IsSliceMember(aPlIDs, oldAPid) { + aPlIDs = append(aPlIDs, oldAPid) + } + } + } + + if err = dm.dataDB.SetAccountActionPlansDrv(acntID, aPlIDs); err != nil { return } if config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans].Replicate { var reply string if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, utils.ReplicatorSv1SetAccountActionPlans, &SetAccountActionPlansArg{ - AcntID: acntID, - AplIDs: aPlIDs, - Overwrite: overwrite, + AcntID: acntID, + AplIDs: aPlIDs, }, &reply); err != nil { err = utils.CastRPCErr(err) return @@ -1433,7 +1541,22 @@ type RemAccountActionPlansArgs struct { } func (dm *DataManager) RemAccountActionPlans(acntID string, apIDs []string) (err error) { - if err = dm.dataDB.RemAccountActionPlansDrv(acntID, apIDs); err != nil { + if len(apIDs) != 0 { // special case to keep the old style + var oldAAP []string + if oldAAP, err = dm.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); err != nil { + return + } + remainAAP := make([]string, 0, len(oldAAP)) + for _, ap := range oldAAP { + if !utils.IsSliceMember(apIDs, ap) { + remainAAP = append(remainAAP, ap) + } + } + if len(remainAAP) != 0 { + return dm.SetAccountActionPlans(acntID, remainAAP, true) + } + } + if err = dm.dataDB.RemAccountActionPlansDrv(acntID); err != nil { return } if config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans].Replicate { diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index 71be7d643..d065d0da0 100644 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -291,7 +291,7 @@ func testLoaderITWriteToDatabase(t *testing.T) { } for k, ap := range loader.actionPlans { - rcv, err := loader.dm.GetActionPlan(k, true, utils.NonTransactional) + rcv, err := loader.dm.GetActionPlan(k, false, false, utils.NonTransactional) if err != nil { t.Error("Failed GetActionPlan: ", err.Error()) } diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 33d022612..2bc58e76e 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -58,6 +58,7 @@ var ( testOnStorITSharedGroup, testOnStorITCRUDActionPlan, testOnStorITCRUDAccountActionPlans, + testOnStorITCRUDAccountActionPlans2, testOnStorITCRUDAccount, testOnStorITResource, testOnStorITResourceProfile, @@ -1034,13 +1035,13 @@ func testOnStorITCRUDActionPlan(t *testing.T) { }, }, } - if _, rcvErr := onStor.GetActionPlan(ap.Id, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetActionPlan(ap.Id, false, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } if err := onStor.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil { t.Error(err) } - if rcv, err := onStor.GetActionPlan(ap.Id, true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetActionPlan(ap.Id, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(ap, rcv) { t.Errorf("Expecting: %v, received: %v", ap, rcv) @@ -1073,13 +1074,13 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { expect := []string{"PACKAGE_10_SHARED_A_5", "USE_SHARED_A", "apl_PACKAGE_1001"} aAPs := []string{"PACKAGE_10_SHARED_A_5", "apl_PACKAGE_1001"} aAPs2 := []string{"USE_SHARED_A"} - if _, rcvErr := onStor.GetAccountActionPlans(acntID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } if err := onStor.SetAccountActionPlans(acntID, aAPs, true); err != nil { t.Error(err) } - if rcv, err := onStor.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(aAPs, rcv) { t.Errorf("Expecting: %v, received: %v", aAPs, rcv) @@ -1087,7 +1088,7 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { if err := onStor.SetAccountActionPlans(acntID, aAPs2, false); err != nil { t.Error(err) } - if rcv, err := onStor.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetAccountActionPlans(acntID, false, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(expect, rcv) { t.Errorf("Expecting: %v, received: %v", expect, rcv) @@ -1100,7 +1101,7 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { // t.Error(rcvErr) // } // - if rcv, err := onStor.GetAccountActionPlans(acntID, false, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetAccountActionPlans(acntID, true, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(expect, rcv) { t.Errorf("Expecting: %v, received: %v", expect, rcv) @@ -1108,10 +1109,11 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { // if err = onStor.DataDB().SelectDatabase(onStorCfg); err != nil { // t.Error(err) // } + if err := onStor.RemAccountActionPlans(acntID, aAPs2); err != nil { t.Error(err) } - if rcv, err := onStor.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetAccountActionPlans(acntID, false, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(aAPs, rcv) { t.Errorf("Expecting: %v, received: %v", aAPs, rcv) @@ -1119,7 +1121,51 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { if err := onStor.RemAccountActionPlans(acntID, aAPs); err != nil { t.Error(err) } - if _, rcvErr := onStor.GetAccountActionPlans(acntID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if v, rcvErr := onStor.GetAccountActionPlans(acntID, false, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr, v) + } +} + +func testOnStorITCRUDAccountActionPlans2(t *testing.T) { + acntID := utils.ConcatenatedKey("cgrates.org2", "1001") + acntID2 := utils.ConcatenatedKey("cgrates.org2", "1002") + expect := []string{"PACKAGE_10_SHARED_A_15", "PACKAGE_10_SHARED_A_5"} + // aAPs := []string{"PACKAGE_10_SHARED_A_5"} + if err := onStor.SetActionPlan("PACKAGE_10_SHARED_A_5", &ActionPlan{ + Id: "PACKAGE_10_SHARED_A_5", + AccountIDs: utils.NewStringMap(acntID, acntID2), + ActionTimings: []*ActionTiming{{}}, + }, true, utils.NonTransactional); err != nil { + t.Error(err) + } + if err := onStor.SetActionPlan("PACKAGE_10_SHARED_A_15", &ActionPlan{ + Id: "PACKAGE_10_SHARED_A_15", + AccountIDs: utils.NewStringMap(acntID, acntID2), + ActionTimings: []*ActionTiming{{}}, + }, true, utils.NonTransactional); err != nil { + t.Error(err) + } + Cache.Set(utils.CacheAccountActionPlans, acntID2, nil, nil, true, utils.NonTransactional) + if err = onStor.RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { + t.Fatal(err) + } + if rcv, err := onStor.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expect, rcv) { + t.Errorf("Expecting: %v, received: %v", expect, rcv) + } + if rcv, err := onStor.GetAccountActionPlans(acntID2, false, false, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expect, rcv) { + t.Errorf("Expecting: %v, received: %v", expect, rcv) + } + if err := onStor.RemAccountActionPlans(acntID, nil); err != nil { + t.Error(err) + } + if err := onStor.RemAccountActionPlans(acntID2, nil); err != nil { + t.Error(err) + } + if _, rcvErr := onStor.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 701543c29..f68cd5fcd 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -34,8 +34,7 @@ type Storage interface { Close() Flush(string) error GetKeysForPrefix(string) ([]string, error) - RebuildReverseForPrefix(string) error - RemoveReverseForPrefix(string) error + RemoveKeysForPrefix(string) error GetVersions(itm string) (vrs Versions, err error) SetVersions(vrs Versions, overwrite bool) (err error) RemoveVersions(vrs Versions) (err error) @@ -69,14 +68,13 @@ type DataDB interface { GetActionTriggersDrv(string) (ActionTriggers, error) SetActionTriggersDrv(string, ActionTriggers) error RemoveActionTriggersDrv(string) error - GetActionPlanDrv(string, bool, string) (*ActionPlan, error) - SetActionPlanDrv(string, *ActionPlan, bool, string) error - RemoveActionPlanDrv(key string, transactionID string) error + GetActionPlanDrv(string) (*ActionPlan, error) + SetActionPlanDrv(string, *ActionPlan) error + RemoveActionPlanDrv(key string) error GetAllActionPlansDrv() (map[string]*ActionPlan, error) - GetAccountActionPlansDrv(acntID string, skipCache bool, - transactionID string) (apIDs []string, err error) - SetAccountActionPlansDrv(acntID string, apIDs []string, overwrite bool) (err error) - RemAccountActionPlansDrv(acntID string, apIDs []string) (err error) + GetAccountActionPlansDrv(acntID string) (apIDs []string, err error) + SetAccountActionPlansDrv(acntID string, apIDs []string) (err error) + RemAccountActionPlansDrv(acntID string) (err error) PushTask(*Task) error PopTask() (*Task, error) GetAccountDrv(string) (*Account, error) diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index a924db839..7dfa83e0c 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -368,76 +368,24 @@ func (iDB *InternalDB) GetKeysForPrefix(prefix string) ([]string, error) { return nil, fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } category := prefix[:keyLen] // prefix length - queryPrefix := prefix[keyLen:] - ids := iDB.db.GetItemIDs(utils.CachePrefixToInstance[category], queryPrefix) + ids := iDB.db.GetItemIDs(utils.CachePrefixToInstance[category], prefix[keyLen:]) for i := range ids { ids[i] = category + ids[i] } return ids, nil } -func (iDB *InternalDB) RebuildReverseForPrefix(prefix string) (err error) { - keys, err := iDB.GetKeysForPrefix(prefix) - if err != nil { - return err +func (iDB *InternalDB) RemoveKeysForPrefix(prefix string) (err error) { + keyLen := len(utils.DESTINATION_PREFIX) + if len(prefix) < keyLen { + return fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } - for _, key := range keys { - iDB.db.Remove(utils.CacheReverseDestinations, key, + cacheID := utils.CachePrefixToInstance[prefix[:keyLen]] + for _, key := range iDB.db.GetItemIDs(cacheID, prefix[keyLen:]) { + iDB.db.Remove(cacheID, key, cacheCommit(utils.NonTransactional), utils.NonTransactional) } - switch prefix { - case utils.REVERSE_DESTINATION_PREFIX: - keys, err = iDB.GetKeysForPrefix(utils.DESTINATION_PREFIX) - if err != nil { - return err - } - for _, key := range keys { - dest, err := iDB.GetDestinationDrv(key[len(utils.DESTINATION_PREFIX):], false, utils.NonTransactional) - if err != nil { - return err - } - if err := iDB.SetReverseDestinationDrv(dest, utils.NonTransactional); err != nil { - return err - } - } - case utils.AccountActionPlansPrefix: - return nil - default: - return utils.ErrInvalidKey - } - return nil -} - -func (iDB *InternalDB) RemoveReverseForPrefix(prefix string) (err error) { - keys, err := iDB.GetKeysForPrefix(prefix) - if err != nil { - return err - } - for _, key := range keys { - iDB.db.Remove(utils.CacheReverseDestinations, key, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - switch prefix { - case utils.REVERSE_DESTINATION_PREFIX: - keys, err = iDB.GetKeysForPrefix(utils.DESTINATION_PREFIX) - if err != nil { - return err - } - for _, key := range keys { - dest, err := iDB.GetDestinationDrv(key[len(utils.DESTINATION_PREFIX):], false, utils.NonTransactional) - if err != nil { - return err - } - if err := iDB.RemoveDestinationDrv(dest.Id, utils.NonTransactional); err != nil { - return err - } - } - case utils.AccountActionPlansPrefix: - return nil - default: - return utils.ErrInvalidKey - } - return nil + return } func (iDB *InternalDB) GetVersions(itm string) (vrs Versions, err error) { @@ -464,7 +412,7 @@ func (iDB *InternalDB) SetVersions(vrs Versions, overwrite bool) (err error) { x, ok := iDB.db.Get(utils.TBLVersions, utils.Version) if !ok || x == nil { iDB.db.Set(utils.TBLVersions, utils.Version, vrs, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } provVrs := x.(Versions) @@ -472,7 +420,7 @@ func (iDB *InternalDB) SetVersions(vrs Versions, overwrite bool) (err error) { provVrs[key] = val } iDB.db.Set(utils.TBLVersions, utils.Version, provVrs, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -488,11 +436,11 @@ func (iDB *InternalDB) RemoveVersions(vrs Versions) (err error) { delete(internalVersions, key) } iDB.db.Set(utils.TBLVersions, utils.Version, internalVersions, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return nil } iDB.db.Remove(utils.TBLVersions, utils.Version, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -533,13 +481,13 @@ func (iDB *InternalDB) GetRatingPlanDrv(id string) (rp *RatingPlan, err error) { func (iDB *InternalDB) SetRatingPlanDrv(rp *RatingPlan) (err error) { iDB.db.Set(utils.CacheRatingPlans, rp.Id, rp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveRatingPlanDrv(id string) (err error) { iDB.db.Remove(utils.CacheRatingPlans, id, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -553,13 +501,13 @@ func (iDB *InternalDB) GetRatingProfileDrv(id string) (rp *RatingProfile, err er func (iDB *InternalDB) SetRatingProfileDrv(rp *RatingProfile) (err error) { iDB.db.Set(utils.CacheRatingProfiles, rp.Id, rp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveRatingProfileDrv(id string) (err error) { iDB.db.Remove(utils.CacheRatingProfiles, id, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -587,7 +535,7 @@ func (iDB *InternalDB) GetDestinationDrv(key string, skipCache bool, transaction func (iDB *InternalDB) SetDestinationDrv(dest *Destination, transactionID string) (err error) { iDB.db.Set(utils.CacheDestinations, dest.Id, dest, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) Cache.Remove(utils.CacheDestinations, dest.Id, cacheCommit(transactionID), transactionID) return @@ -600,12 +548,12 @@ func (iDB *InternalDB) RemoveDestinationDrv(destID string, transactionID string) return } iDB.db.Remove(utils.CacheDestinations, destID, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) Cache.Remove(utils.CacheDestinations, destID, cacheCommit(transactionID), transactionID) for _, prefix := range d.Prefixes { iDB.db.Remove(utils.CacheReverseDestinations, prefix, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) iDB.GetReverseDestinationDrv(prefix, true, transactionID) // it will recache the destination } return @@ -626,7 +574,7 @@ func (iDB *InternalDB) SetReverseDestinationDrv(dest *Destination, transactionID mpRevDst[dest.Id] = true // for ReverseDestination we will use Groups iDB.db.Set(utils.CacheReverseDestinations, p, mpRevDst, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) } return } @@ -704,7 +652,7 @@ func (iDB *InternalDB) UpdateReverseDestinationDrv(oldDest, newDest *Destination } // for ReverseDestination we will use Groups iDB.db.Set(utils.CacheReverseDestinations, obsoletePrefix, mpRevDst, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) } Cache.Remove(utils.CacheReverseDestinations, obsoletePrefix, @@ -724,7 +672,7 @@ func (iDB *InternalDB) UpdateReverseDestinationDrv(oldDest, newDest *Destination mpRevDst[newDest.Id] = true // for ReverseDestination we will use Groups iDB.db.Set(utils.CacheReverseDestinations, addedPrefix, mpRevDst, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) } return err } @@ -739,13 +687,13 @@ func (iDB *InternalDB) GetActionsDrv(id string) (acts Actions, err error) { func (iDB *InternalDB) SetActionsDrv(id string, acts Actions) (err error) { iDB.db.Set(utils.CacheActions, id, acts, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveActionsDrv(id string) (err error) { iDB.db.Remove(utils.CacheActions, id, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -759,13 +707,13 @@ func (iDB *InternalDB) GetSharedGroupDrv(id string) (sh *SharedGroup, err error) func (iDB *InternalDB) SetSharedGroupDrv(sh *SharedGroup) (err error) { iDB.db.Set(utils.CacheSharedGroups, sh.Id, sh, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveSharedGroupDrv(id string) (err error) { iDB.db.Remove(utils.CacheSharedGroups, id, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -779,70 +727,34 @@ func (iDB *InternalDB) GetActionTriggersDrv(id string) (at ActionTriggers, err e func (iDB *InternalDB) SetActionTriggersDrv(id string, at ActionTriggers) (err error) { iDB.db.Set(utils.CacheActionTriggers, id, at, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveActionTriggersDrv(id string) (err error) { iDB.db.Remove(utils.CacheActionTriggers, id, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } -func (iDB *InternalDB) GetActionPlanDrv(key string, skipCache bool, - transactionID string) (ats *ActionPlan, err error) { - if !skipCache { - if x, ok := Cache.Get(utils.CacheActionPlans, key); ok { - if x != nil { - return x.(*ActionPlan), nil - } - return nil, utils.ErrNotFound - } - } - cCommit := cacheCommit(transactionID) +func (iDB *InternalDB) GetActionPlanDrv(key string) (ats *ActionPlan, err error) { x, ok := iDB.db.Get(utils.CacheActionPlans, key) if !ok || x == nil { - Cache.Set(utils.CacheActionPlans, key, nil, nil, - cCommit, transactionID) return nil, utils.ErrNotFound } ats = x.(*ActionPlan) - Cache.Set(utils.CacheActionPlans, key, ats, nil, - cCommit, transactionID) return } -func (iDB *InternalDB) SetActionPlanDrv(key string, ats *ActionPlan, - overwrite bool, transactionID string) (err error) { - cCommit := cacheCommit(transactionID) - if len(ats.ActionTimings) == 0 { - iDB.db.Remove(utils.CacheActionPlans, key, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - Cache.Remove(utils.CacheActionPlans, key, - cCommit, transactionID) - return - } - if !overwrite { - // get existing action plan to merge the account ids - if existingAts, _ := iDB.GetActionPlanDrv(key, true, - transactionID); existingAts != nil { - if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { - ats.AccountIDs = make(utils.StringMap) - } - for accID := range existingAts.AccountIDs { - ats.AccountIDs[accID] = true - } - } - } +func (iDB *InternalDB) SetActionPlanDrv(key string, ats *ActionPlan) (err error) { iDB.db.Set(utils.CacheActionPlans, key, ats, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } -func (iDB *InternalDB) RemoveActionPlanDrv(key string, transactionID string) (err error) { +func (iDB *InternalDB) RemoveActionPlanDrv(key string) (err error) { iDB.db.Remove(utils.CacheActionPlans, key, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - Cache.Remove(utils.CacheActionPlans, key, cacheCommit(transactionID), transactionID) + true, utils.NonTransactional) return } @@ -854,7 +766,7 @@ func (iDB *InternalDB) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err e ats = make(map[string]*ActionPlan, len(keys)) for _, key := range keys { - ap, err := iDB.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):], false, utils.NonTransactional) + ap, err := iDB.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):]) if err != nil { return nil, err } @@ -863,69 +775,25 @@ func (iDB *InternalDB) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err e return } -func (iDB *InternalDB) GetAccountActionPlansDrv(acntID string, - skipCache bool, transactionID string) (apIDs []string, err error) { - if !skipCache { - if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.([]string), nil - } - } +func (iDB *InternalDB) GetAccountActionPlansDrv(acntID string) (apIDs []string, err error) { x, ok := iDB.db.Get(utils.CacheAccountActionPlans, acntID) if !ok || x == nil { - Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, - cacheCommit(transactionID), transactionID) return nil, utils.ErrNotFound } apIDs = x.([]string) - Cache.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil, - cacheCommit(transactionID), transactionID) return } -func (iDB *InternalDB) SetAccountActionPlansDrv(acntID string, apIDs []string, overwrite bool) (err error) { - if !overwrite { - if oldaPlIDs, err := iDB.GetAccountActionPlansDrv(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { - return err - } else { - for _, oldAPid := range oldaPlIDs { - if !utils.IsSliceMember(apIDs, oldAPid) { - apIDs = append(apIDs, oldAPid) - } - } - } - } +func (iDB *InternalDB) SetAccountActionPlansDrv(acntID string, apIDs []string) (err error) { iDB.db.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } -func (iDB *InternalDB) RemAccountActionPlansDrv(acntID string, apIDs []string) (err error) { - if len(apIDs) == 0 { - iDB.db.Remove(utils.CacheAccountActionPlans, acntID, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - return - } - oldaPlIDs, err := iDB.GetAccountActionPlansDrv(acntID, true, utils.NonTransactional) - if err != nil { - return err - } - for i := 0; i < len(oldaPlIDs); { - if utils.IsSliceMember(apIDs, oldaPlIDs[i]) { - oldaPlIDs = append(oldaPlIDs[:i], oldaPlIDs[i+1:]...) - continue - } - i++ - } - if len(oldaPlIDs) == 0 { - iDB.db.Remove(utils.CacheAccountActionPlans, acntID, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - return - } - iDB.db.Set(utils.CacheAccountActionPlans, acntID, oldaPlIDs, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) +func (iDB *InternalDB) RemAccountActionPlansDrv(acntID string) (err error) { + iDB.db.Remove(utils.CacheAccountActionPlans, acntID, + true, utils.NonTransactional) + return return } @@ -972,13 +840,13 @@ func (iDB *InternalDB) SetAccountDrv(acc *Account) (err error) { } acc.UpdateTime = time.Now() iDB.db.Set(utils.CacheAccounts, acc.ID, acc, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveAccountDrv(id string) (err error) { iDB.db.Remove(utils.CacheAccounts, id, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -992,13 +860,13 @@ func (iDB *InternalDB) GetResourceProfileDrv(tenant, id string) (rp *ResourcePro func (iDB *InternalDB) SetResourceProfileDrv(rp *ResourceProfile) (err error) { iDB.db.Set(utils.CacheResourceProfiles, rp.TenantID(), rp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveResourceProfileDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheResourceProfiles, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -1012,13 +880,13 @@ func (iDB *InternalDB) GetResourceDrv(tenant, id string) (r *Resource, err error func (iDB *InternalDB) SetResourceDrv(r *Resource) (err error) { iDB.db.Set(utils.CacheResources, r.TenantID(), r, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveResourceDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheResources, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -1032,13 +900,13 @@ func (iDB *InternalDB) GetTimingDrv(id string) (tmg *utils.TPTiming, err error) func (iDB *InternalDB) SetTimingDrv(timing *utils.TPTiming) (err error) { iDB.db.Set(utils.CacheTimings, timing.ID, timing, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveTimingDrv(id string) (err error) { iDB.db.Remove(utils.CacheTimings, id, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -1090,9 +958,9 @@ func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string, if commit && transactionID != "" { x, _ := iDB.db.Get(cacheID, dbKey) iDB.db.Remove(cacheID, dbKey, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) iDB.db.Set(cacheID, originKey, x, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } var toBeDeleted []string @@ -1110,7 +978,7 @@ func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string, x, ok := iDB.db.Get(cacheID, dbKey) if !ok || x == nil { iDB.db.Set(cacheID, dbKey, toBeAdded, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return err } @@ -1130,7 +998,7 @@ func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string, } func (iDB *InternalDB) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) { iDB.db.Remove(cacheID, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -1163,13 +1031,13 @@ func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (sq *Sta } func (iDB *InternalDB) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { iDB.db.Set(utils.CacheStatQueueProfiles, sq.TenantID(), sq, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemStatQueueProfileDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -1188,12 +1056,12 @@ func (iDB *InternalDB) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err } } iDB.db.Set(utils.CacheStatQueues, utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemStatQueueDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheStatQueues, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -1207,13 +1075,13 @@ func (iDB *InternalDB) GetThresholdProfileDrv(tenant, id string) (tp *ThresholdP func (iDB *InternalDB) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { iDB.db.Set(utils.CacheThresholdProfiles, tp.TenantID(), tp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemThresholdProfileDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -1227,13 +1095,13 @@ func (iDB *InternalDB) GetThresholdDrv(tenant, id string) (th *Threshold, err er func (iDB *InternalDB) SetThresholdDrv(th *Threshold) (err error) { iDB.db.Set(utils.CacheThresholds, th.TenantID(), th, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveThresholdDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheThresholds, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -1248,13 +1116,13 @@ func (iDB *InternalDB) GetFilterDrv(tenant, id string) (fltr *Filter, err error) func (iDB *InternalDB) SetFilterDrv(fltr *Filter) (err error) { iDB.db.Set(utils.CacheFilters, fltr.TenantID(), fltr, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveFilterDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheFilters, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } @@ -1268,12 +1136,12 @@ func (iDB *InternalDB) GetSupplierProfileDrv(tenant, id string) (spp *SupplierPr } func (iDB *InternalDB) SetSupplierProfileDrv(spp *SupplierProfile) (err error) { iDB.db.Set(utils.CacheSupplierProfiles, spp.TenantID(), spp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveSupplierProfileDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheSupplierProfiles, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) GetAttributeProfileDrv(tenant, id string) (attr *AttributeProfile, err error) { @@ -1285,12 +1153,12 @@ func (iDB *InternalDB) GetAttributeProfileDrv(tenant, id string) (attr *Attribut } func (iDB *InternalDB) SetAttributeProfileDrv(attr *AttributeProfile) (err error) { iDB.db.Set(utils.CacheAttributeProfiles, attr.TenantID(), attr, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveAttributeProfileDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheAttributeProfiles, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) GetChargerProfileDrv(tenant, id string) (ch *ChargerProfile, err error) { @@ -1302,12 +1170,12 @@ func (iDB *InternalDB) GetChargerProfileDrv(tenant, id string) (ch *ChargerProfi } func (iDB *InternalDB) SetChargerProfileDrv(chr *ChargerProfile) (err error) { iDB.db.Set(utils.CacheChargerProfiles, chr.TenantID(), chr, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveChargerProfileDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheChargerProfiles, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) GetDispatcherProfileDrv(tenant, id string) (dpp *DispatcherProfile, err error) { @@ -1319,12 +1187,12 @@ func (iDB *InternalDB) GetDispatcherProfileDrv(tenant, id string) (dpp *Dispatch } func (iDB *InternalDB) SetDispatcherProfileDrv(dpp *DispatcherProfile) (err error) { iDB.db.Set(utils.CacheDispatcherProfiles, dpp.TenantID(), dpp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveDispatcherProfileDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheDispatcherProfiles, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[string]int64, err error) { @@ -1341,7 +1209,7 @@ func (iDB *InternalDB) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[strin } func (iDB *InternalDB) SetLoadIDsDrv(loadIDs map[string]int64) (err error) { iDB.db.Set(utils.CacheLoadIDs, utils.LoadIDs, loadIDs, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) GetDispatcherHostDrv(tenant, id string) (dpp *DispatcherHost, err error) { @@ -1353,12 +1221,12 @@ func (iDB *InternalDB) GetDispatcherHostDrv(tenant, id string) (dpp *DispatcherH } func (iDB *InternalDB) SetDispatcherHostDrv(dpp *DispatcherHost) (err error) { iDB.db.Set(utils.CacheDispatcherHosts, dpp.TenantID(), dpp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } func (iDB *InternalDB) RemoveDispatcherHostDrv(tenant, id string) (err error) { iDB.db.Remove(utils.CacheDispatcherHosts, utils.ConcatenatedKey(tenant, id), - cacheCommit(utils.NonTransactional), utils.NonTransactional) + true, utils.NonTransactional) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index e1e502ff2..8bc8d26e5 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -32,7 +32,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/ltcache" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsoncodec" @@ -428,105 +427,60 @@ func (ms *MongoStorage) SelectDatabase(dbName string) (err error) { return } -// RebuildReverseForPrefix implementation -func (ms *MongoStorage) RebuildReverseForPrefix(prefix string) (err error) { - if !utils.SliceHasMember([]string{utils.AccountActionPlansPrefix, utils.REVERSE_DESTINATION_PREFIX}, prefix) { - return utils.ErrInvalidKey - } - colName, ok := ms.getColNameForPrefix(prefix) - if !ok { +func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) (err error) { + var colName string + switch prefix { + case utils.DestinationPrefix: + colName = ColDst + case utils.REVERSE_DESTINATION_PREFIX: + colName = ColRds + case utils.ACTION_PREFIX: + colName = ColAct + case utils.ACTION_PLAN_PREFIX: + colName = ColApl + case utils.AccountActionPlansPrefix: + colName = ColAAp + case utils.TASKS_KEY: + colName = ColTsk + case utils.ACTION_TRIGGER_PREFIX: + colName = ColAtr + case utils.RATING_PLAN_PREFIX: + colName = ColRpl + case utils.RATING_PROFILE_PREFIX: + colName = ColRpf + case utils.ACCOUNT_PREFIX: + colName = ColAcc + case utils.SHARED_GROUP_PREFIX: + colName = ColShg + case utils.LOADINST_KEY: + colName = ColLht + case utils.VERSION_PREFIX: + colName = ColVer + case utils.TimingsPrefix: + colName = ColTmg + case utils.ResourcesPrefix: + colName = ColRes + case utils.ResourceProfilesPrefix: + colName = ColRsP + case utils.ThresholdProfilePrefix: + colName = ColTps + case utils.StatQueueProfilePrefix: + colName = ColSqp + case utils.ThresholdPrefix: + colName = ColThs + case utils.FilterPrefix: + colName = ColFlt + case utils.SupplierProfilePrefix: + colName = ColSpp + case utils.AttributeProfilePrefix: + colName = ColAttr + default: return utils.ErrInvalidKey } + return ms.query(func(sctx mongo.SessionContext) error { - col := ms.getCol(colName) - if _, err := col.DeleteMany(sctx, bson.M{}); err != nil { - return err - } - var keys []string - switch prefix { - case utils.REVERSE_DESTINATION_PREFIX: - if keys, err = ms.GetKeysForPrefix(utils.DESTINATION_PREFIX); err != nil { - return err - } - for _, key := range keys { - dest, err := ms.GetDestinationDrv(key[len(utils.DESTINATION_PREFIX):], true, utils.NonTransactional) - if err != nil { - return err - } - if err = ms.SetReverseDestinationDrv(dest, utils.NonTransactional); err != nil { - return err - } - } - case utils.AccountActionPlansPrefix: - if keys, err = ms.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX); err != nil { - return err - } - for _, key := range keys { - apl, err := ms.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):], true, utils.NonTransactional) - if err != nil { - return err - } - for acntID := range apl.AccountIDs { - if err = ms.SetAccountActionPlansDrv(acntID, []string{apl.Id}, false); err != nil { - return err - } - } - } - } - return nil - }) -} - -// RemoveReverseForPrefix implementation -func (ms *MongoStorage) RemoveReverseForPrefix(prefix string) (err error) { - if !utils.SliceHasMember([]string{utils.AccountActionPlansPrefix, utils.REVERSE_DESTINATION_PREFIX}, prefix) { - return utils.ErrInvalidKey - } - colName, ok := ms.getColNameForPrefix(prefix) - if !ok { - return utils.ErrInvalidKey - } - return ms.query(func(sctx mongo.SessionContext) error { - col := ms.getCol(colName) - - if dr, err := col.DeleteMany(sctx, bson.M{}); err != nil { - return err - } else if dr.DeletedCount == 0 { - return utils.ErrNotFound - } - - var keys []string - switch prefix { - case utils.REVERSE_DESTINATION_PREFIX: - if keys, err = ms.GetKeysForPrefix(utils.DESTINATION_PREFIX); err != nil { - return err - } - for _, key := range keys { - dest, err := ms.GetDestinationDrv(key[len(utils.DESTINATION_PREFIX):], true, utils.NonTransactional) - if err != nil { - return err - } - if err := ms.RemoveDestinationDrv(dest.Id, utils.NonTransactional); err != nil { - return err - } - } - case utils.AccountActionPlansPrefix: - if keys, err = ms.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX); err != nil { - return err - } - for _, key := range keys { - apl, err := ms.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):], true, utils.NonTransactional) - if err != nil { - return err - } - for acntID := range apl.AccountIDs { - if err = ms.RemAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { - return err - } - } - } - } - return nil + _, err := ms.getCol(colName).DeleteMany(sctx, bson.M{}) + return err }) } @@ -1350,19 +1304,7 @@ func (ms *MongoStorage) RemoveActionTriggersDrv(key string) error { }) } -func (ms *MongoStorage) GetActionPlanDrv(key string, skipCache bool, - transactionID string) (ats *ActionPlan, err error) { - if !skipCache { - if x, err := Cache.GetCloned(utils.CacheActionPlans, key); err != nil { - if err != ltcache.ErrNotFound { // Only consider cache if item was found - return nil, err - } - } else if x == nil { // item was placed nil in cache - return nil, utils.ErrNotFound - } else { - return x.(*ActionPlan), nil - } - } +func (ms *MongoStorage) GetActionPlanDrv(key string) (ats *ActionPlan, err error) { var kv struct { Key string Value []byte @@ -1371,8 +1313,6 @@ func (ms *MongoStorage) GetActionPlanDrv(key string, skipCache bool, cur := ms.getCol(ColApl).FindOne(sctx, bson.M{"key": key}) if err := cur.Decode(&kv); err != nil { if err == mongo.ErrNoDocuments { - Cache.Set(utils.CacheActionPlans, key, nil, nil, - cacheCommit(transactionID), transactionID) return utils.ErrNotFound } return err @@ -1391,38 +1331,11 @@ func (ms *MongoStorage) GetActionPlanDrv(key string, skipCache bool, return nil, err } r.Close() - if err = ms.ms.Unmarshal(out, &ats); err != nil { - return nil, err - } - Cache.Set(utils.CacheActionPlans, key, ats, nil, - cacheCommit(transactionID), transactionID) + err = ms.ms.Unmarshal(out, &ats) return } -func (ms *MongoStorage) SetActionPlanDrv(key string, ats *ActionPlan, - overwrite bool, transactionID string) (err error) { - // clean dots from account ids map - cCommit := cacheCommit(transactionID) - if len(ats.ActionTimings) == 0 { - err = ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColApl).DeleteOne(sctx, bson.M{"key": key}) - return err - }) - Cache.Remove(utils.CacheActionPlans, key, - cCommit, transactionID) - return - } - if !overwrite { - // get existing action plan to merge the account ids - if existingAts, _ := ms.GetActionPlanDrv(key, true, transactionID); existingAts != nil { - if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { - ats.AccountIDs = make(utils.StringMap) - } - for accID := range existingAts.AccountIDs { - ats.AccountIDs[accID] = true - } - } - } +func (ms *MongoStorage) SetActionPlanDrv(key string, ats *ActionPlan) (err error) { result, err := ms.ms.Marshal(ats) if err != nil { return err @@ -1443,9 +1356,7 @@ func (ms *MongoStorage) SetActionPlanDrv(key string, ats *ActionPlan, }) } -func (ms *MongoStorage) RemoveActionPlanDrv(key string, transactionID string) error { - cCommit := cacheCommit(transactionID) - Cache.Remove(utils.CacheActionPlans, key, cCommit, transactionID) +func (ms *MongoStorage) RemoveActionPlanDrv(key string) error { return ms.query(func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColApl).DeleteOne(sctx, bson.M{"key": key}) return err @@ -1462,8 +1373,7 @@ func (ms *MongoStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err } ats = make(map[string]*ActionPlan, len(keys)) for _, key := range keys { - ap, err := ms.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):], - false, utils.NonTransactional) + ap, err := ms.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):]) if err != nil { return nil, err } @@ -1472,15 +1382,7 @@ func (ms *MongoStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err return } -func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string, skipCache bool, transactionID string) (aPlIDs []string, err error) { - if !skipCache { - if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.([]string), nil - } - } +func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string, err error) { var kv struct { Key string Value []string @@ -1489,8 +1391,6 @@ func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string, skipCache bool, cur := ms.getCol(ColAAp).FindOne(sctx, bson.M{"key": acntID}) if err := cur.Decode(&kv); err != nil { if err == mongo.ErrNoDocuments { - Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, - cacheCommit(transactionID), transactionID) return utils.ErrNotFound } return err @@ -1500,23 +1400,10 @@ func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string, skipCache bool, return nil, err } aPlIDs = kv.Value - Cache.Set(utils.CacheAccountActionPlans, acntID, aPlIDs, nil, - cacheCommit(transactionID), transactionID) return } -func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string, overwrite bool) (err error) { - if !overwrite { - if oldaPlIDs, err := ms.GetAccountActionPlansDrv(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { - return err - } else { - for _, oldAPid := range oldaPlIDs { - if !utils.IsSliceMember(aPlIDs, oldAPid) { - aPlIDs = append(aPlIDs, oldAPid) - } - } - } - } +func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { return ms.query(func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColAAp).UpdateOne(sctx, bson.M{"key": acntID}, bson.M{"$set": struct { @@ -1530,44 +1417,12 @@ func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string, } // ToDo: check return len(aPlIDs) == 0 -func (ms *MongoStorage) RemAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { - if len(aPlIDs) == 0 { - return ms.query(func(sctx mongo.SessionContext) (err error) { - dr, err := ms.getCol(ColAAp).DeleteOne(sctx, bson.M{"key": acntID}) - if dr.DeletedCount == 0 { - return utils.ErrNotFound - } - return err - }) - } - oldAPlIDs, err := ms.GetAccountActionPlansDrv(acntID, true, utils.NonTransactional) - if err != nil { - return err - } - for i := 0; i < len(oldAPlIDs); { - if utils.IsSliceMember(aPlIDs, oldAPlIDs[i]) { - oldAPlIDs = append(oldAPlIDs[:i], oldAPlIDs[i+1:]...) - continue // if we have stripped, don't increase index so we can check next element by next run - } - i++ - } - if len(oldAPlIDs) == 0 { // no more elements, remove the reference - return ms.query(func(sctx mongo.SessionContext) (err error) { - dr, err := ms.getCol(ColAAp).DeleteOne(sctx, bson.M{"key": acntID}) - if dr.DeletedCount == 0 { - return utils.ErrNotFound - } - return err - }) - } +func (ms *MongoStorage) RemAccountActionPlansDrv(acntID string) (err error) { return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAAp).UpdateOne(sctx, bson.M{"key": acntID}, - bson.M{"$set": struct { - Key string - Value []string - }{Key: acntID, Value: oldAPlIDs}}, - options.Update().SetUpsert(true), - ) + dr, err := ms.getCol(ColAAp).DeleteOne(sctx, bson.M{"key": acntID}) + if dr.DeletedCount == 0 { + return utils.ErrNotFound + } return err }) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 4392f5bfa..f47d467c8 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -31,7 +31,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/ltcache" "github.com/mediocregopher/radix.v2/pool" "github.com/mediocregopher/radix.v2/redis" "github.com/mediocregopher/radix.v2/sentinel" @@ -261,10 +260,7 @@ func (rs *RedisStorage) IsDBEmpty() (resp bool, err error) { return true, nil } -func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) (err error) { - if !utils.SliceHasMember([]string{utils.AccountActionPlansPrefix, utils.REVERSE_DESTINATION_PREFIX}, prefix) { - return utils.ErrInvalidKey - } +func (rs *RedisStorage) RemoveKeysForPrefix(prefix string) (err error) { var keys []string keys, err = rs.GetKeysForPrefix(prefix) if err != nil { @@ -275,84 +271,7 @@ func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) (err error) { return } } - switch prefix { - case utils.REVERSE_DESTINATION_PREFIX: - if keys, err = rs.GetKeysForPrefix(utils.DESTINATION_PREFIX); err != nil { - return - } - for _, key := range keys { - dest, err := rs.GetDestinationDrv(key[len(utils.DESTINATION_PREFIX):], true, utils.NonTransactional) - if err != nil { - return err - } - if err = rs.SetReverseDestinationDrv(dest, utils.NonTransactional); err != nil { - return err - } - } - case utils.AccountActionPlansPrefix: - if keys, err = rs.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX); err != nil { - return - } - for _, key := range keys { - apl, err := rs.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):], true, utils.NonTransactional) // skipCache on get since loader checks and caches empty data for loaded objects - if err != nil { - return err - } - for acntID := range apl.AccountIDs { - if err = rs.SetAccountActionPlansDrv(acntID, []string{apl.Id}, false); err != nil { - return err - } - } - } - } - return nil -} - -func (rs *RedisStorage) RemoveReverseForPrefix(prefix string) (err error) { - if !utils.SliceHasMember([]string{utils.AccountActionPlansPrefix, utils.REVERSE_DESTINATION_PREFIX}, prefix) { - return utils.ErrInvalidKey - } - var keys []string - keys, err = rs.GetKeysForPrefix(prefix) - if err != nil { - return - } - for _, key := range keys { - if err = rs.Cmd(redis_DEL, key).Err; err != nil { - return - } - } - switch prefix { - case utils.REVERSE_DESTINATION_PREFIX: - if keys, err = rs.GetKeysForPrefix(utils.DESTINATION_PREFIX); err != nil { - return - } - for _, key := range keys { - dest, err := rs.GetDestinationDrv(key[len(utils.DESTINATION_PREFIX):], true, utils.NonTransactional) - if err != nil { - return err - } - if err := rs.RemoveDestinationDrv(dest.Id, utils.NonTransactional); err != nil { - return err - } - } - case utils.AccountActionPlansPrefix: - if keys, err = rs.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX); err != nil { - return - } - for _, key := range keys { - apl, err := rs.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):], true, utils.NonTransactional) // skipCache on get since loader checks and caches empty data for loaded objects - if err != nil { - return err - } - for acntID := range apl.AccountIDs { - if err = rs.RemAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { - return err - } - } - } - } - return nil + return } func (rs *RedisStorage) getKeysForFilterIndexesKeys(fkeys []string) (keys []string, err error) { @@ -888,24 +807,10 @@ func (rs *RedisStorage) RemoveActionTriggersDrv(key string) (err error) { return } -func (rs *RedisStorage) GetActionPlanDrv(key string, skipCache bool, - transactionID string) (ats *ActionPlan, err error) { - if !skipCache { - if x, err := Cache.GetCloned(utils.CacheActionPlans, key); err != nil { - if err != ltcache.ErrNotFound { // Only consider cache if item was found - return nil, err - } - } else if x == nil { // item was placed nil in cache - return nil, utils.ErrNotFound - } else { - return x.(*ActionPlan), nil - } - } +func (rs *RedisStorage) GetActionPlanDrv(key string) (ats *ActionPlan, err error) { var values []byte if values, err = rs.Cmd(redis_GET, utils.ACTION_PLAN_PREFIX+key).Bytes(); err != nil { if err == redis.ErrRespNil { // did not find the destination - Cache.Set(utils.CacheActionPlans, key, nil, nil, - cacheCommit(transactionID), transactionID) err = utils.ErrNotFound } return @@ -920,49 +825,18 @@ func (rs *RedisStorage) GetActionPlanDrv(key string, skipCache bool, return nil, err } r.Close() - if err = rs.ms.Unmarshal(out, &ats); err != nil { - return - } - Cache.Set(utils.CacheActionPlans, key, ats, nil, - cacheCommit(transactionID), transactionID) + err = rs.ms.Unmarshal(out, &ats) return } -func (rs *RedisStorage) RemoveActionPlanDrv(key string, - transactionID string) error { - cCommit := cacheCommit(transactionID) + +func (rs *RedisStorage) RemoveActionPlanDrv(key string) error { if err := rs.Cmd(redis_SREM, utils.ActionPlanIndexes, utils.ACTION_PLAN_PREFIX+key).Err; err != nil { return err } - err := rs.Cmd(redis_DEL, utils.ACTION_PLAN_PREFIX+key).Err - Cache.Remove(utils.CacheActionPlans, key, - cCommit, transactionID) - return err + return rs.Cmd(redis_DEL, utils.ACTION_PLAN_PREFIX+key).Err } -func (rs *RedisStorage) SetActionPlanDrv(key string, ats *ActionPlan, - overwrite bool, transactionID string) (err error) { - cCommit := cacheCommit(transactionID) - if len(ats.ActionTimings) == 0 { - // delete the key - if err := rs.Cmd(redis_SREM, utils.ActionPlanIndexes, utils.ACTION_PLAN_PREFIX+key).Err; err != nil { - return err - } - err = rs.Cmd(redis_DEL, utils.ACTION_PLAN_PREFIX+key).Err - Cache.Remove(utils.CacheActionPlans, key, - cCommit, transactionID) - return - } - if !overwrite { - // get existing action plan to merge the account ids - if existingAts, _ := rs.GetActionPlanDrv(key, true, transactionID); existingAts != nil { - if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { - ats.AccountIDs = make(utils.StringMap) - } - for accID := range existingAts.AccountIDs { - ats.AccountIDs[accID] = true - } - } - } +func (rs *RedisStorage) SetActionPlanDrv(key string, ats *ActionPlan) (err error) { var result []byte if result, err = rs.ms.Marshal(ats); err != nil { return @@ -987,8 +861,7 @@ func (rs *RedisStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err } ats = make(map[string]*ActionPlan, len(keys)) for _, key := range keys { - ap, err := rs.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):], - false, utils.NonTransactional) + ap, err := rs.GetActionPlanDrv(key[len(utils.ACTION_PLAN_PREFIX):]) if err != nil { return nil, err } @@ -997,46 +870,20 @@ func (rs *RedisStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err return } -func (rs *RedisStorage) GetAccountActionPlansDrv(acntID string, skipCache bool, - transactionID string) (aPlIDs []string, err error) { - if !skipCache { - if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.([]string), nil - } - } +func (rs *RedisStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string, err error) { var values []byte if values, err = rs.Cmd(redis_GET, utils.AccountActionPlansPrefix+acntID).Bytes(); err != nil { if err == redis.ErrRespNil { // did not find the destination - Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, - cacheCommit(transactionID), transactionID) err = utils.ErrNotFound } return } - if err = rs.ms.Unmarshal(values, &aPlIDs); err != nil { - return - } - Cache.Set(utils.CacheAccountActionPlans, acntID, aPlIDs, nil, - cacheCommit(transactionID), transactionID) + err = rs.ms.Unmarshal(values, &aPlIDs) return } -func (rs *RedisStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string, overwrite bool) (err error) { - if !overwrite { - if oldaPlIDs, err := rs.GetAccountActionPlansDrv(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { - return err - } else { - for _, oldAPid := range oldaPlIDs { - if !utils.IsSliceMember(aPlIDs, oldAPid) { - aPlIDs = append(aPlIDs, oldAPid) - } - } - } - } +func (rs *RedisStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { var result []byte if result, err = rs.ms.Marshal(aPlIDs); err != nil { return err @@ -1044,30 +891,8 @@ func (rs *RedisStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string, return rs.Cmd(redis_SET, utils.AccountActionPlansPrefix+acntID, result).Err } -func (rs *RedisStorage) RemAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { - key := utils.AccountActionPlansPrefix + acntID - if len(aPlIDs) == 0 { - return rs.Cmd(redis_DEL, key).Err - } - oldaPlIDs, err := rs.GetAccountActionPlansDrv(acntID, true, utils.NonTransactional) - if err != nil { - return err - } - for i := 0; i < len(oldaPlIDs); { - if utils.IsSliceMember(aPlIDs, oldaPlIDs[i]) { - oldaPlIDs = append(oldaPlIDs[:i], oldaPlIDs[i+1:]...) - continue // if we have stripped, don't increase index so we can check next element by next run - } - i++ - } - if len(oldaPlIDs) == 0 { // no more elements, remove the reference - return rs.Cmd(redis_DEL, key).Err - } - var result []byte - if result, err = rs.ms.Marshal(oldaPlIDs); err != nil { - return err - } - return rs.Cmd(redis_SET, key, result).Err +func (rs *RedisStorage) RemAccountActionPlansDrv(acntID string) (err error) { + return rs.Cmd(redis_DEL, utils.AccountActionPlansPrefix+acntID).Err } func (rs *RedisStorage) PushTask(t *Task) error { diff --git a/engine/tpreader.go b/engine/tpreader.go index a2d316930..e4087eddf 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -749,7 +749,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) if accountAction.ActionPlanId != "" { // get old userBalanceIds exitingAccountIds := make(utils.StringMap) - existingActionPlan, err := tpr.dm.GetActionPlan(accountAction.ActionPlanId, true, utils.NonTransactional) + existingActionPlan, err := tpr.dm.GetActionPlan(accountAction.ActionPlanId, false, false, utils.NonTransactional) if err == nil && existingActionPlan != nil { exitingAccountIds = existingActionPlan.AccountIDs } @@ -1778,7 +1778,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error) if verbose { log.Print("Rebuilding reverse destinations") } - if err = tpr.dm.DataDB().RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { + if err = tpr.dm.RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { return err } } @@ -1786,7 +1786,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error) if verbose { log.Print("Rebuilding account action plans") } - if err = tpr.dm.DataDB().RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { + if err = tpr.dm.RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { return err } } @@ -2274,7 +2274,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro if verbose { log.Print("Removing reverse destinations") } - if err = tpr.dm.DataDB().RemoveReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { + if err = tpr.dm.DataDB().RemoveKeysForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { return err } } @@ -2282,7 +2282,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro if verbose { log.Print("Removing account action plans") } - if err = tpr.dm.DataDB().RemoveReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { + if err = tpr.dm.DataDB().RemoveKeysForPrefix(utils.AccountActionPlansPrefix); err != nil { return err } } @@ -2302,6 +2302,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro } if len(tpr.destinations) != 0 { loadIDs[utils.CacheDestinations] = loadID + loadIDs[utils.CacheReverseDestinations] = loadID } if len(tpr.revDests) != 0 { loadIDs[utils.CacheReverseDestinations] = loadID diff --git a/migrator/action_plan.go b/migrator/action_plan.go index d3b7d376b..ed7a278b5 100644 --- a/migrator/action_plan.go +++ b/migrator/action_plan.go @@ -55,7 +55,7 @@ func (m *Migrator) migrateCurrentActionPlans() (err error) { } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ACTION_PLAN_PREFIX) - acts, err := m.dmIN.DataManager().GetActionPlan(idg, true, utils.NonTransactional) + acts, err := m.dmIN.DataManager().GetActionPlan(idg, false, false, utils.NonTransactional) if err != nil { return err } diff --git a/migrator/action_plan_it_test.go b/migrator/action_plan_it_test.go index 46ee0bd6e..e7a92a912 100644 --- a/migrator/action_plan_it_test.go +++ b/migrator/action_plan_it_test.go @@ -245,7 +245,7 @@ func testActPlnITMigrateAndMove(t *testing.T) { if err != nil { t.Error("Error when migrating ActionPlan ", err.Error()) } - result, err := actPlnMigrator.dmOut.DataManager().GetActionPlan((*v1actPln)[0].Id, true, utils.NonTransactional) + result, err := actPlnMigrator.dmOut.DataManager().GetActionPlan((*v1actPln)[0].Id, false, false, utils.NonTransactional) if err != nil { t.Fatal("Error when getting ActionPlan ", err.Error()) } @@ -270,7 +270,7 @@ func testActPlnITMigrateAndMove(t *testing.T) { if err != nil { t.Error("Error when migrating ActionPlan ", err.Error()) } - result, err := actPlnMigrator.dmOut.DataManager().GetActionPlan((*v1actPln)[0].Id, true, utils.NonTransactional) + result, err := actPlnMigrator.dmOut.DataManager().GetActionPlan((*v1actPln)[0].Id, false, false, utils.NonTransactional) if err != nil { t.Error("Error when getting ActionPlan ", err.Error()) } @@ -282,7 +282,7 @@ func testActPlnITMigrateAndMove(t *testing.T) { } else if !reflect.DeepEqual(actPln.ActionTimings[0].Timing, result.ActionTimings[0].Timing) { t.Errorf("Expecting: %+v, received: %+v", actPln.ActionTimings[0].Timing, result.ActionTimings[0].Timing) } - result, err = actPlnMigrator.dmIN.DataManager().GetActionPlan((*v1actPln)[0].Id, true, utils.NonTransactional) + result, err = actPlnMigrator.dmIN.DataManager().GetActionPlan((*v1actPln)[0].Id, false, false, utils.NonTransactional) if err != utils.ErrNotFound { t.Error(err) }