Added all index health APIs

This commit is contained in:
Trial97
2021-07-07 17:11:20 +03:00
committed by Dan Christian Bogos
parent 89c6fc81c9
commit 9fa3cd4b8c
9 changed files with 1220 additions and 212 deletions

View File

@@ -55,13 +55,13 @@ func (alS *AttributeService) Shutdown() {
}
// attributeProfileForEvent returns the matching attribute
func (alS *AttributeService) attributeProfileForEvent(apiCtx *context.Context, tnt string, attrsIDs []string,
func (alS *AttributeService) attributeProfileForEvent(ctx *context.Context, tnt string, attrsIDs []string,
evNm utils.MapStorage, lastID string) (matchAttrPrfl *AttributeProfile, err error) {
var attrIDs []string
if len(attrsIDs) != 0 {
attrIDs = attrsIDs
} else {
aPrflIDs, err := MatchingItemIDsForEvent(apiCtx, evNm,
aPrflIDs, err := MatchingItemIDsForEvent(ctx, evNm,
alS.cgrcfg.AttributeSCfg().StringIndexedFields,
alS.cgrcfg.AttributeSCfg().PrefixIndexedFields,
alS.cgrcfg.AttributeSCfg().SuffixIndexedFields,
@@ -75,14 +75,14 @@ func (alS *AttributeService) attributeProfileForEvent(apiCtx *context.Context, t
attrIDs = aPrflIDs.AsSlice()
}
for _, apID := range attrIDs {
aPrfl, err := alS.dm.GetAttributeProfile(apiCtx, tnt, apID, true, true, utils.NonTransactional)
aPrfl, err := alS.dm.GetAttributeProfile(ctx, tnt, apID, true, true, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
if pass, err := alS.filterS.Pass(apiCtx, tnt, aPrfl.FilterIDs,
if pass, err := alS.filterS.Pass(ctx, tnt, aPrfl.FilterIDs,
evNm); err != nil {
return nil, err
} else if !pass {

View File

@@ -1404,31 +1404,31 @@ func (dm *DataManager) SetAttributeProfile(ctx *context.Context, ap *AttributePr
return
}
func (dm *DataManager) RemoveAttributeProfile(apiCtx *context.Context, tenant, id string, withIndex bool) (err error) {
func (dm *DataManager) RemoveAttributeProfile(ctx *context.Context, tenant, id string, withIndex bool) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
oldAttr, err := dm.GetAttributeProfile(apiCtx, tenant, id, true, false, utils.NonTransactional)
oldAttr, err := dm.GetAttributeProfile(ctx, tenant, id, true, false, utils.NonTransactional)
if err != nil {
return err
}
if err = dm.DataDB().RemoveAttributeProfileDrv(apiCtx, tenant, id); err != nil {
if err = dm.DataDB().RemoveAttributeProfileDrv(ctx, tenant, id); err != nil {
return
}
if oldAttr == nil {
return utils.ErrNotFound
}
if withIndex {
if err = removeIndexFiltersItem(apiCtx, dm, utils.CacheAttributeFilterIndexes, tenant, id, oldAttr.FilterIDs); err != nil {
if err = removeIndexFiltersItem(ctx, dm, utils.CacheAttributeFilterIndexes, tenant, id, oldAttr.FilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(apiCtx, dm, utils.CacheAttributeFilterIndexes,
if err = removeItemFromFilterIndex(ctx, dm, utils.CacheAttributeFilterIndexes,
tenant, utils.EmptyString, id, oldAttr.FilterIDs); err != nil {
return
}
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles]; itm.Replicate {
replicate(apiCtx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveAttributeProfile,

View File

@@ -40,16 +40,16 @@ var (
// newFilterIndex will get the index from DataManager if is not found it will create it
// is used to update the mentioned index
func newFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string, newFlt *Filter) (indexes map[string]utils.StringSet, err error) {
tntCtx := tnt
if ctx != utils.EmptyString {
tntCtx = utils.ConcatenatedKey(tnt, ctx)
func newFilterIndex(ctx *context.Context, dm *DataManager, idxItmType, tnt, grp, itemID string, filterIDs []string, newFlt *Filter) (indexes map[string]utils.StringSet, err error) {
tntGrp := tnt
if grp != utils.EmptyString {
tntGrp = utils.ConcatenatedKey(tnt, grp)
}
indexes = make(map[string]utils.StringSet)
if len(filterIDs) == 0 { // in case of None
idxKey := utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny)
var rcvIndx map[string]utils.StringSet
if rcvIndx, err = dm.GetIndexes(apiCtx, idxItmType, tntCtx,
if rcvIndx, err = dm.GetIndexes(ctx, idxItmType, tntGrp,
idxKey,
true, false); err != nil {
if err != utils.ErrNotFound {
@@ -70,7 +70,7 @@ func newFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c
var fltr *Filter
if newFlt != nil && newFlt.Tenant == tnt && newFlt.ID == fltrID {
fltr = newFlt
} else if fltr, err = dm.GetFilter(apiCtx, tnt, fltrID,
} else if fltr, err = dm.GetFilter(ctx, tnt, fltrID,
true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v",
@@ -102,7 +102,7 @@ func newFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c
}
var rcvIndx map[string]utils.StringSet
// only read from cache in case if we do not find the index to not cache the negative response
if rcvIndx, err = dm.GetIndexes(apiCtx, idxItmType, tntCtx,
if rcvIndx, err = dm.GetIndexes(ctx, idxItmType, tntGrp,
idxKey, true, false); err != nil {
if err != utils.ErrNotFound {
return
@@ -121,19 +121,19 @@ func newFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c
}
// addItemToFilterIndex will add the itemID to the existing/created index and set it in the DataDB
func addItemToFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (err error) {
tntCtx := tnt
if ctx != utils.EmptyString {
tntCtx = utils.ConcatenatedKey(tnt, ctx)
func addItemToFilterIndex(ctx *context.Context, dm *DataManager, idxItmType, tnt, grp, itemID string, filterIDs []string) (err error) {
tntGrp := tnt
if grp != utils.EmptyString {
tntGrp = utils.ConcatenatedKey(tnt, grp)
}
// early lock to be sure that until we do not write back the indexes
// another goroutine can't create new indexes
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntGrp)
defer guardian.Guardian.UnguardIDs(refID)
var indexes map[string]utils.StringSet
if indexes, err = newFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, filterIDs, nil); err != nil {
if indexes, err = newFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, filterIDs, nil); err != nil {
return
}
// in case we have a profile with only non indexable filters(e.g. only *gt)
@@ -144,27 +144,27 @@ func addItemToFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType,
for indxKey, index := range indexes {
index.Add(itemID)
// remove from cache in order to corectly update the index
if err = Cache.Remove(apiCtx, idxItmType, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil {
if err = Cache.Remove(ctx, idxItmType, utils.ConcatenatedKey(tntGrp, indxKey), true, utils.NonTransactional); err != nil {
return
}
}
return dm.SetIndexes(apiCtx, idxItmType, tntCtx, indexes, true, utils.NonTransactional)
return dm.SetIndexes(ctx, idxItmType, tntGrp, indexes, true, utils.NonTransactional)
}
// removeItemFromFilterIndex will remove the itemID from the existing/created index and set it in the DataDB
func removeItemFromFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (err error) {
tntCtx := tnt
if ctx != utils.EmptyString {
tntCtx = utils.ConcatenatedKey(tnt, ctx)
func removeItemFromFilterIndex(ctx *context.Context, dm *DataManager, idxItmType, tnt, grp, itemID string, filterIDs []string) (err error) {
tntGrp := tnt
if grp != utils.EmptyString {
tntGrp = utils.ConcatenatedKey(tnt, grp)
}
// early lock to be sure that until we do not write back the indexes
// another goroutine can't create new indexes
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntGrp)
defer guardian.Guardian.UnguardIDs(refID)
var indexes map[string]utils.StringSet
if indexes, err = newFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, filterIDs, nil); err != nil {
if indexes, err = newFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, filterIDs, nil); err != nil {
return
}
if len(indexes) == 0 { // in case we have a profile with only non indexable filters(e.g. only *gt)
@@ -176,32 +176,32 @@ func removeItemFromFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmT
indexes[idxKey] = nil // this will not be set in DB(handled by driver)
}
// remove from cache in order to corectly update the index
if err = Cache.Remove(apiCtx, idxItmType, utils.ConcatenatedKey(tntCtx, idxKey), true, utils.NonTransactional); err != nil {
if err = Cache.Remove(ctx, idxItmType, utils.ConcatenatedKey(tntGrp, idxKey), true, utils.NonTransactional); err != nil {
return
}
}
return dm.SetIndexes(apiCtx, idxItmType, tntCtx, indexes, true, utils.NonTransactional)
return dm.SetIndexes(ctx, idxItmType, tntGrp, indexes, true, utils.NonTransactional)
}
// updatedIndexes will compare the old filtersIDs with the new ones and only update the filters indexes that are added/removed
// idxItmType - the index object type(e.g.*attribute_filter_indexes, *rate_filter_indexes, *threshold_filter_indexes)
// tnt - the tenant of the object
// ctx - the rate profile id for rate from RateProfile(sub indexes); for all the rest the ctx is ""(AttributePrf and DispatcherPrf have a separate function)
// grp - the rate profile id for rate from RateProfile(sub indexes); for all the rest the grp is ""(AttributePrf and DispatcherPrf have a separate function)
// itemID - the object id
// oldFilterIds - the filtersIDs that the old object had; this is optional if the object did not exist
// newFilterIDs - the filtersIDs for the object that will be set
// useCtx - in case of subindexes(e.g. Rate from RateProfiles) need to add the ctx to the itemID when reverse filter indexes are set
// useGrp - in case of subindexes(e.g. Rate from RateProfiles) need to add the grp to the itemID when reverse filter indexes are set
// used when updating the filters
func updatedIndexes(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, ctx, itemID string, oldFilterIds *[]string, newFilterIDs []string, useCtx bool) (err error) {
itmCtx := itemID
if useCtx {
itmCtx = utils.ConcatenatedKey(itemID, ctx)
func updatedIndexes(ctx *context.Context, dm *DataManager, idxItmType, tnt, grp, itemID string, oldFilterIds *[]string, newFilterIDs []string, useGrp bool) (err error) {
itmGrp := itemID
if useGrp {
itmGrp = utils.ConcatenatedKey(itemID, grp)
}
if oldFilterIds == nil { // nothing to remove so just create the new indexes
if err = addIndexFiltersItem(apiCtx, dm, idxItmType, tnt, itmCtx, newFilterIDs); err != nil {
if err = addIndexFiltersItem(ctx, dm, idxItmType, tnt, itmGrp, newFilterIDs); err != nil {
return
}
return addItemToFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, newFilterIDs)
return addItemToFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, newFilterIDs)
}
if len(*oldFilterIds) == 0 && len(newFilterIDs) == 0 { // nothing to update
return
@@ -229,10 +229,10 @@ func updatedIndexes(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c
if len(oldFilterIDs) != 0 || oldFltrs.Size() == 0 {
// has some indexes to remove or
// the old profile doesn't have filters but the new one has so remove the *none index
if err = removeIndexFiltersItem(apiCtx, dm, idxItmType, tnt, itmCtx, oldFilterIDs); err != nil {
if err = removeIndexFiltersItem(ctx, dm, idxItmType, tnt, itmGrp, oldFilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, oldFilterIDs); err != nil {
if err = removeItemFromFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, oldFilterIDs); err != nil {
return
}
}
@@ -240,10 +240,10 @@ func updatedIndexes(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c
if len(newFilterIDs) != 0 || newFltrs.Size() == 0 {
// has some indexes to add or
// the old profile has filters but the new one does not so add the *none index
if err = addIndexFiltersItem(apiCtx, dm, idxItmType, tnt, itmCtx, newFilterIDs); err != nil {
if err = addIndexFiltersItem(ctx, dm, idxItmType, tnt, itmGrp, newFilterIDs); err != nil {
return
}
if err = addItemToFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, newFilterIDs); err != nil {
if err = addItemToFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, newFilterIDs); err != nil {
return
}
}
@@ -251,27 +251,27 @@ func updatedIndexes(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c
}
// splitFilterIndex splits the cache key so it can be used to recache the indexes
func splitFilterIndex(tntCtxIdxKey string) (tntCtx, idxKey string, err error) {
splt := utils.SplitConcatenatedKey(tntCtxIdxKey) // tntCtx:filterType:fieldName:fieldVal
func splitFilterIndex(tntGrpIdxKey string) (tntGrp, idxKey string, err error) {
splt := utils.SplitConcatenatedKey(tntGrpIdxKey) // tntCtx:filterType:fieldName:fieldVal
lsplt := len(splt)
if lsplt < 4 {
err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", tntCtxIdxKey)
err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", tntGrpIdxKey)
return
}
tntCtx = utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems
tntGrp = utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems
idxKey = utils.ConcatenatedKey(splt[lsplt-3:]...)
return
}
// ComputeIndexes gets the indexes from the DB and ensure that the items are indexed
// getFilters returns a list of filters IDs for the given profile id
func ComputeIndexes(cntxt *context.Context, dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string,
transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (indexes utils.StringSet, err error) {
func ComputeIndexes(ctx *context.Context, dm *DataManager, tnt, grp, idxItmType string, IDs *[]string,
transactionID string, getFilters func(tnt, id, grp string) (*[]string, error), newFltr *Filter) (indexes utils.StringSet, err error) {
indexes = make(utils.StringSet)
var profilesIDs []string
if IDs == nil { // get all items
var ids []string
if ids, err = dm.DataDB().GetKeysForPrefix(cntxt, utils.CacheIndexesToPrefix[idxItmType]); err != nil {
if ids, err = dm.DataDB().GetKeysForPrefix(ctx, utils.CacheIndexesToPrefix[idxItmType]); err != nil {
return
}
for _, id := range ids {
@@ -280,34 +280,34 @@ func ComputeIndexes(cntxt *context.Context, dm *DataManager, tnt, ctx, idxItmTyp
} else {
profilesIDs = *IDs
}
tntCtx := tnt
if ctx != utils.EmptyString {
tntCtx = utils.ConcatenatedKey(tnt, ctx)
tntGrp := tnt
if grp != utils.EmptyString {
tntGrp = utils.ConcatenatedKey(tnt, grp)
}
// early lock to be sure that until we do not write back the indexes
// another goroutine can't create new indexes
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntGrp)
defer guardian.Guardian.UnguardIDs(refID)
for _, id := range profilesIDs {
var filterIDs *[]string
if filterIDs, err = getFilters(tnt, id, ctx); err != nil {
if filterIDs, err = getFilters(tnt, id, grp); err != nil {
return
}
if filterIDs == nil {
continue
}
var index map[string]utils.StringSet
if index, err = newFilterIndex(cntxt, dm, idxItmType,
tnt, ctx, id, *filterIDs, newFltr); err != nil {
if index, err = newFilterIndex(ctx, dm, idxItmType,
tnt, grp, id, *filterIDs, newFltr); err != nil {
return
}
// ensure that the item is in the index set
for key, idx := range index {
idx.Add(id)
indexes.Add(utils.ConcatenatedKey(tntCtx, key))
indexes.Add(utils.ConcatenatedKey(tntGrp, key))
}
if err = dm.SetIndexes(cntxt, idxItmType, tntCtx, index, cacheCommit(transactionID), transactionID); err != nil {
if err = dm.SetIndexes(ctx, idxItmType, tntGrp, index, cacheCommit(transactionID), transactionID); err != nil {
return
}
}
@@ -320,11 +320,11 @@ func addIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, tnt,
if strings.HasPrefix(ID, utils.Meta) { // skip inline
continue
}
tntCtx := utils.ConcatenatedKey(tnt, ID)
tntGrp := utils.ConcatenatedKey(tnt, ID)
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx)
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntGrp)
var indexes map[string]utils.StringSet
if indexes, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx,
if indexes, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntGrp,
idxItmType, true, false); err != nil {
if err != utils.ErrNotFound {
guardian.Guardian.UnguardIDs(refID)
@@ -337,12 +337,12 @@ func addIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, tnt,
}
indexes[idxItmType].Add(itemID)
for indxKey := range indexes {
if err = Cache.Remove(ctx, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil {
if err = Cache.Remove(ctx, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntGrp, indxKey), true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
}
if err = dm.SetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx, indexes, true, utils.NonTransactional); err != nil {
if err = dm.SetIndexes(ctx, utils.CacheReverseFilterIndexes, tntGrp, indexes, true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
@@ -357,11 +357,11 @@ func removeIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, t
if strings.HasPrefix(ID, utils.Meta) { // skip inline
continue
}
tntCtx := utils.ConcatenatedKey(tnt, ID)
tntGrp := utils.ConcatenatedKey(tnt, ID)
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx)
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntGrp)
var indexes map[string]utils.StringSet
if indexes, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx,
if indexes, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntGrp,
idxItmType, true, false); err != nil {
guardian.Guardian.UnguardIDs(refID)
if err != utils.ErrNotFound {
@@ -373,12 +373,12 @@ func removeIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, t
indexes[idxItmType].Remove(itemID)
for indxKey := range indexes {
if err = Cache.Remove(ctx, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil {
if err = Cache.Remove(ctx, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntGrp, indxKey), true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
}
if err = dm.SetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx, indexes, true, utils.NonTransactional); err != nil {
if err = dm.SetIndexes(ctx, utils.CacheReverseFilterIndexes, tntGrp, indexes, true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
}
@@ -390,7 +390,7 @@ func removeIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, t
// UpdateFilterIndex will update the indexes for the new Filter
// we do not care what is added
// exported for the migrator
func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt *Filter) (err error) {
func UpdateFilterIndex(ctx *context.Context, dm *DataManager, oldFlt, newFlt *Filter) (err error) {
if oldFlt == nil { // no filter before so no index to update
return // nothing to update
}
@@ -471,7 +471,7 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
defer guardian.Guardian.UnguardIDs(refID)
var rcvIndx map[string]utils.StringSet
// get all reverse indexes from DB
if rcvIndx, err = dm.GetIndexes(apiCtx, utils.CacheReverseFilterIndexes, tntID,
if rcvIndx, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntID,
utils.EmptyString, true, false); err != nil {
if err != utils.ErrNotFound {
return
@@ -485,14 +485,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
for idxItmType, indx := range rcvIndx {
switch idxItmType {
case utils.CacheThresholdFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) {
th, e := dm.GetThresholdProfile(apiCtx, tnt, id, true, false, utils.NonTransactional)
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) {
th, e := dm.GetThresholdProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
@@ -505,14 +505,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
return utils.APIErrorHandler(err)
}
case utils.CacheStatFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) {
sq, e := dm.GetStatQueueProfile(apiCtx, tnt, id, true, false, utils.NonTransactional)
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) {
sq, e := dm.GetStatQueueProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
@@ -525,14 +525,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
return utils.APIErrorHandler(err)
}
case utils.CacheResourceFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) {
rs, e := dm.GetResourceProfile(apiCtx, tnt, id, true, false, utils.NonTransactional)
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) {
rs, e := dm.GetResourceProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
@@ -545,14 +545,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
return utils.APIErrorHandler(err)
}
case utils.CacheRouteFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) {
rt, e := dm.GetRouteProfile(apiCtx, tnt, id, true, false, utils.NonTransactional)
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) {
rt, e := dm.GetRouteProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
@@ -565,14 +565,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
return utils.APIErrorHandler(err)
}
case utils.CacheChargerFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) {
ch, e := dm.GetChargerProfile(apiCtx, tnt, id, true, false, utils.NonTransactional)
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) {
ch, e := dm.GetChargerProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
@@ -585,13 +585,13 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
return utils.APIErrorHandler(err)
}
case utils.CacheAccountsFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) {
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) {
ap, e := dm.GetAccount(context.Background(), tnt, id)
if e != nil {
return nil, e
@@ -605,14 +605,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
return utils.APIErrorHandler(err)
}
case utils.CacheActionProfilesFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) {
acp, e := dm.GetActionProfile(apiCtx, tnt, id, true, false, utils.NonTransactional)
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) {
acp, e := dm.GetActionProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
@@ -625,14 +625,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
return utils.APIErrorHandler(err)
}
case utils.CacheRateProfilesFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) {
rp, e := dm.GetRateProfile(apiCtx, tnt, id, true, false, utils.NonTransactional)
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) {
rp, e := dm.GetRateProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
@@ -658,13 +658,13 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
itemIDs[idSplit[1]].Add(idSplit[0])
}
for rpID, ids := range itemIDs {
tntCtx := utils.ConcatenatedKey(newFlt.Tenant, rpID)
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, tntCtx,
tntGrp := utils.ConcatenatedKey(newFlt.Tenant, rpID)
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, tntGrp,
removeIndexKeys, ids); err != nil {
return
}
var rp *utils.RateProfile
if rp, err = dm.GetRateProfile(apiCtx, newFlt.Tenant, rpID, true, false, utils.NonTransactional); err != nil {
if rp, err = dm.GetRateProfile(ctx, newFlt.Tenant, rpID, true, false, utils.NonTransactional); err != nil {
return
}
for itemID := range ids {
@@ -673,9 +673,9 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
return utils.ErrNotFound
}
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx)
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntGrp)
var updIdx map[string]utils.StringSet
if updIdx, err = newFilterIndex(apiCtx, dm, idxItmType,
if updIdx, err = newFilterIndex(ctx, dm, idxItmType,
newFlt.Tenant, rpID, itemID, rate.FilterIDs, newFlt); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
@@ -683,7 +683,7 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
for _, idx := range updIdx {
idx.Add(itemID)
}
if err = dm.SetIndexes(apiCtx, idxItmType, tntCtx,
if err = dm.SetIndexes(ctx, idxItmType, tntGrp,
updIdx, false, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(refID)
return
@@ -692,14 +692,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
}
}
case utils.CacheAttributeFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, _ string) (*[]string, error) {
ap, e := dm.GetAttributeProfile(apiCtx, tnt, id, true, false, utils.NonTransactional)
ap, e := dm.GetAttributeProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
@@ -712,14 +712,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt
return utils.APIErrorHandler(err)
}
case utils.CacheDispatcherFilterIndexes:
if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter
removeIndexKeys, indx); err != nil {
return
}
idxSlice := indx.AsSlice()
if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
&idxSlice, utils.NonTransactional, func(tnt, id, _ string) (*[]string, error) {
dp, e := dm.GetDispatcherProfile(apiCtx, tnt, id, true, false, utils.NonTransactional)
dp, e := dm.GetDispatcherProfile(ctx, tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}

688
engine/libindex_health.go Normal file
View File

@@ -0,0 +1,688 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"strings"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
type IndexHealthArgs struct {
IndexCacheLimit int
IndexCacheTTL time.Duration
IndexCacheStaticTTL bool
ObjectCacheLimit int
ObjectCacheTTL time.Duration
ObjectCacheStaticTTL bool
FilterCacheLimit int
FilterCacheTTL time.Duration
FilterCacheStaticTTL bool
}
type FilterIHReply struct {
MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB
MissingIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes)
BrokenIndexes map[string][]string // list of broken indexes for each object (the map has the key as the index and a list of objects)
MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs)
}
type ReverseFilterIHReply struct {
MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB
MissingReverseIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes)
BrokenReverseIndexes map[string][]string // list of broken indexes for each object (the map has the key as the objectID and a list of indexes)
MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs)
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// getFilters returns the filtreIDs and context(if any) for that object
func getFilters(ctx *context.Context, dm *DataManager, indxType, tnt, id string) (filterIDs []string, err error) { // add contexts
switch indxType {
case utils.CacheResourceFilterIndexes:
var rs *ResourceProfile
if rs, err = dm.GetResourceProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil {
return
}
filterIDs = rs.FilterIDs
case utils.CacheStatFilterIndexes:
var st *StatQueueProfile
if st, err = dm.GetStatQueueProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil {
return
}
filterIDs = st.FilterIDs
case utils.CacheThresholdFilterIndexes:
var th *ThresholdProfile
if th, err = dm.GetThresholdProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil {
return
}
filterIDs = th.FilterIDs
case utils.CacheRouteFilterIndexes:
var rt *RouteProfile
if rt, err = dm.GetRouteProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil {
return
}
filterIDs = rt.FilterIDs
case utils.CacheAttributeFilterIndexes:
var at *AttributeProfile
if at, err = dm.GetAttributeProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil {
return
}
filterIDs = at.FilterIDs
case utils.CacheChargerFilterIndexes:
var ch *ChargerProfile
if ch, err = dm.GetChargerProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil {
return
}
filterIDs = ch.FilterIDs
case utils.CacheDispatcherFilterIndexes:
var ds *DispatcherProfile
if ds, err = dm.GetDispatcherProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil {
return
}
filterIDs = ds.FilterIDs
case utils.CacheRateProfilesFilterIndexes:
var rp *utils.RateProfile
if rp, err = dm.GetRateProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil {
return
}
filterIDs = rp.FilterIDs
case utils.CacheActionProfilesFilterIndexes:
var ap *ActionProfile
if ap, err = dm.GetActionProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil {
return
}
filterIDs = ap.FilterIDs
case utils.CacheAccountsFilterIndexes:
var ac *utils.Account
if ac, err = dm.GetAccount(ctx, tnt, id); err != nil {
return
}
filterIDs = ac.FilterIDs
default:
return nil, fmt.Errorf("unsuported index type:<%q>", indxType)
}
if filterIDs == nil { // nil means ErrNotFound in cache
filterIDs = make([]string, 0)
}
return
}
// getIHObjFromCache returns all information that is needed from the mentioned object
// uses an extra cache(controled by the API) to optimize data management
func getIHObjFromCache(ctx *context.Context, dm *DataManager, objCache *ltcache.Cache, indxType, tnt, id string) (filtIDs []string, err error) {
cacheKey := utils.ConcatenatedKey(tnt, id)
if objVal, ok := objCache.Get(cacheKey); ok {
if objVal == nil {
return nil, utils.ErrNotFound
}
return objVal.([]string), nil
}
if filtIDs, err = getFilters(ctx, dm, indxType, tnt, id); err != nil {
if err == utils.ErrNotFound {
objCache.Set(cacheKey, nil, nil)
}
return
}
objCache.Set(cacheKey, filtIDs, nil)
return
}
// getIHFltrFromCache returns the Filter
// uses an extra cache(controled by the API) to optimize data management
func getIHFltrFromCache(ctx *context.Context, dm *DataManager, fltrCache *ltcache.Cache, tnt, id string) (fltr *Filter, err error) {
cacheKey := utils.ConcatenatedKey(tnt, id)
if fltrVal, ok := fltrCache.Get(cacheKey); ok {
if fltrVal == nil {
return nil, utils.ErrNotFound
}
return fltrVal.(*Filter), nil
}
if fltr, err = dm.GetFilter(ctx, tnt, id,
true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
fltrCache.Set(cacheKey, nil, nil)
}
return
}
fltrCache.Set(cacheKey, fltr, nil)
return
}
// getIHFltrIdxFromCache returns the Filter index
// uses an extra cache(controled by the API) to optimize data management
func getIHFltrIdxFromCache(ctx *context.Context, dm *DataManager, fltrIdxCache *ltcache.Cache, idxItmType, tntGrp, idxKey string) (idx utils.StringSet, err error) {
cacheKey := utils.ConcatenatedKey(tntGrp, idxKey)
if fltrVal, ok := fltrIdxCache.Get(cacheKey); ok {
if fltrVal == nil {
return nil, utils.ErrNotFound
}
return fltrVal.(utils.StringSet), nil
}
var indexes map[string]utils.StringSet
if indexes, err = dm.GetIndexes(ctx, idxItmType, tntGrp, idxKey, true, false); err != nil {
if err == utils.ErrNotFound {
fltrIdxCache.Set(cacheKey, nil, nil)
}
return
}
idx = indexes[idxKey]
fltrIdxCache.Set(cacheKey, idx, nil)
return
}
// getFilterAsIndexSet will parse the rules of filter and add them to the index map
func getFilterAsIndexSet(ctx *context.Context, dm *DataManager, fltrIdxCache *ltcache.Cache, idxItmType, tntGrp string, fltr *Filter) (indexes map[string]utils.StringSet, err error) {
indexes = make(map[string]utils.StringSet)
for _, flt := range fltr.Rules {
if !FilterIndexTypes.Has(flt.Type) ||
IsDynamicDPPath(flt.Element) {
continue
}
isDyn := strings.HasPrefix(flt.Element, utils.DynamicDataPrefix)
for _, fldVal := range flt.Values {
if IsDynamicDPPath(fldVal) {
continue
}
var idxKey string
if isDyn {
if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) { // do not index if both the element and the value is dynamic
continue
}
idxKey = utils.ConcatenatedKey(flt.Type, flt.Element[1:], fldVal)
} else if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) {
idxKey = utils.ConcatenatedKey(flt.Type, fldVal[1:], flt.Element)
} else {
// do not index not dynamic filters
continue
}
var rcvIndx utils.StringSet
// only read from cache in case if we do not find the index to not cache the negative response
if rcvIndx, err = getIHFltrIdxFromCache(ctx, dm, fltrIdxCache, idxItmType, tntGrp, idxKey); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
rcvIndx = make(utils.StringSet) // create an empty index if is not found in DB in case we add them later
}
indexes[idxKey] = rcvIndx
}
}
return indexes, nil
}
// updateFilterIHMisingIndx updates the reply with the missing indexes for a specific object( obj->filter->index relation)
func updateFilterIHMisingIndx(ctx *context.Context, dm *DataManager, fltrCache, fltrIdxCache *ltcache.Cache, filterIDs []string, indxType, tnt, tntGrp, itmID string, rply *FilterIHReply) (_ *FilterIHReply, err error) {
if len(filterIDs) == 0 {
idxKey := utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny)
var rcvIndx utils.StringSet
if rcvIndx, err = getIHFltrIdxFromCache(ctx, dm, nil, indxType, tntGrp, idxKey); err != nil {
if err != utils.ErrNotFound {
return
}
key := utils.ConcatenatedKey(tntGrp, idxKey)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
} else if !rcvIndx.Has(itmID) {
key := utils.ConcatenatedKey(tntGrp, idxKey)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
}
return rply, nil
}
for _, fltrID := range filterIDs {
var fltr *Filter
if fltr, err = getIHFltrFromCache(ctx, dm, fltrCache, tnt, fltrID); err != nil {
if err != utils.ErrNotFound {
return
}
fltrID = utils.ConcatenatedKey(tnt, fltrID)
rply.MissingFilters[fltrID] = append(rply.MissingFilters[fltrID], itmID)
continue
}
var indexes map[string]utils.StringSet
if indexes, err = getFilterAsIndexSet(ctx, dm, fltrIdxCache, indxType, tntGrp, fltr); err != nil {
return
}
for key, idx := range indexes {
if !idx.Has(itmID) {
key = utils.ConcatenatedKey(tntGrp, key)
rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
}
}
}
return rply, nil
}
// GetFltrIdxHealth returns the missing indexes for all objects
func GetFltrIdxHealth(ctx *context.Context, dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *FilterIHReply, err error) {
// check the objects ( obj->filter->index relation)
rply = &FilterIHReply{
MissingIndexes: make(map[string][]string),
BrokenIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
}
objPrfx := utils.CacheIndexesToPrefix[indxType]
var ids []string
if ids, err = dm.dataDB.GetKeysForPrefix(ctx, objPrfx); err != nil {
return
}
for _, id := range ids {
id = strings.TrimPrefix(id, objPrfx)
tntID := utils.NewTenantID(id)
var filterIDs []string
if filterIDs, err = getIHObjFromCache(ctx, dm, objCache, indxType, tntID.Tenant, tntID.ID); err != nil {
return
}
if rply, err = updateFilterIHMisingIndx(ctx, dm, fltrCache, fltrIdxCache, filterIDs, indxType, tntID.Tenant, tntID.Tenant, tntID.ID, rply); err != nil {
return
}
}
// check the indexes( index->filter->obj relation)
idxPrfx := utils.CacheInstanceToPrefix[indxType]
var indexKeys []string
if indexKeys, err = dm.dataDB.GetKeysForPrefix(ctx, idxPrfx); err != nil {
return
}
for _, dataID := range indexKeys {
dataID = strings.TrimPrefix(dataID, idxPrfx)
splt := utils.SplitConcatenatedKey(dataID) // tntGrp:filterType:fieldName:fieldVal
lsplt := len(splt)
if lsplt < 4 {
err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID)
return
}
tnt := utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems
idxKey := utils.ConcatenatedKey(splt[lsplt-3:]...)
var idx utils.StringSet
if idx, err = getIHFltrIdxFromCache(ctx, dm, fltrIdxCache, indxType, tnt, idxKey); err != nil {
return
}
for itmID := range idx {
var filterIDs []string
if filterIDs, err = getIHObjFromCache(ctx, dm, objCache, indxType, tnt, itmID); err != nil {
if err != utils.ErrNotFound {
return
}
rply.MissingObjects = append(rply.MissingObjects, utils.ConcatenatedKey(tnt, itmID))
err = nil
continue
}
if len(filterIDs) == 0 {
if utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny) != idxKey {
rply.BrokenIndexes[dataID] = append(rply.BrokenIndexes[dataID], itmID)
}
continue
}
var hasIndx bool
for _, fltrID := range filterIDs {
var fltr *Filter
if fltr, err = getIHFltrFromCache(ctx, dm, fltrCache, tnt, fltrID); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil // should be already logged when we parsed all the objects
continue
}
var indexes map[string]utils.StringSet
if indexes, err = getFilterAsIndexSet(ctx, dm, fltrIdxCache, indxType, tnt, fltr); err != nil {
return
}
idx, has := indexes[idxKey]
if hasIndx = has && idx.Has(itmID); hasIndx {
break
}
}
if !hasIndx {
key := utils.ConcatenatedKey(tnt, idxKey)
rply.BrokenIndexes[key] = append(rply.BrokenIndexes[key], itmID)
}
}
}
return
}
// GetRevFltrIdxHealth returns the missing reverse indexes for all objects
func getRevFltrIdxHealthFromObj(ctx *context.Context, dm *DataManager, fltrCache, revFltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *ReverseFilterIHReply, err error) {
// check the objects ( obj->filter->index relation)
rply = &ReverseFilterIHReply{
MissingReverseIndexes: make(map[string][]string),
BrokenReverseIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
}
objPrfx := utils.CacheIndexesToPrefix[indxType]
var ids []string
if ids, err = dm.dataDB.GetKeysForPrefix(ctx, objPrfx); err != nil {
return
}
for _, id := range ids {
id = strings.TrimPrefix(id, objPrfx)
tntID := utils.NewTenantID(id)
var filterIDs []string
if filterIDs, err = getIHObjFromCache(ctx, dm, objCache, indxType, tntID.Tenant, tntID.ID); err != nil {
return
}
for _, fltrID := range filterIDs {
if strings.HasPrefix(fltrID, utils.Meta) {
continue
}
if _, err = getIHFltrFromCache(ctx, dm, fltrCache, tntID.Tenant, fltrID); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
key := utils.ConcatenatedKey(tntID.Tenant, fltrID)
rply.MissingFilters[key] = append(rply.MissingFilters[key], tntID.ID)
continue
}
var revIdx utils.StringSet
if revIdx, err = getIHFltrIdxFromCache(ctx, dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), indxType); err != nil {
if err == utils.ErrNotFound {
rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], fltrID)
err = nil
continue
}
return
}
if !revIdx.Has(tntID.ID) {
rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], fltrID)
}
}
}
return
}
func getRevFltrIdxHealthFromReverse(ctx *context.Context, dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Cache, objCaches map[string]*ltcache.Cache, rply map[string]*ReverseFilterIHReply) (_ map[string]*ReverseFilterIHReply, err error) {
var revIndexKeys []string
if revIndexKeys, err = dm.dataDB.GetKeysForPrefix(ctx, utils.FilterIndexPrfx); err != nil {
return
}
for _, revIdxKey := range revIndexKeys {
revIdxKey = strings.TrimPrefix(revIdxKey, utils.FilterIndexPrfx)
revIDxSplit := strings.SplitN(revIdxKey, utils.ConcatenatedKeySep, 3)
tnt, fltrID, indxType := revIDxSplit[0], revIDxSplit[1], revIDxSplit[2]
revIdxKey = utils.ConcatenatedKey(tnt, fltrID)
objCache := objCaches[indxType]
if _, has := rply[indxType]; !has {
rply[indxType] = &ReverseFilterIHReply{
MissingReverseIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
BrokenReverseIndexes: make(map[string][]string),
}
}
var revIdx utils.StringSet
if revIdx, err = getIHFltrIdxFromCache(ctx, dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, revIdxKey, indxType); err != nil {
return
}
for id := range revIdx {
var filterIDs []string
if indxType == utils.CacheRateFilterIndexes {
spl := strings.SplitN(id, utils.ConcatenatedKeySep, 2)
rateID := spl[0]
rprfID := spl[1]
var rates map[string]*utils.Rate
if rates, err = getRatesFromCache(ctx, dm, objCache, tnt, rprfID); err != nil {
if err != utils.ErrNotFound {
return
}
rply[indxType].MissingObjects = append(rply[indxType].MissingObjects, utils.ConcatenatedKey(tnt, id))
err = nil
continue
}
if rate, has := rates[rateID]; !has {
rply[indxType].MissingObjects = append(rply[indxType].MissingObjects, utils.ConcatenatedKey(tnt, id))
continue
} else {
filterIDs = rate.FilterIDs
}
} else if filterIDs, err = getIHObjFromCache(ctx, dm, objCache, indxType, tnt, id); err != nil {
if err == utils.ErrNotFound {
rply[indxType].MissingObjects = append(rply[indxType].MissingObjects, utils.ConcatenatedKey(tnt, id))
err = nil
continue
}
return
}
if !utils.IsSliceMember(filterIDs, fltrID) {
key := utils.ConcatenatedKey(tnt, id)
rply[indxType].BrokenReverseIndexes[key] = append(rply[indxType].BrokenReverseIndexes[key], fltrID)
}
}
}
return rply, nil
}
func GetRevFltrIdxHealth(ctx *context.Context, dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Cache, objCaches map[string]*ltcache.Cache) (rply map[string]*ReverseFilterIHReply, err error) {
rply = make(map[string]*ReverseFilterIHReply)
for indxType := range utils.CacheIndexesToPrefix {
if indxType == utils.CacheReverseFilterIndexes {
continue
}
if rply[indxType], err = getRevFltrIdxHealthFromObj(ctx, dm, fltrCache, revFltrIdxCache, objCaches[indxType], indxType); err != nil {
return
}
}
if rply[utils.CacheRateFilterIndexes], err = getRevFltrIdxHealthFromRateRates(ctx, dm, fltrCache, revFltrIdxCache, objCaches[utils.CacheRateFilterIndexes]); err != nil {
return
}
rply, err = getRevFltrIdxHealthFromReverse(ctx, dm, fltrCache, revFltrIdxCache, objCaches, rply)
for k, v := range rply { // should be a safe for (even on rply==nil)
if len(v.MissingFilters) == 0 &&
len(v.MissingObjects) == 0 &&
len(v.BrokenReverseIndexes) == 0 &&
len(v.MissingReverseIndexes) == 0 {
delete(rply, k)
}
}
return
}
// getRatesFromCache returns all rates from rateprofile
// uses an extra cache(controled by the API) to optimize data management
func getRatesFromCache(ctx *context.Context, dm *DataManager, objCache *ltcache.Cache, tnt, rprfID string) (_ map[string]*utils.Rate, err error) {
cacheKey := utils.ConcatenatedKey(tnt, rprfID)
if objVal, ok := objCache.Get(cacheKey); ok {
if objVal == nil {
return nil, utils.ErrNotFound
}
rprf := objVal.(*utils.RateProfile)
return rprf.Rates, nil
}
var rprf *utils.RateProfile
if rprf, err = dm.GetRateProfile(ctx, tnt, rprfID, true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
objCache.Set(cacheKey, nil, nil)
}
return
}
objCache.Set(cacheKey, rprf, nil)
return rprf.Rates, nil
}
// GetFltrIdxHealth returns the missing indexes for all objects
func GetFltrIdxHealthForRateRates(ctx *context.Context, dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcache.Cache) (rply *FilterIHReply, err error) {
// check the objects ( obj->filter->index relation)
rply = &FilterIHReply{
MissingIndexes: make(map[string][]string),
BrokenIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
}
var ids []string
if ids, err = dm.dataDB.GetKeysForPrefix(ctx, utils.RateProfilePrefix); err != nil {
return
}
for _, id := range ids {
id = strings.TrimPrefix(id, utils.RateProfilePrefix)
tntID := utils.NewTenantID(id)
var rates map[string]*utils.Rate
if rates, err = getRatesFromCache(ctx, dm, objCache, tntID.Tenant, tntID.ID); err != nil {
return
}
for rtID, rate := range rates {
if rply, err = updateFilterIHMisingIndx(ctx, dm, fltrCache, fltrIdxCache, rate.FilterIDs, utils.CacheRateFilterIndexes, tntID.Tenant, utils.ConcatenatedKey(tntID.Tenant, tntID.ID), rtID, rply); err != nil {
return
}
}
}
// check the indexes( index->filter->obj relation)
var indexKeys []string
if indexKeys, err = dm.dataDB.GetKeysForPrefix(ctx, utils.RateFilterIndexPrfx); err != nil {
return
}
for _, dataID := range indexKeys {
dataID = strings.TrimPrefix(dataID, utils.RateFilterIndexPrfx)
splt := utils.SplitConcatenatedKey(dataID) // tntGrp:filterType:fieldName:fieldVal
lsplt := len(splt)
if lsplt < 4 {
err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID)
return
}
tnt := splt[0]
rpID := splt[1]
tntGrp := utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems
idxKey := utils.ConcatenatedKey(splt[lsplt-3:]...)
var idx utils.StringSet
if idx, err = getIHFltrIdxFromCache(ctx, dm, fltrIdxCache, utils.CacheRateFilterIndexes, tntGrp, idxKey); err != nil {
return
}
for itmID := range idx {
var rates map[string]*utils.Rate
if rates, err = getRatesFromCache(ctx, dm, objCache, tnt, rpID); err != nil {
if err != utils.ErrNotFound {
return
}
rply.MissingObjects = append(rply.MissingObjects, utils.ConcatenatedKey(tntGrp, itmID))
err = nil
continue
}
var filterIDs []string
if rate, has := rates[itmID]; !has {
rply.MissingObjects = append(rply.MissingObjects, utils.ConcatenatedKey(tntGrp, itmID))
continue
} else {
filterIDs = rate.FilterIDs
}
if len(filterIDs) == 0 {
if utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny) != idxKey {
rply.BrokenIndexes[dataID] = append(rply.BrokenIndexes[dataID], itmID)
}
continue
}
var hasIndx bool
for _, fltrID := range filterIDs {
var fltr *Filter
if fltr, err = getIHFltrFromCache(ctx, dm, fltrCache, tnt, fltrID); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil // should be already logged when we parsed all the objects
continue
}
var indexes map[string]utils.StringSet
if indexes, err = getFilterAsIndexSet(ctx, dm, fltrIdxCache, utils.CacheRateFilterIndexes, tntGrp, fltr); err != nil {
return
}
idx, has := indexes[idxKey]
if hasIndx = has && idx.Has(itmID); hasIndx {
break
}
}
if !hasIndx {
key := utils.ConcatenatedKey(tnt, idxKey)
rply.BrokenIndexes[key] = append(rply.BrokenIndexes[key], itmID)
}
}
}
return
}
func getRevFltrIdxHealthFromRateRates(ctx *context.Context, dm *DataManager, fltrCache, revFltrIdxCache, objCache *ltcache.Cache) (rply *ReverseFilterIHReply, err error) {
// check the objects ( obj->filter->index relation)
rply = &ReverseFilterIHReply{
MissingReverseIndexes: make(map[string][]string),
BrokenReverseIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
}
var ids []string
if ids, err = dm.dataDB.GetKeysForPrefix(ctx, utils.RateProfilePrefix); err != nil {
return
}
for _, id := range ids {
id = strings.TrimPrefix(id, utils.RateProfilePrefix)
tntID := utils.NewTenantID(id)
var rates map[string]*utils.Rate
if rates, err = getRatesFromCache(ctx, dm, objCache, tntID.Tenant, tntID.ID); err != nil {
return
}
for rtID, rate := range rates {
itmID := utils.ConcatenatedKey(rtID, tntID.ID)
itmIDWithTnt := utils.ConcatenatedKey(id, rtID)
for _, fltrID := range rate.FilterIDs {
if strings.HasPrefix(fltrID, utils.Meta) {
continue
}
if _, err = getIHFltrFromCache(ctx, dm, fltrCache, tntID.Tenant, fltrID); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
key := utils.ConcatenatedKey(tntID.Tenant, fltrID)
rply.MissingFilters[key] = append(rply.MissingFilters[key], itmID)
continue
}
var revIdx utils.StringSet
if revIdx, err = getIHFltrIdxFromCache(ctx, dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), utils.CacheRateFilterIndexes); err != nil {
if err == utils.ErrNotFound {
rply.MissingReverseIndexes[itmIDWithTnt] = append(rply.MissingReverseIndexes[itmIDWithTnt], fltrID)
err = nil
continue
}
return
}
if !revIdx.Has(itmID) {
rply.MissingReverseIndexes[itmIDWithTnt] = append(rply.MissingReverseIndexes[itmIDWithTnt], fltrID)
}
}
}
}
return
}

View File

@@ -0,0 +1,148 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"reflect"
"testing"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
func TestHealthFilter(t *testing.T) {
Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
db := NewInternalDB(nil, nil, true)
dm := NewDataManager(db, cfg.CacheCfg(), nil)
if err := dm.SetAttributeProfile(context.Background(), &AttributeProfile{
Tenant: "cgrates.org",
ID: "ATTR1",
FilterIDs: []string{"*string:~*req.Account:1001", "Fltr1"},
}, false); err != nil {
t.Fatal(err)
}
if err := dm.SetIndexes(context.Background(), utils.CacheAttributeFilterIndexes, "cgrates.org",
map[string]utils.StringSet{"*string:*req.Account:1002": {"ATTR1": {}, "ATTR2": {}}},
true, utils.NonTransactional); err != nil {
t.Fatal(err)
}
exp := &FilterIHReply{
MissingIndexes: map[string][]string{
"cgrates.org:*string:*req.Account:1001": {"ATTR1"},
},
BrokenIndexes: map[string][]string{
"cgrates.org:*string:*req.Account:1002": {"ATTR1"},
},
MissingFilters: map[string][]string{
"cgrates.org:Fltr1": {"ATTR1"},
},
MissingObjects: []string{"cgrates.org:ATTR2"},
}
if rply, err := GetFltrIdxHealth(context.Background(), dm,
ltcache.NewCache(-1, 0, false, nil),
ltcache.NewCache(-1, 0, false, nil),
ltcache.NewCache(-1, 0, false, nil),
utils.CacheAttributeFilterIndexes); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
}
}
func TestHealthReverseFilter(t *testing.T) {
Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
db := NewInternalDB(nil, nil, true)
dm := NewDataManager(db, cfg.CacheCfg(), nil)
if err := dm.SetAttributeProfile(context.Background(), &AttributeProfile{
Tenant: "cgrates.org",
ID: "ATTR1",
FilterIDs: []string{"*string:~*req.Account:1001", "Fltr1", "Fltr3"},
}, false); err != nil {
t.Fatal(err)
}
if err := dm.SetFilter(context.Background(), &Filter{
Tenant: "cgrates.org",
ID: "Fltr3",
}, false); err != nil {
t.Fatal(err)
}
if err := dm.SetIndexes(context.Background(), utils.CacheReverseFilterIndexes, "cgrates.org:Fltr2",
map[string]utils.StringSet{utils.CacheAttributeFilterIndexes: {"ATTR1": {}, "ATTR2": {}}},
true, utils.NonTransactional); err != nil {
t.Fatal(err)
}
if err := dm.SetRateProfile(context.Background(), &utils.RateProfile{
Tenant: "cgrates.org",
ID: "RP1",
Rates: map[string]*utils.Rate{
"RT1": {
ID: "RT1",
FilterIDs: []string{"Fltr3"},
},
},
}, false); err != nil {
t.Fatal(err)
}
exp := map[string]*ReverseFilterIHReply{
utils.CacheAttributeFilterIndexes: {
MissingReverseIndexes: map[string][]string{
"cgrates.org:ATTR1": {"Fltr3"},
},
MissingFilters: map[string][]string{
"cgrates.org:Fltr1": {"ATTR1"},
},
BrokenReverseIndexes: map[string][]string{
"cgrates.org:ATTR1": {"Fltr2"},
},
MissingObjects: []string{"cgrates.org:ATTR2"},
},
utils.CacheRateFilterIndexes: {
MissingReverseIndexes: map[string][]string{
"cgrates.org:RP1:RT1": {"Fltr3"},
},
BrokenReverseIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
},
}
objCaches := make(map[string]*ltcache.Cache)
for indxType := range utils.CacheIndexesToPrefix {
objCaches[indxType] = ltcache.NewCache(-1, 0, false, nil)
}
objCaches[utils.CacheRateFilterIndexes] = ltcache.NewCache(-1, 0, false, nil)
if rply, err := GetRevFltrIdxHealth(context.Background(), dm,
ltcache.NewCache(-1, 0, false, nil),
ltcache.NewCache(-1, 0, false, nil),
objCaches); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expecting: %+v,\n received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
}
}