From 14b2a4faf44d46a911f6be24a9f2a5331194b4c3 Mon Sep 17 00:00:00 2001 From: Edwardro22 Date: Tue, 7 Feb 2017 14:38:01 +0200 Subject: [PATCH] Populating --- engine/storage_map.go | 252 ++++++++++++++++++++++++++++++++---------- 1 file changed, 196 insertions(+), 56 deletions(-) diff --git a/engine/storage_map.go b/engine/storage_map.go index 722572cf7..86ce81a65 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -21,6 +21,7 @@ import ( "bytes" "compress/zlib" "errors" + "fmt" "io/ioutil" "strings" "sync" @@ -246,6 +247,120 @@ func (ms *MapStorage) PreloadCacheForPrefix(prefix string) error { // CacheDataFromDB loads data to cache, // prefix represents the cache prefix, IDs should be nil if all available data should be loaded func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) (err error) { + if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX, + utils.REVERSE_DESTINATION_PREFIX, + utils.RATING_PLAN_PREFIX, + utils.RATING_PROFILE_PREFIX, + utils.ACTION_PREFIX, + utils.ACTION_PLAN_PREFIX, + utils.AccountActionPlansPrefix, + utils.ACTION_TRIGGER_PREFIX, + utils.SHARED_GROUP_PREFIX, + utils.DERIVEDCHARGERS_PREFIX, + utils.LCR_PREFIX, + utils.ALIASES_PREFIX, + utils.REVERSE_ALIASES_PREFIX, + utils.ResourceLimitsPrefix}, prefix) { + return utils.NewCGRError(utils.REDIS, + utils.MandatoryIEMissingCaps, + utils.UnsupportedCachePrefix, + fmt.Sprintf("prefix <%s> is not a supported cache prefix", prefix)) + } + + if IDs == nil { + keyIDs, err := ms.GetKeysForPrefix(prefix) + if err != nil { + return utils.NewCGRError(utils.REDIS, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("redis error <%s> querying keys for prefix: <%s>", prefix)) + } + for _, keyID := range keyIDs { + if mustBeCached { // Only consider loading ids which are already in cache + if _, hasIt := cache.Get(keyID); !hasIt { + continue + } + } + IDs = append(IDs, keyID[len(prefix):]) + } + var nrItems int + switch prefix { + case utils.DESTINATION_PREFIX: + nrItems = ms.cacheCfg.Destinations.Limit + case utils.REVERSE_DESTINATION_PREFIX: + nrItems = ms.cacheCfg.ReverseDestinations.Limit + case utils.RATING_PLAN_PREFIX: + nrItems = ms.cacheCfg.RatingPlans.Limit + case utils.RATING_PROFILE_PREFIX: + nrItems = ms.cacheCfg.RatingProfiles.Limit + case utils.ACTION_PREFIX: + nrItems = ms.cacheCfg.Actions.Limit + case utils.ACTION_PLAN_PREFIX: + nrItems = ms.cacheCfg.ActionPlans.Limit + case utils.AccountActionPlansPrefix: + nrItems = ms.cacheCfg.AccountActionPlans.Limit + case utils.ACTION_TRIGGER_PREFIX: + nrItems = ms.cacheCfg.ActionTriggers.Limit + case utils.SHARED_GROUP_PREFIX: + nrItems = ms.cacheCfg.SharedGroups.Limit + case utils.DERIVEDCHARGERS_PREFIX: + nrItems = ms.cacheCfg.DerivedChargers.Limit + case utils.LCR_PREFIX: + nrItems = ms.cacheCfg.Lcr.Limit + case utils.ALIASES_PREFIX: + nrItems = ms.cacheCfg.Aliases.Limit + case utils.REVERSE_ALIASES_PREFIX: + nrItems = ms.cacheCfg.ReverseAliases.Limit + case utils.ResourceLimitsPrefix: + nrItems = ms.cacheCfg.ResourceLimits.Limit + } + if nrItems != 0 && nrItems < len(IDs) { + IDs = IDs[:nrItems] + } + } + for _, dataID := range IDs { + if mustBeCached { + if _, hasIt := cache.Get(prefix + dataID); !hasIt { // only cache if previously there + continue + } + } + switch prefix { + case utils.DESTINATION_PREFIX: + _, err = ms.GetDestination(dataID, true, utils.NonTransactional) + case utils.REVERSE_DESTINATION_PREFIX: + _, err = ms.GetReverseDestination(dataID, true, utils.NonTransactional) + case utils.RATING_PLAN_PREFIX: + _, err = ms.GetRatingPlan(dataID, true, utils.NonTransactional) + case utils.RATING_PROFILE_PREFIX: + _, err = ms.GetRatingProfile(dataID, true, utils.NonTransactional) + case utils.ACTION_PREFIX: + _, err = ms.GetActions(dataID, true, utils.NonTransactional) + case utils.ACTION_PLAN_PREFIX: + _, err = ms.GetActionPlan(dataID, true, utils.NonTransactional) + case utils.AccountActionPlansPrefix: + _, err = ms.GetAccountActionPlans(dataID, true, utils.NonTransactional) + case utils.ACTION_TRIGGER_PREFIX: + _, err = ms.GetActionTriggers(dataID, true, utils.NonTransactional) + case utils.SHARED_GROUP_PREFIX: + _, err = ms.GetSharedGroup(dataID, true, utils.NonTransactional) + case utils.DERIVEDCHARGERS_PREFIX: + _, err = ms.GetDerivedChargers(dataID, true, utils.NonTransactional) + case utils.LCR_PREFIX: + _, err = ms.GetLCR(dataID, true, utils.NonTransactional) + case utils.ALIASES_PREFIX: + _, err = ms.GetAlias(dataID, true, utils.NonTransactional) + case utils.REVERSE_ALIASES_PREFIX: + _, err = ms.GetReverseAlias(dataID, true, utils.NonTransactional) + case utils.ResourceLimitsPrefix: + _, err = ms.GetResourceLimit(dataID, true, utils.NonTransactional) + } + if err != nil { + return utils.NewCGRError(utils.REDIS, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error <%s> querying MapStorage for category: <%s>, dataID: <%s>", prefix, dataID)) + } + } return } @@ -339,7 +454,7 @@ func (ms *MapStorage) GetRatingProfile(key string, skipCache bool, transactionID cCommit := cacheCommit(transactionID) if values, ok := ms.dict[key]; ok { rpf = new(RatingProfile) - err = ms.ms.Unmarshal(values, rpf) + err = ms.ms.Unmarshal(values, &rpf) } else { cache.Set(key, nil, cCommit, transactionID) return nil, utils.ErrNotFound @@ -435,14 +550,18 @@ func (ms *MapStorage) GetDestination(key string, skipCache bool, transactionID s } r.Close() dest = new(Destination) - err = ms.ms.Unmarshal(out, dest) + err = ms.ms.Unmarshal(out, &dest) if err != nil { - cache.Set(key, dest, cCommit, transactionID) + cache.Set(key, nil, cCommit, transactionID) + return nil, utils.ErrNotFound } - } else { + } + if dest == nil { cache.Set(key, nil, cCommit, transactionID) return nil, utils.ErrNotFound } + cache.Set(key, dest, cCommit, transactionID) + return } @@ -504,16 +623,19 @@ func (ms *MapStorage) RemoveDestination(destID string, transactionID string) (er if err != nil { return } + ms.mu.Lock() delete(ms.dict, key) ms.mu.Unlock() cache.RemKey(key, cacheCommit(transactionID), transactionID) + for _, prefix := range d.Prefixes { ms.mu.Lock() ms.dict.srem(utils.REVERSE_DESTINATION_PREFIX+prefix, destID, ms.ms) ms.mu.Unlock() ms.GetReverseDestination(prefix, true, transactionID) // it will recache the destination } + return } @@ -573,9 +695,9 @@ func (ms *MapStorage) GetActions(key string, skipCache bool, transactionID strin ms.mu.RLock() defer ms.mu.RUnlock() cCommit := cacheCommit(transactionID) - key = utils.ACTION_PREFIX + key + cachekey := utils.ACTION_PREFIX + key if !skipCache { - if x, err := cache.GetCloned(key); err != nil { + if x, err := cache.GetCloned(cachekey); err != nil { if err.Error() != utils.ItemNotFound { return nil, err } @@ -585,13 +707,13 @@ func (ms *MapStorage) GetActions(key string, skipCache bool, transactionID strin return x.(Actions), nil } } - if values, ok := ms.dict[key]; ok { + if values, ok := ms.dict[cachekey]; ok { err = ms.ms.Unmarshal(values, &as) } else { - cache.Set(key, nil, cCommit, transactionID) + cache.Set(cachekey, nil, cCommit, transactionID) return nil, utils.ErrNotFound } - cache.Set(key, as, cCommit, transactionID) + cache.Set(cachekey, as, cCommit, transactionID) return } @@ -599,26 +721,28 @@ func (ms *MapStorage) SetActions(key string, as Actions, transactionID string) ( ms.mu.Lock() defer ms.mu.Unlock() cCommit := cacheCommit(transactionID) + cachekey := utils.ACTION_PREFIX + key result, err := ms.ms.Marshal(&as) - ms.dict[utils.ACTION_PREFIX+key] = result - cache.RemKey(utils.ACTION_PREFIX+key, cCommit, transactionID) + ms.dict[cachekey] = result + cache.RemKey(cachekey, cCommit, transactionID) return } func (ms *MapStorage) RemoveActions(key string, transactionID string) (err error) { + cachekey := utils.ACTION_PREFIX + key ms.mu.Lock() - defer ms.mu.Unlock() - delete(ms.dict, utils.ACTION_PREFIX+key) - cache.RemKey(utils.ACTION_PREFIX+key, cacheCommit(transactionID), transactionID) + delete(ms.dict, cachekey) + ms.mu.Unlock() + cache.RemKey(cachekey, cacheCommit(transactionID), transactionID) return } func (ms *MapStorage) GetSharedGroup(key string, skipCache bool, transactionID string) (sg *SharedGroup, err error) { ms.mu.RLock() defer ms.mu.RUnlock() - key = utils.SHARED_GROUP_PREFIX + key + cachekey := utils.SHARED_GROUP_PREFIX + key if !skipCache { - if x, ok := cache.Get(key); ok { + if x, ok := cache.Get(cachekey); ok { if x != nil { return x.(*SharedGroup), nil } @@ -626,13 +750,13 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool, transactionID s } } cCommit := cacheCommit(transactionID) - if values, ok := ms.dict[key]; ok { + if values, ok := ms.dict[cachekey]; ok { err = ms.ms.Unmarshal(values, &sg) if err == nil { - cache.Set(key, sg, cCommit, transactionID) + cache.Set(cachekey, sg, cCommit, transactionID) } } else { - cache.Set(key, nil, cCommit, transactionID) + cache.Set(cachekey, nil, cCommit, transactionID) return nil, utils.ErrNotFound } return @@ -650,10 +774,16 @@ func (ms *MapStorage) SetSharedGroup(sg *SharedGroup, transactionID string) (err func (ms *MapStorage) GetAccount(key string) (ub *Account, err error) { ms.mu.RLock() defer ms.mu.RUnlock() - if values, ok := ms.dict[utils.ACCOUNT_PREFIX+key]; ok { - ub = &Account{ID: key} - err = ms.ms.Unmarshal(values, ub) - } else { + values, ok := ms.dict[utils.ACCOUNT_PREFIX+key] + if !ok { + return nil, utils.ErrNotFound + } + ub = &Account{ID: key} + err = ms.ms.Unmarshal(values, ub) + if err != nil { + return nil, err + } + if len(values) == 0 { return nil, utils.ErrNotFound } return @@ -781,41 +911,40 @@ func (ms *MapStorage) RemoveUser(key string) error { func (ms *MapStorage) GetAlias(key string, skipCache bool, transactionID string) (al *Alias, err error) { ms.mu.RLock() defer ms.mu.RUnlock() - origKey := key - key = utils.ALIASES_PREFIX + key + cacheKey := utils.ALIASES_PREFIX + key if !skipCache { - if x, ok := cache.Get(key); ok { + if x, ok := cache.Get(cacheKey); ok { if x != nil { - al = &Alias{Values: x.(AliasValues)} - al.SetId(origKey) - return al, nil + return x.(*Alias), nil } return nil, utils.ErrNotFound } } - cCommit := cacheCommit(transactionID) - if values, ok := ms.dict[key]; ok { - al = &Alias{Values: make(AliasValues, 0)} - al.SetId(key[len(utils.ALIASES_PREFIX):]) - err = ms.ms.Unmarshal(values, &al.Values) - if err == nil { - cache.Set(key, al.Values, cCommit, transactionID) - } - } else { - cache.Set(key, nil, cCommit, transactionID) + values, ok := ms.dict[cacheKey] + if !ok { + cache.Set(cacheKey, nil, cacheCommit(transactionID), transactionID) return nil, utils.ErrNotFound } - return al, nil + al = &Alias{Values: make(AliasValues, 0)} + al.SetId(key[len(utils.ALIASES_PREFIX):]) + err = ms.ms.Unmarshal(values, &al.Values) + if err != nil { + return nil, err + } + + cache.Set(key, &al, cacheCommit(transactionID), transactionID) + return } func (ms *MapStorage) SetAlias(al *Alias, transactionID string) error { - ms.mu.Lock() - defer ms.mu.Unlock() + result, err := ms.ms.Marshal(al.Values) if err != nil { return err } key := utils.ALIASES_PREFIX + al.GetId() + ms.mu.Lock() + defer ms.mu.Unlock() ms.dict[key] = result cache.RemKey(key, cacheCommit(transactionID), transactionID) return nil @@ -833,15 +962,15 @@ func (ms *MapStorage) GetReverseAlias(reverseID string, skipCache bool, transact return nil, utils.ErrNotFound } } - var values []string + cCommit := cacheCommit(transactionID) if idMap, ok := ms.dict.smembers(key, ms.ms); len(idMap) > 0 && ok { - values = idMap.Slice() + ids = idMap.Slice() } else { cache.Set(key, nil, cCommit, transactionID) return nil, utils.ErrNotFound } - cache.Set(key, values, cCommit, transactionID) + cache.Set(key, ids, cCommit, transactionID) return } @@ -1023,6 +1152,8 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error } func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, transactionID string) (apIDs []string, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key := utils.AccountActionPlansPrefix + acntID if !skipCache { if x, ok := cache.Get(key); ok { @@ -1032,9 +1163,7 @@ func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, trans return x.([]string), nil } } - ms.mu.RLock() values, ok := ms.dict[key] - ms.mu.RUnlock() if !ok { cache.Set(key, nil, cacheCommit(transactionID), transactionID) err = utils.ErrNotFound @@ -1049,8 +1178,7 @@ func (ms *MapStorage) GetAccountActionPlans(acntID string, skipCache bool, trans func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { if !overwrite { - oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { + if oldaPlIDs, err := ms.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { return err } else { for _, oldAPid := range oldaPlIDs { @@ -1060,7 +1188,6 @@ func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overw } } } - ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(apIDs) @@ -1068,12 +1195,11 @@ func (ms *MapStorage) SetAccountActionPlans(acntID string, apIDs []string, overw return err } ms.dict[utils.AccountActionPlansPrefix+acntID] = result + return } func (ms *MapStorage) RemAccountActionPlans(acntID string, apIDs []string) (err error) { - ms.mu.Lock() - defer ms.mu.Unlock() key := utils.AccountActionPlansPrefix + acntID if len(apIDs) == 0 { delete(ms.dict, key) @@ -1090,10 +1216,17 @@ func (ms *MapStorage) RemAccountActionPlans(acntID string, apIDs []string) (err } i++ } + ms.mu.Lock() + defer ms.mu.Unlock() if len(oldaPlIDs) == 0 { delete(ms.dict, key) return } + var result []byte + if result, err = ms.ms.Marshal(oldaPlIDs); err != nil { + return err + } + ms.dict[key] = result return } @@ -1273,7 +1406,8 @@ func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit, transactionID string) if err != nil { return err } - ms.dict[utils.ResourceLimitsPrefix+rl.ID] = result + key := utils.ResourceLimitsPrefix + rl.ID + ms.dict[key] = result return nil } @@ -1310,9 +1444,10 @@ func (ms *MapStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[s return } func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) { + cacheKey := dbKey + fieldValKey ms.mu.RLock() defer ms.mu.RUnlock() - if x, ok := cache.Get(dbKey + fieldValKey); ok { // Attempt to find in cache first + if x, ok := cache.Get(cacheKey); ok { // Attempt to find in cache first if x != nil { return x.(utils.StringMap), nil } @@ -1321,7 +1456,7 @@ func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs ut // Not found in cache, check in DB values, ok := ms.dict[dbKey] if !ok { - cache.Set(dbKey+fieldValKey, nil, true, utils.NonTransactional) + cache.Set(cacheKey, nil, true, utils.NonTransactional) return nil, utils.ErrNotFound } var indexes map[string]map[string]utils.StringMap @@ -1332,7 +1467,12 @@ func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs ut if _, hasIt := indexes[keySplt[0]]; hasIt { itemIDs = indexes[keySplt[0]][keySplt[1]] } - cache.Set(dbKey+fieldValKey, itemIDs, true, utils.NonTransactional) + //Verify items + if len(itemIDs) == 0 { + cache.Set(cacheKey, nil, true, utils.NonTransactional) + return nil, utils.ErrNotFound + } + cache.Set(cacheKey, itemIDs, true, utils.NonTransactional) return }