From 19fd04795e49231fa7d0c3cd1a9639f6ba58c213 Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 5 Sep 2017 03:54:19 -0400 Subject: [PATCH] Add method Get/Set/RemResource in redis/mongo and add test for them --- engine/onstor_it_test.go | 84 ++++++++++++++++++++++++++++++++-- engine/storage_mongo_datadb.go | 51 +++++++++++++++++++-- engine/storage_redis.go | 43 +++++++++++++++-- 3 files changed, 166 insertions(+), 12 deletions(-) diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 70abcbb92..386227b83 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -60,8 +60,9 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCacheLCR, testOnStorITCacheAlias, testOnStorITCacheReverseAlias, - testOnStorITCacheResource, + testOnStorITCacheResourceCfg, testOnStorITCacheTiming, + testOnStorITCacheResource, // ToDo: test cache flush for a prefix // ToDo: testOnStorITLoadAccountingCache testOnStorITHasData, @@ -83,8 +84,9 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCRUDUser, testOnStorITCRUDAlias, testOnStorITCRUDReverseAlias, - testOnStorITCRUDResource, + testOnStorITCRUDResourceCfg, testOnStorITCRUDTiming, + testOnStorITCRUDResource, testOnStorITCRUDHistory, testOnStorITCRUDStructVersion, testOnStorITCRUDSQStoredMetrics, @@ -778,7 +780,7 @@ func testOnStorITCacheReverseAlias(t *testing.T) { } } -func testOnStorITCacheResource(t *testing.T) { +func testOnStorITCacheResourceCfg(t *testing.T) { rCfg := &ResourceCfg{ ID: "RL_TEST", Weight: 10, @@ -849,6 +851,43 @@ func testOnStorITCacheTiming(t *testing.T) { } } +//test here Cache +func testOnStorITCacheResource(t *testing.T) { + res := &Resource{ + ID: "RL1", + Usages: map[string]*ResourceUsage{ + "RU1": &ResourceUsage{ + ID: "RU1", + ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC).Local(), + Units: 2, + }, + }, + TTLIdx: []string{"RU1"}, + } + if err := onStor.SetResource(res); err != nil { + t.Error(err) + } + expectedT := []string{"res_RL1"} + if itm, err := onStor.GetKeysForPrefix(utils.ResourcesPrefix); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedT, itm) { + t.Errorf("Expected : %+v, but received %+v", expectedT, itm) + } + + if _, hasIt := cache.Get(utils.ResourcesPrefix + res.ID); hasIt { + t.Error("Already in cache") + } + if err := onStor.CacheDataFromDB(utils.ResourcesPrefix, []string{res.ID}, false); err != nil { + t.Error(err) + } + if itm, hasIt := cache.Get(utils.ResourcesPrefix + res.ID); !hasIt { + t.Error("Did not cache") + } else if rcv := itm.(*Resource); !reflect.DeepEqual(res, rcv) { + t.Errorf("Expecting: %+v, received: %+v", res, rcv) + } + +} + func testOnStorITHasData(t *testing.T) { rp := &RatingPlan{ Id: "HasData", @@ -1744,7 +1783,7 @@ func testOnStorITCRUDReverseAlias(t *testing.T) { // } } -func testOnStorITCRUDResource(t *testing.T) { +func testOnStorITCRUDResourceCfg(t *testing.T) { rL := &ResourceCfg{ ID: "RL_TEST2", Weight: 10, @@ -1853,6 +1892,43 @@ func testOnStorITCRUDHistory(t *testing.T) { } } +//test here Crud +func testOnStorITCRUDResource(t *testing.T) { + res := &Resource{ + ID: "RL1", + Usages: map[string]*ResourceUsage{ + "RU1": &ResourceUsage{ + ID: "RU1", + ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC).Local(), + Units: 2, + }, + }, + TTLIdx: []string{"RU1"}, + } + if _, rcvErr := onStor.GetResource("RL1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } + if err := onStor.SetResource(res); err != nil { + t.Error(err) + } + if rcv, err := onStor.GetResource("RL1", true, utils.NonTransactional); err != nil { + t.Error(err) + } else if !(reflect.DeepEqual(res, rcv)) { + t.Errorf("Expecting: %v, received: %v", res, rcv) + } + if rcv, err := onStor.GetResource("RL1", false, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(res, rcv) { + t.Errorf("Expecting: %v, received: %v", res, rcv) + } + if err := onStor.RemoveResource(res.ID, utils.NonTransactional); err != nil { + t.Error(err) + } + if _, rcvErr := onStor.GetResource(res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } +} + func testOnStorITCRUDStructVersion(t *testing.T) { cv := &StructVersion{ Destinations: "1", diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index b1f1300d6..1e8d843bd 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -60,6 +60,7 @@ const ( colSts = "stats" colRFI = "request_filter_indexes" colTmg = "timings" + colRes = "resources" ) var ( @@ -149,7 +150,7 @@ func (ms *MongoStorage) EnsureIndexes() (err error) { } var colectNames []string // collection names containing this index if ms.storageType == utils.DataDB { - colectNames = []string{colAct, colApl, colAAp, colAtr, colDcs, colRCfgs, colRpl, colLcr, colDst, colRds, colAls, colUsr, colLht} + colectNames = []string{colAct, colApl, colAAp, colAtr, colDcs, colRCfgs, colRpl, colLcr, colDst, colRds, colAls, colUsr, colLht, colRes} } for _, col := range colectNames { if err = db.C(col).EnsureIndex(idx); err != nil { @@ -180,7 +181,7 @@ func (ms *MongoStorage) EnsureIndexes() (err error) { Sparse: false, } for _, col := range []string{utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, - utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPStats} { + utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPStats, utils.TBLTPResources} { if err = db.C(col).EnsureIndex(idx); err != nil { return } @@ -326,6 +327,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool utils.ResourceConfigsPrefix: colRCfg, utils.StatsPrefix: colSts, utils.TimingsPrefix: colTmg, + utils.ResourcesPrefix: colRes, } name, ok = colMap[prefix] return @@ -467,7 +469,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached utils.ALIASES_PREFIX, utils.REVERSE_ALIASES_PREFIX, utils.ResourceConfigsPrefix, - utils.TimingsPrefix}, prfx) { + utils.TimingsPrefix, + utils.ResourcesPrefix}, prfx) { return utils.NewCGRError(utils.MONGO, utils.MandatoryIEMissingCaps, utils.UnsupportedCachePrefix, @@ -534,6 +537,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = ms.GetResourceCfg(dataID, true, utils.NonTransactional) case utils.TimingsPrefix: _, err = ms.GetTiming(dataID, true, utils.NonTransactional) + case utils.ResourcesPrefix: + _, err = ms.GetResource(dataID, true, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.MONGO, @@ -649,6 +654,11 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er for iter.Next(&idResult) { result = append(result, utils.TimingsPrefix+idResult.Id) } + case utils.ResourcesPrefix: + iter := db.C(colRes).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() + for iter.Next(&idResult) { + result = append(result, utils.ResourcesPrefix+idResult.Id) + } default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } @@ -1892,18 +1902,51 @@ func (ms *MongoStorage) RemoveResourceCfg(id string, transactionID string) (err return nil } +//from here +//find the right collumn func (ms *MongoStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) { + key := utils.ResourcesPrefix + id + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Resource), nil + } + } + session, col := ms.conn(colRes) + defer session.Close() + r = new(Resource) + if err = col.Find(bson.M{"id": id}).One(r); err != nil { + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + } + return nil, err + } + cache.Set(key, r, cacheCommit(transactionID), transactionID) return } func (ms *MongoStorage) SetResource(r *Resource) (err error) { + session, col := ms.conn(colRes) + defer session.Close() + _, err = col.Upsert(bson.M{"id": r.ID}, r) return } func (ms *MongoStorage) RemoveResource(id string, transactionID string) (err error) { - return + session, col := ms.conn(colRes) + defer session.Close() + if err = col.Remove(bson.M{"id": id}); err != nil { + return + } + cache.RemKey(utils.ResourcesPrefix+id, cacheCommit(transactionID), transactionID) + return nil } +//to here + func (ms *MongoStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) { key := utils.TimingsPrefix + id if !skipCache { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 37eee94d7..1474fcf75 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -231,7 +231,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached utils.ALIASES_PREFIX, utils.REVERSE_ALIASES_PREFIX, utils.ResourceConfigsPrefix, - utils.TimingsPrefix}, prfx) { + utils.TimingsPrefix, + utils.ResourcesPrefix}, prfx) { return utils.NewCGRError(utils.REDIS, utils.MandatoryIEMissingCaps, utils.UnsupportedCachePrefix, @@ -298,6 +299,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = rs.GetResourceCfg(dataID, true, utils.NonTransactional) case utils.TimingsPrefix: _, err = rs.GetTiming(dataID, true, utils.NonTransactional) + case utils.ResourcesPrefix: + _, err = rs.GetResource(dataID, true, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.REDIS, @@ -1374,8 +1377,7 @@ func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) { return } -func (rs *RedisStorage) GetResourceCfg(id string, - skipCache bool, transactionID string) (rl *ResourceCfg, err error) { +func (rs *RedisStorage) GetResourceCfg(id string, skipCache bool, transactionID string) (rl *ResourceCfg, err error) { key := utils.ResourceConfigsPrefix + id if !skipCache { if x, ok := cache.Get(key); ok { @@ -1422,18 +1424,51 @@ func (rs *RedisStorage) RemoveResourceCfg(id string, transactionID string) (err return } +//from here func (rs *RedisStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) { + key := utils.ResourcesPrefix + id + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Resource), nil + } + } + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { // did not find the destination + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &r); err != nil { + return + } + cache.Set(key, r, cacheCommit(transactionID), transactionID) return } func (rs *RedisStorage) SetResource(r *Resource) (err error) { - return + result, err := rs.ms.Marshal(r) + if err != nil { + return err + } + return rs.Cmd("SET", utils.ResourcesPrefix+r.ID, result).Err } func (rs *RedisStorage) RemoveResource(id string, transactionID string) (err error) { + key := utils.ResourcesPrefix + id + if err = rs.Cmd("DEL", key).Err; err != nil { + return + } + cache.RemKey(key, cacheCommit(transactionID), transactionID) return } +// to here + func (rs *RedisStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) { key := utils.TimingsPrefix + id if !skipCache {