From 7bea811ca02fe4e2a044b31e65fc4037e070f801 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 4 Jul 2018 04:03:45 -0400 Subject: [PATCH] Centralize indexing in data manager --- config/config_defaults.go | 1 + config/config_json_test.go | 3 +- config/config_test.go | 3 +- config/filterscfg.go | 6 +- config/libconfig_json.go | 3 +- engine/datamanager.go | 347 ++++++-------------------------- engine/filterindexer.go | 6 +- engine/filterindexer_it_test.go | 31 +-- 8 files changed, 90 insertions(+), 310 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index eecd9c80a..55fae5138 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -148,6 +148,7 @@ const CGRATES_CFG_JSON = ` "filters": { // Filters configuration (*new) "stats_conns": [], // address where to reach the stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> + "indexes_selects":true, }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 7a0f66c55..628472e86 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -707,7 +707,8 @@ func TestDfAttributeServJsonCfg(t *testing.T) { func TestDfFilterSJsonCfg(t *testing.T) { eCfg := &FilterSJsonCfg{ - Stats_conns: &[]*HaPoolJsonCfg{}, + Stats_conns: &[]*HaPoolJsonCfg{}, + Indexes_selects: utils.BoolPointer(true), } if cfg, err := dfCgrJsonCfg.FilterSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index b1b2e2088..6cefdc743 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -780,7 +780,8 @@ func TestCgrCfgJSONDefaultsUserS(t *testing.T) { func TestCgrCfgJSONDefaultFiltersCfg(t *testing.T) { eFiltersCfg := &FilterSCfg{ - StatSConns: []*HaPoolConfig{}, + StatSConns: []*HaPoolConfig{}, + IndexedSelects: true, } if !reflect.DeepEqual(cgrCfg.filterSCfg, eFiltersCfg) { t.Errorf("received: %+v, expecting: %+v", cgrCfg.filterSCfg, eFiltersCfg) diff --git a/config/filterscfg.go b/config/filterscfg.go index 90e195a8b..847837553 100644 --- a/config/filterscfg.go +++ b/config/filterscfg.go @@ -19,7 +19,8 @@ along with this program. If not, see package config type FilterSCfg struct { - StatSConns []*HaPoolConfig + StatSConns []*HaPoolConfig + IndexedSelects bool } func (fSCfg *FilterSCfg) loadFromJsonCfg(jsnCfg *FilterSJsonCfg) (err error) { @@ -33,5 +34,8 @@ func (fSCfg *FilterSCfg) loadFromJsonCfg(jsnCfg *FilterSJsonCfg) (err error) { fSCfg.StatSConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Indexes_selects != nil { + fSCfg.IndexedSelects = *jsnCfg.Indexes_selects + } return } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 61bbf42e3..e4e251fc0 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -85,7 +85,8 @@ type DbJsonCfg struct { // Filters config type FilterSJsonCfg struct { - Stats_conns *[]*HaPoolJsonCfg + Stats_conns *[]*HaPoolJsonCfg + Indexes_selects *bool } // Rater config section diff --git a/engine/datamanager.go b/engine/datamanager.go index 0b8899131..4dd3dde80 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -423,65 +423,7 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) return } if withIndex { - //remove old ThresholdProfile indexes - indexerRemove := NewFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant) - if err = indexerRemove.RemoveItemFromIndex(th.ID); err != nil && - err.Error() != utils.ErrNotFound.Error() { - return - } - indexer := NewFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant) - //Verify matching Filters for every FilterID from ThresholdProfile - fltrIDs := make([]string, len(th.FilterIDs)) - for i, fltrID := range th.FilterIDs { - fltrIDs[i] = fltrID - } - if len(fltrIDs) == 0 { - fltrIDs = []string{utils.META_NONE} - } - for _, fltrID := range fltrIDs { - var fltr *Filter - if fltrID == utils.META_NONE { - fltr = &Filter{ - Tenant: th.Tenant, - ID: th.ID, - Rules: []*FilterRule{ - &FilterRule{ - Type: utils.MetaDefault, - FieldName: utils.META_ANY, - Values: []string{utils.META_ANY}, - }, - }, - } - } else if fltr, err = dm.GetFilter(th.Tenant, fltrID, - false, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", - fltrID, th) - } - return - } - for _, flt := range fltr.Rules { - var fldType, fldName string - var fldVals []string - if utils.IsSliceMember([]string{MetaString, MetaPrefix}, flt.Type) { - fldType, fldName = flt.Type, flt.FieldName - fldVals = flt.Values - } else { - fldType, fldName = utils.MetaDefault, utils.META_ANY - fldVals = []string{utils.META_ANY} - } - for _, fldVal := range fldVals { - if err = indexer.loadFldNameFldValIndex(fldType, - fldName, fldVal); err != nil && err != utils.ErrNotFound { - return err - } - } - } - indexer.IndexTPFilter(FilterToTPFilter(fltr), th.ID) - } - if err = indexer.StoreIndexes(true, utils.NonTransactional); err != nil { - return - } + return dm.createAndIndex(utils.ThresholdProfilePrefix, th.Tenant, th.ID, th.FilterIDs) } return } @@ -533,64 +475,7 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool return } if withIndex { - indexer := NewFilterIndexer(dm, utils.StatQueueProfilePrefix, sqp.Tenant) - //remove old StatQueueProfile indexes - if err = indexer.RemoveItemFromIndex(sqp.ID); err != nil && - err.Error() != utils.ErrNotFound.Error() { - return - } - //Verify matching Filters for every FilterID from StatQueueProfile - fltrIDs := make([]string, len(sqp.FilterIDs)) - for i, fltrID := range sqp.FilterIDs { - fltrIDs[i] = fltrID - } - if len(fltrIDs) == 0 { - fltrIDs = []string{utils.META_NONE} - } - for _, fltrID := range fltrIDs { - var fltr *Filter - if fltrID == utils.META_NONE { - fltr = &Filter{ - Tenant: sqp.Tenant, - ID: sqp.ID, - Rules: []*FilterRule{ - &FilterRule{ - Type: utils.MetaDefault, - FieldName: utils.META_ANY, - Values: []string{utils.META_ANY}, - }, - }, - } - } else if fltr, err = dm.GetFilter(sqp.Tenant, fltrID, - false, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for statqueue: %+v", - fltrID, sqp) - } - return - } - for _, flt := range fltr.Rules { - var fldType, fldName string - var fldVals []string - if utils.IsSliceMember([]string{MetaString, MetaPrefix}, flt.Type) { - fldType, fldName = flt.Type, flt.FieldName - fldVals = flt.Values - } else { - fldType, fldName = utils.MetaDefault, utils.META_ANY - fldVals = []string{utils.META_ANY} - } - for _, fldVal := range fldVals { - if err = indexer.loadFldNameFldValIndex(fldType, - fldName, fldVal); err != nil && err != utils.ErrNotFound { - return err - } - } - } - indexer.IndexTPFilter(FilterToTPFilter(fltr), sqp.ID) - } - if err = indexer.StoreIndexes(true, utils.NonTransactional); err != nil { - return - } + return dm.createAndIndex(utils.StatQueueProfilePrefix, sqp.Tenant, sqp.ID, sqp.FilterIDs) } return } @@ -726,62 +611,7 @@ func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) ( } //to be implemented in tests if withIndex { - indexer := NewFilterIndexer(dm, utils.ResourceProfilesPrefix, rp.Tenant) - //remove old ResourceProfiles indexes - if err = indexer.RemoveItemFromIndex(rp.ID); err != nil && - err.Error() != utils.ErrNotFound.Error() { - return - } - //Verify matching Filters for every FilterID from ResourceProfiles - fltrIDs := make([]string, len(rp.FilterIDs)) - for i, fltrID := range rp.FilterIDs { - fltrIDs[i] = fltrID - } - if len(fltrIDs) == 0 { - fltrIDs = []string{utils.META_NONE} - } - for _, fltrID := range fltrIDs { - var fltr *Filter - if fltrID == utils.META_NONE { - fltr = &Filter{ - Tenant: rp.Tenant, - ID: rp.ID, - Rules: []*FilterRule{ - &FilterRule{ - Type: utils.MetaDefault, - FieldName: utils.META_ANY, - Values: []string{utils.META_ANY}, - }, - }, - } - } else if fltr, err = dm.GetFilter(rp.Tenant, fltrID, - false, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", - fltrID, rp) - } - return - } - for _, flt := range fltr.Rules { - var fldType, fldName string - var fldVals []string - if utils.IsSliceMember([]string{MetaString, MetaPrefix}, flt.Type) { - fldType, fldName = flt.Type, flt.FieldName - fldVals = flt.Values - } else { - fldType, fldName = utils.MetaDefault, utils.META_ANY - fldVals = []string{utils.META_ANY} - } - for _, fldVal := range fldVals { - if err = indexer.loadFldNameFldValIndex(fldType, - fldName, fldVal); err != nil && err != utils.ErrNotFound { - return err - } - } - } - indexer.IndexTPFilter(FilterToTPFilter(fltr), rp.ID) - } - if err = indexer.StoreIndexes(true, utils.NonTransactional); err != nil { + if err = dm.createAndIndex(utils.ResourceProfilesPrefix, rp.Tenant, rp.ID, rp.FilterIDs); err != nil { return } Cache.Clear([]string{utils.CacheEventResources}) @@ -1214,66 +1044,8 @@ func (dm *DataManager) SetSupplierProfile(supp *SupplierProfile, withIndex bool) if err = dm.CacheDataFromDB(utils.SupplierProfilePrefix, []string{supp.TenantID()}, true); err != nil { return } - //to be implemented in tests if withIndex { - indexer := NewFilterIndexer(dm, utils.SupplierProfilePrefix, supp.Tenant) - //remove old SupplierProfile indexes - if err = indexer.RemoveItemFromIndex(supp.ID); err != nil && - err.Error() != utils.ErrNotFound.Error() { - return - } - //Verify matching Filters for every FilterID from SupplierProfile - fltrIDs := make([]string, len(supp.FilterIDs)) - for i, fltrID := range supp.FilterIDs { - fltrIDs[i] = fltrID - } - if len(fltrIDs) == 0 { - fltrIDs = []string{utils.META_NONE} - } - for _, fltrID := range fltrIDs { - var fltr *Filter - if fltrID == utils.META_NONE { - fltr = &Filter{ - Tenant: supp.Tenant, - ID: supp.ID, - Rules: []*FilterRule{ - &FilterRule{ - Type: utils.MetaDefault, - FieldName: utils.META_ANY, - Values: []string{utils.META_ANY}, - }, - }, - } - } else if fltr, err = dm.GetFilter(supp.Tenant, fltrID, - false, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for SupplierProfile: %+v", - fltrID, supp) - } - return - } - for _, flt := range fltr.Rules { - var fldType, fldName string - var fldVals []string - if utils.IsSliceMember([]string{MetaString, MetaPrefix}, flt.Type) { - fldType, fldName = flt.Type, flt.FieldName - fldVals = flt.Values - } else { - fldType, fldName = utils.MetaDefault, utils.META_ANY - fldVals = []string{utils.META_ANY} - } - for _, fldVal := range fldVals { - if err = indexer.loadFldNameFldValIndex(fldType, - fldName, fldVal); err != nil && err != utils.ErrNotFound { - return err - } - } - } - indexer.IndexTPFilter(FilterToTPFilter(fltr), supp.ID) - } - if err = indexer.StoreIndexes(true, utils.NonTransactional); err != nil { - return - } + return dm.createAndIndex(utils.SupplierProfilePrefix, supp.Tenant, supp.ID, supp.FilterIDs) } return } @@ -1358,57 +1130,8 @@ func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool) } } for _, ctx := range ap.Contexts { - indexer := NewFilterIndexer(dm, utils.AttributeProfilePrefix, utils.ConcatenatedKey(ap.Tenant, ctx)) - //Verify matching Filters for every FilterID from AttributeProfile - fltrIDs := make([]string, len(ap.FilterIDs)) - for i, fltrID := range ap.FilterIDs { - fltrIDs[i] = fltrID - } - if len(fltrIDs) == 0 { - fltrIDs = []string{utils.META_NONE} - } - for _, fltrID := range fltrIDs { - var fltr *Filter - if fltrID == utils.META_NONE { - fltr = &Filter{ - Tenant: ap.Tenant, - ID: ap.ID, - Rules: []*FilterRule{ - &FilterRule{ - Type: utils.MetaDefault, - FieldName: utils.META_ANY, - Values: []string{utils.META_ANY}, - }, - }, - } - } else if fltr, err = dm.GetFilter(ap.Tenant, fltrID, - false, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for AttributeProfile: %+v", - fltrID, ap) - } - return - } - for _, flt := range fltr.Rules { - var fldType, fldName string - var fldVals []string - if utils.IsSliceMember([]string{MetaString, MetaPrefix}, flt.Type) { - fldType, fldName = flt.Type, flt.FieldName - fldVals = flt.Values - } else { - fldType, fldName = utils.MetaDefault, utils.META_ANY - fldVals = []string{utils.META_ANY} - } - for _, fldVal := range fldVals { - if err = indexer.loadFldNameFldValIndex(fldType, - fldName, fldVal); err != nil && err != utils.ErrNotFound { - return err - } - } - } - indexer.IndexTPFilter(FilterToTPFilter(fltr), ap.ID) - } - if err = indexer.StoreIndexes(true, utils.NonTransactional); err != nil { + if err = dm.createAndIndex(utils.AttributeProfilePrefix, + utils.ConcatenatedKey(ap.Tenant, ctx), ap.ID, ap.FilterIDs); err != nil { return } } @@ -1433,3 +1156,61 @@ func (dm *DataManager) RemoveAttributeProfile(tenant, id string, contexts []stri } return } + +func (dm *DataManager) createAndIndex(itemPrefix, tenant, itemID string, filterIDs []string) (err error) { + indexer := NewFilterIndexer(dm, itemPrefix, tenant) + if err = indexer.RemoveItemFromIndex(itemID); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return + } + if itemPrefix == utils.AttributeProfilePrefix { + tenant = strings.Split(tenant, utils.InInFieldSep)[0] + } + fltrIDs := make([]string, len(filterIDs)) + for i, fltrID := range filterIDs { + fltrIDs[i] = fltrID + } + if len(fltrIDs) == 0 { + fltrIDs = []string{utils.META_NONE} + } + for _, fltrID := range fltrIDs { + var fltr *Filter + if fltrID == utils.META_NONE { + fltr = &Filter{ + Tenant: tenant, + ID: itemID, + Rules: []*FilterRule{ + &FilterRule{ + Type: utils.META_NONE, + FieldName: utils.META_ANY, + Values: []string{utils.META_ANY}, + }, + }, + } + } else if fltr, err = dm.GetFilter(tenant, fltrID, + false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for itemType: %+v and ID: %+v", + fltrID, itemPrefix, itemID) + } + return + } + for _, flt := range fltr.Rules { + var fldType, fldName string + var fldVals []string + if utils.IsSliceMember([]string{MetaString, MetaPrefix, utils.META_NONE}, flt.Type) { + fldType, fldName = flt.Type, flt.FieldName + fldVals = flt.Values + } + for _, fldVal := range fldVals { + if err = indexer.loadFldNameFldValIndex(fldType, + fldName, fldVal); err != nil && err != utils.ErrNotFound { + return err + } + } + } + indexer.IndexTPFilter(FilterToTPFilter(fltr), itemID) + } + return indexer.StoreIndexes(true, utils.NonTransactional) + +} diff --git a/engine/filterindexer.go b/engine/filterindexer.go index c4855062d..e03f21c34 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -86,8 +86,8 @@ func (rfi *FilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID st rfi.chngdIndxKeys[concatKey] = true } rfi.chngdRevIndxKeys[itemID] = true - default: - concatKey := utils.ConcatenatedKey(utils.MetaDefault, utils.ANY, utils.ANY) + case utils.META_NONE: + concatKey := utils.ConcatenatedKey(utils.META_NONE, utils.ANY, utils.ANY) if _, hasIt := rfi.indexes[concatKey]; !hasIt { rfi.indexes[concatKey] = make(utils.StringMap) } @@ -95,7 +95,7 @@ func (rfi *FilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID st rfi.reveseIndex[itemID] = make(utils.StringMap) } rfi.reveseIndex[itemID][concatKey] = true - rfi.indexes[concatKey][itemID] = true // Fields without real field index will be located in map[*any:*any][rl.ID] + rfi.indexes[concatKey][itemID] = true } } return diff --git a/engine/filterindexer_it_test.go b/engine/filterindexer_it_test.go index 9e0f4b347..707070a69 100644 --- a/engine/filterindexer_it_test.go +++ b/engine/filterindexer_it_test.go @@ -470,7 +470,7 @@ func testITTestThresholdFilterIndexes(t *testing.T) { func testITTestAttributeProfileFilterIndexes(t *testing.T) { fp := &Filter{ Tenant: "cgrates.org", - ID: "Filter1", + ID: "AttrFilter", Rules: []*FilterRule{ &FilterRule{ FieldName: "EventType", @@ -489,7 +489,7 @@ func testITTestAttributeProfileFilterIndexes(t *testing.T) { attrProfile := &AttributeProfile{ Tenant: "cgrates.org", ID: "AttrPrf", - FilterIDs: []string{"Filter1"}, + FilterIDs: []string{"AttrFilter"}, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), }, @@ -838,17 +838,17 @@ func testITTestIndexingWithEmptyFltrID(t *testing.T) { t.Error(err) } eIdxes := map[string]utils.StringMap{ - "*default:*any:*any": utils.StringMap{ + "*none:*any:*any": utils.StringMap{ "THD_Test": true, "THD_Test2": true, }, } reverseIdxes := map[string]utils.StringMap{ "THD_Test": utils.StringMap{ - "*default:*any:*any": true, + "*none:*any:*any": true, }, "THD_Test2": utils.StringMap{ - "*default:*any:*any": true, + "*none:*any:*any": true, }, } rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant) @@ -875,7 +875,7 @@ func testITTestIndexingWithEmptyFltrID(t *testing.T) { "THD_Test2": true, } if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheThresholdFilterIndexes, th.Tenant, - utils.MetaDefault, utils.META_ANY, utils.META_ANY); err != nil { + utils.META_NONE, utils.META_ANY, utils.META_ANY); err != nil { t.Error(err) } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp) @@ -937,17 +937,17 @@ func testITTestIndexingWithEmptyFltrID2(t *testing.T) { t.Error(err) } eIdxes := map[string]utils.StringMap{ - "*default:*any:*any": utils.StringMap{ + "*none:*any:*any": utils.StringMap{ "SPL_Weight": true, "SPL_Weight2": true, }, } reverseIdxes := map[string]utils.StringMap{ "SPL_Weight": utils.StringMap{ - "*default:*any:*any": true, + "*none:*any:*any": true, }, "SPL_Weight2": utils.StringMap{ - "*default:*any:*any": true, + "*none:*any:*any": true, }, } rfi := NewFilterIndexer(onStor, utils.SupplierProfilePrefix, splProfile.Tenant) @@ -974,7 +974,7 @@ func testITTestIndexingWithEmptyFltrID2(t *testing.T) { "SPL_Weight2": true, } if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheSupplierFilterIndexes, - splProfile.Tenant, utils.MetaDefault, utils.META_ANY, utils.META_ANY); err != nil { + splProfile.Tenant, utils.META_NONE, utils.META_ANY, utils.META_ANY); err != nil { t.Error(err) } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp) @@ -1011,11 +1011,6 @@ func testITTestIndexingThresholds(t *testing.T) { t.Error(err) } eIdxes := map[string]utils.StringMap{ - "*default:*any:*any": utils.StringMap{ - "TH1": true, - "TH2": true, - "TH3": true, - }, "*string:Account:1001": utils.StringMap{ "TH1": true, "TH2": true, @@ -1026,15 +1021,12 @@ func testITTestIndexingThresholds(t *testing.T) { } reverseIdxes := map[string]utils.StringMap{ "TH1": utils.StringMap{ - "*default:*any:*any": true, "*string:Account:1001": true, }, "TH2": utils.StringMap{ - "*default:*any:*any": true, "*string:Account:1001": true, }, "TH3": utils.StringMap{ - "*default:*any:*any": true, "*string:Account:1002": true, }, } @@ -1060,10 +1052,9 @@ func testITTestIndexingThresholds(t *testing.T) { eMp := utils.StringMap{ "TH1": true, "TH2": true, - "TH3": true, } if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheThresholdFilterIndexes, th.Tenant, - utils.MetaDefault, utils.META_ANY, utils.META_ANY); err != nil { + utils.MetaString, utils.Account, "1001"); err != nil { t.Error(err) } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp)