diff --git a/data/conf/samples/tutredis_cluster/cgrates.json b/data/conf/samples/tutredis_cluster/cgrates.json index aa24c58c3..611b995da 100755 --- a/data/conf/samples/tutredis_cluster/cgrates.json +++ b/data/conf/samples/tutredis_cluster/cgrates.json @@ -17,9 +17,10 @@ "data_db": { // database used to store runtime data (eg: accounts, cdr stats) "db_type": "redis", // data_db type: - "db_host":"127.0.0.1:7001;127.0.0.1:7002", + "db_host":"127.0.0.1:7001", "db_name": "10", // data_db database name to connect to "redis_cluster": true, + "cluster_sync": "100ms", // the sync interval for the redis cluster }, diff --git a/data/redis_cluster/node1.conf b/data/redis_cluster/node1.conf index 4a494b423..e06ce1120 100644 --- a/data/redis_cluster/node1.conf +++ b/data/redis_cluster/node1.conf @@ -3,7 +3,9 @@ port 7001 cluster-enabled yes cluster-config-file cluster-node-1.conf -cluster-node-timeout 5000 +cluster-node-timeout 500 appendonly yes dir "/tmp/cluster" + +slave-read-only no \ No newline at end of file diff --git a/data/redis_cluster/node2.conf b/data/redis_cluster/node2.conf index 65d1464ee..46f7588bf 100644 --- a/data/redis_cluster/node2.conf +++ b/data/redis_cluster/node2.conf @@ -3,7 +3,8 @@ port 7002 cluster-enabled yes cluster-config-file cluster-node-2.conf -cluster-node-timeout 5000 +cluster-node-timeout 500 appendonly yes dir "/tmp/cluster" +slave-read-only no \ No newline at end of file diff --git a/data/redis_cluster/node3.conf b/data/redis_cluster/node3.conf index c9e8a9ab8..4593d8d24 100644 --- a/data/redis_cluster/node3.conf +++ b/data/redis_cluster/node3.conf @@ -3,7 +3,8 @@ port 7003 cluster-enabled yes cluster-config-file cluster-node-3.conf -cluster-node-timeout 5000 +cluster-node-timeout 500 appendonly yes dir "/tmp/cluster" +slave-read-only no \ No newline at end of file diff --git a/data/redis_cluster/node4.conf b/data/redis_cluster/node4.conf index cc5f9df5a..5f75b35c5 100644 --- a/data/redis_cluster/node4.conf +++ b/data/redis_cluster/node4.conf @@ -3,7 +3,8 @@ port 7004 cluster-enabled yes cluster-config-file cluster-node-4.conf -cluster-node-timeout 5000 +cluster-node-timeout 500 appendonly yes dir "/tmp/cluster" +slave-read-only no \ No newline at end of file diff --git a/data/redis_cluster/node5.conf b/data/redis_cluster/node5.conf index 35e6fc98f..c0327ea80 100644 --- a/data/redis_cluster/node5.conf +++ b/data/redis_cluster/node5.conf @@ -3,7 +3,8 @@ port 7005 cluster-enabled yes cluster-config-file cluster-node-5.conf -cluster-node-timeout 5000 +cluster-node-timeout 500 appendonly yes dir "/tmp/cluster" +slave-read-only no \ No newline at end of file diff --git a/data/redis_cluster/node6.conf b/data/redis_cluster/node6.conf index eecfa9e94..9b1088da9 100644 --- a/data/redis_cluster/node6.conf +++ b/data/redis_cluster/node6.conf @@ -3,7 +3,8 @@ port 7006 cluster-enabled yes cluster-config-file cluster-node-6.conf -cluster-node-timeout 5000 +cluster-node-timeout 500 appendonly yes dir "/tmp/cluster" +slave-read-only no \ No newline at end of file diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 84b3ec31e..82079986a 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -69,9 +69,7 @@ const ( func NewRedisStorage(address string, db int, user, pass, mrshlerStr string, maxConns int, sentinelName string, isCluster bool, clusterSync, clusterOnDownDelay time.Duration) (rs *RedisStorage, err error) { - rs = new(RedisStorage) - if rs.ms, err = NewMarshaler(mrshlerStr); err != nil { rs = nil return diff --git a/general_tests/redis_cluster_it_test.go b/general_tests/redis_cluster_it_test.go index c6085bda2..0b1fdb16d 100644 --- a/general_tests/redis_cluster_it_test.go +++ b/general_tests/redis_cluster_it_test.go @@ -28,7 +28,9 @@ import ( "os" "os/exec" "path" + "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -78,10 +80,15 @@ var ( testClsrFlushDb, testClsrStartEngine, testClsrRPCConection, + testClsrSetGetAttribute, + testClsrStopMaster, + testClsrSetGetAttribute2, + testClsrReStartMaster, + testClsrGetAttribute, testClsrStopNodes, testClsrKillEngine, - testClsrPrintOutput, testClsrDeleteFolder, + // testClsrPrintOutput, } clsrRedisCliArgs = []string{ @@ -147,6 +154,7 @@ func testClsrCreateCluster(t *testing.T) { t.Errorf("Could not create the cluster because %s", err) t.Logf("The output was:\n %s", stdOut.String()) // print the output to debug the error } + time.Sleep(200 * time.Millisecond) } func testClsrInitConfig(t *testing.T) { @@ -165,7 +173,7 @@ func testClsrFlushDb(t *testing.T) { } func testClsrStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(clsrEngineCfgPath, 2000); err != nil { + if _, err := engine.StopStartEngine(clsrEngineCfgPath, 200); err != nil { t.Fatal(err) } } @@ -178,6 +186,117 @@ func testClsrRPCConection(t *testing.T) { } } +func testClsrSetGetAttribute(t *testing.T) { + alsPrf := &engine.AttributeProfile{ + Tenant: "cgrates.org", + ID: "ClsrTest", + Contexts: []string{utils.MetaSessionS, utils.MetaCDRs}, + FilterIDs: []string{"*string:~*req.Account:1001"}, + Attributes: []*engine.Attribute{ + { + Path: utils.MetaReq + utils.NestingSep + utils.Subject, + Value: config.NewRSRParsersMustCompile("1001", utils.INFIELD_SEP), + }, + }, + Weight: 20, + } + alsPrf.Compile() + var result string + if err := clsrRPC.Call(utils.APIerSv1SetAttributeProfile, alsPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + var reply *engine.AttributeProfile + if err := clsrRPC.Call(utils.APIerSv1GetAttributeProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "ClsrTest"}, &reply); err != nil { + t.Fatal(err) + } + reply.Compile() + if !reflect.DeepEqual(alsPrf, reply) { + t.Errorf("Expecting : %+v, received: %+v", alsPrf, reply) + } +} + +func testClsrStopMaster(t *testing.T) { + path := fmt.Sprintf(clsrNodeCfgPath, 3) + if err = clsrNodes[path].Process.Kill(); err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) +} + +func testClsrSetGetAttribute2(t *testing.T) { + alsPrf := &engine.AttributeProfile{ + Tenant: "cgrates.org", + ID: "ClsrTest", + Contexts: []string{utils.MetaSessionS, utils.MetaCDRs}, + FilterIDs: []string{"*string:~*req.Account:1001"}, + Attributes: []*engine.Attribute{ + { + Path: utils.MetaReq + utils.NestingSep + utils.Subject, + Value: config.NewRSRParsersMustCompile("1001", utils.INFIELD_SEP), + }, + }, + Weight: 20, + } + alsPrf.Compile() + var reply *engine.AttributeProfile + if err := clsrRPC.Call(utils.APIerSv1GetAttributeProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "ClsrTest"}, &reply); err != nil { + t.Fatal(err) + } + reply.Compile() + if !reflect.DeepEqual(alsPrf, reply) { + t.Errorf("Expecting : %+v, received: %+v", alsPrf, reply) + } + // add another attribute + alsPrf.ID += "2" + var result string + if err := clsrRPC.Call(utils.APIerSv1SetAttributeProfile, alsPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testClsrReStartMaster(t *testing.T) { + path := fmt.Sprintf(clsrNodeCfgPath, 3) + clsrNodes[path] = exec.Command(clsrRedisCmd, path) + clsrOutput[path] = bytes.NewBuffer(nil) + clsrNodes[path].Stdout = clsrOutput[path] + if err := clsrNodes[path].Start(); err != nil { + t.Fatalf("Could not start node %v because %s", 3, err) + } + time.Sleep(200 * time.Millisecond) +} + +func testClsrGetAttribute(t *testing.T) { + alsPrf := &engine.AttributeProfile{ + Tenant: "cgrates.org", + ID: "ClsrTest2", + Contexts: []string{utils.MetaSessionS, utils.MetaCDRs}, + FilterIDs: []string{"*string:~*req.Account:1001"}, + Attributes: []*engine.Attribute{ + { + Path: utils.MetaReq + utils.NestingSep + utils.Subject, + Value: config.NewRSRParsersMustCompile("1001", utils.INFIELD_SEP), + }, + }, + Weight: 20, + } + alsPrf.Compile() + var reply *engine.AttributeProfile + if err := clsrRPC.Call(utils.APIerSv1GetAttributeProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "ClsrTest2"}, &reply); err != nil { + t.Fatal(err) + } + reply.Compile() + if !reflect.DeepEqual(alsPrf, reply) { + t.Errorf("Expecting : %+v, received: %+v", alsPrf, reply) + } +} + func testClsrStopNodes(t *testing.T) { for path, node := range clsrNodes { if err := node.Process.Kill(); err != nil {