From fd0cc1717e1025375533d653fcfded701ef505d2 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 4 Oct 2016 17:16:20 +0200 Subject: [PATCH] Improved redis reconnects --- engine/storage_redis.go | 144 ++++++++++++++++++---------------------- 1 file changed, 65 insertions(+), 79 deletions(-) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index a01310561..c2a6a4869 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -22,13 +22,15 @@ import ( "compress/zlib" "errors" "fmt" + "io/ioutil" + "strings" + "sync" + "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/mediocregopher/radix.v2/pool" "github.com/mediocregopher/radix.v2/redis" - "io/ioutil" - "strings" ) var ( @@ -36,40 +38,22 @@ var ( ) type RedisStorage struct { - db *pool.Pool ms Marshaler cacheCfg *config.CacheConfig loadHistorySize int - poolConfig PoolConnectionConfig + poolCfg *PoolConfig + poolLock sync.RWMutex + dbPool *pool.Pool } -type PoolConnectionConfig struct { - address string - password string - db int - mrshlerStr string - maxConns int - cacheCfg *config.CacheConfig - loadHistorySize int +type PoolConfig struct { + address string + password string + dbNr int + maxConns int } -func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (*RedisStorage, error) { - - connectionConf := PoolConnectionConfig{ - address: address, - db: db, - password: pass, - mrshlerStr: mrshlerStr, - maxConns: maxConns, - cacheCfg: cacheCfg, - loadHistorySize: loadHistorySize, - } - - pool, err := connectionConf.PoolCreation(maxConns) - if err != nil { - return nil, err - } - +func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (rs *RedisStorage, err error) { var mrshler Marshaler if mrshlerStr == utils.MSGPACK { mrshler = NewCodecMsgpackMarshaler() @@ -78,55 +62,38 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns i } else { return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) } - result := &RedisStorage{ - db: pool, - ms: mrshler, - cacheCfg: cacheCfg, - poolConfig: connectionConf, - loadHistorySize: loadHistorySize} - return result, nil -} - -func (config *PoolConnectionConfig) DialFunction(network string, addr string) (*redis.Client, error) { - client, err := redis.Dial(network, addr) - if err != nil { + rs = &RedisStorage{ms: mrshler, cacheCfg: cacheCfg, loadHistorySize: loadHistorySize, poolCfg: &PoolConfig{address: address, password: pass, dbNr: db, maxConns: maxConns}} + if err = rs.poolInit(); err != nil { return nil, err } - if len(config.password) != 0 { - if err = client.Cmd("AUTH", config.password).Err; err != nil { - client.Close() - return nil, err - } - } - if config.db != 0 { - if err = client.Cmd("SELECT", config.db).Err; err != nil { - client.Close() - return nil, err - } - } - return client, nil + return } -func (config *PoolConnectionConfig) PoolCreation(maxConns int) (*pool.Pool, error) { - p, err := pool.NewCustom("tcp", config.address, maxConns, config.DialFunction) - if err != nil { - return nil, err - } - return p, nil -} - -func (rs *RedisStorage) IncreasePool() error { - number_of_pools := (rs.poolConfig.maxConns - rs.db.Avail()) - utils.Logger.Warning(fmt.Sprintf("RedisClient increase pool size in %d (Current size is %d)", - number_of_pools, rs.db.Avail())) - for i := 0; i < number_of_pools; i++ { - conn, err := rs.db.Get() +// PoolInit is used to initialize rs.dbPool (at connect or reconnect) +func (rs *RedisStorage) poolInit() (err error) { + newConnectFunc := func(network, addr string) (*redis.Client, error) { + client, err := redis.Dial(network, addr) if err != nil { - return err + return nil, err } - rs.db.Put(conn) + if len(rs.poolCfg.password) != 0 { + if err = client.Cmd("AUTH", rs.poolCfg.password).Err; err != nil { + client.Close() + return nil, err + } + } + if rs.poolCfg.dbNr != 0 { + if err = client.Cmd("SELECT", rs.poolCfg.dbNr).Err; err != nil { + client.Close() + return nil, err + } + } + return client, nil } - return nil + rs.poolLock.Lock() + rs.dbPool, err = pool.NewCustom("tcp", rs.poolCfg.address, rs.poolCfg.maxConns, newConnectFunc) + rs.poolLock.Unlock() + return err } // This CMD function get a connection from the pool. Send the command and if @@ -134,22 +101,41 @@ func (rs *RedisStorage) IncreasePool() error { // increase the pool with a new connection. Good for timeouts func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp { - c, err := rs.db.Get() + rs.poolLock.RLock() + c1, err := rs.dbPool.Get() + rs.poolLock.RUnlock() if err != nil { return redis.NewResp(err) } - result := c.Cmd(cmd, args...) - if result.IsType(redis.IOErr) { - rs.IncreasePool() - utils.Logger.Warning(fmt.Sprintf("RedisClient error '%s'", result.String())) - return rs.Cmd(cmd, args...) + result := c1.Cmd(cmd, args...) + if result.IsType(redis.IOErr) { // Failover mecanism + utils.Logger.Warning(fmt.Sprintf(" error <%s>, attempting failover.", result.Err.Error())) + for i := 0; i < 2; i++ { // Two attempts, one on connection of original pool, one on new pool + rs.poolLock.RLock() + c2, err := rs.dbPool.Get() + rs.poolLock.RUnlock() + if err == nil { + if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) { + rs.poolLock.RLock() + rs.dbPool.Put(c2) + rs.poolLock.RUnlock() + return result2 + } + } + if err := rs.poolInit(); err != nil { + break + } + } + } else { + rs.poolLock.RLock() + rs.dbPool.Put(c1) + rs.poolLock.RUnlock() } - rs.db.Put(c) return result } func (rs *RedisStorage) Close() { - rs.db.Empty() + rs.dbPool.Empty() } func (rs *RedisStorage) Flush(ignore string) error {