Added reverse index filter for index health check

This commit is contained in:
Trial97
2021-07-01 11:24:44 +03:00
committed by Dan Christian Bogos
parent 795fca2c8b
commit 2f61746959

View File

@@ -271,9 +271,10 @@ func GetReverseDestinationsIndexHealth(dm *DataManager, objLimit, indexLimit int
}
type FilterIHReply struct {
MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB
MissingIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes)
MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs)
MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB
MissingIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes)
MissingReverseIndexes map[string][]string // list of missing reverse indexes for each object (the map has the key as the objectID and a list of filters)
MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs)
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -487,7 +488,7 @@ func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.
}
// getFltrIdxHealth returns the missing indexes for all objects
func getFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *FilterIHReply, err error) {
func getFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, revFltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *FilterIHReply, err error) {
// check the objects ( obj->filter->index relation)
objPrfx := utils.CacheIndexesToPrefix[indxType]
var ids []string
@@ -506,12 +507,48 @@ func getFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, tntID.Tenant, id, rply); err != nil {
return
}
for _, fltrID := range obj.filterIDs {
if strings.HasPrefix(fltrID, utils.Meta) {
continue
}
var revIdx utils.StringSet
if revIdx, err = getIHFltrIdxFromCache(dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), indxType); err != nil {
if err == utils.ErrNotFound {
rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], fltrID)
continue
}
return
}
if !revIdx.Has(tntID.ID) {
rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], fltrID)
}
}
} else {
for _, ctx := range *obj.contexts {
if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, utils.ConcatenatedKey(tntID.Tenant, ctx), id, rply); err != nil {
return
}
}
for _, fltrID := range obj.filterIDs {
if strings.HasPrefix(fltrID, utils.Meta) {
continue
}
var revIdx utils.StringSet
if revIdx, err = getIHFltrIdxFromCache(dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), indxType); err != nil {
if err == utils.ErrNotFound {
for _, ctx := range *obj.contexts {
rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], utils.ConcatenatedKey(fltrID, ctx))
}
continue
}
return
}
for _, ctx := range *obj.contexts {
if !revIdx.Has(utils.ConcatenatedKey(id, ctx)) {
rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], utils.ConcatenatedKey(fltrID, ctx))
}
}
}
}
}
@@ -578,6 +615,7 @@ func getFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
if err != utils.ErrNotFound {
return
}
fltrID = utils.ConcatenatedKey(tnt, fltrID)
rply.MissingFilters[fltrID] = append(rply.MissingFilters[fltrID], itmID)
continue
}
@@ -592,5 +630,44 @@ func getFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
}
}
}
revIdxPrfx := utils.CacheInstanceToPrefix[indxType]
var revIndexKeys []string
if revIndexKeys, err = dm.dataDB.GetKeysForPrefix(revIdxPrfx); err != nil {
return
}
for _, revIdxKey := range revIndexKeys {
revIdxKey = strings.TrimPrefix(revIdxKey, revIdxPrfx)
fltrID := utils.NewTenantID(revIdxKey)
var revIdx utils.StringSet
if revIdx, err = getIHFltrIdxFromCache(dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, revIdxKey, indxType); err != nil {
return
}
for itemIDCtx := range revIdx {
var id, ctx string
if !strings.Contains(itemIDCtx, utils.ConcatenatedKeySep) {
id = itemIDCtx
} else {
spl := strings.SplitN(itemIDCtx, utils.ConcatenatedKeySep, 2)
id = spl[0]
ctx = spl[1]
}
var obj *objFIH
if obj, err = getIHObjFromCache(dm, objCache, indxType, fltrID.ID, id); err != nil {
if err == utils.ErrNotFound {
rply.MissingObjects = append(rply.MissingObjects, utils.ConcatenatedKey(fltrID.Tenant, id))
continue
}
return
}
if !utils.IsSliceMember(obj.filterIDs, fltrID.ID) {
rply.MissingFilters[revIdxKey] = append(rply.MissingFilters[revIdxKey], id)
} else if obj.contexts != nil && !utils.IsSliceMember(*obj.contexts, ctx) {
rply.MissingFilters[revIdxKey] = append(rply.MissingFilters[revIdxKey], itemIDCtx)
}
}
}
return
}