From 5914213c96f30439e7502990a99db0e367e358e8 Mon Sep 17 00:00:00 2001 From: adi Date: Thu, 8 Dec 2022 17:23:56 +0200 Subject: [PATCH] Update filters without computing when overwriting + chargers case --- apier/v1/apier_it_test.go | 4 +- apier/v1/filter_indexes.go | 80 +--------- apier/v1/precache_it_test.go | 4 + engine/datamanager.go | 9 +- engine/filter_indexes.go | 102 +++++++++++++ engine/libindex.go | 144 +++++++++++++++++- engine/libtest.go | 4 + general_tests/filter_indexes_cases_it_test.go | 4 +- 8 files changed, 261 insertions(+), 90 deletions(-) create mode 100644 engine/filter_indexes.go diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index 39dc92537..e8aaa2542 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -893,7 +893,7 @@ func testApierSetRatingProfile(t *testing.T) { if err := rater.Call(utils.CacheSv1GetCacheStats, new(utils.AttrCacheIDsWithArgDispatcher), &rcvStats); err != nil { t.Error("Got error on CacheSv1.GetCacheStats: ", err.Error()) } else if !reflect.DeepEqual(expectedStats, rcvStats) { - t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, received: %+v", expectedStats, rcvStats) + t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats)) } // Calling the second time should not raise EXISTS if err := rater.Call(utils.APIerSv1SetRatingProfile, rpf, &reply); err != nil { @@ -1914,7 +1914,7 @@ func testApierGetCacheStats2(t *testing.T) { if err != nil { t.Error("Got error on CacheSv1.GetCacheStats: ", err.Error()) } else if !reflect.DeepEqual(expectedStats, rcvStats) { - t.Errorf("Calling CacheSv1.GetCacheStats expected: %v, received: %v", expectedStats, rcvStats) + t.Errorf("Calling CacheSv1.GetCacheStats expected: %v, received: %v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats)) } } diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go index 706a19130..465269515 100644 --- a/apier/v1/filter_indexes.go +++ b/apier/v1/filter_indexes.go @@ -249,7 +249,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args utils.ArgsComputeFilterIndexes, r //ChargerProfile Indexes var cppIndexes *engine.FilterIndexer if args.ChargerS { - cppIndexes, err = api.computeChargerIndexes(args.Tenant, nil, transactionID) + cppIndexes, err = engine.ComputeChargerIndexes(api.DataManager, args.Tenant, nil, transactionID) if err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } @@ -340,7 +340,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args utils.ArgsComputeFilterIndexIDs, return utils.APIErrorHandler(err) } //ChargerProfile Indexes - cppIndexes, err := api.computeChargerIndexes(args.Tenant, &args.ChargerIDs, transactionID) + cppIndexes, err := engine.ComputeChargerIndexes(api.DataManager, args.Tenant, &args.ChargerIDs, transactionID) if err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } @@ -845,82 +845,6 @@ func (api *APIerSv1) computeSupplierIndexes(tenant string, sppIDs *[]string, return sppIndexers, nil } -func (api *APIerSv1) computeChargerIndexes(tenant string, cppIDs *[]string, - transactionID string) (filterIndexer *engine.FilterIndexer, err error) { - var chargerIDs []string - var cppIndexes *engine.FilterIndexer - if cppIDs == nil { - ids, err := api.DataManager.DataDB().GetKeysForPrefix(utils.ChargerProfilePrefix) - if err != nil { - return nil, err - } - for _, id := range ids { - chargerIDs = append(chargerIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) - } - // this will be on ComputeIndexes that contains empty indexes - cppIndexes = engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant) - } else { - // this will be on ComputeIndexesIDs that contains the old indexes from the next getter - var oldIDx map[string]utils.StringMap - if oldIDx, err = api.DataManager.GetFilterIndexes(utils.PrefixToIndexCache[utils.ChargerProfilePrefix], - tenant, utils.EmptyString, nil); err != nil || oldIDx == nil { - cppIndexes = engine.NewFilterIndexer(api.DataManager, utils.ChargerProfilePrefix, tenant) - } else { - cppIndexes = engine.NewFilterIndexerWithIndexes(api.DataManager, utils.ChargerProfilePrefix, tenant, oldIDx) - } - chargerIDs = *cppIDs - transactionID = utils.NonTransactional - } - for _, id := range chargerIDs { - cpp, err := api.DataManager.GetChargerProfile(tenant, id, true, false, utils.NonTransactional) - if err != nil { - return nil, err - } - fltrIDs := make([]string, len(cpp.FilterIDs)) - for i, fltrID := range cpp.FilterIDs { - fltrIDs[i] = fltrID - } - if len(fltrIDs) == 0 { - fltrIDs = []string{utils.META_NONE} - } - for _, fltrID := range fltrIDs { - var fltr *engine.Filter - if fltrID == utils.META_NONE { - fltr = &engine.Filter{ - Tenant: cpp.Tenant, - ID: cpp.ID, - Rules: []*engine.FilterRule{ - { - Type: utils.META_NONE, - Element: utils.META_ANY, - Values: []string{utils.META_ANY}, - }, - }, - } - } else if fltr, err = engine.GetFilter(api.DataManager, cpp.Tenant, fltrID, - true, false, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for charger: %+v", - fltrID, cpp) - } - return nil, err - } - cppIndexes.IndexTPFilter(engine.FilterToTPFilter(fltr), cpp.ID) - } - } - if transactionID == utils.NonTransactional { - if err := cppIndexes.StoreIndexes(true, transactionID); err != nil { - return nil, err - } - return nil, nil - } else { - if err := cppIndexes.StoreIndexes(false, transactionID); err != nil { - return nil, err - } - } - return cppIndexes, nil -} - func (api *APIerSv1) computeDispatcherIndexes(tenant, context string, dspIDs *[]string, transactionID string) (filterIndexer *engine.FilterIndexer, err error) { var dispatcherIDs []string diff --git a/apier/v1/precache_it_test.go b/apier/v1/precache_it_test.go index a73ba90b9..2d3d59f2d 100644 --- a/apier/v1/precache_it_test.go +++ b/apier/v1/precache_it_test.go @@ -311,6 +311,10 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) { Items: 0, Groups: 0, }, + utils.CacheReverseFilterIndexes: { + Items: 0, + Groups: 0, + }, } if err := precacheRPC.Call(utils.CacheSv1GetCacheStats, args, &reply); err != nil { t.Error(err.Error()) diff --git a/engine/datamanager.go b/engine/datamanager.go index 5415e4f33..977c819cf 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -636,10 +636,11 @@ func (dm *DataManager) SetFilter(fltr *Filter) (err error) { return } if oldFltr != nil { - /* if err = UpdateFilterIndexes(dm, oldFltr, fltr); err != nil { - return - } */ - + /* + if err = UpdateFilterIndexes(dm, fltr.Tenant, oldFltr, fltr); err != nil { + return + } + */ } if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate { err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, diff --git a/engine/filter_indexes.go b/engine/filter_indexes.go new file mode 100644 index 000000000..ae7f8e037 --- /dev/null +++ b/engine/filter_indexes.go @@ -0,0 +1,102 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + "strings" + + "github.com/cgrates/cgrates/utils" +) + +func ComputeChargerIndexes(dm *DataManager, tenant string, cppIDs *[]string, + transactionID string) (cppIndexes *FilterIndexer, err error) { + var chargerIDs []string + //var cppIndexes *FilterIndexer + if cppIDs == nil { + ids, err := dm.DataDB().GetKeysForPrefix(utils.ChargerProfilePrefix) + if err != nil { + return nil, err + } + for _, id := range ids { + chargerIDs = append(chargerIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + } + // this will be on ComputeIndexes that contains empty indexes + cppIndexes = NewFilterIndexer(dm, utils.ChargerProfilePrefix, tenant) + } else { + // this will be on ComputeIndexesIDs that contains the old indexes from the next getter + var oldIDx map[string]utils.StringMap + if oldIDx, err = dm.GetFilterIndexes(utils.PrefixToIndexCache[utils.ChargerProfilePrefix], + tenant, utils.EmptyString, nil); err != nil || oldIDx == nil { + cppIndexes = NewFilterIndexer(dm, utils.ChargerProfilePrefix, tenant) + } else { + cppIndexes = NewFilterIndexerWithIndexes(dm, utils.ChargerProfilePrefix, tenant, oldIDx) + } + chargerIDs = *cppIDs + transactionID = utils.NonTransactional + } + for _, id := range chargerIDs { + cpp, err := dm.GetChargerProfile(tenant, id, true, false, utils.NonTransactional) + if err != nil { + return nil, err + } + fltrIDs := make([]string, len(cpp.FilterIDs)) + for i, fltrID := range cpp.FilterIDs { + fltrIDs[i] = fltrID + } + if len(fltrIDs) == 0 { + fltrIDs = []string{utils.META_NONE} + } + for _, fltrID := range fltrIDs { + var fltr *Filter + if fltrID == utils.META_NONE { + fltr = &Filter{ + Tenant: cpp.Tenant, + ID: cpp.ID, + Rules: []*FilterRule{ + { + Type: utils.META_NONE, + Element: utils.META_ANY, + Values: []string{utils.META_ANY}, + }, + }, + } + } else if fltr, err = GetFilter(dm, cpp.Tenant, fltrID, + true, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for charger: %+v", + fltrID, cpp) + } + return nil, err + } + cppIndexes.IndexTPFilter(FilterToTPFilter(fltr), cpp.ID) + } + } + if transactionID == utils.NonTransactional { + if err := cppIndexes.StoreIndexes(true, transactionID); err != nil { + return nil, err + } + return nil, nil + } else { + if err := cppIndexes.StoreIndexes(false, transactionID); err != nil { + return nil, err + } + } + return cppIndexes, nil +} diff --git a/engine/libindex.go b/engine/libindex.go index d801b6466..52be5283f 100644 --- a/engine/libindex.go +++ b/engine/libindex.go @@ -26,13 +26,149 @@ import ( "github.com/cgrates/cgrates/utils" ) +var ( + filterIndexType = utils.StringMap{ + utils.MetaString: true, + utils.MetaPrefix: true} +) + // UpdateFilterIndexes will update the indexes for every reference of a filter that exists in a profile. // Every profile that contains the filters from oldFltr will be updated with the new values for newFltr. // oldFltr and newFltr has the same tenant and ID. -func UpdateFilterIndexes(dm *DataManager, oldFltr *Filter, newFltr *Filter) (err error) { +func UpdateFilterIndexes(dm *DataManager, tnt string, oldFltr *Filter, newFltr *Filter) (err error) { + // we need the rules in roder to compute the new indexes + oldRules := utils.StringMap{} // rules from old filters + newRules := utils.StringMap{} // rules for new filters + removeRules := utils.StringMap{} // the difference from newRules and oldRules that are needed to be removed + // first we check the rules from the new filter + for _, fltr := range newFltr.Rules { + if !filterIndexType.HasKey(fltr.Type) { // we do not consider other types, just *string and *prefix + continue + } + isElementDyn := strings.HasPrefix(fltr.Element, utils.DynamicDataPrefix) + for _, value := range fltr.Values { + var idxKey string + if isElementDyn { + // we do not index element:value both of dynamic types e.g. *string:~*req.Account:~*req.Destination + if strings.HasPrefix(value, utils.DynamicDataPrefix) { + continue + } + idxKey = utils.ConcatenatedKey(fltr.Type, fltr.Element, value) + } else if strings.HasPrefix(value, utils.DynamicDataPrefix) { + idxKey = utils.ConcatenatedKey(fltr.Type, value, fltr.Element) + } else { + continue // none of the element or value are dynamic, so we do not index + } + newRules[idxKey] = true + } + } + // now we check the rules from the old filter + // compare the new rules and old rules and check what rules needs to be removed + for _, fltr := range oldFltr.Rules { + if !filterIndexType.HasKey(fltr.Type) { // we do not consider other types, just *string and *prefix + continue + } + isElementDyn := strings.HasPrefix(fltr.Element, utils.DynamicDataPrefix) + for _, value := range fltr.Values { + var idxKey string + if isElementDyn { + // we do not index element:value both of dynamic types e.g. *string:~*req.Account:~*req.Destination + if strings.HasPrefix(value, utils.DynamicDataPrefix) { + continue + } + idxKey = utils.ConcatenatedKey(fltr.Type, fltr.Element, value) + } else if strings.HasPrefix(value, utils.DynamicDataPrefix) { + idxKey = utils.ConcatenatedKey(fltr.Type, value, fltr.Element) + } else { + continue // none of the element or value are dynamic, so we do not index + } + if !newRules.HasKey(idxKey) { + removeRules[idxKey] = true + } else { + oldRules[idxKey] = true + } + } + } + + needsRebuild := len(removeRules) != 0 // nothing to remove + if !needsRebuild { //check if we added something in remove rules by checking the difference betweend remove rules and old rules + for key := range newRules { + if needsRebuild = !oldRules.HasKey(key); needsRebuild { + break + } + } + if !needsRebuild { + return // nothing to change + } + } + + tntFltrID := utils.ConcatenatedKey(newFltr.Tenant, newFltr.ID) + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntFltrID) + defer guardian.Guardian.UnguardIDs(refID) + var rcvIndexes map[string]utils.StringMap + // get all the reverse indexes for the specific filter from db + if rcvIndexes, err = dm.GetFilterIndexes(utils.PrefixToIndexCache[utils.ReverseFilterIndexes], tntFltrID, + utils.EmptyString, nil); err != nil { + if err != utils.ErrNotFound { + return // + } + err = nil // if the error is NOT_FOUND, it means that no indexes were found for this filter, so no need to update + return + } + removeIndexKeys := removeRules.Slice() + + for idxItmType, index := range rcvIndexes { + switch idxItmType { + case utils.CacheChargerFilterIndexes: + // remove the indexes from this filter for this partition + if err = removeFilterIndexesForFilter(dm, idxItmType, tnt, + removeIndexKeys, index); err != nil { + return + } + // we removed the old reverse indexes, now we have to compute the new ones + chargerIDs := index.Slice() + if _, err = ComputeChargerIndexes(dm, newFltr.Tenant, &chargerIDs, + utils.NonTransactional); err != nil { + return err + } + } + } return nil } +// removeFilterIndexesForFilter removes the itemID for the index keys +// used to remove the old indexes when a filter is updated +func removeFilterIndexesForFilter(dm *DataManager, idxItmType, tnt string, + removeIndexKeys []string, itemIDs utils.StringMap) (err error) { + refID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, idxItmType+tnt) + defer guardian.Guardian.UnguardIDs(refID) + for _, idxKey := range removeIndexKeys { // delete old filters indexes for this item + var remIndx map[string]utils.StringMap + + if remIndx, err = dm.GetFilterIndexes(idxItmType, tnt, + utils.EmptyString, nil); err != nil { + if err != utils.ErrNotFound { + return + } + err = nil + continue + } + + for idx := range itemIDs { + delete(remIndx[idxKey], idx) + //remIndx[idxKey].Remove(idx) + } + fltrIndexer := NewFilterIndexer(dm, utils.CacheInstanceToPrefix[idxItmType], tnt) + fltrIndexer.indexes = remIndx + if err = fltrIndexer.StoreIndexes(true, utils.NonTransactional); err != nil { + return + } + } + return +} + // addReverseFilterIndexForFilter will add a reference for the filter in reverse filter indexes func addReverseFilterIndexForFilter(dm *DataManager, idxItmType, ctx, tnt, itemID string, filterIDs []string) (err error) { @@ -80,11 +216,11 @@ func removeReverseFilterIndexForFilter(dm *DataManager, idxItmType, ctx, tnt, it if strings.HasPrefix(fltrID, utils.Meta) { // we do not reverse for inline filters continue } - tntCtx := utils.ConcatenatedKey(tnt, fltrID) + tntFltrID := utils.ConcatenatedKey(tnt, fltrID) refID := guardian.Guardian.GuardIDs(utils.EmptyString, - config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntCtx) + config.CgrConfig().GeneralCfg().LockingTimeout, utils.CacheReverseFilterIndexes+tntFltrID) var indexes map[string]utils.StringMap - if indexes, err = dm.GetFilterIndexes(utils.PrefixToIndexCache[utils.ReverseFilterIndexes], tntCtx, + if indexes, err = dm.GetFilterIndexes(utils.PrefixToIndexCache[utils.ReverseFilterIndexes], tntFltrID, utils.EmptyString, nil); err != nil { if err != utils.ErrNotFound { guardian.Guardian.UnguardIDs(refID) diff --git a/engine/libtest.go b/engine/libtest.go index 699365636..54c03a7e5 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -505,6 +505,10 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats { Items: 0, Groups: 0, }, + utils.CacheReverseFilterIndexes: { + Items: 0, + Groups: 0, + }, utils.CacheDispatcherProfiles: { Items: 0, Groups: 0, diff --git a/general_tests/filter_indexes_cases_it_test.go b/general_tests/filter_indexes_cases_it_test.go index 4780679b6..ab536a8dd 100644 --- a/general_tests/filter_indexes_cases_it_test.go +++ b/general_tests/filter_indexes_cases_it_test.go @@ -93,7 +93,7 @@ var ( testFilterIndexesCasesStartEngine, testFilterIndexesCasesRpcConn, - /* testFilterIndexesCasesSetFilters, + testFilterIndexesCasesSetFilters, testFilterIndexesCasesSetAttributesWithFilters, testFilterIndexesCasesGetIndexesAnyContext, testFilterIndexesCasesGetIndexesSessionsContext, @@ -102,7 +102,7 @@ var ( testFilterIndexesCasesOverwriteAttributes, testFilterIndexesCasesComputeAttributesIndexes, testFilterIndexesCasesGetIndexesAnyContextChanged, - testFilterIndexesCasesGetIndexesSessionsContextChanged, */ + testFilterIndexesCasesGetIndexesSessionsContextChanged, testFilterIndexesCasesSetIndexedFilter, testFilterIndexesCasesSetChargerWithFltr,