mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Simplified failover for redis, should hanle also timeouts
This commit is contained in:
@@ -24,7 +24,6 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -38,22 +37,37 @@ var (
|
||||
)
|
||||
|
||||
type RedisStorage struct {
|
||||
dbPool *pool.Pool
|
||||
maxConns int
|
||||
ms Marshaler
|
||||
cacheCfg *config.CacheConfig
|
||||
loadHistorySize int
|
||||
poolCfg *PoolConfig
|
||||
poolLock sync.RWMutex
|
||||
dbPool *pool.Pool
|
||||
}
|
||||
|
||||
type PoolConfig struct {
|
||||
address string
|
||||
password string
|
||||
dbNr int
|
||||
maxConns int
|
||||
}
|
||||
|
||||
func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (rs *RedisStorage, err error) {
|
||||
func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns int, cacheCfg *config.CacheConfig, loadHistorySize int) (*RedisStorage, error) {
|
||||
df := func(network, addr string) (*redis.Client, error) {
|
||||
client, err := redis.Dial(network, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(pass) != 0 {
|
||||
if err = client.Cmd("AUTH", pass).Err; err != nil {
|
||||
client.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if db != 0 {
|
||||
if err = client.Cmd("SELECT", db).Err; err != nil {
|
||||
client.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
p, err := pool.NewCustom("tcp", address, maxConns, df)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var mrshler Marshaler
|
||||
if mrshlerStr == utils.MSGPACK {
|
||||
mrshler = NewCodecMsgpackMarshaler()
|
||||
@@ -62,81 +76,36 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns i
|
||||
} else {
|
||||
return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
|
||||
}
|
||||
rs = &RedisStorage{ms: mrshler, cacheCfg: cacheCfg, loadHistorySize: loadHistorySize, poolCfg: &PoolConfig{address: address, password: pass, dbNr: db, maxConns: maxConns}}
|
||||
if err = rs.poolInit(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// PoolInit is used to initialize rs.dbPool (at connect or reconnect)
|
||||
func (rs *RedisStorage) poolInit() (err error) {
|
||||
newConnectFunc := func(network, addr string) (*redis.Client, error) {
|
||||
client, err := redis.Dial(network, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(rs.poolCfg.password) != 0 {
|
||||
if err = client.Cmd("AUTH", rs.poolCfg.password).Err; err != nil {
|
||||
client.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if rs.poolCfg.dbNr != 0 {
|
||||
if err = client.Cmd("SELECT", rs.poolCfg.dbNr).Err; err != nil {
|
||||
client.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
rs.poolLock.Lock()
|
||||
rs.dbPool, err = pool.NewCustom("tcp", rs.poolCfg.address, rs.poolCfg.maxConns, newConnectFunc)
|
||||
rs.poolLock.Unlock()
|
||||
return err
|
||||
return &RedisStorage{dbPool: p, maxConns: maxConns, ms: mrshler, cacheCfg: cacheCfg, loadHistorySize: loadHistorySize}, nil
|
||||
}
|
||||
|
||||
// This CMD function get a connection from the pool.
|
||||
// Handles automatic failover in case of network disconnects
|
||||
func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp {
|
||||
rs.poolLock.RLock()
|
||||
c1, err := rs.dbPool.Get()
|
||||
rs.poolLock.RUnlock()
|
||||
if err != nil {
|
||||
return redis.NewResp(err)
|
||||
}
|
||||
result := c1.Cmd(cmd, args...)
|
||||
if result.IsType(redis.IOErr) { // Failover mecanism
|
||||
utils.Logger.Warning(fmt.Sprintf("<RedisStorage> error <%s>, attempting failover.", result.Err.Error()))
|
||||
for i := 0; i < 2; i++ { // Two attempts, one on connection of original pool, one on new pool
|
||||
rs.poolLock.RLock()
|
||||
for i := 0; i < rs.maxConns; i++ { // Two attempts, one on connection of original pool, one on new pool
|
||||
c2, err := rs.dbPool.Get()
|
||||
rs.poolLock.RUnlock()
|
||||
if err == nil {
|
||||
if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) {
|
||||
rs.poolLock.RLock()
|
||||
rs.dbPool.Put(c2)
|
||||
rs.poolLock.RUnlock()
|
||||
return result2
|
||||
}
|
||||
}
|
||||
rs.Close()
|
||||
if err := rs.poolInit(); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
rs.poolLock.RLock()
|
||||
rs.dbPool.Put(c1)
|
||||
rs.poolLock.RUnlock()
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) Close() {
|
||||
rs.poolLock.Lock()
|
||||
rs.dbPool.Empty()
|
||||
rs.poolLock.Unlock()
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) Flush(ignore string) error {
|
||||
|
||||
Reference in New Issue
Block a user