diff --git a/data/conf/samples/tutsentinel/cgrates.json b/data/conf/samples/tutsentinel/cgrates.json
index 343a33f94..5ef6c1530 100755
--- a/data/conf/samples/tutsentinel/cgrates.json
+++ b/data/conf/samples/tutsentinel/cgrates.json
@@ -17,7 +17,7 @@
 "data_db": {					// database used to store runtime data (eg: accounts, cdr stats)
 	"db_type": "redis",			// data_db type:
-	"db_port": 16381,			// data_db port to reach the database
+	"db_host":"127.0.0.1:16381;127.0.0.1:16382;127.0.0.1:16383",
 	"db_name": "10",			// data_db database name to connect to
 	"redis_sentinel":"redis-cluster",
 },
diff --git a/data/sentinel/node1.conf b/data/sentinel/node1.conf
index 4c0beff67..ec471c0e0 100755
--- a/data/sentinel/node1.conf
+++ b/data/sentinel/node1.conf
@@ -1,4 +1,4 @@
 bind localhost
 port 16379
-dir .
+dir "/home/teo"
diff --git a/data/sentinel/node2.conf b/data/sentinel/node2.conf
index a8aeb14cb..220a14cd3 100755
--- a/data/sentinel/node2.conf
+++ b/data/sentinel/node2.conf
@@ -1,6 +1,6 @@
 bind localhost
 port 16380
-dir .
+dir "/home/teo"
 
-slaveof localhost 16379
\ No newline at end of file
+slaveof 127.0.0.1 16379
\ No newline at end of file
diff --git a/data/sentinel/sentinel1.conf b/data/sentinel/sentinel1.conf
index 1c30f7dff..5b72aed5f 100755
--- a/data/sentinel/sentinel1.conf
+++ b/data/sentinel/sentinel1.conf
@@ -10,4 +10,4 @@ port 16381
 sentinel monitor redis-cluster 127.0.0.1 16379 1
 sentinel down-after-milliseconds redis-cluster 5000
 sentinel failover-timeout redis-cluster 10000
-sentinel config-epoch redis-cluster 90
+sentinel config-epoch redis-cluster 92
diff --git a/data/sentinel/sentinel2.conf b/data/sentinel/sentinel2.conf
new file mode 100755
index 000000000..5ccdd8264
--- /dev/null
+++ b/data/sentinel/sentinel2.conf
@@ -0,0 +1,13 @@
+# Host and port we will listen for requests on
+bind localhost
+port 16382
+
+#
+# "redis-cluster" is the name of our cluster
+#
+# each sentinel process is paired with a redis-server process
+#
+sentinel monitor redis-cluster 127.0.0.1 16379 1
+sentinel down-after-milliseconds redis-cluster 5000
+sentinel failover-timeout redis-cluster 10000
+sentinel config-epoch redis-cluster 92
diff --git a/data/sentinel/sentinel3.conf b/data/sentinel/sentinel3.conf
new file mode 100755
index 000000000..296605514
--- /dev/null
+++ b/data/sentinel/sentinel3.conf
@@ -0,0 +1,13 @@
+# Host and port we will listen for requests on
+bind localhost
+port 16383
+
+#
+# "redis-cluster" is the name of our cluster
+#
+# each sentinel process is paired with a redis-server process
+#
+sentinel monitor redis-cluster 127.0.0.1 16379 1
+sentinel down-after-milliseconds redis-cluster 5000
+sentinel failover-timeout redis-cluster 10000
+sentinel config-epoch redis-cluster 92
\ No newline at end of file
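Note how db_host now carries all three sentinel endpoints separated by ";", while every sentinel*.conf monitors the same master (node1 on 16379) with a quorum of 1. Below is a minimal, self-contained sketch of how such a value decomposes into individual endpoints, mirroring the strings.Split call introduced in engine/storage_redis.go further down (the dbHost variable name is illustrative only):

    package main

    import (
    	"fmt"
    	"strings"
    )

    func main() {
    	// the same semicolon-separated form used for db_host above
    	dbHost := "127.0.0.1:16381;127.0.0.1:16382;127.0.0.1:16383"
    	for i, addr := range strings.Split(dbHost, ";") {
    		fmt.Printf("sentinel %d -> %s\n", i+1, addr)
    	}
    }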
diff --git a/engine/storage_redis.go b/engine/storage_redis.go
index d86471684..0a776fece 100644
--- a/engine/storage_redis.go
+++ b/engine/storage_redis.go
@@ -26,6 +26,7 @@ import (
 	"io/ioutil"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/guardian"
@@ -37,12 +38,20 @@ import (
 )
 
 type RedisStorage struct {
-	dbPool         *pool.Pool
-	maxConns       int
-	ms             Marshaler
-	cacheCfg       config.CacheConfig
-	sentinelClient *sentinel.Client
-	sentinelName   string
+	dbPool        *pool.Pool
+	maxConns      int
+	ms            Marshaler
+	cacheCfg      config.CacheConfig
+	sentinelName  string
+	sentinelInsts []*sentinelInst
+	db            int    // database number, used when reconnecting to a sentinel
+	pass          string // password, used when reconnecting to a sentinel
+	sentinelMux   sync.RWMutex
+}
+
+type sentinelInst struct {
+	addr string
+	conn *sentinel.Client
 }
 
 func NewRedisStorage(address string, db int, pass, mrshlerStr string,
@@ -78,13 +87,19 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
 	}
 	if sentinelName != "" {
-		client, err := sentinel.NewClientCustom("tcp", address, maxConns, df, sentinelName)
-		if err != nil {
-			return nil, err
+		var err error
+		addrs := strings.Split(address, ";")
+		sentinelInsts := make([]*sentinelInst, len(addrs))
+		for i, addr := range addrs {
+			sentinelInsts[i] = &sentinelInst{addr: addr}
+			if sentinelInsts[i].conn, err = sentinel.NewClientCustom("tcp",
+				addr, maxConns, df, sentinelName); err != nil {
+				return nil, err
+			}
 		}
 		return &RedisStorage{maxConns: maxConns, ms: mrshler,
-			cacheCfg: cacheCfg, sentinelClient: client,
-			sentinelName: sentinelName}, nil
+			cacheCfg: cacheCfg, sentinelName: sentinelName,
+			sentinelInsts: sentinelInsts, db: db, pass: pass}, nil
 	} else {
 		p, err := pool.NewCustom("tcp", address, maxConns, df)
 		if err != nil {
 			return nil, err
@@ -95,17 +110,68 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string,
+func reconnectSentinel(addr, sentinelName string, db int, pass string, maxConns int) (*sentinel.Client, error) {
+	df := func(network, addr string) (*redis.Client, error) {
+		client, err := redis.Dial(network, addr)
+		if err != nil {
+			return nil, err
+		}
+		if len(pass) != 0 {
+			if err = client.Cmd("AUTH", pass).Err; err != nil {
+				client.Close()
+				return nil, err
+			}
+		}
+		if db != 0 {
+			if err = client.Cmd("SELECT", db).Err; err != nil {
+				client.Close()
+				return nil, err
+			}
+		}
+		return client, nil
+	}
+	return sentinel.NewClientCustom("tcp", addr, maxConns, df, sentinelName)
+}
+
 // This CMD function get a connection from the pool.
 // Handles automatic failover in case of network disconnects
 func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp {
 	if rs.sentinelName != "" {
-		conn, err := rs.sentinelClient.GetMaster(rs.sentinelName)
-		if err != nil {
-			return redis.NewResp(err)
+		var err error
+		for i := range rs.sentinelInsts {
+			rs.sentinelMux.Lock()
+			// lazily re-dial a sentinel whose connection was dropped earlier
+			if rs.sentinelInsts[i].conn == nil {
+				rs.sentinelInsts[i].conn, err = reconnectSentinel(rs.sentinelInsts[i].addr,
+					rs.sentinelName, rs.db, rs.pass, rs.maxConns)
+				if err != nil {
+					if i == len(rs.sentinelInsts)-1 {
+						rs.sentinelMux.Unlock()
+						return redis.NewResp(fmt.Errorf("No sentinels active"))
+					}
+					rs.sentinelMux.Unlock()
+					continue
+				}
+			}
+			sConn := rs.sentinelInsts[i].conn
+			rs.sentinelMux.Unlock()
+
+			conn, err := sConn.GetMaster(rs.sentinelName)
+			if err != nil {
+				if i == len(rs.sentinelInsts)-1 {
+					return redis.NewResp(fmt.Errorf("No sentinels active"))
+				}
+				// drop the connection so the next call re-dials this sentinel
+				rs.sentinelMux.Lock()
+				rs.sentinelInsts[i].conn = nil
+				rs.sentinelMux.Unlock()
+				utils.Logger.Warning(fmt.Sprintf("sentinel error: %s",
+					err.Error()))
+				continue
+			}
+			result := conn.Cmd(cmd, args...)
+			sConn.PutMaster(rs.sentinelName, conn)
+			return result
 		}
-		result := conn.Cmd(cmd, args...)
-		rs.sentinelClient.PutMaster(rs.sentinelName, conn)
-		return result
 	}
 	c1, err := rs.dbPool.Get()
@@ -131,7 +197,9 @@ func (rs *RedisStorage) Cmd(cmd string, args ...interface{}) *redis.Resp {
 }
 
 func (rs *RedisStorage) Close() {
-	rs.dbPool.Empty()
+	if rs.dbPool != nil {
+		rs.dbPool.Empty()
+	}
 }
 
 func (rs *RedisStorage) Flush(ignore string) error {
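The failover logic in Cmd above boils down to: walk the sentinel list in order, dial lazily, clear a connection that errors so a later call re-dials it, and only report failure once the last instance has been tried. Here is a minimal, self-contained sketch of that pattern, with hypothetical names (instance, pick and dial are illustrative, not CGRateS API):

    package main

    import (
    	"errors"
    	"fmt"
    )

    type conn struct{ addr string }

    type instance struct {
    	addr string
    	c    *conn
    }

    // pick walks the instances in order, lazily dialing any that have no
    // live connection, and returns the first one that answers.
    func pick(insts []*instance, dial func(string) (*conn, error)) (*conn, error) {
    	for _, in := range insts {
    		if in.c == nil {
    			c, err := dial(in.addr)
    			if err != nil {
    				continue // this sentinel is down, try the next one
    			}
    			in.c = c
    		}
    		return in.c, nil
    	}
    	return nil, errors.New("no sentinels active")
    }

    func main() {
    	insts := []*instance{{addr: "127.0.0.1:16381"}, {addr: "127.0.0.1:16382"}}
    	dial := func(addr string) (*conn, error) {
    		if addr == "127.0.0.1:16381" {
    			return nil, errors.New("connection refused") // simulate the first sentinel being dead
    		}
    		return &conn{addr: addr}, nil
    	}
    	if c, err := pick(insts, dial); err == nil {
    		fmt.Println("using sentinel at", c.addr) // falls over to 127.0.0.1:16382
    	}
    }

The real Cmd additionally serializes access to sentinelInsts through sentinelMux, since many engine goroutines issue commands concurrently.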
diff --git a/engine/storage_utils.go b/engine/storage_utils.go
index e1a57aabd..e506d5b23 100644
--- a/engine/storage_utils.go
+++ b/engine/storage_utils.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/cgrates/cgrates/config"
@@ -41,7 +42,7 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin
 		utils.Logger.Crit("Redis db name must be an integer!")
 		return nil, err
 	}
-	if port != "" {
+	if port != "" && strings.Index(host, ":") == -1 {
 		host += ":" + port
 	}
 	d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheCfg, sentinelName)
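The extra strings.Index guard in ConfigureDataStorage keeps db_port from being appended to a db_host that already embeds ports, which is exactly what the semicolon-separated sentinel list looks like. A short sketch of both paths, with illustrative values:

    package main

    import (
    	"fmt"
    	"strings"
    )

    func main() {
    	// sentinel list: already carries ports, so nothing is appended
    	host, port := "127.0.0.1:16381;127.0.0.1:16382;127.0.0.1:16383", "16381"
    	if port != "" && strings.Index(host, ":") == -1 {
    		host += ":" + port
    	}
    	fmt.Println(host) // unchanged

    	// classic single-host form: db_port still gets appended
    	host, port = "127.0.0.1", "6379"
    	if port != "" && strings.Index(host, ":") == -1 {
    		host += ":" + port
    	}
    	fmt.Println(host) // "127.0.0.1:6379"
    }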
diff --git a/general_tests/sentinel_it_test.go b/general_tests/sentinel_it_test.go
index a7d0a6a58..e302cad55 100755
--- a/general_tests/sentinel_it_test.go
+++ b/general_tests/sentinel_it_test.go
@@ -36,14 +36,18 @@ import (
 )
 
 var (
-	node1ConfigPath                    = path.Join(*dataDir, "sentinel", "node1.conf")
-	node2ConfigPath                    = path.Join(*dataDir, "sentinel", "node2.conf")
-	sentinelConfigPath                 = path.Join(*dataDir, "sentinel", "sentinel1.conf")
-	engineConfigPath                   = path.Join(*dataDir, "conf", "samples", "tutsentinel")
-	sentinelConfig                     *config.CGRConfig
-	sentinelRPC                        *rpc.Client
-	node1exec, node2exec, sentinelexec *exec.Cmd
-	redisSentinel                      = flag.Bool("redis_sentinel", false, "Run tests with redis sentinel")
+	node1ConfigPath     = path.Join(*dataDir, "sentinel", "node1.conf")
+	node2ConfigPath     = path.Join(*dataDir, "sentinel", "node2.conf")
+	sentinel1ConfigPath = path.Join(*dataDir, "sentinel", "sentinel1.conf")
+	sentinel2ConfigPath = path.Join(*dataDir, "sentinel", "sentinel2.conf")
+	sentinel3ConfigPath = path.Join(*dataDir, "sentinel", "sentinel3.conf")
+	engineConfigPath    = path.Join(*dataDir, "conf", "samples", "tutsentinel")
+	sentinelConfig      *config.CGRConfig
+	sentinelRPC         *rpc.Client
+	node1exec, node2exec,
+	sentinelexec1, sentinelexec2,
+	sentinelexec3 *exec.Cmd
+	redisSentinel = flag.Bool("redis_sentinel", false, "Run tests with redis sentinel")
 )
 
 var sTestsRds = []func(t *testing.T){
@@ -53,7 +57,9 @@ var sTestsRds = []func(t *testing.T){
 	testRedisSentinelStartEngine,
 	testRedisSentinelRPCCon,
 	testRedisSentinelSetGetAttribute,
-	testRedisSentinelShutDownNode1,
+	testRedisSentinelInsertion,
+	testRedisSentinelShutDownSentinel1,
+	testRedisSentinelInsertion2,
 	testRedisSentinelGetAttrAfterFailover,
 	testRedisSentinelKillEngine,
 }
@@ -61,7 +67,9 @@ var sTestsRds = []func(t *testing.T){
 // Before running these tests make sure node1.conf, node2.conf, sentinel1.conf are the next
 // Node1 will be master and start at port 16379
 // Node2 will be slave of node1 and start at port 16380
-// Sentinel will be started at port 16381 and will watch Node1
+// Sentinel1 will be started at port 16381 and will watch Node1
+// Sentinel2 will be started at port 16382 and will watch Node1
+// Sentinel3 will be started at port 16383 and will watch Node1
 func TestRedisSentinel(t *testing.T) {
 	if !*redisSentinel {
 		return
@@ -80,8 +88,16 @@ func testRedisSentinelStartNodes(t *testing.T) {
 	if err := node2exec.Start(); err != nil {
 		t.Error(err)
 	}
-	sentinelexec = exec.Command("redis-sentinel", sentinelConfigPath)
-	if err := sentinelexec.Start(); err != nil {
+	sentinelexec1 = exec.Command("redis-sentinel", sentinel1ConfigPath)
+	if err := sentinelexec1.Start(); err != nil {
+		t.Error(err)
+	}
+	sentinelexec2 = exec.Command("redis-sentinel", sentinel2ConfigPath)
+	if err := sentinelexec2.Start(); err != nil {
+		t.Error(err)
+	}
+	sentinelexec3 = exec.Command("redis-sentinel", sentinel3ConfigPath)
+	if err := sentinelexec3.Start(); err != nil {
 		t.Error(err)
 	}
 }
@@ -150,13 +166,97 @@ func testRedisSentinelSetGetAttribute(t *testing.T) {
 	}
 }
 
-// Here we kill node1 and sentinel will do failover and promote node2 to be master
-func testRedisSentinelShutDownNode1(t *testing.T) {
-	if err := node1exec.Process.Kill(); err != nil { // Kill the master
+func testRedisSentinelInsertion(t *testing.T) {
+	alsPrf := &engine.AttributeProfile{
+		Tenant:    "cgrates.org",
+		ID:        "ApierTest",
+		Contexts:  []string{utils.MetaSessionS, utils.MetaCDRs},
+		FilterIDs: []string{"*string:Account:1001"},
+		Attributes: []*engine.Attribute{
+			&engine.Attribute{
+				FieldName:  utils.Subject,
+				Initial:    utils.ANY,
+				Substitute: config.NewRSRParsersMustCompile("1001", true),
+				Append:     true,
+			},
+		},
+		Weight: 20,
+	}
+	originID := alsPrf.ID + "_"
+	for i := 0; i < 25; i++ {
+		// compute both IDs before t.Run so the parallel subtests
+		// never race on shared state
+		id := fmt.Sprintf("%s%d", originID, 2*i)
+		id2 := fmt.Sprintf("%s%d", originID, 2*i+1)
+		t.Run("add", func(t *testing.T) {
+			t.Parallel()
+			attr := *alsPrf // shallow copy, so each subtest sets its own ID
+			attr.ID = id
+			var result string
+			if err := sentinelRPC.Call("ApierV1.SetAttributeProfile", &attr, &result); err != nil {
+				t.Error(err)
+			}
+		})
+		t.Run("add2", func(t *testing.T) {
+			t.Parallel()
+			attr := *alsPrf
+			attr.ID = id2
+			var result string
+			if err := sentinelRPC.Call("ApierV1.SetAttributeProfile", &attr, &result); err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
+
+// Shut down the first sentinel; the second one should take its place
+func testRedisSentinelShutDownSentinel1(t *testing.T) {
+	if err := sentinelexec1.Process.Kill(); err != nil { // kill the first sentinel
 		t.Error(err)
 	}
 }
 
+func testRedisSentinelInsertion2(t *testing.T) {
+	alsPrf := &engine.AttributeProfile{
+		Tenant:    "cgrates.org",
+		ID:        "ApierTest",
+		Contexts:  []string{utils.MetaSessionS, utils.MetaCDRs},
+		FilterIDs: []string{"*string:Account:1001"},
+		Attributes: []*engine.Attribute{
+			&engine.Attribute{
+				FieldName:  utils.Subject,
+				Initial:    utils.ANY,
+				Substitute: config.NewRSRParsersMustCompile("1001", true),
+				Append:     true,
+			},
+		},
+		Weight: 20,
+	}
+	originID := alsPrf.ID + "_"
+	for i := 0; i < 25; i++ {
+		// same pattern as above, now exercising writes through the
+		// remaining sentinels
+		id := fmt.Sprintf("%s%d", originID, 2*i)
+		id2 := fmt.Sprintf("%s%d", originID, 2*i+1)
+		t.Run("add", func(t *testing.T) {
+			t.Parallel()
+			attr := *alsPrf
+			attr.ID = id
+			var result string
+			if err := sentinelRPC.Call("ApierV1.SetAttributeProfile", &attr, &result); err != nil {
+				t.Error(err)
+			}
+		})
+		t.Run("add2", func(t *testing.T) {
+			t.Parallel()
+			attr := *alsPrf
+			attr.ID = id2
+			var result string
+			if err := sentinelRPC.Call("ApierV1.SetAttributeProfile", &attr, &result); err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
+
 // After we kill node1 check the data if was replicated in node2
 func testRedisSentinelGetAttrAfterFailover(t *testing.T) {
 	alsPrf := &engine.AttributeProfile{