Updated SetThresholdProfile to work with indexes

This commit is contained in:
edwardro22
2017-12-28 15:51:48 +02:00
committed by Dan Christian Bogos
parent 5cf3cd59ec
commit 77b5a25d22
7 changed files with 85 additions and 95 deletions

View File

@@ -135,8 +135,8 @@ func testPopulateAttrService(t *testing.T) {
}
prefix := utils.ConcatenatedKey(sev.Tenant, *sev.Context)
ref := NewReqFilterIndexer(dmAtr, utils.AttributeProfilePrefix, prefix)
ref.IndexFilters("attributeprofile1", filter1)
ref.IndexFilters("attributeprofile2", filter2)
ref.IndexTPFilter(FilterToTPFilter(filter1), "attributeprofile1")
ref.IndexTPFilter(FilterToTPFilter(filter2), "attributeprofile2")
err = ref.StoreIndexes()
if err != nil {
t.Errorf("Error: %+v", err)

View File

@@ -384,25 +384,25 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
return
}
if withIndex {
//ToDo : update SetThresholdProfile to store indexes
// thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant)
// for _, fltrID := range th.FilterIDs {
// var fltr *Filter
// if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil {
// if err == utils.ErrNotFound {
// err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th)
// }
// return
// }
// thdsIndexers.IndexFilters(th.ID, fltr)
// }
// if dm.DataDB().GetStorageType() == utils.REDIS {
// if err = thdsIndexers.RemoveItemFromIndex(th.ID); err != nil &&
// err.Error() != utils.ErrNotFound.Error() {
// return
// }
// }
//return thdsIndexers.StoreIndexes()
thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant)
//remove old ThresholdProfile indexes
if err = thdsIndexers.RemoveItemFromIndex(th.ID); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
return
}
//Verify matching Filters for every FilterID from ThresholdProfile
for _, fltrID := range th.FilterIDs {
var fltr *Filter
if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th)
}
return
}
thdsIndexers.IndexTPFilter(FilterToTPFilter(fltr), th.ID)
}
return thdsIndexers.StoreIndexes()
}
return
}

View File

@@ -54,43 +54,6 @@ func (rfi *ReqFilterIndexer) ChangedKeys(reverse bool) utils.StringMap {
return rfi.chngdIndxKeys
}
// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys
func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs *Filter) {
var hasMetaString bool
if _, hasIt := rfi.reveseIndex[itemID]; !hasIt {
rfi.reveseIndex[itemID] = make(utils.StringMap)
}
for _, fltr := range reqFltrs.RequestFilters {
if fltr.Type != MetaString {
continue
}
hasMetaString = true // Mark that we found at least one metatring so we don't index globally
if _, hastIt := rfi.reveseIndex[itemID]; !hastIt {
rfi.reveseIndex[itemID] = make(utils.StringMap)
}
for _, fldVal := range fltr.Values {
if _, hasIt := rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)]; !hasIt {
rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = make(utils.StringMap)
}
rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)][itemID] = true
rfi.reveseIndex[itemID][utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true
rfi.chngdIndxKeys[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true
}
rfi.chngdRevIndxKeys[itemID] = true
}
if !hasMetaString {
if _, hasIt := rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)]; !hasIt {
rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)] = make(utils.StringMap)
}
if _, hastIt := rfi.reveseIndex[itemID]; !hastIt {
rfi.reveseIndex[itemID] = make(utils.StringMap)
}
rfi.reveseIndex[itemID][utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)] = true
rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE:NOT_AVAILABLE][rl.ID]
}
return
}
// IndexTPFilter parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys
func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID string) {
var hasMetaString bool
@@ -102,28 +65,24 @@ func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID
continue
}
hasMetaString = true // Mark that we found at least one metatring so we don't index globally
if _, hastIt := rfi.reveseIndex[itemID]; !hastIt {
rfi.reveseIndex[itemID] = make(utils.StringMap)
}
for _, fldVal := range fltr.Values {
if _, hasIt := rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)]; !hasIt {
rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = make(utils.StringMap)
concatKey := utils.ConcatenatedKey(fltr.FieldName, fldVal)
if _, hasIt := rfi.indexes[concatKey]; !hasIt {
rfi.indexes[concatKey] = make(utils.StringMap)
}
rfi.indexes[utils.ConcatenatedKey(fltr.FieldName, fldVal)][itemID] = true
rfi.reveseIndex[itemID][utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true
rfi.chngdIndxKeys[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true
rfi.indexes[concatKey][itemID] = true
rfi.reveseIndex[itemID][concatKey] = true
rfi.chngdIndxKeys[concatKey] = true
}
rfi.chngdRevIndxKeys[itemID] = true
}
if !hasMetaString {
if _, hasIt := rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)]; !hasIt {
rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)] = make(utils.StringMap)
naConcatKey := utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)
if _, hasIt := rfi.indexes[naConcatKey]; !hasIt {
rfi.indexes[naConcatKey] = make(utils.StringMap)
}
if _, hastIt := rfi.reveseIndex[itemID]; !hastIt {
rfi.reveseIndex[itemID] = make(utils.StringMap)
}
rfi.reveseIndex[itemID][utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)] = true
rfi.indexes[utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE)][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE:NOT_AVAILABLE][rl.ID]
rfi.reveseIndex[itemID][naConcatKey] = true
rfi.indexes[naConcatKey][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE:NOT_AVAILABLE][rl.ID]
}
return
}
@@ -191,18 +150,19 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) {
for _, itmMp := range rfi.indexes {
for range itmMp {
if _, has := itmMp[itemID]; has {
delete(itmMp, itemID)
delete(itmMp, itemID) //Force deleting in driver
}
}
}
rfi.reveseIndex[itemID] = make(utils.StringMap) //Force deleting in driver
if err = rfi.dm.SetFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false),
rfi.indexes); err != nil {
return
}
if err = rfi.dm.RemoveFilterReverseIndexes(
if err = rfi.dm.SetFilterReverseIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true),
itemID); err != nil {
rfi.reveseIndex); err != nil {
return
}
return

