Update filters without computing when overwriting + chargers case

adi
2022-12-08 17:23:56 +02:00
committed by Dan Christian Bogos
parent 4ec5e420b0
commit 5914213c96
8 changed files with 261 additions and 90 deletions

View File

@@ -893,7 +893,7 @@ func testApierSetRatingProfile(t *testing.T) {
if err := rater.Call(utils.CacheSv1GetCacheStats, new(utils.AttrCacheIDsWithArgDispatcher), &rcvStats); err != nil {
t.Error("Got error on CacheSv1.GetCacheStats: ", err.Error())
} else if !reflect.DeepEqual(expectedStats, rcvStats) {
t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, received: %+v", expectedStats, rcvStats)
t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats))
}
// Calling the second time should not raise EXISTS
if err := rater.Call(utils.APIerSv1SetRatingProfile, rpf, &reply); err != nil {
@@ -1914,7 +1914,7 @@ func testApierGetCacheStats2(t *testing.T) {
if err != nil {
t.Error("Got error on CacheSv1.GetCacheStats: ", err.Error())
} else if !reflect.DeepEqual(expectedStats, rcvStats) {
t.Errorf("Calling CacheSv1.GetCacheStats expected: %v, received: %v", expectedStats, rcvStats)
t.Errorf("Calling CacheSv1.GetCacheStats expected: %v, received: %v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats))
}
}
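Both hunks above change only the failure message: the compared values are serialized with utils.ToJSON instead of being formatted directly. Since the cache stats reply appears to be a map of *ltcache.CacheStats pointers (see GetDefaultEmptyCacheStats below), the plain %v/%+v verbs would largely print pointer addresses, while the JSON form shows the actual counters. A minimal sketch of the resulting assertion pattern; the helper name and package are hypothetical:

package v1

import (
	"reflect"
	"testing"

	"github.com/cgrates/cgrates/utils"
	"github.com/cgrates/ltcache"
)

// assertCacheStats compares the expected and received stats maps and, on mismatch,
// reports both sides as JSON so the counters are readable in the test output.
func assertCacheStats(t *testing.T, expected, received map[string]*ltcache.CacheStats) {
	t.Helper()
	if !reflect.DeepEqual(expected, received) {
		t.Errorf("expected: %+v, received: %+v",
			utils.ToJSON(expected), utils.ToJSON(received))
	}
}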

View File

@@ -249,7 +249,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args utils.ArgsComputeFilterIndexes, r
//ChargerProfile Indexes
var cppIndexes *engine.FilterIndexer
if args.ChargerS {
cppIndexes, err = api.computeChargerIndexes(args.Tenant, nil, transactionID)
cppIndexes, err = engine.ComputeChargerIndexes(api.DataManager, args.Tenant, nil, transactionID)
if err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
@@ -340,7 +340,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args utils.ArgsComputeFilterIndexIDs,
return utils.APIErrorHandler(err)
}
//ChargerProfile Indexes
cppIndexes, err := api.computeChargerIndexes(args.Tenant, &args.ChargerIDs, transactionID)
cppIndexes, err := engine.ComputeChargerIndexes(api.DataManager, args.Tenant, &args.ChargerIDs, transactionID)
if err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
@@ -845,82 +845,6 @@ func (api *APIerSv1) computeSupplierIndexes(tenant string, sppIDs *[]string,
return sppIndexers, nil
}
func (api *APIerSv1) computeChargerIndexes(tenant string, cppIDs *[]string,
transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var chargerIDs []string
var cppIndexes *engine.FilterIndexer
if cppIDs == nil {
ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.ChargerProfilePrefix)
if err != nil {
return nil, err
}
for _, id := range ids {
chargerIDs = append(chargerIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
// this is the ComputeFilterIndexes case, which starts from empty indexes
cppIndexes = engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant)
} else {
// this is the ComputeFilterIndexIDs case, which starts from the existing indexes returned by the getter below
var oldIDx map[string]utils.StringMap
if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.ChargerProfilePrefix],
tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
cppIndexes = engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant)
} else {
cppIndexes = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.ChargerProfilePrefix, tenant, oldIDx)
}
chargerIDs = *cppIDs
transactionID = utils.NonTransactional
}
for _, id := range chargerIDs {
cpp, err := api.DataManager.GetChargerProfile(tenant, id, true, false, utils.NonTransactional)
if err != nil {
return nil, err
}
fltrIDs := make([]string, len(cpp.FilterIDs))
for i, fltrID := range cpp.FilterIDs {
fltrIDs[i] = fltrID
}
if len(fltrIDs) == 0 {
fltrIDs = []string{utils.META_NONE}
}
for _, fltrID := range fltrIDs {
var fltr *engine.Filter
if fltrID == utils.META_NONE {
fltr = &engine.Filter{
Tenant: cpp.Tenant,
ID: cpp.ID,
Rules: []*engine.FilterRule{
{
Type: utils.META_NONE,
Element: utils.META_ANY,
Values: []string{utils.META_ANY},
},
},
}
} else if fltr, err = engine.GetFilter(api.DataManager, cpp.Tenant, fltrID,
true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for charger: %+v",
fltrID, cpp)
}
return nil, err
}
cppIndexes.IndexTPFilter(engine.FilterToTPFilter(fltr), cpp.ID)
}
}
if transactionID == utils.NonTransactional {
if err := cppIndexes.StoreIndexes(true, transactionID); err != nil {
return nil, err
}
return nil, nil
} else {
if err := cppIndexes.StoreIndexes(false, transactionID); err != nil {
return nil, err
}
}
return cppIndexes, nil
}
func (api *APIerSv1) computeDispatcherIndexes(tenant, context string, dspIDs *[]string,
transactionID string) (filterIndexer *engine.FilterIndexer, err error) {
var dispatcherIDs []string
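The private APIerSv1.computeChargerIndexes removed above is relocated rather than rewritten: both API call sites now delegate to the exported engine.ComputeChargerIndexes, which takes the DataManager as a parameter so that code outside the API layer (notably the new filter-update path in engine/filters.go) can reuse it. The deleted method was, in effect, this thin wrapper, which the commit inlines at the call sites (a sketch only, no such wrapper remains in the tree):

func (api *APIerSv1) computeChargerIndexes(tenant string, cppIDs *[]string,
	transactionID string) (*engine.FilterIndexer, error) {
	return engine.ComputeChargerIndexes(api.DataManager, tenant, cppIDs, transactionID)
}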

View File

@@ -311,6 +311,10 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) {
Items: 0,
Groups: 0,
},
utils.CacheReverseFilterIndexes: {
Items: 0,
Groups: 0,
},
}
if err := precacheRPC.Call(utils.CacheSv1GetCacheStats, args, &reply); err != nil {
t.Error(err.Error())

View File

@@ -636,10 +636,11 @@ func (dm *DataManager) SetFilter(fltr *Filter) (err error) {
return
}
if oldFltr != nil {
/* if err = UpdateFilterIndexes(dm, oldFltr, fltr); err != nil {
return
} */
/*
if err = UpdateFilterIndexes(dm, fltr.Tenant, oldFltr, fltr); err != nil {
return
}
*/
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
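The overwrite branch of DataManager.SetFilter keeps the index update disabled, but the commented-out call is adjusted to the new UpdateFilterIndexes signature, which now receives the tenant explicitly. If the call were enabled, the branch would read roughly as follows (a sketch assembled only from the commented code above):

if oldFltr != nil { // the filter already exists, so this Set is an overwrite
	if err = UpdateFilterIndexes(dm, fltr.Tenant, oldFltr, fltr); err != nil {
		return
	}
}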

engine/filter_indexes.go Normal file, 102 additions
View File

@@ -0,0 +1,102 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"strings"
"github.com/cgrates/cgrates/utils"
)
func ComputeChargerIndexes(dm *DataManager, tenant string, cppIDs *[]string,
transactionID string) (cppIndexes *FilterIndexer, err error) {
var chargerIDs []string
if cppIDs == nil {
ids, err := dm.DataDB().GetKeysForPrefix(utils.ChargerProfilePrefix)
if err != nil {
return nil, err
}
for _, id := range ids {
chargerIDs = append(chargerIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
}
// this is the ComputeFilterIndexes case, which starts from empty indexes
cppIndexes = NewFilterIndexer(dm, utils.ChargerProfilePrefix, tenant)
} else {
// this is the ComputeFilterIndexIDs case, which starts from the existing indexes returned by the getter below
var oldIDx map[string]utils.StringMap
if oldIDx, err = dm.GetFilterIndexes(utils.PrefixToIndexCache[utils.ChargerProfilePrefix],
tenant, utils.EmptyString, nil); err != nil || oldIDx == nil {
cppIndexes = NewFilterIndexer(dm, utils.ChargerProfilePrefix, tenant)
} else {
cppIndexes = NewFilterIndexerWithIndexes(dm, utils.ChargerProfilePrefix, tenant, oldIDx)
}
chargerIDs = *cppIDs
transactionID = utils.NonTransactional
}
for _, id := range chargerIDs {
cpp, err := dm.GetChargerProfile(tenant, id, true, false, utils.NonTransactional)
if err != nil {
return nil, err
}
fltrIDs := make([]string, len(cpp.FilterIDs))
for i, fltrID := range cpp.FilterIDs {
fltrIDs[i] = fltrID
}
if len(fltrIDs) == 0 {
fltrIDs = []string{utils.META_NONE}
}
for _, fltrID := range fltrIDs {
var fltr *Filter
if fltrID == utils.META_NONE {
fltr = &Filter{
Tenant: cpp.Tenant,
ID: cpp.ID,
Rules: []*FilterRule{
{
Type: utils.META_NONE,
Element: utils.META_ANY,
Values: []string{utils.META_ANY},
},
},
}
} else if fltr, err = GetFilter(dm, cpp.Tenant, fltrID,
true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for charger: %+v",
fltrID, cpp)
}
return nil, err
}
cppIndexes.IndexTPFilter(FilterToTPFilter(fltr), cpp.ID)
}
}
if transactionID == utils.NonTransactional {
if err := cppIndexes.StoreIndexes(true, transactionID); err != nil {
return nil, err
}
return nil, nil
} else {
if err := cppIndexes.StoreIndexes(false, transactionID); err != nil {
return nil, err
}
}
return cppIndexes, nil
}
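The helper above works in two modes: with cppIDs == nil it walks every charger profile of the tenant starting from empty indexes, while with a concrete ID list it seeds the indexer with the existing indexes and forces utils.NonTransactional. In the non-transactional case the indexes are stored before returning and the returned indexer is nil, so callers should not rely on it. A usage sketch from within the engine package; the helper, tenant and profile IDs are hypothetical:

// rebuildChargerIndexes shows both call modes of ComputeChargerIndexes.
func rebuildChargerIndexes(dm *DataManager, chargerIDs []string) error {
	if len(chargerIDs) != 0 {
		// partial rebuild: only the given charger profiles, stored immediately
		if _, err := ComputeChargerIndexes(dm, "cgrates.org", &chargerIDs,
			utils.NonTransactional); err != nil && err != utils.ErrNotFound {
			return err
		}
		return nil
	}
	// full rebuild for the tenant, starting from empty indexes
	if _, err := ComputeChargerIndexes(dm, "cgrates.org", nil,
		utils.NonTransactional); err != nil && err != utils.ErrNotFound {
		return err
	}
	return nil
}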

View File

@@ -26,13 +26,149 @@ import (
"github.com/cgrates/cgrates/utils"
)
var (
filterIndexType = utils.StringMap{
utils.MetaString: true,
utils.MetaPrefix: true}
)
// UpdateFilterIndexes updates the indexes of every profile that references the given filter.
// Every profile that references oldFltr has its indexes recomputed using the rules from newFltr.
// oldFltr and newFltr have the same tenant and ID.
func UpdateFilterIndexes(dm *DataManager, oldFltr *Filter, newFltr *Filter) (err error) {
func UpdateFilterIndexes(dm *DataManager, tnt string, oldFltr *Filter, newFltr *Filter) (err error) {
// we need the rules in order to compute the new indexes
oldRules := utils.StringMap{} // index keys from the old filter that are still present in the new one
newRules := utils.StringMap{} // index keys built from the new filter's rules
removeRules := utils.StringMap{} // index keys present in the old filter but missing from the new one; these need to be removed
// first collect the indexable rules from the new filter
for _, fltr := range newFltr.Rules {
if !filterIndexType.HasKey(fltr.Type) { // we do not consider other types, just *string and *prefix
continue
}
isElementDyn := strings.HasPrefix(fltr.Element, utils.DynamicDataPrefix)
for _, value := range fltr.Values {
var idxKey string
if isElementDyn {
// we do not index when both element and value are dynamic, e.g. *string:~*req.Account:~*req.Destination
if strings.HasPrefix(value, utils.DynamicDataPrefix) {
continue
}
idxKey = utils.ConcatenatedKey(fltr.Type, fltr.Element, value)
} else if strings.HasPrefix(value, utils.DynamicDataPrefix) {
idxKey = utils.ConcatenatedKey(fltr.Type, value, fltr.Element)
} else {
continue // neither the element nor the value is dynamic, so we do not index
}
newRules[idxKey] = true
}
}
// now check the rules from the old filter and
// compare them with the new rules to determine which index keys need to be removed
for _, fltr := range oldFltr.Rules {
if !filterIndexType.HasKey(fltr.Type) { // we do not consider other types, just *string and *prefix
continue
}
isElementDyn := strings.HasPrefix(fltr.Element, utils.DynamicDataPrefix)
for _, value := range fltr.Values {
var idxKey string
if isElementDyn {
// we do not index when both element and value are dynamic, e.g. *string:~*req.Account:~*req.Destination
if strings.HasPrefix(value, utils.DynamicDataPrefix) {
continue
}
idxKey = utils.ConcatenatedKey(fltr.Type, fltr.Element, value)
} else if strings.HasPrefix(value, utils.DynamicDataPrefix) {
idxKey = utils.ConcatenatedKey(fltr.Type, value, fltr.Element)
} else {
continue // neither the element nor the value is dynamic, so we do not index
}
if !newRules.HasKey(idxKey) {
removeRules[idxKey] = true
} else {
oldRules[idxKey] = true
}
}
}
needsRebuild := len(removeRules) != 0 // true if there are stale index keys to remove
if !needsRebuild { // nothing to remove; check whether the new filter adds index keys that the old one did not have
for key := range newRules {
if needsRebuild = !oldRules.HasKey(key); needsRebuild {
break
}
}
if !needsRebuild {
return // nothing to change
}
}
tntFltrID := utils.ConcatenatedKey(newFltr.Tenant, newFltr.ID)
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntFltrID)
defer guardian.Guardian.UnguardIDs(refID)
var rcvIndexes map[string]utils.StringMap
// get all the reverse indexes for the specific filter from db
if rcvIndexes, err = dm.GetFilterIndexes(utils.PrefixToIndexCache[utils.ReverseFilterIndexes], tntFltrID,
utils.EmptyString, nil); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil // if the error is NOT_FOUND, it means that no indexes were found for this filter, so no need to update
return
}
removeIndexKeys := removeRules.Slice()
for idxItmType, index := range rcvIndexes {
switch idxItmType {
case utils.CacheChargerFilterIndexes:
// remove the indexes from this filter for this partition
if err = removeFilterIndexesForFilter(dm, idxItmType, tnt,
removeIndexKeys, index); err != nil {
return
}
// the stale charger indexes were removed; now recompute the indexes for the affected charger profiles
chargerIDs := index.Slice()
if _, err = ComputeChargerIndexes(dm, newFltr.Tenant, &chargerIDs,
utils.NonTransactional); err != nil {
return err
}
}
}
return nil
}
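// A sketch (not part of the commit) of the index keys the loops above produce for a
// hypothetical *string rule: only pairs where exactly one side is dynamic are indexed,
// and the key is always <type>:<dynamic path>:<static value>, so both spellings of the
// same condition collapse onto a single index entry.
func exampleFilterIndexKeys() []string {
	rule := &FilterRule{
		Type:    utils.MetaString,
		Element: "~*req.Account",
		Values:  []string{"1001", "~*req.Destination"},
	}
	keys := make([]string, 0, len(rule.Values))
	for _, val := range rule.Values {
		if strings.HasPrefix(val, utils.DynamicDataPrefix) {
			continue // both element and value dynamic: not indexed
		}
		keys = append(keys, utils.ConcatenatedKey(rule.Type, rule.Element, val))
	}
	return keys // ["*string:~*req.Account:1001"]
}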
// removeFilterIndexesForFilter removes the given item IDs from the specified index keys;
// it is used to drop the stale indexes when a filter is updated
func removeFilterIndexesForFilter(dm *DataManager, idxItmType, tnt string,
removeIndexKeys []string, itemIDs utils.StringMap) (err error) {
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tnt)
defer guardian.Guardian.UnguardIDs(refID)
for _, idxKey := range removeIndexKeys { // delete the old filter indexes for these items
var remIndx map[string]utils.StringMap
if remIndx, err = dm.GetFilterIndexes(idxItmType, tnt,
utils.EmptyString, nil); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
continue
}
for idx := range itemIDs {
delete(remIndx[idxKey], idx)
}
fltrIndexer := NewFilterIndexer(dm, utils.CacheInstanceToPrefix[idxItmType], tnt)
fltrIndexer.indexes = remIndx
if err = fltrIndexer.StoreIndexes(true, utils.NonTransactional); err != nil {
return
}
}
return
}
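// Data shape handled above, with hypothetical values: the indexes of a partition map an
// index key to the set of profile IDs that reference it, e.g.
//   map[string]utils.StringMap{
//       "*string:~*req.Account:1001": {"CHRG_1": true, "CHRG_2": true},
//   }
// removeFilterIndexesForFilter deletes the given profile IDs from each key listed in
// removeIndexKeys and stores the pruned map back through a FilterIndexer.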
// addReverseFilterIndexForFilter will add a reference for the filter in reverse filter indexes
func addReverseFilterIndexForFilter(dm *DataManager, idxItmType, ctx, tnt,
itemID string, filterIDs []string) (err error) {
@@ -80,11 +216,11 @@ func removeReverseFilterIndexForFilter(dm *DataManager, idxItmType, ctx, tnt, it
if strings.HasPrefix(fltrID, utils.Meta) { // we do not reverse for inline filters
continue
}
tntCtx := utils.ConcatenatedKey(tnt, fltrID)
tntFltrID := utils.ConcatenatedKey(tnt, fltrID)
refID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx)
config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntFltrID)
var indexes map[string]utils.StringMap
if indexes, err = dm.GetFilterIndexes(utils.PrefixToIndexCache[utils.ReverseFilterIndexes], tntCtx,
if indexes, err = dm.GetFilterIndexes(utils.PrefixToIndexCache[utils.ReverseFilterIndexes], tntFltrID,
utils.EmptyString, nil); err != nil {
if err != utils.ErrNotFound {
guardian.Guardian.UnguardIDs(refID)

View File

@@ -505,6 +505,10 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
Items: 0,
Groups: 0,
},
utils.CacheReverseFilterIndexes: {
Items: 0,
Groups: 0,
},
utils.CacheDispatcherProfiles: {
Items: 0,
Groups: 0,

View File

@@ -93,7 +93,7 @@ var (
testFilterIndexesCasesStartEngine,
testFilterIndexesCasesRpcConn,
/* testFilterIndexesCasesSetFilters,
testFilterIndexesCasesSetFilters,
testFilterIndexesCasesSetAttributesWithFilters,
testFilterIndexesCasesGetIndexesAnyContext,
testFilterIndexesCasesGetIndexesSessionsContext,
@@ -102,7 +102,7 @@ var (
testFilterIndexesCasesOverwriteAttributes,
testFilterIndexesCasesComputeAttributesIndexes,
testFilterIndexesCasesGetIndexesAnyContextChanged,
testFilterIndexesCasesGetIndexesSessionsContextChanged, */
testFilterIndexesCasesGetIndexesSessionsContextChanged,
testFilterIndexesCasesSetIndexedFilter,
testFilterIndexesCasesSetChargerWithFltr,