Updated SetFilter Cache reload

This commit is contained in:
Trial97
2021-05-17 17:00:56 +03:00
committed by Dan Christian Bogos
parent 6e75c2f9fd
commit 9c3f9519ad
3 changed files with 192 additions and 3 deletions

View File

@@ -26,23 +26,36 @@ import (
)
//SetFilter add a new Filter
func (apierSv1 *APIerSv1) SetFilter(arg *engine.FilterWithAPIOpts, reply *string) error {
func (apierSv1 *APIerSv1) SetFilter(arg *engine.FilterWithAPIOpts, reply *string) (err error) {
if missing := utils.MissingStructFields(arg.Filter, []string{utils.ID}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
if arg.Tenant == utils.EmptyString {
arg.Tenant = apierSv1.Config.GeneralCfg().DefaultTenant
}
var argC map[string][]string
tntID := arg.TenantID()
if fltr, err := apierSv1.DataManager.GetFilter(arg.Filter.Tenant, arg.Filter.ID, true, false, utils.NonTransactional); err != nil {
return utils.APIErrorHandler(err)
} else if argC, err = composeCacheArgsForFilter(apierSv1.DataManager, fltr, fltr.Tenant, tntID, map[string][]string{utils.FilterIDs: {tntID}}); err != nil {
return utils.APIErrorHandler(err)
}
if err := apierSv1.DataManager.SetFilter(arg.Filter, true); err != nil {
return utils.APIErrorHandler(err)
}
if argC, err = composeCacheArgsForFilter(apierSv1.DataManager, arg.Filter, arg.Filter.Tenant, tntID, argC); err != nil {
return utils.APIErrorHandler(err)
}
//generate a loadID for CacheFilters and store it in database
if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheFilters: time.Now().UnixNano()}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for Filter
if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, utils.CacheFilters,
arg.TenantID(), nil, nil, arg.APIOpts); err != nil {
if err := callCacheForFilter(apierSv1.ConnMgr, apierSv1.Config.ApierCfg().CachesConns,
utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]),
apierSv1.Config.GeneralCfg().DefaultCaching,
arg.Tenant, argC, arg.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK

View File

@@ -249,3 +249,103 @@ func (apierSv1 *APIerSv1) callCacheMultiple(cacheopt, tnt, cacheID string, itemI
return apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil,
method, args, &reply)
}
func composeCacheArgsForFilter(dm *engine.DataManager, fltr *engine.Filter, tnt, tntID string, args map[string][]string) (_ map[string][]string, err error) {
indxIDs := make([]string, 0, len(fltr.Rules))
for _, flt := range fltr.Rules {
if !engine.FilterIndexTypes.Has(flt.Type) {
continue
}
isDyn := strings.HasPrefix(flt.Element, utils.DynamicDataPrefix)
for _, fldVal := range flt.Values {
if isDyn {
if !strings.HasPrefix(fldVal, utils.DynamicDataPrefix) {
indxIDs = append(indxIDs, utils.ConcatenatedKey(flt.Type, flt.Element[1:], fldVal))
}
} else if strings.HasPrefix(fldVal, utils.DynamicDataPrefix) {
indxIDs = append(indxIDs, utils.ConcatenatedKey(flt.Type, fldVal[1:], flt.Element))
}
}
}
if len(indxIDs) == 0 { // no index
return args, nil
}
var rcvIndx map[string]utils.StringSet
if rcvIndx, err = dm.GetIndexes(utils.CacheReverseFilterIndexes, tntID,
utils.EmptyString, true, true); err != nil && err != utils.ErrNotFound { // error when geting the revers
return
}
if err == utils.ErrNotFound || len(rcvIndx) == 0 { // no reverse index for this filter
return args, nil
}
for k, ids := range rcvIndx {
switch k {
default:
if cField, has := utils.CacheInstanceToArg[k]; has {
for _, indx := range indxIDs {
args[cField] = append(args[cField], utils.ConcatenatedKey(tnt, indx))
}
}
case utils.CacheAttributeFilterIndexes: // this is slow
for attrID := range ids {
var attr *engine.AttributeProfile
if attr, err = dm.GetAttributeProfile(tnt, attrID, true, true, utils.NonTransactional); err != nil {
return
}
for _, ctx := range attr.Contexts {
for _, indx := range indxIDs {
args[utils.AttributeFilterIndexIDs] = append(args[utils.AttributeFilterIndexIDs], utils.ConcatenatedKey(tnt, ctx, indx))
}
}
}
case utils.CacheDispatcherFilterIndexes: // this is slow
for attrID := range ids {
var attr *engine.DispatcherProfile
if attr, err = dm.GetDispatcherProfile(tnt, attrID, true, true, utils.NonTransactional); err != nil {
return
}
for _, ctx := range attr.Subsystems {
for _, indx := range indxIDs {
args[utils.DispatcherFilterIndexIDs] = append(args[utils.DispatcherFilterIndexIDs], utils.ConcatenatedKey(tnt, ctx, indx))
}
}
}
}
}
return args, nil
}
// callCacheForFilter will call the cache for filter
func callCacheForFilter(connMgr *engine.ConnManager, cacheConns []string, cacheopt, dftCache, tnt string,
argC map[string][]string, opts map[string]interface{}) (err error) {
var reply, method string
var args interface{} = utils.AttrReloadCacheWithAPIOpts{
Tenant: tnt,
ArgsCache: argC,
APIOpts: opts,
}
switch utils.FirstNonEmpty(cacheopt, dftCache) {
case utils.MetaNone:
return
case utils.MetaReload:
method = utils.CacheSv1ReloadCache
case utils.MetaLoad:
method = utils.CacheSv1LoadCache
case utils.MetaRemove:
method = utils.CacheSv1RemoveItems
case utils.MetaClear:
cacheIDs := make([]string, 0, len(argC))
for k := range argC {
cacheIDs = append(cacheIDs, utils.ArgCacheToInstance[k])
}
method = utils.CacheSv1Clear
args = &utils.AttrCacheIDsWithAPIOpts{
Tenant: tnt,
CacheIDs: cacheIDs,
APIOpts: opts,
}
}
return connMgr.Call(cacheConns, nil, method, args, &reply)
}

