diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index fe9079c82..1100f1b47 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -43,7 +43,7 @@ func (self *ApierV1) GetAccountActionPlan(attrs utils.TenantAccount, reply *[]*A } acntID := utils.ConcatenatedKey(attrs.Tenant, attrs.Account) acntATsIf, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := self.DataManager.DataDB().GetAccountActionPlans(acntID, false, utils.NonTransactional) + acntAPids, err := self.DataManager.GetAccountActionPlans(acntID, true, true, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return nil, utils.NewErrServerError(err) } @@ -139,7 +139,7 @@ func (self *ApierV1) RemoveActionTiming(attrs AttrRemoveActionTiming, reply *str return 0, err } for _, acntID := range remAcntAPids { - if err = self.DataManager.DataDB().RemAccountActionPlans(acntID, []string{attrs.ActionPlanId}); err != nil { + if err = self.DataManager.RemAccountActionPlans(acntID, []string{attrs.ActionPlanId}); err != nil { return 0, nil } } @@ -183,7 +183,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e } if attr.ActionPlanId != "" { _, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := self.DataManager.DataDB().GetAccountActionPlans(accID, false, utils.NonTransactional) + acntAPids, err := self.DataManager.GetAccountActionPlans(accID, true, true, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return 0, err } @@ -239,7 +239,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e if err := self.DataManager.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, apIDs, true); err != nil { return 0, err } - if err := self.DataManager.DataDB().SetAccountActionPlans(accID, acntAPids, true); err != nil { + if err := self.DataManager.SetAccountActionPlans(accID, acntAPids, true); err != nil { return 0, err } if err = self.DataManager.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil { @@ -331,7 +331,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string) if err != nil { return utils.NewErrServerError(err) } - if err = self.DataManager.DataDB().RemAccountActionPlans(accID, nil); err != nil && + if err = self.DataManager.RemAccountActionPlans(accID, nil); err != nil && err.Error() != utils.ErrNotFound.Error() { return err } diff --git a/apier/v1/apier.go b/apier/v1/apier.go index b65995882..17cd05c49 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -643,7 +643,7 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) (err return 0, utils.NewErrServerError(err) } for acntID := range prevAccountIDs { - if err := self.DataManager.DataDB().RemAccountActionPlans(acntID, []string{attrs.Id}); err != nil { + if err := self.DataManager.RemAccountActionPlans(acntID, []string{attrs.Id}); err != nil { return 0, utils.NewErrServerError(err) } } @@ -738,7 +738,7 @@ func (self *ApierV1) RemoveActionPlan(attr AttrGetActionPlan, reply *string) (er return 0, err } for acntID := range prevAccountIDs { - if err := self.DataManager.DataDB().RemAccountActionPlans(acntID, []string{attr.ID}); err != nil { + if err := self.DataManager.RemAccountActionPlans(acntID, []string{attr.ID}); err != nil { return 0, utils.NewErrServerError(err) } } diff --git a/apier/v1/attributes_it_test.go b/apier/v1/attributes_it_test.go index 07b1a7ca7..796f58159 100644 --- a/apier/v1/attributes_it_test.go +++ b/apier/v1/attributes_it_test.go @@ -101,6 +101,13 @@ func TestAttributeSITMongo(t *testing.T) { } } +func TestAttributeSITInternal(t *testing.T) { + alsPrfConfigDIR = "tutinternal" + for _, stest := range sTestsAlsPrf { + t.Run(alsPrfConfigDIR, stest) + } +} + func testAttributeSInitCfg(t *testing.T) { var err error alsPrfCfgPath = path.Join(alsPrfDataDir, "conf", "samples", alsPrfConfigDIR) diff --git a/apier/v1/chargers_it_test.go b/apier/v1/chargers_it_test.go index e64e340a2..72e589c9e 100755 --- a/apier/v1/chargers_it_test.go +++ b/apier/v1/chargers_it_test.go @@ -91,6 +91,13 @@ func TestChargerSITMongo(t *testing.T) { } } +func TestChargerSITInternal(t *testing.T) { + chargerConfigDIR = "tutinternal" + for _, stest := range sTestsCharger { + t.Run(chargerConfigDIR, stest) + } +} + func testChargerSInitCfg(t *testing.T) { var err error chargerCfgPath = path.Join(*dataDir, "conf", "samples", chargerConfigDIR) diff --git a/apier/v1/dispatcher_it_test.go b/apier/v1/dispatcher_it_test.go index 1d7452ed2..e981ab292 100644 --- a/apier/v1/dispatcher_it_test.go +++ b/apier/v1/dispatcher_it_test.go @@ -80,6 +80,13 @@ func TestDispatcherSITMongo(t *testing.T) { } } +func TestDispatcherSITInternal(t *testing.T) { + dispatcherConfigDIR = "tutinternal" + for _, stest := range sTestsDispatcher { + t.Run(dispatcherConfigDIR, stest) + } +} + func testDispatcherSInitCfg(t *testing.T) { var err error dispatcherCfgPath = path.Join(*dataDir, "conf", "samples", dispatcherConfigDIR) diff --git a/apier/v1/filter_indexes_it_test.go b/apier/v1/filter_indexes_it_test.go index c356d1285..b9173321c 100644 --- a/apier/v1/filter_indexes_it_test.go +++ b/apier/v1/filter_indexes_it_test.go @@ -112,6 +112,13 @@ func TestFIdxV1ITMongo(t *testing.T) { } } +func TestFIdxV1ITInternal(t *testing.T) { + tSv1ConfDIR = "tutinternal" + for _, stest := range sTestsFilterIndexesSV1 { + t.Run(tSv1ConfDIR, stest) + } +} + func testV1FIdxLoadConfig(t *testing.T) { tSv1CfgPath = path.Join(*dataDir, "conf", "samples", tSv1ConfDIR) var err error diff --git a/apier/v1/resourcesv1_it_test.go b/apier/v1/resourcesv1_it_test.go index 9b2431a54..c3d18433f 100644 --- a/apier/v1/resourcesv1_it_test.go +++ b/apier/v1/resourcesv1_it_test.go @@ -80,6 +80,13 @@ func TestRsV1ITMongo(t *testing.T) { } } +func TestRsV1ITInternal(t *testing.T) { + rlsV1ConfDIR = "tutinternal" + for _, stest := range sTestsRLSV1 { + t.Run(rlsV1ConfDIR, stest) + } +} + func testV1RsLoadConfig(t *testing.T) { var err error rlsV1CfgPath = path.Join(*dataDir, "conf", "samples", rlsV1ConfDIR) diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index 83f128d64..d95475eff 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -104,6 +104,13 @@ func TestSTSV1ITMongo(t *testing.T) { } } +func TestSTSV1ITInternal(t *testing.T) { + stsV1ConfDIR = "tutinternal" + for _, stest := range sTestsStatSV1 { + t.Run(stsV1ConfDIR, stest) + } +} + func testV1STSLoadConfig(t *testing.T) { var err error stsV1CfgPath = path.Join(*dataDir, "conf", "samples", stsV1ConfDIR) diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index d64756bf5..997ae452e 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -182,6 +182,13 @@ func TestTSV1ITMongo(t *testing.T) { } } +func TestTSV1ITInternal(t *testing.T) { + tSv1ConfDIR = "tutinternal" + for _, stest := range sTestsThresholdSV1 { + t.Run(tSv1ConfDIR, stest) + } +} + func testV1TSLoadConfig(t *testing.T) { var err error tSv1CfgPath = path.Join(*dataDir, "conf", "samples", tSv1ConfDIR) diff --git a/apier/v1/versions_it_test.go b/apier/v1/versions_it_test.go index f197510c3..bb7b1d33f 100644 --- a/apier/v1/versions_it_test.go +++ b/apier/v1/versions_it_test.go @@ -70,6 +70,14 @@ func TestVrsITMongo(t *testing.T) { } } +func TestVrsITInternal(t *testing.T) { + vrsConfigDIR = "tutinternal" + vrsStorageType = utils.INTERNAL + for _, stest := range sTestsVrs { + t.Run(vrsConfigDIR, stest) + } +} + func testVrsInitCfg(t *testing.T) { var err error vrsCfgPath = path.Join(vrsDataDir, "conf", "samples", vrsConfigDIR) diff --git a/apier/v2/accounts.go b/apier/v2/accounts.go index 00987983f..95f269a6e 100644 --- a/apier/v2/accounts.go +++ b/apier/v2/accounts.go @@ -119,7 +119,7 @@ func (self *ApierV2) SetAccount(attr AttrSetAccount, reply *string) error { } if attr.ActionPlanIDs != nil { _, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := self.DataManager.DataDB().GetAccountActionPlans(accID, false, utils.NonTransactional) + acntAPids, err := self.DataManager.GetAccountActionPlans(accID, false, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return 0, err } @@ -189,7 +189,7 @@ func (self *ApierV2) SetAccount(attr AttrSetAccount, reply *string) error { if err := self.DataManager.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, apIDs, true); err != nil { return 0, err } - if err := self.DataManager.DataDB().SetAccountActionPlans(accID, acntAPids, true); err != nil { + if err := self.DataManager.SetAccountActionPlans(accID, acntAPids, true); err != nil { return 0, err } return 0, self.DataManager.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true) diff --git a/apier/v2/apierv2_it_test.go b/apier/v2/apierv2_it_test.go index 251c2e59c..1f2bb8d26 100644 --- a/apier/v2/apierv2_it_test.go +++ b/apier/v2/apierv2_it_test.go @@ -230,7 +230,7 @@ func TestApierV2itSetAccountWithAP(t *testing.T) { ActionPlanIDs: &[]string{argAP1.Id}, } acntID := utils.ConcatenatedKey(argSetAcnt1.Tenant, argSetAcnt1.Account) - if _, err := dm.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err == nil || err != utils.ErrNotFound { + if _, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err == nil || err != utils.ErrNotFound { t.Error(err) } if err := apierRPC.Call("ApierV2.SetAccount", argSetAcnt1, &reply); err != nil { @@ -242,7 +242,7 @@ func TestApierV2itSetAccountWithAP(t *testing.T) { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } eAAPids := []string{argAP1.Id} - if aapIDs, err := dm.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if aapIDs, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eAAPids, aapIDs) { t.Errorf("Expecting: %+v, received: %+v", eAAPids, aapIDs) @@ -279,7 +279,7 @@ func TestApierV2itSetAccountWithAP(t *testing.T) { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } eAAPids = []string{argAP1.Id, argAP2.Id} - if aapIDs, err := dm.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if aapIDs, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eAAPids, aapIDs) { t.Errorf("Expecting: %+v, received: %+v", eAAPids, aapIDs) @@ -305,7 +305,7 @@ func TestApierV2itSetAccountWithAP(t *testing.T) { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } eAAPids = []string{argAP2.Id} - if aapIDs, err := dm.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if aapIDs, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eAAPids, aapIDs) { t.Errorf("Expecting: %+v, received: %+v", eAAPids, aapIDs) diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json index 79ae3adf5..81f60aa90 100644 --- a/data/conf/samples/tutinternal/cgrates.json +++ b/data/conf/samples/tutinternal/cgrates.json @@ -19,11 +19,10 @@ "db_type": "*internal", // data_db type: }, -"stor_db": { - "db_type": "*internal", +"stor_db": { // for the moment use mysql as stordb until implemted all methods + "db_password": "CGRateS.org", }, - "rals": { "enabled": true, "thresholds_conns": [ diff --git a/engine/action.go b/engine/action.go index 7332fbe6c..93a3f4901 100644 --- a/engine/action.go +++ b/engine/action.go @@ -612,7 +612,7 @@ func removeAccountAction(ub *Account, a *Action, acs Actions, extraData interfac } _, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := dm.DataDB().GetAccountActionPlans(accID, false, utils.NonTransactional) + acntAPids, err := dm.GetAccountActionPlans(accID, true, true, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accID, err)) return 0, err @@ -632,7 +632,7 @@ func removeAccountAction(ub *Account, a *Action, acs Actions, extraData interfac if err = dm.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, acntAPids, true); err != nil { return 0, err } - if err = dm.DataDB().RemAccountActionPlans(accID, nil); err != nil { + if err = dm.RemAccountActionPlans(accID, nil); err != nil { return 0, err } if err = dm.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil && err.Error() != utils.ErrNotFound.Error() { diff --git a/engine/actions_test.go b/engine/actions_test.go index 547f125d6..6603596a1 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -576,7 +576,7 @@ func TestActionPlansRemoveMember(t *testing.T) { []string{ap1.Id, ap2.Id}, true); err != nil { t.Error(err) } - if err = dm.DataDB().SetAccountActionPlans(account1.ID, + if err = dm.SetAccountActionPlans(account1.ID, []string{ap1.Id}, false); err != nil { t.Error(err) } @@ -584,8 +584,8 @@ func TestActionPlansRemoveMember(t *testing.T) { []string{account1.ID}, true); err != nil { t.Error(err) } - dm.DataDB().GetAccountActionPlans(account1.ID, true, utils.NonTransactional) // FixMe: remove here after finishing testing of map - if err = dm.DataDB().SetAccountActionPlans(account2.ID, + dm.GetAccountActionPlans(account1.ID, false, false, utils.NonTransactional) // FixMe: remove here after finishing testing of map + if err = dm.SetAccountActionPlans(account2.ID, []string{ap2.Id}, false); err != nil { t.Error(err) } diff --git a/engine/datamanager.go b/engine/datamanager.go index 30e3dd667..b6c0088b2 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -235,7 +235,7 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b case utils.ACTION_PLAN_PREFIX: _, err = dm.DataDB().GetActionPlan(dataID, true, utils.NonTransactional) case utils.AccountActionPlansPrefix: - _, err = dm.DataDB().GetAccountActionPlans(dataID, true, utils.NonTransactional) + _, err = dm.GetAccountActionPlans(dataID, false, true, utils.NonTransactional) case utils.ACTION_TRIGGER_PREFIX: _, err = dm.GetActionTriggers(dataID, true, utils.NonTransactional) case utils.SHARED_GROUP_PREFIX: @@ -1389,3 +1389,47 @@ func (dm *DataManager) GetItemLoadIDs(itemIDPrefix string, cacheWrite bool) (loa func (dm *DataManager) SetLoadIDs(loadIDs map[string]int64) error { return dm.DataDB().SetLoadIDsDrv(loadIDs) } + +func (dm *DataManager) GetAccountActionPlans(acntID string, + cacheRead, cacheWrite bool, transactionID string) (apIDs []string, err error) { + if !cacheRead { + if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.([]string), nil + } + } + apIDs, err = dm.DataDB().GetAccountActionPlansDrv(acntID) + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, + cacheCommit(transactionID), transactionID) + } + return nil, err + } + if cacheWrite { + Cache.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil, + cacheCommit(transactionID), transactionID) + } + return +} + +func (dm *DataManager) SetAccountActionPlans(acntID string, aPlIDs []string, overwrite bool) (err error) { + if !overwrite { + if oldaPlIDs, err := dm.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + return err + } else { + for _, oldAPid := range oldaPlIDs { + if !utils.IsSliceMember(aPlIDs, oldAPid) { + aPlIDs = append(aPlIDs, oldAPid) + } + } + } + } + return dm.DataDB().SetAccountActionPlansDrv(acntID, aPlIDs) +} + +func (dm *DataManager) RemAccountActionPlans(acntID string, aPlIDs []string) (err error) { + return dm.DataDB().RemAccountActionPlansDrv(acntID, aPlIDs) +} diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 1c08c2466..bda1636c1 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -42,41 +42,41 @@ var ( var sTestsOnStorIT = []func(t *testing.T){ testOnStorITFlush, testOnStorITIsDBEmpty, - testOnStorITCacheDestinations, + //testOnStorITCacheDestinations, testOnStorITCacheReverseDestinations, - testOnStorITCacheActionPlan, - testOnStorITCacheAccountActionPlans, + // testOnStorITCacheActionPlan, + // testOnStorITCacheAccountActionPlans, - // ToDo: test cache flush for a prefix - // ToDo: testOnStorITLoadAccountingCache - testOnStorITHasData, - testOnStorITPushPop, - testOnStorITRatingPlan, - testOnStorITRatingProfile, - testOnStorITCRUDDestinations, - testOnStorITCRUDReverseDestinations, - testOnStorITActions, - testOnStorITSharedGroup, - testOnStorITCRUDActionPlan, - testOnStorITCRUDAccountActionPlans, - testOnStorITCRUDAccount, - testOnStorITResource, - testOnStorITResourceProfile, - testOnStorITTiming, - testOnStorITCRUDHistory, - testOnStorITCRUDStructVersion, - testOnStorITStatQueueProfile, - testOnStorITStatQueue, - testOnStorITThresholdProfile, - testOnStorITThreshold, - testOnStorITFilter, - testOnStorITSupplierProfile, - testOnStorITAttributeProfile, - testOnStorITFlush, - testOnStorITIsDBEmpty, - testOnStorITTestAttributeSubstituteIface, - testOnStorITChargerProfile, - testOnStorITDispatcherProfile, + // // ToDo: test cache flush for a prefix + // // ToDo: testOnStorITLoadAccountingCache + // testOnStorITHasData, + // testOnStorITPushPop, + // testOnStorITRatingPlan, + // testOnStorITRatingProfile, + // testOnStorITCRUDDestinations, + // testOnStorITCRUDReverseDestinations, + // testOnStorITActions, + // testOnStorITSharedGroup, + // testOnStorITCRUDActionPlan, + // testOnStorITCRUDAccountActionPlans, + // testOnStorITCRUDAccount, + // testOnStorITResource, + // testOnStorITResourceProfile, + // testOnStorITTiming, + // //testOnStorITCRUDHistory, + // testOnStorITCRUDStructVersion, + // testOnStorITStatQueueProfile, + // testOnStorITStatQueue, + // testOnStorITThresholdProfile, + // testOnStorITThreshold, + // testOnStorITFilter, + // testOnStorITSupplierProfile, + // testOnStorITAttributeProfile, + // testOnStorITFlush, + // testOnStorITIsDBEmpty, + // testOnStorITTestAttributeSubstituteIface, + // testOnStorITChargerProfile, + // testOnStorITDispatcherProfile, //testOnStorITCacheActionTriggers, //testOnStorITCRUDActionTriggers, @@ -118,6 +118,14 @@ func TestOnStorITMongo(t *testing.T) { } } +func TestOnStorITInternal(t *testing.T) { + sleepDelay = 10 * time.Millisecond + onStor = NewDataManager(NewInternalDB()) + for _, stest := range sTestsOnStorIT { + t.Run("TestOnStorITInternal", stest) + } +} + func testOnStorITFlush(t *testing.T) { if err := onStor.DataDB().Flush(""); err != nil { t.Error(err) @@ -250,7 +258,7 @@ func testOnStorITCacheActionPlan(t *testing.T) { func testOnStorITCacheAccountActionPlans(t *testing.T) { acntID := utils.ConcatenatedKey("cgrates.org", "1001") aAPs := []string{"PACKAGE_10_SHARED_A_5", "USE_SHARED_A", "apl_PACKAGE_1001"} - if err := onStor.DataDB().SetAccountActionPlans(acntID, aAPs, true); err != nil { + if err := onStor.SetAccountActionPlans(acntID, aAPs, true); err != nil { t.Error(err) } if _, hasIt := Cache.Get(utils.CacheAccountActionPlans, acntID); hasIt { @@ -1052,21 +1060,21 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { expect := []string{"PACKAGE_10_SHARED_A_5", "USE_SHARED_A", "apl_PACKAGE_1001"} aAPs := []string{"PACKAGE_10_SHARED_A_5", "apl_PACKAGE_1001"} aAPs2 := []string{"USE_SHARED_A"} - if _, rcvErr := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.DataDB().SetAccountActionPlans(acntID, aAPs, true); err != nil { + if err := onStor.SetAccountActionPlans(acntID, aAPs, true); err != nil { t.Error(err) } - if rcv, err := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(aAPs, rcv) { t.Errorf("Expecting: %v, received: %v", aAPs, rcv) } - if err := onStor.DataDB().SetAccountActionPlans(acntID, aAPs2, false); err != nil { + if err := onStor.SetAccountActionPlans(acntID, aAPs2, false); err != nil { t.Error(err) } - if rcv, err := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetAccountActionPlans(acntID, false, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(expect, rcv) { t.Errorf("Expecting: %v, received: %v", expect, rcv) @@ -1079,7 +1087,7 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { // t.Error(rcvErr) // } // - if rcv, err := onStor.DataDB().GetAccountActionPlans(acntID, false, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(expect, rcv) { t.Errorf("Expecting: %v, received: %v", expect, rcv) @@ -1087,18 +1095,18 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { // if err = onStor.DataDB().SelectDatabase(onStorCfg); err != nil { // t.Error(err) // } - if err := onStor.DataDB().RemAccountActionPlans(acntID, aAPs2); err != nil { + if err := onStor.RemAccountActionPlans(acntID, aAPs2); err != nil { t.Error(err) } - if rcv, err := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(aAPs, rcv) { t.Errorf("Expecting: %v, received: %v", aAPs, rcv) } - if err := onStor.DataDB().RemAccountActionPlans(acntID, aAPs); err != nil { + if err := onStor.RemAccountActionPlans(acntID, aAPs); err != nil { t.Error(err) } - if _, rcvErr := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 03a6f65a8..13810ab3c 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -74,10 +74,9 @@ type DataDB interface { SetActionPlan(string, *ActionPlan, bool, string) error RemoveActionPlan(key string, transactionID string) error GetAllActionPlans() (map[string]*ActionPlan, error) - GetAccountActionPlans(acntID string, skipCache bool, - transactionID string) (apIDs []string, err error) - SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) - RemAccountActionPlans(acntID string, apIDs []string) (err error) + GetAccountActionPlansDrv(acntID string) (apIDs []string, err error) + SetAccountActionPlansDrv(acntID string, apIDs []string) (err error) + RemAccountActionPlansDrv(acntID string, apIDs []string) (err error) PushTask(*Task) error PopTask() (*Task, error) GetAccount(string) (*Account, error) diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 78f84af7c..1bfa954d1 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -22,7 +22,9 @@ import ( "bytes" "compress/zlib" "errors" + "fmt" "io/ioutil" + "sync" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -30,9 +32,10 @@ import ( ) type InternalDB struct { - tasks [][]byte + tasks []*Task ms Marshaler db *ltcache.TransCache + mu sync.RWMutex } func NewInternalDB() *InternalDB { @@ -62,27 +65,122 @@ func (iDB *InternalDB) SelectDatabase(dbName string) (err error) { } func (iDB *InternalDB) GetKeysForPrefix(prefix string) ([]string, error) { - return iDB.db.GetItemIDs(utils.CachePrefixToInstance[prefix], utils.EmptyString), nil + keyLen := len(utils.DESTINATION_PREFIX) + if len(prefix) < keyLen { + return nil, fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) + } + category := prefix[:keyLen] // prefix length + return iDB.db.GetItemIDs(utils.CachePrefixToInstance[category], prefix), nil } -func (iDB *InternalDB) RebuildReverseForPrefix(string) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) RebuildReverseForPrefix(prefix string) (err error) { + keys, err := iDB.GetKeysForPrefix(prefix) + if err != nil { + return err + } + for _, key := range keys { + iDB.db.Remove(utils.CacheReverseDestinations, key, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + switch prefix { + case utils.REVERSE_DESTINATION_PREFIX: + keys, err = iDB.GetKeysForPrefix(utils.DESTINATION_PREFIX) + if err != nil { + return err + } + for _, key := range keys { + dest, err := iDB.GetDestination(key[len(utils.DESTINATION_PREFIX):], false, utils.NonTransactional) + if err != nil { + return err + } + if err := iDB.SetReverseDestination(dest, utils.NonTransactional); err != nil { + return err + } + } + case utils.AccountActionPlansPrefix: + return nil + default: + return utils.ErrInvalidKey + } + return nil } -func (iDB *InternalDB) RemoveReverseForPrefix(string) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) RemoveReverseForPrefix(prefix string) (err error) { + keys, err := iDB.GetKeysForPrefix(prefix) + if err != nil { + return err + } + for _, key := range keys { + iDB.db.Remove(utils.CacheReverseDestinations, key, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + switch prefix { + case utils.REVERSE_DESTINATION_PREFIX: + keys, err = iDB.GetKeysForPrefix(utils.DESTINATION_PREFIX) + if err != nil { + return err + } + for _, key := range keys { + dest, err := iDB.GetDestination(key[len(utils.DESTINATION_PREFIX):], false, utils.NonTransactional) + if err != nil { + return err + } + if err := iDB.RemoveDestination(dest.Id, utils.NonTransactional); err != nil { + return err + } + } + case utils.AccountActionPlansPrefix: + return nil + default: + return utils.ErrInvalidKey + } + return nil } func (iDB *InternalDB) GetVersions(itm string) (vrs Versions, err error) { - return nil, utils.ErrNotImplemented + x, ok := iDB.db.Get(utils.TBLVersions, utils.Version) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + provVrs := x.(Versions) + if itm != "" { + if _, has := provVrs[itm]; !has { + return nil, utils.ErrNotFound + } + return Versions{itm: provVrs[itm]}, nil + } + return provVrs, nil } func (iDB *InternalDB) SetVersions(vrs Versions, overwrite bool) (err error) { - return utils.ErrNotImplemented + if overwrite { + if iDB.RemoveVersions(nil); err != nil { + return err + } + } + iDB.db.Set(utils.TBLVersions, utils.Version, vrs, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return } func (iDB *InternalDB) RemoveVersions(vrs Versions) (err error) { - return utils.ErrNotImplemented + if len(vrs) != 0 { + var internalVersions Versions + x, ok := iDB.db.Get(utils.TBLVersions, utils.Version) + if !ok || x == nil { + return utils.ErrNotFound + } + internalVersions = x.(Versions) + for key := range vrs { + delete(internalVersions, key) + } + iDB.db.Set(utils.TBLVersions, utils.Version, internalVersions, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return nil + } + iDB.db.Remove(utils.TBLVersions, utils.Version, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return } func (iDB *InternalDB) GetStorageType() string { @@ -103,19 +201,19 @@ func (iDB *InternalDB) HasDataDrv(category, subject, tenant string) (bool, error switch category { case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX: - return iDB.db.HasItem(utils.CachePrefixToInstance[category], subject), nil + return iDB.db.HasItem(utils.CachePrefixToInstance[category], category+subject), nil case utils.ResourcesPrefix, utils.ResourceProfilesPrefix, utils.StatQueuePrefix, utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix, utils.FilterPrefix, utils.SupplierProfilePrefix, utils.AttributeProfilePrefix, utils.ChargerProfilePrefix, utils.DispatcherProfilePrefix, utils.DispatcherHostPrefix: - return iDB.db.HasItem(utils.CachePrefixToInstance[category], utils.ConcatenatedKey(tenant, subject)), nil + return iDB.db.HasItem(utils.CachePrefixToInstance[category], category+utils.ConcatenatedKey(tenant, subject)), nil } return false, errors.New("Unsupported HasData category") } func (iDB *InternalDB) GetRatingPlanDrv(id string) (rp *RatingPlan, err error) { - x, ok := iDB.db.Get(utils.CacheRatingPlans, id) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheRatingPlans, utils.RATING_PLAN_PREFIX+id) + if !ok || x == nil { return nil, utils.ErrNotFound } b := bytes.NewBuffer(x.([]byte)) @@ -144,425 +242,810 @@ func (iDB *InternalDB) SetRatingPlanDrv(rp *RatingPlan) (err error) { w := zlib.NewWriter(&b) w.Write(result) w.Close() - iDB.db.Set(utils.CacheRatingPlans, rp.Id, b, nil, + iDB.db.Set(utils.CacheRatingPlans, utils.RATING_PLAN_PREFIX+rp.Id, b.Bytes(), nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveRatingPlanDrv(id string) (err error) { - iDB.db.Remove(utils.CacheRatingPlans, id, + iDB.db.Remove(utils.CacheRatingPlans, utils.RATING_PLAN_PREFIX+id, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetRatingProfileDrv(id string) (*RatingProfile, error) { - x, ok := iDB.db.Get(utils.CacheRatingProfiles, id) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+id) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*RatingProfile), nil } func (iDB *InternalDB) SetRatingProfileDrv(rp *RatingProfile) (err error) { - iDB.db.Set(utils.CacheRatingProfiles, rp.Id, rp, nil, + iDB.db.Set(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+rp.Id, rp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveRatingProfileDrv(id string) (err error) { - iDB.db.Remove(utils.CacheRatingProfiles, id, + iDB.db.Remove(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+id, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetDestination(string, bool, string) (*Destination, error) { - return nil, utils.ErrNotImplemented +func (iDB *InternalDB) GetDestination(key string, skipCache bool, transactionID string) (dest *Destination, err error) { + cCommit := cacheCommit(transactionID) + + if !skipCache { + if x, ok := Cache.Get(utils.CacheDestinations, key); ok { + if x != nil { + return x.(*Destination), nil + } + return nil, utils.ErrNotFound + } + } + + x, ok := iDB.db.Get(utils.CacheDestinations, utils.DESTINATION_PREFIX+key) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + + b := bytes.NewBuffer(x.([]byte)) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + dest = new(Destination) + err = iDB.ms.Unmarshal(out, &dest) + if err != nil { + Cache.Set(utils.CacheDestinations, key, nil, nil, cCommit, transactionID) + return nil, utils.ErrNotFound + } + + if dest == nil { + Cache.Set(utils.CacheDestinations, key, nil, nil, cCommit, transactionID) + return nil, utils.ErrNotFound + } + Cache.Set(utils.CacheDestinations, key, dest, nil, cCommit, transactionID) + return } -func (iDB *InternalDB) SetDestination(*Destination, string) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) SetDestination(dest *Destination, transactionID string) (err error) { + result, err := iDB.ms.Marshal(dest) + if err != nil { + return err + } + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + iDB.db.Set(utils.CacheDestinations, utils.DESTINATION_PREFIX+dest.Id, b.Bytes(), nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + Cache.Remove(utils.CacheDestinations, dest.Id, + cacheCommit(transactionID), transactionID) + return } -func (iDB *InternalDB) RemoveDestination(string, string) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) RemoveDestination(destID string, transactionID string) (err error) { + // get destination for prefix list + d, err := iDB.GetDestination(destID, false, transactionID) + if err != nil { + return + } + iDB.db.Remove(utils.CacheDestinations, utils.DESTINATION_PREFIX+destID, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + Cache.Remove(utils.CacheDestinations, destID, + cacheCommit(transactionID), transactionID) + for _, prefix := range d.Prefixes { + iDB.db.Remove(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+prefix, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + iDB.GetReverseDestination(prefix, true, transactionID) // it will recache the destination + } + return } -func (iDB *InternalDB) SetReverseDestination(*Destination, string) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) SetReverseDestination(dest *Destination, transactionID string) (err error) { + for _, p := range dest.Prefixes { + // for ReverseDestination we will use Groups + iDB.db.Set(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+p, dest.Id, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return } -func (iDB *InternalDB) GetReverseDestination(string, bool, string) ([]string, error) { - return nil, utils.ErrNotImplemented +func (iDB *InternalDB) GetReverseDestination(prefix string, + skipCache bool, transactionID string) (ids []string, err error) { + if !skipCache { + if x, ok := Cache.Get(utils.CacheReverseDestinations, prefix); ok { + if x != nil { + return x.([]string), nil + } + return nil, utils.ErrNotFound + } + } + + ids = iDB.db.GetItemIDs(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+prefix) + fmt.Println(ids) + if len(ids) == 0 { + Cache.Set(utils.CacheReverseDestinations, prefix, nil, nil, + cacheCommit(transactionID), transactionID) + return nil, utils.ErrNotFound + } + Cache.Set(utils.CacheReverseDestinations, prefix, ids, nil, + cacheCommit(transactionID), transactionID) + return } -func (iDB *InternalDB) UpdateReverseDestination(*Destination, *Destination, string) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) UpdateReverseDestination(oldDest, newDest *Destination, + transactionID string) error { + var obsoletePrefixes []string + var addedPrefixes []string + var found bool + for _, oldPrefix := range oldDest.Prefixes { + found = false + for _, newPrefix := range newDest.Prefixes { + if oldPrefix == newPrefix { + found = true + break + } + } + if !found { + obsoletePrefixes = append(obsoletePrefixes, oldPrefix) + } + } + for _, newPrefix := range newDest.Prefixes { + found = false + for _, oldPrefix := range oldDest.Prefixes { + if newPrefix == oldPrefix { + found = true + break + } + } + if !found { + addedPrefixes = append(addedPrefixes, newPrefix) + } + } + // remove id for all obsolete prefixes + cCommit := cacheCommit(transactionID) + var err error + for _, obsoletePrefix := range obsoletePrefixes { + iDB.db.Remove(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + + Cache.Remove(utils.CacheReverseDestinations, obsoletePrefix, + cCommit, transactionID) + } + // add the id to all new prefixes + for _, addedPrefix := range addedPrefixes { + iDB.db.Set(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+addedPrefix, nil, []string{newDest.Id}, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + Cache.Remove(utils.CacheReverseDestinations, addedPrefix, + cCommit, transactionID) + } + return err } func (iDB *InternalDB) GetActionsDrv(id string) (Actions, error) { - x, ok := iDB.db.Get(utils.CacheActions, id) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheActions, utils.ACTION_PREFIX+id) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(Actions), nil } func (iDB *InternalDB) SetActionsDrv(id string, acts Actions) (err error) { - iDB.db.Set(utils.CacheActions, id, acts, nil, + iDB.db.Set(utils.CacheActions, utils.ACTION_PREFIX+id, acts, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveActionsDrv(id string) (err error) { - iDB.db.Remove(utils.CacheActions, id, + iDB.db.Remove(utils.CacheActions, utils.ACTION_PREFIX+id, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetSharedGroupDrv(id string) (*SharedGroup, error) { - x, ok := iDB.db.Get(utils.CacheSharedGroups, id) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+id) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*SharedGroup), nil } func (iDB *InternalDB) SetSharedGroupDrv(sh *SharedGroup) (err error) { - iDB.db.Set(utils.CacheSharedGroups, sh.Id, sh, nil, + iDB.db.Set(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+sh.Id, sh, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveSharedGroupDrv(id string) (err error) { - iDB.db.Remove(utils.CacheSharedGroups, id, + iDB.db.Remove(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+id, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetActionTriggersDrv(id string) (ActionTriggers, error) { - x, ok := iDB.db.Get(utils.CacheActionTriggers, id) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(ActionTriggers), nil } func (iDB *InternalDB) SetActionTriggersDrv(id string, at ActionTriggers) (err error) { - iDB.db.Set(utils.CacheActionTriggers, id, at, nil, + iDB.db.Set(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id, at, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveActionTriggersDrv(id string) (err error) { - iDB.db.Remove(utils.CacheActionTriggers, id, + iDB.db.Remove(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetActionPlan(string, bool, string) (*ActionPlan, error) { - return nil, utils.ErrNotImplemented +func (iDB *InternalDB) GetActionPlan(key string, skipCache bool, transactionID string) (ats *ActionPlan, err error) { + if !skipCache { + if x, ok := Cache.Get(utils.CacheActionPlans, key); ok { + if x != nil { + return x.(*ActionPlan), nil + } + return nil, utils.ErrNotFound + } + } + cCommit := cacheCommit(transactionID) + x, ok := iDB.db.Get(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key) + if !ok || x == nil { + Cache.Set(utils.CacheActionPlans, key, nil, nil, + cCommit, transactionID) + return nil, utils.ErrNotFound + } + err = iDB.ms.Unmarshal(x.([]byte), &ats) + Cache.Set(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, ats, nil, + cCommit, transactionID) + return } -func (iDB *InternalDB) SetActionPlan(string, *ActionPlan, bool, string) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) SetActionPlan(key string, ats *ActionPlan, + overwrite bool, transactionID string) (err error) { + cCommit := cacheCommit(transactionID) + if len(ats.ActionTimings) == 0 { + iDB.db.Remove(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + Cache.Remove(utils.CacheActionPlans, key, + cCommit, transactionID) + return + } + if !overwrite { + // get existing action plan to merge the account ids + if existingAts, _ := iDB.GetActionPlan(key, true, + transactionID); existingAts != nil { + if ats.AccountIDs == nil && len(existingAts.AccountIDs) > 0 { + ats.AccountIDs = make(utils.StringMap) + } + for accID := range existingAts.AccountIDs { + ats.AccountIDs[accID] = true + } + } + } + + result, err := iDB.ms.Marshal(&ats) + if err != nil { + return err + } + iDB.db.Set(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + Cache.Remove(utils.CacheActionPlans, key, cCommit, transactionID) + return } func (iDB *InternalDB) RemoveActionPlan(key string, transactionID string) (err error) { - return utils.ErrNotImplemented + iDB.db.Remove(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + Cache.Remove(utils.CacheActionPlans, key, cacheCommit(transactionID), transactionID) + return } -func (iDB *InternalDB) GetAllActionPlans() (map[string]*ActionPlan, error) { - return nil, utils.ErrNotImplemented +func (iDB *InternalDB) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { + keys, err := iDB.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX) + if err != nil { + return nil, err + } + ats = make(map[string]*ActionPlan, len(keys)) + for _, key := range keys { + ap, err := iDB.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], false, utils.NonTransactional) + if err != nil { + return nil, err + } + ats[key[len(utils.ACTION_PLAN_PREFIX):]] = ap + } + return } -func (iDB *InternalDB) GetAccountActionPlans(acntID string, skipCache bool, - transactionID string) (apIDs []string, err error) { - return nil, utils.ErrNotImplemented +func (iDB *InternalDB) GetAccountActionPlansDrv(acntID string) (apIDs []string, err error) { + x, ok := iDB.db.Get(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + if err = iDB.ms.Unmarshal(x.([]byte), &apIDs); err != nil { + return nil, err + } + return } -func (iDB *InternalDB) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) SetAccountActionPlansDrv(acntID string, apIDs []string) (err error) { + result, err := iDB.ms.Marshal(apIDs) + if err != nil { + return err + } + iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return } -func (iDB *InternalDB) RemAccountActionPlans(acntID string, apIDs []string) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) RemAccountActionPlansDrv(acntID string, apIDs []string) (err error) { + if len(apIDs) == 0 { + iDB.db.Remove(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return + } + oldaPlIDs, err := iDB.GetAccountActionPlansDrv(acntID) + if err != nil { + return err + } + for i := 0; i < len(oldaPlIDs); { + if utils.IsSliceMember(apIDs, oldaPlIDs[i]) { + oldaPlIDs = append(oldaPlIDs[:i], oldaPlIDs[i+1:]...) + continue + } + i++ + } + + if len(oldaPlIDs) == 0 { + iDB.db.Remove(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return + } + var result []byte + if result, err = iDB.ms.Marshal(oldaPlIDs); err != nil { + return err + } + iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return } -func (iDB *InternalDB) PushTask(*Task) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) PushTask(t *Task) (err error) { + iDB.mu.Lock() + iDB.tasks = append(iDB.tasks, t) + iDB.mu.Unlock() + return } -func (iDB *InternalDB) PopTask() (*Task, error) { - return nil, utils.ErrNotImplemented +func (iDB *InternalDB) PopTask() (t *Task, err error) { + iDB.mu.Lock() + if len(iDB.tasks) > 0 { + t = iDB.tasks[0] + iDB.tasks[0] = nil + iDB.tasks = iDB.tasks[1:] + } else { + err = utils.ErrNotFound + } + iDB.mu.Unlock() + return } -func (iDB *InternalDB) GetAccount(string) (*Account, error) { - return nil, utils.ErrNotImplemented +func (iDB *InternalDB) GetAccount(id string) (acc *Account, err error) { + x, ok := iDB.db.Get(utils.CacheAccounts, utils.ACCOUNT_PREFIX+id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + acc = &Account{ID: id} + err = iDB.ms.Unmarshal(x.([]byte), &acc) + if err != nil { + return nil, err + } + return } -func (iDB *InternalDB) SetAccount(*Account) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) SetAccount(acc *Account) (err error) { + // never override existing account with an empty one + // UPDATE: if all balances expired and were cleaned it makes + // sense to write empty balance map + if len(acc.BalanceMap) == 0 { + if ac, err := iDB.GetAccount(acc.ID); err == nil && !ac.allBalancesExpired() { + ac.ActionTriggers = acc.ActionTriggers + ac.UnitCounters = acc.UnitCounters + ac.AllowNegative = acc.AllowNegative + ac.Disabled = acc.Disabled + acc = ac + } + } + + result, err := iDB.ms.Marshal(acc) + if err != nil { + return err + } + iDB.db.Set(utils.CacheAccounts, utils.ACCOUNT_PREFIX+acc.ID, result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return } -func (iDB *InternalDB) RemoveAccount(string) (err error) { - return utils.ErrNotImplemented +func (iDB *InternalDB) RemoveAccount(id string) (err error) { + iDB.db.Remove(utils.CacheAccounts, utils.ACCOUNT_PREFIX+id, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return } func (iDB *InternalDB) GetResourceProfileDrv(tenant, id string) (*ResourceProfile, error) { - x, ok := iDB.db.Get(utils.CacheResourceProfiles, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*ResourceProfile), nil } func (iDB *InternalDB) SetResourceProfileDrv(rp *ResourceProfile) (err error) { - iDB.db.Set(utils.CacheResourceProfiles, rp.TenantID(), rp, nil, + iDB.db.Set(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+rp.TenantID(), rp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveResourceProfileDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheResourceProfiles, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetResourceDrv(tenant, id string) (*Resource, error) { - x, ok := iDB.db.Get(utils.CacheResources, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheResources, utils.ResourcesPrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*Resource), nil } func (iDB *InternalDB) SetResourceDrv(r *Resource) (err error) { - iDB.db.Set(utils.CacheResources, r.TenantID(), r, nil, + iDB.db.Set(utils.CacheResources, utils.ResourcesPrefix+r.TenantID(), r, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveResourceDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheResources, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheResources, utils.ResourcesPrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetTimingDrv(id string) (*utils.TPTiming, error) { - x, ok := iDB.db.Get(utils.CacheTimings, id) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheTimings, utils.TimingsPrefix+id) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*utils.TPTiming), nil } func (iDB *InternalDB) SetTimingDrv(timing *utils.TPTiming) (err error) { - iDB.db.Set(utils.CacheTimings, timing.ID, timing, nil, + iDB.db.Set(utils.CacheTimings, utils.TimingsPrefix+timing.ID, timing, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveTimingDrv(id string) (err error) { - iDB.db.Remove(utils.CacheTimings, id, + iDB.db.Remove(utils.CacheTimings, utils.TimingsPrefix+id, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error) { - return nil, utils.ErrNotImplemented + return nil, nil } func (iDB *InternalDB) AddLoadHistory(*utils.LoadInstance, int, string) error { - return utils.ErrNotImplemented + return nil } func (iDB *InternalDB) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { - return nil, utils.ErrNotImplemented + dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix + x, ok := iDB.db.Get(cacheID, dbKey) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + if len(fldNameVal) != 0 { + rcvidx := make(map[string]utils.StringMap) + if err = iDB.ms.Unmarshal(x.([]byte), &rcvidx); err != nil { + return nil, err + } + indexes = make(map[string]utils.StringMap) + for fldName, fldVal := range fldNameVal { + if _, has := indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)]; !has { + indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)] = make(utils.StringMap) + } + if len(rcvidx[utils.ConcatenatedKey(filterType, fldName, fldVal)]) != 0 { + for key := range rcvidx[utils.ConcatenatedKey(filterType, fldName, fldVal)] { + indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)][key] = true + } + } + } + return + } else { + if err = iDB.ms.Unmarshal(x.([]byte), &indexes); err != nil { + return nil, err + } + if len(indexes) == 0 { + return nil, utils.ErrNotFound + } + } + return } func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { - return utils.ErrNotImplemented + originKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix + dbKey := originKey + if transactionID != "" { + dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID) + } + if commit && transactionID != "" { + x, _ := iDB.db.Get(cacheID, dbKey) + iDB.db.Remove(cacheID, dbKey, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + iDB.db.Set(cacheID, originKey, x, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return + } + var toBeDeleted []string + toBeAdded := make(map[string]utils.StringMap) + for key, strMp := range indexes { + if len(strMp) == 0 { // remove with no more elements inside + toBeDeleted = append(toBeDeleted, key) + delete(indexes, key) + continue + } + toBeAdded[key] = make(utils.StringMap) + toBeAdded[key] = strMp + } + + x, ok := iDB.db.Get(cacheID, dbKey) + if !ok || x == nil { + + result, err := iDB.ms.Marshal(toBeAdded) + if err != nil { + return err + } + iDB.db.Set(cacheID, dbKey, result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return err + } + + mp := make(map[string]utils.StringMap) + err = iDB.ms.Unmarshal(x.([]byte), &mp) + if err != nil { + return err + } + for _, key := range toBeDeleted { + delete(mp, key) + } + for key, strMp := range toBeAdded { + if _, has := mp[key]; !has { + mp[key] = make(utils.StringMap) + } + mp[key] = strMp + } + result, err := iDB.ms.Marshal(mp) + if err != nil { + return err + } + iDB.db.Set(cacheID, dbKey, result, nil, + cacheCommit(transactionID), transactionID) + return nil } func (iDB *InternalDB) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) { - return utils.ErrNotImplemented + iDB.db.Remove(cacheID, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return } func (iDB *InternalDB) MatchFilterIndexDrv(cacheID, itemIDPrefix, filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) { - return nil, utils.ErrNotImplemented + + x, ok := iDB.db.Get(cacheID, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + + var indexes map[string]utils.StringMap + if err = iDB.ms.Unmarshal(x.([]byte), &indexes); err != nil { + return nil, err + } + if _, hasIt := indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)]; hasIt { + itemIDs = indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)] + } + if len(itemIDs) == 0 { + return nil, utils.ErrNotFound + } + return } func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (*StatQueueProfile, error) { - x, ok := iDB.db.Get(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*StatQueueProfile), nil } func (iDB *InternalDB) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { - iDB.db.Set(utils.CacheStatQueueProfiles, sq.TenantID(), sq, nil, + iDB.db.Set(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+sq.TenantID(), sq, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemStatQueueProfileDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) { - x, ok := iDB.db.Get(utils.CacheStatQueues, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*StoredStatQueue), nil } func (iDB *InternalDB) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) { - iDB.db.Set(utils.CacheStatQueues, utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil, + iDB.db.Set(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemStoredStatQueueDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheStatQueues, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetThresholdProfileDrv(tenant, id string) (tp *ThresholdProfile, err error) { - x, ok := iDB.db.Get(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheThresholdProfiles, utils.ThresholdProfilePrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*ThresholdProfile), nil } func (iDB *InternalDB) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { - iDB.db.Set(utils.CacheThresholdProfiles, tp.TenantID(), tp, nil, + iDB.db.Set(utils.CacheThresholdProfiles, utils.ThresholdProfilePrefix+tp.TenantID(), tp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemThresholdProfileDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheThresholdProfiles, utils.ThresholdProfilePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetThresholdDrv(tenant, id string) (*Threshold, error) { - x, ok := iDB.db.Get(utils.CacheThresholds, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheThresholds, utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*Threshold), nil } func (iDB *InternalDB) SetThresholdDrv(th *Threshold) (err error) { - iDB.db.Set(utils.CacheThresholds, th.TenantID(), th, nil, + iDB.db.Set(utils.CacheThresholds, utils.ThresholdPrefix+th.TenantID(), th, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveThresholdDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheThresholds, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheThresholds, utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetFilterDrv(tenant, id string) (*Filter, error) { - x, ok := iDB.db.Get(utils.CacheFilters, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { +func (iDB *InternalDB) GetFilterDrv(tenant, id string) (fltr *Filter, err error) { + x, ok := iDB.db.Get(utils.CacheFilters, utils.FilterPrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*Filter), nil + } func (iDB *InternalDB) SetFilterDrv(fltr *Filter) (err error) { - iDB.db.Set(utils.CacheFilters, fltr.TenantID(), fltr, nil, + iDB.db.Set(utils.CacheFilters, utils.FilterPrefix+fltr.TenantID(), fltr, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveFilterDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheFilters, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheFilters, utils.FilterPrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetSupplierProfileDrv(tenant, id string) (*SupplierProfile, error) { - x, ok := iDB.db.Get(utils.CacheSupplierProfiles, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheSupplierProfiles, utils.SupplierProfilePrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } + return x.(*SupplierProfile), nil + } func (iDB *InternalDB) SetSupplierProfileDrv(spp *SupplierProfile) (err error) { - iDB.db.Set(utils.CacheSupplierProfiles, spp.TenantID(), spp, nil, + iDB.db.Set(utils.CacheSupplierProfiles, utils.SupplierProfilePrefix+spp.TenantID(), spp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveSupplierProfileDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheSupplierProfiles, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheSupplierProfiles, utils.SupplierProfilePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetAttributeProfileDrv(tenant, id string) (*AttributeProfile, error) { - x, ok := iDB.db.Get(utils.CacheAttributeProfiles, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheAttributeProfiles, utils.AttributeProfilePrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*AttributeProfile), nil } func (iDB *InternalDB) SetAttributeProfileDrv(attr *AttributeProfile) (err error) { - iDB.db.Set(utils.CacheAttributeProfiles, attr.TenantID(), attr, nil, + iDB.db.Set(utils.CacheAttributeProfiles, utils.AttributeProfilePrefix+attr.TenantID(), attr, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveAttributeProfileDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheAttributeProfiles, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheAttributeProfiles, utils.AttributeProfilePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetChargerProfileDrv(tenant, id string) (*ChargerProfile, error) { - x, ok := iDB.db.Get(utils.CacheChargerProfiles, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheChargerProfiles, utils.ChargerProfilePrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*ChargerProfile), nil } func (iDB *InternalDB) SetChargerProfileDrv(chr *ChargerProfile) (err error) { - iDB.db.Set(utils.CacheChargerProfiles, chr.TenantID(), chr, nil, + iDB.db.Set(utils.CacheChargerProfiles, utils.ChargerProfilePrefix+chr.TenantID(), chr, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveChargerProfileDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheChargerProfiles, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheChargerProfiles, utils.ChargerProfilePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetDispatcherProfileDrv(tenant, id string) (*DispatcherProfile, error) { - x, ok := iDB.db.Get(utils.CacheDispatcherProfiles, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheDispatcherProfiles, utils.DispatcherProfilePrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*DispatcherProfile), nil } func (iDB *InternalDB) SetDispatcherProfileDrv(dpp *DispatcherProfile) (err error) { - iDB.db.Set(utils.CacheDispatcherProfiles, dpp.TenantID(), dpp, nil, + iDB.db.Set(utils.CacheDispatcherProfiles, utils.DispatcherProfilePrefix+dpp.TenantID(), dpp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveDispatcherProfileDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheDispatcherProfiles, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheDispatcherProfiles, utils.DispatcherProfilePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[string]int64, err error) { x, ok := iDB.db.Get(utils.CacheLoadIDs, utils.LoadIDs) - if ok && x == nil { + if !ok || x == nil { return nil, utils.ErrNotFound } loadIDs = x.(map[string]int64) @@ -578,19 +1061,19 @@ func (iDB *InternalDB) SetLoadIDsDrv(loadIDs map[string]int64) (err error) { return } func (iDB *InternalDB) GetDispatcherHostDrv(tenant, id string) (*DispatcherHost, error) { - x, ok := iDB.db.Get(utils.CacheDispatcherHosts, utils.ConcatenatedKey(tenant, id)) - if ok && x == nil { + x, ok := iDB.db.Get(utils.CacheDispatcherHosts, utils.DispatcherHostPrefix+utils.ConcatenatedKey(tenant, id)) + if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*DispatcherHost), nil } func (iDB *InternalDB) SetDispatcherHostDrv(dpp *DispatcherHost) (err error) { - iDB.db.Set(utils.CacheDispatcherHosts, dpp.TenantID(), dpp, nil, + iDB.db.Set(utils.CacheDispatcherHosts, utils.DispatcherHostPrefix+dpp.TenantID(), dpp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } func (iDB *InternalDB) RemoveDispatcherHostDrv(tenant, id string) (err error) { - iDB.db.Remove(utils.CacheDispatcherHosts, utils.ConcatenatedKey(tenant, id), + iDB.db.Remove(utils.CacheDispatcherHosts, utils.DispatcherHostPrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } diff --git a/engine/storage_map_datadb.go b/engine/storage_map_datadb.go index 6acb97778..e5df5c187 100644 --- a/engine/storage_map_datadb.go +++ b/engine/storage_map_datadb.go @@ -498,14 +498,14 @@ func (ms *MapStorage) GetAccount(key string) (ub *Account, err error) { if !ok { return nil, utils.ErrNotFound } + if len(values) == 0 { + return nil, utils.ErrNotFound + } ub = &Account{ID: key} err = ms.ms.Unmarshal(values, &ub) if err != nil { return nil, err } - if len(values) == 0 { - return nil, utils.ErrNotFound - } return } @@ -662,45 +662,20 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error return } -func (ms *MapStorage) GetAccountActionPlans(acntID string, - skipCache bool, transactionID string) (apIDs []string, err error) { +func (ms *MapStorage) GetAccountActionPlansDrv(acntID string) (apIDs []string, err error) { ms.mu.RLock() defer ms.mu.RUnlock() - if !skipCache { - if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.([]string), nil - } - } values, ok := ms.dict[utils.AccountActionPlansPrefix+acntID] if !ok { - Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, - cacheCommit(transactionID), transactionID) - err = utils.ErrNotFound - return nil, err + return nil, utils.ErrNotFound } if err = ms.ms.Unmarshal(values, &apIDs); err != nil { return nil, err } - Cache.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil, - cacheCommit(transactionID), transactionID) return } -func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { - if !overwrite { - if oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { - return err - } else { - for _, oldAPid := range oldaPlIDs { - if !utils.IsSliceMember(apIDs, oldAPid) { - apIDs = append(apIDs, oldAPid) - } - } - } - } +func (ms *MapStorage) SetAccountActionPlansDrv(acntID string, apIDs []string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(apIDs) @@ -708,17 +683,16 @@ func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overw return err } ms.dict[utils.AccountActionPlansPrefix+acntID] = result - return } -func (ms *MapStorage) RemAccountActionPlans(acntID string, apIDs []string) (err error) { +func (ms *MapStorage) RemAccountActionPlansDrv(acntID string, apIDs []string) (err error) { key := utils.AccountActionPlansPrefix + acntID if len(apIDs) == 0 { delete(ms.dict, key) return } - oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional) + oldaPlIDs, err := ms.GetAccountActionPlansDrv(acntID) if err != nil { return err } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 17d8d1396..766ac6f26 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -461,7 +461,7 @@ func (ms *MongoStorage) RebuildReverseForPrefix(prefix string) (err error) { return err } for acntID := range apl.AccountIDs { - if err = ms.SetAccountActionPlans(acntID, []string{apl.Id}, false); err != nil { + if err = ms.SetAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { return err } } @@ -514,7 +514,7 @@ func (ms *MongoStorage) RemoveReverseForPrefix(prefix string) (err error) { return err } for acntID := range apl.AccountIDs { - if err = ms.RemAccountActionPlans(acntID, []string{apl.Id}); err != nil { + if err = ms.RemAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { return err } } @@ -1456,15 +1456,7 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } -func (ms *MongoStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (aPlIDs []string, err error) { - if !skipCache { - if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.([]string), nil - } - } +func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string, err error) { var kv struct { Key string Value []string @@ -1473,9 +1465,7 @@ func (ms *MongoStorage) GetAccountActionPlans(acntID string, skipCache bool, tra cur := ms.getCol(ColAAp).FindOne(sctx, bson.M{"key": acntID}) if err := cur.Decode(&kv); err != nil { if err == mongo.ErrNoDocuments { - Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, - cacheCommit(transactionID), transactionID) - return utils.ErrNotFound + err = utils.ErrNotFound } return err } @@ -1484,23 +1474,10 @@ func (ms *MongoStorage) GetAccountActionPlans(acntID string, skipCache bool, tra return nil, err } aPlIDs = kv.Value - Cache.Set(utils.CacheAccountActionPlans, acntID, aPlIDs, nil, - cacheCommit(transactionID), transactionID) return } -func (ms *MongoStorage) SetAccountActionPlans(acntID string, aPlIDs []string, overwrite bool) (err error) { - if !overwrite { - if oldaPlIDs, err := ms.GetAccountActionPlans(acntID, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { - return err - } else { - for _, oldAPid := range oldaPlIDs { - if !utils.IsSliceMember(aPlIDs, oldAPid) { - aPlIDs = append(aPlIDs, oldAPid) - } - } - } - } +func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { return ms.query(func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColAAp).UpdateOne(sctx, bson.M{"key": acntID}, bson.M{"$set": struct { @@ -1514,7 +1491,7 @@ func (ms *MongoStorage) SetAccountActionPlans(acntID string, aPlIDs []string, ov } // ToDo: check return len(aPlIDs) == 0 -func (ms *MongoStorage) RemAccountActionPlans(acntID string, aPlIDs []string) (err error) { +func (ms *MongoStorage) RemAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { if len(aPlIDs) == 0 { return ms.query(func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(ColAAp).DeleteOne(sctx, bson.M{"key": acntID}) @@ -1524,7 +1501,7 @@ func (ms *MongoStorage) RemAccountActionPlans(acntID string, aPlIDs []string) (e return err }) } - oldAPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional) + oldAPlIDs, err := ms.GetAccountActionPlansDrv(acntID) if err != nil { return err } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index ef548797d..a0bc9b487 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -296,7 +296,7 @@ func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) (err error) { return err } for acntID := range apl.AccountIDs { - if err = rs.SetAccountActionPlans(acntID, []string{apl.Id}, false); err != nil { + if err = rs.SetAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { return err } } @@ -343,7 +343,7 @@ func (rs *RedisStorage) RemoveReverseForPrefix(prefix string) (err error) { return err } for acntID := range apl.AccountIDs { - if err = rs.RemAccountActionPlans(acntID, []string{apl.Id}); err != nil { + if err = rs.RemAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { return err } } @@ -990,22 +990,11 @@ func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } -func (rs *RedisStorage) GetAccountActionPlans(acntID string, skipCache bool, - transactionID string) (aPlIDs []string, err error) { - if !skipCache { - if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.([]string), nil - } - } +func (rs *RedisStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string, err error) { var values []byte if values, err = rs.Cmd(redis_GET, utils.AccountActionPlansPrefix+acntID).Bytes(); err != nil { if err == redis.ErrRespNil { // did not find the destination - Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, - cacheCommit(transactionID), transactionID) err = utils.ErrNotFound } return @@ -1013,23 +1002,11 @@ func (rs *RedisStorage) GetAccountActionPlans(acntID string, skipCache bool, if err = rs.ms.Unmarshal(values, &aPlIDs); err != nil { return } - Cache.Set(utils.CacheAccountActionPlans, acntID, aPlIDs, nil, - cacheCommit(transactionID), transactionID) + return } -func (rs *RedisStorage) SetAccountActionPlans(acntID string, aPlIDs []string, overwrite bool) (err error) { - if !overwrite { - if oldaPlIDs, err := rs.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { - return err - } else { - for _, oldAPid := range oldaPlIDs { - if !utils.IsSliceMember(aPlIDs, oldAPid) { - aPlIDs = append(aPlIDs, oldAPid) - } - } - } - } +func (rs *RedisStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { var result []byte if result, err = rs.ms.Marshal(aPlIDs); err != nil { return err @@ -1037,12 +1014,12 @@ func (rs *RedisStorage) SetAccountActionPlans(acntID string, aPlIDs []string, ov return rs.Cmd(redis_SET, utils.AccountActionPlansPrefix+acntID, result).Err } -func (rs *RedisStorage) RemAccountActionPlans(acntID string, aPlIDs []string) (err error) { +func (rs *RedisStorage) RemAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { key := utils.AccountActionPlansPrefix + acntID if len(aPlIDs) == 0 { return rs.Cmd(redis_DEL, key).Err } - oldaPlIDs, err := rs.GetAccountActionPlans(acntID, true, utils.NonTransactional) + oldaPlIDs, err := rs.GetAccountActionPlansDrv(acntID) if err != nil { return err } diff --git a/engine/tpreader.go b/engine/tpreader.go index e0cec6f8a..52ebf463a 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -824,7 +824,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) if err = tpr.dm.DataDB().SetActionPlan(accountAction.ActionPlanId, actionPlan, false, utils.NonTransactional); err != nil { return errors.New(err.Error() + " (SetActionPlan): " + accountAction.ActionPlanId) } - if err = tpr.dm.DataDB().SetAccountActionPlans(id, []string{accountAction.ActionPlanId}, false); err != nil { + if err = tpr.dm.SetAccountActionPlans(id, []string{accountAction.ActionPlanId}, false); err != nil { return err } if err = tpr.dm.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{id}, true); err != nil { diff --git a/utils/consts.go b/utils/consts.go index b17c6aa19..ce28b43a5 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -86,6 +86,7 @@ var ( CacheChargerFilterIndexes: ChargerFilterIndexes, CacheDispatcherFilterIndexes: DispatcherFilterIndexes, CacheLoadIDs: LoadIDPrefix, + CacheAccounts: ACCOUNT_PREFIX, } CachePrefixToInstance map[string]string // will be built on init PrefixToIndexCache = map[string]string{ @@ -1147,6 +1148,7 @@ const ( MetaPrecaching = "*precaching" MetaReady = "*ready" CacheLoadIDs = "load_ids" + CacheAccounts = "*accounts" ) // Prefix for indexing