diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 8da6d6737..a01310561 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -22,14 +22,13 @@ import ( "compress/zlib" "errors" "fmt" - "io/ioutil" - "strings" - "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/mediocregopher/radix.v2/pool" "github.com/mediocregopher/radix.v2/redis" + "io/ioutil" + "strings" ) var ( @@ -41,32 +40,36 @@ type RedisStorage struct { ms Marshaler cacheCfg *config.CacheConfig loadHistorySize int + poolConfig PoolConnectionConfig +} + +type PoolConnectionConfig struct { + address string + password string + db int + mrshlerStr string + maxConns int + cacheCfg *config.CacheConfig + loadHistorySize int } func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (*RedisStorage, error) { - df := func(network, addr string) (*redis.Client, error) { - client, err := redis.Dial(network, addr) - if err != nil { - return nil, err - } - if len(pass) != 0 { - if err = client.Cmd("AUTH", pass).Err; err != nil { - client.Close() - return nil, err - } - } - if db != 0 { - if err = client.Cmd("SELECT", db).Err; err != nil { - client.Close() - return nil, err - } - } - return client, nil + + connectionConf := PoolConnectionConfig{ + address: address, + db: db, + password: pass, + mrshlerStr: mrshlerStr, + maxConns: maxConns, + cacheCfg: cacheCfg, + loadHistorySize: loadHistorySize, } - p, err := pool.NewCustom("tcp", address, maxConns, df) + + pool, err := connectionConf.PoolCreation(maxConns) if err != nil { return nil, err } + var mrshler Marshaler if mrshlerStr == utils.MSGPACK { mrshler = NewCodecMsgpackMarshaler() @@ -75,7 +78,74 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns i } else { return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) } - return &RedisStorage{db: p, ms: mrshler, cacheCfg: cacheCfg, loadHistorySize: loadHistorySize}, nil + result := &RedisStorage{ + db: pool, + ms: mrshler, + cacheCfg: cacheCfg, + poolConfig: connectionConf, + loadHistorySize: loadHistorySize} + return result, nil +} + +func (config *PoolConnectionConfig) DialFunction(network string, addr string) (*redis.Client, error) { + client, err := redis.Dial(network, addr) + if err != nil { + return nil, err + } + if len(config.password) != 0 { + if err = client.Cmd("AUTH", config.password).Err; err != nil { + client.Close() + return nil, err + } + } + if config.db != 0 { + if err = client.Cmd("SELECT", config.db).Err; err != nil { + client.Close() + return nil, err + } + } + return client, nil +} + +func (config *PoolConnectionConfig) PoolCreation(maxConns int) (*pool.Pool, error) { + p, err := pool.NewCustom("tcp", config.address, maxConns, config.DialFunction) + if err != nil { + return nil, err + } + return p, nil +} + +func (rs *RedisStorage) IncreasePool() error { + number_of_pools := (rs.poolConfig.maxConns - rs.db.Avail()) + utils.Logger.Warning(fmt.Sprintf("RedisClient increase pool size in %d (Current size is %d)", + number_of_pools, rs.db.Avail())) + for i := 0; i < number_of_pools; i++ { + conn, err := rs.db.Get() + if err != nil { + return err + } + rs.db.Put(conn) + } + return nil +} + +// This CMD function get a connection from the pool. Send the command and if +// the problem is a redis.IOError didn't add the connection in the pool and +// increase the pool with a new connection. Good for timeouts + +func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp { + c, err := rs.db.Get() + if err != nil { + return redis.NewResp(err) + } + result := c.Cmd(cmd, args...) + if result.IsType(redis.IOErr) { + rs.IncreasePool() + utils.Logger.Warning(fmt.Sprintf("RedisClient error '%s'", result.String())) + return rs.Cmd(cmd, args...) + } + rs.db.Put(c) + return result } func (rs *RedisStorage) Close() { @@ -83,7 +153,7 @@ func (rs *RedisStorage) Close() { } func (rs *RedisStorage) Flush(ignore string) error { - return rs.db.Cmd("FLUSHDB").Err + return rs.Cmd("FLUSHDB").Err } func (rs *RedisStorage) PreloadRatingCache() error { @@ -198,17 +268,12 @@ func (rs *RedisStorage) PreloadCacheForPrefix(prefix string) error { } func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) error { - conn, err := rs.db.Get() - if err != nil { - return err - } - defer rs.db.Put(conn) keys, err := rs.GetKeysForPrefix(prefix) if err != nil { return err } for _, key := range keys { - err = conn.Cmd("DEL", key).Err + err = rs.Cmd("DEL", key).Err if err != nil { return err } @@ -249,7 +314,7 @@ func (rs *RedisStorage) RebuildReverseForPrefix(prefix string) error { } func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) { - r := rs.db.Cmd("KEYS", prefix+"*") + r := rs.Cmd("KEYS", prefix+"*") if r.Err != nil { return nil, r.Err } @@ -260,7 +325,7 @@ func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) { func (rs *RedisStorage) HasData(category, subject string) (bool, error) { switch category { case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX: - i, err := rs.db.Cmd("EXISTS", category+subject).Int() + i, err := rs.Cmd("EXISTS", category+subject).Int() return i == 1, err } return false, errors.New("unsupported HasData category") @@ -277,7 +342,7 @@ func (rs *RedisStorage) GetRatingPlan(key string, skipCache bool, transactionID } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { b := bytes.NewBuffer(values) r, err := zlib.NewReader(b) if err != nil { @@ -301,7 +366,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan, transactionID string) (err w := zlib.NewWriter(&b) w.Write(result) w.Close() - err = rs.db.Cmd("SET", utils.RATING_PLAN_PREFIX+rp.Id, b.Bytes()).Err + err = rs.Cmd("SET", utils.RATING_PLAN_PREFIX+rp.Id, b.Bytes()).Err if err == nil && historyScribe != nil { response := 0 go historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(), &response) @@ -322,7 +387,7 @@ func (rs *RedisStorage) GetRatingProfile(key string, skipCache bool, transaction } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { rpf = new(RatingProfile) err = rs.ms.Unmarshal(values, rpf) } @@ -332,7 +397,7 @@ func (rs *RedisStorage) GetRatingProfile(key string, skipCache bool, transaction func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile, transactionID string) (err error) { result, err := rs.ms.Marshal(rpf) - err = rs.db.Cmd("SET", utils.RATING_PROFILE_PREFIX+rpf.Id, result).Err + err = rs.Cmd("SET", utils.RATING_PROFILE_PREFIX+rpf.Id, result).Err if err == nil && historyScribe != nil { response := 0 go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(false), &response) @@ -342,17 +407,12 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile, transactionID strin } func (rs *RedisStorage) RemoveRatingProfile(key string, transactionID string) error { - conn, err := rs.db.Get() - if err != nil { - return err - } - defer rs.db.Put(conn) - keys, err := conn.Cmd("KEYS", utils.RATING_PROFILE_PREFIX+key+"*").List() + keys, err := rs.Cmd("KEYS", utils.RATING_PROFILE_PREFIX+key+"*").List() if err != nil { return err } for _, key := range keys { - if err = conn.Cmd("DEL", key).Err; err != nil { + if err = rs.Cmd("DEL", key).Err; err != nil { return err } cache2go.RemKey(key, cacheCommit(transactionID), transactionID) @@ -377,7 +437,7 @@ func (rs *RedisStorage) GetLCR(key string, skipCache bool, transactionID string) } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { err = rs.ms.Unmarshal(values, &lcr) } else { cache2go.Set(key, nil, cacheCommit(transactionID), transactionID) @@ -390,7 +450,7 @@ func (rs *RedisStorage) GetLCR(key string, skipCache bool, transactionID string) func (rs *RedisStorage) SetLCR(lcr *LCR, transactionID string) (err error) { result, err := rs.ms.Marshal(lcr) key := utils.LCR_PREFIX + lcr.GetId() - err = rs.db.Cmd("SET", key, result).Err + err = rs.Cmd("SET", key, result).Err cache2go.RemKey(key, cacheCommit(transactionID), transactionID) return } @@ -406,7 +466,7 @@ func (rs *RedisStorage) GetDestination(key string, skipCache bool, transactionID } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); len(values) > 0 && err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); len(values) > 0 && err == nil { b := bytes.NewBuffer(values) r, err := zlib.NewReader(b) if err != nil { @@ -439,7 +499,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination, transactionID string) w.Write(result) w.Close() key := utils.DESTINATION_PREFIX + dest.Id - err = rs.db.Cmd("SET", key, b.Bytes()).Err + err = rs.Cmd("SET", key, b.Bytes()).Err if err == nil && historyScribe != nil { response := 0 go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response) @@ -458,7 +518,7 @@ func (rs *RedisStorage) GetReverseDestination(prefix string, skipCache bool, tra return nil, utils.ErrNotFound } } - if ids, err = rs.db.Cmd("SMEMBERS", prefix).List(); len(ids) > 0 && err == nil { + if ids, err = rs.Cmd("SMEMBERS", prefix).List(); len(ids) > 0 && err == nil { cache2go.Set(prefix, ids, cacheCommit(transactionID), transactionID) return ids, nil } @@ -468,7 +528,7 @@ func (rs *RedisStorage) GetReverseDestination(prefix string, skipCache bool, tra func (rs *RedisStorage) SetReverseDestination(dest *Destination, transactionID string) (err error) { for _, p := range dest.Prefixes { key := utils.REVERSE_DESTINATION_PREFIX + p - err = rs.db.Cmd("SADD", key, dest.Id).Err + err = rs.Cmd("SADD", key, dest.Id).Err if err != nil { break } @@ -484,13 +544,13 @@ func (rs *RedisStorage) RemoveDestination(destID, transactionID string) (err err if err != nil { return } - err = rs.db.Cmd("DEL", key).Err + err = rs.Cmd("DEL", key).Err if err != nil { return err } cache2go.RemKey(key, cacheCommit(transactionID), transactionID) for _, prefix := range d.Prefixes { - err = rs.db.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+prefix, destID).Err + err = rs.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+prefix, destID).Err if err != nil { return err } @@ -535,7 +595,7 @@ func (rs *RedisStorage) UpdateReverseDestination(oldDest, newDest *Destination, cCommit := cacheCommit(transactionID) var err error for _, obsoletePrefix := range obsoletePrefixes { - err = rs.db.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, oldDest.Id).Err + err = rs.Cmd("SREM", utils.REVERSE_DESTINATION_PREFIX+obsoletePrefix, oldDest.Id).Err if err != nil { return err } @@ -544,7 +604,7 @@ func (rs *RedisStorage) UpdateReverseDestination(oldDest, newDest *Destination, // add the id to all new prefixes for _, addedPrefix := range addedPrefixes { - err = rs.db.Cmd("SADD", utils.REVERSE_DESTINATION_PREFIX+addedPrefix, newDest.Id).Err + err = rs.Cmd("SADD", utils.REVERSE_DESTINATION_PREFIX+addedPrefix, newDest.Id).Err if err != nil { return err } @@ -564,7 +624,7 @@ func (rs *RedisStorage) GetActions(key string, skipCache bool, transactionID str } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { err = rs.ms.Unmarshal(values, &as) } cache2go.Set(key, as, cacheCommit(transactionID), transactionID) @@ -573,13 +633,13 @@ func (rs *RedisStorage) GetActions(key string, skipCache bool, transactionID str func (rs *RedisStorage) SetActions(key string, as Actions, transactionID string) (err error) { result, err := rs.ms.Marshal(&as) - err = rs.db.Cmd("SET", utils.ACTION_PREFIX+key, result).Err + err = rs.Cmd("SET", utils.ACTION_PREFIX+key, result).Err cache2go.RemKey(utils.ACTION_PREFIX+key, cacheCommit(transactionID), transactionID) return } func (rs *RedisStorage) RemoveActions(key string, transactionID string) (err error) { - err = rs.db.Cmd("DEL", utils.ACTION_PREFIX+key).Err + err = rs.Cmd("DEL", utils.ACTION_PREFIX+key).Err cache2go.RemKey(utils.ACTION_PREFIX+key, cacheCommit(transactionID), transactionID) return } @@ -595,7 +655,7 @@ func (rs *RedisStorage) GetSharedGroup(key string, skipCache bool, transactionID } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { err = rs.ms.Unmarshal(values, &sg) } cache2go.Set(key, sg, cacheCommit(transactionID), transactionID) @@ -604,13 +664,13 @@ func (rs *RedisStorage) GetSharedGroup(key string, skipCache bool, transactionID func (rs *RedisStorage) SetSharedGroup(sg *SharedGroup, transactionID string) (err error) { result, err := rs.ms.Marshal(sg) - err = rs.db.Cmd("SET", utils.SHARED_GROUP_PREFIX+sg.Id, result).Err + err = rs.Cmd("SET", utils.SHARED_GROUP_PREFIX+sg.Id, result).Err cache2go.RemKey(utils.SHARED_GROUP_PREFIX+sg.Id, cacheCommit(transactionID), transactionID) return } func (rs *RedisStorage) GetAccount(key string) (*Account, error) { - rpl := rs.db.Cmd("GET", utils.ACCOUNT_PREFIX+key) + rpl := rs.Cmd("GET", utils.ACCOUNT_PREFIX+key) if rpl.Err != nil { return nil, rpl.Err } else if rpl.IsType(redis.Nil) { @@ -641,18 +701,18 @@ func (rs *RedisStorage) SetAccount(ub *Account) (err error) { } } result, err := rs.ms.Marshal(ub) - err = rs.db.Cmd("SET", utils.ACCOUNT_PREFIX+ub.ID, result).Err + err = rs.Cmd("SET", utils.ACCOUNT_PREFIX+ub.ID, result).Err return } func (rs *RedisStorage) RemoveAccount(key string) (err error) { - return rs.db.Cmd("DEL", utils.ACCOUNT_PREFIX+key).Err + return rs.Cmd("DEL", utils.ACCOUNT_PREFIX+key).Err } func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) { var values []byte - if values, err = rs.db.Cmd("GET", utils.CDR_STATS_QUEUE_PREFIX+key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", utils.CDR_STATS_QUEUE_PREFIX+key).Bytes(); err == nil { sq = &StatsQueue{} err = rs.ms.Unmarshal(values, &sq) } @@ -661,23 +721,18 @@ func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) func (rs *RedisStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { result, err := rs.ms.Marshal(sq) - err = rs.db.Cmd("SET", utils.CDR_STATS_QUEUE_PREFIX+sq.GetId(), result).Err + err = rs.Cmd("SET", utils.CDR_STATS_QUEUE_PREFIX+sq.GetId(), result).Err return } func (rs *RedisStorage) GetSubscribers() (result map[string]*SubscriberData, err error) { - conn, err := rs.db.Get() - if err != nil { - return nil, err - } - defer rs.db.Put(conn) - keys, err := conn.Cmd("KEYS", utils.PUBSUB_SUBSCRIBERS_PREFIX+"*").List() + keys, err := rs.Cmd("KEYS", utils.PUBSUB_SUBSCRIBERS_PREFIX+"*").List() if err != nil { return nil, err } result = make(map[string]*SubscriberData) for _, key := range keys { - if values, err := conn.Cmd("GET", key).Bytes(); err == nil { + if values, err := rs.Cmd("GET", key).Bytes(); err == nil { sub := &SubscriberData{} err = rs.ms.Unmarshal(values, sub) result[key[len(utils.PUBSUB_SUBSCRIBERS_PREFIX):]] = sub @@ -693,11 +748,11 @@ func (rs *RedisStorage) SetSubscriber(key string, sub *SubscriberData) (err erro if err != nil { return err } - return rs.db.Cmd("SET", utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result).Err + return rs.Cmd("SET", utils.PUBSUB_SUBSCRIBERS_PREFIX+key, result).Err } func (rs *RedisStorage) RemoveSubscriber(key string) (err error) { - err = rs.db.Cmd("DEL", utils.PUBSUB_SUBSCRIBERS_PREFIX+key).Err + err = rs.Cmd("DEL", utils.PUBSUB_SUBSCRIBERS_PREFIX+key).Err return } @@ -706,12 +761,12 @@ func (rs *RedisStorage) SetUser(up *UserProfile) (err error) { if err != nil { return err } - return rs.db.Cmd("SET", utils.USERS_PREFIX+up.GetId(), result).Err + return rs.Cmd("SET", utils.USERS_PREFIX+up.GetId(), result).Err } func (rs *RedisStorage) GetUser(key string) (up *UserProfile, err error) { var values []byte - if values, err = rs.db.Cmd("GET", utils.USERS_PREFIX+key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", utils.USERS_PREFIX+key).Bytes(); err == nil { up = &UserProfile{} err = rs.ms.Unmarshal(values, &up) } @@ -719,17 +774,12 @@ func (rs *RedisStorage) GetUser(key string) (up *UserProfile, err error) { } func (rs *RedisStorage) GetUsers() (result []*UserProfile, err error) { - conn, err := rs.db.Get() - if err != nil { - return nil, err - } - defer rs.db.Put(conn) - keys, err := conn.Cmd("KEYS", utils.USERS_PREFIX+"*").List() + keys, err := rs.Cmd("KEYS", utils.USERS_PREFIX+"*").List() if err != nil { return nil, err } for _, key := range keys { - if values, err := conn.Cmd("GET", key).Bytes(); err == nil { + if values, err := rs.Cmd("GET", key).Bytes(); err == nil { up := &UserProfile{} err = rs.ms.Unmarshal(values, up) result = append(result, up) @@ -741,7 +791,7 @@ func (rs *RedisStorage) GetUsers() (result []*UserProfile, err error) { } func (rs *RedisStorage) RemoveUser(key string) (err error) { - return rs.db.Cmd("DEL", utils.USERS_PREFIX+key).Err + return rs.Cmd("DEL", utils.USERS_PREFIX+key).Err } func (rs *RedisStorage) GetAlias(key string, skipCache bool, transactionID string) (al *Alias, err error) { @@ -759,7 +809,7 @@ func (rs *RedisStorage) GetAlias(key string, skipCache bool, transactionID strin } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { al = &Alias{Values: make(AliasValues, 0)} al.SetId(origKey) err = rs.ms.Unmarshal(values, &al.Values) @@ -777,7 +827,7 @@ func (rs *RedisStorage) SetAlias(al *Alias, transactionID string) (err error) { return err } key := utils.ALIASES_PREFIX + al.GetId() - err = rs.db.Cmd("SET", key, result).Err + err = rs.Cmd("SET", key, result).Err cache2go.RemKey(key, cacheCommit(transactionID), transactionID) return } @@ -792,7 +842,7 @@ func (rs *RedisStorage) GetReverseAlias(reverseID string, skipCache bool, transa return nil, utils.ErrNotFound } } - if ids, err = rs.db.Cmd("SMEMBERS", key).List(); len(ids) == 0 || err != nil { + if ids, err = rs.Cmd("SMEMBERS", key).List(); len(ids) == 0 || err != nil { cache2go.Set(key, nil, cacheCommit(transactionID), transactionID) return nil, utils.ErrNotFound } @@ -807,7 +857,7 @@ func (rs *RedisStorage) SetReverseAlias(al *Alias, transactionID string) (err er for _, alias := range pairs { rKey := strings.Join([]string{utils.REVERSE_ALIASES_PREFIX, alias, target, al.Context}, "") id := utils.ConcatenatedKey(al.GetId(), value.DestinationId) - err = rs.db.Cmd("SADD", rKey, id).Err + err = rs.Cmd("SADD", rKey, id).Err if err != nil { break } @@ -826,7 +876,7 @@ func (rs *RedisStorage) RemoveAlias(id string, transactionID string) (err error) if err != nil { return } - err = rs.db.Cmd("DEL", key).Err + err = rs.Cmd("DEL", key).Err if err != nil { return err } @@ -838,7 +888,7 @@ func (rs *RedisStorage) RemoveAlias(id string, transactionID string) (err error) for target, pairs := range value.Pairs { for _, alias := range pairs { rKey := utils.REVERSE_ALIASES_PREFIX + alias + target + al.Context - err = rs.db.Cmd("SREM", rKey, tmpKey).Err + err = rs.Cmd("SREM", rKey, tmpKey).Err if err != nil { return err } @@ -881,7 +931,7 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool, transactionID if limit != -1 { limit -= -1 // Decrease limit to match redis approach on lrange } - marshaleds, err := rs.db.Cmd("LRANGE", utils.LOADINST_KEY, 0, limit).ListBytes() + marshaleds, err := rs.Cmd("LRANGE", utils.LOADINST_KEY, 0, limit).ListBytes() cCommit := cacheCommit(transactionID) if err != nil { cache2go.Set(utils.LOADINST_KEY, nil, cCommit, transactionID) @@ -906,11 +956,6 @@ func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool, transactionID // Adds a single load instance to load history func (rs *RedisStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int, transactionID string) error { - conn, err := rs.db.Get() - if err != nil { - return err - } - defer rs.db.Put(conn) if loadHistSize == 0 { // Load history disabled return nil } @@ -919,16 +964,16 @@ func (rs *RedisStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize return err } _, err = Guardian.Guard(func() (interface{}, error) { // Make sure we do it locked since other instance can modify history while we read it - histLen, err := conn.Cmd("LLEN", utils.LOADINST_KEY).Int() + histLen, err := rs.Cmd("LLEN", utils.LOADINST_KEY).Int() if err != nil { return nil, err } if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one - if err := conn.Cmd("RPOP", utils.LOADINST_KEY).Err; err != nil { + if err := rs.Cmd("RPOP", utils.LOADINST_KEY).Err; err != nil { return nil, err } } - err = conn.Cmd("LPUSH", utils.LOADINST_KEY, marshaled).Err + err = rs.Cmd("LPUSH", utils.LOADINST_KEY, marshaled).Err return nil, err }, 0, utils.LOADINST_KEY) @@ -947,7 +992,7 @@ func (rs *RedisStorage) GetActionTriggers(key string, skipCache bool, transactio } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { err = rs.ms.Unmarshal(values, &atrs) } cache2go.Set(key, atrs, cacheCommit(transactionID), transactionID) @@ -955,27 +1000,22 @@ func (rs *RedisStorage) GetActionTriggers(key string, skipCache bool, transactio } func (rs *RedisStorage) SetActionTriggers(key string, atrs ActionTriggers, transactionID string) (err error) { - conn, err := rs.db.Get() - if err != nil { - return err - } - defer rs.db.Put(conn) if len(atrs) == 0 { // delete the key - return conn.Cmd("DEL", utils.ACTION_TRIGGER_PREFIX+key).Err + return rs.Cmd("DEL", utils.ACTION_TRIGGER_PREFIX+key).Err } result, err := rs.ms.Marshal(atrs) if err != nil { return err } - err = conn.Cmd("SET", utils.ACTION_TRIGGER_PREFIX+key, result).Err + err = rs.Cmd("SET", utils.ACTION_TRIGGER_PREFIX+key, result).Err cache2go.RemKey(utils.ACTION_TRIGGER_PREFIX+key, cacheCommit(transactionID), transactionID) return } func (rs *RedisStorage) RemoveActionTriggers(key string, transactionID string) (err error) { key = utils.ACTION_TRIGGER_PREFIX + key - err = rs.db.Cmd("DEL", key).Err + err = rs.Cmd("DEL", key).Err cache2go.RemKey(key, cacheCommit(transactionID), transactionID) return @@ -992,7 +1032,7 @@ func (rs *RedisStorage) GetActionPlan(key string, skipCache bool, transactionID } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { b := bytes.NewBuffer(values) r, err := zlib.NewReader(b) if err != nil { @@ -1014,7 +1054,7 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan, overwrite boo cCommit := cacheCommit(transactionID) if len(ats.ActionTimings) == 0 { // delete the key - err = rs.db.Cmd("DEL", utils.ACTION_PLAN_PREFIX+key).Err + err = rs.Cmd("DEL", utils.ACTION_PLAN_PREFIX+key).Err cache2go.RemKey(utils.ACTION_PLAN_PREFIX+key, cCommit, transactionID) return err } @@ -1039,7 +1079,7 @@ func (rs *RedisStorage) SetActionPlan(key string, ats *ActionPlan, overwrite boo w := zlib.NewWriter(&b) w.Write(result) w.Close() - err = rs.db.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, b.Bytes()).Err + err = rs.Cmd("SET", utils.ACTION_PLAN_PREFIX+key, b.Bytes()).Err cache2go.RemKey(utils.ACTION_PLAN_PREFIX+key, cCommit, transactionID) return } @@ -1068,12 +1108,12 @@ func (rs *RedisStorage) PushTask(t *Task) error { if err != nil { return err } - return rs.db.Cmd("RPUSH", utils.TASKS_KEY, result).Err + return rs.Cmd("RPUSH", utils.TASKS_KEY, result).Err } func (rs *RedisStorage) PopTask() (t *Task, err error) { var values []byte - if values, err = rs.db.Cmd("LPOP", utils.TASKS_KEY).Bytes(); err == nil { + if values, err = rs.Cmd("LPOP", utils.TASKS_KEY).Bytes(); err == nil { t = &Task{} err = rs.ms.Unmarshal(values, t) } @@ -1091,7 +1131,7 @@ func (rs *RedisStorage) GetDerivedChargers(key string, skipCache bool, transacti } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { err = rs.ms.Unmarshal(values, &dcs) } else { cache2go.Set(key, nil, cacheCommit(transactionID), transactionID) @@ -1105,7 +1145,7 @@ func (rs *RedisStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger key = utils.DERIVEDCHARGERS_PREFIX + key cCommit := cacheCommit(transactionID) if dcs == nil || len(dcs.Chargers) == 0 { - err = rs.db.Cmd("DEL", key).Err + err = rs.Cmd("DEL", key).Err cache2go.RemKey(key, cCommit, transactionID) return err } @@ -1113,7 +1153,7 @@ func (rs *RedisStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger if err != nil { return err } - err = rs.db.Cmd("SET", key, marshaled).Err + err = rs.Cmd("SET", key, marshaled).Err cache2go.RemKey(key, cCommit, transactionID) return } @@ -1123,29 +1163,24 @@ func (rs *RedisStorage) SetCdrStats(cs *CdrStats) error { if err != nil { return err } - return rs.db.Cmd("SET", utils.CDR_STATS_PREFIX+cs.Id, marshaled).Err + return rs.Cmd("SET", utils.CDR_STATS_PREFIX+cs.Id, marshaled).Err } func (rs *RedisStorage) GetCdrStats(key string) (cs *CdrStats, err error) { var values []byte - if values, err = rs.db.Cmd("GET", utils.CDR_STATS_PREFIX+key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", utils.CDR_STATS_PREFIX+key).Bytes(); err == nil { err = rs.ms.Unmarshal(values, &cs) } return } func (rs *RedisStorage) GetAllCdrStats() (css []*CdrStats, err error) { - conn, err := rs.db.Get() - if err != nil { - return nil, err - } - defer rs.db.Put(conn) - keys, err := conn.Cmd("KEYS", utils.CDR_STATS_PREFIX+"*").List() + keys, err := rs.Cmd("KEYS", utils.CDR_STATS_PREFIX+"*").List() if err != nil { return nil, err } for _, key := range keys { - value, err := conn.Cmd("GET", key).Bytes() + value, err := rs.Cmd("GET", key).Bytes() if err != nil { continue } @@ -1162,13 +1197,13 @@ func (rs *RedisStorage) SetStructVersion(v *StructVersion) (err error) { if err != nil { return } - return rs.db.Cmd("SET", utils.VERSION_PREFIX+"struct", result).Err + return rs.Cmd("SET", utils.VERSION_PREFIX+"struct", result).Err } func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) { var values []byte rsv = &StructVersion{} - if values, err = rs.db.Cmd("GET", utils.VERSION_PREFIX+"struct").Bytes(); err == nil { + if values, err = rs.Cmd("GET", utils.VERSION_PREFIX+"struct").Bytes(); err == nil { err = rs.ms.Unmarshal(values, &rsv) } return @@ -1185,7 +1220,7 @@ func (rs *RedisStorage) GetResourceLimit(id string, skipCache bool, transactionI } } var values []byte - if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + if values, err = rs.Cmd("GET", key).Bytes(); err == nil { err = rs.ms.Unmarshal(values, &rl) for _, fltr := range rl.Filters { if err := fltr.CompileValues(); err != nil { @@ -1202,13 +1237,13 @@ func (rs *RedisStorage) SetResourceLimit(rl *ResourceLimit, transactionID string return err } key := utils.ResourceLimitsPrefix + rl.ID - err = rs.db.Cmd("SET", key, result).Err + err = rs.Cmd("SET", key, result).Err cache2go.Set(key, rl, cacheCommit(transactionID), transactionID) return err } func (rs *RedisStorage) RemoveResourceLimit(id string, transactionID string) error { key := utils.ResourceLimitsPrefix + id - if err := rs.db.Cmd("DEL", key).Err; err != nil { + if err := rs.Cmd("DEL", key).Err; err != nil { return err } cache2go.RemKey(key, cacheCommit(transactionID), transactionID) diff --git a/glide.lock b/glide.lock index 62d1186ea..ed937a38c 100644 --- a/glide.lock +++ b/glide.lock @@ -64,7 +64,7 @@ imports: subpackages: - oid - name: github.com/mediocregopher/radix.v2 - version: ae04b3eb3731f94789205d1268e0759371166605 + version: bb6eabf33bf82d38e985e2757cc13ffdc80818a4 subpackages: - pool - redis