From 9fa3cd4b8c5e5cb672c0b91da2fbe184539ac00e Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 7 Jul 2021 17:11:20 +0300 Subject: [PATCH] Added all index health APIs --- apis/filter_indexes.go | 326 +++++++--- data/tariffplans/tutrates/RateProfiles.csv | 6 +- engine/attributes.go | 8 +- engine/datamanager.go | 12 +- engine/libindex.go | 212 +++---- engine/libindex_health.go | 688 +++++++++++++++++++++ engine/z_libindex_health_test.go | 148 +++++ rates/rates.go | 30 +- utils/consts.go | 2 - 9 files changed, 1220 insertions(+), 212 deletions(-) create mode 100644 engine/libindex_health.go create mode 100644 engine/z_libindex_health_test.go diff --git a/apis/filter_indexes.go b/apis/filter_indexes.go index 4d5b52773..69cc7f6b2 100644 --- a/apis/filter_indexes.go +++ b/apis/filter_indexes.go @@ -22,6 +22,7 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" ) type AttrGetFilterIndexes struct { @@ -223,7 +224,7 @@ func (adms *AdminSv1) GetFilterIndexes(ctx *context.Context, arg *AttrGetFilterI } // ComputeFilterIndexes selects which index filters to recompute -func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.ArgsComputeFilterIndexes, reply *string) (err error) { +func (adms *AdminSv1) ComputeFilterIndexes(ctx *context.Context, args *utils.ArgsComputeFilterIndexes, reply *string) (err error) { transactionID := utils.GenUUID() tnt := args.Tenant if tnt == utils.EmptyString { @@ -235,9 +236,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //ThresholdProfile Indexes if args.ThresholdS { cacheIDs[utils.ThresholdFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheThresholdFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - th, e := adms.dm.GetThresholdProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheThresholdFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + th, e := adms.dm.GetThresholdProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -250,9 +251,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //StatQueueProfile Indexes if args.StatS { cacheIDs[utils.StatFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheStatFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - sq, e := adms.dm.GetStatQueueProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheStatFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + sq, e := adms.dm.GetStatQueueProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -265,9 +266,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //ResourceProfile Indexes if args.ResourceS { cacheIDs[utils.ResourceFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheResourceFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - rp, e := adms.dm.GetResourceProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheResourceFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + rp, e := adms.dm.GetResourceProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -280,9 +281,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //RouteSProfile Indexes if args.RouteS { cacheIDs[utils.RouteFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheRouteFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - rp, e := adms.dm.GetRouteProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheRouteFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + rp, e := adms.dm.GetRouteProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -295,9 +296,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //AttributeProfile Indexes if args.AttributeS { cacheIDs[utils.AttributeFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheAttributeFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - attr, e := adms.dm.GetAttributeProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheAttributeFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + attr, e := adms.dm.GetAttributeProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -310,9 +311,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //ChargerProfile Indexes if args.ChargerS { cacheIDs[utils.ChargerFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheChargerFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - ch, e := adms.dm.GetChargerProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheChargerFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + ch, e := adms.dm.GetChargerProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -325,9 +326,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //AccountFilter Indexes if args.AccountS { cacheIDs[utils.AccountsFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheAccountsFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - acnts, e := adms.dm.GetAccount(cntxt, tnt, id) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheAccountsFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + acnts, e := adms.dm.GetAccount(ctx, tnt, id) if e != nil { return nil, e } @@ -340,9 +341,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //ActionFilter Indexes if args.ActionS { cacheIDs[utils.ActionProfilesFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheActionProfilesFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - act, e := adms.dm.GetActionProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheActionProfilesFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + act, e := adms.dm.GetActionProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -355,9 +356,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A var ratePrf []string if args.RateS { cacheIDs[utils.RateProfilesFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheRateProfilesFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - rtPrf, e := adms.dm.GetRateProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheRateProfilesFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + rtPrf, e := adms.dm.GetRateProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -367,7 +368,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A rtIds = append(rtIds, key) } cacheIDs[utils.RateFilterIndexIDs] = rtIds - _, e = engine.ComputeIndexes(cntxt, adms.dm, tnt, id, utils.CacheRateFilterIndexes, + _, e = engine.ComputeIndexes(ctx, adms.dm, tnt, id, utils.CacheRateFilterIndexes, &rtIds, transactionID, func(_, id, _ string) (*[]string, error) { return utils.SliceStringPointer(utils.CloneStringSlice(rtPrf.Rates[id].FilterIDs)), nil }, nil) @@ -383,9 +384,9 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //DispatcherProfile Indexes if args.DispatcherS { cacheIDs[utils.DispatcherFilterIndexIDs] = []string{utils.MetaAny} - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheDispatcherFilterIndexes, - nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - dsp, e := adms.dm.GetDispatcherProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheDispatcherFilterIndexes, + nil, transactionID, func(tnt, id, grp string) (*[]string, error) { + dsp, e := adms.dm.GetDispatcherProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -399,59 +400,59 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A //Now we move from tmpKey to the right key for each type //ThresholdProfile Indexes if args.ThresholdS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheThresholdFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheThresholdFilterIndexes, tnt, nil, true, transactionID); err != nil { return } } //StatQueueProfile Indexes if args.StatS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheStatFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheStatFilterIndexes, tnt, nil, true, transactionID); err != nil { return } } //ResourceProfile Indexes if args.ResourceS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheResourceFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheResourceFilterIndexes, tnt, nil, true, transactionID); err != nil { return } } //RouteProfile Indexes if args.RouteS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheRouteFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheRouteFilterIndexes, tnt, nil, true, transactionID); err != nil { return } } //AttributeProfile Indexes if args.AttributeS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheAttributeFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheAttributeFilterIndexes, tnt, nil, true, transactionID); err != nil { return } } //ChargerProfile Indexes if args.ChargerS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheChargerFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheChargerFilterIndexes, tnt, nil, true, transactionID); err != nil { return } } //AccountProfile Indexes if args.AccountS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheAccountsFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheAccountsFilterIndexes, tnt, nil, true, transactionID); err != nil { return err } } //ActionProfile Indexes if args.ActionS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheActionProfilesFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheActionProfilesFilterIndexes, tnt, nil, true, transactionID); err != nil { return err } } //RateProfile Indexes if args.RateS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheRateProfilesFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheRateProfilesFilterIndexes, tnt, nil, true, transactionID); err != nil { return err } for _, tntId := range ratePrf { - if err = adms.dm.SetIndexes(cntxt, utils.CacheRateFilterIndexes, tntId, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheRateFilterIndexes, tntId, nil, true, transactionID); err != nil { return err } @@ -459,7 +460,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A } //DispatcherProfile Indexes if args.DispatcherS { - if err = adms.dm.SetIndexes(cntxt, utils.CacheDispatcherFilterIndexes, tnt, nil, true, transactionID); err != nil { + if err = adms.dm.SetIndexes(ctx, utils.CacheDispatcherFilterIndexes, tnt, nil, true, transactionID); err != nil { return } } @@ -470,10 +471,10 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A for idx := range cacheIDs { loadIDs[utils.ArgCacheToInstance[idx]] = timeNow } - if err := adms.dm.SetLoadIDs(cntxt, loadIDs); err != nil { + if err := adms.dm.SetLoadIDs(ctx, loadIDs); err != nil { return utils.APIErrorHandler(err) } - if err := adms.callCacheForComputeIndexes(cntxt, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), + if err := adms.callCacheForComputeIndexes(ctx, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, cacheIDs, args.APIOpts); err != nil { return err } @@ -482,7 +483,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A } // ComputeFilterIndexIDs computes specific filter indexes -func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.ArgsComputeFilterIndexIDs, reply *string) (err error) { +func (adms *AdminSv1) ComputeFilterIndexIDs(ctx *context.Context, args *utils.ArgsComputeFilterIndexIDs, reply *string) (err error) { transactionID := utils.NonTransactional tnt := args.Tenant if tnt == utils.EmptyString { @@ -491,9 +492,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. var indexes utils.StringSet cacheIDs := make(map[string][]string) //ThresholdProfile Indexes - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheThresholdFilterIndexes, - &args.ThresholdIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - th, e := adms.dm.GetThresholdProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheThresholdFilterIndexes, + &args.ThresholdIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + th, e := adms.dm.GetThresholdProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -505,9 +506,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. cacheIDs[utils.ThresholdFilterIndexIDs] = indexes.AsSlice() } //StatQueueProfile Indexes - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheStatFilterIndexes, - &args.StatIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - sq, e := adms.dm.GetStatQueueProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheStatFilterIndexes, + &args.StatIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + sq, e := adms.dm.GetStatQueueProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -520,9 +521,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. cacheIDs[utils.StatFilterIndexIDs] = indexes.AsSlice() } //ResourceProfile Indexes - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheResourceFilterIndexes, - &args.ResourceIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - rp, e := adms.dm.GetResourceProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheResourceFilterIndexes, + &args.ResourceIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + rp, e := adms.dm.GetResourceProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -535,9 +536,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. cacheIDs[utils.ResourceFilterIndexIDs] = indexes.AsSlice() } //RouteProfile Indexes - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheRouteFilterIndexes, - &args.RouteIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - rp, e := adms.dm.GetRouteProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheRouteFilterIndexes, + &args.RouteIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + rp, e := adms.dm.GetRouteProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -550,9 +551,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. cacheIDs[utils.RouteFilterIndexIDs] = indexes.AsSlice() } //AttributeProfile Indexes - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheAttributeFilterIndexes, - &args.AttributeIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - attr, e := adms.dm.GetAttributeProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheAttributeFilterIndexes, + &args.AttributeIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + attr, e := adms.dm.GetAttributeProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -564,9 +565,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. cacheIDs[utils.AttributeFilterIndexIDs] = indexes.AsSlice() } //ChargerProfile Indexes - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheChargerFilterIndexes, - &args.ChargerIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - ch, e := adms.dm.GetChargerProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheChargerFilterIndexes, + &args.ChargerIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + ch, e := adms.dm.GetChargerProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -578,9 +579,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. cacheIDs[utils.ChargerFilterIndexIDs] = indexes.AsSlice() } //AccountIndexes - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheAccountsFilterIndexes, - &args.AccountIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - acc, e := adms.dm.GetAccount(cntxt, tnt, id) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheAccountsFilterIndexes, + &args.AccountIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + acc, e := adms.dm.GetAccount(ctx, tnt, id) if e != nil { return nil, e } @@ -592,9 +593,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. cacheIDs[utils.AccountsFilterIndexIDs] = indexes.AsSlice() } //ActionProfile Indexes - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheActionProfilesFilterIndexes, - &args.ActionProfileIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - act, e := adms.dm.GetActionProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheActionProfilesFilterIndexes, + &args.ActionProfileIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + act, e := adms.dm.GetActionProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -606,9 +607,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. cacheIDs[utils.ActionProfilesFilterIndexIDs] = indexes.AsSlice() } //RateProfile Indexes - if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheRateProfilesFilterIndexes, - &args.RateProfileIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - rpr, e := adms.dm.GetRateProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if _, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheRateProfilesFilterIndexes, + &args.RateProfileIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + rpr, e := adms.dm.GetRateProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -616,7 +617,7 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. for key := range rpr.Rates { rtIds = append(rtIds, key) } - indexesRate, e := engine.ComputeIndexes(cntxt, adms.dm, tnt, id, utils.CacheRateFilterIndexes, + indexesRate, e := engine.ComputeIndexes(ctx, adms.dm, tnt, id, utils.CacheRateFilterIndexes, &rtIds, transactionID, func(_, id, _ string) (*[]string, error) { return utils.SliceStringPointer(utils.CloneStringSlice(rpr.Rates[id].FilterIDs)), nil }, nil) @@ -634,9 +635,9 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. cacheIDs[utils.RateProfilesFilterIndexIDs] = indexes.AsSlice() } //DispatcherProfile Indexes - if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, utils.EmptyString, utils.CacheDispatcherFilterIndexes, - &args.DispatcherIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - dsp, e := adms.dm.GetDispatcherProfile(cntxt, tnt, id, true, false, utils.NonTransactional) + if indexes, err = engine.ComputeIndexes(ctx, adms.dm, tnt, utils.EmptyString, utils.CacheDispatcherFilterIndexes, + &args.DispatcherIDs, transactionID, func(tnt, id, grp string) (*[]string, error) { + dsp, e := adms.dm.GetDispatcherProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -653,13 +654,180 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. for idx := range cacheIDs { loadIDs[utils.ArgCacheToInstance[idx]] = timeNow } - if err := adms.dm.SetLoadIDs(cntxt, loadIDs); err != nil { + if err := adms.dm.SetLoadIDs(ctx, loadIDs); err != nil { return utils.APIErrorHandler(err) } - if err := adms.callCacheForComputeIndexes(cntxt, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), + if err := adms.callCacheForComputeIndexes(ctx, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, cacheIDs, args.APIOpts); err != nil { return err } *reply = utils.OK return nil } + +func (adms *AdminSv1) GetReverseFilterHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *map[string]*engine.ReverseFilterIHReply) (err error) { + objCaches := map[string]*ltcache.Cache{utils.CacheRateFilterIndexes: ltcache.NewCache(-1, 0, false, nil)} + for indxType := range utils.CacheIndexesToPrefix { + objCaches[indxType] = ltcache.NewCache(-1, 0, false, nil) + } + + *reply, err = engine.GetRevFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + objCaches, + ) + return +} + +func (adms *AdminSv1) GetThresholdsIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheThresholdFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetResourcesIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheResourceFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetStatsIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheStatFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetRoutesIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheRouteFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetAttributesIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheAttributeFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetChargersIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheChargerFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetDispatchersIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheDispatcherFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetRateProfilesIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheRateProfilesFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetActionsIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheActionProfilesFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetAccountsIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealth(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + utils.CacheAccountsFilterIndexes, + ) + if err != nil { + return err + } + *reply = *rp + return nil +} + +func (adms *AdminSv1) GetRateRatesIndexesHealth(ctx *context.Context, args *engine.IndexHealthArgs, reply *engine.FilterIHReply) error { + rp, err := engine.GetFltrIdxHealthForRateRates(ctx, adms.dm, + ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil), + ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil), + ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil), + ) + if err != nil { + return err + } + *reply = *rp + return nil +} diff --git a/data/tariffplans/tutrates/RateProfiles.csv b/data/tariffplans/tutrates/RateProfiles.csv index 522637787..7db298161 100644 --- a/data/tariffplans/tutrates/RateProfiles.csv +++ b/data/tariffplans/tutrates/RateProfiles.csv @@ -1,5 +1,5 @@ #Tenant,ID,FilterIDs,Weights,MinCost,MaxCost,MaxCostStrategy,RateID,RateFilterIDs,RateActivationStart,RateWeights,RateBlocker,RateIntervalStart,RateFixedFee,RateRecurrentFee,RateUnit,RateIncrement cgrates.org,RP1,*string:~*req.Subject:1001,;0,0.1,0.6,*free,RT_WEEK,,"* * * * 1-5",;0,false,0s,,0.12,1m,1m -cgrates.org,RP1,,,,,,,RT_WEEK,,,,,1m,,0.6,1m,1s -cgrates.org,RP1,,,,,,,RT_WEEKEND,,"* * * * 0,6",;10,false,0s,,0.06,1m,1s -cgrates.org,RP1,,,,,,,RT_CHRISTMAS,,* * 24 12 *,;30,false,0s,,0.06,1m,1s \ No newline at end of file +cgrates.org,RP1,,,,,,RT_WEEK,,,,,1m,,0.6,1m,1s +cgrates.org,RP1,,,,,,RT_WEEKEND,,"* * * * 0,6",;10,false,0s,,0.06,1m,1s +cgrates.org,RP1,,,,,,RT_CHRISTMAS,,* * 24 12 *,;30,false,0s,,0.06,1m,1s \ No newline at end of file diff --git a/engine/attributes.go b/engine/attributes.go index 4fbf986a3..5d85d1772 100644 --- a/engine/attributes.go +++ b/engine/attributes.go @@ -55,13 +55,13 @@ func (alS *AttributeService) Shutdown() { } // attributeProfileForEvent returns the matching attribute -func (alS *AttributeService) attributeProfileForEvent(apiCtx *context.Context, tnt string, attrsIDs []string, +func (alS *AttributeService) attributeProfileForEvent(ctx *context.Context, tnt string, attrsIDs []string, evNm utils.MapStorage, lastID string) (matchAttrPrfl *AttributeProfile, err error) { var attrIDs []string if len(attrsIDs) != 0 { attrIDs = attrsIDs } else { - aPrflIDs, err := MatchingItemIDsForEvent(apiCtx, evNm, + aPrflIDs, err := MatchingItemIDsForEvent(ctx, evNm, alS.cgrcfg.AttributeSCfg().StringIndexedFields, alS.cgrcfg.AttributeSCfg().PrefixIndexedFields, alS.cgrcfg.AttributeSCfg().SuffixIndexedFields, @@ -75,14 +75,14 @@ func (alS *AttributeService) attributeProfileForEvent(apiCtx *context.Context, t attrIDs = aPrflIDs.AsSlice() } for _, apID := range attrIDs { - aPrfl, err := alS.dm.GetAttributeProfile(apiCtx, tnt, apID, true, true, utils.NonTransactional) + aPrfl, err := alS.dm.GetAttributeProfile(ctx, tnt, apID, true, true, utils.NonTransactional) if err != nil { if err == utils.ErrNotFound { continue } return nil, err } - if pass, err := alS.filterS.Pass(apiCtx, tnt, aPrfl.FilterIDs, + if pass, err := alS.filterS.Pass(ctx, tnt, aPrfl.FilterIDs, evNm); err != nil { return nil, err } else if !pass { diff --git a/engine/datamanager.go b/engine/datamanager.go index b1c2418db..1e6bb59ff 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1404,31 +1404,31 @@ func (dm *DataManager) SetAttributeProfile(ctx *context.Context, ap *AttributePr return } -func (dm *DataManager) RemoveAttributeProfile(apiCtx *context.Context, tenant, id string, withIndex bool) (err error) { +func (dm *DataManager) RemoveAttributeProfile(ctx *context.Context, tenant, id string, withIndex bool) (err error) { if dm == nil { return utils.ErrNoDatabaseConn } - oldAttr, err := dm.GetAttributeProfile(apiCtx, tenant, id, true, false, utils.NonTransactional) + oldAttr, err := dm.GetAttributeProfile(ctx, tenant, id, true, false, utils.NonTransactional) if err != nil { return err } - if err = dm.DataDB().RemoveAttributeProfileDrv(apiCtx, tenant, id); err != nil { + if err = dm.DataDB().RemoveAttributeProfileDrv(ctx, tenant, id); err != nil { return } if oldAttr == nil { return utils.ErrNotFound } if withIndex { - if err = removeIndexFiltersItem(apiCtx, dm, utils.CacheAttributeFilterIndexes, tenant, id, oldAttr.FilterIDs); err != nil { + if err = removeIndexFiltersItem(ctx, dm, utils.CacheAttributeFilterIndexes, tenant, id, oldAttr.FilterIDs); err != nil { return } - if err = removeItemFromFilterIndex(apiCtx, dm, utils.CacheAttributeFilterIndexes, + if err = removeItemFromFilterIndex(ctx, dm, utils.CacheAttributeFilterIndexes, tenant, utils.EmptyString, id, oldAttr.FilterIDs); err != nil { return } } if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles]; itm.Replicate { - replicate(apiCtx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache utils.ReplicatorSv1RemoveAttributeProfile, diff --git a/engine/libindex.go b/engine/libindex.go index fc4e074a1..f97d15c88 100644 --- a/engine/libindex.go +++ b/engine/libindex.go @@ -40,16 +40,16 @@ var ( // newFilterIndex will get the index from DataManager if is not found it will create it // is used to update the mentioned index -func newFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string, newFlt *Filter) (indexes map[string]utils.StringSet, err error) { - tntCtx := tnt - if ctx != utils.EmptyString { - tntCtx = utils.ConcatenatedKey(tnt, ctx) +func newFilterIndex(ctx *context.Context, dm *DataManager, idxItmType, tnt, grp, itemID string, filterIDs []string, newFlt *Filter) (indexes map[string]utils.StringSet, err error) { + tntGrp := tnt + if grp != utils.EmptyString { + tntGrp = utils.ConcatenatedKey(tnt, grp) } indexes = make(map[string]utils.StringSet) if len(filterIDs) == 0 { // in case of None idxKey := utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny) var rcvIndx map[string]utils.StringSet - if rcvIndx, err = dm.GetIndexes(apiCtx, idxItmType, tntCtx, + if rcvIndx, err = dm.GetIndexes(ctx, idxItmType, tntGrp, idxKey, true, false); err != nil { if err != utils.ErrNotFound { @@ -70,7 +70,7 @@ func newFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c var fltr *Filter if newFlt != nil && newFlt.Tenant == tnt && newFlt.ID == fltrID { fltr = newFlt - } else if fltr, err = dm.GetFilter(apiCtx, tnt, fltrID, + } else if fltr, err = dm.GetFilter(ctx, tnt, fltrID, true, false, utils.NonTransactional); err != nil { if err == utils.ErrNotFound { err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v", @@ -102,7 +102,7 @@ func newFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c } var rcvIndx map[string]utils.StringSet // only read from cache in case if we do not find the index to not cache the negative response - if rcvIndx, err = dm.GetIndexes(apiCtx, idxItmType, tntCtx, + if rcvIndx, err = dm.GetIndexes(ctx, idxItmType, tntGrp, idxKey, true, false); err != nil { if err != utils.ErrNotFound { return @@ -121,19 +121,19 @@ func newFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c } // addItemToFilterIndex will add the itemID to the existing/created index and set it in the DataDB -func addItemToFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (err error) { - tntCtx := tnt - if ctx != utils.EmptyString { - tntCtx = utils.ConcatenatedKey(tnt, ctx) +func addItemToFilterIndex(ctx *context.Context, dm *DataManager, idxItmType, tnt, grp, itemID string, filterIDs []string) (err error) { + tntGrp := tnt + if grp != utils.EmptyString { + tntGrp = utils.ConcatenatedKey(tnt, grp) } // early lock to be sure that until we do not write back the indexes // another goroutine can't create new indexes refID := guardian.Guardian.GuardIDs(utils.EmptyString, - config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntGrp) defer guardian.Guardian.UnguardIDs(refID) var indexes map[string]utils.StringSet - if indexes, err = newFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, filterIDs, nil); err != nil { + if indexes, err = newFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, filterIDs, nil); err != nil { return } // in case we have a profile with only non indexable filters(e.g. only *gt) @@ -144,27 +144,27 @@ func addItemToFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, for indxKey, index := range indexes { index.Add(itemID) // remove from cache in order to corectly update the index - if err = Cache.Remove(apiCtx, idxItmType, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil { + if err = Cache.Remove(ctx, idxItmType, utils.ConcatenatedKey(tntGrp, indxKey), true, utils.NonTransactional); err != nil { return } } - return dm.SetIndexes(apiCtx, idxItmType, tntCtx, indexes, true, utils.NonTransactional) + return dm.SetIndexes(ctx, idxItmType, tntGrp, indexes, true, utils.NonTransactional) } // removeItemFromFilterIndex will remove the itemID from the existing/created index and set it in the DataDB -func removeItemFromFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, ctx, itemID string, filterIDs []string) (err error) { - tntCtx := tnt - if ctx != utils.EmptyString { - tntCtx = utils.ConcatenatedKey(tnt, ctx) +func removeItemFromFilterIndex(ctx *context.Context, dm *DataManager, idxItmType, tnt, grp, itemID string, filterIDs []string) (err error) { + tntGrp := tnt + if grp != utils.EmptyString { + tntGrp = utils.ConcatenatedKey(tnt, grp) } // early lock to be sure that until we do not write back the indexes // another goroutine can't create new indexes refID := guardian.Guardian.GuardIDs(utils.EmptyString, - config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntGrp) defer guardian.Guardian.UnguardIDs(refID) var indexes map[string]utils.StringSet - if indexes, err = newFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, filterIDs, nil); err != nil { + if indexes, err = newFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, filterIDs, nil); err != nil { return } if len(indexes) == 0 { // in case we have a profile with only non indexable filters(e.g. only *gt) @@ -176,32 +176,32 @@ func removeItemFromFilterIndex(apiCtx *context.Context, dm *DataManager, idxItmT indexes[idxKey] = nil // this will not be set in DB(handled by driver) } // remove from cache in order to corectly update the index - if err = Cache.Remove(apiCtx, idxItmType, utils.ConcatenatedKey(tntCtx, idxKey), true, utils.NonTransactional); err != nil { + if err = Cache.Remove(ctx, idxItmType, utils.ConcatenatedKey(tntGrp, idxKey), true, utils.NonTransactional); err != nil { return } } - return dm.SetIndexes(apiCtx, idxItmType, tntCtx, indexes, true, utils.NonTransactional) + return dm.SetIndexes(ctx, idxItmType, tntGrp, indexes, true, utils.NonTransactional) } // updatedIndexes will compare the old filtersIDs with the new ones and only update the filters indexes that are added/removed // idxItmType - the index object type(e.g.*attribute_filter_indexes, *rate_filter_indexes, *threshold_filter_indexes) // tnt - the tenant of the object -// ctx - the rate profile id for rate from RateProfile(sub indexes); for all the rest the ctx is ""(AttributePrf and DispatcherPrf have a separate function) +// grp - the rate profile id for rate from RateProfile(sub indexes); for all the rest the grp is ""(AttributePrf and DispatcherPrf have a separate function) // itemID - the object id // oldFilterIds - the filtersIDs that the old object had; this is optional if the object did not exist // newFilterIDs - the filtersIDs for the object that will be set -// useCtx - in case of subindexes(e.g. Rate from RateProfiles) need to add the ctx to the itemID when reverse filter indexes are set +// useGrp - in case of subindexes(e.g. Rate from RateProfiles) need to add the grp to the itemID when reverse filter indexes are set // used when updating the filters -func updatedIndexes(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, ctx, itemID string, oldFilterIds *[]string, newFilterIDs []string, useCtx bool) (err error) { - itmCtx := itemID - if useCtx { - itmCtx = utils.ConcatenatedKey(itemID, ctx) +func updatedIndexes(ctx *context.Context, dm *DataManager, idxItmType, tnt, grp, itemID string, oldFilterIds *[]string, newFilterIDs []string, useGrp bool) (err error) { + itmGrp := itemID + if useGrp { + itmGrp = utils.ConcatenatedKey(itemID, grp) } if oldFilterIds == nil { // nothing to remove so just create the new indexes - if err = addIndexFiltersItem(apiCtx, dm, idxItmType, tnt, itmCtx, newFilterIDs); err != nil { + if err = addIndexFiltersItem(ctx, dm, idxItmType, tnt, itmGrp, newFilterIDs); err != nil { return } - return addItemToFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, newFilterIDs) + return addItemToFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, newFilterIDs) } if len(*oldFilterIds) == 0 && len(newFilterIDs) == 0 { // nothing to update return @@ -229,10 +229,10 @@ func updatedIndexes(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c if len(oldFilterIDs) != 0 || oldFltrs.Size() == 0 { // has some indexes to remove or // the old profile doesn't have filters but the new one has so remove the *none index - if err = removeIndexFiltersItem(apiCtx, dm, idxItmType, tnt, itmCtx, oldFilterIDs); err != nil { + if err = removeIndexFiltersItem(ctx, dm, idxItmType, tnt, itmGrp, oldFilterIDs); err != nil { return } - if err = removeItemFromFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, oldFilterIDs); err != nil { + if err = removeItemFromFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, oldFilterIDs); err != nil { return } } @@ -240,10 +240,10 @@ func updatedIndexes(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c if len(newFilterIDs) != 0 || newFltrs.Size() == 0 { // has some indexes to add or // the old profile has filters but the new one does not so add the *none index - if err = addIndexFiltersItem(apiCtx, dm, idxItmType, tnt, itmCtx, newFilterIDs); err != nil { + if err = addIndexFiltersItem(ctx, dm, idxItmType, tnt, itmGrp, newFilterIDs); err != nil { return } - if err = addItemToFilterIndex(apiCtx, dm, idxItmType, tnt, ctx, itemID, newFilterIDs); err != nil { + if err = addItemToFilterIndex(ctx, dm, idxItmType, tnt, grp, itemID, newFilterIDs); err != nil { return } } @@ -251,27 +251,27 @@ func updatedIndexes(apiCtx *context.Context, dm *DataManager, idxItmType, tnt, c } // splitFilterIndex splits the cache key so it can be used to recache the indexes -func splitFilterIndex(tntCtxIdxKey string) (tntCtx, idxKey string, err error) { - splt := utils.SplitConcatenatedKey(tntCtxIdxKey) // tntCtx:filterType:fieldName:fieldVal +func splitFilterIndex(tntGrpIdxKey string) (tntGrp, idxKey string, err error) { + splt := utils.SplitConcatenatedKey(tntGrpIdxKey) // tntCtx:filterType:fieldName:fieldVal lsplt := len(splt) if lsplt < 4 { - err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", tntCtxIdxKey) + err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", tntGrpIdxKey) return } - tntCtx = utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems + tntGrp = utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems idxKey = utils.ConcatenatedKey(splt[lsplt-3:]...) return } // ComputeIndexes gets the indexes from the DB and ensure that the items are indexed // getFilters returns a list of filters IDs for the given profile id -func ComputeIndexes(cntxt *context.Context, dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string, - transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (indexes utils.StringSet, err error) { +func ComputeIndexes(ctx *context.Context, dm *DataManager, tnt, grp, idxItmType string, IDs *[]string, + transactionID string, getFilters func(tnt, id, grp string) (*[]string, error), newFltr *Filter) (indexes utils.StringSet, err error) { indexes = make(utils.StringSet) var profilesIDs []string if IDs == nil { // get all items var ids []string - if ids, err = dm.DataDB().GetKeysForPrefix(cntxt, utils.CacheIndexesToPrefix[idxItmType]); err != nil { + if ids, err = dm.DataDB().GetKeysForPrefix(ctx, utils.CacheIndexesToPrefix[idxItmType]); err != nil { return } for _, id := range ids { @@ -280,34 +280,34 @@ func ComputeIndexes(cntxt *context.Context, dm *DataManager, tnt, ctx, idxItmTyp } else { profilesIDs = *IDs } - tntCtx := tnt - if ctx != utils.EmptyString { - tntCtx = utils.ConcatenatedKey(tnt, ctx) + tntGrp := tnt + if grp != utils.EmptyString { + tntGrp = utils.ConcatenatedKey(tnt, grp) } // early lock to be sure that until we do not write back the indexes // another goroutine can't create new indexes refID := guardian.Guardian.GuardIDs(utils.EmptyString, - config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntGrp) defer guardian.Guardian.UnguardIDs(refID) for _, id := range profilesIDs { var filterIDs *[]string - if filterIDs, err = getFilters(tnt, id, ctx); err != nil { + if filterIDs, err = getFilters(tnt, id, grp); err != nil { return } if filterIDs == nil { continue } var index map[string]utils.StringSet - if index, err = newFilterIndex(cntxt, dm, idxItmType, - tnt, ctx, id, *filterIDs, newFltr); err != nil { + if index, err = newFilterIndex(ctx, dm, idxItmType, + tnt, grp, id, *filterIDs, newFltr); err != nil { return } // ensure that the item is in the index set for key, idx := range index { idx.Add(id) - indexes.Add(utils.ConcatenatedKey(tntCtx, key)) + indexes.Add(utils.ConcatenatedKey(tntGrp, key)) } - if err = dm.SetIndexes(cntxt, idxItmType, tntCtx, index, cacheCommit(transactionID), transactionID); err != nil { + if err = dm.SetIndexes(ctx, idxItmType, tntGrp, index, cacheCommit(transactionID), transactionID); err != nil { return } } @@ -320,11 +320,11 @@ func addIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, tnt, if strings.HasPrefix(ID, utils.Meta) { // skip inline continue } - tntCtx := utils.ConcatenatedKey(tnt, ID) + tntGrp := utils.ConcatenatedKey(tnt, ID) refID := guardian.Guardian.GuardIDs(utils.EmptyString, - config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx) + config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntGrp) var indexes map[string]utils.StringSet - if indexes, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx, + if indexes, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntGrp, idxItmType, true, false); err != nil { if err != utils.ErrNotFound { guardian.Guardian.UnguardIDs(refID) @@ -337,12 +337,12 @@ func addIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, tnt, } indexes[idxItmType].Add(itemID) for indxKey := range indexes { - if err = Cache.Remove(ctx, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil { + if err = Cache.Remove(ctx, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntGrp, indxKey), true, utils.NonTransactional); err != nil { guardian.Guardian.UnguardIDs(refID) return } } - if err = dm.SetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx, indexes, true, utils.NonTransactional); err != nil { + if err = dm.SetIndexes(ctx, utils.CacheReverseFilterIndexes, tntGrp, indexes, true, utils.NonTransactional); err != nil { guardian.Guardian.UnguardIDs(refID) return } @@ -357,11 +357,11 @@ func removeIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, t if strings.HasPrefix(ID, utils.Meta) { // skip inline continue } - tntCtx := utils.ConcatenatedKey(tnt, ID) + tntGrp := utils.ConcatenatedKey(tnt, ID) refID := guardian.Guardian.GuardIDs(utils.EmptyString, - config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx) + config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntGrp) var indexes map[string]utils.StringSet - if indexes, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx, + if indexes, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntGrp, idxItmType, true, false); err != nil { guardian.Guardian.UnguardIDs(refID) if err != utils.ErrNotFound { @@ -373,12 +373,12 @@ func removeIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, t indexes[idxItmType].Remove(itemID) for indxKey := range indexes { - if err = Cache.Remove(ctx, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntCtx, indxKey), true, utils.NonTransactional); err != nil { + if err = Cache.Remove(ctx, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntGrp, indxKey), true, utils.NonTransactional); err != nil { guardian.Guardian.UnguardIDs(refID) return } } - if err = dm.SetIndexes(ctx, utils.CacheReverseFilterIndexes, tntCtx, indexes, true, utils.NonTransactional); err != nil { + if err = dm.SetIndexes(ctx, utils.CacheReverseFilterIndexes, tntGrp, indexes, true, utils.NonTransactional); err != nil { guardian.Guardian.UnguardIDs(refID) return } @@ -390,7 +390,7 @@ func removeIndexFiltersItem(ctx *context.Context, dm *DataManager, idxItmType, t // UpdateFilterIndex will update the indexes for the new Filter // we do not care what is added // exported for the migrator -func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt *Filter) (err error) { +func UpdateFilterIndex(ctx *context.Context, dm *DataManager, oldFlt, newFlt *Filter) (err error) { if oldFlt == nil { // no filter before so no index to update return // nothing to update } @@ -471,7 +471,7 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt defer guardian.Guardian.UnguardIDs(refID) var rcvIndx map[string]utils.StringSet // get all reverse indexes from DB - if rcvIndx, err = dm.GetIndexes(apiCtx, utils.CacheReverseFilterIndexes, tntID, + if rcvIndx, err = dm.GetIndexes(ctx, utils.CacheReverseFilterIndexes, tntID, utils.EmptyString, true, false); err != nil { if err != utils.ErrNotFound { return @@ -485,14 +485,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt for idxItmType, indx := range rcvIndx { switch idxItmType { case utils.CacheThresholdFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items - &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - th, e := dm.GetThresholdProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + &idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) { + th, e := dm.GetThresholdProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -505,14 +505,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt return utils.APIErrorHandler(err) } case utils.CacheStatFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items - &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - sq, e := dm.GetStatQueueProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + &idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) { + sq, e := dm.GetStatQueueProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -525,14 +525,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt return utils.APIErrorHandler(err) } case utils.CacheResourceFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items - &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - rs, e := dm.GetResourceProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + &idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) { + rs, e := dm.GetResourceProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -545,14 +545,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt return utils.APIErrorHandler(err) } case utils.CacheRouteFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items - &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - rt, e := dm.GetRouteProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + &idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) { + rt, e := dm.GetRouteProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -565,14 +565,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt return utils.APIErrorHandler(err) } case utils.CacheChargerFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items - &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - ch, e := dm.GetChargerProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + &idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) { + ch, e := dm.GetChargerProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -585,13 +585,13 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt return utils.APIErrorHandler(err) } case utils.CacheAccountsFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items - &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + &idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) { ap, e := dm.GetAccount(context.Background(), tnt, id) if e != nil { return nil, e @@ -605,14 +605,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt return utils.APIErrorHandler(err) } case utils.CacheActionProfilesFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items - &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - acp, e := dm.GetActionProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + &idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) { + acp, e := dm.GetActionProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -625,14 +625,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt return utils.APIErrorHandler(err) } case utils.CacheRateProfilesFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, //remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items - &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - rp, e := dm.GetRateProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + &idxSlice, utils.NonTransactional, func(tnt, id, grp string) (*[]string, error) { + rp, e := dm.GetRateProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -658,13 +658,13 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt itemIDs[idSplit[1]].Add(idSplit[0]) } for rpID, ids := range itemIDs { - tntCtx := utils.ConcatenatedKey(newFlt.Tenant, rpID) - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, tntCtx, + tntGrp := utils.ConcatenatedKey(newFlt.Tenant, rpID) + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, tntGrp, removeIndexKeys, ids); err != nil { return } var rp *utils.RateProfile - if rp, err = dm.GetRateProfile(apiCtx, newFlt.Tenant, rpID, true, false, utils.NonTransactional); err != nil { + if rp, err = dm.GetRateProfile(ctx, newFlt.Tenant, rpID, true, false, utils.NonTransactional); err != nil { return } for itemID := range ids { @@ -673,9 +673,9 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt return utils.ErrNotFound } refID := guardian.Guardian.GuardIDs(utils.EmptyString, - config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntCtx) + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tntGrp) var updIdx map[string]utils.StringSet - if updIdx, err = newFilterIndex(apiCtx, dm, idxItmType, + if updIdx, err = newFilterIndex(ctx, dm, idxItmType, newFlt.Tenant, rpID, itemID, rate.FilterIDs, newFlt); err != nil { guardian.Guardian.UnguardIDs(refID) return @@ -683,7 +683,7 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt for _, idx := range updIdx { idx.Add(itemID) } - if err = dm.SetIndexes(apiCtx, idxItmType, tntCtx, + if err = dm.SetIndexes(ctx, idxItmType, tntGrp, updIdx, false, utils.NonTransactional); err != nil { guardian.Guardian.UnguardIDs(refID) return @@ -692,14 +692,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt } } case utils.CacheAttributeFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items &idxSlice, utils.NonTransactional, func(tnt, id, _ string) (*[]string, error) { - ap, e := dm.GetAttributeProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) + ap, e := dm.GetAttributeProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -712,14 +712,14 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt return utils.APIErrorHandler(err) } case utils.CacheDispatcherFilterIndexes: - if err = removeFilterIndexesForFilter(apiCtx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter + if err = removeFilterIndexesForFilter(ctx, dm, idxItmType, newFlt.Tenant, // remove the indexes for the filter removeIndexKeys, indx); err != nil { return } idxSlice := indx.AsSlice() - if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items + if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items &idxSlice, utils.NonTransactional, func(tnt, id, _ string) (*[]string, error) { - dp, e := dm.GetDispatcherProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) + dp, e := dm.GetDispatcherProfile(ctx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } diff --git a/engine/libindex_health.go b/engine/libindex_health.go new file mode 100644 index 000000000..9f414d328 --- /dev/null +++ b/engine/libindex_health.go @@ -0,0 +1,688 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + "strings" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" +) + +type IndexHealthArgs struct { + IndexCacheLimit int + IndexCacheTTL time.Duration + IndexCacheStaticTTL bool + + ObjectCacheLimit int + ObjectCacheTTL time.Duration + ObjectCacheStaticTTL bool + + FilterCacheLimit int + FilterCacheTTL time.Duration + FilterCacheStaticTTL bool +} + +type FilterIHReply struct { + MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB + MissingIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes) + BrokenIndexes map[string][]string // list of broken indexes for each object (the map has the key as the index and a list of objects) + MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs) +} + +type ReverseFilterIHReply struct { + MissingObjects []string // list of object that are referenced in indexes but are not found in the dataDB + MissingReverseIndexes map[string][]string // list of missing indexes for each object (the map has the key as the objectID and a list of indexes) + BrokenReverseIndexes map[string][]string // list of broken indexes for each object (the map has the key as the objectID and a list of indexes) + MissingFilters map[string][]string // list of broken references (the map has the key as the filterID and a list of objectIDs) +} + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// getFilters returns the filtreIDs and context(if any) for that object +func getFilters(ctx *context.Context, dm *DataManager, indxType, tnt, id string) (filterIDs []string, err error) { // add contexts + switch indxType { + case utils.CacheResourceFilterIndexes: + var rs *ResourceProfile + if rs, err = dm.GetResourceProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil { + return + } + filterIDs = rs.FilterIDs + case utils.CacheStatFilterIndexes: + var st *StatQueueProfile + if st, err = dm.GetStatQueueProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil { + return + } + filterIDs = st.FilterIDs + case utils.CacheThresholdFilterIndexes: + var th *ThresholdProfile + if th, err = dm.GetThresholdProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil { + return + } + filterIDs = th.FilterIDs + case utils.CacheRouteFilterIndexes: + var rt *RouteProfile + if rt, err = dm.GetRouteProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil { + return + } + filterIDs = rt.FilterIDs + case utils.CacheAttributeFilterIndexes: + var at *AttributeProfile + if at, err = dm.GetAttributeProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil { + return + } + filterIDs = at.FilterIDs + case utils.CacheChargerFilterIndexes: + var ch *ChargerProfile + if ch, err = dm.GetChargerProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil { + return + } + filterIDs = ch.FilterIDs + case utils.CacheDispatcherFilterIndexes: + var ds *DispatcherProfile + if ds, err = dm.GetDispatcherProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil { + return + } + filterIDs = ds.FilterIDs + + case utils.CacheRateProfilesFilterIndexes: + var rp *utils.RateProfile + if rp, err = dm.GetRateProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil { + return + } + filterIDs = rp.FilterIDs + case utils.CacheActionProfilesFilterIndexes: + var ap *ActionProfile + if ap, err = dm.GetActionProfile(ctx, tnt, id, true, false, utils.NonTransactional); err != nil { + return + } + filterIDs = ap.FilterIDs + case utils.CacheAccountsFilterIndexes: + var ac *utils.Account + if ac, err = dm.GetAccount(ctx, tnt, id); err != nil { + return + } + filterIDs = ac.FilterIDs + default: + return nil, fmt.Errorf("unsuported index type:<%q>", indxType) + } + if filterIDs == nil { // nil means ErrNotFound in cache + filterIDs = make([]string, 0) + } + return +} + +// getIHObjFromCache returns all information that is needed from the mentioned object +// uses an extra cache(controled by the API) to optimize data management +func getIHObjFromCache(ctx *context.Context, dm *DataManager, objCache *ltcache.Cache, indxType, tnt, id string) (filtIDs []string, err error) { + cacheKey := utils.ConcatenatedKey(tnt, id) + if objVal, ok := objCache.Get(cacheKey); ok { + if objVal == nil { + return nil, utils.ErrNotFound + } + return objVal.([]string), nil + } + if filtIDs, err = getFilters(ctx, dm, indxType, tnt, id); err != nil { + if err == utils.ErrNotFound { + objCache.Set(cacheKey, nil, nil) + } + return + } + objCache.Set(cacheKey, filtIDs, nil) + return +} + +// getIHFltrFromCache returns the Filter +// uses an extra cache(controled by the API) to optimize data management +func getIHFltrFromCache(ctx *context.Context, dm *DataManager, fltrCache *ltcache.Cache, tnt, id string) (fltr *Filter, err error) { + cacheKey := utils.ConcatenatedKey(tnt, id) + if fltrVal, ok := fltrCache.Get(cacheKey); ok { + if fltrVal == nil { + return nil, utils.ErrNotFound + } + return fltrVal.(*Filter), nil + } + if fltr, err = dm.GetFilter(ctx, tnt, id, + true, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + fltrCache.Set(cacheKey, nil, nil) + } + return + } + fltrCache.Set(cacheKey, fltr, nil) + return +} + +// getIHFltrIdxFromCache returns the Filter index +// uses an extra cache(controled by the API) to optimize data management +func getIHFltrIdxFromCache(ctx *context.Context, dm *DataManager, fltrIdxCache *ltcache.Cache, idxItmType, tntGrp, idxKey string) (idx utils.StringSet, err error) { + cacheKey := utils.ConcatenatedKey(tntGrp, idxKey) + if fltrVal, ok := fltrIdxCache.Get(cacheKey); ok { + if fltrVal == nil { + return nil, utils.ErrNotFound + } + return fltrVal.(utils.StringSet), nil + } + var indexes map[string]utils.StringSet + if indexes, err = dm.GetIndexes(ctx, idxItmType, tntGrp, idxKey, true, false); err != nil { + if err == utils.ErrNotFound { + fltrIdxCache.Set(cacheKey, nil, nil) + } + return + } + idx = indexes[idxKey] + fltrIdxCache.Set(cacheKey, idx, nil) + return +} + +// getFilterAsIndexSet will parse the rules of filter and add them to the index map +func getFilterAsIndexSet(ctx *context.Context, dm *DataManager, fltrIdxCache *ltcache.Cache, idxItmType, tntGrp string, fltr *Filter) (indexes map[string]utils.StringSet, err error) { + indexes = make(map[string]utils.StringSet) + for _, flt := range fltr.Rules { + if !FilterIndexTypes.Has(flt.Type) || + IsDynamicDPPath(flt.Element) { + continue + } + isDyn := strings.HasPrefix(flt.Element, utils.DynamicDataPrefix) + for _, fldVal := range flt.Values { + if IsDynamicDPPath(fldVal) { + continue + } + var idxKey string + if isDyn { + if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) { // do not index if both the element and the value is dynamic + continue + } + idxKey = utils.ConcatenatedKey(flt.Type, flt.Element[1:], fldVal) + } else if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) { + idxKey = utils.ConcatenatedKey(flt.Type, fldVal[1:], flt.Element) + } else { + // do not index not dynamic filters + continue + } + var rcvIndx utils.StringSet + // only read from cache in case if we do not find the index to not cache the negative response + if rcvIndx, err = getIHFltrIdxFromCache(ctx, dm, fltrIdxCache, idxItmType, tntGrp, idxKey); err != nil { + if err != utils.ErrNotFound { + return + } + err = nil + rcvIndx = make(utils.StringSet) // create an empty index if is not found in DB in case we add them later + } + indexes[idxKey] = rcvIndx + } + } + return indexes, nil +} + +// updateFilterIHMisingIndx updates the reply with the missing indexes for a specific object( obj->filter->index relation) +func updateFilterIHMisingIndx(ctx *context.Context, dm *DataManager, fltrCache, fltrIdxCache *ltcache.Cache, filterIDs []string, indxType, tnt, tntGrp, itmID string, rply *FilterIHReply) (_ *FilterIHReply, err error) { + if len(filterIDs) == 0 { + idxKey := utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny) + var rcvIndx utils.StringSet + if rcvIndx, err = getIHFltrIdxFromCache(ctx, dm, nil, indxType, tntGrp, idxKey); err != nil { + if err != utils.ErrNotFound { + return + } + key := utils.ConcatenatedKey(tntGrp, idxKey) + rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID) + } else if !rcvIndx.Has(itmID) { + key := utils.ConcatenatedKey(tntGrp, idxKey) + rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID) + } + + return rply, nil + } + for _, fltrID := range filterIDs { + var fltr *Filter + if fltr, err = getIHFltrFromCache(ctx, dm, fltrCache, tnt, fltrID); err != nil { + if err != utils.ErrNotFound { + return + } + fltrID = utils.ConcatenatedKey(tnt, fltrID) + rply.MissingFilters[fltrID] = append(rply.MissingFilters[fltrID], itmID) + continue + } + var indexes map[string]utils.StringSet + if indexes, err = getFilterAsIndexSet(ctx, dm, fltrIdxCache, indxType, tntGrp, fltr); err != nil { + return + } + for key, idx := range indexes { + if !idx.Has(itmID) { + key = utils.ConcatenatedKey(tntGrp, key) + rply.MissingIndexes[key] = append(rply.MissingIndexes[key], itmID) + } + } + } + return rply, nil +} + +// GetFltrIdxHealth returns the missing indexes for all objects +func GetFltrIdxHealth(ctx *context.Context, dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *FilterIHReply, err error) { + // check the objects ( obj->filter->index relation) + rply = &FilterIHReply{ + MissingIndexes: make(map[string][]string), + BrokenIndexes: make(map[string][]string), + MissingFilters: make(map[string][]string), + } + objPrfx := utils.CacheIndexesToPrefix[indxType] + var ids []string + if ids, err = dm.dataDB.GetKeysForPrefix(ctx, objPrfx); err != nil { + return + } + for _, id := range ids { + id = strings.TrimPrefix(id, objPrfx) + tntID := utils.NewTenantID(id) + var filterIDs []string + if filterIDs, err = getIHObjFromCache(ctx, dm, objCache, indxType, tntID.Tenant, tntID.ID); err != nil { + return + } + + if rply, err = updateFilterIHMisingIndx(ctx, dm, fltrCache, fltrIdxCache, filterIDs, indxType, tntID.Tenant, tntID.Tenant, tntID.ID, rply); err != nil { + return + } + } + + // check the indexes( index->filter->obj relation) + idxPrfx := utils.CacheInstanceToPrefix[indxType] + var indexKeys []string + if indexKeys, err = dm.dataDB.GetKeysForPrefix(ctx, idxPrfx); err != nil { + return + } + for _, dataID := range indexKeys { + dataID = strings.TrimPrefix(dataID, idxPrfx) + + splt := utils.SplitConcatenatedKey(dataID) // tntGrp:filterType:fieldName:fieldVal + lsplt := len(splt) + if lsplt < 4 { + err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID) + return + } + tnt := utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems + idxKey := utils.ConcatenatedKey(splt[lsplt-3:]...) + + var idx utils.StringSet + if idx, err = getIHFltrIdxFromCache(ctx, dm, fltrIdxCache, indxType, tnt, idxKey); err != nil { + return + } + for itmID := range idx { + var filterIDs []string + if filterIDs, err = getIHObjFromCache(ctx, dm, objCache, indxType, tnt, itmID); err != nil { + if err != utils.ErrNotFound { + return + } + rply.MissingObjects = append(rply.MissingObjects, utils.ConcatenatedKey(tnt, itmID)) + err = nil + continue + } + if len(filterIDs) == 0 { + if utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny) != idxKey { + rply.BrokenIndexes[dataID] = append(rply.BrokenIndexes[dataID], itmID) + } + continue + } + var hasIndx bool + for _, fltrID := range filterIDs { + var fltr *Filter + if fltr, err = getIHFltrFromCache(ctx, dm, fltrCache, tnt, fltrID); err != nil { + if err != utils.ErrNotFound { + return + } + err = nil // should be already logged when we parsed all the objects + continue + } + var indexes map[string]utils.StringSet + if indexes, err = getFilterAsIndexSet(ctx, dm, fltrIdxCache, indxType, tnt, fltr); err != nil { + return + } + idx, has := indexes[idxKey] + if hasIndx = has && idx.Has(itmID); hasIndx { + break + } + } + if !hasIndx { + key := utils.ConcatenatedKey(tnt, idxKey) + rply.BrokenIndexes[key] = append(rply.BrokenIndexes[key], itmID) + } + } + } + + return +} + +// GetRevFltrIdxHealth returns the missing reverse indexes for all objects +func getRevFltrIdxHealthFromObj(ctx *context.Context, dm *DataManager, fltrCache, revFltrIdxCache, objCache *ltcache.Cache, indxType string) (rply *ReverseFilterIHReply, err error) { + // check the objects ( obj->filter->index relation) + rply = &ReverseFilterIHReply{ + MissingReverseIndexes: make(map[string][]string), + BrokenReverseIndexes: make(map[string][]string), + MissingFilters: make(map[string][]string), + } + objPrfx := utils.CacheIndexesToPrefix[indxType] + var ids []string + if ids, err = dm.dataDB.GetKeysForPrefix(ctx, objPrfx); err != nil { + return + } + for _, id := range ids { + id = strings.TrimPrefix(id, objPrfx) + tntID := utils.NewTenantID(id) + var filterIDs []string + if filterIDs, err = getIHObjFromCache(ctx, dm, objCache, indxType, tntID.Tenant, tntID.ID); err != nil { + return + } + + for _, fltrID := range filterIDs { + if strings.HasPrefix(fltrID, utils.Meta) { + continue + } + if _, err = getIHFltrFromCache(ctx, dm, fltrCache, tntID.Tenant, fltrID); err != nil { + if err != utils.ErrNotFound { + return + } + err = nil + key := utils.ConcatenatedKey(tntID.Tenant, fltrID) + rply.MissingFilters[key] = append(rply.MissingFilters[key], tntID.ID) + continue + } + var revIdx utils.StringSet + if revIdx, err = getIHFltrIdxFromCache(ctx, dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), indxType); err != nil { + if err == utils.ErrNotFound { + rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], fltrID) + err = nil + continue + } + return + } + if !revIdx.Has(tntID.ID) { + rply.MissingReverseIndexes[id] = append(rply.MissingReverseIndexes[id], fltrID) + } + } + } + return +} + +func getRevFltrIdxHealthFromReverse(ctx *context.Context, dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Cache, objCaches map[string]*ltcache.Cache, rply map[string]*ReverseFilterIHReply) (_ map[string]*ReverseFilterIHReply, err error) { + var revIndexKeys []string + if revIndexKeys, err = dm.dataDB.GetKeysForPrefix(ctx, utils.FilterIndexPrfx); err != nil { + return + } + for _, revIdxKey := range revIndexKeys { + revIdxKey = strings.TrimPrefix(revIdxKey, utils.FilterIndexPrfx) + revIDxSplit := strings.SplitN(revIdxKey, utils.ConcatenatedKeySep, 3) + tnt, fltrID, indxType := revIDxSplit[0], revIDxSplit[1], revIDxSplit[2] + revIdxKey = utils.ConcatenatedKey(tnt, fltrID) + objCache := objCaches[indxType] + + if _, has := rply[indxType]; !has { + rply[indxType] = &ReverseFilterIHReply{ + MissingReverseIndexes: make(map[string][]string), + MissingFilters: make(map[string][]string), + BrokenReverseIndexes: make(map[string][]string), + } + } + + var revIdx utils.StringSet + if revIdx, err = getIHFltrIdxFromCache(ctx, dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, revIdxKey, indxType); err != nil { + return + } + for id := range revIdx { + var filterIDs []string + if indxType == utils.CacheRateFilterIndexes { + spl := strings.SplitN(id, utils.ConcatenatedKeySep, 2) + rateID := spl[0] + rprfID := spl[1] + var rates map[string]*utils.Rate + if rates, err = getRatesFromCache(ctx, dm, objCache, tnt, rprfID); err != nil { + if err != utils.ErrNotFound { + return + } + rply[indxType].MissingObjects = append(rply[indxType].MissingObjects, utils.ConcatenatedKey(tnt, id)) + err = nil + continue + } + if rate, has := rates[rateID]; !has { + rply[indxType].MissingObjects = append(rply[indxType].MissingObjects, utils.ConcatenatedKey(tnt, id)) + continue + } else { + filterIDs = rate.FilterIDs + } + + } else if filterIDs, err = getIHObjFromCache(ctx, dm, objCache, indxType, tnt, id); err != nil { + if err == utils.ErrNotFound { + rply[indxType].MissingObjects = append(rply[indxType].MissingObjects, utils.ConcatenatedKey(tnt, id)) + err = nil + continue + } + return + } + if !utils.IsSliceMember(filterIDs, fltrID) { + key := utils.ConcatenatedKey(tnt, id) + rply[indxType].BrokenReverseIndexes[key] = append(rply[indxType].BrokenReverseIndexes[key], fltrID) + } + + } + } + return rply, nil +} + +func GetRevFltrIdxHealth(ctx *context.Context, dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Cache, objCaches map[string]*ltcache.Cache) (rply map[string]*ReverseFilterIHReply, err error) { + rply = make(map[string]*ReverseFilterIHReply) + for indxType := range utils.CacheIndexesToPrefix { + if indxType == utils.CacheReverseFilterIndexes { + continue + } + if rply[indxType], err = getRevFltrIdxHealthFromObj(ctx, dm, fltrCache, revFltrIdxCache, objCaches[indxType], indxType); err != nil { + return + } + } + if rply[utils.CacheRateFilterIndexes], err = getRevFltrIdxHealthFromRateRates(ctx, dm, fltrCache, revFltrIdxCache, objCaches[utils.CacheRateFilterIndexes]); err != nil { + return + } + rply, err = getRevFltrIdxHealthFromReverse(ctx, dm, fltrCache, revFltrIdxCache, objCaches, rply) + for k, v := range rply { // should be a safe for (even on rply==nil) + if len(v.MissingFilters) == 0 && + len(v.MissingObjects) == 0 && + len(v.BrokenReverseIndexes) == 0 && + len(v.MissingReverseIndexes) == 0 { + delete(rply, k) + } + } + return +} + +// getRatesFromCache returns all rates from rateprofile +// uses an extra cache(controled by the API) to optimize data management +func getRatesFromCache(ctx *context.Context, dm *DataManager, objCache *ltcache.Cache, tnt, rprfID string) (_ map[string]*utils.Rate, err error) { + cacheKey := utils.ConcatenatedKey(tnt, rprfID) + if objVal, ok := objCache.Get(cacheKey); ok { + if objVal == nil { + return nil, utils.ErrNotFound + } + rprf := objVal.(*utils.RateProfile) + return rprf.Rates, nil + } + + var rprf *utils.RateProfile + if rprf, err = dm.GetRateProfile(ctx, tnt, rprfID, true, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + objCache.Set(cacheKey, nil, nil) + } + return + } + objCache.Set(cacheKey, rprf, nil) + return rprf.Rates, nil +} + +// GetFltrIdxHealth returns the missing indexes for all objects +func GetFltrIdxHealthForRateRates(ctx *context.Context, dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcache.Cache) (rply *FilterIHReply, err error) { + // check the objects ( obj->filter->index relation) + rply = &FilterIHReply{ + MissingIndexes: make(map[string][]string), + BrokenIndexes: make(map[string][]string), + MissingFilters: make(map[string][]string), + } + var ids []string + if ids, err = dm.dataDB.GetKeysForPrefix(ctx, utils.RateProfilePrefix); err != nil { + return + } + for _, id := range ids { + id = strings.TrimPrefix(id, utils.RateProfilePrefix) + tntID := utils.NewTenantID(id) + + var rates map[string]*utils.Rate + if rates, err = getRatesFromCache(ctx, dm, objCache, tntID.Tenant, tntID.ID); err != nil { + return + } + for rtID, rate := range rates { + if rply, err = updateFilterIHMisingIndx(ctx, dm, fltrCache, fltrIdxCache, rate.FilterIDs, utils.CacheRateFilterIndexes, tntID.Tenant, utils.ConcatenatedKey(tntID.Tenant, tntID.ID), rtID, rply); err != nil { + return + } + } + } + + // check the indexes( index->filter->obj relation) + var indexKeys []string + if indexKeys, err = dm.dataDB.GetKeysForPrefix(ctx, utils.RateFilterIndexPrfx); err != nil { + return + } + for _, dataID := range indexKeys { + dataID = strings.TrimPrefix(dataID, utils.RateFilterIndexPrfx) + + splt := utils.SplitConcatenatedKey(dataID) // tntGrp:filterType:fieldName:fieldVal + lsplt := len(splt) + if lsplt < 4 { + err = fmt.Errorf("WRONG_IDX_KEY_FORMAT<%s>", dataID) + return + } + tnt := splt[0] + rpID := splt[1] + tntGrp := utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems + idxKey := utils.ConcatenatedKey(splt[lsplt-3:]...) + + var idx utils.StringSet + if idx, err = getIHFltrIdxFromCache(ctx, dm, fltrIdxCache, utils.CacheRateFilterIndexes, tntGrp, idxKey); err != nil { + return + } + for itmID := range idx { + var rates map[string]*utils.Rate + if rates, err = getRatesFromCache(ctx, dm, objCache, tnt, rpID); err != nil { + if err != utils.ErrNotFound { + return + } + rply.MissingObjects = append(rply.MissingObjects, utils.ConcatenatedKey(tntGrp, itmID)) + err = nil + continue + } + var filterIDs []string + if rate, has := rates[itmID]; !has { + rply.MissingObjects = append(rply.MissingObjects, utils.ConcatenatedKey(tntGrp, itmID)) + continue + } else { + filterIDs = rate.FilterIDs + } + if len(filterIDs) == 0 { + if utils.ConcatenatedKey(utils.MetaNone, utils.MetaAny, utils.MetaAny) != idxKey { + rply.BrokenIndexes[dataID] = append(rply.BrokenIndexes[dataID], itmID) + } + continue + } + var hasIndx bool + for _, fltrID := range filterIDs { + var fltr *Filter + if fltr, err = getIHFltrFromCache(ctx, dm, fltrCache, tnt, fltrID); err != nil { + if err != utils.ErrNotFound { + return + } + err = nil // should be already logged when we parsed all the objects + continue + } + var indexes map[string]utils.StringSet + if indexes, err = getFilterAsIndexSet(ctx, dm, fltrIdxCache, utils.CacheRateFilterIndexes, tntGrp, fltr); err != nil { + return + } + idx, has := indexes[idxKey] + if hasIndx = has && idx.Has(itmID); hasIndx { + break + } + } + if !hasIndx { + key := utils.ConcatenatedKey(tnt, idxKey) + rply.BrokenIndexes[key] = append(rply.BrokenIndexes[key], itmID) + } + } + } + + return +} + +func getRevFltrIdxHealthFromRateRates(ctx *context.Context, dm *DataManager, fltrCache, revFltrIdxCache, objCache *ltcache.Cache) (rply *ReverseFilterIHReply, err error) { + // check the objects ( obj->filter->index relation) + rply = &ReverseFilterIHReply{ + MissingReverseIndexes: make(map[string][]string), + BrokenReverseIndexes: make(map[string][]string), + MissingFilters: make(map[string][]string), + } + var ids []string + if ids, err = dm.dataDB.GetKeysForPrefix(ctx, utils.RateProfilePrefix); err != nil { + return + } + for _, id := range ids { + id = strings.TrimPrefix(id, utils.RateProfilePrefix) + tntID := utils.NewTenantID(id) + var rates map[string]*utils.Rate + if rates, err = getRatesFromCache(ctx, dm, objCache, tntID.Tenant, tntID.ID); err != nil { + return + } + for rtID, rate := range rates { + itmID := utils.ConcatenatedKey(rtID, tntID.ID) + itmIDWithTnt := utils.ConcatenatedKey(id, rtID) + + for _, fltrID := range rate.FilterIDs { + if strings.HasPrefix(fltrID, utils.Meta) { + continue + } + if _, err = getIHFltrFromCache(ctx, dm, fltrCache, tntID.Tenant, fltrID); err != nil { + if err != utils.ErrNotFound { + return + } + err = nil + key := utils.ConcatenatedKey(tntID.Tenant, fltrID) + rply.MissingFilters[key] = append(rply.MissingFilters[key], itmID) + continue + } + var revIdx utils.StringSet + if revIdx, err = getIHFltrIdxFromCache(ctx, dm, revFltrIdxCache, utils.CacheReverseFilterIndexes, utils.ConcatenatedKey(tntID.Tenant, fltrID), utils.CacheRateFilterIndexes); err != nil { + if err == utils.ErrNotFound { + rply.MissingReverseIndexes[itmIDWithTnt] = append(rply.MissingReverseIndexes[itmIDWithTnt], fltrID) + err = nil + continue + } + return + } + if !revIdx.Has(itmID) { + rply.MissingReverseIndexes[itmIDWithTnt] = append(rply.MissingReverseIndexes[itmIDWithTnt], fltrID) + } + } + } + } + return +} diff --git a/engine/z_libindex_health_test.go b/engine/z_libindex_health_test.go new file mode 100644 index 000000000..3e290cfba --- /dev/null +++ b/engine/z_libindex_health_test.go @@ -0,0 +1,148 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "reflect" + "testing" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" +) + +func TestHealthFilter(t *testing.T) { + Cache.Clear(nil) + cfg := config.NewDefaultCGRConfig() + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, cfg.CacheCfg(), nil) + + if err := dm.SetAttributeProfile(context.Background(), &AttributeProfile{ + Tenant: "cgrates.org", + ID: "ATTR1", + FilterIDs: []string{"*string:~*req.Account:1001", "Fltr1"}, + }, false); err != nil { + t.Fatal(err) + } + + if err := dm.SetIndexes(context.Background(), utils.CacheAttributeFilterIndexes, "cgrates.org", + map[string]utils.StringSet{"*string:*req.Account:1002": {"ATTR1": {}, "ATTR2": {}}}, + true, utils.NonTransactional); err != nil { + t.Fatal(err) + } + exp := &FilterIHReply{ + MissingIndexes: map[string][]string{ + "cgrates.org:*string:*req.Account:1001": {"ATTR1"}, + }, + BrokenIndexes: map[string][]string{ + "cgrates.org:*string:*req.Account:1002": {"ATTR1"}, + }, + MissingFilters: map[string][]string{ + "cgrates.org:Fltr1": {"ATTR1"}, + }, + MissingObjects: []string{"cgrates.org:ATTR2"}, + } + + if rply, err := GetFltrIdxHealth(context.Background(), dm, + ltcache.NewCache(-1, 0, false, nil), + ltcache.NewCache(-1, 0, false, nil), + ltcache.NewCache(-1, 0, false, nil), + utils.CacheAttributeFilterIndexes); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(exp, rply) { + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rply)) + } +} + +func TestHealthReverseFilter(t *testing.T) { + Cache.Clear(nil) + cfg := config.NewDefaultCGRConfig() + db := NewInternalDB(nil, nil, true) + dm := NewDataManager(db, cfg.CacheCfg(), nil) + + if err := dm.SetAttributeProfile(context.Background(), &AttributeProfile{ + Tenant: "cgrates.org", + ID: "ATTR1", + FilterIDs: []string{"*string:~*req.Account:1001", "Fltr1", "Fltr3"}, + }, false); err != nil { + t.Fatal(err) + } + + if err := dm.SetFilter(context.Background(), &Filter{ + Tenant: "cgrates.org", + ID: "Fltr3", + }, false); err != nil { + t.Fatal(err) + } + + if err := dm.SetIndexes(context.Background(), utils.CacheReverseFilterIndexes, "cgrates.org:Fltr2", + map[string]utils.StringSet{utils.CacheAttributeFilterIndexes: {"ATTR1": {}, "ATTR2": {}}}, + true, utils.NonTransactional); err != nil { + t.Fatal(err) + } + + if err := dm.SetRateProfile(context.Background(), &utils.RateProfile{ + Tenant: "cgrates.org", + ID: "RP1", + Rates: map[string]*utils.Rate{ + "RT1": { + ID: "RT1", + FilterIDs: []string{"Fltr3"}, + }, + }, + }, false); err != nil { + t.Fatal(err) + } + + exp := map[string]*ReverseFilterIHReply{ + utils.CacheAttributeFilterIndexes: { + MissingReverseIndexes: map[string][]string{ + "cgrates.org:ATTR1": {"Fltr3"}, + }, + MissingFilters: map[string][]string{ + "cgrates.org:Fltr1": {"ATTR1"}, + }, + BrokenReverseIndexes: map[string][]string{ + "cgrates.org:ATTR1": {"Fltr2"}, + }, + MissingObjects: []string{"cgrates.org:ATTR2"}, + }, + utils.CacheRateFilterIndexes: { + MissingReverseIndexes: map[string][]string{ + "cgrates.org:RP1:RT1": {"Fltr3"}, + }, + BrokenReverseIndexes: make(map[string][]string), + MissingFilters: make(map[string][]string), + }, + } + objCaches := make(map[string]*ltcache.Cache) + for indxType := range utils.CacheIndexesToPrefix { + objCaches[indxType] = ltcache.NewCache(-1, 0, false, nil) + } + objCaches[utils.CacheRateFilterIndexes] = ltcache.NewCache(-1, 0, false, nil) + if rply, err := GetRevFltrIdxHealth(context.Background(), dm, + ltcache.NewCache(-1, 0, false, nil), + ltcache.NewCache(-1, 0, false, nil), + objCaches); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(exp, rply) { + t.Errorf("Expecting: %+v,\n received: %+v", utils.ToJSON(exp), utils.ToJSON(rply)) + } +} diff --git a/rates/rates.go b/rates/rates.go index ee71ed95f..5d232eced 100644 --- a/rates/rates.go +++ b/rates/rates.go @@ -129,18 +129,24 @@ func (rS *RateS) rateProfileCostForEvent(ctx *context.Context, rtPfl *utils.Rate utils.MetaOpts: args.APIOpts, } var rtIDs utils.StringSet - if rtIDs, err = engine.MatchingItemIDsForEvent(ctx, - evNm, - rS.cfg.RateSCfg().RateStringIndexedFields, - rS.cfg.RateSCfg().RatePrefixIndexedFields, - rS.cfg.RateSCfg().RateSuffixIndexedFields, - rS.dm, - utils.CacheRateFilterIndexes, - utils.ConcatenatedKey(args.CGREvent.Tenant, rtPfl.ID), - rS.cfg.RateSCfg().RateIndexedSelects, - rS.cfg.RateSCfg().RateNestedFields, - ); err != nil { - return + if rS.cfg.RateSCfg().RateIndexedSelects { + if rtIDs, err = engine.MatchingItemIDsForEvent(ctx, + evNm, + rS.cfg.RateSCfg().RateStringIndexedFields, + rS.cfg.RateSCfg().RatePrefixIndexedFields, + rS.cfg.RateSCfg().RateSuffixIndexedFields, + rS.dm, + utils.CacheRateFilterIndexes, + utils.ConcatenatedKey(args.CGREvent.Tenant, rtPfl.ID), + rS.cfg.RateSCfg().RateIndexedSelects, + rS.cfg.RateSCfg().RateNestedFields, + ); err != nil { + return + } + } else { + for id := range rtPfl.Rates { + rtIDs.Add(id) + } } aRates := make([]*utils.Rate, 0, len(rtIDs)) for rtID := range rtIDs { diff --git a/utils/consts.go b/utils/consts.go index fd0f46906..f554584fe 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -108,7 +108,6 @@ var ( CacheChargerFilterIndexes: ChargerProfilePrefix, CacheDispatcherFilterIndexes: DispatcherProfilePrefix, CacheRateProfilesFilterIndexes: RateProfilePrefix, - CacheRateFilterIndexes: RatePrefix, CacheActionProfilesFilterIndexes: ActionProfilePrefix, CacheAccountsFilterIndexes: AccountPrefix, CacheReverseFilterIndexes: FilterPrefix, @@ -340,7 +339,6 @@ const ( VersionPrefix = "ver_" StatQueueProfilePrefix = "sqp_" RouteProfilePrefix = "rpp_" - RatePrefix = "rep_" AttributeProfilePrefix = "alp_" ChargerProfilePrefix = "cpp_" DispatcherProfilePrefix = "dpp_"