ActionPlan caching into CacheDataFromDB

This commit is contained in:
DanB
2016-12-04 18:01:53 +01:00
parent f7a3cea9a2
commit ea82801c30
3 changed files with 118 additions and 47 deletions

View File

@@ -49,6 +49,7 @@ var sTestsOnStorIT = []func(t *testing.T){
testOnStorITCacheRatingPlan,
testOnStorITCacheRatingProfile,
testOnStorITCacheActions,
testOnStorITCacheActionPlan,
}
func TestOnStorITRedisConnect(t *testing.T) {
@@ -393,3 +394,54 @@ func testOnStorITCacheActions(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", acts, rcv)
}
}
func testOnStorITCacheActionPlan(t *testing.T) {
ap := &ActionPlan{
Id: "MORE_MINUTES",
AccountIDs: utils.StringMap{"vdf:minitsboy": true},
ActionTimings: []*ActionTiming{
&ActionTiming{
Uuid: utils.GenUUID(),
Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{2012},
Months: utils.Months{},
MonthDays: utils.MonthDays{},
WeekDays: utils.WeekDays{},
StartTime: utils.ASAP,
},
},
Weight: 10,
ActionsID: "MINI",
},
&ActionTiming{
Uuid: utils.GenUUID(),
Timing: &RateInterval{
Timing: &RITiming{
Years: utils.Years{2012},
Months: utils.Months{},
MonthDays: utils.MonthDays{},
WeekDays: utils.WeekDays{},
StartTime: utils.ASAP,
},
},
Weight: 10,
ActionsID: "SHARED",
},
},
}
if err := onStor.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
t.Error(err)
}
if _, hasIt := cache.Get(utils.ACTION_PLAN_PREFIX + ap.Id); hasIt {
t.Error("Already in cache")
}
if err := onStor.CacheDataFromDB(utils.ACTION_PLAN_PREFIX, []string{ap.Id}, false); err != nil {
t.Error(err)
}
if itm, hasIt := cache.Get(utils.ACTION_PLAN_PREFIX + ap.Id); !hasIt {
t.Error("Did not cache")
} else if rcv := itm.(*ActionPlan); !reflect.DeepEqual(ap, rcv) {
t.Errorf("Expecting: %+v, received: %+v", ap, rcv)
}
}

View File

