Storage.Get/Set/RemVersions improvements

This commit is contained in:
DanB
2018-04-25 18:45:20 +02:00
parent 28de5ac30f
commit 910ea04c08
53 changed files with 167 additions and 95 deletions

View File

@@ -50,6 +50,9 @@ func TestActionsitInitCfg(t *testing.T) {
}
func TestActionsitInitCdrDb(t *testing.T) {
if err := InitDataDb(actsLclCfg); err != nil { // need it for versions
t.Fatal(err)
}
if err := InitStorDb(actsLclCfg); err != nil {
t.Fatal(err)
}

View File

@@ -49,6 +49,7 @@ var sTestsOnStorIT = []func(t *testing.T){
testOnStorITCacheActionPlan,
testOnStorITCacheAccountActionPlans,
testOnStorITCacheDerivedChargers,
// ToDo: test cache flush for a prefix
// ToDo: testOnStorITLoadAccountingCache
testOnStorITHasData,
@@ -1868,22 +1869,61 @@ func testOnStorITCRUDHistory(t *testing.T) {
}
func testOnStorITCRUDStructVersion(t *testing.T) {
CurrentVersion := Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2}
if _, rcvErr := onStor.DataDB().GetVersions(utils.TBLVersions); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := onStor.DataDB().SetVersions(CurrentVersion, false); err != nil {
if _, err := onStor.DataDB().GetVersions(utils.Accounts); err != utils.ErrNotFound {
t.Error(err)
}
if rcv, err := onStor.DataDB().GetVersions(utils.TBLVersions); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(CurrentVersion, rcv) {
t.Errorf("Expecting: %v, received: %v", CurrentVersion, rcv)
} else if err = onStor.DataDB().RemoveVersions(rcv); err != nil {
vrs := Versions{utils.Accounts: 3, utils.Actions: 2, utils.ActionTriggers: 2,
utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 1}
if err := onStor.DataDB().SetVersions(vrs, false); err != nil {
t.Error(err)
}
if _, rcvErr := onStor.DataDB().GetVersions(utils.TBLVersions); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
if rcv, err := onStor.DataDB().GetVersions(""); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(vrs, rcv) {
t.Errorf("Expecting: %v, received: %v", vrs, rcv)
}
delete(vrs, utils.SharedGroups)
if err := onStor.DataDB().SetVersions(vrs, true); err != nil { // overwrite
t.Error(err)
}
if rcv, err := onStor.DataDB().GetVersions(""); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(vrs, rcv) {
t.Errorf("Expecting: %v, received: %v", vrs, rcv)
}
eAcnts := Versions{utils.Accounts: vrs[utils.Accounts]}
if rcv, err := onStor.DataDB().GetVersions(utils.Accounts); err != nil { //query one element
t.Error(err)
} else if !reflect.DeepEqual(eAcnts, rcv) {
t.Errorf("Expecting: %v, received: %v", eAcnts, rcv)
}
if _, err := onStor.DataDB().GetVersions(utils.NOT_AVAILABLE); err != utils.ErrNotFound { //query non-existent
t.Error(err)
}
eAcnts[utils.Accounts] = 2
vrs[utils.Accounts] = eAcnts[utils.Accounts]
if err := onStor.DataDB().SetVersions(eAcnts, false); err != nil { // change one element
t.Error(err)
}
if rcv, err := onStor.DataDB().GetVersions(""); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(vrs, rcv) {
t.Errorf("Expecting: %v, received: %v", vrs, rcv)
}
if err = onStor.DataDB().RemoveVersions(eAcnts); err != nil { // remove one element
t.Error(err)
}
delete(vrs, utils.Accounts)
if rcv, err := onStor.DataDB().GetVersions(""); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(vrs, rcv) {
t.Errorf("Expecting: %v, received: %v", vrs, rcv)
}
if err = onStor.DataDB().RemoveVersions(nil); err != nil { // remove one element
t.Error(err)
}
if _, err := onStor.DataDB().GetVersions(""); err != utils.ErrNotFound { //query non-existent
t.Error(err)
}
}

View File

@@ -1669,7 +1669,7 @@ func (ms *MapStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
var result []byte
var x Versions
if !overwrite {
x, err = ms.GetVersions(utils.TBLVersions)
x, err = ms.GetVersions("")
if err != nil {
return err
}

View File

@@ -1294,12 +1294,19 @@ func (ms *MongoStorage) SetTPAttributes(tpSPs []*utils.TPAttributeProfile) (err
func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) {
session, col := ms.conn(colVer)
defer session.Close()
if err = col.Find(bson.M{}).One(&vrs); err != nil {
proj := bson.M{} // projection params
if itm != "" {
proj[itm] = 1
}
if err = col.Find(bson.M{}).Select(proj).One(&vrs); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
return nil, err
}
if len(vrs) == 0 {
return nil, utils.ErrNotFound
}
return
}
@@ -1307,27 +1314,33 @@ func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
session, col := ms.conn(colVer)
defer session.Close()
if overwrite {
if err = ms.RemoveVersions(vrs); err != nil {
return err
}
_, err = col.Upsert(bson.M{}, &vrs)
return
}
if _, err = col.Upsert(bson.M{}, &vrs); err != nil {
return err
}
_, err = col.Upsert(bson.M{}, bson.M{"$set": &vrs})
return
}
func (ms *MongoStorage) RemoveVersions(vrs Versions) (err error) {
session, col := ms.conn(colVer)
defer session.Close()
err = col.Remove(bson.M{})
if len(vrs) != 0 {
var pairs []interface{}
for k := range vrs {
pairs = append(pairs, bson.M{}) // match first
pairs = append(pairs, bson.M{"$unset": bson.M{k: 1}})
}
bulk := col.Bulk()
bulk.Unordered()
bulk.Upsert(pairs...)
_, err = bulk.Run()
} else {
err = col.Remove(bson.M{})
}
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
} else {
return err
}
return nil
return
}
func (ms *MongoStorage) GetStorageType() string {

View File

@@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"strconv"
"strings"
"github.com/cgrates/cgrates/config"
@@ -1544,11 +1545,25 @@ func (rs *RedisStorage) MatchFilterIndexDrv(cacheID, itemIDPrefix,
}
func (rs *RedisStorage) GetVersions(itm string) (vrs Versions, err error) {
x, err := rs.Cmd("HGETALL", itm).Map()
if itm != "" {
fldVal, err := rs.Cmd("HGET", utils.TBLVersions, itm).Str()
if err != nil {
if err == redis.ErrRespNil {
err = utils.ErrNotFound
}
return nil, err
}
intVal, err := strconv.ParseInt(fldVal, 10, 64)
if err != nil {
return nil, err
}
return Versions{itm: intVal}, nil
}
mp, err := rs.Cmd("HGETALL", utils.TBLVersions).Map()
if err != nil {
return nil, err
}
vrs, err = utils.MapStringToInt64(x)
vrs, err = utils.MapStringToInt64(mp)
if err != nil {
return nil, err
}
@@ -1560,7 +1575,7 @@ func (rs *RedisStorage) GetVersions(itm string) (vrs Versions, err error) {
func (rs *RedisStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
if overwrite {
if err = rs.RemoveVersions(vrs); err != nil {
if err = rs.RemoveVersions(nil); err != nil {
return
}
}
@@ -1568,14 +1583,16 @@ func (rs *RedisStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
}
func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) {
for key, _ := range vrs {
err = rs.Cmd("HDEL", utils.TBLVersions, key).Err
if err != nil {
return err
if len(vrs) != 0 {
for key, _ := range vrs {
err = rs.Cmd("HDEL", utils.TBLVersions, key).Err
if err != nil {
return err
}
}
return
}
return
return rs.Cmd("DEL", utils.TBLVersions).Err
}
// GetStatQueueProfileDrv retrieves a StatQueueProfile from dataDB

View File

@@ -32,7 +32,7 @@ func CheckVersions(storage Storage) error {
// get current db version
storType := storage.GetStorageType()
x := CurrentDBVersions(storType)
dbVersion, err := storage.GetVersions(utils.TBLVersions)
dbVersion, err := storage.GetVersions("")
if err == utils.ErrNotFound {
empty, err := storage.IsDBEmpty()
if err != nil {

View File

@@ -153,13 +153,13 @@ func testVersion(t *testing.T) {
}
//dataDB
if _, rcvErr := dm3.DataDB().GetVersions(utils.TBLVersions); rcvErr != utils.ErrNotFound {
if _, rcvErr := dm3.DataDB().GetVersions(""); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := CheckVersions(dm3.DataDB()); err != nil {
t.Error(err)
}
if rcv, err := dm3.DataDB().GetVersions(utils.TBLVersions); err != nil {
if rcv, err := dm3.DataDB().GetVersions(""); err != nil {
t.Error(err)
} else if len(currentVersion) != len(rcv) {
t.Errorf("Expecting: %v, received: %v", currentVersion, rcv)
@@ -167,7 +167,7 @@ func testVersion(t *testing.T) {
if err = dm3.DataDB().RemoveVersions(currentVersion); err != nil {
t.Error(err)
}
if _, rcvErr := dm3.DataDB().GetVersions(utils.TBLVersions); rcvErr != utils.ErrNotFound {
if _, rcvErr := dm3.DataDB().GetVersions(""); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := dm3.DataDB().SetVersions(testVersion, false); err != nil {
@@ -197,7 +197,7 @@ func testVersion(t *testing.T) {
if err := CheckVersions(storageDb); err != nil {
t.Error(err)
}
if rcv, err := storageDb.GetVersions(utils.TBLVersions); err != nil {
if rcv, err := storageDb.GetVersions(""); err != nil {
t.Error(err)
} else if len(currentVersion) != len(rcv) {
t.Errorf("Expecting: %v, received: %v", currentVersion, rcv)
@@ -205,7 +205,7 @@ func testVersion(t *testing.T) {
if err = storageDb.RemoveVersions(currentVersion); err != nil {
t.Error(err)
}
if _, rcvErr := storageDb.GetVersions(utils.TBLVersions); rcvErr != utils.ErrNotFound {
if _, rcvErr := storageDb.GetVersions(""); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := storageDb.SetVersions(testVersion, false); err != nil {