Merge pull request #551 from eloycoto/redis-client

Redis: Added method to reconnect on redis if any IOErr reply. Fix #543
This commit is contained in:
Dan Christian Bogos
2016-10-04 13:39:37 +02:00
committed by GitHub
2 changed files with 164 additions and 129 deletions

View File

@@ -22,14 +22,13 @@ import (
"compress/zlib"
"errors"
"fmt"
"io/ioutil"
"strings"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mediocregopher/radix.v2/redis"
"io/ioutil"
"strings"
)
var (
@@ -41,32 +40,36 @@ type RedisStorage struct {
ms Marshaler
cacheCfg *config.CacheConfig
loadHistorySize int
poolConfig PoolConnectionConfig
}
type PoolConnectionConfig struct {
address string
password string
db int
mrshlerStr string
maxConns int
cacheCfg *config.CacheConfig
loadHistorySize int
}
func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (*RedisStorage, error) {
df := func(network, addr string) (*redis.Client, error) {
client, err := redis.Dial(network, addr)
if err != nil {
return nil, err
}
if len(pass) != 0 {
if err = client.Cmd("AUTH", pass).Err; err != nil {
client.Close()
return nil, err
}
}
if db != 0 {
if err = client.Cmd("SELECT", db).Err; err != nil {
client.Close()
return nil, err
}
}
return client, nil
connectionConf := PoolConnectionConfig{
address: address,
db: db,
password: pass,
mrshlerStr: mrshlerStr,
maxConns: maxConns,
cacheCfg: cacheCfg,
loadHistorySize: loadHistorySize,
}
p, err := pool.NewCustom("tcp", address, maxConns, df)
pool, err := connectionConf.PoolCreation(maxConns)
if err != nil {
return nil, err
}
var mrshler Marshaler
if mrshlerStr == utils.MSGPACK {
mrshler = NewCodecMsgpackMarshaler()
@@ -75,7 +78,74 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns i
} else {
return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
}
return &RedisStorage{db: p, ms: mrshler, cacheCfg: cacheCfg, loadHistorySize: loadHistorySize}, nil
result := &RedisStorage{
db: pool,
ms: mrshler,
cacheCfg: cacheCfg,
poolConfig: connectionConf,
loadHistorySize: loadHistorySize}
return result, nil
}
func (config *PoolConnectionConfig) DialFunction(network string, addr string) (*redis.Client, error) {
client, err := redis.Dial(network, addr)
if err != nil {
return nil, err
}
if len(config.password) != 0 {
if err = client.Cmd("AUTH", config.password).Err; err != nil {
client.Close()
return nil, err
}
}
if config.db != 0 {
if err = client.Cmd("SELECT", config.db).Err; err != nil {
client.Close()
return nil, err
}
}
return client, nil
}
func (config *PoolConnectionConfig) PoolCreation(maxConns int) (*pool.Pool, error) {
p, err := pool.NewCustom("tcp", config.address, maxConns, config.DialFunction)
if err != nil {
return nil, err
}
return p, nil
}
func (rs *RedisStorage) IncreasePool() error {
number_of_pools := (rs.poolConfig.maxConns - rs.db.Avail())
utils.Logger.Warning(fmt.Sprintf("RedisClient increase pool size in %d (Current size is %d)",
number_of_pools, rs.db.Avail()))
for i := 0; i < number_of_pools; i++ {
conn, err := rs.db.Get()
if err != nil {
return err
}
rs.db.Put(conn)
}
return nil
}
// This CMD function get a connection from the pool. Send the command and if
// the problem is a redis.IOError didn't add the connection in the pool and
// increase the pool with a new connection. Good for timeouts
func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp {
c, err := rs.db.Get()
if err != nil {
return redis.NewResp(err)
}
result := c.Cmd(cmd, args...)
if result.IsType(redis.IOErr) {
rs.IncreasePool()
utils.Logger.Warning(fmt.Sprintf("RedisClient error '%s'", result.String()))
return rs.Cmd(cmd, args...)
}
rs.db.Put(c)
return result
}
func (rs *RedisStorage) Close() {
@@ -83,7 +153,7 @@ func (rs *RedisStorage) Close() {
}
func (rs *RedisStorage) Flush(ignore string) error {
return rs.db.Cmd("FLUSHDB").Err
return rs.Cmd("FLUSHDB").Err
}
func (rs *RedisStorage) PreloadRatingCache() error {
@@ -198,17 +268,12 @@ func (rs *RedisStorage) PreloadCacheForPrefix(prefix string) error {
}
func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) error {
conn, err := rs.db.Get()
if err != nil {
return err
}
defer rs.db.Put(conn)
keys, err := rs.GetKeysForPrefix(prefix)
if err != nil {
return err
}
for _, key := range keys {
err = conn.Cmd("DEL", key).Err
err = rs.Cmd("DEL", key).Err
if err != nil {
return err
}
@@ -249,7 +314,7 @@ func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) error {
}
func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) {
r := rs.db.Cmd("KEYS", prefix+"*")
r := rs.Cmd("KEYS", prefix+"*")
if r.Err != nil {
return nil, r.Err
}
@@ -260,7 +325,7 @@ func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) {
func (rs *RedisStorage) HasData(category, subject string) (bool, error) {
switch category {
case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX:
i, err := rs.db.Cmd("EXISTS", category+subject).Int()
i, err := rs.Cmd("EXISTS", category+subject).Int()
return i == 1, err
}
return false, errors.New("unsupported HasData category")
@@ -277,7 +342,7 @@ func (rs *RedisStorage) GetRatingPlan(key string, skipCache bool, transactionID
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
@@ -301,7 +366,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan, transactionID string) (err
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
err = rs.db.Cmd("SET", utils.RATING_PLAN_PREFIX+rp.Id, b.Bytes()).Err
err = rs.Cmd("SET", utils.RATING_PLAN_PREFIX+rp.Id, b.Bytes()).Err
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(), &response)
@@ -322,7 +387,7 @@ func (rs *RedisStorage) GetRatingProfile(key string, skipCache bool, transaction
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
rpf = new(RatingProfile)
err = rs.ms.Unmarshal(values, rpf)
}
@@ -332,7 +397,7 @@ func (rs *RedisStorage) GetRatingProfile(key string, skipCache bool, transaction
func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile, transactionID string) (err error) {
result, err := rs.ms.Marshal(rpf)
err = rs.db.Cmd("SET", utils.RATING_PROFILE_PREFIX+rpf.Id, result).Err
err = rs.Cmd("SET", utils.RATING_PROFILE_PREFIX+rpf.Id, result).Err
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(false), &response)
@@ -342,17 +407,12 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile, transactionID strin
}
func (rs *RedisStorage) RemoveRatingProfile(key string, transactionID string) error {
conn, err := rs.db.Get()
if err != nil {
return err
}
defer rs.db.Put(conn)
keys, err := conn.Cmd("KEYS", utils.RATING_PROFILE_PREFIX+key+"*").List()
keys, err := rs.Cmd("KEYS", utils.RATING_PROFILE_PREFIX+key+"*").List()
if err != nil {
return err
}
for _, key := range keys {
if err = conn.Cmd("DEL", key).Err; err != nil {
if err = rs.Cmd("DEL", key).Err; err != nil {
return err
}
cache2go.RemKey(key, cacheCommit(transactionID), transactionID)
@@ -377,7 +437,7 @@ func (rs *RedisStorage) GetLCR(key string, skipCache bool, transactionID string)
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &lcr)
} else {
cache2go.Set(key, nil, cacheCommit(transactionID), transactionID)
@@ -390,7 +450,7 @@ func (rs *RedisStorage) GetLCR(key string, skipCache bool, transactionID string)
func (rs *RedisStorage) SetLCR(lcr *LCR, transactionID string) (err error) {
result, err := rs.ms.Marshal(lcr)
key := utils.LCR_PREFIX + lcr.GetId()
err = rs.db.Cmd("SET", key, result).Err
err = rs.Cmd("SET", key, result).Err
cache2go.RemKey(key, cacheCommit(transactionID), transactionID)
return
}
@@ -406,7 +466,7 @@ func (rs *RedisStorage) GetDestination(key string, skipCache bool, transactionID
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); len(values) > 0 && err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); len(values) > 0 && err == nil {
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
@@ -439,7 +499,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination, transactionID string)
w.Write(result)
w.Close()
key := utils.DESTINATION_PREFIX + dest.Id
err = rs.db.Cmd("SET", key, b.Bytes()).Err
err = rs.Cmd("SET", key, b.Bytes()).Err
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response)
@@ -458,7 +518,7 @@ func (rs *RedisStorage) GetReverseDestination(prefix string, skipCache bool, tra
return nil, utils.ErrNotFound
}
}
if ids, err = rs.db.Cmd("SMEMBERS", prefix).List(); len(ids) > 0 && err == nil {
if ids, err = rs.Cmd("SMEMBERS", prefix).List(); len(ids) > 0 && err == nil {
cache2go.Set(prefix, ids, cacheCommit(transactionID), transactionID)
return ids, nil
}
@@ -468,7 +528,7 @@ func (rs *RedisStorage) GetReverseDestination(prefix string, skipCache bool, tra
func (rs *RedisStorage) SetReverseDestination(dest *Destination, transactionID string) (err error) {
for _, p := range dest.Prefixes {
key := utils.REVERSE_DESTINATION_PREFIX + p
err = rs.db.Cmd("SADD", key, dest.Id).Err
err = rs.Cmd("SADD", key, dest.Id).Err
if err != nil {
break
}
@@ -484,13 +544,13 @@ func (rs *RedisStorage) RemoveDestination(destID, transactionID string) (err err
if err != nil {
return
}
err = rs.db.Cmd("DEL", key).Err
err = rs.Cmd("DEL", key).Err
if err != nil {
return err
}
cache2go.RemKey(key, cacheCommit(transactionID), transactionID)
for _, prefix := range d.Prefixes {
err = rs.db.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+prefix, destID).Err
err = rs.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+prefix, destID).Err
if err != nil {
return err
}
@@ -535,7 +595,7 @@ func (rs *RedisStorage) UpdateReverseDestination(oldDest, newDest *Destination,
cCommit := cacheCommit(transactionID)
var err error
for _, obsoletePrefix := range obsoletePrefixes {
err = rs.db.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, oldDest.Id).Err
err = rs.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, oldDest.Id).Err
if err != nil {
return err
}
@@ -544,7 +604,7 @@ func (rs *RedisStorage) UpdateReverseDestination(oldDest, newDest *Destination,
// add the id to all new prefixes
for _, addedPrefix := range addedPrefixes {
err = rs.db.Cmd("SADD", utils.REVERSE_DESTINATION_PREFIX+addedPrefix, newDest.Id).Err
err = rs.Cmd("SADD", utils.REVERSE_DESTINATION_PREFIX+addedPrefix, newDest.Id).Err
if err != nil {
return err
}
@@ -564,7 +624,7 @@ func (rs *RedisStorage) GetActions(key string, skipCache bool, transactionID str
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &as)
}
cache2go.Set(key, as, cacheCommit(transactionID), transactionID)
@@ -573,13 +633,13 @@ func (rs *RedisStorage) GetActions(key string, skipCache bool, transactionID str
func (rs *RedisStorage) SetActions(key string, as Actions, transactionID string) (err error) {
result, err := rs.ms.Marshal(&as)
err = rs.db.Cmd("SET", utils.ACTION_PREFIX+key, result).Err
err = rs.Cmd("SET", utils.ACTION_PREFIX+key, result).Err
cache2go.RemKey(utils.ACTION_PREFIX+key, cacheCommit(transactionID), transactionID)
return
}
func (rs *RedisStorage) RemoveActions(key string, transactionID string) (err error) {
err = rs.db.Cmd("DEL", utils.ACTION_PREFIX+key).Err
err = rs.Cmd("DEL", utils.ACTION_PREFIX+key).Err
cache2go.RemKey(utils.ACTION_PREFIX+key, cacheCommit(transactionID), transactionID)
return
}
@@ -595,7 +655,7 @@ func (rs *RedisStorage) GetSharedGroup(key string, skipCache bool, transactionID
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &sg)
}
cache2go.Set(key, sg, cacheCommit(transactionID), transactionID)
@@ -604,13 +664,13 @@ func (rs *RedisStorage) GetSharedGroup(key string, skipCache bool, transactionID
func (rs *RedisStorage) SetSharedGroup(sg *SharedGroup, transactionID string) (err error) {
result, err := rs.ms.Marshal(sg)
err = rs.db.Cmd("SET", utils.SHARED_GROUP_PREFIX+sg.Id, result).Err
err = rs.Cmd("SET", utils.SHARED_GROUP_PREFIX+sg.Id, result).Err
cache2go.RemKey(utils.SHARED_GROUP_PREFIX+sg.Id, cacheCommit(transactionID), transactionID)
return
}
func (rs *RedisStorage) GetAccount(key string) (*Account, error) {
rpl := rs.db.Cmd("GET", utils.ACCOUNT_PREFIX+key)
rpl := rs.Cmd("GET", utils.ACCOUNT_PREFIX+key)
if rpl.Err != nil {
return nil, rpl.Err
} else if rpl.IsType(redis.Nil) {
@@ -641,18 +701,18 @@ func (rs *RedisStorage) SetAccount(ub *Account) (err error) {
}
}
result, err := rs.ms.Marshal(ub)
err = rs.db.Cmd("SET", utils.ACCOUNT_PREFIX+ub.ID, result).Err
err = rs.Cmd("SET", utils.ACCOUNT_PREFIX+ub.ID, result).Err
return
}
func (rs *RedisStorage) RemoveAccount(key string) (err error) {
return rs.db.Cmd("DEL", utils.ACCOUNT_PREFIX+key).Err
return rs.Cmd("DEL", utils.ACCOUNT_PREFIX+key).Err
}
func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
var values []byte
if values, err = rs.db.Cmd("GET", utils.CDR_STATS_QUEUE_PREFIX+key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", utils.CDR_STATS_QUEUE_PREFIX+key).Bytes(); err == nil {
sq = &StatsQueue{}
err = rs.ms.Unmarshal(values, &sq)
}
@@ -661,23 +721,18 @@ func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error)
func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
result, err := rs.ms.Marshal(sq)
err = rs.db.Cmd("SET", utils.CDR_STATS_QUEUE_PREFIX+sq.GetId(), result).Err
err = rs.Cmd("SET", utils.CDR_STATS_QUEUE_PREFIX+sq.GetId(), result).Err
return
}
func (rs *RedisStorage) GetSubscribers() (result map[string]*SubscriberData, err error) {
conn, err := rs.db.Get()
if err != nil {
return nil, err
}
defer rs.db.Put(conn)
keys, err := conn.Cmd("KEYS", utils.PUBSUB_SUBSCRIBERS_PREFIX+"*").List()
keys, err := rs.Cmd("KEYS", utils.PUBSUB_SUBSCRIBERS_PREFIX+"*").List()
if err != nil {
return nil, err
}
result = make(map[string]*SubscriberData)
for _, key := range keys {
if values, err := conn.Cmd("GET", key).Bytes(); err == nil {
if values, err := rs.Cmd("GET", key).Bytes(); err == nil {
sub := &SubscriberData{}
err = rs.ms.Unmarshal(values, sub)
result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = sub
@@ -693,11 +748,11 @@ func (rs *RedisStorage) SetSubscriber(key string, sub *SubscriberData) (err erro
if err != nil {
return err
}
return rs.db.Cmd("SET", utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result).Err
return rs.Cmd("SET", utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result).Err
}
func (rs *RedisStorage) RemoveSubscriber(key string) (err error) {
err = rs.db.Cmd("DEL", utils.PUBSUB_SUBSCRIBERS_PREFIX+key).Err
err = rs.Cmd("DEL", utils.PUBSUB_SUBSCRIBERS_PREFIX+key).Err
return
}
@@ -706,12 +761,12 @@ func (rs *RedisStorage) SetUser(up *UserProfile) (err error) {
if err != nil {
return err
}
return rs.db.Cmd("SET", utils.USERS_PREFIX+up.GetId(), result).Err
return rs.Cmd("SET", utils.USERS_PREFIX+up.GetId(), result).Err
}
func (rs *RedisStorage) GetUser(key string) (up *UserProfile, err error) {
var values []byte
if values, err = rs.db.Cmd("GET", utils.USERS_PREFIX+key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", utils.USERS_PREFIX+key).Bytes(); err == nil {
up = &UserProfile{}
err = rs.ms.Unmarshal(values, &up)
}
@@ -719,17 +774,12 @@ func (rs *RedisStorage) GetUser(key string) (up *UserProfile, err error) {
}
func (rs *RedisStorage) GetUsers() (result []*UserProfile, err error) {
conn, err := rs.db.Get()
if err != nil {
return nil, err
}
defer rs.db.Put(conn)
keys, err := conn.Cmd("KEYS", utils.USERS_PREFIX+"*").List()
keys, err := rs.Cmd("KEYS", utils.USERS_PREFIX+"*").List()
if err != nil {
return nil, err
}
for _, key := range keys {
if values, err := conn.Cmd("GET", key).Bytes(); err == nil {
if values, err := rs.Cmd("GET", key).Bytes(); err == nil {
up := &UserProfile{}
err = rs.ms.Unmarshal(values, up)
result = append(result, up)
@@ -741,7 +791,7 @@ func (rs *RedisStorage) GetUsers() (result []*UserProfile, err error) {
}
func (rs *RedisStorage) RemoveUser(key string) (err error) {
return rs.db.Cmd("DEL", utils.USERS_PREFIX+key).Err
return rs.Cmd("DEL", utils.USERS_PREFIX+key).Err
}
func (rs *RedisStorage) GetAlias(key string, skipCache bool, transactionID string) (al *Alias, err error) {
@@ -759,7 +809,7 @@ func (rs *RedisStorage) GetAlias(key string, skipCache bool, transactionID strin
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
al = &Alias{Values: make(AliasValues, 0)}
al.SetId(origKey)
err = rs.ms.Unmarshal(values, &al.Values)
@@ -777,7 +827,7 @@ func (rs *RedisStorage) SetAlias(al *Alias, transactionID string) (err error) {
return err
}
key := utils.ALIASES_PREFIX + al.GetId()
err = rs.db.Cmd("SET", key, result).Err
err = rs.Cmd("SET", key, result).Err
cache2go.RemKey(key, cacheCommit(transactionID), transactionID)
return
}
@@ -792,7 +842,7 @@ func (rs *RedisStorage) GetReverseAlias(reverseID string, skipCache bool, transa
return nil, utils.ErrNotFound
}
}
if ids, err = rs.db.Cmd("SMEMBERS", key).List(); len(ids) == 0 || err != nil {
if ids, err = rs.Cmd("SMEMBERS", key).List(); len(ids) == 0 || err != nil {
cache2go.Set(key, nil, cacheCommit(transactionID), transactionID)
return nil, utils.ErrNotFound
}
@@ -807,7 +857,7 @@ func (rs *RedisStorage) SetReverseAlias(al *Alias, transactionID string) (err er
for _, alias := range pairs {
rKey := strings.Join([]string{utils.REVERSE_ALIASES_PREFIX, alias, target, al.Context}, "")
id := utils.ConcatenatedKey(al.GetId(), value.DestinationId)
err = rs.db.Cmd("SADD", rKey, id).Err
err = rs.Cmd("SADD", rKey, id).Err
if err != nil {
break
}
@@ -826,7 +876,7 @@ func (rs *RedisStorage) RemoveAlias(id string, transactionID string) (err error)
if err != nil {
return
}
err = rs.db.Cmd("DEL", key).Err
err = rs.Cmd("DEL", key).Err
if err != nil {
return err
}
@@ -838,7 +888,7 @@ func (rs *RedisStorage) RemoveAlias(id string, transactionID string) (err error)
for target, pairs := range value.Pairs {
for _, alias := range pairs {
rKey := utils.REVERSE_ALIASES_PREFIX + alias + target + al.Context
err = rs.db.Cmd("SREM", rKey, tmpKey).Err
err = rs.Cmd("SREM", rKey, tmpKey).Err
if err != nil {
return err
}
@@ -881,7 +931,7 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool, transactionID
if limit != -1 {
limit -= -1 // Decrease limit to match redis approach on lrange
}
marshaleds, err := rs.db.Cmd("LRANGE", utils.LOADINST_KEY, 0, limit).ListBytes()
marshaleds, err := rs.Cmd("LRANGE", utils.LOADINST_KEY, 0, limit).ListBytes()
cCommit := cacheCommit(transactionID)
if err != nil {
cache2go.Set(utils.LOADINST_KEY, nil, cCommit, transactionID)
@@ -906,11 +956,6 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool, transactionID
// Adds a single load instance to load history
func (rs *RedisStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int, transactionID string) error {
conn, err := rs.db.Get()
if err != nil {
return err
}
defer rs.db.Put(conn)
if loadHistSize == 0 { // Load history disabled
return nil
}
@@ -919,16 +964,16 @@ func (rs *RedisStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize
return err
}
_, err = Guardian.Guard(func() (interface{}, error) { // Make sure we do it locked since other instance can modify history while we read it
histLen, err := conn.Cmd("LLEN", utils.LOADINST_KEY).Int()
histLen, err := rs.Cmd("LLEN", utils.LOADINST_KEY).Int()
if err != nil {
return nil, err
}
if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one
if err := conn.Cmd("RPOP", utils.LOADINST_KEY).Err; err != nil {
if err := rs.Cmd("RPOP", utils.LOADINST_KEY).Err; err != nil {
return nil, err
}
}
err = conn.Cmd("LPUSH", utils.LOADINST_KEY, marshaled).Err
err = rs.Cmd("LPUSH", utils.LOADINST_KEY, marshaled).Err
return nil, err
}, 0, utils.LOADINST_KEY)
@@ -947,7 +992,7 @@ func (rs *RedisStorage) GetActionTriggers(key string, skipCache bool, transactio
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &atrs)
}
cache2go.Set(key, atrs, cacheCommit(transactionID), transactionID)
@@ -955,27 +1000,22 @@ func (rs *RedisStorage) GetActionTriggers(key string, skipCache bool, transactio
}
func (rs *RedisStorage) SetActionTriggers(key string, atrs ActionTriggers, transactionID string) (err error) {
conn, err := rs.db.Get()
if err != nil {
return err
}
defer rs.db.Put(conn)
if len(atrs) == 0 {
// delete the key
return conn.Cmd("DEL", utils.ACTION_TRIGGER_PREFIX+key).Err
return rs.Cmd("DEL", utils.ACTION_TRIGGER_PREFIX+key).Err
}
result, err := rs.ms.Marshal(atrs)
if err != nil {
return err
}
err = conn.Cmd("SET", utils.ACTION_TRIGGER_PREFIX+key, result).Err
err = rs.Cmd("SET", utils.ACTION_TRIGGER_PREFIX+key, result).Err
cache2go.RemKey(utils.ACTION_TRIGGER_PREFIX+key, cacheCommit(transactionID), transactionID)
return
}
func (rs *RedisStorage) RemoveActionTriggers(key string, transactionID string) (err error) {
key = utils.ACTION_TRIGGER_PREFIX + key
err = rs.db.Cmd("DEL", key).Err
err = rs.Cmd("DEL", key).Err
cache2go.RemKey(key, cacheCommit(transactionID), transactionID)
return
@@ -992,7 +1032,7 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool, transactionID
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
b := bytes.NewBuffer(values)
r, err := zlib.NewReader(b)
if err != nil {
@@ -1014,7 +1054,7 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan, overwrite boo
cCommit := cacheCommit(transactionID)
if len(ats.ActionTimings) == 0 {
// delete the key
err = rs.db.Cmd("DEL", utils.ACTION_PLAN_PREFIX+key).Err
err = rs.Cmd("DEL", utils.ACTION_PLAN_PREFIX+key).Err
cache2go.RemKey(utils.ACTION_PLAN_PREFIX+key, cCommit, transactionID)
return err
}
@@ -1039,7 +1079,7 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan, overwrite boo
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
err = rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, b.Bytes()).Err
err = rs.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, b.Bytes()).Err
cache2go.RemKey(utils.ACTION_PLAN_PREFIX+key, cCommit, transactionID)
return
}
@@ -1068,12 +1108,12 @@ func (rs *RedisStorage) PushTask(t *Task) error {
if err != nil {
return err
}
return rs.db.Cmd("RPUSH", utils.TASKS_KEY, result).Err
return rs.Cmd("RPUSH", utils.TASKS_KEY, result).Err
}
func (rs *RedisStorage) PopTask() (t *Task, err error) {
var values []byte
if values, err = rs.db.Cmd("LPOP", utils.TASKS_KEY).Bytes(); err == nil {
if values, err = rs.Cmd("LPOP", utils.TASKS_KEY).Bytes(); err == nil {
t = &Task{}
err = rs.ms.Unmarshal(values, t)
}
@@ -1091,7 +1131,7 @@ func (rs *RedisStorage) GetDerivedChargers(key string, skipCache bool, transacti
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &dcs)
} else {
cache2go.Set(key, nil, cacheCommit(transactionID), transactionID)
@@ -1105,7 +1145,7 @@ func (rs *RedisStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger
key = utils.DERIVEDCHARGERS_PREFIX + key
cCommit := cacheCommit(transactionID)
if dcs == nil || len(dcs.Chargers) == 0 {
err = rs.db.Cmd("DEL", key).Err
err = rs.Cmd("DEL", key).Err
cache2go.RemKey(key, cCommit, transactionID)
return err
}
@@ -1113,7 +1153,7 @@ func (rs *RedisStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger
if err != nil {
return err
}
err = rs.db.Cmd("SET", key, marshaled).Err
err = rs.Cmd("SET", key, marshaled).Err
cache2go.RemKey(key, cCommit, transactionID)
return
}
@@ -1123,29 +1163,24 @@ func (rs *RedisStorage) SetCdrStats(cs *CdrStats) error {
if err != nil {
return err
}
return rs.db.Cmd("SET", utils.CDR_STATS_PREFIX+cs.Id, marshaled).Err
return rs.Cmd("SET", utils.CDR_STATS_PREFIX+cs.Id, marshaled).Err
}
func (rs *RedisStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
var values []byte
if values, err = rs.db.Cmd("GET", utils.CDR_STATS_PREFIX+key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", utils.CDR_STATS_PREFIX+key).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &cs)
}
return
}
func (rs *RedisStorage) GetAllCdrStats() (css []*CdrStats, err error) {
conn, err := rs.db.Get()
if err != nil {
return nil, err
}
defer rs.db.Put(conn)
keys, err := conn.Cmd("KEYS", utils.CDR_STATS_PREFIX+"*").List()
keys, err := rs.Cmd("KEYS", utils.CDR_STATS_PREFIX+"*").List()
if err != nil {
return nil, err
}
for _, key := range keys {
value, err := conn.Cmd("GET", key).Bytes()
value, err := rs.Cmd("GET", key).Bytes()
if err != nil {
continue
}
@@ -1162,13 +1197,13 @@ func (rs *RedisStorage) SetStructVersion(v *StructVersion) (err error) {
if err != nil {
return
}
return rs.db.Cmd("SET", utils.VERSION_PREFIX+"struct", result).Err
return rs.Cmd("SET", utils.VERSION_PREFIX+"struct", result).Err
}
func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) {
var values []byte
rsv = &StructVersion{}
if values, err = rs.db.Cmd("GET", utils.VERSION_PREFIX+"struct").Bytes(); err == nil {
if values, err = rs.Cmd("GET", utils.VERSION_PREFIX+"struct").Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &rsv)
}
return
@@ -1185,7 +1220,7 @@ func (rs *RedisStorage) GetResourceLimit(id string, skipCache bool, transactionI
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
if values, err = rs.Cmd("GET", key).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &rl)
for _, fltr := range rl.Filters {
if err := fltr.CompileValues(); err != nil {
@@ -1202,13 +1237,13 @@ func (rs *RedisStorage) SetResourceLimit(rl *ResourceLimit, transactionID string
return err
}
key := utils.ResourceLimitsPrefix + rl.ID
err = rs.db.Cmd("SET", key, result).Err
err = rs.Cmd("SET", key, result).Err
cache2go.Set(key, rl, cacheCommit(transactionID), transactionID)
return err
}
func (rs *RedisStorage) RemoveResourceLimit(id string, transactionID string) error {
key := utils.ResourceLimitsPrefix + id
if err := rs.db.Cmd("DEL", key).Err; err != nil {
if err := rs.Cmd("DEL", key).Err; err != nil {
return err
}
cache2go.RemKey(key, cacheCommit(transactionID), transactionID)

2
glide.lock generated
View File

@@ -64,7 +64,7 @@ imports:
subpackages:
- oid
- name: github.com/mediocregopher/radix.v2
version: ae04b3eb3731f94789205d1268e0759371166605
version: bb6eabf33bf82d38e985e2757cc13ffdc80818a4
subpackages:
- pool
- redis