diff --git a/engine/datamanager.go b/engine/datamanager.go index bc07847bd..4aced932b 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -2981,48 +2981,38 @@ func (dm *DataManager) SetRateProfile(rpp *RateProfile, withIndex bool) (err err return err } if withIndex { + var oldFiltersIDs *[]string if oldRpp != nil { - var needsRemove bool - for _, fltrID := range oldRpp.FilterIDs { - if !utils.IsSliceMember(rpp.FilterIDs, fltrID) { - needsRemove = true + oldFiltersIDs = &oldRpp.FilterIDs + } + if err := updatedIndexes(dm, utils.CacheRateProfilesFilterIndexes, rpp.Tenant, + utils.EmptyString, rpp.ID, oldFiltersIDs, rpp.FilterIDs); err != nil { + return err + } + // remove indexes for old rates + if oldRpp != nil { + for key, rate := range oldRpp.Rates { + if _, has := rpp.Rates[key]; has { + continue } - } - - if needsRemove { - if err = NewFilterIndexer(dm, utils.RateProfilePrefix, - rpp.Tenant).RemoveItemFromIndex(rpp.Tenant, rpp.ID, oldRpp.FilterIDs); err != nil { + if err = removeItemFromFilterIndex(dm, utils.CacheRateFilterIndexes, + rpp.Tenant, rpp.ID, key, rate.FilterIDs); err != nil { return } } } - if err = createAndIndex(utils.RateProfilePrefix, rpp.Tenant, utils.EmptyString, - rpp.ID, rpp.FilterIDs, dm); err != nil { - return - } // create index for each rate for key, rate := range rpp.Rates { + var oldRateFiltersIDs *[]string if oldRpp != nil { if oldRate, has := oldRpp.Rates[key]; has { - var needsRemove bool - for _, fltrID := range oldRate.FilterIDs { - if !utils.IsSliceMember(rate.FilterIDs, fltrID) { - needsRemove = true - } - } - - if needsRemove { - if err = NewFilterIndexer(dm, utils.RatePrefix, - rpp.Tenant).RemoveItemFromIndex(rpp.Tenant, utils.ConcatenatedKey(rpp.ID, key), oldRate.FilterIDs); err != nil { - return - } - } + oldRateFiltersIDs = &oldRate.FilterIDs } } // when we create the indexes for rates we use RateProfile ID as context - if err = createAndIndex(utils.RatePrefix, rpp.Tenant, rpp.ID, - key, rate.FilterIDs, dm); err != nil { - return + if err := updatedIndexes(dm, utils.CacheRateFilterIndexes, rpp.Tenant, + rpp.ID, key, oldRateFiltersIDs, rate.FilterIDs); err != nil { + return err } } @@ -3061,8 +3051,14 @@ func (dm *DataManager) RemoveRateProfile(tenant, id string, return utils.ErrNotFound } if withIndex { - if err = NewFilterIndexer(dm, utils.RateProfilePrefix, - tenant).RemoveItemFromIndex(tenant, id, oldRpp.FilterIDs); err != nil { + for key, rate := range oldRpp.Rates { + if err = removeItemFromFilterIndex(dm, utils.CacheRateFilterIndexes, + oldRpp.Tenant, oldRpp.ID, key, rate.FilterIDs); err != nil { + return + } + } + if err = removeItemFromFilterIndex(dm, utils.CacheRateProfilesFilterIndexes, + tenant, utils.EmptyString, id, oldRpp.FilterIDs); err != nil { return } } diff --git a/engine/filterindexer.go b/engine/filterindexer.go deleted file mode 100644 index 70aa16316..000000000 --- a/engine/filterindexer.go +++ /dev/null @@ -1,357 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package engine - -import ( - "fmt" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/guardian" - "github.com/cgrates/cgrates/utils" -) - -func NewFilterIndexer(dm *DataManager, itemType, dbKeySuffix string) *FilterIndexer { - return &FilterIndexer{dm: dm, itemType: itemType, dbKeySuffix: dbKeySuffix, - indexes: make(map[string]utils.StringMap), - chngdIndxKeys: make(utils.StringMap)} -} - -// FilterIndexer is a centralized indexer for all data sources using RequestFilter -// retrieves and stores it's data from/to dataDB -// not thread safe, meant to be used as logic within other code blocks -type FilterIndexer struct { - indexes map[string]utils.StringMap // map[fieldName:fieldValue]utils.StringMap[itemID] - dm *DataManager - itemType string - dbKeySuffix string // get/store the result from/into this key - chngdIndxKeys utils.StringMap // keep record of the changed fieldName:fieldValue pair so we can re-cache wisely -} - -// IndexTPFilter parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys -func (rfi *FilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID string) { - for _, fltr := range tpFltr.Filters { - switch fltr.Type { - case utils.MetaString: - for _, fldVal := range fltr.Values { - concatKey := utils.ConcatenatedKey(fltr.Type, fltr.Element, fldVal) - if _, hasIt := rfi.indexes[concatKey]; !hasIt { - rfi.indexes[concatKey] = make(utils.StringMap) - } - rfi.indexes[concatKey][itemID] = true - rfi.chngdIndxKeys[concatKey] = true - } - case utils.MetaPrefix: - for _, fldVal := range fltr.Values { - concatKey := utils.ConcatenatedKey(fltr.Type, fltr.Element, fldVal) - if _, hasIt := rfi.indexes[concatKey]; !hasIt { - rfi.indexes[concatKey] = make(utils.StringMap) - } - rfi.indexes[concatKey][itemID] = true - rfi.chngdIndxKeys[concatKey] = true - } - case utils.META_NONE: - concatKey := utils.ConcatenatedKey(utils.META_NONE, utils.ANY, utils.ANY) - if _, hasIt := rfi.indexes[concatKey]; !hasIt { - rfi.indexes[concatKey] = make(utils.StringMap) - } - rfi.indexes[concatKey][itemID] = true - rfi.chngdIndxKeys[concatKey] = true - } - } - return -} - -func (rfi *FilterIndexer) cacheRemItemType() { // ToDo: tune here by removing per item - switch rfi.itemType { - - case utils.ThresholdProfilePrefix: - Cache.Clear([]string{utils.CacheThresholdFilterIndexes}) - - case utils.ResourceProfilesPrefix: - Cache.Clear([]string{utils.CacheResourceFilterIndexes}) - - case utils.StatQueueProfilePrefix: - Cache.Clear([]string{utils.CacheStatFilterIndexes}) - - case utils.RouteProfilePrefix: - Cache.Clear([]string{utils.CacheRouteFilterIndexes}) - - case utils.AttributeProfilePrefix: - Cache.Clear([]string{utils.CacheAttributeFilterIndexes}) - - case utils.ChargerProfilePrefix: - Cache.Clear([]string{utils.CacheChargerFilterIndexes}) - - case utils.DispatcherProfilePrefix: - Cache.Clear([]string{utils.CacheDispatcherFilterIndexes}) - - case utils.RateProfilePrefix: - Cache.Clear([]string{utils.CacheRateProfilesFilterIndexes}) - - case utils.RatePrefix: - Cache.Clear([]string{utils.CacheRateFilterIndexes}) - } -} - -// StoreIndexes handles storing the indexes to dataDB -func (rfi *FilterIndexer) StoreIndexes(commit bool, transactionID string) (err error) { - lockID := utils.CacheInstanceToPrefix[utils.PrefixToIndexCache[rfi.itemType]] + rfi.dbKeySuffix - refID := guardian.Guardian.GuardIDs("", - config.CgrConfig().GeneralCfg().LockingTimeout, lockID) - defer guardian.Guardian.UnguardIDs(refID) - if err = rfi.dm.SetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - rfi.indexes, commit, transactionID); err != nil { - return - } - rfi.cacheRemItemType() - return -} - -//Populate FilterIndexer.indexes with specific fieldName:fieldValue , item -func (rfi *FilterIndexer) loadFldNameFldValIndex(filterType, fldName, fldVal string) error { - rcvIdx, err := rfi.dm.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, filterType, - map[string]string{fldName: fldVal}) - if err != nil { - return err - } - for fldName, nameValMp := range rcvIdx { - if _, has := rfi.indexes[fldName]; !has { - rfi.indexes[fldName] = make(utils.StringMap) - } - rfi.indexes[fldName] = nameValMp - } - return nil -} - -//RemoveItemFromIndex remove Indexes for a specific itemID -func (rfi *FilterIndexer) RemoveItemFromIndex(tenant, itemID string, oldFilters []string) (err error) { - var filterIDs []string - switch rfi.itemType { - case utils.ThresholdProfilePrefix: - th, err := rfi.dm.GetThresholdProfile(tenant, itemID, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return err - } - if th != nil { - filterIDs = make([]string, len(th.FilterIDs)) - for i, fltrID := range th.FilterIDs { - filterIDs[i] = fltrID - } - } - case utils.AttributeProfilePrefix: - attrPrf, err := rfi.dm.GetAttributeProfile(tenant, itemID, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return err - } - if attrPrf != nil { - filterIDs = make([]string, len(attrPrf.FilterIDs)) - for i, fltrID := range attrPrf.FilterIDs { - filterIDs[i] = fltrID - } - } - case utils.ResourceProfilesPrefix: - res, err := rfi.dm.GetResourceProfile(tenant, itemID, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return err - } - if res != nil { - filterIDs = make([]string, len(res.FilterIDs)) - for i, fltrID := range res.FilterIDs { - filterIDs[i] = fltrID - } - } - case utils.StatQueueProfilePrefix: - stq, err := rfi.dm.GetStatQueueProfile(tenant, itemID, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return err - } - if stq != nil { - filterIDs = make([]string, len(stq.FilterIDs)) - for i, fltrID := range stq.FilterIDs { - filterIDs[i] = fltrID - } - } - case utils.RouteProfilePrefix: - spp, err := rfi.dm.GetRouteProfile(tenant, itemID, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return err - } - if spp != nil { - filterIDs = make([]string, len(spp.FilterIDs)) - for i, fltrID := range spp.FilterIDs { - filterIDs[i] = fltrID - } - } - case utils.ChargerProfilePrefix: - cpp, err := rfi.dm.GetChargerProfile(tenant, itemID, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return err - } - if cpp != nil { - filterIDs = make([]string, len(cpp.FilterIDs)) - for i, fltrID := range cpp.FilterIDs { - filterIDs[i] = fltrID - } - } - case utils.DispatcherProfilePrefix: - dpp, err := rfi.dm.GetDispatcherProfile(tenant, itemID, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return err - } - if dpp != nil { - filterIDs = make([]string, len(dpp.FilterIDs)) - for i, fltrID := range dpp.FilterIDs { - filterIDs[i] = fltrID - } - } - case utils.RateProfilePrefix: - rpp, err := rfi.dm.GetRateProfile(tenant, itemID, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return err - } - if rpp != nil { - filterIDs = make([]string, len(rpp.FilterIDs)) - for i, fltrID := range rpp.FilterIDs { - filterIDs[i] = fltrID - } - } - case utils.RatePrefix: - composedIDs := utils.SplitConcatenatedKey(itemID) - rppID, rateKey := composedIDs[0], composedIDs[1] - rpp, err := rfi.dm.GetRateProfile(tenant, rppID, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return err - } - if rpp != nil { - if rate, has := rpp.Rates[rateKey]; has { - filterIDs = make([]string, len(rate.FilterIDs)) - for i, fltrID := range rate.FilterIDs { - filterIDs[i] = fltrID - } - } - } - default: - } - if len(filterIDs) == 0 { - filterIDs = []string{utils.META_NONE} - } - for _, oldFltr := range oldFilters { - filterIDs = append(filterIDs, oldFltr) - } - for _, fltrID := range filterIDs { - var fltr *Filter - if fltrID == utils.META_NONE { - fltr = &Filter{ - Tenant: tenant, - ID: itemID, - Rules: []*FilterRule{ - { - Type: utils.META_NONE, - Element: utils.META_ANY, - Values: []string{utils.META_ANY}, - }, - }, - } - } else if fltr, err = rfi.dm.GetFilter(tenant, fltrID, - true, false, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v", - fltrID, rfi.itemType, itemID) - } - return err - } - for _, flt := range fltr.Rules { - var fldType, fldName string - var fldVals []string - if utils.SliceHasMember([]string{utils.META_NONE, utils.MetaPrefix, utils.MetaString}, flt.Type) { - fldType, fldName = flt.Type, flt.Element - fldVals = flt.Values - } - for _, fldVal := range fldVals { - if err = rfi.loadFldNameFldValIndex(fldType, - fldName, fldVal); err != nil && err != utils.ErrNotFound { - return err - } - } - } - } - for _, itmMp := range rfi.indexes { - if _, has := itmMp[itemID]; has { - delete(itmMp, itemID) //Force deleting in driver - } - } - return rfi.StoreIndexes(false, utils.NonTransactional) -} - -//createAndIndex create indexes for an item -func createAndIndex(itmPrfx, tenant, context, itemID string, filterIDs []string, dm *DataManager) (err error) { - indexerKey := tenant - if context != "" { - indexerKey = utils.ConcatenatedKey(tenant, context) - } - indexer := NewFilterIndexer(dm, itmPrfx, indexerKey) - fltrIDs := make([]string, len(filterIDs)) - for i, fltrID := range filterIDs { - fltrIDs[i] = fltrID - } - if len(fltrIDs) == 0 { - fltrIDs = []string{utils.META_NONE} - } - for _, fltrID := range fltrIDs { - var fltr *Filter - if fltrID == utils.META_NONE { - fltr = &Filter{ - Tenant: tenant, - ID: itemID, - Rules: []*FilterRule{ - { - Type: utils.META_NONE, - Element: utils.META_ANY, - Values: []string{utils.META_ANY}, - }, - }, - } - } else if fltr, err = dm.GetFilter(tenant, fltrID, - true, false, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v", - fltrID, itmPrfx, itemID) - } - return - } - for _, flt := range fltr.Rules { - var fldType, fldName string - var fldVals []string - if utils.SliceHasMember([]string{utils.META_NONE, utils.MetaPrefix, utils.MetaString}, flt.Type) { - fldType, fldName = flt.Type, flt.Element - fldVals = flt.Values - } - for _, fldVal := range fldVals { - if err = indexer.loadFldNameFldValIndex(fldType, - fldName, fldVal); err != nil && err != utils.ErrNotFound { - return err - } - } - } - indexer.IndexTPFilter(FilterToTPFilter(fltr), itemID) - } - return indexer.StoreIndexes(true, utils.NonTransactional) -} diff --git a/general_tests/rpccaching_it_test.go b/general_tests/rpccaching_it_test.go index afeeee69f..5d48e448e 100644 --- a/general_tests/rpccaching_it_test.go +++ b/general_tests/rpccaching_it_test.go @@ -367,7 +367,8 @@ func testRPCMethodsInitSession(t *testing.T) { time.Sleep(1*time.Second + 500*time.Millisecond) if err := rpcRpc.Call(utils.SessionSv1InitiateSession, - args, &rply); err == nil || !(err.Error() == "RALS_ERROR:ACCOUNT_DISABLED" || err.Error() == utils.NewErrRALs(utils.ErrExists).Error()) { // ErrExist -> initSession twice + args, &rply); err == nil || !(err.Error() == "RALS_ERROR:ACCOUNT_DISABLED" || + err.Error() == utils.ErrExists.Error()) { // ErrExist -> initSession twice t.Error("Unexpected error returned", err) } diff --git a/rates/rates.go b/rates/rates.go index 0321d4581..522d38be7 100644 --- a/rates/rates.go +++ b/rates/rates.go @@ -75,7 +75,7 @@ func (rS *RateS) Call(serviceMethod string, args interface{}, reply interface{}) func (rS *RateS) matchingRateProfileForEvent(args *ArgsCostForEvent) (rtPfl *engine.RateProfile, err error) { rPfIDs := args.RateProfileIDs if len(rPfIDs) == 0 { - var rPfIDMp utils.StringMap + var rPfIDMp utils.StringSet if rPfIDMp, err = engine.MatchingItemIDsForEvent( args.CGREvent.Event, rS.cfg.RateSCfg().StringIndexedFields, @@ -88,7 +88,7 @@ func (rS *RateS) matchingRateProfileForEvent(args *ArgsCostForEvent) (rtPfl *eng ); err != nil { return } - rPfIDs = rPfIDMp.Slice() + rPfIDs = rPfIDMp.AsSlice() } matchingRPfs := make([]*engine.RateProfile, 0, len(rPfIDs)) evNm := utils.MapStorage{utils.MetaReq: args.CGREvent.Event} @@ -126,7 +126,7 @@ func (rS *RateS) matchingRateProfileForEvent(args *ArgsCostForEvent) (rtPfl *eng // indexed based on intervalStart, there will be one winner per interval start // returned in order of intervalStart func (rS *RateS) matchingRatesForEvent(rtPfl *engine.RateProfile, cgrEv *utils.CGREvent) (rts []*engine.Rate, err error) { - var rtIDs utils.StringMap + var rtIDs utils.StringSet // when matching we use the RateProfile ID as context if rtIDs, err = engine.MatchingItemIDsForEvent( cgrEv.Event, diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go index 38604f215..41f9faf21 100644 --- a/services/datadb_it_test.go +++ b/services/datadb_it_test.go @@ -212,6 +212,11 @@ func TestDataDBReload(t *testing.T) { Remote: false, TTL: time.Duration(0), Limit: -1}, + utils.MetaRateProfiles: { + Replicate: false, + Remote: false, + TTL: time.Duration(0), + Limit: -1}, }, } if !reflect.DeepEqual(oldcfg, db.oldDBCfg) {