mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Merge pull request #716 from TeoV/master
Add method Get/Set/RemResource in redis/mongo and add test for them in onstor_it_test.go
This commit is contained in:
@@ -60,8 +60,9 @@ var sTestsOnStorIT = []func(t *testing.T){
|
||||
testOnStorITCacheLCR,
|
||||
testOnStorITCacheAlias,
|
||||
testOnStorITCacheReverseAlias,
|
||||
testOnStorITCacheResource,
|
||||
testOnStorITCacheResourceCfg,
|
||||
testOnStorITCacheTiming,
|
||||
testOnStorITCacheResource,
|
||||
// ToDo: test cache flush for a prefix
|
||||
// ToDo: testOnStorITLoadAccountingCache
|
||||
testOnStorITHasData,
|
||||
@@ -83,8 +84,9 @@ var sTestsOnStorIT = []func(t *testing.T){
|
||||
testOnStorITCRUDUser,
|
||||
testOnStorITCRUDAlias,
|
||||
testOnStorITCRUDReverseAlias,
|
||||
testOnStorITCRUDResource,
|
||||
testOnStorITCRUDResourceCfg,
|
||||
testOnStorITCRUDTiming,
|
||||
testOnStorITCRUDResource,
|
||||
testOnStorITCRUDHistory,
|
||||
testOnStorITCRUDStructVersion,
|
||||
testOnStorITCRUDSQStoredMetrics,
|
||||
@@ -778,7 +780,7 @@ func testOnStorITCacheReverseAlias(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITCacheResource(t *testing.T) {
|
||||
func testOnStorITCacheResourceCfg(t *testing.T) {
|
||||
rCfg := &ResourceCfg{
|
||||
ID: "RL_TEST",
|
||||
Weight: 10,
|
||||
@@ -849,6 +851,43 @@ func testOnStorITCacheTiming(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
//test here Cache
|
||||
func testOnStorITCacheResource(t *testing.T) {
|
||||
res := &Resource{
|
||||
ID: "RL1",
|
||||
Usages: map[string]*ResourceUsage{
|
||||
"RU1": &ResourceUsage{
|
||||
ID: "RU1",
|
||||
ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC).Local(),
|
||||
Units: 2,
|
||||
},
|
||||
},
|
||||
TTLIdx: []string{"RU1"},
|
||||
}
|
||||
if err := onStor.SetResource(res); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
expectedT := []string{"res_RL1"}
|
||||
if itm, err := onStor.GetKeysForPrefix(utils.ResourcesPrefix); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expectedT, itm) {
|
||||
t.Errorf("Expected : %+v, but received %+v", expectedT, itm)
|
||||
}
|
||||
|
||||
if _, hasIt := cache.Get(utils.ResourcesPrefix + res.ID); hasIt {
|
||||
t.Error("Already in cache")
|
||||
}
|
||||
if err := onStor.CacheDataFromDB(utils.ResourcesPrefix, []string{res.ID}, false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if itm, hasIt := cache.Get(utils.ResourcesPrefix + res.ID); !hasIt {
|
||||
t.Error("Did not cache")
|
||||
} else if rcv := itm.(*Resource); !reflect.DeepEqual(res, rcv) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", res, rcv)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testOnStorITHasData(t *testing.T) {
|
||||
rp := &RatingPlan{
|
||||
Id: "HasData",
|
||||
@@ -1744,7 +1783,7 @@ func testOnStorITCRUDReverseAlias(t *testing.T) {
|
||||
// }
|
||||
}
|
||||
|
||||
func testOnStorITCRUDResource(t *testing.T) {
|
||||
func testOnStorITCRUDResourceCfg(t *testing.T) {
|
||||
rL := &ResourceCfg{
|
||||
ID: "RL_TEST2",
|
||||
Weight: 10,
|
||||
@@ -1853,6 +1892,43 @@ func testOnStorITCRUDHistory(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
//test here Crud
|
||||
func testOnStorITCRUDResource(t *testing.T) {
|
||||
res := &Resource{
|
||||
ID: "RL1",
|
||||
Usages: map[string]*ResourceUsage{
|
||||
"RU1": &ResourceUsage{
|
||||
ID: "RU1",
|
||||
ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC).Local(),
|
||||
Units: 2,
|
||||
},
|
||||
},
|
||||
TTLIdx: []string{"RU1"},
|
||||
}
|
||||
if _, rcvErr := onStor.GetResource("RL1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
if err := onStor.SetResource(res); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rcv, err := onStor.GetResource("RL1", true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !(reflect.DeepEqual(res, rcv)) {
|
||||
t.Errorf("Expecting: %v, received: %v", res, rcv)
|
||||
}
|
||||
if rcv, err := onStor.GetResource("RL1", false, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(res, rcv) {
|
||||
t.Errorf("Expecting: %v, received: %v", res, rcv)
|
||||
}
|
||||
if err := onStor.RemoveResource(res.ID, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, rcvErr := onStor.GetResource(res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITCRUDStructVersion(t *testing.T) {
|
||||
cv := &StructVersion{
|
||||
Destinations: "1",
|
||||
|
||||
@@ -60,6 +60,7 @@ const (
|
||||
colSts = "stats"
|
||||
colRFI = "request_filter_indexes"
|
||||
colTmg = "timings"
|
||||
colRes = "resources"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -149,7 +150,7 @@ func (ms *MongoStorage) EnsureIndexes() (err error) {
|
||||
}
|
||||
var colectNames []string // collection names containing this index
|
||||
if ms.storageType == utils.DataDB {
|
||||
colectNames = []string{colAct, colApl, colAAp, colAtr, colDcs, colRCfgs, colRpl, colLcr, colDst, colRds, colAls, colUsr, colLht}
|
||||
colectNames = []string{colAct, colApl, colAAp, colAtr, colDcs, colRCfgs, colRpl, colLcr, colDst, colRds, colAls, colUsr, colLht, colRes}
|
||||
}
|
||||
for _, col := range colectNames {
|
||||
if err = db.C(col).EnsureIndex(idx); err != nil {
|
||||
@@ -180,7 +181,7 @@ func (ms *MongoStorage) EnsureIndexes() (err error) {
|
||||
Sparse: false,
|
||||
}
|
||||
for _, col := range []string{utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPDestinationRates, utils.TBLTPRatingPlans,
|
||||
utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPStats} {
|
||||
utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPStats, utils.TBLTPResources} {
|
||||
if err = db.C(col).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -326,6 +327,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool
|
||||
utils.ResourceConfigsPrefix: colRCfg,
|
||||
utils.StatsPrefix: colSts,
|
||||
utils.TimingsPrefix: colTmg,
|
||||
utils.ResourcesPrefix: colRes,
|
||||
}
|
||||
name, ok = colMap[prefix]
|
||||
return
|
||||
@@ -467,7 +469,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
utils.ALIASES_PREFIX,
|
||||
utils.REVERSE_ALIASES_PREFIX,
|
||||
utils.ResourceConfigsPrefix,
|
||||
utils.TimingsPrefix}, prfx) {
|
||||
utils.TimingsPrefix,
|
||||
utils.ResourcesPrefix}, prfx) {
|
||||
return utils.NewCGRError(utils.MONGO,
|
||||
utils.MandatoryIEMissingCaps,
|
||||
utils.UnsupportedCachePrefix,
|
||||
@@ -534,6 +537,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
_, err = ms.GetResourceCfg(dataID, true, utils.NonTransactional)
|
||||
case utils.TimingsPrefix:
|
||||
_, err = ms.GetTiming(dataID, true, utils.NonTransactional)
|
||||
case utils.ResourcesPrefix:
|
||||
_, err = ms.GetResource(dataID, true, utils.NonTransactional)
|
||||
}
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.MONGO,
|
||||
@@ -649,6 +654,11 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.TimingsPrefix+idResult.Id)
|
||||
}
|
||||
case utils.ResourcesPrefix:
|
||||
iter := db.C(colRes).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.ResourcesPrefix+idResult.Id)
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix)
|
||||
}
|
||||
@@ -1892,18 +1902,51 @@ func (ms *MongoStorage) RemoveResourceCfg(id string, transactionID string) (err
|
||||
return nil
|
||||
}
|
||||
|
||||
//from here
|
||||
//find the right collumn
|
||||
func (ms *MongoStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) {
|
||||
key := utils.ResourcesPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Resource), nil
|
||||
}
|
||||
}
|
||||
session, col := ms.conn(colRes)
|
||||
defer session.Close()
|
||||
r = new(Resource)
|
||||
if err = col.Find(bson.M{"id": id}).One(r); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
err = utils.ErrNotFound
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetResource(r *Resource) (err error) {
|
||||
session, col := ms.conn(colRes)
|
||||
defer session.Close()
|
||||
_, err = col.Upsert(bson.M{"id": r.ID}, r)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemoveResource(id string, transactionID string) (err error) {
|
||||
return
|
||||
session, col := ms.conn(colRes)
|
||||
defer session.Close()
|
||||
if err = col.Remove(bson.M{"id": id}); err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(utils.ResourcesPrefix+id, cacheCommit(transactionID), transactionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
//to here
|
||||
|
||||
func (ms *MongoStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) {
|
||||
key := utils.TimingsPrefix + id
|
||||
if !skipCache {
|
||||
|
||||
@@ -231,7 +231,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
utils.ALIASES_PREFIX,
|
||||
utils.REVERSE_ALIASES_PREFIX,
|
||||
utils.ResourceConfigsPrefix,
|
||||
utils.TimingsPrefix}, prfx) {
|
||||
utils.TimingsPrefix,
|
||||
utils.ResourcesPrefix}, prfx) {
|
||||
return utils.NewCGRError(utils.REDIS,
|
||||
utils.MandatoryIEMissingCaps,
|
||||
utils.UnsupportedCachePrefix,
|
||||
@@ -298,6 +299,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
_, err = rs.GetResourceCfg(dataID, true, utils.NonTransactional)
|
||||
case utils.TimingsPrefix:
|
||||
_, err = rs.GetTiming(dataID, true, utils.NonTransactional)
|
||||
case utils.ResourcesPrefix:
|
||||
_, err = rs.GetResource(dataID, true, utils.NonTransactional)
|
||||
}
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.REDIS,
|
||||
@@ -1374,8 +1377,7 @@ func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetResourceCfg(id string,
|
||||
skipCache bool, transactionID string) (rl *ResourceCfg, err error) {
|
||||
func (rs *RedisStorage) GetResourceCfg(id string, skipCache bool, transactionID string) (rl *ResourceCfg, err error) {
|
||||
key := utils.ResourceConfigsPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
@@ -1422,18 +1424,51 @@ func (rs *RedisStorage) RemoveResourceCfg(id string, transactionID string) (err
|
||||
return
|
||||
}
|
||||
|
||||
//from here
|
||||
func (rs *RedisStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) {
|
||||
key := utils.ResourcesPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Resource), nil
|
||||
}
|
||||
}
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil { // did not find the destination
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
}
|
||||
if err = rs.ms.Unmarshal(values, &r); err != nil {
|
||||
return
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetResource(r *Resource) (err error) {
|
||||
return
|
||||
result, err := rs.ms.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rs.Cmd("SET", utils.ResourcesPrefix+r.ID, result).Err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveResource(id string, transactionID string) (err error) {
|
||||
key := utils.ResourcesPrefix + id
|
||||
if err = rs.Cmd("DEL", key).Err; err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(key, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
// to here
|
||||
|
||||
func (rs *RedisStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) {
|
||||
key := utils.TimingsPrefix + id
|
||||
if !skipCache {
|
||||
|
||||
Reference in New Issue
Block a user