CacheDataFromDB for RatingProfiles, various redis/mongo fixes

This commit is contained in:
DanB
2016-11-29 21:22:45 +01:00
parent c1e4d46c9c
commit b1bdbfc5a7
3 changed files with 203 additions and 131 deletions

View File

@@ -47,6 +47,7 @@ var sTestsOnStorIT = []func(t *testing.T){
testOnStorITCacheDestinations,
testOnStorITCacheReverseDestinations,
testOnStorITCacheRatingPlan,
testOnStorITCacheRatingProfile,
}
func TestOnStorITRedisConnect(t *testing.T) {
@@ -309,3 +310,29 @@ func testOnStorITCacheRatingPlan(t *testing.T) {
t.Error("Wrong item in the cache")
}
}
func testOnStorITCacheRatingProfile(t *testing.T) {
rpf := &RatingProfile{
Id: "*out:test:0:trp",
RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{
ActivationTime: time.Date(2013, 10, 1, 0, 0, 0, 0, time.UTC),
RatingPlanId: "TDRT",
FallbackKeys: []string{"*out:test:0:danb", "*out:test:0:rif"},
CdrStatQueueIds: []string{},
}},
}
if err := onStor.SetRatingProfile(rpf, utils.NonTransactional); err != nil {
t.Error(err)
}
if _, hasIt := cache.Get(utils.RATING_PROFILE_PREFIX + rpf.Id); hasIt {
t.Error("Already in cache")
}
if err := onStor.CacheDataFromDB(utils.RATING_PROFILE_PREFIX, []string{rpf.Id}, false); err != nil {
t.Error(err)
}
if itm, hasIt := cache.Get(utils.RATING_PROFILE_PREFIX + rpf.Id); !hasIt {
t.Error("Did not cache")
} else if rcvRp := itm.(*RatingProfile); !reflect.DeepEqual(rpf.Id, rcvRp.Id) { // fixme
t.Error("Wrong item in the cache", rcvRp)
}
}

View File

