Add RemoveIndex in RemoveThresholdProfile

This commit is contained in:
TeoV
2017-12-23 13:56:56 +02:00
committed by Dan Christian Bogos
parent b7178df83f
commit 2089f3b7b3
4 changed files with 59 additions and 60 deletions

View File

@@ -384,33 +384,25 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
return
}
if withIndex {
thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant)
for _, fltrID := range th.FilterIDs {
var fltr *Filter
if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th)
}
return
}
thdsIndexers.IndexFilters(th.ID, fltr)
}
if dm.DataDB().GetStorageType() == utils.REDIS {
if err = NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix,
th.Tenant).RemoveItemFromIndex(th.ID); err != nil {
if err.Error() == utils.ErrNotFound.Error() {
if err = thdsIndexers.StoreIndexes(); err != nil {
return
}
}
}
if err = thdsIndexers.StoreIndexes(); err != nil {
return
}
}
if err = thdsIndexers.StoreIndexes(); err != nil {
return
}
//ToDo : update SetThresholdProfile to store indexes
// thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant)
// for _, fltrID := range th.FilterIDs {
// var fltr *Filter
// if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil {
// if err == utils.ErrNotFound {
// err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th)
// }
// return
// }
// thdsIndexers.IndexFilters(th.ID, fltr)
// }
// if dm.DataDB().GetStorageType() == utils.REDIS {
// if err = thdsIndexers.RemoveItemFromIndex(th.ID); err != nil &&
// err.Error() != utils.ErrNotFound.Error() {
// return
// }
// }
//return thdsIndexers.StoreIndexes()
}
return
}

View File

