Updated the locking for filter indexes

This commit is contained in:
Trial97
2020-08-31 17:07:10 +03:00
committed by Dan Christian Bogos
parent f85a12f0ef
commit bd0626a908

View File

@@ -32,6 +32,7 @@ var (
)
// newFilterIndex will get the index from DataManager if is not found it will create it
// is used to update the mentioned index
func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (indexes map[string]utils.StringSet, err error) {
tntCtx := tnt
if ctx != utils.EmptyString {
@@ -43,7 +44,7 @@ func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filter
var rcvIndx map[string]utils.StringSet
if rcvIndx, err = dm.GetIndexes(idxItmType, tntCtx,
idxKey,
true, true); err != nil {
true, false); err != nil {
if err != utils.ErrNotFound {
return
}
@@ -56,6 +57,8 @@ func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filter
}
return
}
// in case of more filters we parse each filter rule and only for supported index types
// we try to get them from Cache/DataDB or if not found in this location we create them here
for _, fltrID := range filterIDs {
var fltr *Filter
if fltr, err = dm.GetFilter(tnt, fltrID,
@@ -86,6 +89,7 @@ func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filter
continue
}
var rcvIndx map[string]utils.StringSet
// only read from cache in case if we do not find the index to not cache the negative response
if rcvIndx, err = dm.GetIndexes(idxItmType, tntCtx,
idxKey, true, false); err != nil {
if err != utils.ErrNotFound {
@@ -106,36 +110,47 @@ func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filter
// addItemToFilterIndex will add the itemID to the existing/created index and set it in the DataDB
func addItemToFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (err error) {
var indexes map[string]utils.StringSet
if indexes, err = newFilterIndex(dm, idxItmType, tnt, ctx, itemID, filterIDs); err != nil {
return
}
if len(indexes) == 0 { // in case we have a profile with only non indexable filters(e.g. only *gt)
return
}
tntCtx := tnt
if ctx != utils.EmptyString {
tntCtx = utils.ConcatenatedKey(tnt, ctx)
}
refID := guardian.Guardian.GuardIDs("",
// early lock to be sure that until we do not write back the indexes
// another goroutine can't create new indexes
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
defer guardian.Guardian.UnguardIDs(refID)
for _, index := range indexes {
index.Add(itemID)
var indexes map[string]utils.StringSet
if indexes, err = newFilterIndex(dm, idxItmType, tnt, ctx, itemID, filterIDs); err != nil {
return
}
// in case we have a profile with only non indexable filters(e.g. only *gt)
if len(indexes) == 0 {
return
}
for indxKey := range indexes {
for indxKey, index := range indexes {
index.Add(itemID)
// remove from cache in order to corectly update the index
if err = Cache.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil {
return
}
}
return dm.SetIndexes(idxItmType, tntCtx, indexes, true, utils.NonTransactional)
}
// removeItemFromFilterIndex will remove the itemID from the existing/created index and set it in the DataDB
func removeItemFromFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (err error) {
tntCtx := tnt
if ctx != utils.EmptyString {
tntCtx = utils.ConcatenatedKey(tnt, ctx)
}
// early lock to be sure that until we do not write back the indexes
// another goroutine can't create new indexes
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
defer guardian.Guardian.UnguardIDs(refID)
var indexes map[string]utils.StringSet
if indexes, err = newFilterIndex(dm, idxItmType, tnt, ctx, itemID, filterIDs); err != nil {
return
@@ -143,24 +158,13 @@ func removeItemFromFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID str
if len(indexes) == 0 { // in case we have a profile with only non indexable filters(e.g. only *gt)
return
}
tntCtx := tnt
if ctx != utils.EmptyString {
tntCtx = utils.ConcatenatedKey(tnt, ctx)
}
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
defer guardian.Guardian.UnguardIDs(refID)
for idxKey, index := range indexes {
index.Remove(itemID)
if index.Size() == 0 { // empty index set it with nil for cache
indexes[idxKey] = nil // this will not be set in DB(handled by driver)
}
}
for indxKey := range indexes {
if err = Cache.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil {
// remove from cache in order to corectly update the index
if err = Cache.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, idxKey), true, utils.NonTransactional); err != nil {
return
}
}
@@ -331,10 +335,8 @@ func updatedIndexesWithContexts(dm *DataManager, idxItmType, tnt, itemID string,
}
}
}
} else {
if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, newFilterIDs); err != nil {
return
}
} else if err = addIndexFiltersItem(dm, idxItmType, tnt, itemID, newFilterIDs); err != nil {
return
}
// add indexes for new contexts
@@ -379,6 +381,11 @@ func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string,
if ctx != utils.EmptyString {
tntCtx = utils.ConcatenatedKey(tnt, ctx)
}
// early lock to be sure that until we do not write back the indexes
// another goroutine can't create new indexes
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
defer guardian.Guardian.UnguardIDs(refID)
for _, id := range profilesIDs {
var filterIDs *[]string
if filterIDs, err = getFilters(tnt, id, ctx); err != nil {
@@ -392,6 +399,7 @@ func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string,
tnt, ctx, id, *filterIDs); err != nil {
return
}
// ensure that the item is in the index set
for _, idx := range index {
idx.Add(id)
}
@@ -434,7 +442,7 @@ func addIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filter
return
}
// addIndexFiltersItem will removes a reference for the items in the reverse filter index
// removeIndexFiltersItem will removes a reference for the items in the reverse filter index
func removeIndexFiltersItem(dm *DataManager, idxItmType, tnt, itemID string, filterIDs []string) (err error) {
for _, ID := range filterIDs {
if strings.HasPrefix(ID, utils.Meta) { // skip inline
@@ -536,7 +544,7 @@ func UpdateFilterIndex(dm *DataManager, oldFlt, newFlt *Filter) (err error) {
var rcvIndx map[string]utils.StringSet
if rcvIndx, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, newFlt.TenantID(),
utils.EmptyString, true, true); err != nil {
utils.EmptyString, true, false); err != nil {
if err != utils.ErrNotFound {
return
}
@@ -722,13 +730,6 @@ func removeFilterIndexesForFilter(dm *DataManager, idxItmType, tnt string,
for idx := range itemIDs {
remIndx[idxKey].Remove(idx)
}
// for indxKey := range remIndx {
// if err = Cache.Remove(idxItmType, utils.ConcatenatedKey(tnt, indxKey),
// true, utils.NonTransactional); err != nil {
// return
// }
// }
if err = dm.SetIndexes(idxItmType, tnt, remIndx, true, utils.NonTransactional); err != nil {
return
}