View File

@@ -213,3 +213,79 @@ func TestCallCache(t *testing.T) {
t.Fatal("Expected call cache to not be called")
}
}
func TestCallCacheForFilter(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true), cfg.CacheCfg(), nil)
tnt := "cgrates.org"
flt := &engine.Filter{
Tenant: tnt,
ID: "FLTR1",
Rules: []*engine.FilterRule{{
Type: utils.MetaString,
Element: "~*req.Account",
Values: []string{"1001"},
}},
}
if err := flt.Compile(); err != nil {
t.Fatal(err)
}
if err := dm.SetFilter(flt, true); err != nil {
t.Fatal(err)
}
th := &engine.ThresholdProfile{
Tenant: tnt,
ID: "TH1",
FilterIDs: []string{flt.ID},
}
if err := dm.SetThresholdProfile(th, true); err != nil {
t.Fatal(err)
}
attr := &engine.AttributeProfile{
Tenant: tnt,
ID: "Attr1",
Contexts: []string{utils.MetaAny},
FilterIDs: []string{flt.ID},
}
if err := dm.SetAttributeProfile(attr, true); err != nil {
t.Fatal(err)
}
exp := map[string][]string{
utils.FilterIDs: {"cgrates.org:FLTR1"},
utils.AttributeFilterIndexIDs: {"cgrates.org:*any:*string:*req.Account:1001"},
utils.ThresholdFilterIndexIDs: {"cgrates.org:*string:*req.Account:1001"},
}
rpl, err := composeCacheArgsForFilter(dm, flt, tnt, flt.TenantID(), map[string][]string{utils.FilterIDs: {"cgrates.org:FLTR1"}})
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(rpl, exp) {
t.Errorf("Expected %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(rpl))
}
flt = &engine.Filter{
Tenant: tnt,
ID: "FLTR1",
Rules: []*engine.FilterRule{{
Type: utils.MetaString,
Element: "~*req.Account",
Values: []string{"1002"},
}},
}
if err := flt.Compile(); err != nil {
t.Fatal(err)
}
if err := dm.SetFilter(flt, true); err != nil {
t.Fatal(err)
}
exp = map[string][]string{
utils.FilterIDs: {"cgrates.org:FLTR1"},
utils.AttributeFilterIndexIDs: {"cgrates.org:*any:*string:*req.Account:1001", "cgrates.org:*any:*string:*req.Account:1002"},
utils.ThresholdFilterIndexIDs: {"cgrates.org:*string:*req.Account:1001", "cgrates.org:*string:*req.Account:1002"},
}
rpl, err = composeCacheArgsForFilter(dm, flt, tnt, flt.TenantID(), rpl)
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(rpl, exp) {
t.Errorf("Expected %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(rpl))
}
}