From b1bdbfc5a7c338534fbe41a666b26e5431bcebe1 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 29 Nov 2016 21:22:45 +0100 Subject: [PATCH] CacheDataFromDB for RatingProfiles, various redis/mongo fixes --- engine/onstor_it_test.go | 27 ++++++ engine/storage_mongo_datadb.go | 164 ++++++++++++++++++--------------- engine/storage_redis.go | 143 ++++++++++++++++------------ 3 files changed, 203 insertions(+), 131 deletions(-) diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 1c1742277..32a032509 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -47,6 +47,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCacheDestinations, testOnStorITCacheReverseDestinations, testOnStorITCacheRatingPlan, + testOnStorITCacheRatingProfile, } func TestOnStorITRedisConnect(t *testing.T) { @@ -309,3 +310,29 @@ func testOnStorITCacheRatingPlan(t *testing.T) { t.Error("Wrong item in the cache") } } + +func testOnStorITCacheRatingProfile(t *testing.T) { + rpf := &RatingProfile{ + Id: "*out:test:0:trp", + RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{ + ActivationTime: time.Date(2013, 10, 1, 0, 0, 0, 0, time.UTC), + RatingPlanId: "TDRT", + FallbackKeys: []string{"*out:test:0:danb", "*out:test:0:rif"}, + CdrStatQueueIds: []string{}, + }}, + } + if err := onStor.SetRatingProfile(rpf, utils.NonTransactional); err != nil { + t.Error(err) + } + if _, hasIt := cache.Get(utils.RATING_PROFILE_PREFIX + rpf.Id); hasIt { + t.Error("Already in cache") + } + if err := onStor.CacheDataFromDB(utils.RATING_PROFILE_PREFIX, []string{rpf.Id}, false); err != nil { + t.Error(err) + } + if itm, hasIt := cache.Get(utils.RATING_PROFILE_PREFIX + rpf.Id); !hasIt { + t.Error("Did not cache") + } else if rcvRp := itm.(*RatingProfile); !reflect.DeepEqual(rpf.Id, rcvRp.Id) { // fixme + t.Error("Wrong item in the cache", rcvRp) + } +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 7b7e56aa7..f19f1a159 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -475,7 +475,8 @@ func (ms *MongoStorage) PreloadCacheForPrefix(prefix string) error { func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached bool) (err error) { if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX, utils.REVERSE_DESTINATION_PREFIX, - utils.RATING_PLAN_PREFIX}, prfx) { + utils.RATING_PLAN_PREFIX, + utils.RATING_PROFILE_PREFIX}, prfx) { return utils.NewCGRError(utils.REDIS, utils.MandatoryIEMissingCaps, utils.UnsupportedCachePrefix, @@ -502,6 +503,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = ms.GetReverseDestination(dataID, false, utils.NonTransactional) case utils.RATING_PLAN_PREFIX: _, err = ms.GetRatingPlan(dataID, false, utils.NonTransactional) + case utils.RATING_PROFILE_PREFIX: + _, err = ms.GetRatingProfile(dataID, false, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.REDIS, @@ -609,12 +612,13 @@ func (ms *MongoStorage) HasData(category, subject string) (bool, error) { } func (ms *MongoStorage) GetRatingPlan(key string, skipCache bool, transactionID string) (rp *RatingPlan, err error) { + cacheKey := utils.RATING_PLAN_PREFIX + key if !skipCache { - if x, ok := cache.Get(utils.RATING_PLAN_PREFIX + key); ok { - if x != nil { - return x.(*RatingPlan), nil + if x, ok := cache.Get(cacheKey); ok { + if x == nil { + return nil, utils.ErrNotFound } - return nil, utils.ErrNotFound + return x.(*RatingPlan), nil } } rp = new(RatingPlan) @@ -624,24 +628,27 @@ func (ms *MongoStorage) GetRatingPlan(key string, skipCache bool, transactionID } session, col := ms.conn(colRpl) defer session.Close() - err = col.Find(bson.M{"key": key}).One(&kv) - if err == nil { - b := bytes.NewBuffer(kv.Value) - r, err := zlib.NewReader(b) - if err != nil { - return nil, err - } - out, err := ioutil.ReadAll(r) - if err != nil { - return nil, err - } - r.Close() - err = ms.ms.Unmarshal(out, &rp) - if err != nil { - return nil, err + if err = col.Find(bson.M{"key": key}).One(&kv); err != nil { + if err == mgo.ErrNotFound { + cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound } + return } - cache.Set(utils.RATING_PLAN_PREFIX+key, rp, cacheCommit(transactionID), transactionID) + b := bytes.NewBuffer(kv.Value) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + if err = ms.ms.Unmarshal(out, &rp); err != nil { + return nil, err + } + cache.Set(cacheKey, rp, cacheCommit(transactionID), transactionID) return } @@ -669,31 +676,36 @@ func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan, transactionID string) erro } func (ms *MongoStorage) GetRatingProfile(key string, skipCache bool, transactionID string) (rp *RatingProfile, err error) { + cacheKey := utils.RATING_PROFILE_PREFIX + key if !skipCache { - if x, ok := cache.Get(utils.RATING_PROFILE_PREFIX + key); ok { - if x != nil { - return x.(*RatingProfile), nil + if x, ok := cache.Get(cacheKey); ok { + if x == nil { + return nil, utils.ErrNotFound } - return nil, utils.ErrNotFound + return x.(*RatingProfile), nil } } - rp = new(RatingProfile) session, col := ms.conn(colRpf) defer session.Close() - err = col.Find(bson.M{"id": key}).One(rp) - if err == nil { - cache.Set(utils.RATING_PROFILE_PREFIX+key, rp, cacheCommit(transactionID), transactionID) - } else { - cache.Set(utils.RATING_PROFILE_PREFIX+key, nil, cacheCommit(transactionID), transactionID) + rp = new(RatingProfile) + if err = col.Find(bson.M{"id": key}).One(rp); err != nil { + if err == mgo.ErrNotFound { + cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return } + cache.Set(cacheKey, rp, cacheCommit(transactionID), transactionID) return } -func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile, transactionID string) error { +func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile, transactionID string) (err error) { session, col := ms.conn(colRpf) defer session.Close() - _, err := col.Upsert(bson.M{"id": rp.Id}, rp) - if err == nil && historyScribe != nil { + if _, err = col.Upsert(bson.M{"id": rp.Id}, rp); err != nil { + return + } + if historyScribe != nil { var response int historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(false), &response) } @@ -758,44 +770,46 @@ func (ms *MongoStorage) SetLCR(lcr *LCR, transactionID string) error { } func (ms *MongoStorage) GetDestination(key string, skipCache bool, transactionID string) (result *Destination, err error) { + cacheKey := utils.DESTINATION_PREFIX + key if !skipCache { - if x, ok := cache.Get(utils.DESTINATION_PREFIX + key); ok { - if x != nil { - return x.(*Destination), nil + if x, ok := cache.Get(cacheKey); ok { + if x == nil { + return nil, utils.ErrNotFound } - return nil, utils.ErrNotFound + return x.(*Destination), nil } } - result = new(Destination) var kv struct { Key string Value []byte } session, col := ms.conn(colDst) defer session.Close() - err = col.Find(bson.M{"key": key}).One(&kv) - if err == nil { - b := bytes.NewBuffer(kv.Value) - r, err := zlib.NewReader(b) - if err != nil { - return nil, err - } - out, err := ioutil.ReadAll(r) - if err != nil { - return nil, err - } - r.Close() - err = ms.ms.Unmarshal(out, &result) - if err != nil { - return nil, err + if err = col.Find(bson.M{"key": key}).One(&kv); err != nil { + if err == mgo.ErrNotFound { + cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound } + return } + b := bytes.NewBuffer(kv.Value) + r, err := zlib.NewReader(b) if err != nil { - result = nil + return nil, err } - cache.Set(utils.DESTINATION_PREFIX+key, result, cacheCommit(transactionID), transactionID) + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + err = ms.ms.Unmarshal(out, &result) + if err != nil { + return nil, err + } + cache.Set(cacheKey, result, cacheCommit(transactionID), transactionID) return } + func (ms *MongoStorage) SetDestination(dest *Destination, transactionID string) (err error) { result, err := ms.ms.Marshal(dest) if err != nil { @@ -807,25 +821,28 @@ func (ms *MongoStorage) SetDestination(dest *Destination, transactionID string) w.Close() session, col := ms.conn(colDst) defer session.Close() - _, err = col.Upsert(bson.M{"key": dest.Id}, &struct { + if _, err = col.Upsert(bson.M{"key": dest.Id}, &struct { Key string Value []byte - }{Key: dest.Id, Value: b.Bytes()}) - cache.RemKey(utils.DESTINATION_PREFIX+dest.Id, cacheCommit(transactionID), transactionID) - if err == nil && historyScribe != nil { + }{Key: dest.Id, Value: b.Bytes()}); err != nil { + return + } + if historyScribe != nil { var response int historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response) } + cache.RemKey(utils.DESTINATION_PREFIX+dest.Id, cacheCommit(transactionID), transactionID) return } func (ms *MongoStorage) GetReverseDestination(prefix string, skipCache bool, transactionID string) (ids []string, err error) { + cacheKey := utils.REVERSE_DESTINATION_PREFIX + prefix if !skipCache { - if x, ok := cache.Get(utils.REVERSE_DESTINATION_PREFIX + prefix); ok { - if x != nil { - return x.([]string), nil + if x, ok := cache.Get(cacheKey); ok { + if x == nil { + return nil, utils.ErrNotFound } - return nil, utils.ErrNotFound + return x.([]string), nil } } var result struct { @@ -834,24 +851,27 @@ func (ms *MongoStorage) GetReverseDestination(prefix string, skipCache bool, tra } session, col := ms.conn(colRds) defer session.Close() - err = col.Find(bson.M{"key": prefix}).One(&result) - if err == nil { - ids = result.Value + if err = col.Find(bson.M{"key": prefix}).One(&result); err != nil { + if err == mgo.ErrNotFound { + cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return } - cache.Set(utils.REVERSE_DESTINATION_PREFIX+prefix, ids, cacheCommit(transactionID), transactionID) + ids = result.Value + cache.Set(cacheKey, ids, cacheCommit(transactionID), transactionID) return } func (ms *MongoStorage) SetReverseDestination(dest *Destination, transactionID string) (err error) { session, col := ms.conn(colRds) defer session.Close() - cCommig := cacheCommit(transactionID) + cCommit := cacheCommit(transactionID) for _, p := range dest.Prefixes { - _, err = col.Upsert(bson.M{"key": p}, bson.M{"$addToSet": bson.M{"value": dest.Id}}) - if err != nil { + if _, err = col.Upsert(bson.M{"key": p}, bson.M{"$addToSet": bson.M{"value": dest.Id}}); err != nil { break } - cache.RemKey(utils.REVERSE_DESTINATION_PREFIX+p, cCommig, transactionID) + cache.RemKey(utils.REVERSE_DESTINATION_PREFIX+p, cCommit, transactionID) } return } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 35fbd20bd..0bc615bcc 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -275,7 +275,8 @@ func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) error { func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached bool) (err error) { if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX, utils.REVERSE_DESTINATION_PREFIX, - utils.RATING_PLAN_PREFIX}, prfx) { + utils.RATING_PLAN_PREFIX, + utils.RATING_PROFILE_PREFIX}, prfx) { return utils.NewCGRError(utils.REDIS, utils.MandatoryIEMissingCaps, utils.UnsupportedCachePrefix, @@ -302,6 +303,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = rs.GetReverseDestination(dataID, false, utils.NonTransactional) case utils.RATING_PLAN_PREFIX: _, err = rs.GetRatingPlan(dataID, false, utils.NonTransactional) + case utils.RATING_PROFILE_PREFIX: + _, err = rs.GetRatingProfile(dataID, false, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.REDIS, @@ -335,26 +338,34 @@ func (rs *RedisStorage) GetRatingPlan(key string, skipCache bool, transactionID key = utils.RATING_PLAN_PREFIX + key if !skipCache { if x, ok := cache.Get(key); ok { - if x != nil { - return x.(*RatingPlan), nil + if x == nil { + return nil, utils.ErrNotFound } - return nil, utils.ErrNotFound + return x.(*RatingPlan), nil } } var values []byte - if values, err = rs.Cmd("GET", key).Bytes(); err == nil { - b := bytes.NewBuffer(values) - r, err := zlib.NewReader(b) - if err != nil { - return nil, err + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err.Error() == "wrong type" { // did not find the destination + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound } - out, err := ioutil.ReadAll(r) - if err != nil { - return nil, err - } - r.Close() - rp = new(RatingPlan) - err = rs.ms.Unmarshal(out, rp) + return nil, err + } + b := bytes.NewBuffer(values) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + rp = new(RatingPlan) + err = rs.ms.Unmarshal(out, rp) + if err != nil { + return nil, err } cache.Set(key, rp, cacheCommit(transactionID), transactionID) return @@ -377,19 +388,24 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan, transactionID string) (err func (rs *RedisStorage) GetRatingProfile(key string, skipCache bool, transactionID string) (rpf *RatingProfile, err error) { key = utils.RATING_PROFILE_PREFIX + key - if !skipCache { if x, ok := cache.Get(key); ok { - if x != nil { - return x.(*RatingProfile), nil + if x == nil { + return nil, utils.ErrNotFound } - return nil, utils.ErrNotFound + return x.(*RatingProfile), nil } } var values []byte - if values, err = rs.Cmd("GET", key).Bytes(); err == nil { - rpf = new(RatingProfile) - err = rs.ms.Unmarshal(values, rpf) + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err.Error() == "wrong type" { // did not find the destination + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &rpf); err != nil { + return } cache.Set(key, rpf, cacheCommit(transactionID), transactionID) return @@ -397,12 +413,18 @@ func (rs *RedisStorage) GetRatingProfile(key string, skipCache bool, transaction func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile, transactionID string) (err error) { result, err := rs.ms.Marshal(rpf) - err = rs.Cmd("SET", utils.RATING_PROFILE_PREFIX+rpf.Id, result).Err - if err == nil && historyScribe != nil { + if err != nil { + return err + } + key := utils.RATING_PROFILE_PREFIX + rpf.Id + if err = rs.Cmd("SET", key, result).Err; err != nil { + return + } + if historyScribe != nil { response := 0 go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(false), &response) } - cache.RemKey(utils.RATING_PROFILE_PREFIX+rpf.Id, cacheCommit(transactionID), transactionID) + cache.RemKey(key, cacheCommit(transactionID), transactionID) return } @@ -460,10 +482,10 @@ func (rs *RedisStorage) GetDestination(key string, skipCache bool, transactionID key = utils.DESTINATION_PREFIX + key if !skipCache { if x, ok := cache.Get(key); ok { - if x != nil { - return x.(*Destination), nil + if x == nil { + return nil, utils.ErrNotFound } - return nil, utils.ErrNotFound + return x.(*Destination), nil } } var values []byte @@ -472,23 +494,21 @@ func (rs *RedisStorage) GetDestination(key string, skipCache bool, transactionID cache.Set(key, nil, cacheCommit(transactionID), transactionID) err = utils.ErrNotFound } + return + } + b := bytes.NewBuffer(values) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + err = rs.ms.Unmarshal(out, &dest) + if err != nil { return nil, err - } else { - b := bytes.NewBuffer(values) - r, err := zlib.NewReader(b) - if err != nil { - return nil, err - } - out, err := ioutil.ReadAll(r) - if err != nil { - return nil, err - } - r.Close() - dest = new(Destination) - err = rs.ms.Unmarshal(out, dest) - if err != nil { - return nil, err - } } cache.Set(key, dest, cacheCommit(transactionID), transactionID) return @@ -504,8 +524,10 @@ func (rs *RedisStorage) SetDestination(dest *Destination, transactionID string) w.Write(result) w.Close() key := utils.DESTINATION_PREFIX + dest.Id - err = rs.Cmd("SET", key, b.Bytes()).Err - if err == nil && historyScribe != nil { + if err = rs.Cmd("SET", key, b.Bytes()).Err; err != nil { + return err + } + if historyScribe != nil { response := 0 go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response) } @@ -513,28 +535,31 @@ func (rs *RedisStorage) SetDestination(dest *Destination, transactionID string) return } -func (rs *RedisStorage) GetReverseDestination(prefix string, skipCache bool, transactionID string) (ids []string, err error) { - prefix = utils.REVERSE_DESTINATION_PREFIX + prefix +func (rs *RedisStorage) GetReverseDestination(key string, skipCache bool, transactionID string) (ids []string, err error) { + key = utils.REVERSE_DESTINATION_PREFIX + key if !skipCache { - if x, ok := cache.Get(prefix); ok { - if x != nil { - return x.([]string), nil + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound } - return nil, utils.ErrNotFound + return x.([]string), nil } } - if ids, err = rs.Cmd("SMEMBERS", prefix).List(); len(ids) > 0 && err == nil { - cache.Set(prefix, ids, cacheCommit(transactionID), transactionID) - return ids, nil + if ids, err = rs.Cmd("SMEMBERS", key).List(); err != nil { + if err.Error() == "wrong type" { // did not find the destination + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return } - return nil, utils.ErrNotFound + cache.Set(key, ids, cacheCommit(transactionID), transactionID) + return } func (rs *RedisStorage) SetReverseDestination(dest *Destination, transactionID string) (err error) { for _, p := range dest.Prefixes { key := utils.REVERSE_DESTINATION_PREFIX + p - err = rs.Cmd("SADD", key, dest.Id).Err - if err != nil { + if err = rs.Cmd("SADD", key, dest.Id).Err; err != nil { break } cache.RemKey(key, cacheCommit(transactionID), transactionID)