map drivers passing tests

This commit is contained in:
Radu Ioan Fericean
2016-08-11 16:17:17 +03:00
parent a10f0aaaa9
commit c3fa8b4450
3 changed files with 263 additions and 63 deletions

View File

@@ -32,13 +32,44 @@ import (
)
type MapStorage struct {
dict map[string][]byte
dict storage
tasks [][]byte
ms Marshaler
mu sync.RWMutex
cacheDumpDir string
}
type storage map[string][]byte
func (s storage) sadd(key, value string, ms Marshaler) {
idMap := utils.StringMap{}
if values, ok := s[key]; ok {
ms.Unmarshal(values, &idMap)
}
idMap[value] = true
values, _ := ms.Marshal(idMap)
s[key] = values
}
func (s storage) srem(key, value string, ms Marshaler) {
idMap := utils.StringMap{}
if values, ok := s[key]; ok {
ms.Unmarshal(values, &idMap)
}
delete(idMap, value)
values, _ := ms.Marshal(idMap)
s[key] = values
}
func (s storage) smembers(key string, ms Marshaler) (idMap utils.StringMap, ok bool) {
var values []byte
values, ok = s[key]
if ok {
ms.Unmarshal(values, &idMap)
}
return
}
func NewMapStorage() (*MapStorage, error) {
return &MapStorage{dict: make(map[string][]byte), ms: NewCodecMsgpackMarshaler(), cacheDumpDir: "/tmp/cgrates"}, nil
}
@@ -57,10 +88,57 @@ func (ms *MapStorage) Flush(ignore string) error {
}
func (ms *MapStorage) RebuildReverseForPrefix(prefix string) error {
// FIXME: should do transaction
keys, err := ms.GetKeysForPrefix(prefix)
if err != nil {
return err
}
for _, key := range keys {
ms.mu.Lock()
delete(ms.dict, key)
ms.mu.Unlock()
}
switch prefix {
case utils.REVERSE_DESTINATION_PREFIX:
keys, err = ms.GetKeysForPrefix(utils.DESTINATION_PREFIX)
if err != nil {
return err
}
for _, key := range keys {
dest, err := ms.GetDestination(key[len(utils.DESTINATION_PREFIX):], false)
if err != nil {
return err
}
if err := ms.SetReverseDestination(dest, false); err != nil {
return err
}
}
case utils.REVERSE_ALIASES_PREFIX:
keys, err = ms.GetKeysForPrefix(utils.ALIASES_PREFIX)
if err != nil {
return err
}
for _, key := range keys {
al, err := ms.GetAlias(key[len(utils.ALIASES_PREFIX):], false)
if err != nil {
return err
}
if err := ms.SetReverseAlias(al, false); err != nil {
return err
}
}
default:
return utils.ErrInvalidKey
}
return nil
}
func (ms *MapStorage) PreloadRatingCache() error {
err := ms.PreloadCacheForPrefix(utils.RATING_PLAN_PREFIX)
if err != nil {
return err
}
// add more prefixes if needed
return nil
}
@@ -69,6 +147,26 @@ func (ms *MapStorage) PreloadAccountingCache() error {
}
func (ms *MapStorage) PreloadCacheForPrefix(prefix string) error {
cache2go.BeginTransaction()
cache2go.RemPrefixKey(prefix)
keyList, err := ms.GetKeysForPrefix(prefix)
if err != nil {
cache2go.RollbackTransaction()
return err
}
switch prefix {
case utils.RATING_PLAN_PREFIX:
for _, key := range keyList {
_, err := ms.GetRatingPlan(key[len(utils.RATING_PLAN_PREFIX):], true)
if err != nil {
cache2go.RollbackTransaction()
return err
}
}
default:
return utils.ErrInvalidKey
}
cache2go.CommitTransaction()
return nil
}
@@ -294,21 +392,118 @@ func (ms *MapStorage) SetDestination(dest *Destination, cache bool) (err error)
}
func (ms *MapStorage) GetReverseDestination(prefix string, skipCache bool) (ids []string, err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
prefix = utils.REVERSE_DESTINATION_PREFIX + prefix
if !skipCache {
if x, ok := cache2go.Get(prefix); ok {
if x != nil {
return x.([]string), nil
}
return nil, utils.ErrNotFound
}
}
if idMap, ok := ms.dict.smembers(prefix, ms.ms); ok {
ids = idMap.Slice()
} else {
cache2go.Set(prefix, nil)
return nil, utils.ErrNotFound
}
cache2go.Set(prefix, ids)
return
}
func (ms *MapStorage) SetReverseDestination(dest *Destination, cache bool) (err error) {
for _, p := range dest.Prefixes {
key := utils.REVERSE_DESTINATION_PREFIX + p
ms.mu.Lock()
ms.dict.sadd(key, dest.Id, ms.ms)
ms.mu.Unlock()
if cache && err == nil {
_, err = ms.GetReverseDestination(p, true) // will recache
}
}
return
}
func (ms *MapStorage) RemoveDestination(destID string) (err error) {
key := utils.DESTINATION_PREFIX + destID
// get destination for prefix list
d, err := ms.GetDestination(destID, false)
if err != nil {
return
}
ms.mu.Lock()
delete(ms.dict, key)
ms.mu.Unlock()
cache2go.RemKey(key)
for _, prefix := range d.Prefixes {
ms.mu.Lock()
ms.dict.srem(utils.REVERSE_DESTINATION_PREFIX+prefix, destID, ms.ms)
ms.mu.Unlock()
ms.GetReverseDestination(prefix, true) // it will recache the destination
}
return
}
func (ms *MapStorage) UpdateReverseDestination(oldDest, newDest *Destination, cache bool) error {
return nil
//log.Printf("Old: %+v, New: %+v", oldDest, newDest)
var obsoletePrefixes []string
var addedPrefixes []string
var found bool
for _, oldPrefix := range oldDest.Prefixes {
found = false
for _, newPrefix := range newDest.Prefixes {
if oldPrefix == newPrefix {
found = true
break
}
}
if !found {
obsoletePrefixes = append(obsoletePrefixes, oldPrefix)
}
}
for _, newPrefix := range newDest.Prefixes {
found = false
for _, oldPrefix := range oldDest.Prefixes {
if newPrefix == oldPrefix {
found = true
break
}
}
if !found {
addedPrefixes = append(addedPrefixes, newPrefix)
}
}
//log.Print("Obsolete prefixes: ", obsoletePrefixes)
//log.Print("Added prefixes: ", addedPrefixes)
// remove id for all obsolete prefixes
var err error
for _, obsoletePrefix := range obsoletePrefixes {
ms.mu.Lock()
ms.dict.srem(utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, oldDest.Id, ms.ms)
ms.mu.Unlock()
cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + obsoletePrefix)
}
// add the id to all new prefixes
for _, addedPrefix := range addedPrefixes {
ms.mu.Lock()
ms.dict.sadd(utils.REVERSE_DESTINATION_PREFIX+addedPrefix, newDest.Id, ms.ms)
ms.mu.Unlock()
cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + addedPrefix)
if cache {
ms.GetReverseDestination(addedPrefix, true) // will recache
}
}
return err
}
func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err error) {
@@ -563,25 +758,80 @@ func (ms *MapStorage) SetAlias(al *Alias, cache bool) error {
}
func (ms *MapStorage) GetReverseAlias(reverseID string, skipCache bool) (ids []string, err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
key := utils.REVERSE_ALIASES_PREFIX + reverseID
if !skipCache {
if x, ok := cache2go.Get(key); ok {
if x != nil {
return x.([]string), nil
}
return nil, utils.ErrNotFound
}
}
var values []string
if idMap, ok := ms.dict.smembers(key, ms.ms); len(idMap) > 0 && ok {
values = idMap.Slice()
} else {
cache2go.Set(key, nil)
return nil, utils.ErrNotFound
}
cache2go.Set(key, values)
return
}
func (ms *MapStorage) SetReverseAlias(al *Alias, cache bool) (err error) {
for _, value := range al.Values {
for target, pairs := range value.Pairs {
for _, alias := range pairs {
rKey := strings.Join([]string{utils.REVERSE_ALIASES_PREFIX, alias, target, al.Context}, "")
id := utils.ConcatenatedKey(al.GetId(), value.DestinationId)
ms.mu.Lock()
ms.dict.sadd(rKey, id, ms.ms)
ms.mu.Unlock()
if cache && err == nil {
ms.GetReverseAlias(rKey[len(utils.REVERSE_ALIASES_PREFIX):], true) // will recache
}
}
}
}
return
}
func (ms *MapStorage) RemoveAlias(key string) error {
// get alias for values list
al, err := ms.GetAlias(key, false)
if err != nil {
return err
}
ms.mu.Lock()
defer ms.mu.Unlock()
key = utils.ALIASES_PREFIX + key
aliasValues := make(AliasValues, 0)
if values, ok := ms.dict[key]; ok {
ms.ms.Unmarshal(values, &aliasValues)
}
delete(ms.dict, key)
cache2go.RemKey(key)
for _, value := range al.Values {
tmpKey := utils.ConcatenatedKey(al.GetId(), value.DestinationId)
for target, pairs := range value.Pairs {
for _, alias := range pairs {
rKey := utils.REVERSE_ALIASES_PREFIX + alias + target + al.Context
ms.dict.srem(rKey, tmpKey, ms.ms)
cache2go.RemKey(rKey)
/*_, err = rs.GetReverseAlias(rKey, true) // recache
if err != nil {
return err
}*/
}
}
}
return nil
}
@@ -763,13 +1013,14 @@ func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils
func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers, cache bool) error {
ms.mu.Lock()
defer ms.mu.Unlock()
key = utils.DERIVEDCHARGERS_PREFIX + key
if dcs == nil || len(dcs.Chargers) == 0 {
delete(ms.dict, utils.DERIVEDCHARGERS_PREFIX+key)
cache2go.RemKey(utils.DERIVEDCHARGERS_PREFIX + key)
delete(ms.dict, key)
cache2go.RemKey(key)
return nil
}
result, err := ms.ms.Marshal(dcs)
ms.dict[utils.DERIVEDCHARGERS_PREFIX+key] = result
ms.dict[key] = result
if cache && err == nil {
cache2go.Set(key, dcs)
}

View File

@@ -440,10 +440,7 @@ func (rs *RedisStorage) RemoveDestination(destID string) (err error) {
if err != nil {
return err
}
_, err = rs.GetReverseDestination(prefix, true) // it will recache the destination
if err != nil {
return err
}
rs.GetReverseDestination(prefix, true) // it will recache the destination
}
return
}
@@ -487,14 +484,12 @@ func (rs *RedisStorage) UpdateReverseDestination(oldDest, newDest *Destination,
if err != nil {
return err
}
if cache {
cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + obsoletePrefix)
}
cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + obsoletePrefix)
}
// add the id to all new prefixes
for _, addedPrefix := range addedPrefixes {
err = rs.db.Cmd("SADD", utils.REVERSE_DESTINATION_PREFIX+addedPrefix, oldDest.Id).Err
err = rs.db.Cmd("SADD", utils.REVERSE_DESTINATION_PREFIX+addedPrefix, newDest.Id).Err
if err != nil {
return err
}
@@ -751,8 +746,7 @@ func (rs *RedisStorage) GetReverseAlias(reverseID string, skipCache bool) (ids [
}
}
var values []string
if values, err = rs.db.Cmd("SMEMBERS", key).List(); len(values) > 0 && err == nil {
} else {
if values, err = rs.db.Cmd("SMEMBERS", key).List(); len(values) == 0 || err != nil {
cache2go.Set(key, nil)
return nil, utils.ErrNotFound
}
@@ -771,7 +765,7 @@ func (rs *RedisStorage) SetReverseAlias(al *Alias, cache bool) (err error) {
break
}
if cache && err == nil {
_, err = rs.GetReverseAlias(rKey[len(utils.REVERSE_ALIASES_PREFIX):], true) // will recache
rs.GetReverseAlias(rKey[len(utils.REVERSE_ALIASES_PREFIX):], true) // will recache
}
}
}
@@ -814,52 +808,7 @@ func (rs *RedisStorage) RemoveAlias(id string) (err error) {
}
func (rs *RedisStorage) UpdateReverseAlias(oldAl, newAl *Alias) error {
/*var obsoleteDestinations []string
var addedDestinations []string
var found bool
for _, oldValue := range oldAl.Values {
found = false
for _, newPrefix := range newDest.Destinations {
if oldPrefix == newPrefix {
found = true
break
}
}
if !found {
obsoleteDestinations = append(obsoleteDestinations, oldPrefix)
}
}
for _, newPrefix := range newDest.Destinations {
found = false
for _, oldPrefix := range oldDest.Destinations {
if newPrefix == oldPrefix {
found = true
break
}
}
if !found {
addedDestinations = append(addedDestinations, newPrefix)
}
}
// remove id for all obsolete prefixes
var err error
for _, obsoletePrefix := range obsoleteDestinations {
err = rs.db.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, oldDest.Id).Err
if err != nil {
return err
}
cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + obsoletePrefix)
}
// add the id to all new prefixes
for _, addedPrefix := range addedDestinations {
err = rs.db.Cmd("SADD", utils.REVERSE_ALIASES_PREFIX, addedPrefix, oldDest.Id).Err
if err != nil {
return err
}
}*/
return nil
}

View File

@@ -128,7 +128,7 @@ func (sm StringMap) IsEmpty() bool {
}
func StringMapFromSlice(s []string) StringMap {
result := make(StringMap)
result := make(StringMap, len(s))
for _, v := range s {
v = strings.TrimSpace(v)
if v != "" {