From 9c3f9519ad03275162e8be5304d32ee63042e0d9 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 17 May 2021 17:00:56 +0300 Subject: [PATCH] Updated SetFilter Cache reload --- apier/v1/filters.go | 19 ++++++-- apier/v1/libapier.go | 100 ++++++++++++++++++++++++++++++++++++++ apier/v1/libapier_test.go | 76 +++++++++++++++++++++++++++++ 3 files changed, 192 insertions(+), 3 deletions(-) diff --git a/apier/v1/filters.go b/apier/v1/filters.go index 9c36d7a35..2c99d47e6 100644 --- a/apier/v1/filters.go +++ b/apier/v1/filters.go @@ -26,23 +26,36 @@ import ( ) //SetFilter add a new Filter -func (apierSv1 *APIerSv1) SetFilter(arg *engine.FilterWithAPIOpts, reply *string) error { +func (apierSv1 *APIerSv1) SetFilter(arg *engine.FilterWithAPIOpts, reply *string) (err error) { if missing := utils.MissingStructFields(arg.Filter, []string{utils.ID}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } if arg.Tenant == utils.EmptyString { arg.Tenant = apierSv1.Config.GeneralCfg().DefaultTenant } + var argC map[string][]string + tntID := arg.TenantID() + if fltr, err := apierSv1.DataManager.GetFilter(arg.Filter.Tenant, arg.Filter.ID, true, false, utils.NonTransactional); err != nil { + return utils.APIErrorHandler(err) + } else if argC, err = composeCacheArgsForFilter(apierSv1.DataManager, fltr, fltr.Tenant, tntID, map[string][]string{utils.FilterIDs: {tntID}}); err != nil { + return utils.APIErrorHandler(err) + } if err := apierSv1.DataManager.SetFilter(arg.Filter, true); err != nil { return utils.APIErrorHandler(err) } + + if argC, err = composeCacheArgsForFilter(apierSv1.DataManager, arg.Filter, arg.Filter.Tenant, tntID, argC); err != nil { + return utils.APIErrorHandler(err) + } //generate a loadID for CacheFilters and store it in database if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheFilters: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } //handle caching for Filter - if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, utils.CacheFilters, - arg.TenantID(), nil, nil, arg.APIOpts); err != nil { + if err := callCacheForFilter(apierSv1.ConnMgr, apierSv1.Config.ApierCfg().CachesConns, + utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), + apierSv1.Config.GeneralCfg().DefaultCaching, + arg.Tenant, argC, arg.APIOpts); err != nil { return utils.APIErrorHandler(err) } *reply = utils.OK diff --git a/apier/v1/libapier.go b/apier/v1/libapier.go index c8e7691f5..f15e880fe 100644 --- a/apier/v1/libapier.go +++ b/apier/v1/libapier.go @@ -249,3 +249,103 @@ func (apierSv1 *APIerSv1) callCacheMultiple(cacheopt, tnt, cacheID string, itemI return apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil, method, args, &reply) } + +func composeCacheArgsForFilter(dm *engine.DataManager, fltr *engine.Filter, tnt, tntID string, args map[string][]string) (_ map[string][]string, err error) { + indxIDs := make([]string, 0, len(fltr.Rules)) + for _, flt := range fltr.Rules { + if !engine.FilterIndexTypes.Has(flt.Type) { + continue + } + isDyn := strings.HasPrefix(flt.Element, utils.DynamicDataPrefix) + for _, fldVal := range flt.Values { + if isDyn { + if !strings.HasPrefix(fldVal, utils.DynamicDataPrefix) { + indxIDs = append(indxIDs, utils.ConcatenatedKey(flt.Type, flt.Element[1:], fldVal)) + } + } else if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) { + indxIDs = append(indxIDs, utils.ConcatenatedKey(flt.Type, fldVal[1:], flt.Element)) + } + } + } + if len(indxIDs) == 0 { // no index + return args, nil + } + + var rcvIndx map[string]utils.StringSet + if rcvIndx, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntID, + utils.EmptyString, true, true); err != nil && err != utils.ErrNotFound { // error when geting the revers + return + } + if err == utils.ErrNotFound || len(rcvIndx) == 0 { // no reverse index for this filter + return args, nil + } + + for k, ids := range rcvIndx { + switch k { + default: + if cField, has := utils.CacheInstanceToArg[k]; has { + for _, indx := range indxIDs { + args[cField] = append(args[cField], utils.ConcatenatedKey(tnt, indx)) + } + } + case utils.CacheAttributeFilterIndexes: // this is slow + for attrID := range ids { + var attr *engine.AttributeProfile + if attr, err = dm.GetAttributeProfile(tnt, attrID, true, true, utils.NonTransactional); err != nil { + return + } + for _, ctx := range attr.Contexts { + for _, indx := range indxIDs { + args[utils.AttributeFilterIndexIDs] = append(args[utils.AttributeFilterIndexIDs], utils.ConcatenatedKey(tnt, ctx, indx)) + } + } + } + case utils.CacheDispatcherFilterIndexes: // this is slow + for attrID := range ids { + var attr *engine.DispatcherProfile + if attr, err = dm.GetDispatcherProfile(tnt, attrID, true, true, utils.NonTransactional); err != nil { + return + } + for _, ctx := range attr.Subsystems { + for _, indx := range indxIDs { + args[utils.DispatcherFilterIndexIDs] = append(args[utils.DispatcherFilterIndexIDs], utils.ConcatenatedKey(tnt, ctx, indx)) + } + } + } + } + } + return args, nil +} + +// callCacheForFilter will call the cache for filter +func callCacheForFilter(connMgr *engine.ConnManager, cacheConns []string, cacheopt, dftCache, tnt string, + argC map[string][]string, opts map[string]interface{}) (err error) { + var reply, method string + var args interface{} = utils.AttrReloadCacheWithAPIOpts{ + Tenant: tnt, + ArgsCache: argC, + APIOpts: opts, + } + switch utils.FirstNonEmpty(cacheopt, dftCache) { + case utils.MetaNone: + return + case utils.MetaReload: + method = utils.CacheSv1ReloadCache + case utils.MetaLoad: + method = utils.CacheSv1LoadCache + case utils.MetaRemove: + method = utils.CacheSv1RemoveItems + case utils.MetaClear: + cacheIDs := make([]string, 0, len(argC)) + for k := range argC { + cacheIDs = append(cacheIDs, utils.ArgCacheToInstance[k]) + } + method = utils.CacheSv1Clear + args = &utils.AttrCacheIDsWithAPIOpts{ + Tenant: tnt, + CacheIDs: cacheIDs, + APIOpts: opts, + } + } + return connMgr.Call(cacheConns, nil, method, args, &reply) +} diff --git a/apier/v1/libapier_test.go b/apier/v1/libapier_test.go index 8450e4487..736290f5f 100644 --- a/apier/v1/libapier_test.go +++ b/apier/v1/libapier_test.go @@ -213,3 +213,79 @@ func TestCallCache(t *testing.T) { t.Fatal("Expected call cache to not be called") } } + +func TestCallCacheForFilter(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true), cfg.CacheCfg(), nil) + tnt := "cgrates.org" + flt := &engine.Filter{ + Tenant: tnt, + ID: "FLTR1", + Rules: []*engine.FilterRule{{ + Type: utils.MetaString, + Element: "~*req.Account", + Values: []string{"1001"}, + }}, + } + if err := flt.Compile(); err != nil { + t.Fatal(err) + } + if err := dm.SetFilter(flt, true); err != nil { + t.Fatal(err) + } + th := &engine.ThresholdProfile{ + Tenant: tnt, + ID: "TH1", + FilterIDs: []string{flt.ID}, + } + if err := dm.SetThresholdProfile(th, true); err != nil { + t.Fatal(err) + } + attr := &engine.AttributeProfile{ + Tenant: tnt, + ID: "Attr1", + Contexts: []string{utils.MetaAny}, + FilterIDs: []string{flt.ID}, + } + if err := dm.SetAttributeProfile(attr, true); err != nil { + t.Fatal(err) + } + + exp := map[string][]string{ + utils.FilterIDs: {"cgrates.org:FLTR1"}, + utils.AttributeFilterIndexIDs: {"cgrates.org:*any:*string:*req.Account:1001"}, + utils.ThresholdFilterIndexIDs: {"cgrates.org:*string:*req.Account:1001"}, + } + rpl, err := composeCacheArgsForFilter(dm, flt, tnt, flt.TenantID(), map[string][]string{utils.FilterIDs: {"cgrates.org:FLTR1"}}) + if err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(rpl, exp) { + t.Errorf("Expected %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(rpl)) + } + flt = &engine.Filter{ + Tenant: tnt, + ID: "FLTR1", + Rules: []*engine.FilterRule{{ + Type: utils.MetaString, + Element: "~*req.Account", + Values: []string{"1002"}, + }}, + } + if err := flt.Compile(); err != nil { + t.Fatal(err) + } + if err := dm.SetFilter(flt, true); err != nil { + t.Fatal(err) + } + exp = map[string][]string{ + utils.FilterIDs: {"cgrates.org:FLTR1"}, + utils.AttributeFilterIndexIDs: {"cgrates.org:*any:*string:*req.Account:1001", "cgrates.org:*any:*string:*req.Account:1002"}, + utils.ThresholdFilterIndexIDs: {"cgrates.org:*string:*req.Account:1001", "cgrates.org:*string:*req.Account:1002"}, + } + rpl, err = composeCacheArgsForFilter(dm, flt, tnt, flt.TenantID(), rpl) + if err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(rpl, exp) { + t.Errorf("Expected %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(rpl)) + } +}