Updated filter index health APIs

This commit is contained in:
Trial97
2021-07-07 17:57:44 +03:00
committed by Dan Christian Bogos
parent 1157e29f51
commit 3f6e333e29
3 changed files with 61 additions and 36 deletions

View File

@@ -616,10 +616,14 @@ func (apierSv1 *APIerSv1) GetReverseDestinationsIndexHealth(args *engine.IndexHe
}
func (apierSv1 *APIerSv1) GetReverseFilterHealth(args *engine.IndexHealthArgsWith3Ch, reply *map[string]*engine.ReverseFilterIHReply) (err error) {
objCaches := make(map[string]*ltcache.Cache)
for indxType := range utils.CacheIndexesToPrefix {
objCaches[indxType] = ltcache.NewCache(-1, 0, false, nil)
}
*reply, err = engine.GetRevFltrIdxHealth(apierSv1.DataManager,
ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
objCaches,
)
return
}

View File

@@ -287,12 +287,14 @@ func GetReverseDestinationsIndexHealth(dm *DataManager, objLimit, indexLimit int
type FilterIHReply struct {
MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB
MissingIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes)
BrokenIndexes map[string][]string // list of broken indexes for each object (the map has the key as the index and a list of objects)
MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs)
}
type ReverseFilterIHReply struct {
MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB
MissingReverseIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes)
BrokenReverseIndexes map[string][]string // list of broken indexes for each object (the map has the key as the objectID and a list of indexes)
MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs)
}
@@ -490,6 +492,7 @@ func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.
if err != utils.ErrNotFound {
return
}
fltrID = utils.ConcatenatedKey(tnt, fltrID)
rply.MissingFilters[fltrID] = append(rply.MissingFilters[fltrID], itmID)
continue
}
@@ -512,6 +515,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
// check the objects ( obj->filter->index relation)
rply = &FilterIHReply{
MissingIndexes: make(map[string][]string),
BrokenIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
}
objPrfx := utils.CacheIndexesToPrefix[indxType]
@@ -528,12 +532,12 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
}
if obj.contexts == nil {
if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, tntID.Tenant, id, rply); err != nil {
if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, tntID.Tenant, tntID.ID, rply); err != nil {
return
}
} else {
for _, ctx := range *obj.contexts {
if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, utils.ConcatenatedKey(tntID.Tenant, ctx), id, rply); err != nil {
if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, utils.ConcatenatedKey(tntID.Tenant, ctx), tntID.ID, rply); err != nil {
return
}
}
@@ -584,41 +588,34 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
continue
}
if len(obj.filterIDs) == 0 {
idxKey := utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny)
var rcvIndx utils.StringSet
if rcvIndx, err = getIHFltrIdxFromCache(dm, nil, indxType, tntCtx, idxKey); err != nil {
if err != utils.ErrNotFound {
return
}
key := utils.ConcatenatedKey(tntCtx, idxKey)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
err = nil
} else if !rcvIndx.Has(itmID) {
key := utils.ConcatenatedKey(tntCtx, idxKey)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
if utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny) != idxKey {
rply.BrokenIndexes[dataID] = append(rply.BrokenIndexes[dataID], itmID)
}
continue
}
var hasIndx bool
for _, fltrID := range obj.filterIDs {
var fltr *Filter
if fltr, err = getIHFltrFromCache(dm, fltrCache, tnt, fltrID); err != nil {
if err != utils.ErrNotFound {
return
}
fltrID = utils.ConcatenatedKey(tnt, fltrID)
rply.MissingFilters[fltrID] = append(rply.MissingFilters[fltrID], itmID)
err = nil
err = nil // should be already logged when we parsed all the objects
continue
}
var indexes map[string]utils.StringSet
if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil { //TODO
return
}
if idx, has := indexes[idxKey]; !has || !idx.Has(itmID) {
key := utils.ConcatenatedKey(tntCtx, idxKey)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
idx, has := indexes[idxKey]
if hasIndx = has && idx.Has(itmID); hasIndx {
break
}
}
if !hasIndx {
key := utils.ConcatenatedKey(tnt, idxKey)
rply.BrokenIndexes[key] = append(rply.BrokenIndexes[key], itmID)
}
}
}
@@ -630,6 +627,7 @@ func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, obj
// check the objects ( obj->filter->index relation)
rply = &ReverseFilterIHReply{
MissingReverseIndexes: make(map[string][]string),
BrokenReverseIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
}
objPrfx := utils.CacheIndexesToPrefix[indxType]
@@ -650,6 +648,15 @@ func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, obj
if strings.HasPrefix(fltrID, utils.Meta) {
continue
}
if _, err = getIHFltrFromCache(dm, fltrCache, tntID.Tenant, fltrID); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
key := utils.ConcatenatedKey(tntID.Tenant, fltrID)
rply.MissingFilters[key] = append(rply.MissingFilters[key], tntID.ID)
continue
}
var revIdx utils.StringSet
if revIdx, err = getIHFltrIdxFromCache(dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), indxType); err != nil {
if err == utils.ErrNotFound {
@@ -689,7 +696,7 @@ func getRevFltrIdxHealthFromObj(dm *DataManager, fltrCache, revFltrIdxCache, obj
}
return
}
func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache, objCache *ltcache.Cache, rply map[string]*ReverseFilterIHReply) (_ map[string]*ReverseFilterIHReply, err error) {
func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Cache, objCaches map[string]*ltcache.Cache, rply map[string]*ReverseFilterIHReply) (_ map[string]*ReverseFilterIHReply, err error) {
var revIndexKeys []string
if revIndexKeys, err = dm.dataDB.GetKeysForPrefix(utils.FilterIndexPrfx); err != nil {
return
@@ -699,11 +706,13 @@ func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache,
revIDxSplit := strings.SplitN(revIdxKey, utils.ConcatenatedKeySep, 3)
tnt, fltrID, indxType := revIDxSplit[0], revIDxSplit[1], revIDxSplit[2]
revIdxKey = utils.ConcatenatedKey(tnt, fltrID)
objCache := objCaches[indxType]
if _, has := rply[indxType]; !has {
rply[indxType] = &ReverseFilterIHReply{
MissingReverseIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
BrokenReverseIndexes: make(map[string][]string),
}
}
@@ -730,9 +739,11 @@ func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache,
return
}
if !utils.IsSliceMember(obj.filterIDs, fltrID) {
rply[indxType].MissingFilters[revIdxKey] = append(rply[indxType].MissingFilters[revIdxKey], id)
key := utils.ConcatenatedKey(tnt, itemIDCtx)
rply[indxType].BrokenReverseIndexes[key] = append(rply[indxType].BrokenReverseIndexes[key], fltrID)
} else if obj.contexts != nil && !utils.IsSliceMember(*obj.contexts, ctx) {
rply[indxType].MissingFilters[revIdxKey] = append(rply[indxType].MissingFilters[revIdxKey], itemIDCtx)
key := utils.ConcatenatedKey(tnt, itemIDCtx)
rply[indxType].BrokenReverseIndexes[key] = append(rply[indxType].BrokenReverseIndexes[key], fltrID)
}
}
@@ -740,20 +751,21 @@ func getRevFltrIdxHealthFromReverse(dm *DataManager, fltrCache, revFltrIdxCache,
return rply, nil
}
func GetRevFltrIdxHealth(dm *DataManager, fltrCache, revFltrIdxCache, objCache *ltcache.Cache) (rply map[string]*ReverseFilterIHReply, err error) {
func GetRevFltrIdxHealth(dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Cache, objCaches map[string]*ltcache.Cache) (rply map[string]*ReverseFilterIHReply, err error) {
rply = make(map[string]*ReverseFilterIHReply)
for indxType := range utils.CacheIndexesToPrefix {
if indxType == utils.CacheReverseFilterIndexes {
continue
}
if rply[indxType], err = getRevFltrIdxHealthFromObj(dm, fltrCache, revFltrIdxCache, objCache, indxType); err != nil {
if rply[indxType], err = getRevFltrIdxHealthFromObj(dm, fltrCache, revFltrIdxCache, objCaches[indxType], indxType); err != nil {
return
}
}
rply, err = getRevFltrIdxHealthFromReverse(dm, fltrCache, revFltrIdxCache, objCache, rply)
for k, v := range rply { // shpuld be a safe for (even on rply==nil)
rply, err = getRevFltrIdxHealthFromReverse(dm, fltrCache, revFltrIdxCache, objCaches, rply)
for k, v := range rply { // should be a safe for (even on rply==nil)
if len(v.MissingFilters) == 0 &&
len(v.MissingObjects) == 0 &&
len(v.BrokenReverseIndexes) == 0 &&
len(v.MissingReverseIndexes) == 0 {
delete(rply, k)
}

View File

@@ -20,6 +20,7 @@ package engine
import (
"reflect"
"sort"
"testing"
"github.com/cgrates/cgrates/config"
@@ -307,11 +308,12 @@ func TestHealthFilter(t *testing.T) {
}
exp := &FilterIHReply{
MissingIndexes: map[string][]string{
"cgrates.org:*any:*string:*req.Account:1001": {"cgrates.org:ATTR1"},
"cgrates.org:*any:*string:*req.Account:1001": {"ATTR1"},
"cgrates.org:*any:*string:*req.Account:1002": {"ATTR1"},
},
BrokenIndexes: make(map[string][]string),
MissingFilters: map[string][]string{
"Fltr1": {"cgrates.org:ATTR1"},
"cgrates.org:Fltr1": {"ATTR1"},
},
MissingObjects: []string{"cgrates.org:ATTR2"},
}
@@ -358,20 +360,27 @@ func TestHealthReverseFilter(t *testing.T) {
MissingReverseIndexes: map[string][]string{
"cgrates.org:ATTR1": {"Fltr1:*any"},
},
MissingFilters: map[string][]string{
"cgrates.org:Fltr2": {"ATTR1"},
"cgrates.org:Fltr1": {"ATTR1:*cdrs"},
MissingFilters: make(map[string][]string),
BrokenReverseIndexes: map[string][]string{
"cgrates.org:ATTR1:*cdrs": {"Fltr1", "Fltr2"},
},
MissingObjects: []string{"cgrates.org:ATTR2"},
},
}
objCaches := make(map[string]*ltcache.Cache)
for indxType := range utils.CacheIndexesToPrefix {
objCaches[indxType] = ltcache.NewCache(-1, 0, false, nil)
}
if rply, err := GetRevFltrIdxHealth(dm,
ltcache.NewCache(-1, 0, false, nil),
ltcache.NewCache(-1, 0, false, nil),
ltcache.NewCache(-1, 0, false, nil)); err != nil {
objCaches); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expecting: %+v,\n received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
} else {
sort.Strings(rply[utils.CacheAttributeFilterIndexes].BrokenReverseIndexes["cgrates.org:ATTR1:*cdrs"])
if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expecting: %+v,\n received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
}
}
}