@@ -475,7 +475,8 @@ func (ms *MongoStorage) PreloadCacheForPrefix(prefix string) error {
func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached bool) (err error) {
if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX,
utils.REVERSE_DESTINATION_PREFIX,
utils.RATING_PLAN_PREFIX}, prfx) {
utils.RATING_PLAN_PREFIX,
utils.RATING_PROFILE_PREFIX}, prfx) {
return utils.NewCGRError(utils.REDIS,
utils.MandatoryIEMissingCaps,
utils.UnsupportedCachePrefix,
@@ -502,6 +503,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
_, err = ms.GetReverseDestination(dataID, false, utils.NonTransactional)
case utils.RATING_PLAN_PREFIX:
_, err = ms.GetRatingPlan(dataID, false, utils.NonTransactional)
case utils.RATING_PROFILE_PREFIX:
_, err = ms.GetRatingProfile(dataID, false, utils.NonTransactional)
}
if err != nil {
return utils.NewCGRError(utils.REDIS,
@@ -609,12 +612,13 @@ func (ms *MongoStorage) HasData(category, subject string) (bool, error) {
}
func (ms *MongoStorage) GetRatingPlan(key string, skipCache bool, transactionID string) (rp *RatingPlan, err error) {
cacheKey := utils.RATING_PLAN_PREFIX + key
if !skipCache {
if x, ok := cache.Get(utils.RATING_PLAN_PREFIX + key); ok {
if x != nil {
return x.(*RatingPlan), nil
if x, ok := cache.Get(cacheKey); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return nil, utils.ErrNotFound
return x.(*RatingPlan), nil
}
}
rp = new(RatingPlan)
@@ -624,24 +628,27 @@ func (ms *MongoStorage) GetRatingPlan(key string, skipCache bool, transactionID
}
session, col := ms.conn(colRpl)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&kv)
if err == nil {
b := bytes.NewBuffer(kv.Value)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
err = ms.ms.Unmarshal(out, &rp)
if err != nil {
return nil, err
if err = col.Find(bson.M{"key": key}).One(&kv); err != nil {
if err == mgo.ErrNotFound {
cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
}
cache.Set(utils.RATING_PLAN_PREFIX+key, rp, cacheCommit(transactionID), transactionID)
b := bytes.NewBuffer(kv.Value)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
if err = ms.ms.Unmarshal(out, &rp); err != nil {
return nil, err
}
cache.Set(cacheKey, rp, cacheCommit(transactionID), transactionID)
return
}
@@ -669,31 +676,36 @@ func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan, transactionID string) erro
}
func (ms *MongoStorage) GetRatingProfile(key string, skipCache bool, transactionID string) (rp *RatingProfile, err error) {
cacheKey := utils.RATING_PROFILE_PREFIX + key
if !skipCache {
if x, ok := cache.Get(utils.RATING_PROFILE_PREFIX + key); ok {
if x != nil {
return x.(*RatingProfile), nil
if x, ok := cache.Get(cacheKey); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return nil, utils.ErrNotFound
return x.(*RatingProfile), nil
}
}
rp = new(RatingProfile)
session, col := ms.conn(colRpf)
defer session.Close()
err = col.Find(bson.M{"id": key}).One(rp)
if err == nil {
cache.Set(utils.RATING_PROFILE_PREFIX+key, rp, cacheCommit(transactionID), transactionID)
} else {
cache.Set(utils.RATING_PROFILE_PREFIX+key, nil, cacheCommit(transactionID), transactionID)
rp = new(RatingProfile)
if err = col.Find(bson.M{"id": key}).One(rp); err != nil {
if err == mgo.ErrNotFound {
cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
}
cache.Set(cacheKey, rp, cacheCommit(transactionID), transactionID)
return
}
func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile, transactionID string) error {
func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile, transactionID string) (err error) {
session, col := ms.conn(colRpf)
defer session.Close()
_, err := col.Upsert(bson.M{"id": rp.Id}, rp)
if err == nil && historyScribe != nil {
if _, err = col.Upsert(bson.M{"id": rp.Id}, rp); err != nil {
return
}
if historyScribe != nil {
var response int
historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(false), &response)
}
@@ -758,44 +770,46 @@ func (ms *MongoStorage) SetLCR(lcr *LCR, transactionID string) error {
}
func (ms *MongoStorage) GetDestination(key string, skipCache bool, transactionID string) (result *Destination, err error) {
cacheKey := utils.DESTINATION_PREFIX + key
if !skipCache {
if x, ok := cache.Get(utils.DESTINATION_PREFIX + key); ok {
if x != nil {
return x.(*Destination), nil
if x, ok := cache.Get(cacheKey); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return nil, utils.ErrNotFound
return x.(*Destination), nil
}
}
result = new(Destination)
var kv struct {
Key string
Value []byte
}
session, col := ms.conn(colDst)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&kv)
if err == nil {
b := bytes.NewBuffer(kv.Value)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
err = ms.ms.Unmarshal(out, &result)
if err != nil {
return nil, err
if err = col.Find(bson.M{"key": key}).One(&kv); err != nil {
if err == mgo.ErrNotFound {
cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
}
b := bytes.NewBuffer(kv.Value)
r, err := zlib.NewReader(b)
if err != nil {
result = nil
return nil, err
}
cache.Set(utils.DESTINATION_PREFIX+key, result, cacheCommit(transactionID), transactionID)
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
err = ms.ms.Unmarshal(out, &result)
if err != nil {
return nil, err
}
cache.Set(cacheKey, result, cacheCommit(transactionID), transactionID)
return
}
func (ms *MongoStorage) SetDestination(dest *Destination, transactionID string) (err error) {
result, err := ms.ms.Marshal(dest)
if err != nil {
@@ -807,25 +821,28 @@ func (ms *MongoStorage) SetDestination(dest *Destination, transactionID string)
w.Close()
session, col := ms.conn(colDst)
defer session.Close()
_, err = col.Upsert(bson.M{"key": dest.Id}, &struct {
if _, err = col.Upsert(bson.M{"key": dest.Id}, &struct {
Key string
Value []byte
}{Key: dest.Id, Value: b.Bytes()})
cache.RemKey(utils.DESTINATION_PREFIX+dest.Id, cacheCommit(transactionID), transactionID)
if err == nil && historyScribe != nil {
}{Key: dest.Id, Value: b.Bytes()}); err != nil {
return
}
if historyScribe != nil {
var response int
historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response)
}
cache.RemKey(utils.DESTINATION_PREFIX+dest.Id, cacheCommit(transactionID), transactionID)
return
}
func (ms *MongoStorage) GetReverseDestination(prefix string, skipCache bool, transactionID string) (ids []string, err error) {
cacheKey := utils.REVERSE_DESTINATION_PREFIX + prefix
if !skipCache {
if x, ok := cache.Get(utils.REVERSE_DESTINATION_PREFIX + prefix); ok {
if x != nil {
return x.([]string), nil
if x, ok := cache.Get(cacheKey); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return nil, utils.ErrNotFound
return x.([]string), nil
}
}
var result struct {
@@ -834,24 +851,27 @@ func (ms *MongoStorage) GetReverseDestination(prefix string, skipCache bool, tra
}
session, col := ms.conn(colRds)
defer session.Close()
err = col.Find(bson.M{"key": prefix}).One(&result)
if err == nil {
ids = result.Value
if err = col.Find(bson.M{"key": prefix}).One(&result); err != nil {
if err == mgo.ErrNotFound {
cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
}
cache.Set(utils.REVERSE_DESTINATION_PREFIX+prefix, ids, cacheCommit(transactionID), transactionID)
ids = result.Value
cache.Set(cacheKey, ids, cacheCommit(transactionID), transactionID)
return
}
func (ms *MongoStorage) SetReverseDestination(dest *Destination, transactionID string) (err error) {
session, col := ms.conn(colRds)
defer session.Close()
cCommig := cacheCommit(transactionID)
cCommit := cacheCommit(transactionID)
for _, p := range dest.Prefixes {
_, err = col.Upsert(bson.M{"key": p}, bson.M{"$addToSet": bson.M{"value": dest.Id}})
if err != nil {
if _, err = col.Upsert(bson.M{"key": p}, bson.M{"$addToSet": bson.M{"value": dest.Id}}); err != nil {
break
}
cache.RemKey(utils.REVERSE_DESTINATION_PREFIX+p, cCommig, transactionID)
cache.RemKey(utils.REVERSE_DESTINATION_PREFIX+p, cCommit, transactionID)
}
return
}

View File

@@ -275,7 +275,8 @@ func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) error {
func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached bool) (err error) {
if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX,
utils.REVERSE_DESTINATION_PREFIX,
utils.RATING_PLAN_PREFIX}, prfx) {
utils.RATING_PLAN_PREFIX,
utils.RATING_PROFILE_PREFIX}, prfx) {
return utils.NewCGRError(utils.REDIS,
utils.MandatoryIEMissingCaps,
utils.UnsupportedCachePrefix,
@@ -302,6 +303,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
_, err = rs.GetReverseDestination(dataID, false, utils.NonTransactional)
case utils.RATING_PLAN_PREFIX:
_, err = rs.GetRatingPlan(dataID, false, utils.NonTransactional)
case utils.RATING_PROFILE_PREFIX:
_, err = rs.GetRatingProfile(dataID, false, utils.NonTransactional)
}
if err != nil {
return utils.NewCGRError(utils.REDIS,
@@ -335,26 +338,34 @@ func (rs *RedisStorage) GetRatingPlan(key string, skipCache bool, transactionID
key = utils.RATING_PLAN_PREFIX + key
if !skipCache {
if x, ok := cache.Get(key); ok {
if x != nil {
return x.(*RatingPlan), nil
if x == nil {
return nil, utils.ErrNotFound
}
return nil, utils.ErrNotFound
return x.(*RatingPlan), nil
}
}
var values []byte
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
if err.Error() == "wrong type" { // did not find the destination
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
rp = new(RatingPlan)
err = rs.ms.Unmarshal(out, rp)
return nil, err
}
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
rp = new(RatingPlan)
err = rs.ms.Unmarshal(out, rp)
if err != nil {
return nil, err
}
cache.Set(key, rp, cacheCommit(transactionID), transactionID)
return
@@ -377,19 +388,24 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan, transactionID string) (err
func (rs *RedisStorage) GetRatingProfile(key string, skipCache bool, transactionID string) (rpf *RatingProfile, err error) {
key = utils.RATING_PROFILE_PREFIX + key
if !skipCache {
if x, ok := cache.Get(key); ok {
if x != nil {
return x.(*RatingProfile), nil
if x == nil {
return nil, utils.ErrNotFound
}
return nil, utils.ErrNotFound
return x.(*RatingProfile), nil
}
}
var values []byte
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
rpf = new(RatingProfile)
err = rs.ms.Unmarshal(values, rpf)
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
if err.Error() == "wrong type" { // did not find the destination
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
}
if err = rs.ms.Unmarshal(values, &rpf); err != nil {
return
}
cache.Set(key, rpf, cacheCommit(transactionID), transactionID)
return
@@ -397,12 +413,18 @@ func (rs *RedisStorage) GetRatingProfile(key string, skipCache bool, transaction
func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile, transactionID string) (err error) {
result, err := rs.ms.Marshal(rpf)
err = rs.Cmd("SET", utils.RATING_PROFILE_PREFIX+rpf.Id, result).Err
if err == nil && historyScribe != nil {
if err != nil {
return err
}
key := utils.RATING_PROFILE_PREFIX + rpf.Id
if err = rs.Cmd("SET", key, result).Err; err != nil {
return
}
if historyScribe != nil {
response := 0
go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(false), &response)
}
cache.RemKey(utils.RATING_PROFILE_PREFIX+rpf.Id, cacheCommit(transactionID), transactionID)
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return
}
@@ -460,10 +482,10 @@ func (rs *RedisStorage) GetDestination(key string, skipCache bool, transactionID
key = utils.DESTINATION_PREFIX + key
if !skipCache {
if x, ok := cache.Get(key); ok {
if x != nil {
return x.(*Destination), nil
if x == nil {
return nil, utils.ErrNotFound
}
return nil, utils.ErrNotFound
return x.(*Destination), nil
}
}
var values []byte
@@ -472,23 +494,21 @@ func (rs *RedisStorage) GetDestination(key string, skipCache bool, transactionID
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
}
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
err = rs.ms.Unmarshal(out, &dest)
if err != nil {
return nil, err
} else {
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
dest = new(Destination)
err = rs.ms.Unmarshal(out, dest)
if err != nil {
return nil, err
}
}
cache.Set(key, dest, cacheCommit(transactionID), transactionID)
return
@@ -504,8 +524,10 @@ func (rs *RedisStorage) SetDestination(dest *Destination, transactionID string)
w.Write(result)
w.Close()
key := utils.DESTINATION_PREFIX + dest.Id
err = rs.Cmd("SET", key, b.Bytes()).Err
if err == nil && historyScribe != nil {
if err = rs.Cmd("SET", key, b.Bytes()).Err; err != nil {
return err
}
if historyScribe != nil {
response := 0
go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response)
}
@@ -513,28 +535,31 @@ func (rs *RedisStorage) SetDestination(dest *Destination, transactionID string)
return
}
func (rs *RedisStorage) GetReverseDestination(prefix string, skipCache bool, transactionID string) (ids []string, err error) {
prefix = utils.REVERSE_DESTINATION_PREFIX + prefix
func (rs *RedisStorage) GetReverseDestination(key string, skipCache bool, transactionID string) (ids []string, err error) {
key = utils.REVERSE_DESTINATION_PREFIX + key
if !skipCache {
if x, ok := cache.Get(prefix); ok {
if x != nil {
return x.([]string), nil
if x, ok := cache.Get(key); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return nil, utils.ErrNotFound
return x.([]string), nil
}
}
if ids, err = rs.Cmd("SMEMBERS", prefix).List(); len(ids) > 0 && err == nil {
cache.Set(prefix, ids, cacheCommit(transactionID), transactionID)
return ids, nil
if ids, err = rs.Cmd("SMEMBERS", key).List(); err != nil {
if err.Error() == "wrong type" { // did not find the destination
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
}
return nil, utils.ErrNotFound
cache.Set(key, ids, cacheCommit(transactionID), transactionID)
return
}
func (rs *RedisStorage) SetReverseDestination(dest *Destination, transactionID string) (err error) {
for _, p := range dest.Prefixes {
key := utils.REVERSE_DESTINATION_PREFIX + p
err = rs.Cmd("SADD", key, dest.Id).Err
if err != nil {
if err = rs.Cmd("SADD", key, dest.Id).Err; err != nil {
break
}
cache.RemKey(key, cacheCommit(transactionID), transactionID)