@@ -157,7 +157,7 @@ func (rfi *ReqFilterIndexer) loadItemReverseIndex(itemID string) (err error) {
return err
}
//Populate ReqFilterIndexer.indexes with specific fieldName,fieldValue , item
//Populate ReqFilterIndexer.indexes with specific fieldName:fieldValue , item
func (rfi *ReqFilterIndexer) loadFldNameFldValIndex(fldName, fldVal string) error {
rcvIdx, err := rfi.dm.GetFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false),
@@ -166,14 +166,10 @@ func (rfi *ReqFilterIndexer) loadFldNameFldValIndex(fldName, fldVal string) erro
return err
}
for fldName, nameValMp := range rcvIdx {
for key, _ := range nameValMp {
if _, has := rfi.indexes[key]; !has {
rfi.indexes[key] = make(utils.StringMap)
}
rfi.indexes[key] = utils.StringMap{
fldName: true,
}
if _, has := rfi.indexes[fldName]; !has {
rfi.indexes[fldName] = make(utils.StringMap)
}
rfi.indexes[fldName] = nameValMp
}
return nil
}
@@ -199,7 +195,9 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) {
}
}
}
if err = rfi.StoreIndexes(); err != nil {
if err = rfi.dm.SetFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false),
rfi.indexes); err != nil {
return
}
if err = rfi.dm.RemoveFilterReverseIndexes(

View File

@@ -70,8 +70,8 @@ var sTestsOnStorIT = []func(t *testing.T){
testOnStorITCacheFilter,
testOnStorITCacheSupplierProfile,
testOnStorITCacheAttributeProfile,
// // // ToDo: test cache flush for a prefix
// // // ToDo: testOnStorITLoadAccountingCache
// ToDo: test cache flush for a prefix
// ToDo: testOnStorITLoadAccountingCache
testOnStorITHasData,
testOnStorITPushPop,
testOnStorITCRUDRatingPlan,
@@ -2471,7 +2471,8 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) {
} else if !reflect.DeepEqual(th, rcv) {
t.Errorf("Expecting: %v, received: %v", th, rcv)
}
if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != nil {
//rcv NotFound because SetThresholdProfile don't store Indexes (for now)
if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != utils.ErrNotFound {
t.Error(err)
}
if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
@@ -2765,7 +2766,7 @@ func testOnStorITTestNewFilterIndexes(t *testing.T) {
t.Errorf("Expecting %+v, received: %+v", reverseIdxes, reverseRcvIdx)
}
}
//replace old filter with two filters
//replace old filter with two different filters
fp3 := &Filter{
Tenant: "cgrates.org",
ID: "Filter3",

View File

@@ -1928,7 +1928,7 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) {
return nil
}
//GetFilterIndexesDrv retrieves Indexes from dataDB
// GetFilterIndexesDrv retrieves Indexes from dataDB
func (ms *MongoStorage) GetFilterIndexesDrv(dbKey string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
session, col := ms.conn(colRFI)
@@ -1940,8 +1940,9 @@ func (ms *MongoStorage) GetFilterIndexesDrv(dbKey string,
findParam := bson.M{"key": dbKey}
if len(fldNameVal) != 0 {
for fldName, fldValue := range fldNameVal {
qryFltr := bson.M{fmt.Sprintf("value.%s", utils.ConcatenatedKey(fldName, fldValue)): 1}
if err = col.Find(findParam).Select(qryFltr).One(&result); err != nil {
qryFltr := fmt.Sprintf("value.%s", utils.ConcatenatedKey(fldName, fldValue))
if err = col.Find(bson.M{"key": dbKey, qryFltr: bson.M{"$exists": true}}).Select(
bson.M{qryFltr: true}).One(&result); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
@@ -1966,18 +1967,24 @@ func (ms *MongoStorage) GetFilterIndexesDrv(dbKey string,
return indexes, nil
}
//SetFilterIndexesDrv stores Indexes into DataDB
// SetFilterIndexesDrv stores Indexes into DataDB
func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
mp := make(map[string][]string)
pairs := []interface{}{}
for key, itmMp := range indexes {
mp[key] = itmMp.Slice()
param := fmt.Sprintf("value.%s", key)
pairs = append(pairs, bson.M{"key": dbKey})
if len(itmMp) == 0 {
pairs = append(pairs, bson.M{"$unset": bson.M{param: 1}})
} else {
pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}})
}
}
_, err = col.Upsert(bson.M{"key": dbKey}, &struct {
Key string
Value map[string][]string
}{dbKey, mp})
bulk := col.Bulk()
bulk.Unordered()
bulk.Upsert(pairs...)
_, err = bulk.Run()
return
}
@@ -1990,7 +1997,7 @@ func (ms *MongoStorage) RemoveFilterIndexesDrv(id string) (err error) {
return nil
}
//GetFilterReverseIndexesDrv retrieves ReverseIndexes from dataDB
// GetFilterReverseIndexesDrv retrieves ReverseIndexes from dataDB
func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string,
fldNameVal map[string]string) (revIdx map[string]utils.StringMap, err error) {
session, col := ms.conn(colRFI)
@@ -2001,9 +2008,10 @@ func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string,
}
findParam := bson.M{"key": dbKey}
if len(fldNameVal) != 0 {
for fldName, fldValue := range fldNameVal {
qryFltr := bson.M{fmt.Sprintf("value.%s", utils.ConcatenatedKey(fldName, fldValue)): 1}
if err = col.Find(findParam).Select(qryFltr).One(&result); err != nil {
for fldName, _ := range fldNameVal {
qryFltr := fmt.Sprintf("value.%s", fldName)
if err = col.Find(bson.M{"key": dbKey, qryFltr: bson.M{"$exists": true}}).Select(
bson.M{qryFltr: true}).One(&result); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
@@ -2074,7 +2082,7 @@ func (ms *MongoStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (item
return
}
// GetStatQueueProfile retrieves a StatQueueProfile from dataDB
// GetStatQueueProfileDrv retrieves a StatQueueProfile from dataDB
func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) {
session, col := ms.conn(colSqp)
defer session.Close()
@@ -2087,7 +2095,7 @@ func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *St
return
}
// SetStatsQueueDrv stores a StatsQueue into DataDB
// SetStatQueueProfileDrv stores a StatsQueue into DataDB
func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) {
session, col := ms.conn(colSqp)
defer session.Close()
@@ -2095,7 +2103,7 @@ func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error)
return
}
// RemStatsQueueDrv removes a StatsQueue from dataDB
// RemStatQueueProfileDrv removes a StatsQueue from dataDB
func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) {
session, col := ms.conn(colSqp)
err = col.Remove(bson.M{"tenant": tenant, "id": id})
@@ -2106,7 +2114,7 @@ func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) {
return
}
// GetStoredStatQueue retrieves a StoredStatQueue
// GetStoredStatQueueDrv retrieves a StoredStatQueue
func (ms *MongoStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) {
session, col := ms.conn(colSqs)
defer session.Close()
@@ -2119,7 +2127,7 @@ func (ms *MongoStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStat
return
}
// SetStoredStatQueue stores the metrics for a StoredStatQueue
// SetStoredStatQueueDrv stores the metrics for a StoredStatQueue
func (ms *MongoStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) {
session, col := ms.conn(colSqs)
defer session.Close()
@@ -2127,7 +2135,7 @@ func (ms *MongoStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) {
return
}
// RemStatQueue removes stored metrics for a StoredStatQueue
// RemStoredStatQueueDrv removes stored metrics for a StoredStatQueue
func (ms *MongoStorage) RemStoredStatQueueDrv(tenant, id string) (err error) {
session, col := ms.conn(colSqs)
defer session.Close()