mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Get/Set AccountActions mapping in storage, renamed OnlineStor interface into DataDB, populate APIer cache methods with AcccountActionPlans
This commit is contained in:
@@ -1079,6 +1079,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach
|
||||
cs.RatingProfiles = cache.CountEntries(utils.RATING_PROFILE_PREFIX)
|
||||
cs.Actions = cache.CountEntries(utils.ACTION_PREFIX)
|
||||
cs.ActionPlans = cache.CountEntries(utils.ACTION_PLAN_PREFIX)
|
||||
cs.AccountActionPlans = cache.CountEntries(utils.AccountActionPlansPrefix)
|
||||
cs.SharedGroups = cache.CountEntries(utils.SHARED_GROUP_PREFIX)
|
||||
cs.DerivedChargers = cache.CountEntries(utils.DERIVEDCHARGERS_PREFIX)
|
||||
cs.LcrProfiles = cache.CountEntries(utils.LCR_PREFIX)
|
||||
@@ -1214,6 +1215,25 @@ func (v1 *ApierV1) GetCacheKeys(args utils.ArgsCacheKeys, reply *utils.ArgsCache
|
||||
reply.ActionPlanIDs = &ids
|
||||
}
|
||||
}
|
||||
|
||||
if args.AccountActionPlanIDs != nil {
|
||||
var ids []string
|
||||
if len(*args.AccountActionPlanIDs) != 0 {
|
||||
for _, id := range *args.AccountActionPlanIDs {
|
||||
if _, hasIt := cache.Get(utils.AccountActionPlansPrefix + id); hasIt {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for _, id := range cache.GetEntryKeys(utils.AccountActionPlansPrefix) {
|
||||
ids = append(ids, id[len(utils.AccountActionPlansPrefix):])
|
||||
}
|
||||
}
|
||||
ids = args.Paginator.PaginateStringSlice(ids)
|
||||
if len(ids) != 0 {
|
||||
reply.AccountActionPlanIDs = &ids
|
||||
}
|
||||
}
|
||||
if args.ActionTriggerIDs != nil {
|
||||
var ids []string
|
||||
if len(*args.ActionTriggerIDs) != 0 {
|
||||
|
||||
5
cache/cache_store.go
vendored
5
cache/cache_store.go
vendored
@@ -152,6 +152,11 @@ func newLruStore() lrustore {
|
||||
} else {
|
||||
c[utils.ACTION_PLAN_PREFIX], _ = lru.New(10000)
|
||||
}
|
||||
if cfg != nil && cfg.AccountActionPlans != nil {
|
||||
c[utils.AccountActionPlansPrefix], _ = lru.New(cfg.AccountActionPlans.Limit)
|
||||
} else {
|
||||
c[utils.AccountActionPlansPrefix], _ = lru.New(10000)
|
||||
}
|
||||
if cfg != nil && cfg.ActionTriggers != nil {
|
||||
c[utils.ACTION_TRIGGER_PREFIX], _ = lru.New(cfg.ActionTriggers.Limit)
|
||||
} else {
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
|
||||
var (
|
||||
rdsITdb *RedisStorage
|
||||
mgoITdb *MongoStorage
|
||||
onStor OnlineStorage
|
||||
onStor DataDB
|
||||
)
|
||||
|
||||
// subtests to be executed for each confDIR
|
||||
@@ -51,6 +51,7 @@ var sTestsOnStorIT = []func(t *testing.T){
|
||||
testOnStorITCacheRatingProfile,
|
||||
testOnStorITCacheActions,
|
||||
testOnStorITCacheActionPlan,
|
||||
testOnStorITCacheAccountActionPlans,
|
||||
testOnStorITCacheActionTriggers,
|
||||
testOnStorITCacheSharedGroup,
|
||||
testOnStorITCacheDerivedChargers,
|
||||
@@ -455,6 +456,25 @@ func testOnStorITCacheActionPlan(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITCacheAccountActionPlans(t *testing.T) {
|
||||
acntID := utils.ConcatenatedKey("cgrates.org", "1001")
|
||||
aAPs := []string{"PACKAGE_10_SHARED_A_5", "USE_SHARED_A", "apl_PACKAGE_1001"}
|
||||
if err := onStor.SetAccountActionPlans(acntID, aAPs); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, hasIt := cache.Get(utils.AccountActionPlansPrefix + acntID); hasIt {
|
||||
t.Error("Already in cache")
|
||||
}
|
||||
if err := onStor.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{acntID}, false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if itm, hasIt := cache.Get(utils.AccountActionPlansPrefix + acntID); !hasIt {
|
||||
t.Error("Did not cache")
|
||||
} else if rcv := itm.([]string); !reflect.DeepEqual(aAPs, rcv) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", aAPs, rcv)
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITCacheActionTriggers(t *testing.T) {
|
||||
ats := ActionTriggers{
|
||||
&ActionTrigger{
|
||||
@@ -72,6 +72,9 @@ type RatingStorage interface {
|
||||
GetActionPlan(string, bool, string) (*ActionPlan, error)
|
||||
SetActionPlan(string, *ActionPlan, bool, string) error
|
||||
GetAllActionPlans() (map[string]*ActionPlan, error)
|
||||
GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error)
|
||||
SetAccountActionPlans(acntID string, apIDs []string) (err error)
|
||||
|
||||
PushTask(*Task) error
|
||||
PopTask() (*Task, error)
|
||||
// CacheDataFromDB loads data to cache, prefix represents the cache prefix, IDs should be nil if all available data should be loaded
|
||||
@@ -114,7 +117,7 @@ type AccountingStorage interface {
|
||||
}
|
||||
|
||||
// OnlineStorage contains methods to use for administering online data
|
||||
type OnlineStorage interface {
|
||||
type DataDB interface {
|
||||
Storage
|
||||
HasData(string, string) (bool, error)
|
||||
LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error
|
||||
@@ -147,6 +150,8 @@ type OnlineStorage interface {
|
||||
GetActionPlan(string, bool, string) (*ActionPlan, error)
|
||||
SetActionPlan(string, *ActionPlan, bool, string) error
|
||||
GetAllActionPlans() (map[string]*ActionPlan, error)
|
||||
GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error)
|
||||
SetAccountActionPlans(acntID string, apIDs []string) (err error)
|
||||
PushTask(*Task) error
|
||||
PopTask() (*Task, error)
|
||||
LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error
|
||||
|
||||
@@ -1022,6 +1022,13 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) {
|
||||
return
|
||||
}
|
||||
func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) PushTask(t *Task) error {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
|
||||
@@ -37,6 +37,7 @@ const (
|
||||
colRds = "reverse_destinations"
|
||||
colAct = "actions"
|
||||
colApl = "action_plans"
|
||||
colAAp = "account_action_plans"
|
||||
colTsk = "tasks"
|
||||
colAtr = "action_triggers"
|
||||
colRpl = "rating_plans"
|
||||
@@ -409,6 +410,7 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
utils.RATING_PROFILE_PREFIX,
|
||||
utils.ACTION_PREFIX,
|
||||
utils.ACTION_PLAN_PREFIX,
|
||||
utils.AccountActionPlansPrefix,
|
||||
utils.ACTION_TRIGGER_PREFIX,
|
||||
utils.SHARED_GROUP_PREFIX,
|
||||
utils.DERIVEDCHARGERS_PREFIX,
|
||||
@@ -451,6 +453,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
nrItems = ms.cacheCfg.Actions.Limit
|
||||
case utils.ACTION_PLAN_PREFIX:
|
||||
nrItems = ms.cacheCfg.ActionPlans.Limit
|
||||
case utils.AccountActionPlansPrefix:
|
||||
nrItems = ms.cacheCfg.AccountActionPlans.Limit
|
||||
case utils.ACTION_TRIGGER_PREFIX:
|
||||
nrItems = ms.cacheCfg.ActionTriggers.Limit
|
||||
case utils.SHARED_GROUP_PREFIX:
|
||||
@@ -489,6 +493,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
_, err = ms.GetActions(dataID, true, utils.NonTransactional)
|
||||
case utils.ACTION_PLAN_PREFIX:
|
||||
_, err = ms.GetActionPlan(dataID, true, utils.NonTransactional)
|
||||
case utils.AccountActionPlansPrefix:
|
||||
_, err = ms.GetAccountActionPlans(dataID, true, utils.NonTransactional)
|
||||
case utils.ACTION_TRIGGER_PREFIX:
|
||||
_, err = ms.GetActionTriggers(dataID, true, utils.NonTransactional)
|
||||
case utils.SHARED_GROUP_PREFIX:
|
||||
@@ -1058,7 +1064,7 @@ func (ms *MongoStorage) SetSharedGroup(sg *SharedGroup, transactionID string) (e
|
||||
if _, err = col.Upsert(bson.M{"id": sg.Id}, sg); err != nil {
|
||||
return
|
||||
}
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetAccount(key string) (result *Account, err error) {
|
||||
@@ -1313,7 +1319,6 @@ func (ms *MongoStorage) RemoveAlias(key, transactionID string) (err error) {
|
||||
cCommit := cacheCommit(transactionID)
|
||||
cache.RemKey(key, cCommit, transactionID)
|
||||
session.Close()
|
||||
|
||||
session, col = ms.conn(colRls)
|
||||
defer session.Close()
|
||||
for _, value := range al.Values {
|
||||
@@ -1577,6 +1582,50 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) {
|
||||
cacheKey := utils.AccountActionPlansPrefix + acntID
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(cacheKey); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.([]string), nil
|
||||
}
|
||||
}
|
||||
session, col := ms.conn(colAAp)
|
||||
defer session.Close()
|
||||
var kv struct {
|
||||
Key string
|
||||
Value []string
|
||||
}
|
||||
if err = col.Find(bson.M{"key": acntID}).One(&kv); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID)
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
}
|
||||
apIDs = kv.Value
|
||||
cache.Set(cacheKey, apIDs, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetAccountActionPlans(acntID string, apIDs []string) (err error) {
|
||||
session, col := ms.conn(colAAp)
|
||||
defer session.Close()
|
||||
if len(apIDs) == 0 {
|
||||
if err = col.Remove(bson.M{"key": acntID}); err == nil {
|
||||
cache.RemKey(utils.AccountActionPlansPrefix+acntID, true, utils.NonTransactional)
|
||||
}
|
||||
return
|
||||
}
|
||||
_, err = col.Upsert(bson.M{"key": acntID}, &struct {
|
||||
Key string
|
||||
Value []string
|
||||
}{Key: acntID, Value: apIDs})
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) PushTask(t *Task) error {
|
||||
session, col := ms.conn(colTsk)
|
||||
defer session.Close()
|
||||
|
||||
@@ -201,6 +201,7 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
utils.RATING_PROFILE_PREFIX,
|
||||
utils.ACTION_PREFIX,
|
||||
utils.ACTION_PLAN_PREFIX,
|
||||
utils.AccountActionPlansPrefix,
|
||||
utils.ACTION_TRIGGER_PREFIX,
|
||||
utils.SHARED_GROUP_PREFIX,
|
||||
utils.DERIVEDCHARGERS_PREFIX,
|
||||
@@ -243,6 +244,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
nrItems = rs.cacheCfg.Actions.Limit
|
||||
case utils.ACTION_PLAN_PREFIX:
|
||||
nrItems = rs.cacheCfg.ActionPlans.Limit
|
||||
case utils.AccountActionPlansPrefix:
|
||||
nrItems = rs.cacheCfg.AccountActionPlans.Limit
|
||||
case utils.ACTION_TRIGGER_PREFIX:
|
||||
nrItems = rs.cacheCfg.ActionTriggers.Limit
|
||||
case utils.SHARED_GROUP_PREFIX:
|
||||
@@ -281,6 +284,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
_, err = rs.GetActions(dataID, true, utils.NonTransactional)
|
||||
case utils.ACTION_PLAN_PREFIX:
|
||||
_, err = rs.GetActionPlan(dataID, true, utils.NonTransactional)
|
||||
case utils.AccountActionPlansPrefix:
|
||||
_, err = rs.GetAccountActionPlans(dataID, true, utils.NonTransactional)
|
||||
case utils.ACTION_TRIGGER_PREFIX:
|
||||
_, err = rs.GetActionTriggers(dataID, true, utils.NonTransactional)
|
||||
case utils.SHARED_GROUP_PREFIX:
|
||||
@@ -1187,6 +1192,46 @@ func (rs *RedisStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) {
|
||||
key := utils.AccountActionPlansPrefix + acntID
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.([]string), nil
|
||||
}
|
||||
}
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err.Error() == "wrong type" { // did not find the destination
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
}
|
||||
if err = rs.ms.Unmarshal(values, &apIDs); err != nil {
|
||||
return
|
||||
}
|
||||
cache.Set(key, apIDs, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetAccountActionPlans(acntID string, apIDs []string) (err error) {
|
||||
key := utils.AccountActionPlansPrefix + acntID
|
||||
if len(apIDs) == 0 {
|
||||
return rs.Cmd("DEL", key).Err
|
||||
}
|
||||
var result []byte
|
||||
if result, err = rs.ms.Marshal(apIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = rs.Cmd("SET", key, result).Err; err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) PushTask(t *Task) error {
|
||||
result, err := rs.ms.Marshal(t)
|
||||
if err != nil {
|
||||
|
||||
@@ -599,6 +599,7 @@ type ArgsCache struct {
|
||||
RatingProfileIDs *[]string
|
||||
ActionIDs *[]string
|
||||
ActionPlanIDs *[]string
|
||||
AccountActionPlanIDs *[]string
|
||||
ActionTriggerIDs *[]string
|
||||
SharedGroupIDs *[]string
|
||||
LCRids *[]string
|
||||
@@ -632,6 +633,7 @@ type CacheStats struct {
|
||||
RatingProfiles int
|
||||
Actions int
|
||||
ActionPlans int
|
||||
AccountActionPlans int
|
||||
SharedGroups int
|
||||
DerivedChargers int
|
||||
LcrProfiles int
|
||||
|
||||
Reference in New Issue
Block a user