mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated index health APIs
This commit is contained in:
committed by
Dan Christian Bogos
parent
7411db3a31
commit
1578e542ff
@@ -103,7 +103,7 @@ func testV1FIdxHRpcConn(t *testing.T) {
|
||||
|
||||
func testV1FIdxHLoadFromFolder(t *testing.T) {
|
||||
var reply string
|
||||
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
|
||||
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial2")}
|
||||
if err := tFIdxHRpc.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
---
|
||||
- hosts: all
|
||||
- hosts: local
|
||||
vars:
|
||||
###############################################################
|
||||
##################### Golang Vars #############################
|
||||
|
||||
@@ -78,48 +78,56 @@ func newFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filter
|
||||
}
|
||||
return
|
||||
}
|
||||
for _, flt := range fltr.Rules {
|
||||
if !FilterIndexTypes.Has(flt.Type) ||
|
||||
IsDynamicDPPath(flt.Element) {
|
||||
continue
|
||||
}
|
||||
isDyn := strings.HasPrefix(flt.Element, utils.DynamicDataPrefix)
|
||||
for _, fldVal := range flt.Values {
|
||||
if IsDynamicDPPath(fldVal) {
|
||||
continue
|
||||
}
|
||||
var idxKey string
|
||||
if isDyn {
|
||||
if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) { // do not index if both the element and the value is dynamic
|
||||
continue
|
||||
}
|
||||
idxKey = utils.ConcatenatedKey(flt.Type, flt.Element[1:], fldVal)
|
||||
} else if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) {
|
||||
idxKey = utils.ConcatenatedKey(flt.Type, fldVal[1:], flt.Element)
|
||||
} else {
|
||||
// do not index not dynamic filters
|
||||
continue
|
||||
}
|
||||
var rcvIndx map[string]utils.StringSet
|
||||
// only read from cache in case if we do not find the index to not cache the negative response
|
||||
if rcvIndx, err = dm.GetIndexes(idxItmType, tntCtx,
|
||||
idxKey, true, false); err != nil {
|
||||
if err != utils.ErrNotFound {
|
||||
return
|
||||
}
|
||||
err = nil
|
||||
indexes[idxKey] = make(utils.StringSet) // create an empty index if is not found in DB in case we add them later
|
||||
continue
|
||||
}
|
||||
for idxKey, idx := range rcvIndx { // parse the received indexes
|
||||
indexes[idxKey] = idx
|
||||
}
|
||||
}
|
||||
if indexes, err = addFilterToIndexSet(dm, idxItmType, tntCtx, fltr, indexes); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// addFilterToIndexSet will parse the rules of filter and add them to the index map received
|
||||
func addFilterToIndexSet(dm *DataManager, idxItmType, tntCtx string, fltr *Filter, indexes map[string]utils.StringSet) (_ map[string]utils.StringSet, err error) {
|
||||
for _, flt := range fltr.Rules {
|
||||
if !FilterIndexTypes.Has(flt.Type) ||
|
||||
IsDynamicDPPath(flt.Element) {
|
||||
continue
|
||||
}
|
||||
isDyn := strings.HasPrefix(flt.Element, utils.DynamicDataPrefix)
|
||||
for _, fldVal := range flt.Values {
|
||||
if IsDynamicDPPath(fldVal) {
|
||||
continue
|
||||
}
|
||||
var idxKey string
|
||||
if isDyn {
|
||||
if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) { // do not index if both the element and the value is dynamic
|
||||
continue
|
||||
}
|
||||
idxKey = utils.ConcatenatedKey(flt.Type, flt.Element[1:], fldVal)
|
||||
} else if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) {
|
||||
idxKey = utils.ConcatenatedKey(flt.Type, fldVal[1:], flt.Element)
|
||||
} else {
|
||||
// do not index not dynamic filters
|
||||
continue
|
||||
}
|
||||
var rcvIndx map[string]utils.StringSet
|
||||
// only read from cache in case if we do not find the index to not cache the negative response
|
||||
if rcvIndx, err = dm.GetIndexes(idxItmType, tntCtx,
|
||||
idxKey, true, false); err != nil {
|
||||
if err != utils.ErrNotFound {
|
||||
return
|
||||
}
|
||||
err = nil
|
||||
indexes[idxKey] = make(utils.StringSet) // create an empty index if is not found in DB in case we add them later
|
||||
continue
|
||||
}
|
||||
for idxKey, idx := range rcvIndx { // parse the received indexes
|
||||
indexes[idxKey] = idx
|
||||
}
|
||||
}
|
||||
}
|
||||
return indexes, nil
|
||||
}
|
||||
|
||||
// addItemToFilterIndex will add the itemID to the existing/created index and set it in the DataDB
|
||||
func addItemToFilterIndex(dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (err error) {
|
||||
tntCtx := tnt
|
||||
@@ -832,8 +840,7 @@ type IndexHealthArgs struct {
|
||||
}
|
||||
|
||||
type AccountActionPlanIHReply struct {
|
||||
MissingActionPlans []string // list of object that are referenced in indexes but are not found in the dataDB
|
||||
MissingAccountActionPlans map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes)
|
||||
MissingAccountActionPlans map[string][]string // list of missing indexes for each object (the map has the key as the indexKey and a list of objects)
|
||||
BrokenReferences map[string][]string // list of broken references (the map has the key as the objectID and a list of indexes)
|
||||
}
|
||||
|
||||
@@ -950,3 +957,153 @@ func GetAccountActionPlanIndexHealth(dm *DataManager, objLimit, indexLimit int,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func getFiltersAndContexts(dm *DataManager, indxType, tnt, id string) (filterIDs, contexts []string, err error) { // add contexts
|
||||
switch indxType {
|
||||
case utils.CacheResourceFilterIndexes:
|
||||
var rs *ResourceProfile
|
||||
if rs, err = dm.GetResourceProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
filterIDs = rs.FilterIDs
|
||||
case utils.CacheStatFilterIndexes:
|
||||
var st *StatQueueProfile
|
||||
if st, err = dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
filterIDs = st.FilterIDs
|
||||
case utils.CacheThresholdFilterIndexes:
|
||||
var th *ThresholdProfile
|
||||
if th, err = dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
filterIDs = th.FilterIDs
|
||||
case utils.CacheRouteFilterIndexes:
|
||||
var rt *RouteProfile
|
||||
if rt, err = dm.GetRouteProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
filterIDs = rt.FilterIDs
|
||||
case utils.CacheAttributeFilterIndexes:
|
||||
var at *AttributeProfile
|
||||
if at, err = dm.GetAttributeProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
filterIDs = at.FilterIDs
|
||||
contexts = at.Contexts
|
||||
case utils.CacheChargerFilterIndexes:
|
||||
var ch *ChargerProfile
|
||||
if ch, err = dm.GetChargerProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
filterIDs = ch.FilterIDs
|
||||
case utils.CacheDispatcherFilterIndexes:
|
||||
var ds *DispatcherProfile
|
||||
if ds, err = dm.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
filterIDs = ds.FilterIDs
|
||||
contexts = ds.Subsystems
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unsuported index type:<%q>", indxType)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type FilterIHReply struct {
|
||||
MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB
|
||||
MissingIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes)
|
||||
MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs)
|
||||
}
|
||||
|
||||
/*
|
||||
func GetFilterIndexHealth(dm *DataManager, indxType string,
|
||||
objLimit, indexLimit int, objTTL, indexTTL time.Duration,
|
||||
objStaticTTL, indexStaticTTL bool) (rply *FilterIHReply, err error) {
|
||||
// check the objects
|
||||
objPrfx := utils.CacheIndexesToPrefix[indxType]
|
||||
var ids []string
|
||||
if ids, err = dm.dataDB.GetKeysForPrefix(objPrfx); err != nil {
|
||||
return
|
||||
}
|
||||
for _, id := range ids {
|
||||
id = strings.TrimPrefix(id, objPrfx)
|
||||
tntID := utils.NewTenantID(id)
|
||||
var filterIDs, contexts []string
|
||||
if filterIDs, contexts, err = getFiltersAndContexts(dm, indxType, tntID.Tenant, tntID.ID); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(contexts) == 0 {
|
||||
if rply, err = updateFilterIH(dm, filterIDs, indxType, tntID.Tenant, tntID.Tenant, id, rply); err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
for _, ctx := range contexts {
|
||||
if rply, err = updateFilterIH(dm, filterIDs, indxType, tntID.Tenant, utils.ConcatenatedKey(tntID.Tenant, ctx), id, rply); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// check the indexes
|
||||
idxPrfx := utils.CacheInstanceToPrefix[indxType]
|
||||
var indexKeys []string
|
||||
if indexKeys, err = dm.dataDB.GetKeysForPrefix(idxPrfx); err != nil {
|
||||
return
|
||||
}
|
||||
for _, dataID := range indexKeys {
|
||||
dataID = strings.TrimPrefix(dataID, idxPrfx)
|
||||
|
||||
splt := utils.SplitConcatenatedKey(dataID) // tntCtx:filterType:fieldName:fieldVal
|
||||
lsplt := len(splt)
|
||||
if lsplt < 4 {
|
||||
err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID)
|
||||
return
|
||||
}
|
||||
tnt := splt[0]
|
||||
var ctx *string
|
||||
if lsplt-3 == 2 {
|
||||
ctx = &splt[1]
|
||||
}
|
||||
tntCtx := utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems
|
||||
idxKey := utils.ConcatenatedKey(splt[lsplt-3:]...)
|
||||
|
||||
var indexes map[string]utils.StringSet
|
||||
if indexes, err = dm.GetIndexes(indxType, tntCtx, idxKey, true, false); err != nil {
|
||||
return
|
||||
}
|
||||
for idxKey, idx := range indexes {
|
||||
for itmID:=range idx{}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func updateFilterIH(dm *DataManager, filterIDs []string, indxType, tnt, tntCtx, itmID string, rply *FilterIHReply) (_ *FilterIHReply, err error) {
|
||||
if len(filterIDs) == 0 {
|
||||
|
||||
return rply, nil
|
||||
}
|
||||
for _, fltrID := range filterIDs {
|
||||
var fltr *Filter
|
||||
if fltr, err = dm.GetFilter(tnt, fltrID,
|
||||
true, false, utils.NonTransactional); err != nil {
|
||||
if err != utils.ErrNotFound {
|
||||
return
|
||||
}
|
||||
rply.MissingFilters[fltrID] = append(rply.MissingFilters[fltrID], itmID)
|
||||
}
|
||||
indexes := map[string]utils.StringSet{}
|
||||
if indexes, err = addFilterToIndexSet(dm, indxType, tntCtx, fltr, indexes); err != nil {
|
||||
return
|
||||
}
|
||||
for key, idx := range indexes {
|
||||
if !idx.Has(itmID) {
|
||||
rply.MissingIndexes[itmID] = append(rply.MissingIndexes[itmID], key)
|
||||
}
|
||||
}
|
||||
}
|
||||
return rply, nil
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -44,9 +44,8 @@ func TestHealthAccountAction(t *testing.T) {
|
||||
}
|
||||
|
||||
exp := &AccountActionPlanIHReply{
|
||||
MissingActionPlans: []string{"AP1"},
|
||||
MissingAccountActionPlans: map[string][]string{"AP2": {"1002"}},
|
||||
BrokenReferences: map[string][]string{"AP2": {"1001"}},
|
||||
MissingAccountActionPlans: map[string][]string{"1002": {"AP2"}}, // 1
|
||||
BrokenReferences: map[string][]string{"AP2": {"1001"}, "AP1": nil}, // 2
|
||||
}
|
||||
if rply, err := GetAccountActionPlanIndexHealth(dm, -1, -1, -1, -1, false, false); err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
Reference in New Issue
Block a user