Update redis sentinel to accept multiple sentinels and failover

Author: TeoV
Date: 2018-10-05 02:02:32 -04:00
Committed by: Dan Christian Bogos
Parent: fb02b8f179
Commit: 83f21c17fe
9 changed files with 235 additions and 40 deletions


@@ -17,7 +17,7 @@
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 16381, // data_db port to reach the database
"db_host":"127.0.0.1:16381;127.0.0.1:16382;127.0.0.1:16383",
"db_name": "10", // data_db database name to connect to
"redis_sentinel":"redis-cluster",
},
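The ';'-separated db_host value is what enables multiple sentinels: the storage layer (see storage_redis.go below) splits it and dials each entry in order. A minimal sketch of that parsing, with illustrative variable names:

    addrs := strings.Split("127.0.0.1:16381;127.0.0.1:16382;127.0.0.1:16383", ";")
    // addrs -> ["127.0.0.1:16381", "127.0.0.1:16382", "127.0.0.1:16383"]
    // one sentinelInst is created per entry, tried in order on every command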


@@ -1,4 +1,4 @@
 bind localhost
 port 16379
-dir .
+dir "/home/teo"


@@ -1,6 +1,6 @@
 bind localhost
 port 16380
-dir .
+dir "/home/teo"
-slaveof localhost 16379
+slaveof 127.0.0.1 16379


@@ -10,4 +10,4 @@ port 16381
 sentinel monitor redis-cluster 127.0.0.1 16379 1
 sentinel down-after-milliseconds redis-cluster 5000
 sentinel failover-timeout redis-cluster 10000
-sentinel config-epoch redis-cluster 90
+sentinel config-epoch redis-cluster 92

data/sentinel/sentinel2.conf (new executable file, 13 lines added)

@@ -0,0 +1,13 @@
# Host and port we will listen for requests on
bind localhost
port 16382
#
# "redis-cluster" is the name of our cluster
#
# each sentinel process is paired with a redis-server process
#
sentinel monitor redis-cluster 127.0.0.1 16379 1
sentinel down-after-milliseconds redis-cluster 5000
sentinel failover-timeout redis-cluster 10000
sentinel config-epoch redis-cluster 92

data/sentinel/sentinel3.conf (new executable file, 13 lines added)

@@ -0,0 +1,13 @@
# Host and port we will listen for requests on
bind localhost
port 16383
#
# "redis-cluster" is the name of our cluster
#
# each sentinel process is paired with a redis-server process
#
sentinel monitor redis-cluster 127.0.0.1 16379 1
sentinel down-after-milliseconds redis-cluster 5000
sentinel failover-timeout redis-cluster 10000
sentinel config-epoch redis-cluster 92
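All three sentinels monitor the same master ("redis-cluster", quorum 1), so any surviving sentinel can resolve the current master. A hedged sketch for verifying the setup by hand, using the radix.v2 client this commit already depends on (file name and addresses are illustrative):

    // check_sentinel.go: ask sentinel1 who the current master is
    package main

    import (
        "fmt"

        "github.com/mediocregopher/radix.v2/redis"
    )

    func main() {
        c, err := redis.Dial("tcp", "127.0.0.1:16381") // sentinel1 from the conf above
        if err != nil {
            panic(err)
        }
        defer c.Close()
        // expected ["127.0.0.1" "16379"] while node1 is still master
        master, err := c.Cmd("SENTINEL", "get-master-addr-by-name", "redis-cluster").List()
        if err != nil {
            panic(err)
        }
        fmt.Println(master)
    }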


@@ -26,6 +26,7 @@ import (
"io/ioutil"
"strconv"
"strings"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
@@ -37,12 +38,20 @@ import (
)
type RedisStorage struct {
-    dbPool         *pool.Pool
-    maxConns       int
-    ms             Marshaler
-    cacheCfg       config.CacheConfig
-    sentinelClient *sentinel.Client
-    sentinelName   string
+    dbPool        *pool.Pool
+    maxConns      int
+    ms            Marshaler
+    cacheCfg      config.CacheConfig
+    sentinelName  string
+    sentinelInsts []*sentinelInst
+    db            int    // database number, reused when reconnecting to a sentinel
+    pass          string // password, reused when reconnecting to a sentinel
+    sentinelMux   sync.RWMutex // guards sentinelInsts and their connections
 }

+type sentinelInst struct {
+    addr string
+    conn *sentinel.Client
+}
func NewRedisStorage(address string, db int, pass, mrshlerStr string,
@@ -78,13 +87,19 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
     }
     if sentinelName != "" {
-        client, err := sentinel.NewClientCustom("tcp", address, maxConns, df, sentinelName)
-        if err != nil {
-            return nil, err
+        var err error
+        addrs := strings.Split(address, ";") // one entry per sentinel
+        sentinelInsts := make([]*sentinelInst, len(addrs))
+        for i, addr := range addrs {
+            sentinelInsts[i] = &sentinelInst{addr: addr}
+            if sentinelInsts[i].conn, err = sentinel.NewClientCustom("tcp",
+                addr, maxConns, df, sentinelName); err != nil {
+                return nil, err
+            }
         }
         return &RedisStorage{maxConns: maxConns, ms: mrshler,
-            cacheCfg: cacheCfg, sentinelClient: client,
-            sentinelName: sentinelName}, nil
+            cacheCfg: cacheCfg, sentinelName: sentinelName,
+            sentinelInsts: sentinelInsts, db: db, pass: pass}, nil
     } else {
         p, err := pool.NewCustom("tcp", address, maxConns, df)
         if err != nil {
@@ -95,17 +110,68 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
}
}
+// reconnectSentinel redials a sentinel after a failure, rebuilding the dial
+// function so AUTH and SELECT are re-applied on every new connection
+func reconnectSentinel(addr, sentinelName string, db int, pass string, maxConns int) (*sentinel.Client, error) {
+    df := func(network, addr string) (*redis.Client, error) {
+        client, err := redis.Dial(network, addr)
+        if err != nil {
+            return nil, err
+        }
+        if len(pass) != 0 {
+            if err = client.Cmd("AUTH", pass).Err; err != nil {
+                client.Close()
+                return nil, err
+            }
+        }
+        if db != 0 {
+            if err = client.Cmd("SELECT", db).Err; err != nil {
+                client.Close()
+                return nil, err
+            }
+        }
+        return client, nil
+    }
+    return sentinel.NewClientCustom("tcp", addr, maxConns, df, sentinelName)
+}

 // This Cmd function gets a connection from the pool.
 // Handles automatic failover in case of network disconnects
 func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp {
     if rs.sentinelName != "" {
-        conn, err := rs.sentinelClient.GetMaster(rs.sentinelName)
-        if err != nil {
-            return redis.NewResp(err)
+        var err error
+        for i := range rs.sentinelInsts {
+            rs.sentinelMux.Lock()
+            if rs.sentinelInsts[i].conn == nil { // redial lazily if this sentinel was dropped
+                rs.sentinelInsts[i].conn, err = reconnectSentinel(rs.sentinelInsts[i].addr,
+                    rs.sentinelName, rs.db, rs.pass, rs.maxConns)
+                if err != nil {
+                    if i == len(rs.sentinelInsts)-1 { // no candidates left
+                        rs.sentinelMux.Unlock()
+                        return redis.NewResp(fmt.Errorf("No sentinels active"))
+                    }
+                    rs.sentinelMux.Unlock()
+                    continue
+                }
+            }
+            sConn := rs.sentinelInsts[i].conn
+            rs.sentinelMux.Unlock()
+            conn, err := sConn.GetMaster(rs.sentinelName)
+            if err != nil {
+                if i == len(rs.sentinelInsts)-1 {
+                    return redis.NewResp(fmt.Errorf("No sentinels active"))
+                }
+                rs.sentinelMux.Lock()
+                rs.sentinelInsts[i].conn = nil // mark broken; the next Cmd will redial it
+                rs.sentinelMux.Unlock()
+                utils.Logger.Warning(fmt.Sprintf("<RedisStorage> sentinel error: %s",
+                    err.Error()))
+                continue
+            }
+            result := conn.Cmd(cmd, args...)
+            sConn.PutMaster(rs.sentinelName, conn)
+            return result
+        }
-        result := conn.Cmd(cmd, args...)
-        rs.sentinelClient.PutMaster(rs.sentinelName, conn)
-        return result
     }
c1, err := rs.dbPool.Get()
@@ -131,7 +197,9 @@ func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp {
}
 func (rs *RedisStorage) Close() {
-    rs.dbPool.Empty()
+    if rs.dbPool != nil { // dbPool stays nil when connecting through sentinels
+        rs.dbPool.Empty()
+    }
 }
func (rs *RedisStorage) Flush(ignore string) error {

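In short, Cmd now walks rs.sentinelInsts in order: a nil connection is redialed via reconnectSentinel, a failing GetMaster marks the instance nil and moves on, and only after the last instance fails does the command error out with "No sentinels active". A hedged usage sketch of the changed constructor (marshaler, db number and cache config below are illustrative, not taken from this commit):

    // "redis-cluster" must match the "sentinel monitor" name in the conf files above
    rs, err := NewRedisStorage(
        "127.0.0.1:16381;127.0.0.1:16382;127.0.0.1:16383", // ';'-separated sentinel addresses
        10,            // db number, replayed by reconnectSentinel on redial
        "",            // password, likewise replayed on redial
        utils.MSGPACK, // marshaler name (assumed constant)
        utils.REDIS_MAX_CONNS,
        nil,             // cache config, irrelevant for this sketch
        "redis-cluster") // sentinel master name
    if err != nil {
        // handle the error; all sentinels unreachable at startup ends up here
    }
    defer rs.Close()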

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
@@ -41,7 +42,7 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin
         utils.Logger.Crit("Redis db name must be an integer!")
         return nil, err
     }
-    if port != "" {
+    if port != "" && strings.Index(host, ":") == -1 { // don't append when host already carries port(s)
         host += ":" + port
     }
     d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheCfg, sentinelName)
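The added strings.Index check keeps the old behaviour for plain hosts while leaving multi-sentinel host lists untouched. A hypothetical helper mirroring the condition, just to make the two cases concrete:

    // joinHostPort is illustrative only; the real logic lives inline above
    joinHostPort := func(host, port string) string {
        if port != "" && strings.Index(host, ":") == -1 {
            return host + ":" + port
        }
        return host
    }
    // joinHostPort("127.0.0.1", "6379")                        -> "127.0.0.1:6379"
    // joinHostPort("127.0.0.1:16381;127.0.0.1:16382", "16381") -> host unchanged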


@@ -22,7 +22,7 @@ package general_tests
 import (
     "flag"
-    "fmt"
+    //"fmt"
     "net/rpc"
     "net/rpc/jsonrpc"
     "os/exec"
+    "strconv" // added here for the strconv.Itoa fix in the insertion helpers below
@@ -36,14 +36,18 @@ import (
)
 var (
-    node1ConfigPath    = path.Join(*dataDir, "sentinel", "node1.conf")
-    node2ConfigPath    = path.Join(*dataDir, "sentinel", "node2.conf")
-    sentinelConfigPath = path.Join(*dataDir, "sentinel", "sentinel1.conf")
-    engineConfigPath   = path.Join(*dataDir, "conf", "samples", "tutsentinel")
-    sentinelConfig     *config.CGRConfig
-    sentinelRPC        *rpc.Client
-    node1exec, node2exec, sentinelexec *exec.Cmd
-    redisSentinel = flag.Bool("redis_sentinel", false, "Run tests with redis sentinel")
+    node1ConfigPath     = path.Join(*dataDir, "sentinel", "node1.conf")
+    node2ConfigPath     = path.Join(*dataDir, "sentinel", "node2.conf")
+    sentinel1ConfigPath = path.Join(*dataDir, "sentinel", "sentinel1.conf")
+    sentinel2ConfigPath = path.Join(*dataDir, "sentinel", "sentinel2.conf")
+    sentinel3ConfigPath = path.Join(*dataDir, "sentinel", "sentinel3.conf")
+    engineConfigPath    = path.Join(*dataDir, "conf", "samples", "tutsentinel")
+    sentinelConfig      *config.CGRConfig
+    sentinelRPC         *rpc.Client
+    node1exec, node2exec,
+    sentinelexec1, sentinelexec2,
+    sentinelexec3 *exec.Cmd
+    redisSentinel = flag.Bool("redis_sentinel", false, "Run tests with redis sentinel")
 )
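Since the whole suite is gated behind the new flag, it only runs when requested explicitly. A typical invocation, sketched as a comment (working directory assumed to be the general_tests package):

    // from the general_tests package directory:
    //   go test -run TestRedisSentinel -redis_sentinel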
var sTestsRds = []func(t *testing.T){
@@ -53,7 +57,9 @@ var sTestsRds = []func(t *testing.T){
     testRedisSentinelStartEngine,
     testRedisSentinelRPCCon,
     testRedisSentinelSetGetAttribute,
-    testRedisSentinelShutDownNode1,
+    testRedisSentinelInsertion,
+    testRedisSentinelShutDownSentinel1,
+    testRedisSentinelInsertion2,
     testRedisSentinelGetAttrAfterFailover,
     testRedisSentinelKillEngine,
}
@@ -61,7 +67,9 @@ var sTestsRds = []func(t *testing.T){
 // Before running these tests make sure node1.conf, node2.conf and the sentinel*.conf files are set up as follows:
 // Node1 will be master and start at port 16379
 // Node2 will be slave of node1 and start at port 16380
-// Sentinel will be started at port 16381 and will watch Node1
+// Sentinel1 will be started at port 16381 and will watch Node1
+// Sentinel2 will be started at port 16382 and will watch Node1
+// Sentinel3 will be started at port 16383 and will watch Node1
func TestRedisSentinel(t *testing.T) {
if !*redisSentinel {
return
@@ -80,8 +88,16 @@ func testRedisSentinelStartNodes(t *testing.T) {
     if err := node2exec.Start(); err != nil {
         t.Error(err)
     }
-    sentinelexec = exec.Command("redis-sentinel", sentinelConfigPath)
-    if err := sentinelexec.Start(); err != nil {
+    sentinelexec1 = exec.Command("redis-sentinel", sentinel1ConfigPath)
+    if err := sentinelexec1.Start(); err != nil {
         t.Error(err)
     }
+    sentinelexec2 = exec.Command("redis-sentinel", sentinel2ConfigPath)
+    if err := sentinelexec2.Start(); err != nil {
+        t.Error(err)
+    }
+    sentinelexec3 = exec.Command("redis-sentinel", sentinel3ConfigPath)
+    if err := sentinelexec3.Start(); err != nil {
+        t.Error(err)
+    }
}
@@ -150,13 +166,97 @@ func testRedisSentinelSetGetAttribute(t *testing.T) {
}
}
-// Here we kill node1 and sentinel will do failover and promote node2 to be master
-func testRedisSentinelShutDownNode1(t *testing.T) {
-    if err := node1exec.Process.Kill(); err != nil { // Kill the master
+// Insert a batch of attribute profiles while all three sentinels are up
+func testRedisSentinelInsertion(t *testing.T) {
+    alsPrf := &engine.AttributeProfile{
+        Tenant:    "cgrates.org",
+        ID:        "ApierTest",
+        Contexts:  []string{utils.MetaSessionS, utils.MetaCDRs},
+        FilterIDs: []string{"*string:Account:1001"},
+        Attributes: []*engine.Attribute{
+            &engine.Attribute{
+                FieldName:  utils.Subject,
+                Initial:    utils.ANY,
+                Substitute: config.NewRSRParsersMustCompile("1001", true),
+                Append:     true,
+            },
+        },
+        Weight: 20,
+    }
+    originID := alsPrf.ID + "_"
+    index := 0
+    for i := 0; i < 25; i++ {
+        for _, name := range []string{"add", "add2"} {
+            prf := *alsPrf // per-subtest copy, so the parallel runs below don't race on ID
+            // strconv.Itoa, not string(index): string(int) yields a rune, not "0", "1", ...
+            prf.ID = originID + strconv.Itoa(index)
+            index++
+            t.Run(name, func(t *testing.T) {
+                t.Parallel()
+                var result string
+                if err := sentinelRPC.Call("ApierV1.SetAttributeProfile", &prf, &result); err != nil {
+                    t.Error(err)
+                }
+            })
+        }
+    }
+}
+// Shut down the first sentinel; the second one should take its place
+func testRedisSentinelShutDownSentinel1(t *testing.T) {
+    if err := sentinelexec1.Process.Kill(); err != nil { // kill the first sentinel
         t.Error(err)
     }
 }
+// Repeat the insertions; with sentinel1 gone, Cmd should fail over to the next sentinel
+func testRedisSentinelInsertion2(t *testing.T) {
+    alsPrf := &engine.AttributeProfile{
+        Tenant:    "cgrates.org",
+        ID:        "ApierTest",
+        Contexts:  []string{utils.MetaSessionS, utils.MetaCDRs},
+        FilterIDs: []string{"*string:Account:1001"},
+        Attributes: []*engine.Attribute{
+            &engine.Attribute{
+                FieldName:  utils.Subject,
+                Initial:    utils.ANY,
+                Substitute: config.NewRSRParsersMustCompile("1001", true),
+                Append:     true,
+            },
+        },
+        Weight: 20,
+    }
+    originID := alsPrf.ID + "_"
+    index := 0
+    for i := 0; i < 25; i++ {
+        for _, name := range []string{"add", "add2"} {
+            prf := *alsPrf // per-subtest copy, so the parallel runs don't race on ID
+            // strconv.Itoa, not string(index): string(int) yields a rune, not "0", "1", ...
+            prf.ID = originID + strconv.Itoa(index)
+            index++
+            t.Run(name, func(t *testing.T) {
+                t.Parallel()
+                var result string
+                if err := sentinelRPC.Call("ApierV1.SetAttributeProfile", &prf, &result); err != nil {
+                    t.Error(err)
+                }
+            })
+        }
+    }
+}
 // After the sentinel failover, check that the attribute data is still retrievable
func testRedisSentinelGetAttrAfterFailover(t *testing.T) {
alsPrf := &engine.AttributeProfile{