View File

@@ -105,7 +105,7 @@ var sTestsOnStorIT = []func(t *testing.T){
testOnStorITCRUDAttributeProfile,
testOnStorITFlush,
testOnStorITIsDBEmpty,
//testOnStorITTestNewFilterIndexes,
testOnStorITTestNewFilterIndexes,
}
func TestOnStorITRedisConnect(t *testing.T) {
@@ -965,12 +965,17 @@ func testOnStorITCacheStatQueueProfile(t *testing.T) {
},
QueueLength: 10,
TTL: time.Duration(10) * time.Second,
Metrics: []string{"ASR"},
Thresholds: []string{"Th1"},
Blocker: true,
Stored: true,
Weight: 20,
MinItems: 1,
Metrics: []*utils.MetricWithParams{
&utils.MetricWithParams{
MetricID: "ASR",
Parameters: "",
},
},
Thresholds: []string{"Th1"},
Blocker: true,
Stored: true,
Weight: 20,
MinItems: 1,
}
if err := onStor.SetStatQueueProfile(statProfile); err != nil {
t.Error(err)
@@ -2344,9 +2349,11 @@ func testOnStorITCRUDStatQueueProfile(t *testing.T) {
FilterIDs: []string{},
QueueLength: 2,
TTL: timeTTL,
Metrics: []string{},
Stored: true,
Thresholds: []string{},
Metrics: []*utils.MetricWithParams{
&utils.MetricWithParams{},
},
Stored: true,
Thresholds: []string{},
}
if _, rcvErr := onStor.GetStatQueueProfile(sq.Tenant, sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
@@ -2471,9 +2478,8 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) {
} else if !reflect.DeepEqual(th, rcv) {
t.Errorf("Expecting: %v, received: %v", th, rcv)
}
//rcv NotFound because SetThresholdProfile don't store Indexes (for now)
if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != utils.ErrNotFound {
t.Error(err)
if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != nil {
t.Error(err)
}
if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)

View File

@@ -1479,13 +1479,24 @@ func (rs *RedisStorage) GetFilterReverseIndexesDrv(dbKey string,
//SetFilterReverseIndexesDrv stores ReverseIndexes into DataDB
func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap) (err error) {
mp := make(map[string]string)
nameValSls := []interface{}{dbKey}
for key, strMp := range revIdx {
if len(strMp) == 0 { // remove with no more elements inside
nameValSls = append(nameValSls, key)
continue
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[key] = string(encodedMp)
}
}
if len(nameValSls) != 1 {
if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil {
return err
}
}
if len(mp) != 0 {
return rs.Cmd("HMSET", dbKey, mp).Err
}

View File

@@ -1552,10 +1552,23 @@ func testStorDBitCRUDTpStats(t *testing.T) {
},
QueueLength: 100,
TTL: "1s",
Metrics: []string{"*asr", "*acd", "*acc"},
Thresholds: []string{"THRESH1", "THRESH2"},
Weight: 20.0,
MinItems: 1,
Metrics: []*utils.MetricWithParams{
&utils.MetricWithParams{
MetricID: "*asr",
Parameters: "",
},
&utils.MetricWithParams{
MetricID: "*acd",
Parameters: "",
},
&utils.MetricWithParams{
MetricID: "*acc",
Parameters: "",
},
},
Thresholds: []string{"THRESH1", "THRESH2"},
Weight: 20.0,
MinItems: 1,
},
}

View File

@@ -273,8 +273,8 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
dmspl.DataDB().SetSupplierProfileDrv(spr)
}
ref := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org")
ref.IndexFilters("supplierprofile1", filter3)
ref.IndexFilters("supplierprofile2", filter4)
ref.IndexTPFilter(FilterToTPFilter(filter3), "attributeprofile1")
ref.IndexTPFilter(FilterToTPFilter(filter4), "attributeprofile2")
err = ref.StoreIndexes()
if err != nil {
t.Errorf("Error: %+v", err)