diff --git a/engine/storage_map.go b/engine/storage_map.go index 44db82c39..bfc7f1b40 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -32,13 +32,44 @@ import ( ) type MapStorage struct { - dict map[string][]byte + dict storage tasks [][]byte ms Marshaler mu sync.RWMutex cacheDumpDir string } +type storage map[string][]byte + +func (s storage) sadd(key, value string, ms Marshaler) { + idMap := utils.StringMap{} + if values, ok := s[key]; ok { + ms.Unmarshal(values, &idMap) + } + idMap[value] = true + values, _ := ms.Marshal(idMap) + s[key] = values +} + +func (s storage) srem(key, value string, ms Marshaler) { + idMap := utils.StringMap{} + if values, ok := s[key]; ok { + ms.Unmarshal(values, &idMap) + } + delete(idMap, value) + values, _ := ms.Marshal(idMap) + s[key] = values +} + +func (s storage) smembers(key string, ms Marshaler) (idMap utils.StringMap, ok bool) { + var values []byte + values, ok = s[key] + if ok { + ms.Unmarshal(values, &idMap) + } + return +} + func NewMapStorage() (*MapStorage, error) { return &MapStorage{dict: make(map[string][]byte), ms: NewCodecMsgpackMarshaler(), cacheDumpDir: "/tmp/cgrates"}, nil } @@ -57,10 +88,57 @@ func (ms *MapStorage) Flush(ignore string) error { } func (ms *MapStorage) RebuildReverseForPrefix(prefix string) error { + // FIXME: should do transaction + keys, err := ms.GetKeysForPrefix(prefix) + if err != nil { + return err + } + for _, key := range keys { + ms.mu.Lock() + delete(ms.dict, key) + ms.mu.Unlock() + } + switch prefix { + case utils.REVERSE_DESTINATION_PREFIX: + keys, err = ms.GetKeysForPrefix(utils.DESTINATION_PREFIX) + if err != nil { + return err + } + for _, key := range keys { + dest, err := ms.GetDestination(key[len(utils.DESTINATION_PREFIX):], false) + if err != nil { + return err + } + if err := ms.SetReverseDestination(dest, false); err != nil { + return err + } + } + case utils.REVERSE_ALIASES_PREFIX: + keys, err = ms.GetKeysForPrefix(utils.ALIASES_PREFIX) + if err != nil { + return err + } + for _, key := range keys { + al, err := ms.GetAlias(key[len(utils.ALIASES_PREFIX):], false) + if err != nil { + return err + } + if err := ms.SetReverseAlias(al, false); err != nil { + return err + } + } + default: + return utils.ErrInvalidKey + } return nil } func (ms *MapStorage) PreloadRatingCache() error { + err := ms.PreloadCacheForPrefix(utils.RATING_PLAN_PREFIX) + if err != nil { + return err + } + // add more prefixes if needed return nil } @@ -69,6 +147,26 @@ func (ms *MapStorage) PreloadAccountingCache() error { } func (ms *MapStorage) PreloadCacheForPrefix(prefix string) error { + cache2go.BeginTransaction() + cache2go.RemPrefixKey(prefix) + keyList, err := ms.GetKeysForPrefix(prefix) + if err != nil { + cache2go.RollbackTransaction() + return err + } + switch prefix { + case utils.RATING_PLAN_PREFIX: + for _, key := range keyList { + _, err := ms.GetRatingPlan(key[len(utils.RATING_PLAN_PREFIX):], true) + if err != nil { + cache2go.RollbackTransaction() + return err + } + } + default: + return utils.ErrInvalidKey + } + cache2go.CommitTransaction() return nil } @@ -294,21 +392,118 @@ func (ms *MapStorage) SetDestination(dest *Destination, cache bool) (err error) } func (ms *MapStorage) GetReverseDestination(prefix string, skipCache bool) (ids []string, err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + prefix = utils.REVERSE_DESTINATION_PREFIX + prefix + if !skipCache { + if x, ok := cache2go.Get(prefix); ok { + if x != nil { + return x.([]string), nil + } + return nil, utils.ErrNotFound + } + } + + if idMap, ok := ms.dict.smembers(prefix, ms.ms); ok { + ids = idMap.Slice() + } else { + cache2go.Set(prefix, nil) + return nil, utils.ErrNotFound + } + + cache2go.Set(prefix, ids) return } func (ms *MapStorage) SetReverseDestination(dest *Destination, cache bool) (err error) { + for _, p := range dest.Prefixes { + key := utils.REVERSE_DESTINATION_PREFIX + p + ms.mu.Lock() + ms.dict.sadd(key, dest.Id, ms.ms) + ms.mu.Unlock() + + if cache && err == nil { + _, err = ms.GetReverseDestination(p, true) // will recache + } + } return } func (ms *MapStorage) RemoveDestination(destID string) (err error) { + key := utils.DESTINATION_PREFIX + destID + // get destination for prefix list + d, err := ms.GetDestination(destID, false) + if err != nil { + return + } + ms.mu.Lock() + delete(ms.dict, key) + ms.mu.Unlock() + cache2go.RemKey(key) + for _, prefix := range d.Prefixes { + ms.mu.Lock() + ms.dict.srem(utils.REVERSE_DESTINATION_PREFIX+prefix, destID, ms.ms) + ms.mu.Unlock() + + ms.GetReverseDestination(prefix, true) // it will recache the destination + } return } func (ms *MapStorage) UpdateReverseDestination(oldDest, newDest *Destination, cache bool) error { - return nil + //log.Printf("Old: %+v, New: %+v", oldDest, newDest) + var obsoletePrefixes []string + var addedPrefixes []string + var found bool + for _, oldPrefix := range oldDest.Prefixes { + found = false + for _, newPrefix := range newDest.Prefixes { + if oldPrefix == newPrefix { + found = true + break + } + } + if !found { + obsoletePrefixes = append(obsoletePrefixes, oldPrefix) + } + } + + for _, newPrefix := range newDest.Prefixes { + found = false + for _, oldPrefix := range oldDest.Prefixes { + if newPrefix == oldPrefix { + found = true + break + } + } + if !found { + addedPrefixes = append(addedPrefixes, newPrefix) + } + } + //log.Print("Obsolete prefixes: ", obsoletePrefixes) + //log.Print("Added prefixes: ", addedPrefixes) + // remove id for all obsolete prefixes + var err error + for _, obsoletePrefix := range obsoletePrefixes { + ms.mu.Lock() + ms.dict.srem(utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, oldDest.Id, ms.ms) + ms.mu.Unlock() + cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + obsoletePrefix) + } + + // add the id to all new prefixes + for _, addedPrefix := range addedPrefixes { + ms.mu.Lock() + ms.dict.sadd(utils.REVERSE_DESTINATION_PREFIX+addedPrefix, newDest.Id, ms.ms) + ms.mu.Unlock() + cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + addedPrefix) + if cache { + ms.GetReverseDestination(addedPrefix, true) // will recache + } + } + return err } func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err error) { @@ -563,25 +758,80 @@ func (ms *MapStorage) SetAlias(al *Alias, cache bool) error { } func (ms *MapStorage) GetReverseAlias(reverseID string, skipCache bool) (ids []string, err error) { - + ms.mu.Lock() + defer ms.mu.Unlock() + key := utils.REVERSE_ALIASES_PREFIX + reverseID + if !skipCache { + if x, ok := cache2go.Get(key); ok { + if x != nil { + return x.([]string), nil + } + return nil, utils.ErrNotFound + } + } + var values []string + if idMap, ok := ms.dict.smembers(key, ms.ms); len(idMap) > 0 && ok { + values = idMap.Slice() + } else { + cache2go.Set(key, nil) + return nil, utils.ErrNotFound + } + cache2go.Set(key, values) return + } func (ms *MapStorage) SetReverseAlias(al *Alias, cache bool) (err error) { + for _, value := range al.Values { + for target, pairs := range value.Pairs { + for _, alias := range pairs { + rKey := strings.Join([]string{utils.REVERSE_ALIASES_PREFIX, alias, target, al.Context}, "") + id := utils.ConcatenatedKey(al.GetId(), value.DestinationId) + ms.mu.Lock() + ms.dict.sadd(rKey, id, ms.ms) + ms.mu.Unlock() + if cache && err == nil { + ms.GetReverseAlias(rKey[len(utils.REVERSE_ALIASES_PREFIX):], true) // will recache + } + } + } + } return } func (ms *MapStorage) RemoveAlias(key string) error { + // get alias for values list + al, err := ms.GetAlias(key, false) + if err != nil { + return err + } + ms.mu.Lock() defer ms.mu.Unlock() key = utils.ALIASES_PREFIX + key + aliasValues := make(AliasValues, 0) if values, ok := ms.dict[key]; ok { ms.ms.Unmarshal(values, &aliasValues) } delete(ms.dict, key) cache2go.RemKey(key) + for _, value := range al.Values { + tmpKey := utils.ConcatenatedKey(al.GetId(), value.DestinationId) + for target, pairs := range value.Pairs { + for _, alias := range pairs { + rKey := utils.REVERSE_ALIASES_PREFIX + alias + target + al.Context + ms.dict.srem(rKey, tmpKey, ms.ms) + + cache2go.RemKey(rKey) + /*_, err = rs.GetReverseAlias(rKey, true) // recache + if err != nil { + return err + }*/ + } + } + } return nil } @@ -763,13 +1013,14 @@ func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers, cache bool) error { ms.mu.Lock() defer ms.mu.Unlock() + key = utils.DERIVEDCHARGERS_PREFIX + key if dcs == nil || len(dcs.Chargers) == 0 { - delete(ms.dict, utils.DERIVEDCHARGERS_PREFIX+key) - cache2go.RemKey(utils.DERIVEDCHARGERS_PREFIX + key) + delete(ms.dict, key) + cache2go.RemKey(key) return nil } result, err := ms.ms.Marshal(dcs) - ms.dict[utils.DERIVEDCHARGERS_PREFIX+key] = result + ms.dict[key] = result if cache && err == nil { cache2go.Set(key, dcs) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 3feed8a50..ae76d291f 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -440,10 +440,7 @@ func (rs *RedisStorage) RemoveDestination(destID string) (err error) { if err != nil { return err } - _, err = rs.GetReverseDestination(prefix, true) // it will recache the destination - if err != nil { - return err - } + rs.GetReverseDestination(prefix, true) // it will recache the destination } return } @@ -487,14 +484,12 @@ func (rs *RedisStorage) UpdateReverseDestination(oldDest, newDest *Destination, if err != nil { return err } - if cache { - cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + obsoletePrefix) - } + cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + obsoletePrefix) } // add the id to all new prefixes for _, addedPrefix := range addedPrefixes { - err = rs.db.Cmd("SADD", utils.REVERSE_DESTINATION_PREFIX+addedPrefix, oldDest.Id).Err + err = rs.db.Cmd("SADD", utils.REVERSE_DESTINATION_PREFIX+addedPrefix, newDest.Id).Err if err != nil { return err } @@ -751,8 +746,7 @@ func (rs *RedisStorage) GetReverseAlias(reverseID string, skipCache bool) (ids [ } } var values []string - if values, err = rs.db.Cmd("SMEMBERS", key).List(); len(values) > 0 && err == nil { - } else { + if values, err = rs.db.Cmd("SMEMBERS", key).List(); len(values) == 0 || err != nil { cache2go.Set(key, nil) return nil, utils.ErrNotFound } @@ -771,7 +765,7 @@ func (rs *RedisStorage) SetReverseAlias(al *Alias, cache bool) (err error) { break } if cache && err == nil { - _, err = rs.GetReverseAlias(rKey[len(utils.REVERSE_ALIASES_PREFIX):], true) // will recache + rs.GetReverseAlias(rKey[len(utils.REVERSE_ALIASES_PREFIX):], true) // will recache } } } @@ -814,52 +808,7 @@ func (rs *RedisStorage) RemoveAlias(id string) (err error) { } func (rs *RedisStorage) UpdateReverseAlias(oldAl, newAl *Alias) error { - /*var obsoleteDestinations []string - var addedDestinations []string - var found bool - for _, oldValue := range oldAl.Values { - found = false - for _, newPrefix := range newDest.Destinations { - if oldPrefix == newPrefix { - found = true - break - } - } - if !found { - obsoleteDestinations = append(obsoleteDestinations, oldPrefix) - } - } - for _, newPrefix := range newDest.Destinations { - found = false - for _, oldPrefix := range oldDest.Destinations { - if newPrefix == oldPrefix { - found = true - break - } - } - if !found { - addedDestinations = append(addedDestinations, newPrefix) - } - } - - // remove id for all obsolete prefixes - var err error - for _, obsoletePrefix := range obsoleteDestinations { - err = rs.db.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, oldDest.Id).Err - if err != nil { - return err - } - cache2go.RemKey(utils.REVERSE_DESTINATION_PREFIX + obsoletePrefix) - } - - // add the id to all new prefixes - for _, addedPrefix := range addedDestinations { - err = rs.db.Cmd("SADD", utils.REVERSE_ALIASES_PREFIX, addedPrefix, oldDest.Id).Err - if err != nil { - return err - } - }*/ return nil } diff --git a/utils/map.go b/utils/map.go index f9f63df76..7f68897f7 100644 --- a/utils/map.go +++ b/utils/map.go @@ -128,7 +128,7 @@ func (sm StringMap) IsEmpty() bool { } func StringMapFromSlice(s []string) StringMap { - result := make(StringMap) + result := make(StringMap, len(s)) for _, v := range s { v = strings.TrimSpace(v) if v != "" {