Add transactionID functionality on idx/revIdx

This commit is contained in:
TeoV
2018-01-29 13:27:59 +02:00
committed by Dan Christian Bogos
parent 47476bdbf0
commit bab62a6d11
7 changed files with 184 additions and 76 deletions

View File

@@ -996,8 +996,8 @@ func (dm *DataManager) GetFilterIndexes(dbKey, filterType string, fldNameVal map
return dm.DataDB().GetFilterIndexesDrv(dbKey, filterType, fldNameVal)
}
func (dm *DataManager) SetFilterIndexes(dbKey string, indexes map[string]utils.StringMap) (err error) {
return dm.DataDB().SetFilterIndexesDrv(dbKey, indexes)
func (dm *DataManager) SetFilterIndexes(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
return dm.DataDB().SetFilterIndexesDrv(dbKey, indexes, commit, transactionID)
}
func (dm *DataManager) RemoveFilterIndexes(dbKey string) (err error) {
@@ -1008,8 +1008,8 @@ func (dm *DataManager) GetFilterReverseIndexes(dbKey string, fldNameVal map[stri
return dm.DataDB().GetFilterReverseIndexesDrv(dbKey, fldNameVal)
}
func (dm *DataManager) SetFilterReverseIndexes(dbKey string, indexes map[string]utils.StringMap) (err error) {
return dm.DataDB().SetFilterReverseIndexesDrv(dbKey, indexes)
func (dm *DataManager) SetFilterReverseIndexes(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
return dm.DataDB().SetFilterReverseIndexesDrv(dbKey, indexes, commit, transactionID)
}
func (dm *DataManager) RemoveFilterReverseIndexes(dbKey string) (err error) {

View File

@@ -131,12 +131,12 @@ func (rfi *ReqFilterIndexer) cacheRemItemType() {
func (rfi *ReqFilterIndexer) StoreIndexes() (err error) {
if err = rfi.dm.SetFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false),
rfi.indexes); err != nil {
rfi.indexes, false, utils.NonTransactional); err != nil {
return
}
if err = rfi.dm.SetFilterReverseIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true),
rfi.reveseIndex); err != nil {
rfi.reveseIndex, false, utils.NonTransactional); err != nil {
return
}
rfi.cacheRemItemType()
@@ -201,12 +201,12 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) {
rfi.reveseIndex[itemID] = make(utils.StringMap) //Force deleting in driver
if err = rfi.dm.SetFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false),
rfi.indexes); err != nil {
rfi.indexes, false, utils.NonTransactional); err != nil {
return
}
if err = rfi.dm.SetFilterReverseIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true),
rfi.reveseIndex); err != nil {
rfi.reveseIndex, false, utils.NonTransactional); err != nil {
return
}
return

View File

@@ -88,6 +88,7 @@ var sTestsOnStorIT = []func(t *testing.T){
testOnStorITTestThresholdInlineFilterIndexing,
testOnStorITFlush,
testOnStorITTestAttributeSubstituteIface,
testOnStorITTestStoreFilterIndexesWithTransID,
//testOnStorITCacheActionTriggers,
//testOnStorITCacheAlias,
//testOnStorITCacheReverseAlias,
@@ -215,7 +216,7 @@ func testOnStorITSetFilterIndexes(t *testing.T) {
}
if err := onStor.SetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
idxes); err != nil {
idxes, false, utils.NonTransactional); err != nil {
t.Error(err)
}
}
@@ -280,7 +281,7 @@ func testOnStorITGetFilterIndexes(t *testing.T) {
}
if err := onStor.SetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
eIdxes); err != nil {
eIdxes, false, utils.NonTransactional); err != nil {
t.Error(err)
}
}
@@ -3264,3 +3265,59 @@ func testOnStorITTestAttributeSubstituteIface(t *testing.T) {
t.Errorf("Expecting: %v, received: %v", utils.ToJSON(attrProfile), utils.ToJSON(rcv))
}
}
func testOnStorITTestStoreFilterIndexesWithTransID(t *testing.T) {
tmpKey := "tmp_" + utils.ConcatenatedKey(GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), "transaction1")
idxes := map[string]utils.StringMap{
"*string:Account:1001": utils.StringMap{
"RL1": true,
},
"*string:Account:1002": utils.StringMap{
"RL1": true,
"RL2": true,
},
"*string:Account:dan": utils.StringMap{
"RL2": true,
},
"*string:Subject:dan": utils.StringMap{
"RL2": true,
"RL3": true,
},
utils.ConcatenatedKey(utils.MetaDefault, utils.ANY, utils.ANY): utils.StringMap{
"RL4": true,
"RL5": true,
},
}
if err := onStor.SetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
idxes, false, "transaction1"); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetFilterIndexes(tmpKey, MetaString, nil); err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(idxes, rcv) {
t.Errorf("Expecting: %+v, received: %+v", idxes, rcv)
}
}
//commit transaction
if err := onStor.SetFilterIndexes(
GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false),
idxes, true, "transaction1"); err != nil {
t.Error(err)
}
//verify if old key was deleted
if _, err := onStor.GetFilterIndexes(tmpKey, MetaString, nil); err != utils.ErrNotFound {
t.Error(err)
}
//verify new key and check if data was moved
newKey := utils.ConcatenatedKey(GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), "transaction1")
if rcv, err := onStor.GetFilterIndexes(newKey, MetaString, nil); err != nil {
t.Error(err)
} else {
if !reflect.DeepEqual(idxes, rcv) {
t.Errorf("Expecting: %+v, received: %+v", idxes, rcv)
}
}
}

