Updated filter index health

This commit is contained in:
Trial97
2021-07-09 16:22:03 +03:00
committed by Dan Christian Bogos
parent 142e12a7ed
commit a1cf8ba92a

View File

@@ -448,7 +448,7 @@ func getFilterAsIndexSet(dm *DataManager, fltrIdxCache *ltcache.Cache, idxItmTyp
// updateFilterIHMisingIndx updates the reply with the missing indexes for a specific object( obj->filter->index relation)
func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.Cache, filterIDs []string, indxType, tnt, tntCtx, itmID string, rply *FilterIHReply) (_ *FilterIHReply, err error) {
if len(filterIDs) == 0 {
if len(filterIDs) == 0 { // no filter so check the *none:*any:*any index
var rcvIndx utils.StringMap
if rcvIndx, err = getIHFltrIdxFromCache(dm, nil, indxType, tntCtx, utils.META_NONE, utils.META_ANY, utils.META_ANY); err != nil {
if err != utils.ErrNotFound {
@@ -463,7 +463,7 @@ func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.
return rply, nil
}
for _, fltrID := range filterIDs {
for _, fltrID := range filterIDs { // parse all the filters
var fltr *Filter
if fltr, err = getIHFltrFromCache(dm, fltrCache, tnt, fltrID); err != nil {
if err != utils.ErrNotFound {
@@ -474,10 +474,10 @@ func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.
continue
}
var indexes map[string]utils.StringMap
if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil {
if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil { // build the index from filter
return
}
for key, idx := range indexes {
for key, idx := range indexes { // check if the item is in the indexes
if !idx.HasKey(itmID) {
key = utils.ConcatenatedKey(tntCtx, key)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
@@ -490,7 +490,7 @@ func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.
// GetFltrIdxHealth returns the missing indexes for all objects
func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *FilterIHReply, err error) {
// check the objects ( obj->filter->index relation)
rply = &FilterIHReply{
rply = &FilterIHReply{ // prepare the reply
MissingIndexes: make(map[string][]string),
BrokenIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
@@ -500,7 +500,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
if ids, err = dm.dataDB.GetKeysForPrefix(objPrfx); err != nil {
return
}
for _, id := range ids {
for _, id := range ids { // get all the objects from DB
id = strings.TrimPrefix(id, objPrfx)
tntID := utils.NewTenantID(id)
var obj *objFIH
@@ -508,7 +508,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
return
}
if obj.contexts == nil {
if obj.contexts == nil { // update the reply
if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, tntID.Tenant, tntID.ID, rply); err != nil {
return
}
@@ -527,7 +527,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
if indexKeys, err = dm.dataDB.GetKeysForPrefix(idxPrfx); err != nil {
return
}
for _, dataID := range indexKeys {
for _, dataID := range indexKeys { // get all the indexes
dataID = strings.TrimPrefix(dataID, idxPrfx)
splt := utils.SplitConcatenatedKey(dataID) // tntCtx:filterType:fieldName:fieldVal
@@ -564,19 +564,19 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
continue
}
if ctx != nil ||
(obj.contexts == nil || !utils.IsSliceMember(*obj.contexts, *ctx)) {
(obj.contexts == nil || !utils.IsSliceMember(*obj.contexts, *ctx)) { // check the contexts if present
key := utils.ConcatenatedKey(tntCtx, idxKey)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
continue
}
if len(obj.filterIDs) == 0 {
if len(obj.filterIDs) == 0 { // check if the index is *none:*any:*any
if utils.ConcatenatedKey(utils.META_NONE, utils.META_ANY, utils.META_ANY) != idxKey {
rply.BrokenIndexes[dataID] = append(rply.BrokenIndexes[dataID], itmID)
}
continue
}
var hasIndx bool
for _, fltrID := range obj.filterIDs {
var hasIndx bool // just one filter needs to be the index
for _, fltrID := range obj.filterIDs { // get the index for each filter from the object
var fltr *Filter
if fltr, err = getIHFltrFromCache(dm, fltrCache, tnt, fltrID); err != nil {
if err != utils.ErrNotFound {
@@ -586,7 +586,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
continue
}
var indexes map[string]utils.StringMap
if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil { //TODO
if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil {
return
}
idx, has := indexes[idxKey]