Updated the locks for reverse filter indexes

This commit is contained in:
Trial97
2020-09-01 09:59:37 +03:00
committed by Dan Christian Bogos
parent bd0626a908
commit e95e4bfa7b
2 changed files with 40 additions and 10 deletions

View File

@@ -990,11 +990,7 @@ func testAccITAccountMonthlyEstimated(t *testing.T) {
t.Errorf("Expected: %v,\n received: %v", 1, len(aps))
} else {
// verify the GetNextTimeStart
endOfMonth, err := utils.ParseTimeDetectLayout(utils.MetaMonthlyEstimated, "")
if err != nil {
t.Fatal(err)
}
endOfMonth := utils.GetEndOfMonth(time.Now())
if execDay := aps[0].ActionTimings[0].GetNextStartTime(time.Now()).Day(); execDay != endOfMonth.Day() {
t.Errorf("Expected: %v,\n received: %v", endOfMonth.Day(), execDay)
}

View File

@@ -418,10 +418,13 @@ func addIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filter
continue
}
tntCtx := utils.ConcatenatedKey(tnt, ID)
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx)
var indexes map[string]utils.StringSet
if indexes, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntCtx,
idxItmType, true, false); err != nil {
if err != utils.ErrNotFound {
guardian.Guardian.UnguardIDs(refID)
return
}
err = nil
@@ -432,12 +435,15 @@ func addIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filter
indexes[idxItmType].Add(itemID)
for indxKey := range indexes {
if err = Cache.Remove(utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
}
if err = dm.SetIndexes(utils.CacheReverseFilterIndexes, tntCtx, indexes, true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
guardian.Guardian.UnguardIDs(refID)
}
return
}
@@ -449,9 +455,12 @@ func removeIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, fil
continue
}
tntCtx := utils.ConcatenatedKey(tnt, ID)
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx)
var indexes map[string]utils.StringSet
if indexes, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntCtx,
idxItmType, true, false); err != nil {
guardian.Guardian.UnguardIDs(refID)
if err != utils.ErrNotFound {
return
}
@@ -462,12 +471,15 @@ func removeIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, fil
for indxKey := range indexes {
if err = Cache.Remove(utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
}
if err = dm.SetIndexes(utils.CacheReverseFilterIndexes, tntCtx, indexes, true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
guardian.Guardian.UnguardIDs(refID)
}
return
}
@@ -542,8 +554,13 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) {
}
}
tntID := newFlt.TenantID()
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntID)
defer guardian.Guardian.UnguardIDs(refID)
var rcvIndx map[string]utils.StringSet
if rcvIndx, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, newFlt.TenantID(),
// get all reverse indexes from DB
if rcvIndx, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntID,
utils.EmptyString, true, false); err != nil {
if err != utils.ErrNotFound {
return
@@ -553,6 +570,7 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) {
}
removeIndexKeys := removeRules.AsSlice()
// remove the old indexes and compute the new ones
for idxItmType, indx := range rcvIndx {
switch idxItmType {
case utils.CacheThresholdFilterIndexes:
@@ -663,23 +681,29 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) {
return
}
for _, ctx := range ap.Contexts {
tntCtx := utils.ConcatenatedKey(newFlt.Tenant, ctx)
if err = removeFilterIndexesForFilter(dm, idxItmType,
utils.ConcatenatedKey(newFlt.Tenant, ctx), // remove the indexes for the filter
tntCtx, // remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
var updIdx map[string]utils.StringSet
if updIdx, err = newFilterIndex(dm, idxItmType,
newFlt.Tenant, ctx, itemID, ap.FilterIDs); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
for _, idx := range updIdx {
idx.Add(itemID)
}
if err = dm.SetIndexes(idxItmType, utils.ConcatenatedKey(newFlt.Tenant, ctx),
if err = dm.SetIndexes(idxItmType, tntCtx,
updIdx, false, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
guardian.Guardian.UnguardIDs(refID)
}
}
case utils.CacheDispatcherFilterIndexes:
@@ -690,23 +714,29 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) {
return
}
for _, ctx := range dp.Subsystems {
tntCtx := utils.ConcatenatedKey(newFlt.Tenant, ctx)
if err = removeFilterIndexesForFilter(dm, idxItmType,
utils.ConcatenatedKey(newFlt.Tenant, ctx), // remove the indexes for the filter
tntCtx, // remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
var updIdx map[string]utils.StringSet
if updIdx, err = newFilterIndex(dm, idxItmType,
newFlt.Tenant, ctx, itemID, dp.FilterIDs); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
for _, idx := range updIdx {
idx.Add(itemID)
}
if err = dm.SetIndexes(idxItmType, utils.ConcatenatedKey(newFlt.Tenant, ctx),
if err = dm.SetIndexes(idxItmType, tntCtx,
updIdx, false, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
guardian.Guardian.UnguardIDs(refID)
}
}
}
@@ -715,8 +745,12 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) {
}
// removeFilterIndexesForFilter removes the itemID for the index keys
// used to remove the old indexes when a filter is updated
func removeFilterIndexesForFilter(dm *DataManager, idxItmType, tnt string,
removeIndexKeys []string, itemIDs utils.StringSet) (err error) {
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tnt)
defer guardian.Guardian.UnguardIDs(refID)
for _, idxKey := range removeIndexKeys { // delete old filters indexes for this item
var remIndx map[string]utils.StringSet
if remIndx, err = dm.GetIndexes(idxItmType, tnt,