diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 2ef9e5ce6..a52ccf685 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -24,7 +24,6 @@ import ( "fmt" "io/ioutil" "strings" - "sync" "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/config" @@ -38,22 +37,37 @@ var ( ) type RedisStorage struct { + dbPool *pool.Pool + maxConns int ms Marshaler cacheCfg *config.CacheConfig loadHistorySize int - poolCfg *PoolConfig - poolLock sync.RWMutex - dbPool *pool.Pool } -type PoolConfig struct { - address string - password string - dbNr int - maxConns int -} - -func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (rs *RedisStorage, err error) { +func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (*RedisStorage, error) { + df := func(network, addr string) (*redis.Client, error) { + client, err := redis.Dial(network, addr) + if err != nil { + return nil, err + } + if len(pass) != 0 { + if err = client.Cmd("AUTH", pass).Err; err != nil { + client.Close() + return nil, err + } + } + if db != 0 { + if err = client.Cmd("SELECT", db).Err; err != nil { + client.Close() + return nil, err + } + } + return client, nil + } + p, err := pool.NewCustom("tcp", address, maxConns, df) + if err != nil { + return nil, err + } var mrshler Marshaler if mrshlerStr == utils.MSGPACK { mrshler = NewCodecMsgpackMarshaler() @@ -62,81 +76,36 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns i } else { return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) } - rs = &RedisStorage{ms: mrshler, cacheCfg: cacheCfg, loadHistorySize: loadHistorySize, poolCfg: &PoolConfig{address: address, password: pass, dbNr: db, maxConns: maxConns}} - if err = rs.poolInit(); err != nil { - return nil, err - } - return -} - -// PoolInit is used to initialize rs.dbPool (at connect or reconnect) -func (rs *RedisStorage) poolInit() (err error) { - newConnectFunc := func(network, addr string) (*redis.Client, error) { - client, err := redis.Dial(network, addr) - if err != nil { - return nil, err - } - if len(rs.poolCfg.password) != 0 { - if err = client.Cmd("AUTH", rs.poolCfg.password).Err; err != nil { - client.Close() - return nil, err - } - } - if rs.poolCfg.dbNr != 0 { - if err = client.Cmd("SELECT", rs.poolCfg.dbNr).Err; err != nil { - client.Close() - return nil, err - } - } - return client, nil - } - rs.poolLock.Lock() - rs.dbPool, err = pool.NewCustom("tcp", rs.poolCfg.address, rs.poolCfg.maxConns, newConnectFunc) - rs.poolLock.Unlock() - return err + return &RedisStorage{dbPool: p, maxConns: maxConns, ms: mrshler, cacheCfg: cacheCfg, loadHistorySize: loadHistorySize}, nil } // This CMD function get a connection from the pool. // Handles automatic failover in case of network disconnects func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp { - rs.poolLock.RLock() c1, err := rs.dbPool.Get() - rs.poolLock.RUnlock() if err != nil { return redis.NewResp(err) } result := c1.Cmd(cmd, args...) if result.IsType(redis.IOErr) { // Failover mecanism utils.Logger.Warning(fmt.Sprintf(" error <%s>, attempting failover.", result.Err.Error())) - for i := 0; i < 2; i++ { // Two attempts, one on connection of original pool, one on new pool - rs.poolLock.RLock() + for i := 0; i < rs.maxConns; i++ { // Two attempts, one on connection of original pool, one on new pool c2, err := rs.dbPool.Get() - rs.poolLock.RUnlock() if err == nil { if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) { - rs.poolLock.RLock() rs.dbPool.Put(c2) - rs.poolLock.RUnlock() return result2 } } - rs.Close() - if err := rs.poolInit(); err != nil { - break - } } } else { - rs.poolLock.RLock() rs.dbPool.Put(c1) - rs.poolLock.RUnlock() } return result } func (rs *RedisStorage) Close() { - rs.poolLock.Lock() rs.dbPool.Empty() - rs.poolLock.Unlock() } func (rs *RedisStorage) Flush(ignore string) error {