Updated filter index health

This commit is contained in:
Trial97
2021-07-09 16:22:03 +03:00
committed by Dan Christian Bogos
parent 21be029d65
commit bbb975b274
2 changed files with 45 additions and 32 deletions

View File

@@ -470,7 +470,7 @@ func getFilterAsIndexSet(dm *DataManager, fltrIdxCache *ltcache.Cache, idxItmTyp
// updateFilterIHMisingIndx updates the reply with the missing indexes for a specific object( obj->filter->index relation)
func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.Cache, filterIDs []string, indxType, tnt, tntCtx, itmID string, rply *FilterIHReply) (_ *FilterIHReply, err error) {
if len(filterIDs) == 0 {
if len(filterIDs) == 0 { // no filter so check the *none:*any:*any index
idxKey := utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny)
var rcvIndx utils.StringSet
if rcvIndx, err = getIHFltrIdxFromCache(dm, nil, indxType, tntCtx, idxKey); err != nil {
@@ -486,7 +486,7 @@ func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.
return rply, nil
}
for _, fltrID := range filterIDs {
for _, fltrID := range filterIDs { // parse all the filters
var fltr *Filter
if fltr, err = getIHFltrFromCache(dm, fltrCache, tnt, fltrID); err != nil {
if err != utils.ErrNotFound {
@@ -497,10 +497,10 @@ func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.
continue
}
var indexes map[string]utils.StringSet
if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil {
if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil { // build the index from filter
return
}
for key, idx := range indexes {
for key, idx := range indexes { // check if the item is in the indexes
if !idx.Has(itmID) {
key = utils.ConcatenatedKey(tntCtx, key)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
@@ -513,7 +513,7 @@ func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.
// GetFltrIdxHealth returns the missing indexes for all objects
func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *FilterIHReply, err error) {
// check the objects ( obj->filter->index relation)
rply = &FilterIHReply{
rply = &FilterIHReply{ // prepare the reply
MissingIndexes: make(map[string][]string),
BrokenIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
@@ -523,7 +523,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
if ids, err = dm.dataDB.GetKeysForPrefix(objPrfx); err != nil {
return
}
for _, id := range ids {
for _, id := range ids { // get all the objects from DB
id = strings.TrimPrefix(id, objPrfx)
tntID := utils.NewTenantID(id)
var obj *objFIH
@@ -531,7 +531,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
return
}
if obj.contexts == nil {
if obj.contexts == nil { // update the reply
if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, tntID.Tenant, tntID.ID, rply); err != nil {
return
}
@@ -550,7 +550,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
if indexKeys, err = dm.dataDB.GetKeysForPrefix(idxPrfx); err != nil {
return
}
for _, dataID := range indexKeys {
for _, dataID := range indexKeys { // get all the indexes
dataID = strings.TrimPrefix(dataID, idxPrfx)
splt := utils.SplitConcatenatedKey(dataID) // tntCtx:filterType:fieldName:fieldVal
@@ -582,19 +582,19 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
continue
}
if ctx != nil ||
(obj.contexts == nil || !utils.IsSliceMember(*obj.contexts, *ctx)) {
(obj.contexts == nil || !utils.IsSliceMember(*obj.contexts, *ctx)) { // check the contexts if present
key := utils.ConcatenatedKey(tntCtx, idxKey)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
continue
}
if len(obj.filterIDs) == 0 {
if len(obj.filterIDs) == 0 { // check if the index is *none:*any:*any
if utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny) != idxKey {
rply.BrokenIndexes[dataID] = append(rply.BrokenIndexes[dataID], itmID)
}
continue
}
var hasIndx bool
for _, fltrID := range obj.filterIDs {
var hasIndx bool // just one filter needs to be the index
for _, fltrID := range obj.filterIDs { // get the index for each filter from the object
var fltr *Filter
if fltr, err = getIHFltrFromCache(dm, fltrCache, tnt, fltrID); err != nil {
if err != utils.ErrNotFound {
@@ -604,7 +604,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
continue
}
var indexes map[string]utils.StringSet
if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil { //TODO
if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil {
return
}
idx, has := indexes[idxKey]
@@ -622,10 +622,10 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
return
}
// GetRevFltrIdxHealth returns the missing reverse indexes for all objects
// getRevFltrIdxHealthFromObj returns the missing reverse indexes for all objects of the given type
func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *ReverseFilterIHReply, err error) {
// check the objects ( obj->filter->index relation)
rply = &ReverseFilterIHReply{
rply = &ReverseFilterIHReply{ // prepare the reply
MissingReverseIndexes: make(map[string][]string),
BrokenReverseIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
@@ -635,7 +635,7 @@ func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, obj
if ids, err = dm.dataDB.GetKeysForPrefix(objPrfx); err != nil {
return
}
for _, id := range ids {
for _, id := range ids { // get all the objects
id = strings.TrimPrefix(id, objPrfx)
tntID := utils.NewTenantID(id)
var obj *objFIH
@@ -643,12 +643,12 @@ func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, obj
return
}
if obj.contexts == nil {
if obj.contexts == nil { // no contexts
for _, fltrID := range obj.filterIDs {
if strings.HasPrefix(fltrID, utils.Meta) {
continue
}
if _, err = getIHFltrFromCache(dm, fltrCache, tntID.Tenant, fltrID); err != nil {
if _, err = getIHFltrFromCache(dm, fltrCache, tntID.Tenant, fltrID); err != nil { // check if the filter exists
if err != utils.ErrNotFound {
return
}
@@ -658,7 +658,7 @@ func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, obj
continue
}
var revIdx utils.StringSet
if revIdx, err = getIHFltrIdxFromCache(dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), indxType); err != nil {
if revIdx, err = getIHFltrIdxFromCache(dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), indxType); err != nil { // check the reverese index
if err == utils.ErrNotFound {
rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], fltrID)
err = nil
@@ -675,8 +675,17 @@ func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, obj
if strings.HasPrefix(fltrID, utils.Meta) {
continue
}
if _, err = getIHFltrFromCache(dm, fltrCache, tntID.Tenant, fltrID); err != nil { // check if the filter exists
if err != utils.ErrNotFound {
return
}
err = nil
key := utils.ConcatenatedKey(tntID.Tenant, fltrID)
rply.MissingFilters[key] = append(rply.MissingFilters[key], tntID.ID)
continue
}
var revIdx utils.StringSet
if revIdx, err = getIHFltrIdxFromCache(dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), indxType); err != nil {
if revIdx, err = getIHFltrIdxFromCache(dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), indxType); err != nil { // check the reverese index
if err == utils.ErrNotFound {
for _, ctx := range *obj.contexts {
rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], utils.ConcatenatedKey(fltrID, ctx))
@@ -686,7 +695,7 @@ func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, obj
}
return
}
for _, ctx := range *obj.contexts {
for _, ctx := range *obj.contexts { // check the context
if !revIdx.Has(utils.ConcatenatedKey(id, ctx)) {
rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], utils.ConcatenatedKey(fltrID, ctx))
}
@@ -696,19 +705,22 @@ func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, obj
}
return
}
// getRevFltrIdxHealthFromReverse parses the reverse indexes and updates the reply
func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Cache, objCaches map[string]*ltcache.Cache, rply map[string]*ReverseFilterIHReply) (_ map[string]*ReverseFilterIHReply, err error) {
var revIndexKeys []string
if revIndexKeys, err = dm.dataDB.GetKeysForPrefix(utils.FilterIndexPrfx); err != nil {
return
}
for _, revIdxKey := range revIndexKeys {
for _, revIdxKey := range revIndexKeys { // parse all the reverse indexes
// compose the needed information from key
revIdxKey = strings.TrimPrefix(revIdxKey, utils.FilterIndexPrfx)
revIDxSplit := strings.SplitN(revIdxKey, utils.ConcatenatedKeySep, 3)
tnt, fltrID, indxType := revIDxSplit[0], revIDxSplit[1], revIDxSplit[2]
revIdxKey = utils.ConcatenatedKey(tnt, fltrID)
objCache := objCaches[indxType]
if _, has := rply[indxType]; !has {
if _, has := rply[indxType]; !has { // make sure that the reply has the type in map
rply[indxType] = &ReverseFilterIHReply{
MissingReverseIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
@@ -721,7 +733,7 @@ func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache
return
}
for itemIDCtx := range revIdx {
var id, ctx string
var id, ctx string // split the id and context
if !strings.Contains(itemIDCtx, utils.ConcatenatedKeySep) {
id = itemIDCtx
} else {
@@ -730,7 +742,7 @@ func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache
ctx = spl[1]
}
var obj *objFIH
if obj, err = getIHObjFromCache(dm, objCache, indxType, tnt, id); err != nil {
if obj, err = getIHObjFromCache(dm, objCache, indxType, tnt, id); err != nil { // get the object
if err == utils.ErrNotFound {
rply[indxType].MissingObjects = append(rply[indxType].MissingObjects, utils.ConcatenatedKey(tnt, id))
err = nil
@@ -738,10 +750,10 @@ func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache
}
return
}
if !utils.IsSliceMember(obj.filterIDs, fltrID) {
if !utils.IsSliceMember(obj.filterIDs, fltrID) { // check the filters
key := utils.ConcatenatedKey(tnt, itemIDCtx)
rply[indxType].BrokenReverseIndexes[key] = append(rply[indxType].BrokenReverseIndexes[key], fltrID)
} else if obj.contexts != nil && !utils.IsSliceMember(*obj.contexts, ctx) {
} else if obj.contexts != nil && !utils.IsSliceMember(*obj.contexts, ctx) { // and the contexts
key := utils.ConcatenatedKey(tnt, itemIDCtx)
rply[indxType].BrokenReverseIndexes[key] = append(rply[indxType].BrokenReverseIndexes[key], fltrID)
}
@@ -751,10 +763,11 @@ func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache
return rply, nil
}
// GetRevFltrIdxHealth will return all the broken indexes
func GetRevFltrIdxHealth(dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Cache, objCaches map[string]*ltcache.Cache) (rply map[string]*ReverseFilterIHReply, err error) {
rply = make(map[string]*ReverseFilterIHReply)
for indxType := range utils.CacheIndexesToPrefix {
if indxType == utils.CacheReverseFilterIndexes {
for indxType := range utils.CacheIndexesToPrefix { // parse all posible filter indexes
if indxType == utils.CacheReverseFilterIndexes { // ommit the reverse indexes
continue
}
if rply[indxType], err = getRevFltrIdxHealthFromObj(dm, fltrCache, revFltrIdxCache, objCaches[indxType], indxType); err != nil {
@@ -763,7 +776,7 @@ func GetRevFltrIdxHealth(dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Ca
}
rply, err = getRevFltrIdxHealthFromReverse(dm, fltrCache, revFltrIdxCache, objCaches, rply)
for k, v := range rply { // should be a safe for (even on rply==nil)
if len(v.MissingFilters) == 0 &&
if len(v.MissingFilters) == 0 && // remove nonpopulated objects
len(v.MissingObjects) == 0 &&
len(v.BrokenReverseIndexes) == 0 &&
len(v.MissingReverseIndexes) == 0 {

View File

@@ -358,9 +358,9 @@ func TestHealthReverseFilter(t *testing.T) {
exp := map[string]*ReverseFilterIHReply{
utils.CacheAttributeFilterIndexes: {
MissingReverseIndexes: map[string][]string{
"cgrates.org:ATTR1": {"Fltr1:*any"},
// "cgrates.org:ATTR1": {"Fltr1:*any"},
},
MissingFilters: make(map[string][]string),
MissingFilters: map[string][]string{"cgrates.org:Fltr1": {"ATTR1"}},
BrokenReverseIndexes: map[string][]string{
"cgrates.org:ATTR1:*cdrs": {"Fltr1", "Fltr2"},
},