Updated RateProfileFilterIndexes

This commit is contained in:
Trial97
2020-06-12 10:05:48 +03:00
committed by Dan Christian Bogos
parent 8d94a2d8dc
commit 6b84c27b2d
5 changed files with 37 additions and 392 deletions

View File

@@ -2981,48 +2981,38 @@ func (dm *DataManager) SetRateProfile(rpp *RateProfile, withIndex bool) (err err
return err
}
if withIndex {
var oldFiltersIDs *[]string
if oldRpp != nil {
var needsRemove bool
for _, fltrID := range oldRpp.FilterIDs {
if !utils.IsSliceMember(rpp.FilterIDs, fltrID) {
needsRemove = true
oldFiltersIDs = &oldRpp.FilterIDs
}
if err := updatedIndexes(dm, utils.CacheRateProfilesFilterIndexes, rpp.Tenant,
utils.EmptyString, rpp.ID, oldFiltersIDs, rpp.FilterIDs); err != nil {
return err
}
// remove indexes for old rates
if oldRpp != nil {
for key, rate := range oldRpp.Rates {
if _, has := rpp.Rates[key]; has {
continue
}
}
if needsRemove {
if err = NewFilterIndexer(dm, utils.RateProfilePrefix,
rpp.Tenant).RemoveItemFromIndex(rpp.Tenant, rpp.ID, oldRpp.FilterIDs); err != nil {
if err = removeItemFromFilterIndex(dm, utils.CacheRateFilterIndexes,
rpp.Tenant, rpp.ID, key, rate.FilterIDs); err != nil {
return
}
}
}
if err = createAndIndex(utils.RateProfilePrefix, rpp.Tenant, utils.EmptyString,
rpp.ID, rpp.FilterIDs, dm); err != nil {
return
}
// create index for each rate
for key, rate := range rpp.Rates {
var oldRateFiltersIDs *[]string
if oldRpp != nil {
if oldRate, has := oldRpp.Rates[key]; has {
var needsRemove bool
for _, fltrID := range oldRate.FilterIDs {
if !utils.IsSliceMember(rate.FilterIDs, fltrID) {
needsRemove = true
}
}
if needsRemove {
if err = NewFilterIndexer(dm, utils.RatePrefix,
rpp.Tenant).RemoveItemFromIndex(rpp.Tenant, utils.ConcatenatedKey(rpp.ID, key), oldRate.FilterIDs); err != nil {
return
}
}
oldRateFiltersIDs = &oldRate.FilterIDs
}
}
// when we create the indexes for rates we use RateProfile ID as context
if err = createAndIndex(utils.RatePrefix, rpp.Tenant, rpp.ID,
key, rate.FilterIDs, dm); err != nil {
return
if err := updatedIndexes(dm, utils.CacheRateFilterIndexes, rpp.Tenant,
rpp.ID, key, oldRateFiltersIDs, rate.FilterIDs); err != nil {
return err
}
}
@@ -3061,8 +3051,14 @@ func (dm *DataManager) RemoveRateProfile(tenant, id string,
return utils.ErrNotFound
}
if withIndex {
if err = NewFilterIndexer(dm, utils.RateProfilePrefix,
tenant).RemoveItemFromIndex(tenant, id, oldRpp.FilterIDs); err != nil {
for key, rate := range oldRpp.Rates {
if err = removeItemFromFilterIndex(dm, utils.CacheRateFilterIndexes,
oldRpp.Tenant, oldRpp.ID, key, rate.FilterIDs); err != nil {
return
}
}
if err = removeItemFromFilterIndex(dm, utils.CacheRateProfilesFilterIndexes,
tenant, utils.EmptyString, id, oldRpp.FilterIDs); err != nil {
return
}
}

View File

@@ -1,357 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
func NewFilterIndexer(dm *DataManager, itemType, dbKeySuffix string) *FilterIndexer {
return &FilterIndexer{dm: dm, itemType: itemType, dbKeySuffix: dbKeySuffix,
indexes: make(map[string]utils.StringMap),
chngdIndxKeys: make(utils.StringMap)}
}
// FilterIndexer is a centralized indexer for all data sources using RequestFilter
// retrieves and stores it's data from/to dataDB
// not thread safe, meant to be used as logic within other code blocks
type FilterIndexer struct {
indexes map[string]utils.StringMap // map[fieldName:fieldValue]utils.StringMap[itemID]
dm *DataManager
itemType string
dbKeySuffix string // get/store the result from/into this key
chngdIndxKeys utils.StringMap // keep record of the changed fieldName:fieldValue pair so we can re-cache wisely
}
// IndexTPFilter parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys
func (rfi *FilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID string) {
for _, fltr := range tpFltr.Filters {
switch fltr.Type {
case utils.MetaString:
for _, fldVal := range fltr.Values {
concatKey := utils.ConcatenatedKey(fltr.Type, fltr.Element, fldVal)
if _, hasIt := rfi.indexes[concatKey]; !hasIt {
rfi.indexes[concatKey] = make(utils.StringMap)
}
rfi.indexes[concatKey][itemID] = true
rfi.chngdIndxKeys[concatKey] = true
}
case utils.MetaPrefix:
for _, fldVal := range fltr.Values {
concatKey := utils.ConcatenatedKey(fltr.Type, fltr.Element, fldVal)
if _, hasIt := rfi.indexes[concatKey]; !hasIt {
rfi.indexes[concatKey] = make(utils.StringMap)
}
rfi.indexes[concatKey][itemID] = true
rfi.chngdIndxKeys[concatKey] = true
}
case utils.META_NONE:
concatKey := utils.ConcatenatedKey(utils.META_NONE, utils.ANY, utils.ANY)
if _, hasIt := rfi.indexes[concatKey]; !hasIt {
rfi.indexes[concatKey] = make(utils.StringMap)
}
rfi.indexes[concatKey][itemID] = true
rfi.chngdIndxKeys[concatKey] = true
}
}
return
}
func (rfi *FilterIndexer) cacheRemItemType() { // ToDo: tune here by removing per item
switch rfi.itemType {
case utils.ThresholdProfilePrefix:
Cache.Clear([]string{utils.CacheThresholdFilterIndexes})
case utils.ResourceProfilesPrefix:
Cache.Clear([]string{utils.CacheResourceFilterIndexes})
case utils.StatQueueProfilePrefix:
Cache.Clear([]string{utils.CacheStatFilterIndexes})
case utils.RouteProfilePrefix:
Cache.Clear([]string{utils.CacheRouteFilterIndexes})
case utils.AttributeProfilePrefix:
Cache.Clear([]string{utils.CacheAttributeFilterIndexes})
case utils.ChargerProfilePrefix:
Cache.Clear([]string{utils.CacheChargerFilterIndexes})
case utils.DispatcherProfilePrefix:
Cache.Clear([]string{utils.CacheDispatcherFilterIndexes})
case utils.RateProfilePrefix:
Cache.Clear([]string{utils.CacheRateProfilesFilterIndexes})
case utils.RatePrefix:
Cache.Clear([]string{utils.CacheRateFilterIndexes})
}
}
// StoreIndexes handles storing the indexes to dataDB
func (rfi *FilterIndexer) StoreIndexes(commit bool, transactionID string) (err error) {
lockID := utils.CacheInstanceToPrefix[utils.PrefixToIndexCache[rfi.itemType]] + rfi.dbKeySuffix
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, lockID)
defer guardian.Guardian.UnguardIDs(refID)
if err = rfi.dm.SetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
rfi.indexes, commit, transactionID); err != nil {
return
}
rfi.cacheRemItemType()
return
}
//Populate FilterIndexer.indexes with specific fieldName:fieldValue , item
func (rfi *FilterIndexer) loadFldNameFldValIndex(filterType, fldName, fldVal string) error {
rcvIdx, err := rfi.dm.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, filterType,
map[string]string{fldName: fldVal})
if err != nil {
return err
}
for fldName, nameValMp := range rcvIdx {
if _, has := rfi.indexes[fldName]; !has {
rfi.indexes[fldName] = make(utils.StringMap)
}
rfi.indexes[fldName] = nameValMp
}
return nil
}
//RemoveItemFromIndex remove Indexes for a specific itemID
func (rfi *FilterIndexer) RemoveItemFromIndex(tenant, itemID string, oldFilters []string) (err error) {
var filterIDs []string
switch rfi.itemType {
case utils.ThresholdProfilePrefix:
th, err := rfi.dm.GetThresholdProfile(tenant, itemID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if th != nil {
filterIDs = make([]string, len(th.FilterIDs))
for i, fltrID := range th.FilterIDs {
filterIDs[i] = fltrID
}
}
case utils.AttributeProfilePrefix:
attrPrf, err := rfi.dm.GetAttributeProfile(tenant, itemID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if attrPrf != nil {
filterIDs = make([]string, len(attrPrf.FilterIDs))
for i, fltrID := range attrPrf.FilterIDs {
filterIDs[i] = fltrID
}
}
case utils.ResourceProfilesPrefix:
res, err := rfi.dm.GetResourceProfile(tenant, itemID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if res != nil {
filterIDs = make([]string, len(res.FilterIDs))
for i, fltrID := range res.FilterIDs {
filterIDs[i] = fltrID
}
}
case utils.StatQueueProfilePrefix:
stq, err := rfi.dm.GetStatQueueProfile(tenant, itemID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if stq != nil {
filterIDs = make([]string, len(stq.FilterIDs))
for i, fltrID := range stq.FilterIDs {
filterIDs[i] = fltrID
}
}
case utils.RouteProfilePrefix:
spp, err := rfi.dm.GetRouteProfile(tenant, itemID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if spp != nil {
filterIDs = make([]string, len(spp.FilterIDs))
for i, fltrID := range spp.FilterIDs {
filterIDs[i] = fltrID
}
}
case utils.ChargerProfilePrefix:
cpp, err := rfi.dm.GetChargerProfile(tenant, itemID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if cpp != nil {
filterIDs = make([]string, len(cpp.FilterIDs))
for i, fltrID := range cpp.FilterIDs {
filterIDs[i] = fltrID
}
}
case utils.DispatcherProfilePrefix:
dpp, err := rfi.dm.GetDispatcherProfile(tenant, itemID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if dpp != nil {
filterIDs = make([]string, len(dpp.FilterIDs))
for i, fltrID := range dpp.FilterIDs {
filterIDs[i] = fltrID
}
}
case utils.RateProfilePrefix:
rpp, err := rfi.dm.GetRateProfile(tenant, itemID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if rpp != nil {
filterIDs = make([]string, len(rpp.FilterIDs))
for i, fltrID := range rpp.FilterIDs {
filterIDs[i] = fltrID
}
}
case utils.RatePrefix:
composedIDs := utils.SplitConcatenatedKey(itemID)
rppID, rateKey := composedIDs[0], composedIDs[1]
rpp, err := rfi.dm.GetRateProfile(tenant, rppID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if rpp != nil {
if rate, has := rpp.Rates[rateKey]; has {
filterIDs = make([]string, len(rate.FilterIDs))
for i, fltrID := range rate.FilterIDs {
filterIDs[i] = fltrID
}
}
}
default:
}
if len(filterIDs) == 0 {
filterIDs = []string{utils.META_NONE}
}
for _, oldFltr := range oldFilters {
filterIDs = append(filterIDs, oldFltr)
}
for _, fltrID := range filterIDs {
var fltr *Filter
if fltrID == utils.META_NONE {
fltr = &Filter{
Tenant: tenant,
ID: itemID,
Rules: []*FilterRule{
{
Type: utils.META_NONE,
Element: utils.META_ANY,
Values: []string{utils.META_ANY},
},
},
}
} else if fltr, err = rfi.dm.GetFilter(tenant, fltrID,
true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v",
fltrID, rfi.itemType, itemID)
}
return err
}
for _, flt := range fltr.Rules {
var fldType, fldName string
var fldVals []string
if utils.SliceHasMember([]string{utils.META_NONE, utils.MetaPrefix, utils.MetaString}, flt.Type) {
fldType, fldName = flt.Type, flt.Element
fldVals = flt.Values
}
for _, fldVal := range fldVals {
if err = rfi.loadFldNameFldValIndex(fldType,
fldName, fldVal); err != nil && err != utils.ErrNotFound {
return err
}
}
}
}
for _, itmMp := range rfi.indexes {
if _, has := itmMp[itemID]; has {
delete(itmMp, itemID) //Force deleting in driver
}
}
return rfi.StoreIndexes(false, utils.NonTransactional)
}
//createAndIndex create indexes for an item
func createAndIndex(itmPrfx, tenant, context, itemID string, filterIDs []string, dm *DataManager) (err error) {
indexerKey := tenant
if context != "" {
indexerKey = utils.ConcatenatedKey(tenant, context)
}
indexer := NewFilterIndexer(dm, itmPrfx, indexerKey)
fltrIDs := make([]string, len(filterIDs))
for i, fltrID := range filterIDs {
fltrIDs[i] = fltrID
}
if len(fltrIDs) == 0 {
fltrIDs = []string{utils.META_NONE}
}
for _, fltrID := range fltrIDs {
var fltr *Filter
if fltrID == utils.META_NONE {
fltr = &Filter{
Tenant: tenant,
ID: itemID,
Rules: []*FilterRule{
{
Type: utils.META_NONE,
Element: utils.META_ANY,
Values: []string{utils.META_ANY},
},
},
}
} else if fltr, err = dm.GetFilter(tenant, fltrID,
true, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v",
fltrID, itmPrfx, itemID)
}
return
}
for _, flt := range fltr.Rules {
var fldType, fldName string
var fldVals []string
if utils.SliceHasMember([]string{utils.META_NONE, utils.MetaPrefix, utils.MetaString}, flt.Type) {
fldType, fldName = flt.Type, flt.Element
fldVals = flt.Values
}
for _, fldVal := range fldVals {
if err = indexer.loadFldNameFldValIndex(fldType,
fldName, fldVal); err != nil && err != utils.ErrNotFound {
return err
}
}
}
indexer.IndexTPFilter(FilterToTPFilter(fltr), itemID)
}
return indexer.StoreIndexes(true, utils.NonTransactional)
}

View File

@@ -367,7 +367,8 @@ func testRPCMethodsInitSession(t *testing.T) {
time.Sleep(1*time.Second + 500*time.Millisecond)
if err := rpcRpc.Call(utils.SessionSv1InitiateSession,
args, &rply); err == nil || !(err.Error() == "RALS_ERROR:ACCOUNT_DISABLED" || err.Error() == utils.NewErrRALs(utils.ErrExists).Error()) { // ErrExist -> initSession twice
args, &rply); err == nil || !(err.Error() == "RALS_ERROR:ACCOUNT_DISABLED" ||
err.Error() == utils.ErrExists.Error()) { // ErrExist -> initSession twice
t.Error("Unexpected error returned", err)
}

View File

@@ -75,7 +75,7 @@ func (rS *RateS) Call(serviceMethod string, args interface{}, reply interface{})
func (rS *RateS) matchingRateProfileForEvent(args *ArgsCostForEvent) (rtPfl *engine.RateProfile, err error) {
rPfIDs := args.RateProfileIDs
if len(rPfIDs) == 0 {
var rPfIDMp utils.StringMap
var rPfIDMp utils.StringSet
if rPfIDMp, err = engine.MatchingItemIDsForEvent(
args.CGREvent.Event,
rS.cfg.RateSCfg().StringIndexedFields,
@@ -88,7 +88,7 @@ func (rS *RateS) matchingRateProfileForEvent(args *ArgsCostForEvent) (rtPfl *eng
); err != nil {
return
}
rPfIDs = rPfIDMp.Slice()
rPfIDs = rPfIDMp.AsSlice()
}
matchingRPfs := make([]*engine.RateProfile, 0, len(rPfIDs))
evNm := utils.MapStorage{utils.MetaReq: args.CGREvent.Event}
@@ -126,7 +126,7 @@ func (rS *RateS) matchingRateProfileForEvent(args *ArgsCostForEvent) (rtPfl *eng
// indexed based on intervalStart, there will be one winner per interval start
// returned in order of intervalStart
func (rS *RateS) matchingRatesForEvent(rtPfl *engine.RateProfile, cgrEv *utils.CGREvent) (rts []*engine.Rate, err error) {
var rtIDs utils.StringMap
var rtIDs utils.StringSet
// when matching we use the RateProfile ID as context
if rtIDs, err = engine.MatchingItemIDsForEvent(
cgrEv.Event,

View File

@@ -212,6 +212,11 @@ func TestDataDBReload(t *testing.T) {
Remote: false,
TTL: time.Duration(0),
Limit: -1},
utils.MetaRateProfiles: {
Replicate: false,
Remote: false,
TTL: time.Duration(0),
Limit: -1},
},
}
if !reflect.DeepEqual(oldcfg, db.oldDBCfg) {