Populating

This commit is contained in:
Edwardro22
2017-02-07 14:38:01 +02:00
parent 5cc52f2493
commit 14b2a4faf4

View File

@@ -21,6 +21,7 @@ import (
"bytes"
"compress/zlib"
"errors"
"fmt"
"io/ioutil"
"strings"
"sync"
@@ -246,6 +247,120 @@ func (ms *MapStorage) PreloadCacheForPrefix(prefix string) error {
// CacheDataFromDB loads data to cache,
// prefix represents the cache prefix, IDs should be nil if all available data should be loaded
func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) (err error) {
if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX,
utils.REVERSE_DESTINATION_PREFIX,
utils.RATING_PLAN_PREFIX,
utils.RATING_PROFILE_PREFIX,
utils.ACTION_PREFIX,
utils.ACTION_PLAN_PREFIX,
utils.AccountActionPlansPrefix,
utils.ACTION_TRIGGER_PREFIX,
utils.SHARED_GROUP_PREFIX,
utils.DERIVEDCHARGERS_PREFIX,
utils.LCR_PREFIX,
utils.ALIASES_PREFIX,
utils.REVERSE_ALIASES_PREFIX,
utils.ResourceLimitsPrefix}, prefix) {
return utils.NewCGRError(utils.REDIS,
utils.MandatoryIEMissingCaps,
utils.UnsupportedCachePrefix,
fmt.Sprintf("prefix <%s> is not a supported cache prefix", prefix))
}
if IDs == nil {
keyIDs, err := ms.GetKeysForPrefix(prefix)
if err != nil {
return utils.NewCGRError(utils.REDIS,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("redis error <%s> querying keys for prefix: <%s>", prefix))
}
for _, keyID := range keyIDs {
if mustBeCached { // Only consider loading ids which are already in cache
if _, hasIt := cache.Get(keyID); !hasIt {
continue
}
}
IDs = append(IDs, keyID[len(prefix):])
}
var nrItems int
switch prefix {
case utils.DESTINATION_PREFIX:
nrItems = ms.cacheCfg.Destinations.Limit
case utils.REVERSE_DESTINATION_PREFIX:
nrItems = ms.cacheCfg.ReverseDestinations.Limit
case utils.RATING_PLAN_PREFIX:
nrItems = ms.cacheCfg.RatingPlans.Limit
case utils.RATING_PROFILE_PREFIX:
nrItems = ms.cacheCfg.RatingProfiles.Limit
case utils.ACTION_PREFIX:
nrItems = ms.cacheCfg.Actions.Limit
case utils.ACTION_PLAN_PREFIX:
nrItems = ms.cacheCfg.ActionPlans.Limit
case utils.AccountActionPlansPrefix:
nrItems = ms.cacheCfg.AccountActionPlans.Limit
case utils.ACTION_TRIGGER_PREFIX:
nrItems = ms.cacheCfg.ActionTriggers.Limit
case utils.SHARED_GROUP_PREFIX:
nrItems = ms.cacheCfg.SharedGroups.Limit
case utils.DERIVEDCHARGERS_PREFIX:
nrItems = ms.cacheCfg.DerivedChargers.Limit
case utils.LCR_PREFIX:
nrItems = ms.cacheCfg.Lcr.Limit
case utils.ALIASES_PREFIX:
nrItems = ms.cacheCfg.Aliases.Limit
case utils.REVERSE_ALIASES_PREFIX:
nrItems = ms.cacheCfg.ReverseAliases.Limit
case utils.ResourceLimitsPrefix:
nrItems = ms.cacheCfg.ResourceLimits.Limit
}
if nrItems != 0 && nrItems < len(IDs) {
IDs = IDs[:nrItems]
}
}
for _, dataID := range IDs {
if mustBeCached {
if _, hasIt := cache.Get(prefix + dataID); !hasIt { // only cache if previously there
continue
}
}
switch prefix {
case utils.DESTINATION_PREFIX:
_, err = ms.GetDestination(dataID, true, utils.NonTransactional)
case utils.REVERSE_DESTINATION_PREFIX:
_, err = ms.GetReverseDestination(dataID, true, utils.NonTransactional)
case utils.RATING_PLAN_PREFIX:
_, err = ms.GetRatingPlan(dataID, true, utils.NonTransactional)
case utils.RATING_PROFILE_PREFIX:
_, err = ms.GetRatingProfile(dataID, true, utils.NonTransactional)
case utils.ACTION_PREFIX:
_, err = ms.GetActions(dataID, true, utils.NonTransactional)
case utils.ACTION_PLAN_PREFIX:
_, err = ms.GetActionPlan(dataID, true, utils.NonTransactional)
case utils.AccountActionPlansPrefix:
_, err = ms.GetAccountActionPlans(dataID, true, utils.NonTransactional)
case utils.ACTION_TRIGGER_PREFIX:
_, err = ms.GetActionTriggers(dataID, true, utils.NonTransactional)
case utils.SHARED_GROUP_PREFIX:
_, err = ms.GetSharedGroup(dataID, true, utils.NonTransactional)
case utils.DERIVEDCHARGERS_PREFIX:
_, err = ms.GetDerivedChargers(dataID, true, utils.NonTransactional)
case utils.LCR_PREFIX:
_, err = ms.GetLCR(dataID, true, utils.NonTransactional)
case utils.ALIASES_PREFIX:
_, err = ms.GetAlias(dataID, true, utils.NonTransactional)
case utils.REVERSE_ALIASES_PREFIX:
_, err = ms.GetReverseAlias(dataID, true, utils.NonTransactional)
case utils.ResourceLimitsPrefix:
_, err = ms.GetResourceLimit(dataID, true, utils.NonTransactional)
}
if err != nil {
return utils.NewCGRError(utils.REDIS,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error <%s> querying MapStorage for category: <%s>, dataID: <%s>", prefix, dataID))
}
}
return
}
@@ -339,7 +454,7 @@ func (ms *MapStorage) GetRatingProfile(key string, skipCache bool, transactionID
cCommit := cacheCommit(transactionID)
if values, ok := ms.dict[key]; ok {
rpf = new(RatingProfile)
err = ms.ms.Unmarshal(values, rpf)
err = ms.ms.Unmarshal(values, &rpf)
} else {
cache.Set(key, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
@@ -435,14 +550,18 @@ func (ms *MapStorage) GetDestination(key string, skipCache bool, transactionID s
}
r.Close()
dest = new(Destination)
err = ms.ms.Unmarshal(out, dest)
err = ms.ms.Unmarshal(out, &dest)
if err != nil {
cache.Set(key, dest, cCommit, transactionID)
cache.Set(key, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
} else {
}
if dest == nil {
cache.Set(key, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
cache.Set(key, dest, cCommit, transactionID)
return
}
@@ -504,16 +623,19 @@ func (ms *MapStorage) RemoveDestination(destID string, transactionID string) (er
if err != nil {
return
}
ms.mu.Lock()
delete(ms.dict, key)
ms.mu.Unlock()
cache.RemKey(key, cacheCommit(transactionID), transactionID)
for _, prefix := range d.Prefixes {
ms.mu.Lock()
ms.dict.srem(utils.REVERSE_DESTINATION_PREFIX+prefix, destID, ms.ms)
ms.mu.Unlock()
ms.GetReverseDestination(prefix, true, transactionID) // it will recache the destination
}
return
}
@@ -573,9 +695,9 @@ func (ms *MapStorage) GetActions(key string, skipCache bool, transactionID strin
ms.mu.RLock()
defer ms.mu.RUnlock()
cCommit := cacheCommit(transactionID)
key = utils.ACTION_PREFIX + key
cachekey := utils.ACTION_PREFIX + key
if !skipCache {
if x, err := cache.GetCloned(key); err != nil {
if x, err := cache.GetCloned(cachekey); err != nil {
if err.Error() != utils.ItemNotFound {
return nil, err
}
@@ -585,13 +707,13 @@ func (ms *MapStorage) GetActions(key string, skipCache bool, transactionID strin
return x.(Actions), nil
}
}
if values, ok := ms.dict[key]; ok {
if values, ok := ms.dict[cachekey]; ok {
err = ms.ms.Unmarshal(values, &as)
} else {
cache.Set(key, nil, cCommit, transactionID)
cache.Set(cachekey, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
cache.Set(key, as, cCommit, transactionID)
cache.Set(cachekey, as, cCommit, transactionID)
return
}
@@ -599,26 +721,28 @@ func (ms *MapStorage) SetActions(key string, as Actions, transactionID string) (
ms.mu.Lock()
defer ms.mu.Unlock()
cCommit := cacheCommit(transactionID)
cachekey := utils.ACTION_PREFIX + key
result, err := ms.ms.Marshal(&as)
ms.dict[utils.ACTION_PREFIX+key] = result
cache.RemKey(utils.ACTION_PREFIX+key, cCommit, transactionID)
ms.dict[cachekey] = result
cache.RemKey(cachekey, cCommit, transactionID)
return
}
func (ms *MapStorage) RemoveActions(key string, transactionID string) (err error) {
cachekey := utils.ACTION_PREFIX + key
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, utils.ACTION_PREFIX+key)
cache.RemKey(utils.ACTION_PREFIX+key, cacheCommit(transactionID), transactionID)
delete(ms.dict, cachekey)
ms.mu.Unlock()
cache.RemKey(cachekey, cacheCommit(transactionID), transactionID)
return
}
func (ms *MapStorage) GetSharedGroup(key string, skipCache bool, transactionID string) (sg *SharedGroup, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.SHARED_GROUP_PREFIX + key
cachekey := utils.SHARED_GROUP_PREFIX + key
if !skipCache {
if x, ok := cache.Get(key); ok {
if x, ok := cache.Get(cachekey); ok {
if x != nil {
return x.(*SharedGroup), nil
}
@@ -626,13 +750,13 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool, transactionID s
}
}
cCommit := cacheCommit(transactionID)
if values, ok := ms.dict[key]; ok {
if values, ok := ms.dict[cachekey]; ok {
err = ms.ms.Unmarshal(values, &sg)
if err == nil {
cache.Set(key, sg, cCommit, transactionID)
cache.Set(cachekey, sg, cCommit, transactionID)
}
} else {
cache.Set(key, nil, cCommit, transactionID)
cache.Set(cachekey, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
return
@@ -650,10 +774,16 @@ func (ms *MapStorage) SetSharedGroup(sg *SharedGroup, transactionID string) (err
func (ms *MapStorage) GetAccount(key string) (ub *Account, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.ACCOUNT_PREFIX+key]; ok {
ub = &Account{ID: key}
err = ms.ms.Unmarshal(values, ub)
} else {
values, ok := ms.dict[utils.ACCOUNT_PREFIX+key]
if !ok {
return nil, utils.ErrNotFound
}
ub = &Account{ID: key}
err = ms.ms.Unmarshal(values, ub)
if err != nil {
return nil, err
}
if len(values) == 0 {
return nil, utils.ErrNotFound
}
return
@@ -781,41 +911,40 @@ func (ms *MapStorage) RemoveUser(key string) error {
func (ms *MapStorage) GetAlias(key string, skipCache bool, transactionID string) (al *Alias, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
origKey := key
key = utils.ALIASES_PREFIX + key
cacheKey := utils.ALIASES_PREFIX + key
if !skipCache {
if x, ok := cache.Get(key); ok {
if x, ok := cache.Get(cacheKey); ok {
if x != nil {
al = &Alias{Values: x.(AliasValues)}
al.SetId(origKey)
return al, nil
return x.(*Alias), nil
}
return nil, utils.ErrNotFound
}
}
cCommit := cacheCommit(transactionID)
if values, ok := ms.dict[key]; ok {
al = &Alias{Values: make(AliasValues, 0)}
al.SetId(key[len(utils.ALIASES_PREFIX):])
err = ms.ms.Unmarshal(values, &al.Values)
if err == nil {
cache.Set(key, al.Values, cCommit, transactionID)
}
} else {
cache.Set(key, nil, cCommit, transactionID)
values, ok := ms.dict[cacheKey]
if !ok {
cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID)
return nil, utils.ErrNotFound
}
return al, nil
al = &Alias{Values: make(AliasValues, 0)}
al.SetId(key[len(utils.ALIASES_PREFIX):])
err = ms.ms.Unmarshal(values, &al.Values)
if err != nil {
return nil, err
}
cache.Set(key, &al, cacheCommit(transactionID), transactionID)
return
}
func (ms *MapStorage) SetAlias(al *Alias, transactionID string) error {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(al.Values)
if err != nil {
return err
}
key := utils.ALIASES_PREFIX + al.GetId()
ms.mu.Lock()
defer ms.mu.Unlock()
ms.dict[key] = result
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return nil
@@ -833,15 +962,15 @@ func (ms *MapStorage) GetReverseAlias(reverseID string, skipCache bool, transact
return nil, utils.ErrNotFound
}
}
var values []string
cCommit := cacheCommit(transactionID)
if idMap, ok := ms.dict.smembers(key, ms.ms); len(idMap) > 0 && ok {
values = idMap.Slice()
ids = idMap.Slice()
} else {
cache.Set(key, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
cache.Set(key, values, cCommit, transactionID)
cache.Set(key, ids, cCommit, transactionID)
return
}
@@ -1023,6 +1152,8 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error
}
func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key := utils.AccountActionPlansPrefix + acntID
if !skipCache {
if x, ok := cache.Get(key); ok {
@@ -1032,9 +1163,7 @@ func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, trans
return x.([]string), nil
}
}
ms.mu.RLock()
values, ok := ms.dict[key]
ms.mu.RUnlock()
if !ok {
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
@@ -1049,8 +1178,7 @@ func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, trans
func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) {
if !overwrite {
oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
if oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound {
return err
} else {
for _, oldAPid := range oldaPlIDs {
@@ -1060,7 +1188,6 @@ func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overw
}
}
}
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(apIDs)
@@ -1068,12 +1195,11 @@ func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overw
return err
}
ms.dict[utils.AccountActionPlansPrefix+acntID] = result
return
}
func (ms *MapStorage) RemAccountActionPlans(acntID string, apIDs []string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
key := utils.AccountActionPlansPrefix + acntID
if len(apIDs) == 0 {
delete(ms.dict, key)
@@ -1090,10 +1216,17 @@ func (ms *MapStorage) RemAccountActionPlans(acntID string, apIDs []string) (err
}
i++
}
ms.mu.Lock()
defer ms.mu.Unlock()
if len(oldaPlIDs) == 0 {
delete(ms.dict, key)
return
}
var result []byte
if result, err = ms.ms.Marshal(oldaPlIDs); err != nil {
return err
}
ms.dict[key] = result
return
}
@@ -1273,7 +1406,8 @@ func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit, transactionID string)
if err != nil {
return err
}
ms.dict[utils.ResourceLimitsPrefix+rl.ID] = result
key := utils.ResourceLimitsPrefix + rl.ID
ms.dict[key] = result
return nil
}
@@ -1310,9 +1444,10 @@ func (ms *MapStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[s
return
}
func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) {
cacheKey := dbKey + fieldValKey
ms.mu.RLock()
defer ms.mu.RUnlock()
if x, ok := cache.Get(dbKey + fieldValKey); ok { // Attempt to find in cache first
if x, ok := cache.Get(cacheKey); ok { // Attempt to find in cache first
if x != nil {
return x.(utils.StringMap), nil
}
@@ -1321,7 +1456,7 @@ func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs ut
// Not found in cache, check in DB
values, ok := ms.dict[dbKey]
if !ok {
cache.Set(dbKey+fieldValKey, nil, true, utils.NonTransactional)
cache.Set(cacheKey, nil, true, utils.NonTransactional)
return nil, utils.ErrNotFound
}
var indexes map[string]map[string]utils.StringMap
@@ -1332,7 +1467,12 @@ func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs ut
if _, hasIt := indexes[keySplt[0]]; hasIt {
itemIDs = indexes[keySplt[0]][keySplt[1]]
}
cache.Set(dbKey+fieldValKey, itemIDs, true, utils.NonTransactional)
//Verify items
if len(itemIDs) == 0 {
cache.Set(cacheKey, nil, true, utils.NonTransactional)
return nil, utils.ErrNotFound
}
cache.Set(cacheKey, itemIDs, true, utils.NonTransactional)
return
}