From e95e4bfa7bdbe1d46199912bdbc335e42968f136 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 1 Sep 2020 09:59:37 +0300 Subject: [PATCH] Updated the locks for reverse filter indexes --- apier/v1/accounts_it_test.go | 6 +---- engine/libindex.go | 44 ++++++++++++++++++++++++++++++++---- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/apier/v1/accounts_it_test.go b/apier/v1/accounts_it_test.go index 7ad4bb168..a3273cf20 100644 --- a/apier/v1/accounts_it_test.go +++ b/apier/v1/accounts_it_test.go @@ -990,11 +990,7 @@ func testAccITAccountMonthlyEstimated(t *testing.T) { t.Errorf("Expected: %v,\n received: %v", 1, len(aps)) } else { // verify the GetNextTimeStart - - endOfMonth, err := utils.ParseTimeDetectLayout(utils.MetaMonthlyEstimated, "") - if err != nil { - t.Fatal(err) - } + endOfMonth := utils.GetEndOfMonth(time.Now()) if execDay := aps[0].ActionTimings[0].GetNextStartTime(time.Now()).Day(); execDay != endOfMonth.Day() { t.Errorf("Expected: %v,\n received: %v", endOfMonth.Day(), execDay) } diff --git a/engine/libindex.go b/engine/libindex.go index ba5ba62e9..095f671fa 100644 --- a/engine/libindex.go +++ b/engine/libindex.go @@ -418,10 +418,13 @@ func addIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filter continue } tntCtx := utils.ConcatenatedKey(tnt, ID) + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx) var indexes map[string]utils.StringSet if indexes, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntCtx, idxItmType, true, false); err != nil { if err != utils.ErrNotFound { + guardian.Guardian.UnguardIDs(refID) return } err = nil @@ -432,12 +435,15 @@ func addIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filter indexes[idxItmType].Add(itemID) for indxKey := range indexes { if err = Cache.Remove(utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil { + guardian.Guardian.UnguardIDs(refID) return } } if err = dm.SetIndexes(utils.CacheReverseFilterIndexes, tntCtx, indexes, true, utils.NonTransactional); err != nil { + guardian.Guardian.UnguardIDs(refID) return } + guardian.Guardian.UnguardIDs(refID) } return } @@ -449,9 +455,12 @@ func removeIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, fil continue } tntCtx := utils.ConcatenatedKey(tnt, ID) + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx) var indexes map[string]utils.StringSet if indexes, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntCtx, idxItmType, true, false); err != nil { + guardian.Guardian.UnguardIDs(refID) if err != utils.ErrNotFound { return } @@ -462,12 +471,15 @@ func removeIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, fil for indxKey := range indexes { if err = Cache.Remove(utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil { + guardian.Guardian.UnguardIDs(refID) return } } if err = dm.SetIndexes(utils.CacheReverseFilterIndexes, tntCtx, indexes, true, utils.NonTransactional); err != nil { + guardian.Guardian.UnguardIDs(refID) return } + guardian.Guardian.UnguardIDs(refID) } return } @@ -542,8 +554,13 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) { } } + tntID := newFlt.TenantID() + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntID) + defer guardian.Guardian.UnguardIDs(refID) var rcvIndx map[string]utils.StringSet - if rcvIndx, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, newFlt.TenantID(), + // get all reverse indexes from DB + if rcvIndx, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntID, utils.EmptyString, true, false); err != nil { if err != utils.ErrNotFound { return @@ -553,6 +570,7 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) { } removeIndexKeys := removeRules.AsSlice() + // remove the old indexes and compute the new ones for idxItmType, indx := range rcvIndx { switch idxItmType { case utils.CacheThresholdFilterIndexes: @@ -663,23 +681,29 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) { return } for _, ctx := range ap.Contexts { + tntCtx := utils.ConcatenatedKey(newFlt.Tenant, ctx) if err = removeFilterIndexesForFilter(dm, idxItmType, - utils.ConcatenatedKey(newFlt.Tenant, ctx), // remove the indexes for the filter + tntCtx, // remove the indexes for the filter removeIndexKeys, indx); err != nil { return } + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) var updIdx map[string]utils.StringSet if updIdx, err = newFilterIndex(dm, idxItmType, newFlt.Tenant, ctx, itemID, ap.FilterIDs); err != nil { + guardian.Guardian.UnguardIDs(refID) return } for _, idx := range updIdx { idx.Add(itemID) } - if err = dm.SetIndexes(idxItmType, utils.ConcatenatedKey(newFlt.Tenant, ctx), + if err = dm.SetIndexes(idxItmType, tntCtx, updIdx, false, utils.NonTransactional); err != nil { + guardian.Guardian.UnguardIDs(refID) return } + guardian.Guardian.UnguardIDs(refID) } } case utils.CacheDispatcherFilterIndexes: @@ -690,23 +714,29 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) { return } for _, ctx := range dp.Subsystems { + tntCtx := utils.ConcatenatedKey(newFlt.Tenant, ctx) if err = removeFilterIndexesForFilter(dm, idxItmType, - utils.ConcatenatedKey(newFlt.Tenant, ctx), // remove the indexes for the filter + tntCtx, // remove the indexes for the filter removeIndexKeys, indx); err != nil { return } + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) var updIdx map[string]utils.StringSet if updIdx, err = newFilterIndex(dm, idxItmType, newFlt.Tenant, ctx, itemID, dp.FilterIDs); err != nil { + guardian.Guardian.UnguardIDs(refID) return } for _, idx := range updIdx { idx.Add(itemID) } - if err = dm.SetIndexes(idxItmType, utils.ConcatenatedKey(newFlt.Tenant, ctx), + if err = dm.SetIndexes(idxItmType, tntCtx, updIdx, false, utils.NonTransactional); err != nil { + guardian.Guardian.UnguardIDs(refID) return } + guardian.Guardian.UnguardIDs(refID) } } } @@ -715,8 +745,12 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) { } // removeFilterIndexesForFilter removes the itemID for the index keys +// used to remove the old indexes when a filter is updated func removeFilterIndexesForFilter(dm *DataManager, idxItmType, tnt string, removeIndexKeys []string, itemIDs utils.StringSet) (err error) { + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tnt) + defer guardian.Guardian.UnguardIDs(refID) for _, idxKey := range removeIndexKeys { // delete old filters indexes for this item var remIndx map[string]utils.StringSet if remIndx, err = dm.GetIndexes(idxItmType, tnt,