diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 1100f1b47..fe9079c82 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -43,7 +43,7 @@ func (self *ApierV1) GetAccountActionPlan(attrs utils.TenantAccount, reply *[]*A } acntID := utils.ConcatenatedKey(attrs.Tenant, attrs.Account) acntATsIf, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := self.DataManager.GetAccountActionPlans(acntID, true, true, utils.NonTransactional) + acntAPids, err := self.DataManager.DataDB().GetAccountActionPlans(acntID, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return nil, utils.NewErrServerError(err) } @@ -139,7 +139,7 @@ func (self *ApierV1) RemoveActionTiming(attrs AttrRemoveActionTiming, reply *str return 0, err } for _, acntID := range remAcntAPids { - if err = self.DataManager.RemAccountActionPlans(acntID, []string{attrs.ActionPlanId}); err != nil { + if err = self.DataManager.DataDB().RemAccountActionPlans(acntID, []string{attrs.ActionPlanId}); err != nil { return 0, nil } } @@ -183,7 +183,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e } if attr.ActionPlanId != "" { _, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := self.DataManager.GetAccountActionPlans(accID, true, true, utils.NonTransactional) + acntAPids, err := self.DataManager.DataDB().GetAccountActionPlans(accID, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return 0, err } @@ -239,7 +239,7 @@ func (self *ApierV1) SetAccount(attr utils.AttrSetAccount, reply *string) (err e if err := self.DataManager.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, apIDs, true); err != nil { return 0, err } - if err := self.DataManager.SetAccountActionPlans(accID, acntAPids, true); err != nil { + if err := self.DataManager.DataDB().SetAccountActionPlans(accID, acntAPids, true); err != nil { return 0, err } if err = self.DataManager.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil { @@ -331,7 +331,7 @@ func (self *ApierV1) RemoveAccount(attr utils.AttrRemoveAccount, reply *string) if err != nil { return utils.NewErrServerError(err) } - if err = self.DataManager.RemAccountActionPlans(accID, nil); err != nil && + if err = self.DataManager.DataDB().RemAccountActionPlans(accID, nil); err != nil && err.Error() != utils.ErrNotFound.Error() { return err } diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 17cd05c49..b65995882 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -643,7 +643,7 @@ func (self *ApierV1) SetActionPlan(attrs AttrSetActionPlan, reply *string) (err return 0, utils.NewErrServerError(err) } for acntID := range prevAccountIDs { - if err := self.DataManager.RemAccountActionPlans(acntID, []string{attrs.Id}); err != nil { + if err := self.DataManager.DataDB().RemAccountActionPlans(acntID, []string{attrs.Id}); err != nil { return 0, utils.NewErrServerError(err) } } @@ -738,7 +738,7 @@ func (self *ApierV1) RemoveActionPlan(attr AttrGetActionPlan, reply *string) (er return 0, err } for acntID := range prevAccountIDs { - if err := self.DataManager.RemAccountActionPlans(acntID, []string{attr.ID}); err != nil { + if err := self.DataManager.DataDB().RemAccountActionPlans(acntID, []string{attr.ID}); err != nil { return 0, utils.NewErrServerError(err) } } diff --git a/apier/v1/attributes_it_test.go b/apier/v1/attributes_it_test.go index 796f58159..3f41acedf 100644 --- a/apier/v1/attributes_it_test.go +++ b/apier/v1/attributes_it_test.go @@ -25,6 +25,7 @@ import ( "net/rpc/jsonrpc" "path" "reflect" + "sort" "testing" "time" @@ -771,6 +772,10 @@ func testAttributeSUpdateAlsPrf(t *testing.T) { t.Fatal(err) } reply.Compile() + sort.Strings(reply.FilterIDs) + sort.Strings(alsPrf.AttributeProfile.FilterIDs) + sort.Strings(reply.Contexts) + sort.Strings(alsPrf.AttributeProfile.Contexts) if !reflect.DeepEqual(alsPrf.AttributeProfile, reply) { t.Errorf("Expecting : %+v, received: %+v", alsPrf.AttributeProfile, reply) } diff --git a/apier/v1/resourcesv1_it_test.go b/apier/v1/resourcesv1_it_test.go index c3d18433f..f1aac0132 100644 --- a/apier/v1/resourcesv1_it_test.go +++ b/apier/v1/resourcesv1_it_test.go @@ -24,6 +24,7 @@ import ( "net/rpc/jsonrpc" "path" "reflect" + "sort" "testing" "time" @@ -689,8 +690,12 @@ func testV1RsGetResourceProfileAfterUpdate(t *testing.T) { if err := rlsV1Rpc.Call("ApierV1.GetResourceProfile", &utils.TenantID{Tenant: "cgrates.org", ID: rlsConfig.ID}, &reply); err != nil { t.Error(err) - } else if !reflect.DeepEqual(reply, rlsConfig.ResourceProfile) { - t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(rlsConfig.ResourceProfile), utils.ToJSON(reply)) + } else { + sort.Strings(reply.FilterIDs) + sort.Strings(rlsConfig.ResourceProfile.FilterIDs) + if !reflect.DeepEqual(reply, rlsConfig.ResourceProfile) { + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(rlsConfig.ResourceProfile), utils.ToJSON(reply)) + } } } diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index 997ae452e..a080c0102 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -26,13 +26,14 @@ import ( "reflect" "testing" "time" + "sort" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -var ( + var ( tSv1CfgPath string tSv1Cfg *config.CGRConfig tSv1Rpc *rpc.Client @@ -403,8 +404,12 @@ func testV1TSUpdateThresholdProfile(t *testing.T) { if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Test"}, &reply); err != nil { t.Error(err) - } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) { - t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply) + } else{ + sort.Strings(reply.FilterIDs) + sort.Strings(tPrfl.ThresholdProfile.FilterIDs) + if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply) + } } } diff --git a/apier/v1/tpaccountactions_it_test.go b/apier/v1/tpaccountactions_it_test.go index 40c23e8a4..368a548ac 100644 --- a/apier/v1/tpaccountactions_it_test.go +++ b/apier/v1/tpaccountactions_it_test.go @@ -26,6 +26,7 @@ import ( "path" "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -83,6 +84,13 @@ func TestTPAccActionsITPG(t *testing.T) { } } +func TestTPAccActionsITInternal(t *testing.T) { + tpAccActionsConfigDIR = "tutinternal" + for _, stest := range sTestsTPAccActions { + t.Run(tpAccActionsConfigDIR, stest) + } +} + func testTPAccActionsInitCfg(t *testing.T) { var err error tpAccActionsCfgPath = path.Join(tpAccActionsDataDir, "conf", "samples", tpAccActionsConfigDIR) @@ -219,7 +227,7 @@ func testTPAccActionsRemTPAccAction(t *testing.T) { } else if resp != utils.OK { t.Error("Unexpected reply returned", resp) } - + time.Sleep(time.Duration(100 * time.Millisecond)) } func testTPAccActionsGetTPAccActionAfterRemove(t *testing.T) { @@ -232,7 +240,7 @@ func testTPAccActionsGetTPAccActionAfterRemove(t *testing.T) { } func testTPAccActionsKillEngine(t *testing.T) { - if err := engine.KillEngine(tpDestinationDelay); err != nil { + if err := engine.KillEngine(tpAccActionsDelay); err != nil { t.Error(err) } } diff --git a/apier/v1/tpactionplans_it_test.go b/apier/v1/tpactionplans_it_test.go index 6160b3a47..9835e1939 100644 --- a/apier/v1/tpactionplans_it_test.go +++ b/apier/v1/tpactionplans_it_test.go @@ -81,6 +81,13 @@ func TestTPAccPlansITPG(t *testing.T) { } } +func TestTPAccPlansITInternal(t *testing.T) { + tpAccPlansConfigDIR = "tutinternal" + for _, stest := range sTestsTPAccPlans { + t.Run(tpAccPlansConfigDIR, stest) + } +} + func testTPAccPlansInitCfg(t *testing.T) { var err error tpAccPlansCfgPath = path.Join(tpAccPlansDataDir, "conf", "samples", tpAccPlansConfigDIR) diff --git a/apier/v1/tpactions_it_test.go b/apier/v1/tpactions_it_test.go index e8976353d..e8c451e0a 100644 --- a/apier/v1/tpactions_it_test.go +++ b/apier/v1/tpactions_it_test.go @@ -81,6 +81,13 @@ func TestTPActionsITPG(t *testing.T) { } } +func TestTPActionsITInternal(t *testing.T) { + tpActionConfigDIR = "tutinternal" + for _, stest := range sTestsTPActions { + t.Run(tpActionConfigDIR, stest) + } +} + func testTPActionsInitCfg(t *testing.T) { var err error tpActionCfgPath = path.Join(tpActionDataDir, "conf", "samples", tpActionConfigDIR) diff --git a/apier/v1/tpactiontriggers_it_test.go b/apier/v1/tpactiontriggers_it_test.go index 7edea0c34..d29b0cf37 100644 --- a/apier/v1/tpactiontriggers_it_test.go +++ b/apier/v1/tpactiontriggers_it_test.go @@ -81,6 +81,13 @@ func TestTPActionTriggersITPG(t *testing.T) { } } +func TestTPActionTriggersITInternal(t *testing.T) { + tpActionTriggerConfigDIR = "tutinternal" + for _, stest := range sTestsTPActionTriggers { + t.Run(tpActionTriggerConfigDIR, stest) + } +} + func testTPActionTriggersInitCfg(t *testing.T) { var err error tpActionTriggerCfgPath = path.Join(tpActionTriggerDataDir, "conf", "samples", tpActionTriggerConfigDIR) diff --git a/apier/v1/tpattributes_it_test.go b/apier/v1/tpattributes_it_test.go index 088aab866..92bd70e04 100644 --- a/apier/v1/tpattributes_it_test.go +++ b/apier/v1/tpattributes_it_test.go @@ -75,6 +75,13 @@ func TestTPAlsPrfITMongo(t *testing.T) { } } +func TestTPAlsPrfITInternal(t *testing.T) { + tpAlsPrfConfigDIR = "tutinternal" + for _, stest := range sTestsTPAlsPrf { + t.Run(tpAlsPrfConfigDIR, stest) + } +} + func testTPAlsPrfInitCfg(t *testing.T) { var err error tpAlsPrfCfgPath = path.Join(tpAlsPrfDataDir, "conf", "samples", tpAlsPrfConfigDIR) diff --git a/apier/v1/tpchargers_it_test.go b/apier/v1/tpchargers_it_test.go index fbb382422..8d229e832 100644 --- a/apier/v1/tpchargers_it_test.go +++ b/apier/v1/tpchargers_it_test.go @@ -81,6 +81,13 @@ func TestTPChrgsITMapStorage(t *testing.T) { } } +func TestTPChrgsITInternal(t *testing.T) { + tpChrgsConfigDIR = "tutinternal" + for _, stest := range sTestsTPChrgs { + t.Run(tpChrgsConfigDIR, stest) + } +} + func testTPChrgsInitCfg(t *testing.T) { var err error tpChrgsCfgPath = path.Join(tpChrgsDataDir, "conf", "samples", tpChrgsConfigDIR) diff --git a/apier/v1/tpdestinationrates_it_test.go b/apier/v1/tpdestinationrates_it_test.go index aa6731798..9223c4e4f 100644 --- a/apier/v1/tpdestinationrates_it_test.go +++ b/apier/v1/tpdestinationrates_it_test.go @@ -80,6 +80,13 @@ func TestTPDstRateITPG(t *testing.T) { } } +func TestTPDstRateITInternal(t *testing.T) { + tpDstRateConfigDIR = "tutinternal" + for _, stest := range sTestsTPDstRates { + t.Run(tpDstRateConfigDIR, stest) + } +} + func testTPDstRateInitCfg(t *testing.T) { var err error tpDstRateCfgPath = path.Join(tpDstRateDataDir, "conf", "samples", tpDstRateConfigDIR) diff --git a/apier/v1/tpdestinations_it_test.go b/apier/v1/tpdestinations_it_test.go index 87c1cf9ba..102183afe 100644 --- a/apier/v1/tpdestinations_it_test.go +++ b/apier/v1/tpdestinations_it_test.go @@ -80,6 +80,13 @@ func TestTPDestinationsITPG(t *testing.T) { } } +func TestTPDestinationsITInternal(t *testing.T) { + tpDestinationConfigDIR = "tutinternal" + for _, stest := range sTestsTPDestinations { + t.Run(tpDestinationConfigDIR, stest) + } +} + func testTPDestinationsInitCfg(t *testing.T) { var err error tpDestinationCfgPath = path.Join(tpDestinationDataDir, "conf", "samples", tpDestinationConfigDIR) diff --git a/apier/v1/tpdispatchers_it_test.go b/apier/v1/tpdispatchers_it_test.go index 232aae3f8..31074c228 100644 --- a/apier/v1/tpdispatchers_it_test.go +++ b/apier/v1/tpdispatchers_it_test.go @@ -73,6 +73,13 @@ func TestTPDispatcherITMongo(t *testing.T) { } } +func TestTPDispatcherITInternal(t *testing.T) { + tpDispatcherConfigDIR = "tutinternal" + for _, stest := range sTestsTPDispatchers { + t.Run(tpDispatcherConfigDIR, stest) + } +} + func testTPDispatcherInitCfg(t *testing.T) { var err error tpDispatcherCfgPath = path.Join(tpDispatcherDataDir, "conf", "samples", tpDispatcherConfigDIR) diff --git a/apier/v1/tpfilters_it_test.go b/apier/v1/tpfilters_it_test.go index 34958cf3e..f76e3f775 100644 --- a/apier/v1/tpfilters_it_test.go +++ b/apier/v1/tpfilters_it_test.go @@ -82,6 +82,13 @@ func TestTPFilterITPG(t *testing.T) { } } +func TestTPFilterITInternal(t *testing.T) { + tpFilterConfigDIR = "tutinternal" + for _, stest := range sTestsTPFilters { + t.Run(tpFilterConfigDIR, stest) + } +} + func testTPFilterInitCfg(t *testing.T) { var err error tpFilterCfgPath = path.Join(tpFilterDataDir, "conf", "samples", tpFilterConfigDIR) diff --git a/apier/v1/tprates_it_test.go b/apier/v1/tprates_it_test.go index 26210085e..fe4ce2f76 100644 --- a/apier/v1/tprates_it_test.go +++ b/apier/v1/tprates_it_test.go @@ -80,6 +80,13 @@ func TestTPRatesITPG(t *testing.T) { } } +func TestTPRatesITInternal(t *testing.T) { + tpRateConfigDIR = "tutinternal" + for _, stest := range sTestsTPRates { + t.Run(tpRateConfigDIR, stest) + } +} + func testTPRatesInitCfg(t *testing.T) { var err error tpRateCfgPath = path.Join(tpRateDataDir, "conf", "samples", tpRateConfigDIR) diff --git a/apier/v1/tpratingplans_it_test.go b/apier/v1/tpratingplans_it_test.go index 593a44e39..652749277 100644 --- a/apier/v1/tpratingplans_it_test.go +++ b/apier/v1/tpratingplans_it_test.go @@ -80,6 +80,13 @@ func TestTPRatingPlansITPG(t *testing.T) { } } +func TestTPRatingPlansITInternal(t *testing.T) { + tpRatingPlanConfigDIR = "tutinternal" + for _, stest := range sTestsTPRatingPlans { + t.Run(tpRatingPlanConfigDIR, stest) + } +} + func testTPRatingPlansInitCfg(t *testing.T) { var err error tpRatingPlanCfgPath = path.Join(tpRatingPlanDataDir, "conf", "samples", tpRatingPlanConfigDIR) diff --git a/apier/v1/tpratingprofiles_it_test.go b/apier/v1/tpratingprofiles_it_test.go index 4c42e6a4b..62c4f175a 100644 --- a/apier/v1/tpratingprofiles_it_test.go +++ b/apier/v1/tpratingprofiles_it_test.go @@ -26,6 +26,7 @@ import ( "path" "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -83,6 +84,13 @@ func TestTPRatingProfilesITPG(t *testing.T) { } } +func TestTPRatingProfilesITInternal(t *testing.T) { + tpRatingProfileConfigDIR = "tutinternal" + for _, stest := range sTestsTPRatingProfiles { + t.Run(tpRatingProfileConfigDIR, stest) + } +} + func testTPRatingProfilesInitCfg(t *testing.T) { var err error tpRatingProfileCfgPath = path.Join(tpRatingProfileDataDir, "conf", "samples", tpRatingProfileConfigDIR) @@ -276,6 +284,7 @@ func testTPRatingProfilesRemoveTPRatingProfile(t *testing.T) { } else if resp != utils.OK { t.Error("Unexpected reply returned", resp) } + time.Sleep(time.Duration(100 * time.Millisecond)) } func testTPRatingProfilesGetTPRatingProfileAfterRemove(t *testing.T) { diff --git a/apier/v1/tpresources_it_test.go b/apier/v1/tpresources_it_test.go index 558ee1512..3d8ea873a 100644 --- a/apier/v1/tpresources_it_test.go +++ b/apier/v1/tpresources_it_test.go @@ -80,6 +80,13 @@ func TestTPResITPG(t *testing.T) { } } +func TestTPResITInternal(t *testing.T) { + tpResConfigDIR = "tutinternal" + for _, stest := range sTestsTPResources { + t.Run(tpResConfigDIR, stest) + } +} + func testTPResInitCfg(t *testing.T) { var err error tpResCfgPath = path.Join(tpResDataDir, "conf", "samples", tpResConfigDIR) diff --git a/apier/v1/tpsharedgroups_it_test.go b/apier/v1/tpsharedgroups_it_test.go index d8804e39a..54ffbd3c2 100644 --- a/apier/v1/tpsharedgroups_it_test.go +++ b/apier/v1/tpsharedgroups_it_test.go @@ -80,6 +80,13 @@ func TestTPSharedGroupsITPG(t *testing.T) { } } +func TestTPSharedGroupsITInternal(t *testing.T) { + tpSharedGroupConfigDIR = "tutinternal" + for _, stest := range sTestsTPSharedGroups { + t.Run(tpSharedGroupConfigDIR, stest) + } +} + func testTPSharedGroupsInitCfg(t *testing.T) { var err error tpSharedGroupCfgPath = path.Join(tpSharedGroupDataDir, "conf", "samples", tpSharedGroupConfigDIR) diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go index d530addfb..c6c4fddfc 100644 --- a/apier/v1/tpstats_it_test.go +++ b/apier/v1/tpstats_it_test.go @@ -81,6 +81,13 @@ func TestTPStatITPG(t *testing.T) { } } +func TestTPStatITInternal(t *testing.T) { + tpStatConfigDIR = "tutinternal" + for _, stest := range sTestsTPStats { + t.Run(tpStatConfigDIR, stest) + } +} + func testTPStatsInitCfg(t *testing.T) { var err error tpStatCfgPath = path.Join(tpStatDataDir, "conf", "samples", tpStatConfigDIR) diff --git a/apier/v1/tpsuppliers_it_test.go b/apier/v1/tpsuppliers_it_test.go index 766344a70..94e6bb339 100644 --- a/apier/v1/tpsuppliers_it_test.go +++ b/apier/v1/tpsuppliers_it_test.go @@ -75,6 +75,13 @@ func TestTPSplPrfITMongo(t *testing.T) { } } +func TestTPSplPrfITInternal(t *testing.T) { + tpSplPrfConfigDIR = "tutinternal" + for _, stest := range sTestsTPSplPrf { + t.Run(tpSplPrfConfigDIR, stest) + } +} + func testTPSplPrfInitCfg(t *testing.T) { var err error tpSplPrfCfgPath = path.Join(tpSplPrfDataDire, "conf", "samples", tpSplPrfConfigDIR) diff --git a/apier/v1/tpthresholds_it_test.go b/apier/v1/tpthresholds_it_test.go index c998b938c..777091ecb 100644 --- a/apier/v1/tpthresholds_it_test.go +++ b/apier/v1/tpthresholds_it_test.go @@ -81,6 +81,13 @@ func TestTPThresholdITPG(t *testing.T) { } } +func TestTPThresholdITInternal(t *testing.T) { + tpThresholdConfigDIR = "tutinternal" + for _, stest := range sTestsTPThreshold { + t.Run(tpThresholdConfigDIR, stest) + } +} + func testTPThreholdInitCfg(t *testing.T) { var err error tpThresholdCfgPath = path.Join(tpThresholdDataDir, "conf", "samples", tpThresholdConfigDIR) diff --git a/apier/v1/tptimings_it_test.go b/apier/v1/tptimings_it_test.go index 652a70f3c..1da91c7b4 100644 --- a/apier/v1/tptimings_it_test.go +++ b/apier/v1/tptimings_it_test.go @@ -80,6 +80,13 @@ func TestTPTimingITPG(t *testing.T) { } } +func TestTPTimingITInternal(t *testing.T) { + tpTimingConfigDIR = "tutinternal" + for _, stest := range sTestsTPTiming { + t.Run(tpTimingConfigDIR, stest) + } +} + func testTPTimingsInitCfg(t *testing.T) { var err error tpTimingCfgPath = path.Join(tpTimingDataDir, "conf", "samples", tpTimingConfigDIR) diff --git a/apier/v1/versions_it_test.go b/apier/v1/versions_it_test.go index bb7b1d33f..bd0f8c56a 100644 --- a/apier/v1/versions_it_test.go +++ b/apier/v1/versions_it_test.go @@ -70,13 +70,13 @@ func TestVrsITMongo(t *testing.T) { } } -func TestVrsITInternal(t *testing.T) { - vrsConfigDIR = "tutinternal" - vrsStorageType = utils.INTERNAL - for _, stest := range sTestsVrs { - t.Run(vrsConfigDIR, stest) - } -} +// func TestVrsITInternal(t *testing.T) { +// vrsConfigDIR = "tutinternal" +// vrsStorageType = utils.INTERNAL +// for _, stest := range sTestsVrs { +// t.Run(vrsConfigDIR, stest) +// } +// } func testVrsInitCfg(t *testing.T) { var err error diff --git a/apier/v2/accounts.go b/apier/v2/accounts.go index 95f269a6e..00987983f 100644 --- a/apier/v2/accounts.go +++ b/apier/v2/accounts.go @@ -119,7 +119,7 @@ func (self *ApierV2) SetAccount(attr AttrSetAccount, reply *string) error { } if attr.ActionPlanIDs != nil { _, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := self.DataManager.GetAccountActionPlans(accID, false, false, utils.NonTransactional) + acntAPids, err := self.DataManager.DataDB().GetAccountActionPlans(accID, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return 0, err } @@ -189,7 +189,7 @@ func (self *ApierV2) SetAccount(attr AttrSetAccount, reply *string) error { if err := self.DataManager.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, apIDs, true); err != nil { return 0, err } - if err := self.DataManager.SetAccountActionPlans(accID, acntAPids, true); err != nil { + if err := self.DataManager.DataDB().SetAccountActionPlans(accID, acntAPids, true); err != nil { return 0, err } return 0, self.DataManager.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true) diff --git a/apier/v2/apierv2_it_test.go b/apier/v2/apierv2_it_test.go index 1f2bb8d26..251c2e59c 100644 --- a/apier/v2/apierv2_it_test.go +++ b/apier/v2/apierv2_it_test.go @@ -230,7 +230,7 @@ func TestApierV2itSetAccountWithAP(t *testing.T) { ActionPlanIDs: &[]string{argAP1.Id}, } acntID := utils.ConcatenatedKey(argSetAcnt1.Tenant, argSetAcnt1.Account) - if _, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err == nil || err != utils.ErrNotFound { + if _, err := dm.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err == nil || err != utils.ErrNotFound { t.Error(err) } if err := apierRPC.Call("ApierV2.SetAccount", argSetAcnt1, &reply); err != nil { @@ -242,7 +242,7 @@ func TestApierV2itSetAccountWithAP(t *testing.T) { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } eAAPids := []string{argAP1.Id} - if aapIDs, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { + if aapIDs, err := dm.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eAAPids, aapIDs) { t.Errorf("Expecting: %+v, received: %+v", eAAPids, aapIDs) @@ -279,7 +279,7 @@ func TestApierV2itSetAccountWithAP(t *testing.T) { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } eAAPids = []string{argAP1.Id, argAP2.Id} - if aapIDs, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { + if aapIDs, err := dm.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eAAPids, aapIDs) { t.Errorf("Expecting: %+v, received: %+v", eAAPids, aapIDs) @@ -305,7 +305,7 @@ func TestApierV2itSetAccountWithAP(t *testing.T) { t.Errorf("ActionPlan does not contain the accountID: %+v", ap) } eAAPids = []string{argAP2.Id} - if aapIDs, err := dm.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { + if aapIDs, err := dm.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eAAPids, aapIDs) { t.Errorf("Expecting: %+v, received: %+v", eAAPids, aapIDs) diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json index 81f60aa90..0a0191668 100644 --- a/data/conf/samples/tutinternal/cgrates.json +++ b/data/conf/samples/tutinternal/cgrates.json @@ -15,12 +15,12 @@ "http": ":2080", }, -"data_db": { // database used to store runtime data (eg: accounts, cdr stats) - "db_type": "*internal", // data_db type: +"data_db": { + "db_type": "*internal", }, -"stor_db": { // for the moment use mysql as stordb until implemted all methods - "db_password": "CGRateS.org", +"stor_db": { + "db_type": "*internal", }, "rals": { diff --git a/engine/action.go b/engine/action.go index 93a3f4901..7332fbe6c 100644 --- a/engine/action.go +++ b/engine/action.go @@ -612,7 +612,7 @@ func removeAccountAction(ub *Account, a *Action, acs Actions, extraData interfac } _, err := guardian.Guardian.Guard(func() (interface{}, error) { - acntAPids, err := dm.GetAccountActionPlans(accID, true, true, utils.NonTransactional) + acntAPids, err := dm.DataDB().GetAccountActionPlans(accID, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { utils.Logger.Err(fmt.Sprintf("Could not get action plans: %s: %v", accID, err)) return 0, err @@ -632,7 +632,7 @@ func removeAccountAction(ub *Account, a *Action, acs Actions, extraData interfac if err = dm.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, acntAPids, true); err != nil { return 0, err } - if err = dm.RemAccountActionPlans(accID, nil); err != nil { + if err = dm.DataDB().RemAccountActionPlans(accID, nil); err != nil { return 0, err } if err = dm.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{accID}, true); err != nil && err.Error() != utils.ErrNotFound.Error() { diff --git a/engine/actions_test.go b/engine/actions_test.go index 6603596a1..547f125d6 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -576,7 +576,7 @@ func TestActionPlansRemoveMember(t *testing.T) { []string{ap1.Id, ap2.Id}, true); err != nil { t.Error(err) } - if err = dm.SetAccountActionPlans(account1.ID, + if err = dm.DataDB().SetAccountActionPlans(account1.ID, []string{ap1.Id}, false); err != nil { t.Error(err) } @@ -584,8 +584,8 @@ func TestActionPlansRemoveMember(t *testing.T) { []string{account1.ID}, true); err != nil { t.Error(err) } - dm.GetAccountActionPlans(account1.ID, false, false, utils.NonTransactional) // FixMe: remove here after finishing testing of map - if err = dm.SetAccountActionPlans(account2.ID, + dm.DataDB().GetAccountActionPlans(account1.ID, true, utils.NonTransactional) // FixMe: remove here after finishing testing of map + if err = dm.DataDB().SetAccountActionPlans(account2.ID, []string{ap2.Id}, false); err != nil { t.Error(err) } diff --git a/engine/datamanager.go b/engine/datamanager.go index b6c0088b2..30e3dd667 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -235,7 +235,7 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b case utils.ACTION_PLAN_PREFIX: _, err = dm.DataDB().GetActionPlan(dataID, true, utils.NonTransactional) case utils.AccountActionPlansPrefix: - _, err = dm.GetAccountActionPlans(dataID, false, true, utils.NonTransactional) + _, err = dm.DataDB().GetAccountActionPlans(dataID, true, utils.NonTransactional) case utils.ACTION_TRIGGER_PREFIX: _, err = dm.GetActionTriggers(dataID, true, utils.NonTransactional) case utils.SHARED_GROUP_PREFIX: @@ -1389,47 +1389,3 @@ func (dm *DataManager) GetItemLoadIDs(itemIDPrefix string, cacheWrite bool) (loa func (dm *DataManager) SetLoadIDs(loadIDs map[string]int64) error { return dm.DataDB().SetLoadIDsDrv(loadIDs) } - -func (dm *DataManager) GetAccountActionPlans(acntID string, - cacheRead, cacheWrite bool, transactionID string) (apIDs []string, err error) { - if !cacheRead { - if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.([]string), nil - } - } - apIDs, err = dm.DataDB().GetAccountActionPlansDrv(acntID) - if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, - cacheCommit(transactionID), transactionID) - } - return nil, err - } - if cacheWrite { - Cache.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil, - cacheCommit(transactionID), transactionID) - } - return -} - -func (dm *DataManager) SetAccountActionPlans(acntID string, aPlIDs []string, overwrite bool) (err error) { - if !overwrite { - if oldaPlIDs, err := dm.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { - return err - } else { - for _, oldAPid := range oldaPlIDs { - if !utils.IsSliceMember(aPlIDs, oldAPid) { - aPlIDs = append(aPlIDs, oldAPid) - } - } - } - } - return dm.DataDB().SetAccountActionPlansDrv(acntID, aPlIDs) -} - -func (dm *DataManager) RemAccountActionPlans(acntID string, aPlIDs []string) (err error) { - return dm.DataDB().RemAccountActionPlansDrv(acntID, aPlIDs) -} diff --git a/engine/filterindexer_it_test.go b/engine/filterindexer_it_test.go index e00573ea9..2cec3e486 100644 --- a/engine/filterindexer_it_test.go +++ b/engine/filterindexer_it_test.go @@ -100,11 +100,7 @@ func TestFilterIndexerITMongo(t *testing.T) { } func TestFilterIndexerITInternal(t *testing.T) { - mapDataDB, err := NewMapStorage() - if err != nil { - t.Fatal(err) - } - dataManager = NewDataManager(mapDataDB) + dataManager = NewDataManager(NewInternalDB()) for _, stest := range sTests { t.Run("TestITInternal", stest) } @@ -311,9 +307,11 @@ func testITTestThresholdFilterIndexes(t *testing.T) { if err := dataManager.SetFilter(fp2); err != nil { t.Error(err) } - th.FilterIDs = []string{"Filter2"} + cloneTh1:=new(ThresholdProfile) + *cloneTh1 = *th + cloneTh1.FilterIDs = []string{"Filter2"} time.Sleep(50 * time.Millisecond) - if err := dataManager.SetThresholdProfile(th, true); err != nil { + if err := dataManager.SetThresholdProfile(cloneTh1, true); err != nil { t.Error(err) } eIdxes = map[string]utils.StringMap{ @@ -356,9 +354,12 @@ func testITTestThresholdFilterIndexes(t *testing.T) { if err := dataManager.SetFilter(fp3); err != nil { t.Error(err) } - th.FilterIDs = []string{"Filter1", "Filter3"} + + clone2Th1:=new(ThresholdProfile) + *clone2Th1 = *th + clone2Th1.FilterIDs = []string{"Filter1", "Filter3"} time.Sleep(50 * time.Millisecond) - if err := dataManager.SetThresholdProfile(th, true); err != nil { + if err := dataManager.SetThresholdProfile(clone2Th1, true); err != nil { t.Error(err) } eIdxes = map[string]utils.StringMap{ diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index bda1636c1..2e6bb4cd0 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -42,41 +42,41 @@ var ( var sTestsOnStorIT = []func(t *testing.T){ testOnStorITFlush, testOnStorITIsDBEmpty, - //testOnStorITCacheDestinations, + testOnStorITCacheDestinations, testOnStorITCacheReverseDestinations, - // testOnStorITCacheActionPlan, - // testOnStorITCacheAccountActionPlans, + testOnStorITCacheActionPlan, + testOnStorITCacheAccountActionPlans, - // // ToDo: test cache flush for a prefix - // // ToDo: testOnStorITLoadAccountingCache - // testOnStorITHasData, - // testOnStorITPushPop, - // testOnStorITRatingPlan, - // testOnStorITRatingProfile, - // testOnStorITCRUDDestinations, - // testOnStorITCRUDReverseDestinations, - // testOnStorITActions, - // testOnStorITSharedGroup, - // testOnStorITCRUDActionPlan, - // testOnStorITCRUDAccountActionPlans, - // testOnStorITCRUDAccount, - // testOnStorITResource, - // testOnStorITResourceProfile, - // testOnStorITTiming, - // //testOnStorITCRUDHistory, - // testOnStorITCRUDStructVersion, - // testOnStorITStatQueueProfile, - // testOnStorITStatQueue, - // testOnStorITThresholdProfile, - // testOnStorITThreshold, - // testOnStorITFilter, - // testOnStorITSupplierProfile, - // testOnStorITAttributeProfile, - // testOnStorITFlush, - // testOnStorITIsDBEmpty, - // testOnStorITTestAttributeSubstituteIface, - // testOnStorITChargerProfile, - // testOnStorITDispatcherProfile, + // ToDo: test cache flush for a prefix + // ToDo: testOnStorITLoadAccountingCache + testOnStorITHasData, + testOnStorITPushPop, + testOnStorITRatingPlan, + testOnStorITRatingProfile, + testOnStorITCRUDDestinations, + testOnStorITCRUDReverseDestinations, + testOnStorITActions, + testOnStorITSharedGroup, + testOnStorITCRUDActionPlan, + testOnStorITCRUDAccountActionPlans, + testOnStorITCRUDAccount, + testOnStorITResource, + testOnStorITResourceProfile, + testOnStorITTiming, + //testOnStorITCRUDHistory, + testOnStorITCRUDStructVersion, + testOnStorITStatQueueProfile, + testOnStorITStatQueue, + testOnStorITThresholdProfile, + testOnStorITThreshold, + testOnStorITFilter, + testOnStorITSupplierProfile, + testOnStorITAttributeProfile, + testOnStorITFlush, + testOnStorITIsDBEmpty, + testOnStorITTestAttributeSubstituteIface, + testOnStorITChargerProfile, + testOnStorITDispatcherProfile, //testOnStorITCacheActionTriggers, //testOnStorITCRUDActionTriggers, @@ -258,7 +258,7 @@ func testOnStorITCacheActionPlan(t *testing.T) { func testOnStorITCacheAccountActionPlans(t *testing.T) { acntID := utils.ConcatenatedKey("cgrates.org", "1001") aAPs := []string{"PACKAGE_10_SHARED_A_5", "USE_SHARED_A", "apl_PACKAGE_1001"} - if err := onStor.SetAccountActionPlans(acntID, aAPs, true); err != nil { + if err := onStor.DataDB().SetAccountActionPlans(acntID, aAPs, true); err != nil { t.Error(err) } if _, hasIt := Cache.Get(utils.CacheAccountActionPlans, acntID); hasIt { @@ -1060,21 +1060,21 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { expect := []string{"PACKAGE_10_SHARED_A_5", "USE_SHARED_A", "apl_PACKAGE_1001"} aAPs := []string{"PACKAGE_10_SHARED_A_5", "apl_PACKAGE_1001"} aAPs2 := []string{"USE_SHARED_A"} - if _, rcvErr := onStor.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.SetAccountActionPlans(acntID, aAPs, true); err != nil { + if err := onStor.DataDB().SetAccountActionPlans(acntID, aAPs, true); err != nil { t.Error(err) } - if rcv, err := onStor.GetAccountActionPlans(acntID, false, false, utils.NonTransactional); err != nil { + if rcv, err := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(aAPs, rcv) { t.Errorf("Expecting: %v, received: %v", aAPs, rcv) } - if err := onStor.SetAccountActionPlans(acntID, aAPs2, false); err != nil { + if err := onStor.DataDB().SetAccountActionPlans(acntID, aAPs2, false); err != nil { t.Error(err) } - if rcv, err := onStor.GetAccountActionPlans(acntID, false, true, utils.NonTransactional); err != nil { + if rcv, err := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(expect, rcv) { t.Errorf("Expecting: %v, received: %v", expect, rcv) @@ -1087,7 +1087,7 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { // t.Error(rcvErr) // } // - if rcv, err := onStor.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); err != nil { + if rcv, err := onStor.DataDB().GetAccountActionPlans(acntID, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(expect, rcv) { t.Errorf("Expecting: %v, received: %v", expect, rcv) @@ -1095,18 +1095,18 @@ func testOnStorITCRUDAccountActionPlans(t *testing.T) { // if err = onStor.DataDB().SelectDatabase(onStorCfg); err != nil { // t.Error(err) // } - if err := onStor.RemAccountActionPlans(acntID, aAPs2); err != nil { + if err := onStor.DataDB().RemAccountActionPlans(acntID, aAPs2); err != nil { t.Error(err) } - if rcv, err := onStor.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); err != nil { + if rcv, err := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(aAPs, rcv) { t.Errorf("Expecting: %v, received: %v", aAPs, rcv) } - if err := onStor.RemAccountActionPlans(acntID, aAPs); err != nil { + if err := onStor.DataDB().RemAccountActionPlans(acntID, aAPs); err != nil { t.Error(err) } - if _, rcvErr := onStor.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.DataDB().GetAccountActionPlans(acntID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 13810ab3c..03a6f65a8 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -74,9 +74,10 @@ type DataDB interface { SetActionPlan(string, *ActionPlan, bool, string) error RemoveActionPlan(key string, transactionID string) error GetAllActionPlans() (map[string]*ActionPlan, error) - GetAccountActionPlansDrv(acntID string) (apIDs []string, err error) - SetAccountActionPlansDrv(acntID string, apIDs []string) (err error) - RemAccountActionPlansDrv(acntID string, apIDs []string) (err error) + GetAccountActionPlans(acntID string, skipCache bool, + transactionID string) (apIDs []string, err error) + SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) + RemAccountActionPlans(acntID string, apIDs []string) (err error) PushTask(*Task) error PopTask() (*Task, error) GetAccount(string) (*Account, error) diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 1bfa954d1..a1c355795 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -158,7 +158,17 @@ func (iDB *InternalDB) SetVersions(vrs Versions, overwrite bool) (err error) { return err } } - iDB.db.Set(utils.TBLVersions, utils.Version, vrs, nil, + x, ok := iDB.db.Get(utils.TBLVersions, utils.Version) + if !ok || x == nil { + iDB.db.Set(utils.TBLVersions, utils.Version, vrs, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return + } + provVrs := x.(Versions) + for key, val := range vrs { + provVrs[key] = val + } + iDB.db.Set(utils.TBLVersions, utils.Version, provVrs, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -253,16 +263,24 @@ func (iDB *InternalDB) RemoveRatingPlanDrv(id string) (err error) { return } -func (iDB *InternalDB) GetRatingProfileDrv(id string) (*RatingProfile, error) { +func (iDB *InternalDB) GetRatingProfileDrv(id string) (rp *RatingProfile, err error) { x, ok := iDB.db.Get(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+id) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*RatingProfile), nil + err = iDB.ms.Unmarshal(x.([]byte), &rp) + if err != nil { + return nil, err + } + return } func (iDB *InternalDB) SetRatingProfileDrv(rp *RatingProfile) (err error) { - iDB.db.Set(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+rp.Id, rp, nil, + result, err := iDB.ms.Marshal(rp) + if err != nil { + return err + } + iDB.db.Set(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+rp.Id, result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -350,9 +368,20 @@ func (iDB *InternalDB) RemoveDestination(destID string, transactionID string) (e } func (iDB *InternalDB) SetReverseDestination(dest *Destination, transactionID string) (err error) { + var mpRevDst utils.StringMap for _, p := range dest.Prefixes { + if iDB.db.HasItem(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+p) { + x, ok := iDB.db.Get(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+p) + if !ok || x == nil { + return utils.ErrNotFound + } + mpRevDst = x.(utils.StringMap) + } else { + mpRevDst = make(utils.StringMap) + } + mpRevDst[dest.Id] = true // for ReverseDestination we will use Groups - iDB.db.Set(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+p, dest.Id, nil, + iDB.db.Set(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+p, mpRevDst, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -368,9 +397,11 @@ func (iDB *InternalDB) GetReverseDestination(prefix string, return nil, utils.ErrNotFound } } - - ids = iDB.db.GetItemIDs(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+prefix) - fmt.Println(ids) + x, ok := iDB.db.Get(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+prefix) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + ids = x.(utils.StringMap).Slice() if len(ids) == 0 { Cache.Set(utils.CacheReverseDestinations, prefix, nil, nil, cacheCommit(transactionID), transactionID) @@ -384,6 +415,7 @@ func (iDB *InternalDB) GetReverseDestination(prefix string, func (iDB *InternalDB) UpdateReverseDestination(oldDest, newDest *Destination, transactionID string) error { var obsoletePrefixes []string + var mpRevDst utils.StringMap var addedPrefixes []string var found bool for _, oldPrefix := range oldDest.Prefixes { @@ -414,32 +446,59 @@ func (iDB *InternalDB) UpdateReverseDestination(oldDest, newDest *Destination, cCommit := cacheCommit(transactionID) var err error for _, obsoletePrefix := range obsoletePrefixes { - iDB.db.Remove(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, - cacheCommit(utils.NonTransactional), utils.NonTransactional) + if iDB.db.HasItem(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix) { + x, ok := iDB.db.Get(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix) + if !ok || x == nil { + return utils.ErrNotFound + } + mpRevDst = x.(utils.StringMap) + if _, has := mpRevDst[oldDest.Id]; has { + delete(mpRevDst, oldDest.Id) + } + // for ReverseDestination we will use Groups + iDB.db.Set(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, mpRevDst, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } Cache.Remove(utils.CacheReverseDestinations, obsoletePrefix, cCommit, transactionID) } // add the id to all new prefixes for _, addedPrefix := range addedPrefixes { - iDB.db.Set(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+addedPrefix, nil, []string{newDest.Id}, + if iDB.db.HasItem(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+addedPrefix) { + x, ok := iDB.db.Get(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+addedPrefix) + if !ok || x == nil { + return utils.ErrNotFound + } + mpRevDst = x.(utils.StringMap) + } else { + mpRevDst = make(utils.StringMap) + } + mpRevDst[newDest.Id] = true + // for ReverseDestination we will use Groups + iDB.db.Set(utils.CacheReverseDestinations, utils.REVERSE_DESTINATION_PREFIX+addedPrefix, mpRevDst, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) - Cache.Remove(utils.CacheReverseDestinations, addedPrefix, - cCommit, transactionID) } return err } -func (iDB *InternalDB) GetActionsDrv(id string) (Actions, error) { +func (iDB *InternalDB) GetActionsDrv(id string) (acts Actions, err error) { x, ok := iDB.db.Get(utils.CacheActions, utils.ACTION_PREFIX+id) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(Actions), nil + if err = iDB.ms.Unmarshal(x.([]byte), &acts); err != nil { + return nil, err + } + return } func (iDB *InternalDB) SetActionsDrv(id string, acts Actions) (err error) { - iDB.db.Set(utils.CacheActions, utils.ACTION_PREFIX+id, acts, nil, + result, err := iDB.ms.Marshal(acts) + if err != nil { + return err + } + iDB.db.Set(utils.CacheActions, utils.ACTION_PREFIX+id, result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -450,16 +509,23 @@ func (iDB *InternalDB) RemoveActionsDrv(id string) (err error) { return } -func (iDB *InternalDB) GetSharedGroupDrv(id string) (*SharedGroup, error) { +func (iDB *InternalDB) GetSharedGroupDrv(id string) (sh *SharedGroup, err error) { x, ok := iDB.db.Get(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+id) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*SharedGroup), nil + if err = iDB.ms.Unmarshal(x.([]byte), &sh); err != nil { + return nil, err + } + return } func (iDB *InternalDB) SetSharedGroupDrv(sh *SharedGroup) (err error) { - iDB.db.Set(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+sh.Id, sh, nil, + result, err := iDB.ms.Marshal(sh) + if err != nil { + return err + } + iDB.db.Set(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+sh.Id, result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -470,16 +536,23 @@ func (iDB *InternalDB) RemoveSharedGroupDrv(id string) (err error) { return } -func (iDB *InternalDB) GetActionTriggersDrv(id string) (ActionTriggers, error) { +func (iDB *InternalDB) GetActionTriggersDrv(id string) (at ActionTriggers, err error) { x, ok := iDB.db.Get(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(ActionTriggers), nil + if err = iDB.ms.Unmarshal(x.([]byte), &at); err != nil { + return nil, err + } + return } func (iDB *InternalDB) SetActionTriggersDrv(id string, at ActionTriggers) (err error) { - iDB.db.Set(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id, at, nil, + result, err := iDB.ms.Marshal(at) + if err != nil { + return err + } + iDB.db.Set(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id, result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -507,7 +580,7 @@ func (iDB *InternalDB) GetActionPlan(key string, skipCache bool, transactionID s return nil, utils.ErrNotFound } err = iDB.ms.Unmarshal(x.([]byte), &ats) - Cache.Set(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, ats, nil, + Cache.Set(utils.CacheActionPlans, key, ats, nil, cCommit, transactionID) return } @@ -541,7 +614,6 @@ func (iDB *InternalDB) SetActionPlan(key string, ats *ActionPlan, } iDB.db.Set(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) - Cache.Remove(utils.CacheActionPlans, key, cCommit, transactionID) return } @@ -568,18 +640,43 @@ func (iDB *InternalDB) GetAllActionPlans() (ats map[string]*ActionPlan, err erro return } -func (iDB *InternalDB) GetAccountActionPlansDrv(acntID string) (apIDs []string, err error) { +func (iDB *InternalDB) GetAccountActionPlans(acntID string, + skipCache bool, transactionID string) (apIDs []string, err error) { + if !skipCache { + if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.([]string), nil + } + } x, ok := iDB.db.Get(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID) if !ok || x == nil { + Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, + cacheCommit(transactionID), transactionID) return nil, utils.ErrNotFound } if err = iDB.ms.Unmarshal(x.([]byte), &apIDs); err != nil { return nil, err } + Cache.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil, + cacheCommit(transactionID), transactionID) return } -func (iDB *InternalDB) SetAccountActionPlansDrv(acntID string, apIDs []string) (err error) { + +func (iDB *InternalDB) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { + if !overwrite { + if oldaPlIDs, err := iDB.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + return err + } else { + for _, oldAPid := range oldaPlIDs { + if !utils.IsSliceMember(apIDs, oldAPid) { + apIDs = append(apIDs, oldAPid) + } + } + } + } result, err := iDB.ms.Marshal(apIDs) if err != nil { return err @@ -589,13 +686,15 @@ func (iDB *InternalDB) SetAccountActionPlansDrv(acntID string, apIDs []string) ( return } -func (iDB *InternalDB) RemAccountActionPlansDrv(acntID string, apIDs []string) (err error) { + +func (iDB *InternalDB) RemAccountActionPlans(acntID string, apIDs []string) (err error) { + key := utils.AccountActionPlansPrefix + acntID if len(apIDs) == 0 { - iDB.db.Remove(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, + iDB.db.Remove(utils.CacheAccountActionPlans, key, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } - oldaPlIDs, err := iDB.GetAccountActionPlansDrv(acntID) + oldaPlIDs, err := iDB.GetAccountActionPlans(acntID, true, utils.NonTransactional) if err != nil { return err } @@ -606,7 +705,6 @@ func (iDB *InternalDB) RemAccountActionPlansDrv(acntID string, apIDs []string) ( } i++ } - if len(oldaPlIDs) == 0 { iDB.db.Remove(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, cacheCommit(utils.NonTransactional), utils.NonTransactional) @@ -683,16 +781,24 @@ func (iDB *InternalDB) RemoveAccount(id string) (err error) { return } -func (iDB *InternalDB) GetResourceProfileDrv(tenant, id string) (*ResourceProfile, error) { +func (iDB *InternalDB) GetResourceProfileDrv(tenant, id string) (rp *ResourceProfile, err error) { x, ok := iDB.db.Get(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*ResourceProfile), nil + err = iDB.ms.Unmarshal(x.([]byte), &rp) + if err != nil { + return nil, err + } + return } func (iDB *InternalDB) SetResourceProfileDrv(rp *ResourceProfile) (err error) { - iDB.db.Set(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+rp.TenantID(), rp, nil, + result, err := iDB.ms.Marshal(rp) + if err != nil { + return err + } + iDB.db.Set(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+rp.TenantID(), result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -703,16 +809,24 @@ func (iDB *InternalDB) RemoveResourceProfileDrv(tenant, id string) (err error) { return } -func (iDB *InternalDB) GetResourceDrv(tenant, id string) (*Resource, error) { +func (iDB *InternalDB) GetResourceDrv(tenant, id string) (r *Resource, err error) { x, ok := iDB.db.Get(utils.CacheResources, utils.ResourcesPrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*Resource), nil + err = iDB.ms.Unmarshal(x.([]byte), &r) + if err != nil { + return nil, err + } + return } func (iDB *InternalDB) SetResourceDrv(r *Resource) (err error) { - iDB.db.Set(utils.CacheResources, utils.ResourcesPrefix+r.TenantID(), r, nil, + result, err := iDB.ms.Marshal(r) + if err != nil { + return err + } + iDB.db.Set(utils.CacheResources, utils.ResourcesPrefix+r.TenantID(), result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -723,16 +837,24 @@ func (iDB *InternalDB) RemoveResourceDrv(tenant, id string) (err error) { return } -func (iDB *InternalDB) GetTimingDrv(id string) (*utils.TPTiming, error) { +func (iDB *InternalDB) GetTimingDrv(id string) (tmg *utils.TPTiming, err error) { x, ok := iDB.db.Get(utils.CacheTimings, utils.TimingsPrefix+id) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*utils.TPTiming), nil + err = iDB.ms.Unmarshal(x.([]byte), &tmg) + if err != nil { + return nil, err + } + return } func (iDB *InternalDB) SetTimingDrv(timing *utils.TPTiming) (err error) { - iDB.db.Set(utils.CacheTimings, utils.TimingsPrefix+timing.ID, timing, nil, + result, err := iDB.ms.Marshal(timing) + if err != nil { + return err + } + iDB.db.Set(utils.CacheTimings, utils.TimingsPrefix+timing.ID, result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -815,7 +937,6 @@ func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string, x, ok := iDB.db.Get(cacheID, dbKey) if !ok || x == nil { - result, err := iDB.ms.Marshal(toBeAdded) if err != nil { return err @@ -874,16 +995,24 @@ func (iDB *InternalDB) MatchFilterIndexDrv(cacheID, itemIDPrefix, return } -func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (*StatQueueProfile, error) { +func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) { x, ok := iDB.db.Get(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*StatQueueProfile), nil + err = iDB.ms.Unmarshal(x.([]byte), &sq) + if err != nil { + return nil, err + } + return } func (iDB *InternalDB) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { - iDB.db.Set(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+sq.TenantID(), sq, nil, + result, err := iDB.ms.Marshal(sq) + if err != nil { + return err + } + iDB.db.Set(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+sq.TenantID(), result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -899,10 +1028,18 @@ func (iDB *InternalDB) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQ if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*StoredStatQueue), nil + err = iDB.ms.Unmarshal(x.([]byte), &sq) + if err != nil { + return nil, err + } + return } func (iDB *InternalDB) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) { - iDB.db.Set(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil, + result, err := iDB.ms.Marshal(sq) + if err != nil { + return err + } + iDB.db.Set(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(sq.Tenant, sq.ID), result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -932,7 +1069,7 @@ func (iDB *InternalDB) RemThresholdProfileDrv(tenant, id string) (err error) { return } -func (iDB *InternalDB) GetThresholdDrv(tenant, id string) (*Threshold, error) { +func (iDB *InternalDB) GetThresholdDrv(tenant, id string) (th *Threshold, err error) { x, ok := iDB.db.Get(utils.CacheThresholds, utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound @@ -973,12 +1110,11 @@ func (iDB *InternalDB) RemoveFilterDrv(tenant, id string) (err error) { return } -func (iDB *InternalDB) GetSupplierProfileDrv(tenant, id string) (*SupplierProfile, error) { +func (iDB *InternalDB) GetSupplierProfileDrv(tenant, id string) (spp *SupplierProfile, err error) { x, ok := iDB.db.Get(utils.CacheSupplierProfiles, utils.SupplierProfilePrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound } - return x.(*SupplierProfile), nil } @@ -992,7 +1128,7 @@ func (iDB *InternalDB) RemoveSupplierProfileDrv(tenant, id string) (err error) { cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetAttributeProfileDrv(tenant, id string) (*AttributeProfile, error) { +func (iDB *InternalDB) GetAttributeProfileDrv(tenant, id string) (attr *AttributeProfile, err error) { x, ok := iDB.db.Get(utils.CacheAttributeProfiles, utils.AttributeProfilePrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound @@ -1009,7 +1145,7 @@ func (iDB *InternalDB) RemoveAttributeProfileDrv(tenant, id string) (err error) cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetChargerProfileDrv(tenant, id string) (*ChargerProfile, error) { +func (iDB *InternalDB) GetChargerProfileDrv(tenant, id string) (ch *ChargerProfile, err error) { x, ok := iDB.db.Get(utils.CacheChargerProfiles, utils.ChargerProfilePrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound @@ -1026,7 +1162,7 @@ func (iDB *InternalDB) RemoveChargerProfileDrv(tenant, id string) (err error) { cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetDispatcherProfileDrv(tenant, id string) (*DispatcherProfile, error) { +func (iDB *InternalDB) GetDispatcherProfileDrv(tenant, id string) (dpp *DispatcherProfile, err error) { x, ok := iDB.db.Get(utils.CacheDispatcherProfiles, utils.DispatcherProfilePrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound @@ -1048,7 +1184,10 @@ func (iDB *InternalDB) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[strin if !ok || x == nil { return nil, utils.ErrNotFound } - loadIDs = x.(map[string]int64) + err = iDB.ms.Unmarshal(x.([]byte), &loadIDs) + if err != nil { + return nil, err + } if itemIDPrefix != utils.EmptyString { return map[string]int64{itemIDPrefix: loadIDs[itemIDPrefix]}, nil } @@ -1056,11 +1195,15 @@ func (iDB *InternalDB) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[strin } func (iDB *InternalDB) SetLoadIDsDrv(loadIDs map[string]int64) (err error) { - iDB.db.Set(utils.CacheLoadIDs, utils.LoadIDs, loadIDs, nil, + result, err := iDB.ms.Marshal(loadIDs) + if err != nil { + return err + } + iDB.db.Set(utils.CacheLoadIDs, utils.LoadIDs, result, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetDispatcherHostDrv(tenant, id string) (*DispatcherHost, error) { +func (iDB *InternalDB) GetDispatcherHostDrv(tenant, id string) (dpp *DispatcherHost, err error) { x, ok := iDB.db.Get(utils.CacheDispatcherHosts, utils.DispatcherHostPrefix+utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound diff --git a/engine/storage_internal_stordb.go b/engine/storage_internal_stordb.go index 88c63c3dc..174c14b9a 100644 --- a/engine/storage_internal_stordb.go +++ b/engine/storage_internal_stordb.go @@ -17,3 +17,1010 @@ along with this program. If not, see */ package engine + +import ( + "strings" + + "github.com/cgrates/cgrates/utils" +) + +//implement LoadReader interface +func (iDB *InternalDB) GetTpIds(colName string) (ids []string, err error) { + return nil, utils.ErrNotImplemented +} + +func (iDB *InternalDB) GetTpTableIds(tpid, table string, distinct utils.TPDistinctIds, + filters map[string]string, paginator *utils.PaginatorWithSearch) (ids []string, err error) { + key := table + utils.CONCATENATED_KEY_SEP + tpid + fullIDs := iDB.db.GetItemIDs(table, key) + switch table { + // in case of account action we have the id the following form : loadid:tenant:account + // so we need to treat it as a special case + case utils.TBLTPAccountActions: + for _, fullID := range fullIDs { + var buildedID string + sliceID := strings.Split(fullID[len(key)+1:], utils.CONCATENATED_KEY_SEP) + for _, key := range distinct { + switch key { + case "loadid": + if len(buildedID) == 0 { + buildedID += sliceID[0] + } else { + buildedID += utils.CONCATENATED_KEY_SEP + sliceID[0] + } + case "tenant": + buildedID += utils.CONCATENATED_KEY_SEP + sliceID[1] + case "account": + buildedID += utils.CONCATENATED_KEY_SEP + sliceID[2] + + } + } + ids = append(ids, buildedID) + } + // in case of rating profile we have the id in the following form : loadid:tenant:category:subject + // so we need to treat it as a special case + case utils.TBLTPRateProfiles: + for _, fullID := range fullIDs { + var buildedID string + sliceID := strings.Split(fullID[len(key)+1:], utils.CONCATENATED_KEY_SEP) + for _, key := range distinct { + switch key { + case "loadid": + if len(buildedID) == 0 { + buildedID += sliceID[0] + } else { + buildedID += utils.CONCATENATED_KEY_SEP + sliceID[0] + } + case "tenant": + buildedID += utils.CONCATENATED_KEY_SEP + sliceID[1] + case "category": + buildedID += utils.CONCATENATED_KEY_SEP + sliceID[2] + case "subject": + buildedID += utils.CONCATENATED_KEY_SEP + sliceID[3] + + } + } + ids = append(ids, buildedID) + } + default: + for _, fullID := range fullIDs { + var buildedID string + sliceID := strings.Split(fullID[len(key)+1:], utils.CONCATENATED_KEY_SEP) + for i := 0; i < len(distinct); i++ { + if len(buildedID) == 0 { + buildedID += sliceID[len(sliceID)-i-1] + } else { + buildedID += utils.CONCATENATED_KEY_SEP + sliceID[len(sliceID)-i+1] + } + } + ids = append(ids, buildedID) + } + } + + return +} + +func (iDB *InternalDB) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTiming, err error) { + key := utils.TBLTPTimings + utils.CONCATENATED_KEY_SEP + tpid + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + + ids := iDB.db.GetItemIDs(utils.TBLTPTimings, key) + for _, id := range ids { + + x, ok := iDB.db.Get(utils.TBLTPTimings, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.ApierTPTiming + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + timings = append(timings, result) + } + if len(timings) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPDestinations(tpid, id string) (dsts []*utils.TPDestination, err error) { + key := utils.TBLTPDestinations + utils.CONCATENATED_KEY_SEP + tpid + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPDestinations, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPDestinations, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPDestination + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + dsts = append(dsts, result) + } + + if len(dsts) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPRates(tpid, id string) (rates []*utils.TPRate, err error) { + key := utils.TBLTPRates + utils.CONCATENATED_KEY_SEP + tpid + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPRates, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPRates, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPRate + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + for _, rs := range result.RateSlots { + rs.SetDurations() + } + rates = append(rates, result) + } + + if len(rates) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPDestinationRates(tpid, id string, + paginator *utils.Paginator) (dRates []*utils.TPDestinationRate, err error) { + key := utils.TBLTPDestinationRates + utils.CONCATENATED_KEY_SEP + tpid + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPDestinationRates, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPDestinationRates, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPDestinationRate + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + dRates = append(dRates, result) + } + + if len(dRates) == 0 { + return nil, utils.ErrNotFound + } + if paginator != nil { + var limit, offset int + if paginator.Limit != nil && *paginator.Limit > 0 { + limit = *paginator.Limit + } + if paginator.Offset != nil && *paginator.Offset > 0 { + offset = *paginator.Offset + } + if limit == 0 && offset == 0 { + return dRates, nil + } + if offset > len(dRates) { + return + } + if offset != 0 { + limit = limit + offset + } + if limit == 0 { + limit = len(dRates[offset:]) + } else if limit > len(dRates) { + limit = len(dRates) + } + dRates = dRates[offset:limit] + } + return +} + +func (iDB *InternalDB) GetTPRatingPlans(tpid, id string, paginator *utils.Paginator) (rPlans []*utils.TPRatingPlan, err error) { + key := utils.TBLTPRatingPlans + utils.CONCATENATED_KEY_SEP + tpid + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPRatingPlans, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPRatingPlans, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPRatingPlan + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + rPlans = append(rPlans, result) + } + + if len(rPlans) == 0 { + return nil, utils.ErrNotFound + } + if paginator != nil { + var limit, offset int + if paginator.Limit != nil && *paginator.Limit > 0 { + limit = *paginator.Limit + } + if paginator.Offset != nil && *paginator.Offset > 0 { + offset = *paginator.Offset + } + if limit == 0 && offset == 0 { + return rPlans, nil + } + if offset > len(rPlans) { + return + } + if offset != 0 { + limit = limit + offset + } + if limit == 0 { + limit = len(rPlans[offset:]) + } else if limit > len(rPlans) { + limit = len(rPlans) + } + rPlans = rPlans[offset:limit] + } + return +} + +func (iDB *InternalDB) GetTPRatingProfiles(filter *utils.TPRatingProfile) (rProfiles []*utils.TPRatingProfile, err error) { + key := utils.TBLTPRateProfiles + utils.CONCATENATED_KEY_SEP + filter.TPid + + if filter.LoadId != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + filter.LoadId + } + if filter.Tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + filter.Tenant + } + if filter.Category != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + filter.Category + } + if filter.Subject != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + filter.Subject + } + ids := iDB.db.GetItemIDs(utils.TBLTPRateProfiles, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPRateProfiles, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPRatingProfile + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + rProfiles = append(rProfiles, result) + } + + if len(rProfiles) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPSharedGroups(tpid, id string) (sGroups []*utils.TPSharedGroups, err error) { + key := utils.TBLTPSharedGroups + utils.CONCATENATED_KEY_SEP + tpid + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPSharedGroups, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPSharedGroups, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPSharedGroups + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + sGroups = append(sGroups, result) + } + + if len(sGroups) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPActions(tpid, id string) (actions []*utils.TPActions, err error) { + key := utils.TBLTPActions + utils.CONCATENATED_KEY_SEP + tpid + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPActions, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPActions, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPActions + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + actions = append(actions, result) + + } + if len(actions) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPActionPlans(tpid, id string) (aPlans []*utils.TPActionPlan, err error) { + key := utils.TBLTPActionPlans + utils.CONCATENATED_KEY_SEP + tpid + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPActionPlans, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPActionPlans, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPActionPlan + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + aPlans = append(aPlans, result) + + } + if len(aPlans) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPActionTriggers(tpid, id string) (aTriggers []*utils.TPActionTriggers, err error) { + key := utils.TBLTPActionTriggers + utils.CONCATENATED_KEY_SEP + tpid + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPActionTriggers, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPActionTriggers, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPActionTriggers + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + aTriggers = append(aTriggers, result) + } + if len(aTriggers) == 0 { + return nil, utils.ErrNotFound + } + return +} +func (iDB *InternalDB) GetTPAccountActions(filter *utils.TPAccountActions) (accounts []*utils.TPAccountActions, err error) { + key := utils.TBLTPAccountActions + utils.CONCATENATED_KEY_SEP + filter.TPid + + if filter.LoadId != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + filter.LoadId + } + if filter.Tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + filter.Tenant + } + if filter.Account != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + filter.Account + } + ids := iDB.db.GetItemIDs(utils.TBLTPAccountActions, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPAccountActions, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPAccountActions + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + accounts = append(accounts, result) + } + + if len(accounts) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*utils.TPResourceProfile, err error) { + key := utils.TBLTPResources + utils.CONCATENATED_KEY_SEP + tpid + if tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + tenant + } + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPResources, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPResources, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPResourceProfile + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + resources = append(resources, result) + + } + if len(resources) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPStats(tpid, tenant, id string) (stats []*utils.TPStatProfile, err error) { + key := utils.TBLTPStats + utils.CONCATENATED_KEY_SEP + tpid + if tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + tenant + } + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPStats, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPStats, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPStatProfile + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + stats = append(stats, result) + + } + if len(stats) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPThresholds(tpid, tenant, id string) (ths []*utils.TPThresholdProfile, err error) { + key := utils.TBLTPThresholds + utils.CONCATENATED_KEY_SEP + tpid + if tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + tenant + } + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPThresholds, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPThresholds, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPThresholdProfile + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + ths = append(ths, result) + + } + if len(ths) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPFilters(tpid, tenant, id string) (fltrs []*utils.TPFilterProfile, err error) { + key := utils.TBLTPFilters + utils.CONCATENATED_KEY_SEP + tpid + if tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + tenant + } + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPFilters, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPFilters, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPFilterProfile + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + fltrs = append(fltrs, result) + + } + if len(fltrs) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPSuppliers(tpid, tenant, id string) (supps []*utils.TPSupplierProfile, err error) { + key := utils.TBLTPSuppliers + utils.CONCATENATED_KEY_SEP + tpid + if tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + tenant + } + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPSuppliers, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPSuppliers, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPSupplierProfile + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + supps = append(supps, result) + + } + if len(supps) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPAttributes(tpid, tenant, id string) (attrs []*utils.TPAttributeProfile, err error) { + key := utils.TBLTPAttributes + utils.CONCATENATED_KEY_SEP + tpid + if tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + tenant + } + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPAttributes, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPAttributes, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPAttributeProfile + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + attrs = append(attrs, result) + + } + if len(attrs) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPChargers(tpid, tenant, id string) (cpps []*utils.TPChargerProfile, err error) { + key := utils.TBLTPChargers + utils.CONCATENATED_KEY_SEP + tpid + if tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + tenant + } + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPChargers, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPChargers, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPChargerProfile + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + cpps = append(cpps, result) + + } + if len(cpps) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPDispatcherProfiles(tpid, tenant, id string) (dpps []*utils.TPDispatcherProfile, err error) { + key := utils.TBLTPDispatchers + utils.CONCATENATED_KEY_SEP + tpid + if tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + tenant + } + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPDispatchers, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPDispatchers, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPDispatcherProfile + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + dpps = append(dpps, result) + + } + if len(dpps) == 0 { + return nil, utils.ErrNotFound + } + return +} + +func (iDB *InternalDB) GetTPDispatcherHosts(tpid, tenant, id string) (dpps []*utils.TPDispatcherHost, err error) { + key := utils.TBLTPDispatcherHosts + utils.CONCATENATED_KEY_SEP + tpid + if tenant != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + tenant + } + if id != utils.EmptyString { + key += utils.CONCATENATED_KEY_SEP + id + } + ids := iDB.db.GetItemIDs(utils.TBLTPDispatcherHosts, key) + for _, id := range ids { + x, ok := iDB.db.Get(utils.TBLTPDispatcherHosts, id) + if !ok || x == nil { + return nil, utils.ErrNotFound + } + var result *utils.TPDispatcherHost + if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { + return nil, err + } + dpps = append(dpps, result) + + } + if len(dpps) == 0 { + return nil, utils.ErrNotFound + } + return +} + +//implement LoadWriter interface +func (iDB *InternalDB) RemTpData(table, tpid string, args map[string]string) (err error) { + if table == utils.EmptyString { + return iDB.Flush(utils.EmptyString) + } + key := table + utils.CONCATENATED_KEY_SEP + tpid + if args != nil { + for _, val := range args { + key += utils.CONCATENATED_KEY_SEP + val + } + } + ids := iDB.db.GetItemIDs(table, key) + for _, id := range ids { + iDB.db.Remove(table, id, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPTimings(timings []*utils.ApierTPTiming) (err error) { + if len(timings) == 0 { + return nil + } + for _, timing := range timings { + result, err := iDB.ms.Marshal(timing) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPTimings, utils.ConcatenatedKey(utils.TBLTPTimings, timing.TPid, timing.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} +func (iDB *InternalDB) SetTPDestinations(dests []*utils.TPDestination) (err error) { + if len(dests) == 0 { + return nil + } + for _, destination := range dests { + result, err := iDB.ms.Marshal(destination) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPDestinations, utils.ConcatenatedKey(utils.TBLTPDestinations, destination.TPid, destination.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPRates(rates []*utils.TPRate) (err error) { + if len(rates) == 0 { + return nil + } + for _, rate := range rates { + result, err := iDB.ms.Marshal(rate) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPRates, utils.ConcatenatedKey(utils.TBLTPRates, rate.TPid, rate.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPDestinationRates(dRates []*utils.TPDestinationRate) (err error) { + if len(dRates) == 0 { + return nil + } + for _, dRate := range dRates { + result, err := iDB.ms.Marshal(dRate) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPDestinationRates, utils.ConcatenatedKey(utils.TBLTPDestinationRates, dRate.TPid, dRate.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPRatingPlans(ratingPlans []*utils.TPRatingPlan) (err error) { + if len(ratingPlans) == 0 { + return nil + } + for _, rPlan := range ratingPlans { + result, err := iDB.ms.Marshal(rPlan) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPRatingPlans, utils.ConcatenatedKey(utils.TBLTPRatingPlans, rPlan.TPid, rPlan.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPRatingProfiles(ratingProfiles []*utils.TPRatingProfile) (err error) { + if len(ratingProfiles) == 0 { + return nil + } + for _, rProfile := range ratingProfiles { + result, err := iDB.ms.Marshal(rProfile) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPRateProfiles, utils.ConcatenatedKey(utils.TBLTPRateProfiles, rProfile.TPid, + rProfile.LoadId, rProfile.Tenant, rProfile.Category, rProfile.Subject), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPSharedGroups(groups []*utils.TPSharedGroups) (err error) { + if len(groups) == 0 { + return nil + } + for _, group := range groups { + result, err := iDB.ms.Marshal(group) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPSharedGroups, utils.ConcatenatedKey(utils.TBLTPSharedGroups, group.TPid, group.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPActions(acts []*utils.TPActions) (err error) { + if len(acts) == 0 { + return nil + } + for _, action := range acts { + result, err := iDB.ms.Marshal(action) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPActions, utils.ConcatenatedKey(utils.TBLTPActions, action.TPid, action.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPActionPlans(aPlans []*utils.TPActionPlan) (err error) { + if len(aPlans) == 0 { + return nil + } + for _, aPlan := range aPlans { + result, err := iDB.ms.Marshal(aPlan) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPActionPlans, utils.ConcatenatedKey(utils.TBLTPActionPlans, aPlan.TPid, aPlan.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPActionTriggers(aTriggers []*utils.TPActionTriggers) (err error) { + if len(aTriggers) == 0 { + return nil + } + for _, aTrigger := range aTriggers { + result, err := iDB.ms.Marshal(aTrigger) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPActionTriggers, utils.ConcatenatedKey(utils.TBLTPActionTriggers, aTrigger.TPid, aTrigger.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPAccountActions(accActions []*utils.TPAccountActions) (err error) { + if len(accActions) == 0 { + return nil + } + for _, accAction := range accActions { + result, err := iDB.ms.Marshal(accAction) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPAccountActions, utils.ConcatenatedKey(utils.TBLTPAccountActions, accAction.TPid, + accAction.LoadId, accAction.Tenant, accAction.Account), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPResources(resources []*utils.TPResourceProfile) (err error) { + if len(resources) == 0 { + return nil + } + for _, resource := range resources { + result, err := iDB.ms.Marshal(resource) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPResources, utils.ConcatenatedKey(utils.TBLTPResources, resource.TPid, resource.Tenant, resource.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} +func (iDB *InternalDB) SetTPStats(stats []*utils.TPStatProfile) (err error) { + if len(stats) == 0 { + return nil + } + for _, stat := range stats { + result, err := iDB.ms.Marshal(stat) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPStats, utils.ConcatenatedKey(utils.TBLTPStats, stat.TPid, stat.Tenant, stat.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} +func (iDB *InternalDB) SetTPThresholds(thresholds []*utils.TPThresholdProfile) (err error) { + if len(thresholds) == 0 { + return nil + } + + for _, threshold := range thresholds { + result, err := iDB.ms.Marshal(threshold) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPThresholds, utils.ConcatenatedKey(utils.TBLTPThresholds, threshold.TPid, threshold.Tenant, threshold.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} +func (iDB *InternalDB) SetTPFilters(filters []*utils.TPFilterProfile) (err error) { + if len(filters) == 0 { + return nil + } + + for _, filter := range filters { + result, err := iDB.ms.Marshal(filter) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPFilters, utils.ConcatenatedKey(utils.TBLTPFilters, filter.TPid, filter.Tenant, filter.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPSuppliers(suppliers []*utils.TPSupplierProfile) (err error) { + if len(suppliers) == 0 { + return nil + } + for _, supplier := range suppliers { + result, err := iDB.ms.Marshal(supplier) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPSuppliers, utils.ConcatenatedKey(utils.TBLTPSuppliers, supplier.TPid, supplier.Tenant, supplier.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +func (iDB *InternalDB) SetTPAttributes(attributes []*utils.TPAttributeProfile) (err error) { + if len(attributes) == 0 { + return nil + } + + for _, attribute := range attributes { + result, err := iDB.ms.Marshal(attribute) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPAttributes, utils.ConcatenatedKey(utils.TBLTPAttributes, attribute.TPid, attribute.Tenant, attribute.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} +func (iDB *InternalDB) SetTPChargers(cpps []*utils.TPChargerProfile) (err error) { + if len(cpps) == 0 { + return nil + } + + for _, cpp := range cpps { + result, err := iDB.ms.Marshal(cpp) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPChargers, utils.ConcatenatedKey(utils.TBLTPChargers, cpp.TPid, cpp.Tenant, cpp.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} +func (iDB *InternalDB) SetTPDispatcherProfiles(dpps []*utils.TPDispatcherProfile) (err error) { + if len(dpps) == 0 { + return nil + } + + for _, dpp := range dpps { + result, err := iDB.ms.Marshal(dpp) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPDispatchers, utils.ConcatenatedKey(utils.TBLTPDispatchers, dpp.TPid, dpp.Tenant, dpp.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} +func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err error) { + if len(dpps) == 0 { + return nil + } + for _, dpp := range dpps { + result, err := iDB.ms.Marshal(dpp) + if err != nil { + return err + } + iDB.db.Set(utils.TBLTPDispatcherHosts, utils.ConcatenatedKey(utils.TBLTPDispatcherHosts, dpp.TPid, dpp.Tenant, dpp.ID), result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return +} + +//implement CdrStorage interface +func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) { + return utils.ErrNotImplemented +} +func (iDB *InternalDB) RemoveSMCost(smc *SMCost) (err error) { + return utils.ErrNotImplemented +} +func (iDB *InternalDB) RemoveSMCosts(qryFltr *utils.SMCostFilter) error { + return utils.ErrNotImplemented +} +func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*CDR, count int64, err error) { + return nil, 0, utils.ErrNotImplemented +} + +func (iDB *InternalDB) GetSMCosts(cgrid, runid, originHost, originIDPrfx string) (smCosts []*SMCost, err error) { + return nil, utils.ErrNotImplemented +} + +func (iDB *InternalDB) SetSMCost(smCost *SMCost) error { + if smCost.CostDetails == nil { + return nil + } + result, err := iDB.ms.Marshal(smCost) + if err != nil { + return err + } + iDB.db.Set(utils.SessionCostsTBL, utils.LOG_CALL_COST_PREFIX+smCost.CostSource+smCost.RunID+"_"+smCost.CGRID, result, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return err +} diff --git a/engine/storage_map_datadb.go b/engine/storage_map_datadb.go index e5df5c187..485357a04 100644 --- a/engine/storage_map_datadb.go +++ b/engine/storage_map_datadb.go @@ -662,20 +662,45 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error return } -func (ms *MapStorage) GetAccountActionPlansDrv(acntID string) (apIDs []string, err error) { +func (ms *MapStorage) GetAccountActionPlans(acntID string, + skipCache bool, transactionID string) (apIDs []string, err error) { ms.mu.RLock() defer ms.mu.RUnlock() + if !skipCache { + if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.([]string), nil + } + } values, ok := ms.dict[utils.AccountActionPlansPrefix+acntID] if !ok { - return nil, utils.ErrNotFound + Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, + cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + return nil, err } if err = ms.ms.Unmarshal(values, &apIDs); err != nil { return nil, err } + Cache.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil, + cacheCommit(transactionID), transactionID) return } -func (ms *MapStorage) SetAccountActionPlansDrv(acntID string, apIDs []string) (err error) { +func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { + if !overwrite { + if oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + return err + } else { + for _, oldAPid := range oldaPlIDs { + if !utils.IsSliceMember(apIDs, oldAPid) { + apIDs = append(apIDs, oldAPid) + } + } + } + } ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(apIDs) @@ -683,16 +708,17 @@ func (ms *MapStorage) SetAccountActionPlansDrv(acntID string, apIDs []string) (e return err } ms.dict[utils.AccountActionPlansPrefix+acntID] = result + return } -func (ms *MapStorage) RemAccountActionPlansDrv(acntID string, apIDs []string) (err error) { +func (ms *MapStorage) RemAccountActionPlans(acntID string, apIDs []string) (err error) { key := utils.AccountActionPlansPrefix + acntID if len(apIDs) == 0 { delete(ms.dict, key) return } - oldaPlIDs, err := ms.GetAccountActionPlansDrv(acntID) + oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional) if err != nil { return err } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 766ac6f26..17d8d1396 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -461,7 +461,7 @@ func (ms *MongoStorage) RebuildReverseForPrefix(prefix string) (err error) { return err } for acntID := range apl.AccountIDs { - if err = ms.SetAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { + if err = ms.SetAccountActionPlans(acntID, []string{apl.Id}, false); err != nil { return err } } @@ -514,7 +514,7 @@ func (ms *MongoStorage) RemoveReverseForPrefix(prefix string) (err error) { return err } for acntID := range apl.AccountIDs { - if err = ms.RemAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { + if err = ms.RemAccountActionPlans(acntID, []string{apl.Id}); err != nil { return err } } @@ -1456,7 +1456,15 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } -func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string, err error) { +func (ms *MongoStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (aPlIDs []string, err error) { + if !skipCache { + if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.([]string), nil + } + } var kv struct { Key string Value []string @@ -1465,7 +1473,9 @@ func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string cur := ms.getCol(ColAAp).FindOne(sctx, bson.M{"key": acntID}) if err := cur.Decode(&kv); err != nil { if err == mongo.ErrNoDocuments { - err = utils.ErrNotFound + Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, + cacheCommit(transactionID), transactionID) + return utils.ErrNotFound } return err } @@ -1474,10 +1484,23 @@ func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string return nil, err } aPlIDs = kv.Value + Cache.Set(utils.CacheAccountActionPlans, acntID, aPlIDs, nil, + cacheCommit(transactionID), transactionID) return } -func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { +func (ms *MongoStorage) SetAccountActionPlans(acntID string, aPlIDs []string, overwrite bool) (err error) { + if !overwrite { + if oldaPlIDs, err := ms.GetAccountActionPlans(acntID, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + return err + } else { + for _, oldAPid := range oldaPlIDs { + if !utils.IsSliceMember(aPlIDs, oldAPid) { + aPlIDs = append(aPlIDs, oldAPid) + } + } + } + } return ms.query(func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColAAp).UpdateOne(sctx, bson.M{"key": acntID}, bson.M{"$set": struct { @@ -1491,7 +1514,7 @@ func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) } // ToDo: check return len(aPlIDs) == 0 -func (ms *MongoStorage) RemAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { +func (ms *MongoStorage) RemAccountActionPlans(acntID string, aPlIDs []string) (err error) { if len(aPlIDs) == 0 { return ms.query(func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(ColAAp).DeleteOne(sctx, bson.M{"key": acntID}) @@ -1501,7 +1524,7 @@ func (ms *MongoStorage) RemAccountActionPlansDrv(acntID string, aPlIDs []string) return err }) } - oldAPlIDs, err := ms.GetAccountActionPlansDrv(acntID) + oldAPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional) if err != nil { return err } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index a0bc9b487..ef548797d 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -296,7 +296,7 @@ func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) (err error) { return err } for acntID := range apl.AccountIDs { - if err = rs.SetAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { + if err = rs.SetAccountActionPlans(acntID, []string{apl.Id}, false); err != nil { return err } } @@ -343,7 +343,7 @@ func (rs *RedisStorage) RemoveReverseForPrefix(prefix string) (err error) { return err } for acntID := range apl.AccountIDs { - if err = rs.RemAccountActionPlansDrv(acntID, []string{apl.Id}); err != nil { + if err = rs.RemAccountActionPlans(acntID, []string{apl.Id}); err != nil { return err } } @@ -990,11 +990,22 @@ func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } -func (rs *RedisStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string, err error) { +func (rs *RedisStorage) GetAccountActionPlans(acntID string, skipCache bool, + transactionID string) (aPlIDs []string, err error) { + if !skipCache { + if x, ok := Cache.Get(utils.CacheAccountActionPlans, acntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.([]string), nil + } + } var values []byte if values, err = rs.Cmd(redis_GET, utils.AccountActionPlansPrefix+acntID).Bytes(); err != nil { if err == redis.ErrRespNil { // did not find the destination + Cache.Set(utils.CacheAccountActionPlans, acntID, nil, nil, + cacheCommit(transactionID), transactionID) err = utils.ErrNotFound } return @@ -1002,11 +1013,23 @@ func (rs *RedisStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string if err = rs.ms.Unmarshal(values, &aPlIDs); err != nil { return } - + Cache.Set(utils.CacheAccountActionPlans, acntID, aPlIDs, nil, + cacheCommit(transactionID), transactionID) return } -func (rs *RedisStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { +func (rs *RedisStorage) SetAccountActionPlans(acntID string, aPlIDs []string, overwrite bool) (err error) { + if !overwrite { + if oldaPlIDs, err := rs.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + return err + } else { + for _, oldAPid := range oldaPlIDs { + if !utils.IsSliceMember(aPlIDs, oldAPid) { + aPlIDs = append(aPlIDs, oldAPid) + } + } + } + } var result []byte if result, err = rs.ms.Marshal(aPlIDs); err != nil { return err @@ -1014,12 +1037,12 @@ func (rs *RedisStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) return rs.Cmd(redis_SET, utils.AccountActionPlansPrefix+acntID, result).Err } -func (rs *RedisStorage) RemAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { +func (rs *RedisStorage) RemAccountActionPlans(acntID string, aPlIDs []string) (err error) { key := utils.AccountActionPlansPrefix + acntID if len(aPlIDs) == 0 { return rs.Cmd(redis_DEL, key).Err } - oldaPlIDs, err := rs.GetAccountActionPlansDrv(acntID) + oldaPlIDs, err := rs.GetAccountActionPlans(acntID, true, utils.NonTransactional) if err != nil { return err } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index d0bdc13a0..39452298e 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -870,13 +870,6 @@ func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri return smCosts, nil } -func (self *SQLStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { - return -} -func (self *SQLStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { - return -} - func (self *SQLStorage) SetCDR(cdr *CDR, allowUpdate bool) error { tx := self.db.Begin() cdrSql := cdr.AsCDRsql() diff --git a/engine/storage_utils.go b/engine/storage_utils.go index b95c1c60b..d7b4acc2c 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -80,7 +80,7 @@ func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler strin case utils.MYSQL: d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime) case utils.INTERNAL: - d, err = NewMapStorage() + d = NewInternalDB() default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL)) @@ -102,7 +102,7 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin case utils.MONGO: d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, false) case utils.INTERNAL: - d, err = NewMapStorage() + d = NewInternalDB() default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL)) @@ -124,7 +124,7 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string, case utils.MONGO: d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, false) case utils.INTERNAL: - d, err = NewMapStorage() + d = NewInternalDB() default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL)) @@ -146,7 +146,7 @@ func ConfigureStorDB(db_type, host, port, name, user, pass string, case utils.MONGO: d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, false) case utils.INTERNAL: - d, err = NewMapStorage() + d = NewInternalDB() default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL)) diff --git a/engine/stordb_it_test.go b/engine/stordb_it_test.go index f49d2158f..bb83eb1c7 100644 --- a/engine/stordb_it_test.go +++ b/engine/stordb_it_test.go @@ -46,18 +46,18 @@ var sTestsStorDBit = []func(t *testing.T){ testStorDBitCRUDTpDestinations, testStorDBitCRUDTpRates, testStorDBitCRUDTpDestinationRates, - // testStorDBitCRUDTpRatingPlans, - // testStorDBitCRUDTpRatingProfiles, + testStorDBitCRUDTpRatingPlans, + testStorDBitCRUDTpRatingProfiles, testStorDBitCRUDTpSharedGroups, testStorDBitCRUDTpActions, testStorDBitCRUDTpActionPlans, testStorDBitCRUDTpActionTriggers, - // testStorDBitCRUDTpAccountActions, + testStorDBitCRUDTpAccountActions, testStorDBitCRUDTpResources, - //testStorDBitCRUDTpStats, - // testStorDBitCRUDCDRs, - // testStorDBitCRUDSMCosts, - // testStorDBitCRUDSMCosts2, + testStorDBitCRUDTpStats, + testStorDBitCRUDCDRs, + testStorDBitCRUDSMCosts, + testStorDBitCRUDSMCosts2, } func TestStorDBitMySQL(t *testing.T) { @@ -123,14 +123,12 @@ func TestStorDBitMongo(t *testing.T) { } } -func TestStorDBitMapStorage(t *testing.T) { +func TestStorDBitInternalDB(t *testing.T) { if cfg, err = config.NewDefaultCGRConfig(); err != nil { t.Error(err) } config.SetCgrConfig(cfg) - if storDB, err = NewMapStorage(); err != nil { - t.Error(err) - } + storDB = NewInternalDB() for _, stest := range sTestsStorDBit { stestFullName := runtime.FuncForPC(reflect.ValueOf(stest).Pointer()).Name() split := strings.Split(stestFullName, ".") diff --git a/engine/tpreader.go b/engine/tpreader.go index 52ebf463a..e0cec6f8a 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -824,7 +824,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) if err = tpr.dm.DataDB().SetActionPlan(accountAction.ActionPlanId, actionPlan, false, utils.NonTransactional); err != nil { return errors.New(err.Error() + " (SetActionPlan): " + accountAction.ActionPlanId) } - if err = tpr.dm.SetAccountActionPlans(id, []string{accountAction.ActionPlanId}, false); err != nil { + if err = tpr.dm.DataDB().SetAccountActionPlans(id, []string{accountAction.ActionPlanId}, false); err != nil { return err } if err = tpr.dm.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{id}, true); err != nil {