From 158330d49cc50122b78618b494542fc4fac8a30c Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 10 Jan 2017 18:00:00 +0100 Subject: [PATCH] RemAccountActionPlans in storage --- engine/storage_interface.go | 1 + engine/storage_map.go | 3 ++ engine/storage_mongo_datadb.go | 55 +++++++++++++++++++++++----------- engine/storage_redis.go | 51 ++++++++++++++++++++----------- 4 files changed, 75 insertions(+), 35 deletions(-) diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 5d539c621..921736143 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -152,6 +152,7 @@ type DataDB interface { GetAllActionPlans() (map[string]*ActionPlan, error) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) + RemAccountActionPlans(acntID string, apIDs []string) (err error) PushTask(*Task) error PopTask() (*Task, error) LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error diff --git a/engine/storage_map.go b/engine/storage_map.go index 3658f9d6f..daa2d0887 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1030,6 +1030,9 @@ func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, trans func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { return } +func (ms *MapStorage) RemAccountActionPlans(acntID string, apIDs []string) (err error) { + return +} func (ms *MapStorage) PushTask(t *Task) error { ms.mu.Lock() diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 110cdf57f..3c83f5ef0 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -382,7 +382,7 @@ func (ms *MongoStorage) RebuildReverseForPrefix(prefix string) (err error) { return nil } -func (ms *MongoStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) (err error) { +func (ms *MongoStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aaPlIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) (err error) { for key, ids := range map[string][]string{ utils.DESTINATION_PREFIX: dstIDs, utils.REVERSE_DESTINATION_PREFIX: rvDstIDs, @@ -390,7 +390,7 @@ func (ms *MongoStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs utils.RATING_PROFILE_PREFIX: rpfIDs, utils.ACTION_PREFIX: actIDs, utils.ACTION_PLAN_PREFIX: aplIDs, - utils.AccountActionPlansPrefix: aapIDs, + utils.AccountActionPlansPrefix: aaPlIDs, utils.ACTION_TRIGGER_PREFIX: atrgIDs, utils.SHARED_GROUP_PREFIX: sgIDs, utils.LCR_PREFIX: lcrIDs, @@ -1598,7 +1598,7 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } -func (ms *MongoStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) { +func (ms *MongoStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (aPlIDs []string, err error) { cacheKey := utils.AccountActionPlansPrefix + acntID if !skipCache { if x, ok := cache.Get(cacheKey); ok { @@ -1621,27 +1621,21 @@ func (ms *MongoStorage) GetAccountActionPlans(acntID string, skipCache bool, tra } return } - apIDs = kv.Value - cache.Set(cacheKey, apIDs, cacheCommit(transactionID), transactionID) + aPlIDs = kv.Value + cache.Set(cacheKey, aPlIDs, cacheCommit(transactionID), transactionID) return } -func (ms *MongoStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { +func (ms *MongoStorage) SetAccountActionPlans(acntID string, aPlIDs []string, overwrite bool) (err error) { session, col := ms.conn(colAAp) defer session.Close() - if len(apIDs) == 0 { - if err = col.Remove(bson.M{"key": acntID}); err == nil { - cache.RemKey(utils.AccountActionPlansPrefix+acntID, true, utils.NonTransactional) - } - return - } if !overwrite { - if oldAPIds, err := ms.GetAccountActionPlans(acntID, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + if oldaPlIDs, err := ms.GetAccountActionPlans(acntID, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { return err } else { - for _, oldAPid := range oldAPIds { - if !utils.IsSliceMember(apIDs, oldAPid) { - apIDs = append(apIDs, oldAPid) + for _, oldAPid := range oldaPlIDs { + if !utils.IsSliceMember(aPlIDs, oldAPid) { + aPlIDs = append(aPlIDs, oldAPid) } } } @@ -1649,7 +1643,34 @@ func (ms *MongoStorage) SetAccountActionPlans(acntID string, apIDs []string, ove _, err = col.Upsert(bson.M{"key": acntID}, &struct { Key string Value []string - }{Key: acntID, Value: apIDs}) + }{Key: acntID, Value: aPlIDs}) + return +} + +func (ms *MongoStorage) RemAccountActionPlans(acntID string, aPlIDs []string) (err error) { + session, col := ms.conn(colAAp) + defer session.Close() + if len(aPlIDs) == 0 { + return col.Remove(bson.M{"key": acntID}) + } + oldAPlIDs, err := ms.GetAccountActionPlans(acntID, false, utils.NonTransactional) + if err != nil { + return err + } + for i := 0; i < len(oldAPlIDs); { + if utils.IsSliceMember(aPlIDs, oldAPlIDs[i]) { + oldAPlIDs = append(oldAPlIDs[:i], oldAPlIDs[i+1:]...) + continue // if we have stripped, don't increase index so we can check next element by next run + } + i++ + } + if len(oldAPlIDs) == 0 { // no more elements, remove the reference + return col.Remove(bson.M{"key": acntID}) + } + _, err = col.Upsert(bson.M{"key": acntID}, &struct { + Key string + Value []string + }{Key: acntID, Value: aPlIDs}) return } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 12ad9edb3..6ea6144f5 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -113,7 +113,7 @@ func (rs *RedisStorage) Flush(ignore string) error { return rs.Cmd("FLUSHDB").Err } -func (rs *RedisStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) (err error) { +func (rs *RedisStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aaPlIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) (err error) { for key, ids := range map[string][]string{ utils.DESTINATION_PREFIX: dstIDs, utils.REVERSE_DESTINATION_PREFIX: rvDstIDs, @@ -121,7 +121,7 @@ func (rs *RedisStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs utils.RATING_PROFILE_PREFIX: rpfIDs, utils.ACTION_PREFIX: actIDs, utils.ACTION_PLAN_PREFIX: aplIDs, - utils.AccountActionPlansPrefix: aapIDs, + utils.AccountActionPlansPrefix: aaPlIDs, utils.ACTION_TRIGGER_PREFIX: atrgIDs, utils.SHARED_GROUP_PREFIX: sgIDs, utils.LCR_PREFIX: lcrIDs, @@ -1208,7 +1208,7 @@ func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } -func (rs *RedisStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) { +func (rs *RedisStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (aPlIDs []string, err error) { key := utils.AccountActionPlansPrefix + acntID if !skipCache { if x, ok := cache.Get(key); ok { @@ -1226,37 +1226,52 @@ func (rs *RedisStorage) GetAccountActionPlans(acntID string, skipCache bool, tra } return } - if err = rs.ms.Unmarshal(values, &apIDs); err != nil { + if err = rs.ms.Unmarshal(values, &aPlIDs); err != nil { return } - cache.Set(key, apIDs, cacheCommit(transactionID), transactionID) + cache.Set(key, aPlIDs, cacheCommit(transactionID), transactionID) return } -func (rs *RedisStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { - key := utils.AccountActionPlansPrefix + acntID - if len(apIDs) == 0 { - return rs.Cmd("DEL", key).Err - } +func (rs *RedisStorage) SetAccountActionPlans(acntID string, aPlIDs []string, overwrite bool) (err error) { if !overwrite { - if oldAPIds, err := rs.GetAccountActionPlans(acntID, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { + if oldaPlIDs, err := rs.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { return err } else { - for _, oldAPid := range oldAPIds { - if !utils.IsSliceMember(apIDs, oldAPid) { - apIDs = append(apIDs, oldAPid) + for _, oldAPid := range oldaPlIDs { + if !utils.IsSliceMember(aPlIDs, oldAPid) { + aPlIDs = append(aPlIDs, oldAPid) } } } } var result []byte - if result, err = rs.ms.Marshal(apIDs); err != nil { + if result, err = rs.ms.Marshal(aPlIDs); err != nil { return err } - if err = rs.Cmd("SET", key, result).Err; err != nil { - return + return rs.Cmd("SET", utils.AccountActionPlansPrefix+acntID, result).Err +} + +func (rs *RedisStorage) RemAccountActionPlans(acntID string, aPlIDs []string) (err error) { + key := utils.AccountActionPlansPrefix + acntID + if len(aPlIDs) == 0 { + return rs.Cmd("DEL", key).Err } - return + oldaPlIDs, err := rs.GetAccountActionPlans(acntID, true, utils.NonTransactional) + if err != nil { + return err + } + for i := 0; i < len(oldaPlIDs); { + if utils.IsSliceMember(aPlIDs, oldaPlIDs[i]) { + oldaPlIDs = append(oldaPlIDs[:i], oldaPlIDs[i+1:]...) + continue // if we have stripped, don't increase index so we can check next element by next run + } + i++ + } + if len(oldaPlIDs) == 0 { // no more elements, remove the reference + return rs.Cmd("DEL", key).Err + } + return rs.Cmd("SET", key, oldaPlIDs).Err } func (rs *RedisStorage) PushTask(t *Task) error {