From c242636e3ee83808ae6c68359710a40f3c28e53b Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 21 May 2021 10:03:27 +0300 Subject: [PATCH] Updated Threshold handling in datamanager --- apier/v1/libapier.go | 18 +++++++++++- apier/v1/libapier_test.go | 4 ++- apier/v1/thresholds.go | 19 +------------ engine/datadbmock.go | 28 ++++++++++++++---- engine/datamanager.go | 48 ++++++++++++++++--------------- engine/loader_csv_test.go | 9 ------ engine/thresholds.go | 2 +- engine/thresholds_test.go | 12 ++++---- engine/tpreader.go | 58 +++++--------------------------------- engine/tpreader_test.go | 31 +------------------- engine/z_onstor_it_test.go | 4 +-- loaders/lib_test.go | 25 ---------------- loaders/loader.go | 6 ---- loaders/loader_test.go | 55 ++++++++++++++++-------------------- migrator/thresholds.go | 50 +++++++++++++++----------------- 15 files changed, 134 insertions(+), 235 deletions(-) diff --git a/apier/v1/libapier.go b/apier/v1/libapier.go index f15e880fe..00fba905f 100644 --- a/apier/v1/libapier.go +++ b/apier/v1/libapier.go @@ -50,12 +50,20 @@ func (apierSv1 *APIerSv1) CallCache(cacheopt string, tnt, cacheID, itemID string return } case utils.MetaClear: - cacheIDs := make([]string, 1, 2) + cacheIDs := make([]string, 1, 3) cacheIDs[0] = cacheID // do not send a EmptyString if the item doesn't have indexes if cIdx, has := utils.CacheInstanceToCacheIndex[cacheID]; has { cacheIDs = append(cacheIDs, cIdx) } + switch cacheID { // add the items to the cache reload + case utils.CacheThresholdProfiles: + cacheIDs = append(cacheIDs, utils.CacheThresholds) + case utils.CacheResourceProfiles: + cacheIDs = append(cacheIDs, utils.CacheResources) + case utils.CacheStatQueueProfiles: + cacheIDs = append(cacheIDs, utils.CacheStatQueues) + } method = utils.CacheSv1Clear args = &utils.AttrCacheIDsWithAPIOpts{ Tenant: tnt, @@ -78,6 +86,14 @@ func (apierSv1 *APIerSv1) composeArgsReload(tnt, cacheID, itemID string, filterI }, APIOpts: opts, } + switch cacheID { // add the items to the cache reload + case utils.CacheThresholdProfiles: + rpl.ArgsCache[utils.ThresholdIDs] = []string{itemID} + case utils.CacheResourceProfiles: + rpl.ArgsCache[utils.ResourceIDs] = []string{itemID} + case utils.CacheStatQueueProfiles: + rpl.ArgsCache[utils.StatsQueueIDs] = []string{itemID} + } if filterIDs == nil { // in case we remove a profile we do not need to reload the indexes return } diff --git a/apier/v1/libapier_test.go b/apier/v1/libapier_test.go index 736290f5f..9f5334405 100644 --- a/apier/v1/libapier_test.go +++ b/apier/v1/libapier_test.go @@ -71,6 +71,7 @@ func TestComposeArgsReload(t *testing.T) { Tenant: "cgrates.org", ArgsCache: map[string][]string{ utils.StatsQueueProfileIDs: {"cgrates.org:Stat2"}, + utils.StatsQueueIDs: {"cgrates.org:Stat2"}, utils.StatFilterIndexIDs: { "cgrates.org:*string:*req.Account:1001", "cgrates.org:*prefix:*req.Destination:1001", @@ -134,7 +135,7 @@ func TestCallCache(t *testing.T) { Method: utils.CacheSv1Clear, Params: &utils.AttrCacheIDsWithAPIOpts{ Tenant: "cgrates.org", - CacheIDs: []string{utils.CacheStatQueueProfiles, utils.CacheStatFilterIndexes}, + CacheIDs: []string{utils.CacheStatQueueProfiles, utils.CacheStatFilterIndexes, utils.CacheStatQueues}, APIOpts: make(map[string]interface{}), }, } @@ -153,6 +154,7 @@ func TestCallCache(t *testing.T) { Tenant: "cgrates.org", ArgsCache: map[string][]string{ utils.StatsQueueProfileIDs: {"cgrates.org:Stat2"}, + utils.StatsQueueIDs: {"cgrates.org:Stat2"}, utils.StatFilterIndexIDs: { "cgrates.org:*string:*req.Account:1001", "cgrates.org:*prefix:*req.Destination:1001", diff --git a/apier/v1/thresholds.go b/apier/v1/thresholds.go index 189e76df0..44222fc04 100644 --- a/apier/v1/thresholds.go +++ b/apier/v1/thresholds.go @@ -140,20 +140,11 @@ func (apierSv1 *APIerSv1) SetThresholdProfile(args *engine.ThresholdProfileWithA if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheThresholdProfiles: loadID, utils.CacheThresholds: loadID}); err != nil { return utils.APIErrorHandler(err) } - //handle caching for ThresholdProfile + //handle caching for ThresholdProfile and Threshold if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheThresholdProfiles, args.TenantID(), &args.FilterIDs, nil, args.APIOpts); err != nil { return utils.APIErrorHandler(err) } - if err := apierSv1.DataManager.SetThreshold(&engine.Threshold{Tenant: args.Tenant, ID: args.ID}, args.MinSleep, false); err != nil { - return err - } - //handle caching for Threshold - if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheThresholds, - args.TenantID(), nil, nil, args.APIOpts); err != nil { - return utils.APIErrorHandler(err) - } - *reply = utils.OK return nil } @@ -175,20 +166,12 @@ func (apierSv1 *APIerSv1) RemoveThresholdProfile(args *utils.TenantIDWithAPIOpts utils.ConcatenatedKey(tnt, args.ID), nil, nil, args.APIOpts); err != nil { return utils.APIErrorHandler(err) } - if err := apierSv1.DataManager.RemoveThreshold(tnt, args.ID, utils.NonTransactional); err != nil { - return utils.APIErrorHandler(err) - } //generate a loadID for CacheThresholdProfiles and CacheThresholds and store it in database //make 1 insert for both ThresholdProfile and Threshold instead of 2 loadID := time.Now().UnixNano() if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheThresholdProfiles: loadID, utils.CacheThresholds: loadID}); err != nil { return utils.APIErrorHandler(err) } - //handle caching for Threshold - if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), tnt, utils.CacheThresholds, - utils.ConcatenatedKey(tnt, args.ID), nil, nil, args.APIOpts); err != nil { - return utils.APIErrorHandler(err) - } *reply = utils.OK return nil } diff --git a/engine/datadbmock.go b/engine/datadbmock.go index f9d1aff85..ba0aa05e8 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -23,10 +23,14 @@ import ( ) type DataDBMock struct { - GetKeysForPrefixF func(string) ([]string, error) - GetChargerProfileDrvF func(string, string) (*ChargerProfile, error) - GetFilterDrvF func(string, string) (*Filter, error) - GetIndexesDrvF func(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) + GetKeysForPrefixF func(string) ([]string, error) + GetChargerProfileDrvF func(string, string) (*ChargerProfile, error) + GetFilterDrvF func(string, string) (*Filter, error) + GetIndexesDrvF func(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) + GetThresholdProfileDrvF func(tenant, id string) (tp *ThresholdProfile, err error) + SetThresholdProfileDrvF func(tp *ThresholdProfile) (err error) + RemThresholdProfileDrvF func(tenant, id string) (err error) + GetThresholdDrvF func(tenant, id string) (*Threshold, error) } //Storage methods @@ -285,19 +289,31 @@ func (dbM *DataDBMock) RemStatQueueDrv(tenant, id string) (err error) { return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetThresholdProfileDrv(tenant string, ID string) (tp *ThresholdProfile, err error) { +func (dbM *DataDBMock) GetThresholdProfileDrv(tenant, id string) (tp *ThresholdProfile, err error) { + if dbM.GetThresholdProfileDrvF != nil { + return dbM.GetThresholdProfileDrvF(tenant, id) + } return nil, utils.ErrNotImplemented } func (dbM *DataDBMock) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { + if dbM.SetThresholdProfileDrvF != nil { + return dbM.SetThresholdProfileDrvF(tp) + } return utils.ErrNotImplemented } func (dbM *DataDBMock) RemThresholdProfileDrv(tenant, id string) (err error) { + if dbM.RemThresholdProfileDrvF != nil { + return dbM.RemThresholdProfileDrvF(tenant, id) + } return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetThresholdDrv(string, string) (*Threshold, error) { +func (dbM *DataDBMock) GetThresholdDrv(tenant, id string) (*Threshold, error) { + if dbM.GetThresholdDrvF != nil { + return dbM.GetThresholdDrvF(tenant, id) + } return nil, utils.ErrNotImplemented } diff --git a/engine/datamanager.go b/engine/datamanager.go index b05cf49a3..f6ea25784 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -950,28 +950,10 @@ func (dm *DataManager) GetThreshold(tenant, id string, return } -func (dm *DataManager) SetThreshold(th *Threshold, snooze time.Duration, simpleSet bool) (err error) { +func (dm *DataManager) SetThreshold(th *Threshold) (err error) { if dm == nil { return utils.ErrNoDatabaseConn } - if !simpleSet { - tnt := th.Tenant // save the tenant - id := th.ID // save the ID from the initial Threshold - th, err = dm.GetThreshold(tnt, id, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return - } - if err == utils.ErrNotFound { - th = &Threshold{Tenant: tnt, ID: id, Hits: 0} - } else { - if th.tPrfl == nil { - if th.tPrfl, err = dm.GetThresholdProfile(th.Tenant, th.ID, true, false, utils.NonTransactional); err != nil { - return - } - } - th.Snooze = th.Snooze.Add(-th.tPrfl.MinSleep).Add(snooze) - } - } if err = dm.DataDB().SetThresholdDrv(th); err != nil { return } @@ -1087,14 +1069,36 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) } } if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, utils.ThresholdProfilePrefix, th.TenantID(), // this are used to get the host IDs from cache utils.ReplicatorSv1SetThresholdProfile, &ThresholdProfileWithAPIOpts{ ThresholdProfile: th, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil { + return + } + } + + if oldTh == nil || // create the threshold if it didn't exit before + oldTh.MaxHits != th.MaxHits || + oldTh.MinHits != th.MinHits || + oldTh.MinSleep != th.MinSleep { // reset the threshold if the profile changed this fields + err = dm.SetThreshold(&Threshold{ + Tenant: th.Tenant, + ID: th.ID, + Hits: 0, + tPrfl: th, + }) + } else if _, errTh := dm.GetThreshold(th.Tenant, th.ID, // do not try to get the threshold if the configuration changed + true, false, utils.NonTransactional); errTh == utils.ErrNotFound { // the threshold does not exist + err = dm.SetThreshold(&Threshold{ + Tenant: th.Tenant, + ID: th.ID, + Hits: 0, + tPrfl: th, + }) } return } @@ -1133,7 +1137,7 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) } - return + return dm.RemoveThreshold(tenant, id, transactionID) // remove the thrshold } func (dm *DataManager) GetStatQueueProfile(tenant, id string, cacheRead, cacheWrite bool, diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index 91f5464f5..c7c9d1aa9 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -1432,12 +1432,3 @@ func TestLoadstatQueues(t *testing.T) { t.Errorf("Failed to load statQueues: %s", utils.ToIJSON(csvr.statQueues)) } } - -func TestLoadThresholds(t *testing.T) { - eThresholds := []*utils.TenantID{ - {Tenant: "cgrates.org", ID: "Threshold1"}, - } - if len(csvr.thresholds) != len(eThresholds) { - t.Errorf("Failed to load thresholds: %s", utils.ToIJSON(csvr.thresholds)) - } -} diff --git a/engine/thresholds.go b/engine/thresholds.go index 8781210b0..0364b61b7 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -209,7 +209,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { if t.dirty == nil || !*t.dirty { return } - if err = tS.dm.SetThreshold(t, 0, true); err != nil { + if err = tS.dm.SetThreshold(t); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed saving Threshold with tenant: %s and ID: %s, error: %s", t.Tenant, t.ID, err.Error())) diff --git a/engine/thresholds_test.go b/engine/thresholds_test.go index fc922a947..d99298471 100644 --- a/engine/thresholds_test.go +++ b/engine/thresholds_test.go @@ -176,7 +176,7 @@ func TestThresholdsCache(t *testing.T) { } } for _, th := range ths { - if err = dmTH.SetThreshold(th, 0, true); err != nil { + if err = dmTH.SetThreshold(th); err != nil { t.Errorf("Error: %+v", err) } } @@ -358,7 +358,7 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) { } } for _, th := range ths { - if err = dmTH.SetThreshold(th, 0, true); err != nil { + if err = dmTH.SetThreshold(th); err != nil { t.Errorf("Error: %+v", err) } } @@ -568,7 +568,7 @@ func TestThresholdsProcessEvent(t *testing.T) { } } for _, th := range ths { - if err = dmTH.SetThreshold(th, 0, true); err != nil { + if err = dmTH.SetThreshold(th); err != nil { t.Errorf("Error: %+v", err) } } @@ -769,7 +769,7 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) { } } for _, th := range ths { - if err = dmTH.SetThreshold(th, 0, true); err != nil { + if err = dmTH.SetThreshold(th); err != nil { t.Errorf("Error: %+v", err) } } @@ -992,7 +992,7 @@ func TestThresholdsProcessEvent2(t *testing.T) { } } for _, th := range ths { - if err = dmTH.SetThreshold(th, 0, true); err != nil { + if err = dmTH.SetThreshold(th); err != nil { t.Errorf("Error: %+v", err) } } @@ -1037,7 +1037,7 @@ func TestThresholdsProcessEvent2(t *testing.T) { } else if !reflect.DeepEqual(thPrf, temptTh) { t.Errorf("Expecting: %+v, received: %+v", th, temptTh) } - if err = dmTH.SetThreshold(th, 0, true); err != nil { + if err = dmTH.SetThreshold(th); err != nil { t.Errorf("Error: %+v", err) } if temptTh, err := dmTH.GetThreshold(th.Tenant, diff --git a/engine/tpreader.go b/engine/tpreader.go index d00efd9f6..1892f8d4e 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -58,11 +58,11 @@ type TpReader struct { dispatcherHosts map[utils.TenantID]*utils.TPDispatcherHost resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles - thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles - acntActionPlans map[string][]string - cacheConns []string - schedulerConns []string - isInternalDB bool // do not reload cache if we use intarnalDB + // thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles + acntActionPlans map[string][]string + cacheConns []string + schedulerConns []string + isInternalDB bool // do not reload cache if we use intarnalDB } func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string, @@ -1154,11 +1154,7 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) (err error) { mapTHs[utils.TenantID{Tenant: th.Tenant, ID: th.ID}] = th } tpr.thProfiles = mapTHs - for tntID := range mapTHs { - tpr.thresholds = append(tpr.thresholds, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID}) - - } - return nil + return } func (tpr *TpReader) LoadThresholds() error { @@ -1679,25 +1675,6 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { } if len(tpr.thProfiles) != 0 { loadIDs[utils.CacheThresholdProfiles] = loadID - } - if verbose { - log.Print("Thresholds:") - } - for _, thd := range tpr.thresholds { - var minSleep time.Duration - if tpr.thProfiles[*thd].MinSleep != utils.EmptyString { - if minSleep, err = utils.ParseDurationWithNanosecs(tpr.thProfiles[*thd].MinSleep); err != nil { - return - } - } - if err = tpr.dm.SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}, minSleep, false); err != nil { - return - } - if verbose { - log.Print("\t", thd.TenantID()) - } - } - if len(tpr.thresholds) != 0 { loadIDs[utils.CacheThresholds] = loadID } if verbose { @@ -1901,12 +1878,6 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { keys[i] = k.TenantID() } return keys, nil - case utils.ThresholdPrefix: - keys := make([]string, len(tpr.thresholds)) - for i, k := range tpr.thresholds { - keys[i] = k.TenantID() - } - return keys, nil case utils.DestinationPrefix: keys := make([]string, len(tpr.destinations)) i := 0 @@ -2208,18 +2179,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("\t", utils.ConcatenatedKey(tpTH.Tenant, tpTH.ID)) } } - if verbose { - log.Print("Thresholds:") - } - for _, thd := range tpr.thresholds { - if err = tpr.dm.RemoveThreshold(thd.Tenant, thd.ID, utils.NonTransactional); err != nil { - return - } - if verbose { - log.Print("\t", thd.TenantID()) - } - } - if verbose { log.Print("RouteProfiles:") } @@ -2367,8 +2326,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error } if len(tpr.thProfiles) != 0 { loadIDs[utils.CacheThresholdProfiles] = loadID - } - if len(tpr.thresholds) != 0 { loadIDs[utils.CacheThresholds] = loadID } if len(tpr.routeProfiles) != 0 { @@ -2414,7 +2371,6 @@ func (tpr *TpReader) ReloadCache(caching string, verbose bool, opts map[string]i aatIDs, _ := tpr.GetLoadedIds(utils.ActionTriggerPrefix) stqIDs, _ := tpr.GetLoadedIds(utils.StatQueuePrefix) stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix) - trsIDs, _ := tpr.GetLoadedIds(utils.ThresholdPrefix) trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix) flrIDs, _ := tpr.GetLoadedIds(utils.FilterPrefix) routeIDs, _ := tpr.GetLoadedIds(utils.RouteProfilePrefix) @@ -2440,7 +2396,7 @@ func (tpr *TpReader) ReloadCache(caching string, verbose bool, opts map[string]i utils.ActionTriggerIDs: aatIDs, utils.StatsQueueIDs: stqIDs, utils.StatsQueueProfileIDs: stqpIDs, - utils.ThresholdIDs: trsIDs, + utils.ThresholdIDs: trspfIDs, utils.ThresholdProfileIDs: trspfIDs, utils.FilterIDs: flrIDs, utils.RouteProfileIDs: routeIDs, diff --git a/engine/tpreader_test.go b/engine/tpreader_test.go index a5f2201a6..fffc85163 100644 --- a/engine/tpreader_test.go +++ b/engine/tpreader_test.go @@ -452,29 +452,6 @@ func TestGetLoadedIdsStatQueues(t *testing.T) { } } -func TestGetLoadedIdsThresholds(t *testing.T) { - tpr := &TpReader{ - thresholds: []*utils.TenantID{ - { - Tenant: "cgrates.org", - ID: "thresholdsID", - }, - { - Tenant: "tenant.com", - ID: "mytenantID", - }, - }, - } - rcv, err := tpr.GetLoadedIds(utils.ThresholdPrefix) - if err != nil { - t.Error(err) - } - expRcv := []string{"cgrates.org:thresholdsID", "tenant.com:mytenantID"} - if !reflect.DeepEqual(expRcv, rcv) { - t.Errorf("\nExpected %v but received \n%v", expRcv, rcv) - } -} - func TestGetLoadedIdsDestinations(t *testing.T) { tpr := &TpReader{ destinations: map[string]*Destination{ @@ -1031,7 +1008,7 @@ func TestReloadCache(t *testing.T) { "DispatcherHostIDs": {"cgrates.org:dispatcherHostsID"}, "ResourceIDs": {"cgrates.org:resourcesID"}, "StatsQueueIDs": {"cgrates.org:statQueuesID"}, - "ThresholdIDs": {"cgrates.org:thresholdsID"}, + "ThresholdIDs": {"cgrates.org:thresholdProfilesID"}, "AccountActionPlanIDs": {"AccountActionPlansID"}, }, } @@ -1117,12 +1094,6 @@ func TestReloadCache(t *testing.T) { ID: "statQueuesID", }, }, - thresholds: []*utils.TenantID{ - { - Tenant: "cgrates.org", - ID: "thresholdsID", - }, - }, acntActionPlans: map[string][]string{ "AccountActionPlansID": {}, }, diff --git a/engine/z_onstor_it_test.go b/engine/z_onstor_it_test.go index 91348ecea..ac4de15d9 100644 --- a/engine/z_onstor_it_test.go +++ b/engine/z_onstor_it_test.go @@ -1653,7 +1653,7 @@ func testOnStorITThreshold(t *testing.T) { true, false, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.SetThreshold(th, 0, true); err != nil { + if err := onStor.SetThreshold(th); err != nil { t.Error(err) } //get from database @@ -1671,7 +1671,7 @@ func testOnStorITThreshold(t *testing.T) { } //update th.Hits = 20 - if err := onStor.SetThreshold(th, 0, true); err != nil { + if err := onStor.SetThreshold(th); err != nil { t.Error(err) } diff --git a/loaders/lib_test.go b/loaders/lib_test.go index c131ade74..32db78f10 100644 --- a/loaders/lib_test.go +++ b/loaders/lib_test.go @@ -745,36 +745,11 @@ type dataDBMockError struct { *engine.DataDBMock } -//For Threshold -func (dbM *dataDBMockError) RemThresholdProfileDrv(tenant, id string) (err error) { - return -} - func (dbM *dataDBMockError) SetIndexesDrv(idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) { return } -func (dbM *dataDBMockError) RemoveThresholdDrv(string, string) error { - return utils.ErrNoDatabaseConn -} - -func (dbM *dataDBMockError) GetThresholdProfileDrv(tenant string, ID string) (tp *engine.ThresholdProfile, err error) { - expThresholdPrf := &engine.ThresholdProfile{ - Tenant: "cgrates.org", - ID: "REM_THRESHOLDS_1", - } - return expThresholdPrf, nil -} - -func (dbM *dataDBMockError) SetThresholdProfileDrv(tp *engine.ThresholdProfile) (err error) { - return -} - -func (dbM *dataDBMockError) GetThresholdDrv(string, string) (*engine.Threshold, error) { - return nil, utils.ErrNoDatabaseConn -} - func (dbM *dataDBMockError) HasDataDrv(string, string, string) (bool, error) { return false, nil } diff --git a/loaders/loader.go b/loaders/loader.go index 508f918a0..8e3e9eab8 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -448,9 +448,6 @@ func (ldr *Loader) storeLoadedData(loaderType string, if err := ldr.dm.SetThresholdProfile(thPrf, true); err != nil { return err } - if err := ldr.dm.SetThreshold(&engine.Threshold{Tenant: thPrf.Tenant, ID: thPrf.ID}, thPrf.MinSleep, false); err != nil { - return err - } cacheArgs[utils.ThresholdProfileIDs] = ids cacheArgs[utils.ThresholdIDs] = ids } @@ -747,9 +744,6 @@ func (ldr *Loader) removeLoadedData(loaderType string, lds map[string][]LoaderDa tntIDStruct.ID, utils.NonTransactional, true); err != nil { return err } - if err := ldr.dm.RemoveThreshold(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { - return err - } cacheArgs[utils.ThresholdProfileIDs] = ids cacheArgs[utils.ThresholdIDs] = ids } diff --git a/loaders/loader_test.go b/loaders/loader_test.go index d78a13b2e..a9b1f7fd4 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -3000,26 +3000,23 @@ func TestRemoveThresholdsMockError(t *testing.T) { bufLoaderData: make(map[string][]LoaderData), dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil), timezone: "UTC", - } - ldr.dataTpls = map[string][]*config.FCTemplate{ - utils.MetaThresholds: { - {Tag: "TenantID", - Path: "Tenant", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), - Mandatory: true}, - {Tag: "ID", - Path: "ID", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), - Mandatory: true}, + dataTpls: map[string][]*config.FCTemplate{ + utils.MetaThresholds: { + {Tag: "TenantID", + Path: "Tenant", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), + Mandatory: true}, + {Tag: "ID", + Path: "ID", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), + Mandatory: true}, + }, }, } - thresholdsCsv := ` -#Tenant[0],ID[1] -cgrates.org,REM_THRESHOLDS_1, -` - rdr := io.NopCloser(strings.NewReader(thresholdsCsv)) + rdr := io.NopCloser(strings.NewReader(`#Tenant[0],ID[1] + cgrates.org,REM_THRESHOLDS_1,`)) csvRdr := csv.NewReader(rdr) csvRdr.Comment = '#' ldr.rdrs = map[string]map[string]*openedCSVFile{ @@ -3032,24 +3029,22 @@ cgrates.org,REM_THRESHOLDS_1, }, } - expThresholdPrf := &engine.ThresholdProfile{ - Tenant: "cgrates.org", - ID: "REM_THRESHOLDS_1", - } - - if err := ldr.dm.SetThresholdProfile(expThresholdPrf, true); err != nil { - t.Error(err) - } - - newData := &dataDBMockError{} - ldr.dm = engine.NewDataManager(newData, config.CgrConfig().CacheCfg(), nil) expected := utils.ErrNoDatabaseConn + ldr.dm = engine.NewDataManager(&engine.DataDBMock{ + GetThresholdProfileDrvF: func(tenant, id string) (tp *engine.ThresholdProfile, err error) { + return &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "REM_THRESHOLDS_1", + }, nil + }, + SetThresholdProfileDrvF: func(tp *engine.ThresholdProfile) (err error) { return expected }, + RemThresholdProfileDrvF: func(tenant, id string) (err error) { return expected }, + }, config.CgrConfig().CacheCfg(), nil) if err := ldr.processContent(utils.MetaThresholds, utils.EmptyString); err == nil || err != expected { t.Errorf("Expected %+v, received %+v", expected, err) } else if err := ldr.removeContent(utils.MetaThresholds, utils.EmptyString); err == nil || err != expected { t.Errorf("Expected %+v, received %+v", expected, err) } - } func TestRemoveStatQueueMockError(t *testing.T) { diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 882b9031e..2abbe0b34 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -48,29 +48,12 @@ type v2ActionTrigger struct { func (m *Migrator) migrateCurrentThresholds() (err error) { var ids []string //Thresholds - ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ThresholdPrefix) - if err != nil { - return err - } for _, id := range ids { tntID := strings.SplitN(strings.TrimPrefix(id, utils.ThresholdPrefix), utils.InInFieldSep, 2) if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating thresholds", id) } - ths, err := m.dmIN.DataManager().GetThreshold(tntID[0], tntID[1], false, false, utils.NonTransactional) - if err != nil { - return err - } - if ths == nil || m.dryRun { - continue - } - if err := m.dmOut.DataManager().SetThreshold(ths, 0, true); err != nil { - return err - } - if err := m.dmIN.DataManager().RemoveThreshold(tntID[0], tntID[1], utils.NonTransactional); err != nil { - return err - } - m.stats[utils.Thresholds]++ + } //ThresholdProfiles ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix) @@ -82,19 +65,31 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating threshold profiles", id) } - ths, err := m.dmIN.DataManager().GetThresholdProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + thps, err := m.dmIN.DataManager().GetThresholdProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } - if ths == nil || m.dryRun { + ths, err := m.dmIN.DataManager().GetThreshold(tntID[0], tntID[1], false, false, utils.NonTransactional) + if err != nil { + return err + } + if thps == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetThresholdProfile(ths, true); err != nil { + if err := m.dmOut.DataManager().SetThresholdProfile(thps, true); err != nil { + return err + } + // update the threshold in the new DB + if ths == nil { + continue + } + if err := m.dmOut.DataManager().SetThreshold(ths); err != nil { return err } if err := m.dmIN.DataManager().RemoveThresholdProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err } + m.stats[utils.Thresholds]++ } return } @@ -200,14 +195,15 @@ func (m *Migrator) migrateThresholds() (err error) { if err = m.dmOut.DataManager().SetFilter(filter, true); err != nil { return } - if err = m.dmOut.DataManager().SetThreshold(th, 0, true); err != nil { - return - } } if err = m.dmOut.DataManager().SetThresholdProfile(v4, true); err != nil { return } - + if migratedFrom == 1 { // do it after SetThresholdProfile to overwrite the created threshold + if err = m.dmOut.DataManager().SetThreshold(th); err != nil { + return + } + } } m.stats[utils.Thresholds]++ } @@ -324,10 +320,10 @@ func (m *Migrator) SasThreshold(v2ATR *engine.ActionTrigger) (err error) { return err } } - if err := m.dmOut.DataManager().SetThreshold(th, 0, true); err != nil { + if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil { return err } - if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil { + if err := m.dmOut.DataManager().SetThreshold(th); err != nil { return err } m.stats[utils.Thresholds]++