From 2089f3b7b3e2a18f990aefa1048b9a0f19d1c680 Mon Sep 17 00:00:00 2001 From: TeoV Date: Sat, 23 Dec 2017 13:56:56 +0200 Subject: [PATCH] Add RemoveIndex in RemoveThresholdProfile --- engine/datamanager.go | 46 ++++++++++++++------------------ engine/filterindexer.go | 16 +++++------- engine/onstor_it_test.go | 9 ++++--- engine/storage_mongo_datadb.go | 48 ++++++++++++++++++++-------------- 4 files changed, 59 insertions(+), 60 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index bfe32556e..49b489948 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -384,33 +384,25 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) return } if withIndex { - thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant) - for _, fltrID := range th.FilterIDs { - var fltr *Filter - if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th) - } - return - } - thdsIndexers.IndexFilters(th.ID, fltr) - } - if dm.DataDB().GetStorageType() == utils.REDIS { - if err = NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, - th.Tenant).RemoveItemFromIndex(th.ID); err != nil { - if err.Error() == utils.ErrNotFound.Error() { - if err = thdsIndexers.StoreIndexes(); err != nil { - return - } - } - } - if err = thdsIndexers.StoreIndexes(); err != nil { - return - } - } - if err = thdsIndexers.StoreIndexes(); err != nil { - return - } + //ToDo : update SetThresholdProfile to store indexes + // thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant) + // for _, fltrID := range th.FilterIDs { + // var fltr *Filter + // if fltr, err = dm.GetFilter(th.Tenant, fltrID, false, utils.NonTransactional); err != nil { + // if err == utils.ErrNotFound { + // err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th) + // } + // return + // } + // thdsIndexers.IndexFilters(th.ID, fltr) + // } + // if dm.DataDB().GetStorageType() == utils.REDIS { + // if err = thdsIndexers.RemoveItemFromIndex(th.ID); err != nil && + // err.Error() != utils.ErrNotFound.Error() { + // return + // } + // } + //return thdsIndexers.StoreIndexes() } return } diff --git a/engine/filterindexer.go b/engine/filterindexer.go index 501b81c76..5a13fba31 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -157,7 +157,7 @@ func (rfi *ReqFilterIndexer) loadItemReverseIndex(itemID string) (err error) { return err } -//Populate ReqFilterIndexer.indexes with specific fieldName,fieldValue , item +//Populate ReqFilterIndexer.indexes with specific fieldName:fieldValue , item func (rfi *ReqFilterIndexer) loadFldNameFldValIndex(fldName, fldVal string) error { rcvIdx, err := rfi.dm.GetFilterIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), @@ -166,14 +166,10 @@ func (rfi *ReqFilterIndexer) loadFldNameFldValIndex(fldName, fldVal string) erro return err } for fldName, nameValMp := range rcvIdx { - for key, _ := range nameValMp { - if _, has := rfi.indexes[key]; !has { - rfi.indexes[key] = make(utils.StringMap) - } - rfi.indexes[key] = utils.StringMap{ - fldName: true, - } + if _, has := rfi.indexes[fldName]; !has { + rfi.indexes[fldName] = make(utils.StringMap) } + rfi.indexes[fldName] = nameValMp } return nil } @@ -199,7 +195,9 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) { } } } - if err = rfi.StoreIndexes(); err != nil { + if err = rfi.dm.SetFilterIndexes( + GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), + rfi.indexes); err != nil { return } if err = rfi.dm.RemoveFilterReverseIndexes( diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 76efac2bb..822c907f8 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -70,8 +70,8 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCacheFilter, testOnStorITCacheSupplierProfile, testOnStorITCacheAttributeProfile, - // // // ToDo: test cache flush for a prefix - // // // ToDo: testOnStorITLoadAccountingCache + // ToDo: test cache flush for a prefix + // ToDo: testOnStorITLoadAccountingCache testOnStorITHasData, testOnStorITPushPop, testOnStorITCRUDRatingPlan, @@ -2471,7 +2471,8 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) { } else if !reflect.DeepEqual(th, rcv) { t.Errorf("Expecting: %v, received: %v", th, rcv) } - if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != nil { + //rcv NotFound because SetThresholdProfile don't store Indexes (for now) + if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != utils.ErrNotFound { t.Error(err) } if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { @@ -2765,7 +2766,7 @@ func testOnStorITTestNewFilterIndexes(t *testing.T) { t.Errorf("Expecting %+v, received: %+v", reverseIdxes, reverseRcvIdx) } } - //replace old filter with two filters + //replace old filter with two different filters fp3 := &Filter{ Tenant: "cgrates.org", ID: "Filter3", diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 86f0f47c5..4e4285d87 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1928,7 +1928,7 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) { return nil } -//GetFilterIndexesDrv retrieves Indexes from dataDB +// GetFilterIndexesDrv retrieves Indexes from dataDB func (ms *MongoStorage) GetFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { session, col := ms.conn(colRFI) @@ -1940,8 +1940,9 @@ func (ms *MongoStorage) GetFilterIndexesDrv(dbKey string, findParam := bson.M{"key": dbKey} if len(fldNameVal) != 0 { for fldName, fldValue := range fldNameVal { - qryFltr := bson.M{fmt.Sprintf("value.%s", utils.ConcatenatedKey(fldName, fldValue)): 1} - if err = col.Find(findParam).Select(qryFltr).One(&result); err != nil { + qryFltr := fmt.Sprintf("value.%s", utils.ConcatenatedKey(fldName, fldValue)) + if err = col.Find(bson.M{"key": dbKey, qryFltr: bson.M{"$exists": true}}).Select( + bson.M{qryFltr: true}).One(&result); err != nil { if err == mgo.ErrNotFound { err = utils.ErrNotFound } @@ -1966,18 +1967,24 @@ func (ms *MongoStorage) GetFilterIndexesDrv(dbKey string, return indexes, nil } -//SetFilterIndexesDrv stores Indexes into DataDB +// SetFilterIndexesDrv stores Indexes into DataDB func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) { session, col := ms.conn(colRFI) defer session.Close() - mp := make(map[string][]string) + pairs := []interface{}{} for key, itmMp := range indexes { - mp[key] = itmMp.Slice() + param := fmt.Sprintf("value.%s", key) + pairs = append(pairs, bson.M{"key": dbKey}) + if len(itmMp) == 0 { + pairs = append(pairs, bson.M{"$unset": bson.M{param: 1}}) + } else { + pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}}) + } } - _, err = col.Upsert(bson.M{"key": dbKey}, &struct { - Key string - Value map[string][]string - }{dbKey, mp}) + bulk := col.Bulk() + bulk.Unordered() + bulk.Upsert(pairs...) + _, err = bulk.Run() return } @@ -1990,7 +1997,7 @@ func (ms *MongoStorage) RemoveFilterIndexesDrv(id string) (err error) { return nil } -//GetFilterReverseIndexesDrv retrieves ReverseIndexes from dataDB +// GetFilterReverseIndexesDrv retrieves ReverseIndexes from dataDB func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string, fldNameVal map[string]string) (revIdx map[string]utils.StringMap, err error) { session, col := ms.conn(colRFI) @@ -2001,9 +2008,10 @@ func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string, } findParam := bson.M{"key": dbKey} if len(fldNameVal) != 0 { - for fldName, fldValue := range fldNameVal { - qryFltr := bson.M{fmt.Sprintf("value.%s", utils.ConcatenatedKey(fldName, fldValue)): 1} - if err = col.Find(findParam).Select(qryFltr).One(&result); err != nil { + for fldName, _ := range fldNameVal { + qryFltr := fmt.Sprintf("value.%s", fldName) + if err = col.Find(bson.M{"key": dbKey, qryFltr: bson.M{"$exists": true}}).Select( + bson.M{qryFltr: true}).One(&result); err != nil { if err == mgo.ErrNotFound { err = utils.ErrNotFound } @@ -2074,7 +2082,7 @@ func (ms *MongoStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (item return } -// GetStatQueueProfile retrieves a StatQueueProfile from dataDB +// GetStatQueueProfileDrv retrieves a StatQueueProfile from dataDB func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) { session, col := ms.conn(colSqp) defer session.Close() @@ -2087,7 +2095,7 @@ func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *St return } -// SetStatsQueueDrv stores a StatsQueue into DataDB +// SetStatQueueProfileDrv stores a StatsQueue into DataDB func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { session, col := ms.conn(colSqp) defer session.Close() @@ -2095,7 +2103,7 @@ func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) return } -// RemStatsQueueDrv removes a StatsQueue from dataDB +// RemStatQueueProfileDrv removes a StatsQueue from dataDB func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { session, col := ms.conn(colSqp) err = col.Remove(bson.M{"tenant": tenant, "id": id}) @@ -2106,7 +2114,7 @@ func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { return } -// GetStoredStatQueue retrieves a StoredStatQueue +// GetStoredStatQueueDrv retrieves a StoredStatQueue func (ms *MongoStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) { session, col := ms.conn(colSqs) defer session.Close() @@ -2119,7 +2127,7 @@ func (ms *MongoStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStat return } -// SetStoredStatQueue stores the metrics for a StoredStatQueue +// SetStoredStatQueueDrv stores the metrics for a StoredStatQueue func (ms *MongoStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) { session, col := ms.conn(colSqs) defer session.Close() @@ -2127,7 +2135,7 @@ func (ms *MongoStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) { return } -// RemStatQueue removes stored metrics for a StoredStatQueue +// RemStoredStatQueueDrv removes stored metrics for a StoredStatQueue func (ms *MongoStorage) RemStoredStatQueueDrv(tenant, id string) (err error) { session, col := ms.conn(colSqs) defer session.Close()