Updated internalDB index handling

This commit is contained in:
Trial97
2021-07-07 19:53:13 +03:00
committed by Dan Christian Bogos
parent 45a6d46cab
commit 7ef629e930

View File

@@ -21,6 +21,7 @@ package engine
import (
"errors"
"fmt"
"strings"
"sync"
"time"
@@ -918,103 +919,99 @@ func (iDB *InternalDB) AddLoadHistory(*utils.LoadInstance, int, string) error {
return nil
}
func (iDB *InternalDB) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string,
func (iDB *InternalDB) GetFilterIndexesDrv(cacheID, tntCtx, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
x, ok := iDB.db.Get(cacheID, dbKey)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
if len(fldNameVal) != 0 {
rcvidx := x.(map[string]utils.StringMap)
if len(fldNameVal) == 0 { // return all
indexes = make(map[string]utils.StringMap)
for fldName, fldVal := range fldNameVal {
if _, has := indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)]; !has {
indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)] = make(utils.StringMap)
}
if len(rcvidx[utils.ConcatenatedKey(filterType, fldName, fldVal)]) != 0 {
for key := range rcvidx[utils.ConcatenatedKey(filterType, fldName, fldVal)] {
indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)][key] = true
}
for _, dbKey := range iDB.db.GetGroupItemIDs(cacheID, tntCtx) {
x, ok := iDB.db.Get(cacheID, dbKey)
if !ok || x == nil {
continue
}
dbKey = strings.TrimPrefix(dbKey, utils.CacheInstanceToPrefix[cacheID]+tntCtx+utils.CONCATENATED_KEY_SEP)
indexes[dbKey] = x.(utils.StringMap).Clone()
}
return
} else {
indexes = x.(map[string]utils.StringMap)
if len(indexes) == 0 {
return nil, utils.ErrNotFound
}
}
return
}
func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
originKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID)
}
if commit && transactionID != "" {
x, _ := iDB.db.Get(cacheID, dbKey)
iDB.db.Remove(cacheID, dbKey,
true, utils.NonTransactional)
iDB.db.Set(cacheID, originKey, x, nil,
true, utils.NonTransactional)
return
}
var toBeDeleted []string
toBeAdded := make(map[string]utils.StringMap)
for key, strMp := range indexes {
if len(strMp) == 0 { // remove with no more elements inside
toBeDeleted = append(toBeDeleted, key)
delete(indexes, key)
continue
indexes = make(map[string]utils.StringMap)
for fldName, fldVal := range fldNameVal {
idxKey := utils.ConcatenatedKey(filterType, fldName, fldVal)
dbKey := utils.ConcatenatedKey(utils.CacheInstanceToPrefix[cacheID]+tntCtx, idxKey)
x, ok := iDB.db.Get(cacheID, dbKey)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
toBeAdded[key] = make(utils.StringMap)
toBeAdded[key] = strMp
}
rcvidx := x.(utils.StringMap)
x, ok := iDB.db.Get(cacheID, dbKey)
if !ok || x == nil {
iDB.db.Set(cacheID, dbKey, toBeAdded, nil,
true, utils.NonTransactional)
return err
}
mp := x.(map[string]utils.StringMap)
for _, key := range toBeDeleted {
delete(mp, key)
}
for key, strMp := range toBeAdded {
if _, has := mp[key]; !has {
mp[key] = make(utils.StringMap)
if _, has := indexes[idxKey]; !has || len(indexes[idxKey]) == 0 {
indexes[idxKey] = rcvidx
} else {
for key := range rcvidx {
indexes[idxKey][key] = true
}
}
mp[key] = strMp
}
iDB.db.Set(cacheID, dbKey, mp, nil,
cacheCommit(transactionID), transactionID)
return nil
}
func (iDB *InternalDB) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) {
iDB.db.Remove(cacheID, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix,
true, utils.NonTransactional)
return
}
func (iDB *InternalDB) MatchFilterIndexDrv(cacheID, itemIDPrefix,
filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) {
func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, tntCtx string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
if commit && transactionID != utils.EmptyString {
for _, dbKey := range iDB.db.GetGroupItemIDs(cacheID, tntCtx) {
if !strings.HasPrefix(dbKey, "tmp_") || !strings.HasSuffix(dbKey, transactionID) {
continue
}
x, ok := iDB.db.Get(cacheID, dbKey)
if !ok || x == nil {
continue
}
iDB.db.Remove(cacheID, dbKey,
true, utils.NonTransactional)
key := strings.TrimSuffix(strings.TrimPrefix(dbKey, "tmp_"), utils.CONCATENATED_KEY_SEP+transactionID)
iDB.db.Set(cacheID, key, x, []string{tntCtx},
true, utils.NonTransactional)
}
return
}
for idxKey, indx := range indexes {
dbKey := utils.ConcatenatedKey(utils.CacheInstanceToPrefix[cacheID]+tntCtx, idxKey)
if transactionID != utils.EmptyString {
dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID)
}
if len(indx) == 0 {
iDB.db.Set(cacheID, dbKey, nil, []string{tntCtx},
true, utils.NonTransactional)
continue
}
//to be the same as HMSET
if x, ok := iDB.db.Get(cacheID, dbKey); ok && x != nil {
for key := range x.(utils.StringMap) {
indx[key] = true
}
}
iDB.db.Set(cacheID, dbKey, indx, []string{tntCtx},
true, utils.NonTransactional)
}
return
}
func (iDB *InternalDB) RemoveFilterIndexesDrv(cacheID, tntCtx string) (err error) {
iDB.db.RemoveGroup(cacheID, tntCtx, true, utils.EmptyString)
return
}
x, ok := iDB.db.Get(cacheID, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix)
func (iDB *InternalDB) MatchFilterIndexDrv(cacheID, tntCtx,
filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) {
dbKey := utils.ConcatenatedKey(utils.CacheInstanceToPrefix[cacheID]+tntCtx, filterType, fieldName, fieldVal)
x, ok := iDB.db.Get(cacheID, dbKey)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
indexes := x.(map[string]utils.StringMap)
itemIDs = x.(utils.StringMap)
if _, hasIt := indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)]; hasIt {
itemIDs = indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)]
}
if len(itemIDs) == 0 {
return nil, utils.ErrNotFound
}