RemoveIndex for mongo

This commit is contained in:
TeoV
2017-12-16 11:02:05 +02:00
committed by Dan Christian Bogos
parent 2d888b1717
commit c14855e8eb
8 changed files with 48 additions and 37 deletions

View File

@@ -847,8 +847,8 @@ func (dm *DataManager) GetReqFilterIndexes(dbKey string, fldNameVal map[string]s
return dm.DataDB().GetReqFilterIndexesDrv(dbKey, fldNameVal)
}
func (dm *DataManager) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
return dm.DataDB().SetReqFilterIndexesDrv(dbKey, indexes)
func (dm *DataManager) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
return dm.DataDB().SetReqFilterIndexesDrv(dbKey, indexes, update)
}
func (dm *DataManager) RemoveReqFilterIndexes(dbKey string) (err error) {

View File

@@ -135,11 +135,11 @@ func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID
}
// StoreIndexes handles storing the indexes to dataDB
func (rfi *ReqFilterIndexer) StoreIndexes() (err error) {
if err = rfi.dm.SetReqFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), rfi.indexes); err != nil {
func (rfi *ReqFilterIndexer) StoreIndexes(update bool) (err error) {
if err = rfi.dm.SetReqFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), rfi.indexes, update); err != nil {
return
}
return rfi.dm.SetReqFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), rfi.reveseIndex)
return rfi.dm.SetReqFilterIndexes(GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), rfi.reveseIndex, update)
}
//Populate the ReqFilterIndexer.reveseIndex for specifil itemID
@@ -202,9 +202,9 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) {
}
}
}
rfi.StoreIndexes()
utils.Logger.Debug(fmt.Sprintf("Indexes : %+v \n", rfi.indexes))
rfi.StoreIndexes(true)
return
}
//GetDBIndexKey return the dbKey for an specific item

View File

@@ -118,7 +118,7 @@ type DataDB interface {
GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error)
AddLoadHistory(*utils.LoadInstance, int, string) error
GetReqFilterIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]map[string]utils.StringMap, err error)
SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error)
RemoveReqFilterIndexesDrv(id string) (err error)
MatchReqFilterIndexDrv(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)

View File

@@ -1287,7 +1287,7 @@ func (ms *MapStorage) GetReqFilterIndexesDrv(dbKey string,
}
return
}
func (ms *MapStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
func (ms *MapStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(indexes)

View File

@@ -1963,23 +1963,30 @@ func (ms *MongoStorage) GetReqFilterIndexesDrv(dbKey string,
return result.Value, nil
}
func (ms *MongoStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
func (ms *MongoStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
for k, v := range indexes {
for k2, v2 := range v {
for k3, _ := range v2 {
findParam2 := bson.M{fmt.Sprintf("value.%s.%s", k, k2, k3): nil}
if len(v2) == 0 {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": findParam2})
if update {
utils.Logger.Debug("\n ENTER HERE !! \n")
for k, v := range indexes {
for k2, v2 := range v {
findParam2 := fmt.Sprintf("value.%s.%s", k, k2)
utils.Logger.Debug(fmt.Sprintf("\nFINDPARAM2 %+v and len(v2) %+v \n", findParam2, len(v2)))
if len(v2) != 0 {
for k3 := range v2 {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$set": bson.M{findParam2: bson.M{k3: true}}})
}
} else {
err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam2: 1}})
}
}
}
} else {
_, err = col.Upsert(bson.M{"key": dbKey}, &struct {
Key string
Value map[string]map[string]utils.StringMap
}{dbKey, indexes})
}
_, err = col.Upsert(bson.M{"key": dbKey}, &struct {
Key string
Value map[string]map[string]utils.StringMap
}{dbKey, indexes})
return
}

View File

@@ -1412,21 +1412,25 @@ func (rs *RedisStorage) GetReqFilterIndexesDrv(dbKey string,
return
}
func (rs *RedisStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
func (rs *RedisStorage) SetReqFilterIndexesDrv(dbKey string, indexes map[string]map[string]utils.StringMap, update bool) (err error) {
mp := make(map[string]string)
for fldName, fldValMp := range indexes {
for fldVal, strMp := range fldValMp {
if len(strMp) == 0 { // remove with no more elements inside
return rs.Cmd("HDEL", dbKey, utils.ConcatenatedKey(fldName, fldVal)).Err
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[utils.ConcatenatedKey(fldName, fldVal)] = string(encodedMp)
if update {
for fldName, fldValMp := range indexes {
for fldVal, strMp := range fldValMp {
if len(strMp) == 0 { // remove with no more elements inside
return rs.Cmd("HDEL", dbKey, utils.ConcatenatedKey(fldName, fldVal)).Err
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[utils.ConcatenatedKey(fldName, fldVal)] = string(encodedMp)
}
}
}
} else {
return rs.Cmd("HMSET", dbKey, mp).Err
}
return rs.Cmd("HMSET", dbKey, mp).Err
return
}
func (rs *RedisStorage) RemoveReqFilterIndexesDrv(id string) (err error) {

View File

@@ -276,7 +276,7 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
ref := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org")
ref.IndexFilters("supplierprofile1", filters1)
ref.IndexFilters("supplierprofile2", filters2)
err = ref.StoreIndexes()
err = ref.StoreIndexes(false)
if err != nil {
t.Errorf("Error: %+v", err)
}

View File

@@ -2335,7 +2335,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("Indexing resource profiles")
}
for tenant, fltrIdxer := range tpr.resIndexers {
if err := fltrIdxer.StoreIndexes(); err != nil {
if err := fltrIdxer.StoreIndexes(false); err != nil {
return err
}
if verbose {
@@ -2350,7 +2350,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("StatQueue filter indexes:")
}
for tenant, fltrIdxer := range tpr.sqpIndexers {
if err := fltrIdxer.StoreIndexes(); err != nil {
if err := fltrIdxer.StoreIndexes(false); err != nil {
return err
}
if verbose {
@@ -2365,7 +2365,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("Threshold filter indexes:")
}
for tenant, fltrIdxer := range tpr.thdsIndexers {
if err := fltrIdxer.StoreIndexes(); err != nil {
if err := fltrIdxer.StoreIndexes(false); err != nil {
return err
}
if verbose {
@@ -2380,7 +2380,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("Indexing Supplier Profiles")
}
for tenant, fltrIdxer := range tpr.sppIndexers {
if err := fltrIdxer.StoreIndexes(); err != nil {
if err := fltrIdxer.StoreIndexes(false); err != nil {
return err
}
if verbose {
@@ -2395,7 +2395,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("Indexing Attribute Profiles")
}
for tntCntx, fltrIdxer := range tpr.attrIndexers {
if err := fltrIdxer.StoreIndexes(); err != nil {
if err := fltrIdxer.StoreIndexes(false); err != nil {
return err
}
if verbose {