View File

@@ -118,10 +118,10 @@ type DataDB interface {
GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error)
AddLoadHistory(*utils.LoadInstance, int, string) error
GetFilterIndexesDrv(dbKey, filterType string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error)
SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error)
SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error)
RemoveFilterIndexesDrv(id string) (err error)
GetFilterReverseIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error)
SetFilterReverseIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error)
SetFilterReverseIndexesDrv(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error)
RemoveFilterReverseIndexesDrv(dbKey string) (err error)
MatchFilterIndexDrv(dbKey, filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)

View File

@@ -1268,15 +1268,29 @@ func (ms *MapStorage) GetFilterIndexesDrv(dbKey, filterType string,
}
//SetFilterIndexesDrv stores Indexes into DataDB
func (ms *MapStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) {
func (ms *MapStorage) SetFilterIndexesDrv(originKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(indexes)
if err != nil {
return err
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
}
if commit && transactionID != "" {
delete(ms.dict, dbKey)
result, err := ms.ms.Marshal(indexes)
if err != nil {
return err
}
ms.dict[utils.ConcatenatedKey(originKey, transactionID)] = result
return nil
} else {
result, err := ms.ms.Marshal(indexes)
if err != nil {
return err
}
ms.dict[dbKey] = result
return nil
}
ms.dict[dbKey] = result
return
}
func (ms *MapStorage) RemoveFilterIndexesDrv(id string) (err error) {
@@ -1319,7 +1333,7 @@ func (ms *MapStorage) GetFilterReverseIndexesDrv(dbKey string,
}
//SetFilterReverseIndexesDrv stores ReverseIndexes into DataDB
func (ms *MapStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) {
func (ms *MapStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(indexes)

View File

@@ -1947,24 +1947,46 @@ func (ms *MongoStorage) GetFilterIndexesDrv(dbKey, filterType string,
}
// SetFilterIndexesDrv stores Indexes into DataDB
func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) {
func (ms *MongoStorage) SetFilterIndexesDrv(originKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
pairs := []interface{}{}
for key, itmMp := range indexes {
param := fmt.Sprintf("value.%s", key)
pairs = append(pairs, bson.M{"key": dbKey})
if len(itmMp) == 0 {
pairs = append(pairs, bson.M{"$unset": bson.M{param: 1}})
} else {
pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}})
}
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
}
if len(pairs) != 0 {
bulk := col.Bulk()
bulk.Unordered()
bulk.Upsert(pairs...)
_, err = bulk.Run()
if commit && transactionID != "" {
oldKey := "tmp_" + utils.ConcatenatedKey(originKey, transactionID)
newKey := utils.ConcatenatedKey(originKey, transactionID)
pairs := []interface{}{}
for key, itmMp := range indexes {
param := fmt.Sprintf("value.%s", key)
pairs = append(pairs, bson.M{"key": newKey})
pairs = append(pairs, bson.M{"$set": bson.M{"key": newKey, param: itmMp.Slice()}})
}
if len(pairs) != 0 {
bulk := col.Bulk()
bulk.Unordered()
bulk.Upsert(pairs...)
_, err = bulk.Run()
}
return col.Remove(bson.M{"key": oldKey})
} else {
pairs := []interface{}{}
for key, itmMp := range indexes {
param := fmt.Sprintf("value.%s", key)
pairs = append(pairs, bson.M{"key": dbKey})
if len(itmMp) == 0 {
pairs = append(pairs, bson.M{"$unset": bson.M{param: 1}})
} else {
pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}})
}
}
if len(pairs) != 0 {
bulk := col.Bulk()
bulk.Unordered()
bulk.Upsert(pairs...)
_, err = bulk.Run()
}
}
return
}
@@ -2021,7 +2043,7 @@ func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string,
}
//SetFilterReverseIndexesDrv stores ReverseIndexes into DataDB
func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap) (err error) {
func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap, commit bool, transactionID string) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
pairs := []interface{}{}

