diff --git a/engine/datamanager.go b/engine/datamanager.go index d299006f0..6b2d83ed7 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -996,8 +996,8 @@ func (dm *DataManager) GetFilterIndexes(dbKey, filterType string, fldNameVal map return dm.DataDB().GetFilterIndexesDrv(dbKey, filterType, fldNameVal) } -func (dm *DataManager) SetFilterIndexes(dbKey string, indexes map[string]utils.StringMap) (err error) { - return dm.DataDB().SetFilterIndexesDrv(dbKey, indexes) +func (dm *DataManager) SetFilterIndexes(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { + return dm.DataDB().SetFilterIndexesDrv(dbKey, indexes, commit, transactionID) } func (dm *DataManager) RemoveFilterIndexes(dbKey string) (err error) { @@ -1008,8 +1008,8 @@ func (dm *DataManager) GetFilterReverseIndexes(dbKey string, fldNameVal map[stri return dm.DataDB().GetFilterReverseIndexesDrv(dbKey, fldNameVal) } -func (dm *DataManager) SetFilterReverseIndexes(dbKey string, indexes map[string]utils.StringMap) (err error) { - return dm.DataDB().SetFilterReverseIndexesDrv(dbKey, indexes) +func (dm *DataManager) SetFilterReverseIndexes(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { + return dm.DataDB().SetFilterReverseIndexesDrv(dbKey, indexes, commit, transactionID) } func (dm *DataManager) RemoveFilterReverseIndexes(dbKey string) (err error) { diff --git a/engine/filterindexer.go b/engine/filterindexer.go index bab173b2a..be04b83d7 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -131,12 +131,12 @@ func (rfi *ReqFilterIndexer) cacheRemItemType() { func (rfi *ReqFilterIndexer) StoreIndexes() (err error) { if err = rfi.dm.SetFilterIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), - rfi.indexes); err != nil { + rfi.indexes, false, utils.NonTransactional); err != nil { return } if err = rfi.dm.SetFilterReverseIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), - rfi.reveseIndex); err != nil { + rfi.reveseIndex, false, utils.NonTransactional); err != nil { return } rfi.cacheRemItemType() @@ -201,12 +201,12 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) { rfi.reveseIndex[itemID] = make(utils.StringMap) //Force deleting in driver if err = rfi.dm.SetFilterIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), - rfi.indexes); err != nil { + rfi.indexes, false, utils.NonTransactional); err != nil { return } if err = rfi.dm.SetFilterReverseIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, true), - rfi.reveseIndex); err != nil { + rfi.reveseIndex, false, utils.NonTransactional); err != nil { return } return diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index b5e7a3c17..ed8705b3a 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -88,6 +88,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITTestThresholdInlineFilterIndexing, testOnStorITFlush, testOnStorITTestAttributeSubstituteIface, + testOnStorITTestStoreFilterIndexesWithTransID, //testOnStorITCacheActionTriggers, //testOnStorITCacheAlias, //testOnStorITCacheReverseAlias, @@ -215,7 +216,7 @@ func testOnStorITSetFilterIndexes(t *testing.T) { } if err := onStor.SetFilterIndexes( GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), - idxes); err != nil { + idxes, false, utils.NonTransactional); err != nil { t.Error(err) } } @@ -280,7 +281,7 @@ func testOnStorITGetFilterIndexes(t *testing.T) { } if err := onStor.SetFilterIndexes( GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), - eIdxes); err != nil { + eIdxes, false, utils.NonTransactional); err != nil { t.Error(err) } } @@ -3264,3 +3265,59 @@ func testOnStorITTestAttributeSubstituteIface(t *testing.T) { t.Errorf("Expecting: %v, received: %v", utils.ToJSON(attrProfile), utils.ToJSON(rcv)) } } + +func testOnStorITTestStoreFilterIndexesWithTransID(t *testing.T) { + tmpKey := "tmp_" + utils.ConcatenatedKey(GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), "transaction1") + idxes := map[string]utils.StringMap{ + "*string:Account:1001": utils.StringMap{ + "RL1": true, + }, + "*string:Account:1002": utils.StringMap{ + "RL1": true, + "RL2": true, + }, + "*string:Account:dan": utils.StringMap{ + "RL2": true, + }, + "*string:Subject:dan": utils.StringMap{ + "RL2": true, + "RL3": true, + }, + utils.ConcatenatedKey(utils.MetaDefault, utils.ANY, utils.ANY): utils.StringMap{ + "RL4": true, + "RL5": true, + }, + } + if err := onStor.SetFilterIndexes( + GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), + idxes, false, "transaction1"); err != nil { + t.Error(err) + } + if rcv, err := onStor.GetFilterIndexes(tmpKey, MetaString, nil); err != nil { + t.Error(err) + } else { + if !reflect.DeepEqual(idxes, rcv) { + t.Errorf("Expecting: %+v, received: %+v", idxes, rcv) + } + } + //commit transaction + if err := onStor.SetFilterIndexes( + GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), + idxes, true, "transaction1"); err != nil { + t.Error(err) + } + //verify if old key was deleted + if _, err := onStor.GetFilterIndexes(tmpKey, MetaString, nil); err != utils.ErrNotFound { + t.Error(err) + } + //verify new key and check if data was moved + newKey := utils.ConcatenatedKey(GetDBIndexKey(utils.ResourceProfilesPrefix, "cgrates.org", false), "transaction1") + if rcv, err := onStor.GetFilterIndexes(newKey, MetaString, nil); err != nil { + t.Error(err) + } else { + if !reflect.DeepEqual(idxes, rcv) { + t.Errorf("Expecting: %+v, received: %+v", idxes, rcv) + } + } + +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 05b7091ed..c07254cd9 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -118,10 +118,10 @@ type DataDB interface { GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error) AddLoadHistory(*utils.LoadInstance, int, string) error GetFilterIndexesDrv(dbKey, filterType string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) - SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) + SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) RemoveFilterIndexesDrv(id string) (err error) GetFilterReverseIndexesDrv(dbKey string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) - SetFilterReverseIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) + SetFilterReverseIndexesDrv(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) RemoveFilterReverseIndexesDrv(dbKey string) (err error) MatchFilterIndexDrv(dbKey, filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 7427d7269..53a566d8f 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1268,15 +1268,29 @@ func (ms *MapStorage) GetFilterIndexesDrv(dbKey, filterType string, } //SetFilterIndexesDrv stores Indexes into DataDB -func (ms *MapStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) { +func (ms *MapStorage) SetFilterIndexesDrv(originKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() - result, err := ms.ms.Marshal(indexes) - if err != nil { - return err + dbKey := originKey + if transactionID != "" { + dbKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID) + } + if commit && transactionID != "" { + delete(ms.dict, dbKey) + result, err := ms.ms.Marshal(indexes) + if err != nil { + return err + } + ms.dict[utils.ConcatenatedKey(originKey, transactionID)] = result + return nil + } else { + result, err := ms.ms.Marshal(indexes) + if err != nil { + return err + } + ms.dict[dbKey] = result + return nil } - ms.dict[dbKey] = result - return } func (ms *MapStorage) RemoveFilterIndexesDrv(id string) (err error) { @@ -1319,7 +1333,7 @@ func (ms *MapStorage) GetFilterReverseIndexesDrv(dbKey string, } //SetFilterReverseIndexesDrv stores ReverseIndexes into DataDB -func (ms *MapStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) { +func (ms *MapStorage) SetFilterReverseIndexesDrv(dbKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(indexes) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 9f59391fe..a3dd12a26 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1947,24 +1947,46 @@ func (ms *MongoStorage) GetFilterIndexesDrv(dbKey, filterType string, } // SetFilterIndexesDrv stores Indexes into DataDB -func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) { +func (ms *MongoStorage) SetFilterIndexesDrv(originKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { session, col := ms.conn(colRFI) defer session.Close() - pairs := []interface{}{} - for key, itmMp := range indexes { - param := fmt.Sprintf("value.%s", key) - pairs = append(pairs, bson.M{"key": dbKey}) - if len(itmMp) == 0 { - pairs = append(pairs, bson.M{"$unset": bson.M{param: 1}}) - } else { - pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}}) - } + dbKey := originKey + if transactionID != "" { + dbKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID) } - if len(pairs) != 0 { - bulk := col.Bulk() - bulk.Unordered() - bulk.Upsert(pairs...) - _, err = bulk.Run() + if commit && transactionID != "" { + oldKey := "tmp_" + utils.ConcatenatedKey(originKey, transactionID) + newKey := utils.ConcatenatedKey(originKey, transactionID) + pairs := []interface{}{} + for key, itmMp := range indexes { + param := fmt.Sprintf("value.%s", key) + pairs = append(pairs, bson.M{"key": newKey}) + pairs = append(pairs, bson.M{"$set": bson.M{"key": newKey, param: itmMp.Slice()}}) + } + if len(pairs) != 0 { + bulk := col.Bulk() + bulk.Unordered() + bulk.Upsert(pairs...) + _, err = bulk.Run() + } + return col.Remove(bson.M{"key": oldKey}) + } else { + pairs := []interface{}{} + for key, itmMp := range indexes { + param := fmt.Sprintf("value.%s", key) + pairs = append(pairs, bson.M{"key": dbKey}) + if len(itmMp) == 0 { + pairs = append(pairs, bson.M{"$unset": bson.M{param: 1}}) + } else { + pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}}) + } + } + if len(pairs) != 0 { + bulk := col.Bulk() + bulk.Unordered() + bulk.Upsert(pairs...) + _, err = bulk.Run() + } } return } @@ -2021,7 +2043,7 @@ func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string, } //SetFilterReverseIndexesDrv stores ReverseIndexes into DataDB -func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap) (err error) { +func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap, commit bool, transactionID string) (err error) { session, col := ms.conn(colRFI) defer session.Close() pairs := []interface{}{} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 1dae94978..e0fa21ced 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1385,29 +1385,37 @@ func (rs *RedisStorage) GetFilterIndexesDrv(dbKey, filterType string, } //SetFilterIndexesDrv stores Indexes into DataDB -func (rs *RedisStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]utils.StringMap) (err error) { - mp := make(map[string]string) - nameValSls := []interface{}{dbKey} - for key, strMp := range indexes { - if len(strMp) == 0 { // remove with no more elements inside - nameValSls = append(nameValSls, key) - continue - } - if encodedMp, err := rs.ms.Marshal(strMp); err != nil { - return err - } else { - mp[key] = string(encodedMp) - } +func (rs *RedisStorage) SetFilterIndexesDrv(originKey string, indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { + dbKey := originKey + if transactionID != "" { + dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID) } - if len(nameValSls) != 1 { - if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil { - return err + if commit && transactionID != "" { + return rs.Cmd("RENAME", dbKey, utils.ConcatenatedKey(originKey, transactionID)).Err + } else { + mp := make(map[string]string) + nameValSls := []interface{}{dbKey} + for key, strMp := range indexes { + if len(strMp) == 0 { // remove with no more elements inside + nameValSls = append(nameValSls, key) + continue + } + if encodedMp, err := rs.ms.Marshal(strMp); err != nil { + return err + } else { + mp[key] = string(encodedMp) + } } + if len(nameValSls) != 1 { + if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil { + return err + } + } + if len(mp) != 0 { + return rs.Cmd("HMSET", dbKey, mp).Err + } + return } - if len(mp) != 0 { - return rs.Cmd("HMSET", dbKey, mp).Err - } - return } func (rs *RedisStorage) RemoveFilterIndexesDrv(id string) (err error) { @@ -1452,30 +1460,37 @@ func (rs *RedisStorage) GetFilterReverseIndexesDrv(dbKey string, } //SetFilterReverseIndexesDrv stores ReverseIndexes into DataDB -func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap) (err error) { - mp := make(map[string]string) - nameValSls := []interface{}{dbKey} - for key, strMp := range revIdx { - if len(strMp) == 0 { // remove with no more elements inside - nameValSls = append(nameValSls, key) - continue - } - if encodedMp, err := rs.ms.Marshal(strMp); err != nil { - return err - } else { - mp[key] = string(encodedMp) - } +func (rs *RedisStorage) SetFilterReverseIndexesDrv(originKey string, revIdx map[string]utils.StringMap, commit bool, transactionID string) (err error) { + dbKey := originKey + if transactionID != "" { + dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID) } - if len(nameValSls) != 1 { - if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil { - return err + if commit && transactionID != "" { + return rs.Cmd("RENAME", dbKey, utils.ConcatenatedKey(originKey, transactionID)).Err + } else { + mp := make(map[string]string) + nameValSls := []interface{}{dbKey} + for key, strMp := range revIdx { + if len(strMp) == 0 { // remove with no more elements inside + nameValSls = append(nameValSls, key) + continue + } + if encodedMp, err := rs.ms.Marshal(strMp); err != nil { + return err + } else { + mp[key] = string(encodedMp) + } } + if len(nameValSls) != 1 { + if err = rs.Cmd("HDEL", nameValSls...).Err; err != nil { + return err + } + } + if len(mp) != 0 { + return rs.Cmd("HMSET", dbKey, mp).Err + } + return } - - if len(mp) != 0 { - return rs.Cmd("HMSET", dbKey, mp).Err - } - return } //RemoveFilterReverseIndexesDrv removes ReverseIndexes for a specific itemID