@@ -477,7 +477,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
utils.REVERSE_DESTINATION_PREFIX,
utils.RATING_PLAN_PREFIX,
utils.RATING_PROFILE_PREFIX,
utils.ACTION_PREFIX}, prfx) {
utils.ACTION_PREFIX,
utils.ACTION_PLAN_PREFIX}, prfx) {
return utils.NewCGRError(utils.REDIS,
utils.MandatoryIEMissingCaps,
utils.UnsupportedCachePrefix,
@@ -508,6 +509,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
_, err = ms.GetRatingProfile(dataID, false, utils.NonTransactional)
case utils.ACTION_PREFIX:
_, err = ms.GetActions(dataID, false, utils.NonTransactional)
case utils.ACTION_PLAN_PREFIX:
_, err = ms.GetActionPlan(dataID, false, utils.NonTransactional)
}
if err != nil {
return utils.NewCGRError(utils.MONGO,
@@ -1451,8 +1454,9 @@ func (ms *MongoStorage) RemoveActionTriggers(key string, transactionID string) e
}
func (ms *MongoStorage) GetActionPlan(key string, skipCache bool, transactionID string) (ats *ActionPlan, err error) {
cacheKey := utils.ACTION_PLAN_PREFIX + key
if !skipCache {
if x, err := cache.GetCloned(utils.ACTION_PLAN_PREFIX + key); err != nil {
if x, err := cache.GetCloned(cacheKey); err != nil {
if err.Error() != utils.ItemNotFound { // Only consider cache if item was found
return nil, err
}
@@ -1468,39 +1472,42 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool, transactionID
}
session, col := ms.conn(colApl)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&kv)
if err == nil {
b := bytes.NewBuffer(kv.Value)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
err = ms.ms.Unmarshal(out, &ats)
if err != nil {
return nil, err
if err = col.Find(bson.M{"key": key}).One(&kv); err != nil {
if err == mgo.ErrNotFound {
cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
}
cache.Set(utils.ACTION_PLAN_PREFIX+key, ats, cacheCommit(transactionID), transactionID)
b := bytes.NewBuffer(kv.Value)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
if err = ms.ms.Unmarshal(out, &ats); err != nil {
return nil, err
}
cache.Set(cacheKey, ats, cacheCommit(transactionID), transactionID)
return
}
func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool, transactionID string) error {
func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool, transactionID string) (err error) {
session, col := ms.conn(colApl)
defer session.Close()
dbKey := utils.ACTION_PLAN_PREFIX + key
// clean dots from account ids map
cCommit := cacheCommit(transactionID)
if len(ats.ActionTimings) == 0 {
cache.RemKey(utils.ACTION_PLAN_PREFIX+key, cCommit, transactionID)
err := col.Remove(bson.M{"key": key})
if err != mgo.ErrNotFound {
return err
cache.RemKey(dbKey, cCommit, transactionID)
if err = col.Remove(bson.M{"key": key}); err != nil && err == mgo.ErrNotFound {
err = nil // NotFound is good
}
return nil
return
}
if !overwrite {
// get existing action plan to merge the account ids
@@ -1521,11 +1528,13 @@ func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan, overwrite boo
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
_, err = col.Upsert(bson.M{"key": key}, &struct {
if _, err = col.Upsert(bson.M{"key": key}, &struct {
Key string
Value []byte
}{Key: key, Value: b.Bytes()})
cache.RemKey(utils.ACTION_PLAN_PREFIX+key, cCommit, transactionID)
}{Key: key, Value: b.Bytes()}); err != nil {
return
}
cache.RemKey(dbKey, cCommit, transactionID)
return err
}

View File

@@ -277,7 +277,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
utils.REVERSE_DESTINATION_PREFIX,
utils.RATING_PLAN_PREFIX,
utils.RATING_PROFILE_PREFIX,
utils.ACTION_PREFIX}, prfx) {
utils.ACTION_PREFIX,
utils.ACTION_PLAN_PREFIX}, prfx) {
return utils.NewCGRError(utils.REDIS,
utils.MandatoryIEMissingCaps,
utils.UnsupportedCachePrefix,
@@ -308,6 +309,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
_, err = rs.GetRatingProfile(dataID, false, utils.NonTransactional)
case utils.ACTION_PREFIX:
_, err = rs.GetActions(dataID, false, utils.NonTransactional)
case utils.ACTION_PLAN_PREFIX:
_, err = rs.GetActionPlan(dataID, false, utils.NonTransactional)
}
if err != nil {
return utils.NewCGRError(utils.REDIS,
@@ -1078,19 +1081,25 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool, transactionID
}
}
var values []byte
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
if err.Error() == "wrong type" { // did not find the destination
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
ats = &ActionPlan{}
err = rs.ms.Unmarshal(out, &ats)
return
}
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
if err = rs.ms.Unmarshal(out, &ats); err != nil {
return
}
cache.Set(key, ats, cacheCommit(transactionID), transactionID)
return
@@ -1098,11 +1107,12 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool, transactionID
func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool, transactionID string) (err error) {
cCommit := cacheCommit(transactionID)
dbKey := utils.ACTION_PLAN_PREFIX + key
if len(ats.ActionTimings) == 0 {
// delete the key
err = rs.Cmd("DEL", utils.ACTION_PLAN_PREFIX+key).Err
cache.RemKey(utils.ACTION_PLAN_PREFIX+key, cCommit, transactionID)
return err
err = rs.Cmd("DEL", dbKey).Err
cache.RemKey(dbKey, cCommit, transactionID)
return
}
if !overwrite {
// get existing action plan to merge the account ids
@@ -1114,8 +1124,6 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan, overwrite boo
ats.AccountIDs[accID] = true
}
}
// do not keep this in cache (will be obsolete)
cache.RemKey(utils.ACTION_PLAN_PREFIX+key, cCommit, transactionID)
}
result, err := rs.ms.Marshal(ats)
if err != nil {
@@ -1125,8 +1133,10 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan, overwrite boo
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
err = rs.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, b.Bytes()).Err
cache.RemKey(utils.ACTION_PLAN_PREFIX+key, cCommit, transactionID)
if err = rs.Cmd("SET", dbKey, b.Bytes()).Err; err != nil {
return
}
cache.RemKey(dbKey, cCommit, transactionID)
return
}