diff --git a/apier/v1/apier.go b/apier/v1/apier.go index ee4d29ff4..6708378d0 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1079,6 +1079,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach cs.RatingProfiles = cache.CountEntries(utils.RATING_PROFILE_PREFIX) cs.Actions = cache.CountEntries(utils.ACTION_PREFIX) cs.ActionPlans = cache.CountEntries(utils.ACTION_PLAN_PREFIX) + cs.AccountActionPlans = cache.CountEntries(utils.AccountActionPlansPrefix) cs.SharedGroups = cache.CountEntries(utils.SHARED_GROUP_PREFIX) cs.DerivedChargers = cache.CountEntries(utils.DERIVEDCHARGERS_PREFIX) cs.LcrProfiles = cache.CountEntries(utils.LCR_PREFIX) @@ -1214,6 +1215,25 @@ func (v1 *ApierV1) GetCacheKeys(args utils.ArgsCacheKeys, reply *utils.ArgsCache reply.ActionPlanIDs = &ids } } + + if args.AccountActionPlanIDs != nil { + var ids []string + if len(*args.AccountActionPlanIDs) != 0 { + for _, id := range *args.AccountActionPlanIDs { + if _, hasIt := cache.Get(utils.AccountActionPlansPrefix + id); hasIt { + ids = append(ids, id) + } + } + } else { + for _, id := range cache.GetEntryKeys(utils.AccountActionPlansPrefix) { + ids = append(ids, id[len(utils.AccountActionPlansPrefix):]) + } + } + ids = args.Paginator.PaginateStringSlice(ids) + if len(ids) != 0 { + reply.AccountActionPlanIDs = &ids + } + } if args.ActionTriggerIDs != nil { var ids []string if len(*args.ActionTriggerIDs) != 0 { diff --git a/cache/cache_store.go b/cache/cache_store.go index 4e430fe30..c69bb942e 100644 --- a/cache/cache_store.go +++ b/cache/cache_store.go @@ -152,6 +152,11 @@ func newLruStore() lrustore { } else { c[utils.ACTION_PLAN_PREFIX], _ = lru.New(10000) } + if cfg != nil && cfg.AccountActionPlans != nil { + c[utils.AccountActionPlansPrefix], _ = lru.New(cfg.AccountActionPlans.Limit) + } else { + c[utils.AccountActionPlansPrefix], _ = lru.New(10000) + } if cfg != nil && cfg.ActionTriggers != nil { c[utils.ACTION_TRIGGER_PREFIX], _ = lru.New(cfg.ActionTriggers.Limit) } else { diff --git a/engine/onstor_it_test.go b/engine/datadb_it_test.go similarity index 96% rename from engine/onstor_it_test.go rename to engine/datadb_it_test.go index 2ed5b4d74..ea62721a3 100644 --- a/engine/onstor_it_test.go +++ b/engine/datadb_it_test.go @@ -35,7 +35,7 @@ import ( var ( rdsITdb *RedisStorage mgoITdb *MongoStorage - onStor OnlineStorage + onStor DataDB ) // subtests to be executed for each confDIR @@ -51,6 +51,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCacheRatingProfile, testOnStorITCacheActions, testOnStorITCacheActionPlan, + testOnStorITCacheAccountActionPlans, testOnStorITCacheActionTriggers, testOnStorITCacheSharedGroup, testOnStorITCacheDerivedChargers, @@ -455,6 +456,25 @@ func testOnStorITCacheActionPlan(t *testing.T) { } } +func testOnStorITCacheAccountActionPlans(t *testing.T) { + acntID := utils.ConcatenatedKey("cgrates.org", "1001") + aAPs := []string{"PACKAGE_10_SHARED_A_5", "USE_SHARED_A", "apl_PACKAGE_1001"} + if err := onStor.SetAccountActionPlans(acntID, aAPs); err != nil { + t.Error(err) + } + if _, hasIt := cache.Get(utils.AccountActionPlansPrefix + acntID); hasIt { + t.Error("Already in cache") + } + if err := onStor.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{acntID}, false); err != nil { + t.Error(err) + } + if itm, hasIt := cache.Get(utils.AccountActionPlansPrefix + acntID); !hasIt { + t.Error("Did not cache") + } else if rcv := itm.([]string); !reflect.DeepEqual(aAPs, rcv) { + t.Errorf("Expecting: %+v, received: %+v", aAPs, rcv) + } +} + func testOnStorITCacheActionTriggers(t *testing.T) { ats := ActionTriggers{ &ActionTrigger{ diff --git a/engine/storage_interface.go b/engine/storage_interface.go index e83b62201..9b6b9b0b9 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -72,6 +72,9 @@ type RatingStorage interface { GetActionPlan(string, bool, string) (*ActionPlan, error) SetActionPlan(string, *ActionPlan, bool, string) error GetAllActionPlans() (map[string]*ActionPlan, error) + GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) + SetAccountActionPlans(acntID string, apIDs []string) (err error) + PushTask(*Task) error PopTask() (*Task, error) // CacheDataFromDB loads data to cache, prefix represents the cache prefix, IDs should be nil if all available data should be loaded @@ -114,7 +117,7 @@ type AccountingStorage interface { } // OnlineStorage contains methods to use for administering online data -type OnlineStorage interface { +type DataDB interface { Storage HasData(string, string) (bool, error) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error @@ -147,6 +150,8 @@ type OnlineStorage interface { GetActionPlan(string, bool, string) (*ActionPlan, error) SetActionPlan(string, *ActionPlan, bool, string) error GetAllActionPlans() (map[string]*ActionPlan, error) + GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) + SetAccountActionPlans(acntID string, apIDs []string) (err error) PushTask(*Task) error PopTask() (*Task, error) LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error diff --git a/engine/storage_map.go b/engine/storage_map.go index 0ad03fa04..9f9816fa1 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1022,6 +1022,13 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error return } +func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) { + return +} +func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string) (err error) { + return +} + func (ms *MapStorage) PushTask(t *Task) error { ms.mu.Lock() defer ms.mu.Unlock() diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index b12b543fc..4a908afa8 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -37,6 +37,7 @@ const ( colRds = "reverse_destinations" colAct = "actions" colApl = "action_plans" + colAAp = "account_action_plans" colTsk = "tasks" colAtr = "action_triggers" colRpl = "rating_plans" @@ -409,6 +410,7 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, + utils.AccountActionPlansPrefix, utils.ACTION_TRIGGER_PREFIX, utils.SHARED_GROUP_PREFIX, utils.DERIVEDCHARGERS_PREFIX, @@ -451,6 +453,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached nrItems = ms.cacheCfg.Actions.Limit case utils.ACTION_PLAN_PREFIX: nrItems = ms.cacheCfg.ActionPlans.Limit + case utils.AccountActionPlansPrefix: + nrItems = ms.cacheCfg.AccountActionPlans.Limit case utils.ACTION_TRIGGER_PREFIX: nrItems = ms.cacheCfg.ActionTriggers.Limit case utils.SHARED_GROUP_PREFIX: @@ -489,6 +493,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = ms.GetActions(dataID, true, utils.NonTransactional) case utils.ACTION_PLAN_PREFIX: _, err = ms.GetActionPlan(dataID, true, utils.NonTransactional) + case utils.AccountActionPlansPrefix: + _, err = ms.GetAccountActionPlans(dataID, true, utils.NonTransactional) case utils.ACTION_TRIGGER_PREFIX: _, err = ms.GetActionTriggers(dataID, true, utils.NonTransactional) case utils.SHARED_GROUP_PREFIX: @@ -1058,7 +1064,7 @@ func (ms *MongoStorage) SetSharedGroup(sg *SharedGroup, transactionID string) (e if _, err = col.Upsert(bson.M{"id": sg.Id}, sg); err != nil { return } - return err + return } func (ms *MongoStorage) GetAccount(key string) (result *Account, err error) { @@ -1313,7 +1319,6 @@ func (ms *MongoStorage) RemoveAlias(key, transactionID string) (err error) { cCommit := cacheCommit(transactionID) cache.RemKey(key, cCommit, transactionID) session.Close() - session, col = ms.conn(colRls) defer session.Close() for _, value := range al.Values { @@ -1577,6 +1582,50 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } +func (ms *MongoStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) { + cacheKey := utils.AccountActionPlansPrefix + acntID + if !skipCache { + if x, ok := cache.Get(cacheKey); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.([]string), nil + } + } + session, col := ms.conn(colAAp) + defer session.Close() + var kv struct { + Key string + Value []string + } + if err = col.Find(bson.M{"key": acntID}).One(&kv); err != nil { + if err == mgo.ErrNotFound { + cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return + } + apIDs = kv.Value + cache.Set(cacheKey, apIDs, cacheCommit(transactionID), transactionID) + return +} + +func (ms *MongoStorage) SetAccountActionPlans(acntID string, apIDs []string) (err error) { + session, col := ms.conn(colAAp) + defer session.Close() + if len(apIDs) == 0 { + if err = col.Remove(bson.M{"key": acntID}); err == nil { + cache.RemKey(utils.AccountActionPlansPrefix+acntID, true, utils.NonTransactional) + } + return + } + _, err = col.Upsert(bson.M{"key": acntID}, &struct { + Key string + Value []string + }{Key: acntID, Value: apIDs}) + return +} + func (ms *MongoStorage) PushTask(t *Task) error { session, col := ms.conn(colTsk) defer session.Close() diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 7af1e709f..ec5c1e0be 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -201,6 +201,7 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, + utils.AccountActionPlansPrefix, utils.ACTION_TRIGGER_PREFIX, utils.SHARED_GROUP_PREFIX, utils.DERIVEDCHARGERS_PREFIX, @@ -243,6 +244,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached nrItems = rs.cacheCfg.Actions.Limit case utils.ACTION_PLAN_PREFIX: nrItems = rs.cacheCfg.ActionPlans.Limit + case utils.AccountActionPlansPrefix: + nrItems = rs.cacheCfg.AccountActionPlans.Limit case utils.ACTION_TRIGGER_PREFIX: nrItems = rs.cacheCfg.ActionTriggers.Limit case utils.SHARED_GROUP_PREFIX: @@ -281,6 +284,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = rs.GetActions(dataID, true, utils.NonTransactional) case utils.ACTION_PLAN_PREFIX: _, err = rs.GetActionPlan(dataID, true, utils.NonTransactional) + case utils.AccountActionPlansPrefix: + _, err = rs.GetAccountActionPlans(dataID, true, utils.NonTransactional) case utils.ACTION_TRIGGER_PREFIX: _, err = rs.GetActionTriggers(dataID, true, utils.NonTransactional) case utils.SHARED_GROUP_PREFIX: @@ -1187,6 +1192,46 @@ func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err return } +func (rs *RedisStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) { + key := utils.AccountActionPlansPrefix + acntID + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.([]string), nil + } + } + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err.Error() == "wrong type" { // did not find the destination + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &apIDs); err != nil { + return + } + cache.Set(key, apIDs, cacheCommit(transactionID), transactionID) + return +} + +func (rs *RedisStorage) SetAccountActionPlans(acntID string, apIDs []string) (err error) { + key := utils.AccountActionPlansPrefix + acntID + if len(apIDs) == 0 { + return rs.Cmd("DEL", key).Err + } + var result []byte + if result, err = rs.ms.Marshal(apIDs); err != nil { + return err + } + if err = rs.Cmd("SET", key, result).Err; err != nil { + return + } + return +} + func (rs *RedisStorage) PushTask(t *Task) error { result, err := rs.ms.Marshal(t) if err != nil { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index b0a16e87e..aa9b2244b 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -599,6 +599,7 @@ type ArgsCache struct { RatingProfileIDs *[]string ActionIDs *[]string ActionPlanIDs *[]string + AccountActionPlanIDs *[]string ActionTriggerIDs *[]string SharedGroupIDs *[]string LCRids *[]string @@ -632,6 +633,7 @@ type CacheStats struct { RatingProfiles int Actions int ActionPlans int + AccountActionPlans int SharedGroups int DerivedChargers int LcrProfiles int