View File

@@ -1385,29 +1385,37 @@ func (rs *RedisStorage) GetFilterIndexesDrv(dbKey, filterType string,
}
//SetFilterIndexesDrv stores Indexes into DataDB
func (rs *RedisStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) {
mp := make(map[string]string)
nameValSls := []interface{}{dbKey}
for key, strMp := range indexes {
if len(strMp) == 0 { // remove with no more elements inside
nameValSls = append(nameValSls, key)
continue
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[key] = string(encodedMp)
}
func (rs *RedisStorage) SetFilterIndexesDrv(originKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID)
}
if len(nameValSls) != 1 {
if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil {
return err
if commit && transactionID != "" {
return rs.Cmd("RENAME", dbKey, utils.ConcatenatedKey(originKey, transactionID)).Err
} else {
mp := make(map[string]string)
nameValSls := []interface{}{dbKey}
for key, strMp := range indexes {
if len(strMp) == 0 { // remove with no more elements inside
nameValSls = append(nameValSls, key)
continue
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[key] = string(encodedMp)
}
}
if len(nameValSls) != 1 {
if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil {
return err
}
}
if len(mp) != 0 {
return rs.Cmd("HMSET", dbKey, mp).Err
}
return
}
if len(mp) != 0 {
return rs.Cmd("HMSET", dbKey, mp).Err
}
return
}
func (rs *RedisStorage) RemoveFilterIndexesDrv(id string) (err error) {
@@ -1452,30 +1460,37 @@ func (rs *RedisStorage) GetFilterReverseIndexesDrv(dbKey string,
}
//SetFilterReverseIndexesDrv stores ReverseIndexes into DataDB
func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap) (err error) {
mp := make(map[string]string)
nameValSls := []interface{}{dbKey}
for key, strMp := range revIdx {
if len(strMp) == 0 { // remove with no more elements inside
nameValSls = append(nameValSls, key)
continue
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[key] = string(encodedMp)
}
func (rs *RedisStorage) SetFilterReverseIndexesDrv(originKey string, revIdx map[string]utils.StringMap, commit bool, transactionID string) (err error) {
dbKey := originKey
if transactionID != "" {
dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID)
}
if len(nameValSls) != 1 {
if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil {
return err
if commit && transactionID != "" {
return rs.Cmd("RENAME", dbKey, utils.ConcatenatedKey(originKey, transactionID)).Err
} else {
mp := make(map[string]string)
nameValSls := []interface{}{dbKey}
for key, strMp := range revIdx {
if len(strMp) == 0 { // remove with no more elements inside
nameValSls = append(nameValSls, key)
continue
}
if encodedMp, err := rs.ms.Marshal(strMp); err != nil {
return err
} else {
mp[key] = string(encodedMp)
}
}
if len(nameValSls) != 1 {
if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil {
return err
}
}
if len(mp) != 0 {
return rs.Cmd("HMSET", dbKey, mp).Err
}
return
}
if len(mp) != 0 {
return rs.Cmd("HMSET", dbKey, mp).Err
}
return
}
//RemoveFilterReverseIndexesDrv removes ReverseIndexes for a specific itemID