Improved redis reconnects

This commit is contained in:
DanB
2016-10-04 17:16:20 +02:00
parent e2030470e2
commit fd0cc1717e

View File

@@ -22,13 +22,15 @@ import (
"compress/zlib"
"errors"
"fmt"
"io/ioutil"
"strings"
"sync"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mediocregopher/radix.v2/redis"
"io/ioutil"
"strings"
)
var (
@@ -36,40 +38,22 @@ var (
)
type RedisStorage struct {
db *pool.Pool
ms Marshaler
cacheCfg *config.CacheConfig
loadHistorySize int
poolConfig PoolConnectionConfig
poolCfg *PoolConfig
poolLock sync.RWMutex
dbPool *pool.Pool
}
type PoolConnectionConfig struct {
address string
password string
db int
mrshlerStr string
maxConns int
cacheCfg *config.CacheConfig
loadHistorySize int
type PoolConfig struct {
address string
password string
dbNr int
maxConns int
}
func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (*RedisStorage, error) {
connectionConf := PoolConnectionConfig{
address: address,
db: db,
password: pass,
mrshlerStr: mrshlerStr,
maxConns: maxConns,
cacheCfg: cacheCfg,
loadHistorySize: loadHistorySize,
}
pool, err := connectionConf.PoolCreation(maxConns)
if err != nil {
return nil, err
}
func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (rs *RedisStorage, err error) {
var mrshler Marshaler
if mrshlerStr == utils.MSGPACK {
mrshler = NewCodecMsgpackMarshaler()
@@ -78,55 +62,38 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns i
} else {
return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
}
result := &RedisStorage{
db: pool,
ms: mrshler,
cacheCfg: cacheCfg,
poolConfig: connectionConf,
loadHistorySize: loadHistorySize}
return result, nil
}
func (config *PoolConnectionConfig) DialFunction(network string, addr string) (*redis.Client, error) {
client, err := redis.Dial(network, addr)
if err != nil {
rs = &RedisStorage{ms: mrshler, cacheCfg: cacheCfg, loadHistorySize: loadHistorySize, poolCfg: &PoolConfig{address: address, password: pass, dbNr: db, maxConns: maxConns}}
if err = rs.poolInit(); err != nil {
return nil, err
}
if len(config.password) != 0 {
if err = client.Cmd("AUTH", config.password).Err; err != nil {
client.Close()
return nil, err
}
}
if config.db != 0 {
if err = client.Cmd("SELECT", config.db).Err; err != nil {
client.Close()
return nil, err
}
}
return client, nil
return
}
func (config *PoolConnectionConfig) PoolCreation(maxConns int) (*pool.Pool, error) {
p, err := pool.NewCustom("tcp", config.address, maxConns, config.DialFunction)
if err != nil {
return nil, err
}
return p, nil
}
func (rs *RedisStorage) IncreasePool() error {
number_of_pools := (rs.poolConfig.maxConns - rs.db.Avail())
utils.Logger.Warning(fmt.Sprintf("RedisClient increase pool size in %d (Current size is %d)",
number_of_pools, rs.db.Avail()))
for i := 0; i < number_of_pools; i++ {
conn, err := rs.db.Get()
// PoolInit is used to initialize rs.dbPool (at connect or reconnect)
func (rs *RedisStorage) poolInit() (err error) {
newConnectFunc := func(network, addr string) (*redis.Client, error) {
client, err := redis.Dial(network, addr)
if err != nil {
return err
return nil, err
}
rs.db.Put(conn)
if len(rs.poolCfg.password) != 0 {
if err = client.Cmd("AUTH", rs.poolCfg.password).Err; err != nil {
client.Close()
return nil, err
}
}
if rs.poolCfg.dbNr != 0 {
if err = client.Cmd("SELECT", rs.poolCfg.dbNr).Err; err != nil {
client.Close()
return nil, err
}
}
return client, nil
}
return nil
rs.poolLock.Lock()
rs.dbPool, err = pool.NewCustom("tcp", rs.poolCfg.address, rs.poolCfg.maxConns, newConnectFunc)
rs.poolLock.Unlock()
return err
}
// This CMD function get a connection from the pool. Send the command and if
@@ -134,22 +101,41 @@ func (rs *RedisStorage) IncreasePool() error {
// increase the pool with a new connection. Good for timeouts
func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp {
c, err := rs.db.Get()
rs.poolLock.RLock()
c1, err := rs.dbPool.Get()
rs.poolLock.RUnlock()
if err != nil {
return redis.NewResp(err)
}
result := c.Cmd(cmd, args...)
if result.IsType(redis.IOErr) {
rs.IncreasePool()
utils.Logger.Warning(fmt.Sprintf("RedisClient error '%s'", result.String()))
return rs.Cmd(cmd, args...)
result := c1.Cmd(cmd, args...)
if result.IsType(redis.IOErr) { // Failover mecanism
utils.Logger.Warning(fmt.Sprintf("<RedisStorage> error <%s>, attempting failover.", result.Err.Error()))
for i := 0; i < 2; i++ { // Two attempts, one on connection of original pool, one on new pool
rs.poolLock.RLock()
c2, err := rs.dbPool.Get()
rs.poolLock.RUnlock()
if err == nil {
if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) {
rs.poolLock.RLock()
rs.dbPool.Put(c2)
rs.poolLock.RUnlock()
return result2
}
}
if err := rs.poolInit(); err != nil {
break
}
}
} else {
rs.poolLock.RLock()
rs.dbPool.Put(c1)
rs.poolLock.RUnlock()
}
rs.db.Put(c)
return result
}
func (rs *RedisStorage) Close() {
rs.db.Empty()
rs.dbPool.Empty()
}
func (rs *RedisStorage) Flush(ignore string) error {