Indexing Rates from RateProfiles

This commit is contained in:
TeoV
2020-06-16 11:43:16 +03:00
committed by Dan Christian Bogos
parent 704aa541c2
commit da35ce7447
12 changed files with 226 additions and 4 deletions

View File

@@ -203,6 +203,7 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) {
Groups: 0,
},
utils.CacheRateProfilesFilterIndexes: {},
utils.CacheRateFilterIndexes: {},
utils.CacheRateProfiles: {},
utils.CacheRatingPlans: {
Items: 4,

View File

@@ -219,6 +219,7 @@ const CGRATES_CFG_JSON = `
"*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control charger filter indexes caching
"*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher filter indexes caching
"*rate_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control rate profile filter indexes caching
"*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control rate filter indexes caching
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher routes caching
"*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher load ( in case of *load strategy )
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher interface

View File

@@ -169,6 +169,9 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheRateProfilesFilterIndexes: {Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Replicate: utils.BoolPointer(false)},
utils.CacheRateFilterIndexes: {Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Replicate: utils.BoolPointer(false)},
utils.CacheDispatcherRoutes: {Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Replicate: utils.BoolPointer(false)},

View File

@@ -687,6 +687,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheRateProfilesFilterIndexes: {Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheRateFilterIndexes: {Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDispatcherRoutes: {Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDispatcherLoads: {Limit: -1,

View File

@@ -186,6 +186,7 @@ func testDspChcPrecacheStatus(t *testing.T) {
utils.CacheChargerFilterIndexes: utils.MetaReady,
utils.CacheDispatcherFilterIndexes: utils.MetaReady,
utils.CacheRateProfilesFilterIndexes: utils.MetaReady,
utils.CacheRateFilterIndexes: utils.MetaReady,
utils.CacheRateProfiles: utils.MetaReady,
utils.CacheLoadIDs: utils.MetaReady,
utils.CacheCDRIDs: utils.MetaReady,

View File

@@ -86,6 +86,7 @@ var (
utils.ChargerFilterIndexes: true,
utils.DispatcherFilterIndexes: true,
utils.RateProfilesFilterIndexPrfx: true,
utils.RateFilterIndexPrfx: true,
}
)
@@ -318,6 +319,8 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
err = dm.MatchFilterIndexFromKey(utils.CacheDispatcherFilterIndexes, dataID)
case utils.RateProfilesFilterIndexPrfx:
err = dm.MatchFilterIndexFromKey(utils.CacheRateProfilesFilterIndexes, dataID)
case utils.RateFilterIndexPrfx:
err = dm.MatchFilterIndexFromKey(utils.CacheRateFilterIndexes, dataID)
case utils.LoadIDPrefix:
_, err = dm.GetItemLoadIDs(utils.EmptyString, true)
}
@@ -3182,6 +3185,31 @@ func (dm *DataManager) SetRateProfile(rpp *RateProfile, withIndex bool) (err err
rpp.ID, rpp.FilterIDs, dm); err != nil {
return
}
// create index for each rate
for key, rate := range rpp.Rates {
if oldRpp != nil {
if oldRate, has := oldRpp.Rates[key]; has {
var needsRemove bool
for _, fltrID := range oldRate.FilterIDs {
if !utils.IsSliceMember(rate.FilterIDs, fltrID) {
needsRemove = true
}
}
if needsRemove {
if err = NewFilterIndexer(dm, utils.RatePrefix,
rpp.Tenant).RemoveItemFromIndex(rpp.Tenant, utils.ConcatenatedKey(rpp.ID, key), oldRate.FilterIDs); err != nil {
return
}
}
}
}
// when we create the indexes for rates we use RateProfile ID as context
if err = createAndIndex(utils.RatePrefix, rpp.Tenant, rpp.ID,
key, rate.FilterIDs, dm); err != nil {
return
}
}
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDispatcherProfiles]; itm.Replicate {

View File

@@ -103,6 +103,9 @@ func (rfi *FilterIndexer) cacheRemItemType() { // ToDo: tune here by removing pe
case utils.RateProfilePrefix:
Cache.Clear([]string{utils.CacheRateProfilesFilterIndexes})
case utils.RatePrefix:
Cache.Clear([]string{utils.CacheRateFilterIndexes})
}
}
@@ -230,6 +233,21 @@ func (rfi *FilterIndexer) RemoveItemFromIndex(tenant, itemID string, oldFilters
filterIDs[i] = fltrID
}
}
case utils.RatePrefix:
composedIDs := utils.SplitConcatenatedKey(itemID)
rppID, rateKey := composedIDs[0], composedIDs[1]
rpp, err := rfi.dm.GetRateProfile(tenant, rppID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
if rpp != nil {
if rate, has := rpp.Rates[rateKey]; has {
filterIDs = make([]string, len(rate.FilterIDs))
for i, fltrID := range rate.FilterIDs {
filterIDs[i] = fltrID
}
}
}
default:
}
if len(filterIDs) == 0 {

View File

@@ -61,6 +61,7 @@ var sTests = []func(t *testing.T){
testITFlush,
testITIsDBEmpty,
testITTestIndexingMetaNot,
testITIndexRateProfile,
}
func TestFilterIndexerIT(t *testing.T) {
@@ -946,3 +947,165 @@ func testITTestIndexingMetaNot(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp)
}
}
func testITIndexRateProfile(t *testing.T) {
rfi := NewFilterIndexer(onStor, utils.RatePrefix, "cgrates.org:RP1")
rPrf := &RateProfile{
Tenant: "cgrates.org",
ID: "RP1",
FilterIDs: []string{"*string:~*req.Subject:1001", "*string:~*req.Subject:1002"},
Weight: 0,
ConnectFee: 0.1,
RoundingMethod: "*up",
RoundingDecimals: 4,
MinCost: 0.1,
MaxCost: 0.6,
MaxCostStrategy: "*free",
Rates: map[string]*Rate{
"FIRST_GI": &Rate{
ID: "FIRST_GI",
FilterIDs: []string{"*string:~*req.Category:call"},
Weight: 0,
Value: 0.12,
Unit: time.Duration(1 * time.Minute),
Increment: time.Duration(1 * time.Minute),
Blocker: false,
},
"SECOND_GI": &Rate{
ID: "SECOND_GI",
FilterIDs: []string{"*string:~*req.Category:voice"},
Weight: 10,
Value: 0.06,
Unit: time.Duration(1 * time.Minute),
Increment: time.Duration(1 * time.Second),
Blocker: false,
},
},
}
if err := dataManager.SetRateProfile(rPrf, true); err != nil {
t.Error(err)
}
eIdxes := map[string]utils.StringMap{
"*string:~*req.Category:call": {
"FIRST_GI": true,
},
"*string:~*req.Category:voice": {
"SECOND_GI": true,
},
}
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
}
}
// update the RateProfile by adding a new Rate
rPrf.Rates = map[string]*Rate{
"FIRST_GI": &Rate{
ID: "FIRST_GI",
FilterIDs: []string{"*string:~*req.Category:call"},
Weight: 0,
Value: 0.12,
Unit: time.Duration(1 * time.Minute),
Increment: time.Duration(1 * time.Minute),
Blocker: false,
},
"SECOND_GI": &Rate{
ID: "SECOND_GI",
FilterIDs: []string{"*string:~*req.Category:voice"},
Weight: 10,
Value: 0.06,
Unit: time.Duration(1 * time.Minute),
Increment: time.Duration(1 * time.Second),
Blocker: false,
},
"THIRD_GI": &Rate{
ID: "THIRD_GI",
FilterIDs: []string{"*string:~*req.Category:custom"},
Weight: 20,
Value: 0.06,
Unit: time.Duration(1 * time.Minute),
Increment: time.Duration(1 * time.Second),
Blocker: false,
},
}
if err := dataManager.SetRateProfile(rPrf, true); err != nil {
t.Error(err)
}
eIdxes = map[string]utils.StringMap{
"*string:~*req.Category:call": {
"FIRST_GI": true,
},
"*string:~*req.Category:voice": {
"SECOND_GI": true,
},
"*string:~*req.Category:custom": {
"THIRD_GI": true,
},
}
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix,
utils.EmptyString, nil); err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
}
}
rfi2 := NewFilterIndexer(onStor, utils.RatePrefix, "cgrates.org:RP2")
rPrf2 := &RateProfile{
Tenant: "cgrates.org",
ID: "RP2",
Weight: 0,
ConnectFee: 0.1,
RoundingMethod: "*up",
RoundingDecimals: 4,
MinCost: 0.1,
MaxCost: 0.6,
MaxCostStrategy: "*free",
Rates: map[string]*Rate{
"CUSTOM_RATE1": &Rate{
ID: "CUSTOM_RATE1",
FilterIDs: []string{"*string:~*req.Subject:1001"},
Weight: 0,
Value: 0.12,
Unit: time.Duration(1 * time.Minute),
Increment: time.Duration(1 * time.Minute),
Blocker: false,
},
"CUSTOM_RATE2": &Rate{
ID: "CUSTOM_RATE2",
FilterIDs: []string{"*string:~*req.Subject:1001", "*string:~*req.Category:call"},
Weight: 10,
Value: 0.6,
Unit: time.Duration(1 * time.Minute),
Increment: time.Duration(1 * time.Second),
Blocker: false,
},
},
}
if err := dataManager.SetRateProfile(rPrf2, true); err != nil {
t.Error(err)
}
eIdxes = map[string]utils.StringMap{
"*string:~*req.Subject:1001": {
"CUSTOM_RATE1": true,
"CUSTOM_RATE2": true,
},
"*string:~*req.Category:call": {
"CUSTOM_RATE2": true,
},
}
if rcvIdx, err := dataManager.GetFilterIndexes(
utils.PrefixToIndexCache[rfi2.itemType], rfi2.dbKeySuffix,
utils.EmptyString, nil); err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(eIdxes, rcvIdx) {
t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
}
}
}

View File

@@ -507,6 +507,7 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
utils.CacheThresholds: {},
utils.CacheRateProfiles: {},
utils.CacheRateProfilesFilterIndexes: {},
utils.CacheRateFilterIndexes: {},
utils.CacheTimings: {},
utils.CacheDiameterMessages: {},
utils.CacheClosedSessions: {},

View File

@@ -2499,6 +2499,7 @@ func (tpr *TpReader) ReloadCache(caching string, verbose bool, argDispatcher *ut
}
if len(ratePrfIDs) != 0 {
cacheIDs = append(cacheIDs, utils.CacheRateProfilesFilterIndexes)
cacheIDs = append(cacheIDs, utils.CacheRateFilterIndexes)
}
if verbose {
log.Print("Clearing indexes")

View File

@@ -127,13 +127,14 @@ func (rS *RateS) matchingRateProfileForEvent(args *ArgsCostForEvent) (rtPfl *eng
// returned in order of intervalStart
func (rS *RateS) matchingRatesForEvent(rtPfl *engine.RateProfile, cgrEv *utils.CGREvent) (rts []*engine.Rate, err error) {
var rtIDs utils.StringMap
// when matching we use the RateProfile ID as context
if rtIDs, err = engine.MatchingItemIDsForEvent(
cgrEv.Event,
rS.cfg.RateSCfg().RateStringIndexedFields,
rS.cfg.RateSCfg().RatePrefixIndexedFields,
rS.dm,
utils.CacheRateFilterIndexes,
cgrEv.Tenant,
utils.ConcatenatedKey(cgrEv.Tenant, rtPfl.ID),
rS.cfg.RateSCfg().RateIndexedSelects,
rS.cfg.RateSCfg().RateNestedFields,
); err != nil {

View File

@@ -70,7 +70,8 @@ var (
CacheAttributeFilterIndexes, CacheChargerFilterIndexes, CacheDispatcherFilterIndexes,
CacheDispatcherRoutes, CacheDispatcherLoads, CacheDiameterMessages, CacheRPCResponses,
CacheClosedSessions, CacheCDRIDs, CacheLoadIDs, CacheRPCConnections, CacheRatingProfilesTmp,
CacheUCH, CacheSTIR, CacheEventCharges, CacheRateProfiles, CacheRateProfilesFilterIndexes})
CacheUCH, CacheSTIR, CacheEventCharges, CacheRateProfiles, CacheRateProfilesFilterIndexes,
CacheRateFilterIndexes})
CacheInstanceToPrefix = map[string]string{
CacheDestinations: DESTINATION_PREFIX,
CacheReverseDestinations: REVERSE_DESTINATION_PREFIX,
@@ -117,6 +118,7 @@ var (
ChargerProfilePrefix: CacheChargerFilterIndexes,
DispatcherProfilePrefix: CacheDispatcherFilterIndexes,
RateProfilePrefix: CacheRateProfilesFilterIndexes,
RatePrefix: CacheRateFilterIndexes,
}
CacheIndexesToPrefix map[string]string // will be built on init
@@ -134,7 +136,7 @@ var (
CacheDispatcherProfiles, CacheDispatcherHosts, CacheResourceFilterIndexes, CacheStatFilterIndexes,
CacheThresholdFilterIndexes, CacheRouteFilterIndexes, CacheAttributeFilterIndexes,
CacheChargerFilterIndexes, CacheDispatcherFilterIndexes, CacheLoadIDs, CacheAccounts,
CacheRateProfiles, CacheRateProfilesFilterIndexes})
CacheRateProfiles, CacheRateProfilesFilterIndexes, CacheRateFilterIndexes})
CacheStorDBPartitions = NewStringSet([]string{TBLTPTimings, TBLTPDestinations, TBLTPRates,
TBLTPDestinationRates, TBLTPRatingPlans, TBLTPRatingProfiles, TBLTPSharedGroups,
@@ -272,7 +274,7 @@ const (
VERSION_PREFIX = "ver_"
StatQueueProfilePrefix = "sqp_"
RouteProfilePrefix = "rpp_"
RateProfilePrfx = "rtp_"
RatePrefix = "rep_"
AttributeProfilePrefix = "alp_"
ChargerProfilePrefix = "cpp_"
DispatcherProfilePrefix = "dpp_"