From bd0626a9089e4b058e8df2eba9bcd35b1c0c1e28 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 31 Aug 2020 17:07:10 +0300 Subject: [PATCH] Updated the locking for filter indexes --- engine/libindex.go | 79 +++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/engine/libindex.go b/engine/libindex.go index 8ee9a441e..ba5ba62e9 100644 --- a/engine/libindex.go +++ b/engine/libindex.go @@ -32,6 +32,7 @@ var ( ) // newFilterIndex will get the index from DataManager if is not found it will create it +// is used to update the mentioned index func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (indexes map[string]utils.StringSet, err error) { tntCtx := tnt if ctx != utils.EmptyString { @@ -43,7 +44,7 @@ func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filter var rcvIndx map[string]utils.StringSet if rcvIndx, err = dm.GetIndexes(idxItmType, tntCtx, idxKey, - true, true); err != nil { + true, false); err != nil { if err != utils.ErrNotFound { return } @@ -56,6 +57,8 @@ func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filter } return } + // in case of more filters we parse each filter rule and only for supported index types + // we try to get them from Cache/DataDB or if not found in this location we create them here for _, fltrID := range filterIDs { var fltr *Filter if fltr, err = dm.GetFilter(tnt, fltrID, @@ -86,6 +89,7 @@ func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filter continue } var rcvIndx map[string]utils.StringSet + // only read from cache in case if we do not find the index to not cache the negative response if rcvIndx, err = dm.GetIndexes(idxItmType, tntCtx, idxKey, true, false); err != nil { if err != utils.ErrNotFound { @@ -106,36 +110,47 @@ func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filter // addItemToFilterIndex will add the itemID to the existing/created index and set it in the DataDB func addItemToFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (err error) { - var indexes map[string]utils.StringSet - if indexes, err = newFilterIndex(dm, idxItmType, tnt, ctx, itemID, filterIDs); err != nil { - return - } - if len(indexes) == 0 { // in case we have a profile with only non indexable filters(e.g. only *gt) - return - } tntCtx := tnt if ctx != utils.EmptyString { tntCtx = utils.ConcatenatedKey(tnt, ctx) } - refID := guardian.Guardian.GuardIDs("", + // early lock to be sure that until we do not write back the indexes + // another goroutine can't create new indexes + refID := guardian.Guardian.GuardIDs(utils.EmptyString, config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) defer guardian.Guardian.UnguardIDs(refID) - for _, index := range indexes { - index.Add(itemID) + var indexes map[string]utils.StringSet + if indexes, err = newFilterIndex(dm, idxItmType, tnt, ctx, itemID, filterIDs); err != nil { + return + } + // in case we have a profile with only non indexable filters(e.g. only *gt) + if len(indexes) == 0 { + return } - for indxKey := range indexes { + for indxKey, index := range indexes { + index.Add(itemID) + // remove from cache in order to corectly update the index if err = Cache.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil { return } } - return dm.SetIndexes(idxItmType, tntCtx, indexes, true, utils.NonTransactional) } // removeItemFromFilterIndex will remove the itemID from the existing/created index and set it in the DataDB func removeItemFromFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (err error) { + tntCtx := tnt + if ctx != utils.EmptyString { + tntCtx = utils.ConcatenatedKey(tnt, ctx) + } + // early lock to be sure that until we do not write back the indexes + // another goroutine can't create new indexes + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) + defer guardian.Guardian.UnguardIDs(refID) + var indexes map[string]utils.StringSet if indexes, err = newFilterIndex(dm, idxItmType, tnt, ctx, itemID, filterIDs); err != nil { return @@ -143,24 +158,13 @@ func removeItemFromFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID str if len(indexes) == 0 { // in case we have a profile with only non indexable filters(e.g. only *gt) return } - - tntCtx := tnt - if ctx != utils.EmptyString { - tntCtx = utils.ConcatenatedKey(tnt, ctx) - } - refID := guardian.Guardian.GuardIDs("", - config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) - defer guardian.Guardian.UnguardIDs(refID) - for idxKey, index := range indexes { index.Remove(itemID) if index.Size() == 0 { // empty index set it with nil for cache indexes[idxKey] = nil // this will not be set in DB(handled by driver) } - } - - for indxKey := range indexes { - if err = Cache.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil { + // remove from cache in order to corectly update the index + if err = Cache.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, idxKey), true, utils.NonTransactional); err != nil { return } } @@ -331,10 +335,8 @@ func updatedIndexesWithContexts(dm *DataManager, idxItmType, tnt, itemID string, } } } - } else { - if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, newFilterIDs); err != nil { - return - } + } else if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, newFilterIDs); err != nil { + return } // add indexes for new contexts @@ -379,6 +381,11 @@ func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string, if ctx != utils.EmptyString { tntCtx = utils.ConcatenatedKey(tnt, ctx) } + // early lock to be sure that until we do not write back the indexes + // another goroutine can't create new indexes + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) + defer guardian.Guardian.UnguardIDs(refID) for _, id := range profilesIDs { var filterIDs *[]string if filterIDs, err = getFilters(tnt, id, ctx); err != nil { @@ -392,6 +399,7 @@ func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string, tnt, ctx, id, *filterIDs); err != nil { return } + // ensure that the item is in the index set for _, idx := range index { idx.Add(id) } @@ -434,7 +442,7 @@ func addIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filter return } -// addIndexFiltersItem will removes a reference for the items in the reverse filter index +// removeIndexFiltersItem will removes a reference for the items in the reverse filter index func removeIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filterIDs []string) (err error) { for _, ID := range filterIDs { if strings.HasPrefix(ID, utils.Meta) { // skip inline @@ -536,7 +544,7 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) { var rcvIndx map[string]utils.StringSet if rcvIndx, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, newFlt.TenantID(), - utils.EmptyString, true, true); err != nil { + utils.EmptyString, true, false); err != nil { if err != utils.ErrNotFound { return } @@ -722,13 +730,6 @@ func removeFilterIndexesForFilter(dm *DataManager, idxItmType, tnt string, for idx := range itemIDs { remIndx[idxKey].Remove(idx) } - - // for indxKey := range remIndx { - // if err = Cache.Remove(idxItmType, utils.ConcatenatedKey(tnt, indxKey), - // true, utils.NonTransactional); err != nil { - // return - // } - // } if err = dm.SetIndexes(idxItmType, tnt, remIndx, true, utils.NonTransactional); err != nil { return }