Added RemoveIndexes

This commit is contained in:
Trial97
2020-06-04 17:19:18 +03:00
committed by Dan Christian Bogos
parent 180182a264
commit 2f0f3f6bd5
5 changed files with 44 additions and 42 deletions

View File

@@ -3281,13 +3281,13 @@ func (dm *DataManager) Reconnect(marshaller string, newcfg *config.DataDbCfg) (e
return
}
func (dm *DataManager) GetIndexes(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) {
func (dm *DataManager) GetIndexes(idxItmType, tntCtx, idxKey string, cahceRead, cacheWrite bool) (indexes map[string]utils.StringSet, err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
var cachekey string
if idxKey != utils.EmptyString {
if idxKey != utils.EmptyString { // do not check cache if we want all the indexes
cachekey = utils.ConcatenatedKey(tntCtx, idxKey)
if x, ok := Cache.Get(idxItmType, cachekey); ok { // Attempt to find in cache first
@@ -3373,15 +3373,12 @@ func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string,
}
return
}
*/
func (dm *DataManager) RemoveFilterIndexes(cacheID, itemIDPrefix string) (err error) {
func (dm *DataManager) RemoveIndexes(idxItmType, tntCtx, idxKey string) (err error) {
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
if err = dm.DataDB().RemoveFilterIndexesDrv(cacheID, itemIDPrefix); err != nil {
return
}
return
return dm.DataDB().RemoveIndexesDrv(idxItmType, tntCtx, idxKey)
}
*/

View File

@@ -101,7 +101,7 @@ type DataDB interface {
GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error)
SetIndexesDrv(idxItmType, tntCtx string,
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error)
RemoveIndexesDrv(idxItmType, tntCtx string) (err error)
RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error)
MatchFilterIndexDrv(cacheID, itemIDPrefix,
filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)

View File

@@ -1518,7 +1518,12 @@ func (iDB *InternalDB) SetIndexesDrv(idxItmType, tntCtx string,
cacheCommit(transactionID), transactionID)
return nil
}
func (iDB *InternalDB) RemoveIndexesDrv(idxItmType, tntCtx string) (err error) {
iDB.db.Remove(idxItmType, tntCtx, cacheCommit(utils.NonTransactional), utils.NonTransactional)
func (iDB *InternalDB) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) {
if idxKey == utils.EmptyString {
iDB.db.RemoveGroup(idxItmType, tntCtx, true, utils.EmptyString)
return
}
iDB.db.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, idxKey), cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}

View File

@@ -1744,15 +1744,6 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) {
// GetFilterIndexesDrv retrieves Indexes from dataDB
//filterType is used togheter with fieldName:Val
/*
dataManager.GetFilterIndexesDrv(
utils.CacheAttributeFilterIndexes,
"cgrates.org:*sessions", utils.MetaString, map[string]string{
"Subject": "dan",
})
*/
func (ms *MongoStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
type result struct {
@@ -2380,16 +2371,15 @@ func (ms *MongoStorage) RemoveRateProfileDrv(tenant, id string) (err error) {
// GetIndexesDrv retrieves Indexes from dataDB
// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context
// id is used as a concatenated key in case of filterIndexes the id will be filterType:fieldName:fieldVal
func (ms *MongoStorage) GetIndexesDrv(cacheID, key, id string) (indexes map[string]utils.StringSet, err error) {
func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) {
type result struct {
Key string
Value []string
}
dbKey := utils.CacheInstanceToPrefix[cacheID] + key
dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
var q bson.M
if len(id) != 0 {
q = bson.M{"key": dbKey + id}
if len(idxKey) != 0 {
q = bson.M{"key": utils.ConcatenatedKey(dbKey, idxKey)}
} else {
for _, character := range []string{".", "*"} {
dbKey = strings.Replace(dbKey, character, `\`+character, strings.Count(dbKey, character))
@@ -2415,7 +2405,7 @@ func (ms *MongoStorage) GetIndexesDrv(cacheID, key, id string) (indexes map[stri
keys := strings.Split(elem.Key, ":")
indexKey := utils.ConcatenatedKey(keys[1], keys[2], keys[3])
//check here if key has context
if len(strings.Split(key, ":")) == 2 {
if len(strings.Split(tntCtx, ":")) == 2 {
indexKey = utils.ConcatenatedKey(keys[2], keys[3], keys[4])
}
indexes[indexKey] = utils.NewStringSet(elem.Value)
@@ -2432,9 +2422,9 @@ func (ms *MongoStorage) GetIndexesDrv(cacheID, key, id string) (indexes map[stri
// SetIndexesDrv stores Indexes into DataDB
// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context
func (ms *MongoStorage) SetIndexesDrv(cacheID, key string,
func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string,
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) {
originKey := utils.CacheInstanceToPrefix[cacheID] + key
originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
@@ -2454,10 +2444,10 @@ func (ms *MongoStorage) SetIndexesDrv(cacheID, key string,
return err
}
var lastErr error
for key, itmMp := range indexes {
for idxKey, itmMp := range indexes {
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(originKey, key)},
bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(originKey, key), "value": itmMp.AsSlice()}},
_, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(originKey, idxKey)},
bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(originKey, idxKey), "value": itmMp.AsSlice()}},
options.Update().SetUpsert(true),
)
return err
@@ -2475,15 +2465,15 @@ func (ms *MongoStorage) SetIndexesDrv(cacheID, key string,
})
}
var lastErr error
for key, itmMp := range indexes {
for idxKey, itmMp := range indexes {
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
var action bson.M
if len(itmMp) == 0 {
action = bson.M{"$unset": bson.M{"value": 1}}
} else {
action = bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(dbKey, key), "value": itmMp.AsSlice()}}
action = bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(dbKey, idxKey), "value": itmMp.AsSlice()}}
}
_, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, key)},
_, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, idxKey)},
action, options.Update().SetUpsert(true),
)
return err
@@ -2496,8 +2486,18 @@ func (ms *MongoStorage) SetIndexesDrv(cacheID, key string,
// RemoveIndexesDrv removes the indexes
// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context
func (ms *MongoStorage) RemoveIndexesDrv(cacheID, key string) (err error) {
regexKey := utils.CacheInstanceToPrefix[cacheID] + key
func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) {
if len(idxKey) != 0 {
return ms.query(func(sctx mongo.SessionContext) (err error) {
dr, err := ms.getCol(ColIndx).DeleteOne(sctx,
bson.M{"key": utils.ConcatenatedKey(utils.CacheInstanceToPrefix[idxItmType]+tntCtx, idxKey)})
if dr.DeletedCount == 0 {
return utils.ErrNotFound
}
return err
})
}
regexKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
for _, character := range []string{".", "*"} {
regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character))
}

View File

@@ -1781,7 +1781,7 @@ func (rs *RedisStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexe
return
}
// SetFilterIndexesDrv stores Indexes into DataDB
// SetIndexesDrv stores Indexes into DataDB
func (rs *RedisStorage) SetIndexesDrv(idxItmType, tntCtx string,
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) {
originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
@@ -1799,11 +1799,11 @@ func (rs *RedisStorage) SetIndexesDrv(idxItmType, tntCtx string,
nameValSls = append(nameValSls, key)
continue
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
encodedMp, err := rs.ms.Marshal(strMp)
if err != nil {
return err
} else {
mp[key] = string(encodedMp)
}
mp[key] = string(encodedMp)
}
if len(nameValSls) != 1 {
if err = rs.Cmd(redis_HDEL, nameValSls...).Err; err != nil {
@@ -1816,6 +1816,6 @@ func (rs *RedisStorage) SetIndexesDrv(idxItmType, tntCtx string,
return
}
func (rs *RedisStorage) RemoveIndexesDrv(idxItmType, tntCtx string) (err error) {
return rs.Cmd(redis_DEL, utils.CacheInstanceToPrefix[idxItmType]+tntCtx).Err
func (rs *RedisStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) {
return rs.Cmd(redis_DEL, utils.CacheInstanceToPrefix[idxItmType]+utils.ConcatenatedKey(tntCtx, idxKey)).Err
}