From 77b5a25d220c816b21e1161ec3c1641bdaea4956 Mon Sep 17 00:00:00 2001 From: edwardro22 Date: Thu, 28 Dec 2017 15:51:48 +0200 Subject: [PATCH] Updated SetThresholdProfile to work with indexes --- engine/attributes_test.go | 4 +-- engine/datamanager.go | 38 ++++++++++----------- engine/filterindexer.go | 70 +++++++++------------------------------ engine/onstor_it_test.go | 32 ++++++++++-------- engine/storage_redis.go | 11 ++++++ engine/stordb_it_test.go | 21 +++++++++--- engine/suppliers_test.go | 4 +-- 7 files changed, 85 insertions(+), 95 deletions(-) diff --git a/engine/attributes_test.go b/engine/attributes_test.go index 1f319fe88..82678c44e 100644 --- a/engine/attributes_test.go +++ b/engine/attributes_test.go @@ -135,8 +135,8 @@ func testPopulateAttrService(t *testing.T) { } prefix := utils.ConcatenatedKey(sev.Tenant, *sev.Context) ref := NewReqFilterIndexer(dmAtr, utils.AttributeProfilePrefix, prefix) - ref.IndexFilters("attributeprofile1", filter1) - ref.IndexFilters("attributeprofile2", filter2) + ref.IndexTPFilter(FilterToTPFilter(filter1), "attributeprofile1") + ref.IndexTPFilter(FilterToTPFilter(filter2), "attributeprofile2") err = ref.StoreIndexes() if err != nil { t.Errorf("Error: %+v", err) diff --git a/engine/datamanager.go b/engine/datamanager.go index 49b489948..568a013f6 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -384,25 +384,25 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) return } if withIndex { - //ToDo : update SetThresholdProfile to store indexes - // thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant) - // for _, fltrID := range th.FilterIDs { - // var fltr *Filter - // if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil { - // if err == utils.ErrNotFound { - // err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th) - // } - // return - // } - // thdsIndexers.IndexFilters(th.ID, fltr) - // } - // if dm.DataDB().GetStorageType() == utils.REDIS { - // if err = thdsIndexers.RemoveItemFromIndex(th.ID); err != nil && - // err.Error() != utils.ErrNotFound.Error() { - // return - // } - // } - //return thdsIndexers.StoreIndexes() + thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant) + //remove old ThresholdProfile indexes + if err = thdsIndexers.RemoveItemFromIndex(th.ID); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return + } + + //Verify matching Filters for every FilterID from ThresholdProfile + for _, fltrID := range th.FilterIDs { + var fltr *Filter + if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th) + } + return + } + thdsIndexers.IndexTPFilter(FilterToTPFilter(fltr), th.ID) + } + return thdsIndexers.StoreIndexes() } return } diff --git a/engine/filterindexer.go b/engine/filterindexer.go index 5a13fba31..8cab463f0 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -54,43 +54,6 @@ func (rfi *ReqFilterIndexer) ChangedKeys(reverse bool) utils.StringMap { return rfi.chngdIndxKeys } -// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys -func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs *Filter) { - var hasMetaString bool - if _, hasIt := rfi.reveseIndex[itemID]; !hasIt { - rfi.reveseIndex[itemID] = make(utils.StringMap) - } - for _, fltr := range reqFltrs.RequestFilters { - if fltr.Type != MetaString { - continue - } - hasMetaString = true // Mark that we found at least one metatring so we don't index globally - if _, hastIt := rfi.reveseIndex[itemID]; !hastIt { - rfi.reveseIndex[itemID] = make(utils.StringMap) - } - for _, fldVal := range fltr.Values { - if _, hasIt := rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)]; !hasIt { - rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = make(utils.StringMap) - } - rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)][itemID] = true - rfi.reveseIndex[itemID][utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true - rfi.chngdIndxKeys[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true - } - rfi.chngdRevIndxKeys[itemID] = true - } - if !hasMetaString { - if _, hasIt := rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)]; !hasIt { - rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)] = make(utils.StringMap) - } - if _, hastIt := rfi.reveseIndex[itemID]; !hastIt { - rfi.reveseIndex[itemID] = make(utils.StringMap) - } - rfi.reveseIndex[itemID][utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)] = true - rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE:NOT_AVAILABLE][rl.ID] - } - return -} - // IndexTPFilter parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID string) { var hasMetaString bool @@ -102,28 +65,24 @@ func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID continue } hasMetaString = true // Mark that we found at least one metatring so we don't index globally - if _, hastIt := rfi.reveseIndex[itemID]; !hastIt { - rfi.reveseIndex[itemID] = make(utils.StringMap) - } for _, fldVal := range fltr.Values { - if _, hasIt := rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)]; !hasIt { - rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = make(utils.StringMap) + concatKey := utils.ConcatenatedKey(fltr.FieldName, fldVal) + if _, hasIt := rfi.indexes[concatKey]; !hasIt { + rfi.indexes[concatKey] = make(utils.StringMap) } - rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)][itemID] = true - rfi.reveseIndex[itemID][utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true - rfi.chngdIndxKeys[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true + rfi.indexes[concatKey][itemID] = true + rfi.reveseIndex[itemID][concatKey] = true + rfi.chngdIndxKeys[concatKey] = true } rfi.chngdRevIndxKeys[itemID] = true } if !hasMetaString { - if _, hasIt := rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)]; !hasIt { - rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)] = make(utils.StringMap) + naConcatKey := utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE) + if _, hasIt := rfi.indexes[naConcatKey]; !hasIt { + rfi.indexes[naConcatKey] = make(utils.StringMap) } - if _, hastIt := rfi.reveseIndex[itemID]; !hastIt { - rfi.reveseIndex[itemID] = make(utils.StringMap) - } - rfi.reveseIndex[itemID][utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)] = true - rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE:NOT_AVAILABLE][rl.ID] + rfi.reveseIndex[itemID][naConcatKey] = true + rfi.indexes[naConcatKey][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE:NOT_AVAILABLE][rl.ID] } return } @@ -191,18 +150,19 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) { for _, itmMp := range rfi.indexes { for range itmMp { if _, has := itmMp[itemID]; has { - delete(itmMp, itemID) + delete(itmMp, itemID) //Force deleting in driver } } } + rfi.reveseIndex[itemID] = make(utils.StringMap) //Force deleting in driver if err = rfi.dm.SetFilterIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), rfi.indexes); err != nil { return } - if err = rfi.dm.RemoveFilterReverseIndexes( + if err = rfi.dm.SetFilterReverseIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), - itemID); err != nil { + rfi.reveseIndex); err != nil { return } return diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 822c907f8..8feae3feb 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -105,7 +105,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCRUDAttributeProfile, testOnStorITFlush, testOnStorITIsDBEmpty, - //testOnStorITTestNewFilterIndexes, + testOnStorITTestNewFilterIndexes, } func TestOnStorITRedisConnect(t *testing.T) { @@ -965,12 +965,17 @@ func testOnStorITCacheStatQueueProfile(t *testing.T) { }, QueueLength: 10, TTL: time.Duration(10) * time.Second, - Metrics: []string{"ASR"}, - Thresholds: []string{"Th1"}, - Blocker: true, - Stored: true, - Weight: 20, - MinItems: 1, + Metrics: []*utils.MetricWithParams{ + &utils.MetricWithParams{ + MetricID: "ASR", + Parameters: "", + }, + }, + Thresholds: []string{"Th1"}, + Blocker: true, + Stored: true, + Weight: 20, + MinItems: 1, } if err := onStor.SetStatQueueProfile(statProfile); err != nil { t.Error(err) @@ -2344,9 +2349,11 @@ func testOnStorITCRUDStatQueueProfile(t *testing.T) { FilterIDs: []string{}, QueueLength: 2, TTL: timeTTL, - Metrics: []string{}, - Stored: true, - Thresholds: []string{}, + Metrics: []*utils.MetricWithParams{ + &utils.MetricWithParams{}, + }, + Stored: true, + Thresholds: []string{}, } if _, rcvErr := onStor.GetStatQueueProfile(sq.Tenant, sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) @@ -2471,9 +2478,8 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) { } else if !reflect.DeepEqual(th, rcv) { t.Errorf("Expecting: %v, received: %v", th, rcv) } - //rcv NotFound because SetThresholdProfile don't store Indexes (for now) - if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != utils.ErrNotFound { - t.Error(err) + if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != nil { + t.Error(err) } if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 4cb35f5f3..fdc442ca2 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1479,13 +1479,24 @@ func (rs *RedisStorage) GetFilterReverseIndexesDrv(dbKey string, //SetFilterReverseIndexesDrv stores ReverseIndexes into DataDB func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap) (err error) { mp := make(map[string]string) + nameValSls := []interface{}{dbKey} for key, strMp := range revIdx { + if len(strMp) == 0 { // remove with no more elements inside + nameValSls = append(nameValSls, key) + continue + } if encodedMp, err := rs.ms.Marshal(strMp); err != nil { return err } else { mp[key] = string(encodedMp) } } + if len(nameValSls) != 1 { + if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil { + return err + } + } + if len(mp) != 0 { return rs.Cmd("HMSET", dbKey, mp).Err } diff --git a/engine/stordb_it_test.go b/engine/stordb_it_test.go index 3d600a3bc..bbf42be17 100755 --- a/engine/stordb_it_test.go +++ b/engine/stordb_it_test.go @@ -1552,10 +1552,23 @@ func testStorDBitCRUDTpStats(t *testing.T) { }, QueueLength: 100, TTL: "1s", - Metrics: []string{"*asr", "*acd", "*acc"}, - Thresholds: []string{"THRESH1", "THRESH2"}, - Weight: 20.0, - MinItems: 1, + Metrics: []*utils.MetricWithParams{ + &utils.MetricWithParams{ + MetricID: "*asr", + Parameters: "", + }, + &utils.MetricWithParams{ + MetricID: "*acd", + Parameters: "", + }, + &utils.MetricWithParams{ + MetricID: "*acc", + Parameters: "", + }, + }, + Thresholds: []string{"THRESH1", "THRESH2"}, + Weight: 20.0, + MinItems: 1, }, } diff --git a/engine/suppliers_test.go b/engine/suppliers_test.go index c75bc664e..9bd489b13 100644 --- a/engine/suppliers_test.go +++ b/engine/suppliers_test.go @@ -273,8 +273,8 @@ func TestSuppliersPopulateSupplierService(t *testing.T) { dmspl.DataDB().SetSupplierProfileDrv(spr) } ref := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org") - ref.IndexFilters("supplierprofile1", filter3) - ref.IndexFilters("supplierprofile2", filter4) + ref.IndexTPFilter(FilterToTPFilter(filter3), "attributeprofile1") + ref.IndexTPFilter(FilterToTPFilter(filter4), "attributeprofile2") err = ref.StoreIndexes() if err != nil { t.Errorf("Error: %+v", err)