diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go
index 475631c44..4ca39f6a4 100644
--- a/apier/v1/filter_indexes.go
+++ b/apier/v1/filter_indexes.go
@@ -24,6 +24,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/ltcache"
)
type AttrGetFilterIndexes struct {
@@ -925,3 +926,123 @@ func (api *APIerSv1) computeDispatcherIndexes(tenant, context string, dspIDs *[]
}
return dspIndexes, nil
}
+
+func (apierSv1 *APIerSv1) GetAccountActionPlansIndexHealth(args *engine.IndexHealthArgsWith2Ch, reply *engine.AccountActionPlanIHReply) error {
+ rp, err := engine.GetAccountActionPlansIndexHealth(apierSv1.DataManager, args.ObjectCacheLimit, args.IndexCacheLimit,
+ args.ObjectCacheTTL, args.IndexCacheTTL,
+ args.ObjectCacheStaticTTL, args.IndexCacheStaticTTL)
+ if err != nil {
+ return err
+ }
+ *reply = *rp
+ return nil
+}
+
+func (apierSv1 *APIerSv1) GetReverseDestinationsIndexHealth(args *engine.IndexHealthArgsWith2Ch, reply *engine.ReverseDestinationsIHReply) error {
+ rp, err := engine.GetReverseDestinationsIndexHealth(apierSv1.DataManager, args.ObjectCacheLimit, args.IndexCacheLimit,
+ args.ObjectCacheTTL, args.IndexCacheTTL,
+ args.ObjectCacheStaticTTL, args.IndexCacheStaticTTL)
+ if err != nil {
+ return err
+ }
+ *reply = *rp
+ return nil
+}
+
+func (apierSv1 *APIerSv1) GetThresholdsIndexesHealth(args *engine.IndexHealthArgsWith3Ch, reply *engine.FilterIHReply) error {
+ rp, err := engine.GetFltrIdxHealth(apierSv1.DataManager,
+ ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
+ ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
+ ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
+ utils.CacheThresholdFilterIndexes,
+ )
+ if err != nil {
+ return err
+ }
+ *reply = *rp
+ return nil
+}
+
+func (apierSv1 *APIerSv1) GetResourcesIndexesHealth(args *engine.IndexHealthArgsWith3Ch, reply *engine.FilterIHReply) error {
+ rp, err := engine.GetFltrIdxHealth(apierSv1.DataManager,
+ ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
+ ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
+ ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
+ utils.CacheResourceFilterIndexes,
+ )
+ if err != nil {
+ return err
+ }
+ *reply = *rp
+ return nil
+}
+
+func (apierSv1 *APIerSv1) GetStatsIndexesHealth(args *engine.IndexHealthArgsWith3Ch, reply *engine.FilterIHReply) error {
+ rp, err := engine.GetFltrIdxHealth(apierSv1.DataManager,
+ ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
+ ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
+ ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
+ utils.CacheStatFilterIndexes,
+ )
+ if err != nil {
+ return err
+ }
+ *reply = *rp
+ return nil
+}
+
+func (apierSv1 *APIerSv1) GetSuppliersIndexesHealth(args *engine.IndexHealthArgsWith3Ch, reply *engine.FilterIHReply) error {
+ rp, err := engine.GetFltrIdxHealth(apierSv1.DataManager,
+ ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
+ ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
+ ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
+ utils.CacheSupplierFilterIndexes,
+ )
+ if err != nil {
+ return err
+ }
+ *reply = *rp
+ return nil
+}
+
+func (apierSv1 *APIerSv1) GetAttributesIndexesHealth(args *engine.IndexHealthArgsWith3Ch, reply *engine.FilterIHReply) error {
+ rp, err := engine.GetFltrIdxHealth(apierSv1.DataManager,
+ ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
+ ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
+ ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
+ utils.CacheAttributeFilterIndexes,
+ )
+ if err != nil {
+ return err
+ }
+ *reply = *rp
+ return nil
+}
+
+func (apierSv1 *APIerSv1) GetChargersIndexesHealth(args *engine.IndexHealthArgsWith3Ch, reply *engine.FilterIHReply) error {
+ rp, err := engine.GetFltrIdxHealth(apierSv1.DataManager,
+ ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
+ ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
+ ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
+ utils.CacheChargerFilterIndexes,
+ )
+ if err != nil {
+ return err
+ }
+ *reply = *rp
+ return nil
+}
+
+func (apierSv1 *APIerSv1) GetDispatchersIndexesHealth(args *engine.IndexHealthArgsWith3Ch, reply *engine.FilterIHReply) error {
+ rp, err := engine.GetFltrIdxHealth(apierSv1.DataManager,
+ ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
+ ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
+ ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
+ utils.CacheDispatcherFilterIndexes,
+ )
+ if err != nil {
+ return err
+ }
+ *reply = *rp
+ return nil
+}
diff --git a/engine/libindex_health.go b/engine/libindex_health.go
new file mode 100644
index 000000000..eefa4ba50
--- /dev/null
+++ b/engine/libindex_health.go
@@ -0,0 +1,606 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package engine
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/ltcache"
+)
+
+type IndexHealthArgsWith2Ch struct {
+ IndexCacheLimit int
+ IndexCacheTTL time.Duration
+ IndexCacheStaticTTL bool
+
+ ObjectCacheLimit int
+ ObjectCacheTTL time.Duration
+ ObjectCacheStaticTTL bool
+}
+
+type IndexHealthArgsWith3Ch struct {
+ IndexCacheLimit int
+ IndexCacheTTL time.Duration
+ IndexCacheStaticTTL bool
+
+ ObjectCacheLimit int
+ ObjectCacheTTL time.Duration
+ ObjectCacheStaticTTL bool
+
+ FilterCacheLimit int
+ FilterCacheTTL time.Duration
+ FilterCacheStaticTTL bool
+}
+
+type AccountActionPlanIHReply struct {
+ MissingAccountActionPlans map[string][]string // list of missing indexes for each object (the map has the key as the indexKey and a list of objects)
+ BrokenReferences map[string][]string // list of broken references (the map has the key as the objectID and a list of indexes)
+}
+
+// add cache in args API
+func GetAccountActionPlansIndexHealth(dm *DataManager, objLimit, indexLimit int, objTTL, indexTTL time.Duration, objStaticTTL, indexStaticTTL bool) (rply *AccountActionPlanIHReply, err error) {
+ // posible errors
+ brokenRef := map[string][]string{} // the actionPlans match the index but they are missing the account // broken reference
+ missingIndex := map[string][]string{} // the indexes are not present but the action plans points to that account // misingAccounts
+
+ // local cache
+ indexesCache := ltcache.NewCache(objLimit, objTTL, objStaticTTL, nil)
+ objectsCache := ltcache.NewCache(indexLimit, indexTTL, indexStaticTTL, nil)
+
+ getCachedIndex := func(acntID string) (apIDs []string, err error) {
+ if x, ok := indexesCache.Get(acntID); ok {
+ if x == nil {
+ return nil, utils.ErrNotFound
+ }
+ return x.([]string), nil
+ }
+ if apIDs, err = dm.GetAccountActionPlans(acntID, true, false, utils.NonTransactional); err != nil { // read from cache but do not write if not there
+ if err == utils.ErrNotFound {
+ indexesCache.Set(acntID, nil, nil)
+ }
+ return
+ }
+ indexesCache.Set(acntID, apIDs, nil)
+ return
+ }
+
+ getCachedObject := func(apID string) (obj *ActionPlan, err error) {
+ if x, ok := objectsCache.Get(apID); ok {
+ if x == nil {
+ return nil, utils.ErrNotFound
+ }
+ return x.(*ActionPlan), nil
+ }
+ if obj, err = dm.GetActionPlan(apID, true, false, utils.NonTransactional); err != nil { // read from cache but do not write if not there
+ if err == utils.ErrNotFound {
+ objectsCache.Set(apID, nil, nil)
+ }
+ return
+ }
+ objectsCache.Set(apID, obj, nil)
+ return
+ }
+
+ var acntIDs []string // start with the indexes and check the references
+ if acntIDs, err = dm.DataDB().GetKeysForPrefix(utils.AccountActionPlansPrefix); err != nil {
+ err = fmt.Errorf("error <%s> querying keys for accountActionPlans", err.Error())
+ return
+ }
+
+ for _, acntID := range acntIDs {
+ acntID = strings.TrimPrefix(acntID, utils.AccountActionPlansPrefix) //
+ var apIDs []string
+ if apIDs, err = getCachedIndex(acntID); err != nil { // read from cache but do not write if not there
+ err = fmt.Errorf("error <%s> querying the accountActionPlan: <%v>", err.Error(), acntID)
+ return
+ }
+ for _, apID := range apIDs {
+ var ap *ActionPlan
+ if ap, err = getCachedObject(apID); err != nil {
+ if err != utils.ErrNotFound {
+ err = fmt.Errorf("error <%s> querying the actionPlan: <%v>", err.Error(), apID)
+ return
+ }
+ err = nil
+ brokenRef[apID] = nil
+ continue
+
+ }
+ if !ap.AccountIDs.HasKey(acntID) { // the action plan exists but doesn't point towards the account we have index
+ brokenRef[apID] = append(brokenRef[apID], acntID)
+ }
+ }
+ }
+
+ var apIDs []string // we have all the indexes in cache now do a reverse check
+ if apIDs, err = dm.DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX); err != nil {
+ err = fmt.Errorf("error <%s> querying keys for actionPlans", err.Error())
+ return
+ }
+
+ for _, apID := range apIDs {
+ apID = strings.TrimPrefix(apID, utils.ACTION_PLAN_PREFIX) //
+ var ap *ActionPlan
+ if ap, err = getCachedObject(apID); err != nil {
+ err = fmt.Errorf("error <%s> querying the actionPlan: <%v>", err.Error(), apID)
+ return
+ }
+ for acntID := range ap.AccountIDs {
+ var ids []string
+ if ids, err = getCachedIndex(acntID); err != nil { // read from cache but do not write if not there
+ if err != utils.ErrNotFound {
+ err = fmt.Errorf("error <%s> querying the accountActionPlan: <%v>", err.Error(), acntID)
+ return
+ }
+ err = nil
+ missingIndex[acntID] = append(missingIndex[acntID], apID)
+ continue
+ }
+ if !utils.IsSliceMember(ids, apID) { // the index doesn't exits for this actionPlan
+ missingIndex[acntID] = append(missingIndex[acntID], apID)
+ }
+ }
+ }
+
+ rply = &AccountActionPlanIHReply{
+ MissingAccountActionPlans: missingIndex,
+ BrokenReferences: brokenRef,
+ }
+ return
+}
+
+type ReverseDestinationsIHReply struct {
+ MissingReverseDestinations map[string][]string // list of missing indexes for each object (the map has the key as the indexKey and a list of objects)
+ BrokenReferences map[string][]string // list of broken references (the map has the key as the objectID and a list of indexes)
+}
+
+// add cache in args API
+func GetReverseDestinationsIndexHealth(dm *DataManager, objLimit, indexLimit int, objTTL, indexTTL time.Duration, objStaticTTL, indexStaticTTL bool) (rply *ReverseDestinationsIHReply, err error) {
+ // posible errors
+ brokenRef := map[string][]string{} // the actionPlans match the index but they are missing the account // broken reference
+ missingIndex := map[string][]string{} // the indexes are not present but the action plans points to that account // misingAccounts
+
+ // local cache
+ indexesCache := ltcache.NewCache(objLimit, objTTL, objStaticTTL, nil)
+ objectsCache := ltcache.NewCache(indexLimit, indexTTL, indexStaticTTL, nil)
+
+ getCachedIndex := func(prefix string) (dstIDs []string, err error) {
+ if x, ok := indexesCache.Get(prefix); ok {
+ if x == nil {
+ return nil, utils.ErrNotFound
+ }
+ return x.([]string), nil
+ }
+ if dstIDs, err = dm.GetReverseDestination(prefix, true, utils.NonTransactional); err != nil { // read from cache but do not write if not there
+ if err == utils.ErrNotFound {
+ indexesCache.Set(prefix, nil, nil)
+ }
+ return
+ }
+ indexesCache.Set(prefix, dstIDs, nil)
+ return
+ }
+
+ getCachedObject := func(dstID string) (obj *Destination, err error) {
+ if x, ok := objectsCache.Get(dstID); ok {
+ if x == nil {
+ return nil, utils.ErrNotFound
+ }
+ return x.(*Destination), nil
+ }
+ if obj, err = dm.GetDestination(dstID, true, utils.NonTransactional); err != nil { // read from cache but do not write if not there
+ if err == utils.ErrNotFound {
+ objectsCache.Set(dstID, nil, nil)
+ }
+ return
+ }
+ objectsCache.Set(dstID, obj, nil)
+ return
+ }
+
+ var prefixes []string // start with the indexes and check the references
+ if prefixes, err = dm.DataDB().GetKeysForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil {
+ err = fmt.Errorf("error <%s> querying keys for reverseDestinations", err.Error())
+ return
+ }
+
+ for _, prefix := range prefixes {
+ prefix = strings.TrimPrefix(prefix, utils.REVERSE_DESTINATION_PREFIX) //
+ var dstIDs []string
+ if dstIDs, err = getCachedIndex(prefix); err != nil { // read from cache but do not write if not there
+ err = fmt.Errorf("error <%s> querying the reverseDestination: <%v>", err.Error(), prefix)
+ return
+ }
+ for _, dstID := range dstIDs {
+ var dst *Destination
+ if dst, err = getCachedObject(dstID); err != nil {
+ if err != utils.ErrNotFound {
+ err = fmt.Errorf("error <%s> querying the destination: <%v>", err.Error(), dstID)
+ return
+ }
+ err = nil
+ brokenRef[dstID] = nil
+ continue
+ }
+ if !utils.IsSliceMember(dst.Prefixes, prefix) { // the action plan exists but doesn't point towards the account we have index
+ brokenRef[dstID] = append(brokenRef[dstID], prefix)
+ }
+ }
+ }
+
+ var dstIDs []string // we have all the indexes in cache now do a reverse check
+ if dstIDs, err = dm.DataDB().GetKeysForPrefix(utils.DESTINATION_PREFIX); err != nil {
+ err = fmt.Errorf("error <%s> querying keys for destinations", err.Error())
+ return
+ }
+
+ for _, dstID := range dstIDs {
+ dstID = strings.TrimPrefix(dstID, utils.DESTINATION_PREFIX) //
+ var dst *Destination
+ if dst, err = getCachedObject(dstID); err != nil {
+ err = fmt.Errorf("error <%s> querying the destination: <%v>", err.Error(), dstID)
+ return
+ }
+ for _, prefix := range dst.Prefixes {
+ var ids []string
+ if ids, err = getCachedIndex(prefix); err != nil { // read from cache but do not write if not there
+ if err != utils.ErrNotFound {
+ err = fmt.Errorf("error <%s> querying the reverseDestination: <%v>", err.Error(), prefix)
+ return
+ }
+ err = nil
+ missingIndex[prefix] = append(missingIndex[prefix], dstID)
+ continue
+ }
+ if !utils.IsSliceMember(ids, dstID) { // the index doesn't exits for this actionPlan
+ missingIndex[prefix] = append(missingIndex[prefix], dstID)
+ }
+ }
+ }
+
+ rply = &ReverseDestinationsIHReply{
+ MissingReverseDestinations: missingIndex,
+ BrokenReferences: brokenRef,
+ }
+ return
+}
+
+type FilterIHReply struct {
+ MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB
+ MissingIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes)
+ BrokenIndexes map[string][]string // list of broken indexes for each object (the map has the key as the index and a list of objects)
+ MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs)
+}
+
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+// getFiltersAndContexts returns the filtreIDs and context(if any) for that object
+func getFiltersAndContexts(dm *DataManager, indxType, tnt, id string) (filterIDs []string, contexts *[]string, err error) { // add contexts
+ switch indxType {
+ case utils.CacheResourceFilterIndexes:
+ var rs *ResourceProfile
+ if rs, err = dm.GetResourceProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
+ return
+ }
+ filterIDs = rs.FilterIDs
+ case utils.CacheStatFilterIndexes:
+ var st *StatQueueProfile
+ if st, err = dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
+ return
+ }
+ filterIDs = st.FilterIDs
+ case utils.CacheThresholdFilterIndexes:
+ var th *ThresholdProfile
+ if th, err = dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
+ return
+ }
+ filterIDs = th.FilterIDs
+ case utils.CacheSupplierFilterIndexes:
+ var rt *SupplierProfile
+ if rt, err = dm.GetSupplierProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
+ return
+ }
+ filterIDs = rt.FilterIDs
+ case utils.CacheAttributeFilterIndexes:
+ var at *AttributeProfile
+ if at, err = dm.GetAttributeProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
+ return
+ }
+ filterIDs = at.FilterIDs
+ contexts = &at.Contexts
+ case utils.CacheChargerFilterIndexes:
+ var ch *ChargerProfile
+ if ch, err = dm.GetChargerProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
+ return
+ }
+ filterIDs = ch.FilterIDs
+ case utils.CacheDispatcherFilterIndexes:
+ var ds *DispatcherProfile
+ if ds, err = dm.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional); err != nil {
+ return
+ }
+ filterIDs = ds.FilterIDs
+ contexts = &ds.Subsystems
+ default:
+ return nil, nil, fmt.Errorf("unsuported index type:<%q>", indxType)
+ }
+ return
+}
+
+// objFIH keeps only the FilterIDs and Contexts from objects
+type objFIH struct {
+ filterIDs []string
+ contexts *[]string
+}
+
+// getIHObjFromCache returns all information that is needed from the mentioned object
+// uses an extra cache(controled by the API) to optimize data management
+func getIHObjFromCache(dm *DataManager, objCache *ltcache.Cache, indxType, tnt, id string) (obj *objFIH, err error) {
+ cacheKey := utils.ConcatenatedKey(tnt, id)
+ if objVal, ok := objCache.Get(cacheKey); ok {
+ if objVal == nil {
+ return nil, utils.ErrNotFound
+ }
+ return objVal.(*objFIH), nil
+ }
+ var filtIDs []string
+ var contexts *[]string
+ if filtIDs, contexts, err = getFiltersAndContexts(dm, indxType, tnt, id); err != nil {
+ if err == utils.ErrNotFound {
+ objCache.Set(cacheKey, nil, nil)
+ }
+ return
+ }
+ obj = &objFIH{
+ filterIDs: filtIDs,
+ contexts: contexts,
+ }
+ objCache.Set(cacheKey, obj, nil)
+ return
+}
+
+// getIHFltrFromCache returns the Filter
+// uses an extra cache(controled by the API) to optimize data management
+func getIHFltrFromCache(dm *DataManager, fltrCache *ltcache.Cache, tnt, id string) (fltr *Filter, err error) {
+ cacheKey := utils.ConcatenatedKey(tnt, id)
+ if fltrVal, ok := fltrCache.Get(cacheKey); ok {
+ if fltrVal == nil {
+ return nil, utils.ErrNotFound
+ }
+ return fltrVal.(*Filter), nil
+ }
+ if fltr, err = GetFilter(dm, tnt, id,
+ true, false, utils.NonTransactional); err != nil {
+ if err == utils.ErrNotFound {
+ fltrCache.Set(cacheKey, nil, nil)
+ }
+ return
+ }
+ fltrCache.Set(cacheKey, fltr, nil)
+ return
+}
+
+// getIHFltrIdxFromCache returns the Filter index
+// uses an extra cache(controled by the API) to optimize data management
+func getIHFltrIdxFromCache(dm *DataManager, fltrIdxCache *ltcache.Cache, idxItmType, tntCtx, fltrType, fldName, fldValue string) (idx utils.StringMap, err error) {
+ idxKey := utils.ConcatenatedKey(fltrType, fldName, fldValue)
+ cacheKey := utils.ConcatenatedKey(tntCtx, idxKey)
+ if fltrVal, ok := fltrIdxCache.Get(cacheKey); ok {
+ if fltrVal == nil {
+ return nil, utils.ErrNotFound
+ }
+ return fltrVal.(utils.StringMap), nil
+ }
+ var indexes map[string]utils.StringMap
+ if indexes, err = dm.GetFilterIndexes(idxItmType, tntCtx, fltrType, map[string]string{fldName: fldValue}); err != nil {
+ if err == utils.ErrNotFound {
+ fltrIdxCache.Set(cacheKey, nil, nil)
+ }
+ return
+ }
+ idx = indexes[idxKey]
+ fltrIdxCache.Set(cacheKey, idx, nil)
+ return
+}
+
+// getFilterAsIndexSet will parse the rules of filter and add them to the index map
+func getFilterAsIndexSet(dm *DataManager, fltrIdxCache *ltcache.Cache, idxItmType, tntCtx string, fltr *Filter) (indexes map[string]utils.StringMap, err error) {
+ indexes = make(map[string]utils.StringMap)
+ for _, flt := range fltr.Rules {
+ if flt.Type != utils.MetaString &&
+ flt.Type != utils.MetaPrefix {
+ continue
+ }
+ for _, fldVal := range flt.Values {
+ var rcvIndx utils.StringMap
+ // only read from cache in case if we do not find the index to not cache the negative response
+ if rcvIndx, err = getIHFltrIdxFromCache(dm, fltrIdxCache, idxItmType, tntCtx, flt.Type, flt.Element, fldVal); err != nil {
+ if err != utils.ErrNotFound {
+ return
+ }
+ err = nil
+ rcvIndx = make(utils.StringMap) // create an empty index if is not found in DB in case we add them later
+ }
+ indexes[utils.ConcatenatedKey(flt.Type, flt.Element, fldVal)] = rcvIndx
+ }
+ }
+ return indexes, nil
+}
+
+// updateFilterIHMisingIndx updates the reply with the missing indexes for a specific object( obj->filter->index relation)
+func updateFilterIHMisingIndx(dm *DataManager, fltrCache, fltrIdxCache *ltcache.Cache, filterIDs []string, indxType, tnt, tntCtx, itmID string, rply *FilterIHReply) (_ *FilterIHReply, err error) {
+ if len(filterIDs) == 0 {
+ var rcvIndx utils.StringMap
+ if rcvIndx, err = getIHFltrIdxFromCache(dm, nil, indxType, tntCtx, utils.META_NONE, utils.META_ANY, utils.META_ANY); err != nil {
+ if err != utils.ErrNotFound {
+ return
+ }
+ key := utils.ConcatenatedKey(tntCtx, utils.META_NONE, utils.META_ANY, utils.META_ANY)
+ rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
+ } else if !rcvIndx.HasKey(itmID) {
+ key := utils.ConcatenatedKey(tntCtx, utils.META_NONE, utils.META_ANY, utils.META_ANY)
+ rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
+ }
+
+ return rply, nil
+ }
+ for _, fltrID := range filterIDs {
+ var fltr *Filter
+ if fltr, err = getIHFltrFromCache(dm, fltrCache, tnt, fltrID); err != nil {
+ if err != utils.ErrNotFound {
+ return
+ }
+ fltrID = utils.ConcatenatedKey(tnt, fltrID)
+ rply.MissingFilters[fltrID] = append(rply.MissingFilters[fltrID], itmID)
+ continue
+ }
+ var indexes map[string]utils.StringMap
+ if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil {
+ return
+ }
+ for key, idx := range indexes {
+ if !idx.HasKey(itmID) {
+ key = utils.ConcatenatedKey(tntCtx, key)
+ rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
+ }
+ }
+ }
+ return rply, nil
+}
+
+// GetFltrIdxHealth returns the missing indexes for all objects
+func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *FilterIHReply, err error) {
+ // check the objects ( obj->filter->index relation)
+ rply = &FilterIHReply{
+ MissingIndexes: make(map[string][]string),
+ BrokenIndexes: make(map[string][]string),
+ MissingFilters: make(map[string][]string),
+ }
+ objPrfx := utils.CacheIndexesToPrefix[indxType]
+ var ids []string
+ if ids, err = dm.dataDB.GetKeysForPrefix(objPrfx); err != nil {
+ return
+ }
+ for _, id := range ids {
+ id = strings.TrimPrefix(id, objPrfx)
+ tntID := utils.NewTenantID(id)
+ var obj *objFIH
+ if obj, err = getIHObjFromCache(dm, objCache, indxType, tntID.Tenant, tntID.ID); err != nil {
+ return
+ }
+
+ if obj.contexts == nil {
+ if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, tntID.Tenant, tntID.ID, rply); err != nil {
+ return
+ }
+ } else {
+ for _, ctx := range *obj.contexts {
+ if rply, err = updateFilterIHMisingIndx(dm, fltrCache, fltrIdxCache, obj.filterIDs, indxType, tntID.Tenant, utils.ConcatenatedKey(tntID.Tenant, ctx), tntID.ID, rply); err != nil {
+ return
+ }
+ }
+ }
+ }
+
+ // check the indexes( index->filter->obj relation)
+ idxPrfx := utils.CacheInstanceToPrefix[indxType]
+ var indexKeys []string
+ if indexKeys, err = dm.dataDB.GetKeysForPrefix(idxPrfx); err != nil {
+ return
+ }
+ for _, dataID := range indexKeys {
+ dataID = strings.TrimPrefix(dataID, idxPrfx)
+
+ splt := utils.SplitConcatenatedKey(dataID) // tntCtx:filterType:fieldName:fieldVal
+ lsplt := len(splt)
+ if lsplt < 4 {
+ fmt.Println(idxPrfx)
+ err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID)
+ return
+ }
+ tnt := splt[0]
+ var ctx *string
+ if lsplt-3 == 2 {
+ ctx = &splt[1]
+ }
+
+ fieldVal := splt[lsplt-1]
+ fieldName := splt[lsplt-2]
+ filterType := splt[lsplt-3]
+
+ tntCtx := utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems
+ idxKey := utils.ConcatenatedKey(splt[lsplt-3:]...)
+
+ var idx utils.StringMap
+ if idx, err = getIHFltrIdxFromCache(dm, fltrIdxCache, indxType, tntCtx, filterType, fieldName, fieldVal); err != nil {
+ return
+ }
+ for itmID := range idx {
+ var obj *objFIH
+ if obj, err = getIHObjFromCache(dm, objCache, indxType, tnt, itmID); err != nil {
+ if err != utils.ErrNotFound {
+ return
+ }
+ rply.MissingObjects = append(rply.MissingObjects, utils.ConcatenatedKey(tnt, itmID))
+ err = nil
+ continue
+ }
+ if ctx != nil ||
+ (obj.contexts == nil || !utils.IsSliceMember(*obj.contexts, *ctx)) {
+ key := utils.ConcatenatedKey(tntCtx, idxKey)
+ rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID)
+ continue
+ }
+ if len(obj.filterIDs) == 0 {
+ if utils.ConcatenatedKey(utils.META_NONE, utils.META_ANY, utils.META_ANY) != idxKey {
+ rply.BrokenIndexes[dataID] = append(rply.BrokenIndexes[dataID], itmID)
+ }
+ continue
+ }
+ var hasIndx bool
+ for _, fltrID := range obj.filterIDs {
+ var fltr *Filter
+ if fltr, err = getIHFltrFromCache(dm, fltrCache, tnt, fltrID); err != nil {
+ if err != utils.ErrNotFound {
+ return
+ }
+ err = nil // should be already logged when we parsed all the objects
+ continue
+ }
+ var indexes map[string]utils.StringMap
+ if indexes, err = getFilterAsIndexSet(dm, fltrIdxCache, indxType, tntCtx, fltr); err != nil { //TODO
+ return
+ }
+ idx, has := indexes[idxKey]
+ if hasIndx = has && idx.HasKey(itmID); hasIndx {
+ break
+ }
+ }
+ if !hasIndx {
+ key := utils.ConcatenatedKey(tnt, idxKey)
+ rply.BrokenIndexes[key] = append(rply.BrokenIndexes[key], itmID)
+ }
+ }
+ }
+
+ return
+}
diff --git a/engine/z_libindex_health_test.go b/engine/z_libindex_health_test.go
new file mode 100644
index 000000000..c7bb36511
--- /dev/null
+++ b/engine/z_libindex_health_test.go
@@ -0,0 +1,365 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package engine
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/ltcache"
+)
+
+func TestHealthAccountAction(t *testing.T) {
+ Cache.Clear(nil)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
+ dm := NewDataManager(db, cfg.CacheCfg(), nil)
+
+ if err := dm.SetAccountActionPlans("1001", []string{"AP1", "AP2"}, true); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetActionPlan("AP2", &ActionPlan{
+ Id: "AP2",
+ AccountIDs: utils.NewStringMap("1002"),
+ ActionTimings: []*ActionTiming{{}},
+ }, true, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+
+ exp := &AccountActionPlanIHReply{
+ MissingAccountActionPlans: map[string][]string{"1002": {"AP2"}},
+ BrokenReferences: map[string][]string{"AP2": {"1001"}, "AP1": nil},
+ }
+ if rply, err := GetAccountActionPlansIndexHealth(dm, -1, -1, -1, -1, false, false); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(exp, rply) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
+ }
+}
+
+func TestHealthAccountAction2(t *testing.T) {
+ Cache.Clear(nil)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
+ dm := NewDataManager(db, cfg.CacheCfg(), nil)
+
+ if err := dm.SetAccountActionPlans("1001", []string{"AP1", "AP2"}, true); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetActionPlan("AP2", &ActionPlan{
+ Id: "AP2",
+ AccountIDs: utils.NewStringMap("1001"),
+ ActionTimings: []*ActionTiming{{}},
+ }, true, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+
+ exp := &AccountActionPlanIHReply{
+ MissingAccountActionPlans: map[string][]string{},
+ BrokenReferences: map[string][]string{"AP1": nil},
+ }
+ if rply, err := GetAccountActionPlansIndexHealth(dm, -1, -1, -1, -1, false, false); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(exp, rply) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
+ }
+}
+
+func TestHealthAccountAction3(t *testing.T) {
+ Cache.Clear(nil)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
+ dm := NewDataManager(db, cfg.CacheCfg(), nil)
+
+ if err := dm.SetAccountActionPlans("1002", []string{"AP1"}, true); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetActionPlan("AP1", &ActionPlan{
+ Id: "AP1",
+ AccountIDs: utils.NewStringMap("1002"),
+ ActionTimings: []*ActionTiming{{}},
+ }, true, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetActionPlan("AP2", &ActionPlan{
+ Id: "AP2",
+ AccountIDs: utils.NewStringMap("1002"),
+ ActionTimings: []*ActionTiming{{}},
+ }, true, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+
+ exp := &AccountActionPlanIHReply{
+ MissingAccountActionPlans: map[string][]string{"1002": {"AP2"}},
+ BrokenReferences: map[string][]string{},
+ }
+ if rply, err := GetAccountActionPlansIndexHealth(dm, -1, -1, -1, -1, false, false); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(exp, rply) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
+ }
+}
+
+func TestHealthAccountAction4(t *testing.T) {
+ Cache.Clear(nil)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
+ dm := NewDataManager(db, cfg.CacheCfg(), nil)
+
+ if err := dm.SetAccountActionPlans("1002", []string{"AP2", "AP1"}, true); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetAccountActionPlans("1001", []string{"AP2"}, true); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetActionPlan("AP1", &ActionPlan{
+ Id: "AP1",
+ AccountIDs: utils.NewStringMap("1002"),
+ ActionTimings: []*ActionTiming{{}},
+ }, true, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetActionPlan("AP2", &ActionPlan{
+ Id: "AP2",
+ AccountIDs: utils.NewStringMap("1001"),
+ ActionTimings: []*ActionTiming{{}},
+ }, true, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+
+ exp := &AccountActionPlanIHReply{
+ MissingAccountActionPlans: map[string][]string{},
+ BrokenReferences: map[string][]string{"AP2": {"1002"}},
+ }
+ if rply, err := GetAccountActionPlansIndexHealth(dm, -1, -1, -1, -1, false, false); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(exp, rply) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
+ }
+}
+
+func TestHealthReverseDestination(t *testing.T) {
+ Cache.Clear(nil)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
+ dm := NewDataManager(db, cfg.CacheCfg(), nil)
+
+ if err := dm.SetReverseDestination(&Destination{Id: "DST1", Prefixes: []string{"1001", "1002"}}, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetReverseDestination(&Destination{Id: "DST2", Prefixes: []string{"1001"}}, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetDestination(&Destination{
+ Id: "DST2",
+ Prefixes: []string{"1002"},
+ }, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+
+ exp := &ReverseDestinationsIHReply{
+ MissingReverseDestinations: map[string][]string{"1002": {"DST2"}},
+ BrokenReferences: map[string][]string{"DST1": nil, "DST2": {"1001"}},
+ }
+ if rply, err := GetReverseDestinationsIndexHealth(dm, -1, -1, -1, -1, false, false); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(exp, rply) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
+ }
+}
+
+func TestHealthReverseDestination2(t *testing.T) {
+ Cache.Clear(nil)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
+ dm := NewDataManager(db, cfg.CacheCfg(), nil)
+
+ if err := dm.SetReverseDestination(&Destination{Id: "DST1", Prefixes: []string{"1001"}}, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetReverseDestination(&Destination{Id: "DST2", Prefixes: []string{"1001"}}, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetDestination(&Destination{
+ Id: "DST2",
+ Prefixes: []string{"1001"},
+ }, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+
+ exp := &ReverseDestinationsIHReply{
+ MissingReverseDestinations: map[string][]string{},
+ BrokenReferences: map[string][]string{"DST1": nil},
+ }
+ if rply, err := GetReverseDestinationsIndexHealth(dm, -1, -1, -1, -1, false, false); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(exp, rply) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
+ }
+}
+
+func TestHealthReverseDestination3(t *testing.T) {
+ Cache.Clear(nil)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
+ dm := NewDataManager(db, cfg.CacheCfg(), nil)
+
+ if err := dm.SetReverseDestination(&Destination{Id: "DST1", Prefixes: []string{"1002"}}, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetDestination(&Destination{
+ Id: "DST1",
+ Prefixes: []string{"1002"},
+ }, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetDestination(&Destination{
+ Id: "DST2",
+ Prefixes: []string{"1002"},
+ }, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+
+ exp := &ReverseDestinationsIHReply{
+ MissingReverseDestinations: map[string][]string{"1002": {"DST2"}},
+ BrokenReferences: map[string][]string{},
+ }
+ if rply, err := GetReverseDestinationsIndexHealth(dm, -1, -1, -1, -1, false, false); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(exp, rply) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
+ }
+}
+
+func TestHealthReverseDestination4(t *testing.T) {
+ Cache.Clear(nil)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
+ dm := NewDataManager(db, cfg.CacheCfg(), nil)
+
+ if err := dm.SetReverseDestination(&Destination{Id: "DST1", Prefixes: []string{"1002"}}, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetReverseDestination(&Destination{Id: "DST2", Prefixes: []string{"1001", "1002"}}, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetDestination(&Destination{
+ Id: "DST1",
+ Prefixes: []string{"1002"},
+ }, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ if err := dm.SetDestination(&Destination{
+ Id: "DST2",
+ Prefixes: []string{"1001"},
+ }, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+
+ exp := &ReverseDestinationsIHReply{
+ MissingReverseDestinations: map[string][]string{},
+ BrokenReferences: map[string][]string{"DST2": {"1002"}},
+ }
+ if rply, err := GetReverseDestinationsIndexHealth(dm, -1, -1, -1, -1, false, false); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(exp, rply) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
+ }
+}
+
+func TestHealthFilter(t *testing.T) {
+ Cache.Clear(nil)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
+ // db, err := NewDataDBConn(cfg.DataDbCfg().DataDbType,
+ // cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort,
+ // cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser,
+ // cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding,
+ // "", cfg.DataDbCfg().Items)
+ // if err != nil {
+ // t.Fatal(err)
+ // }
+ dm := NewDataManager(db, cfg.CacheCfg(), nil)
+
+ if err := dm.SetAttributeProfile(&AttributeProfile{
+ Tenant: "cgrates.org",
+ ID: "ATTR1",
+ Contexts: []string{utils.META_ANY},
+ FilterIDs: []string{"*string:~*req.Account:1001", "Fltr1"},
+ }, false); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := dm.SetFilterIndexes(utils.CacheAttributeFilterIndexes, "cgrates.org:*any",
+ map[string]utils.StringMap{"*string:~*req.Account:1002": {"ATTR1": true, "ATTR2": true}},
+ true, utils.NonTransactional); err != nil {
+ t.Fatal(err)
+ }
+ exp := &FilterIHReply{
+ MissingIndexes: map[string][]string{
+ "cgrates.org:*any:*string:~*req.Account:1001": {"ATTR1"},
+ "cgrates.org:*any:*string:~*req.Account:1002": {"ATTR1"},
+ },
+ BrokenIndexes: make(map[string][]string),
+ MissingFilters: map[string][]string{
+ "cgrates.org:Fltr1": {"ATTR1"},
+ },
+ MissingObjects: []string{"cgrates.org:ATTR2"},
+ }
+
+ if rply, err := GetFltrIdxHealth(dm,
+ ltcache.NewCache(-1, 0, false, nil),
+ ltcache.NewCache(-1, 0, false, nil),
+ ltcache.NewCache(-1, 0, false, nil),
+ utils.CacheAttributeFilterIndexes); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(exp, rply) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply))
+ }
+}