From f6e3744b00d1312f758eb41759bb09509bc157e4 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 8 Jul 2020 15:29:57 +0300 Subject: [PATCH 1/2] Add integration test for http concurrent requests --- apier/v1/concreqs_it_test.go | 74 ++++++++++++++++++++++++++++- apier/v1/core.go | 8 ++-- cmd/cgr-tester/parallel/parallel.go | 2 +- 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/apier/v1/concreqs_it_test.go b/apier/v1/concreqs_it_test.go index 088bc0788..abc2e4c3a 100644 --- a/apier/v1/concreqs_it_test.go +++ b/apier/v1/concreqs_it_test.go @@ -21,8 +21,13 @@ along with this program. If not, see package v1 import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" "net/rpc" "path" + "strings" "sync" "testing" "time" @@ -45,6 +50,8 @@ var ( testConcReqsRPCConn, testConcReqsBusyAPIs, testConcReqsQueueAPIs, + testConcReqsOnHTTPBusy, + testConcReqsOnHTTPQueue, testConcReqsKillEngine, } ) @@ -113,14 +120,17 @@ func testConcReqsBusyAPIs(t *testing.T) { } var failedAPIs int wg := new(sync.WaitGroup) + lock := new(sync.Mutex) for i := 0; i < 5; i++ { wg.Add(1) go func() { var resp string if err := concReqsRPC.Call(utils.CoreSv1Sleep, - &SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)}, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, &resp); err != nil { + lock.Lock() failedAPIs++ + lock.Unlock() wg.Done() return } @@ -143,7 +153,7 @@ func testConcReqsQueueAPIs(t *testing.T) { go func() { var resp string if err := concReqsRPC.Call(utils.CoreSv1Sleep, - &SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)}, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, &resp); err != nil { wg.Done() t.Error(err) @@ -155,7 +165,67 @@ func testConcReqsQueueAPIs(t *testing.T) { wg.Wait() } +func testConcReqsOnHTTPBusy(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_busy" { + t.SkipNow() + } + var fldAPIs int64 + wg := new(sync.WaitGroup) + lock := new(sync.Mutex) + for i := 0; i < 5; i++ { + wg.Add(1) + go func(index int) { + resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index)))) + if err != nil { + wg.Done() + t.Error(err) + return + } + contents, err := ioutil.ReadAll(resp.Body) + if err != nil { + wg.Done() + t.Error(err) + return + } + resp.Body.Close() + if strings.Contains(string(contents), "denying request due to maximum active requests reached") { + lock.Lock() + fldAPIs++ + lock.Unlock() + } + wg.Done() + return + }(i) + } + wg.Wait() + if fldAPIs < 2 { + t.Errorf("Expected at leat 2 APIs to wait") + } +} + +func testConcReqsOnHTTPQueue(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_queue" { + t.SkipNow() + } + wg := new(sync.WaitGroup) + for i := 0; i < 5; i++ { + wg.Add(1) + go func(index int) { + _, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index)))) + if err != nil { + wg.Done() + t.Error(err) + return + } + wg.Done() + return + }(i) + } + wg.Wait() +} + func testConcReqsKillEngine(t *testing.T) { + time.Sleep(100 * time.Millisecond) if err := engine.KillEngine(100); err != nil { t.Error(err) } diff --git a/apier/v1/core.go b/apier/v1/core.go index 80e2a965a..88cbbe096 100644 --- a/apier/v1/core.go +++ b/apier/v1/core.go @@ -50,13 +50,13 @@ func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) err return nil } -type SleepArgs struct { - SleepTime time.Duration +type DurationArgs struct { + DurationTime time.Duration } // Sleep is used to test the concurrent requests mechanism -func (cS *CoreSv1) Sleep(arg *SleepArgs, reply *string) error { - time.Sleep(arg.SleepTime) +func (cS *CoreSv1) Sleep(arg *DurationArgs, reply *string) error { + time.Sleep(arg.DurationTime) *reply = utils.OK return nil } diff --git a/cmd/cgr-tester/parallel/parallel.go b/cmd/cgr-tester/parallel/parallel.go index e63746b61..b5c08416d 100644 --- a/cmd/cgr-tester/parallel/parallel.go +++ b/cmd/cgr-tester/parallel/parallel.go @@ -33,8 +33,8 @@ func main() { log.Print("Start!") var wg sync.WaitGroup for i := 1; i < 1002; i++ { + wg.Add(1) go func(index int) { - wg.Add(1) resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "APIerSv1.SetAccount","params": [{"Tenant":"reglo","Account":"100%d","ActionPlanId":"PACKAGE_NEW_FOR795", "ReloadScheduler":false}], "id":%d}`, index, index)))) if err != nil { log.Print("Post error: ", err) From 56c2a1cde16bc677212d75c373a40ab22fd8183c Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 8 Jul 2020 18:02:37 +0300 Subject: [PATCH 2/2] Test concurrent request mechanism for BiRPC --- apier/v1/concreqs_it_test.go | 68 ++++++++++++++++++- apier/v1/sessionsbirpc.go | 10 +++ data/conf/samples/conc_reqs_busy/cgrates.json | 18 +++-- .../conf/samples/conc_reqs_queue/cgrates.json | 6 ++ utils/concreqs_bijson_codec.go | 3 + utils/consts.go | 1 + 6 files changed, 99 insertions(+), 7 deletions(-) diff --git a/apier/v1/concreqs_it_test.go b/apier/v1/concreqs_it_test.go index abc2e4c3a..9d56f5724 100644 --- a/apier/v1/concreqs_it_test.go +++ b/apier/v1/concreqs_it_test.go @@ -32,16 +32,18 @@ import ( "testing" "time" - "github.com/cgrates/cgrates/utils" + "github.com/cenkalti/rpc2" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) var ( concReqsCfgPath string concReqsCfg *config.CGRConfig concReqsRPC *rpc.Client + concReqsBiRPC *rpc2.Client concReqsConfigDIR string //run tests for specific configuration sTestsConcReqs = []func(t *testing.T){ @@ -52,6 +54,8 @@ var ( testConcReqsQueueAPIs, testConcReqsOnHTTPBusy, testConcReqsOnHTTPQueue, + testConcReqsOnBiJSONBusy, + testConcReqsOnBiJSONQueue, testConcReqsKillEngine, } ) @@ -105,6 +109,12 @@ func testConcReqsStartEngine(t *testing.T) { } } +func handlePing(clnt *rpc2.Client, arg *DurationArgs, reply *string) error { + time.Sleep(arg.DurationTime) + *reply = utils.OK + return nil +} + // Connect rpc client to rater func testConcReqsRPCConn(t *testing.T) { var err error @@ -112,6 +122,10 @@ func testConcReqsRPCConn(t *testing.T) { if err != nil { t.Fatal(err) } + if concReqsBiRPC, err = utils.NewBiJSONrpcClient(concReqsCfg.SessionSCfg().ListenBijson, + nil); err != nil { + t.Fatal(err) + } } func testConcReqsBusyAPIs(t *testing.T) { @@ -224,6 +238,58 @@ func testConcReqsOnHTTPQueue(t *testing.T) { wg.Wait() } +func testConcReqsOnBiJSONBusy(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_busy" { + t.SkipNow() + } + var failedAPIs int + wg := new(sync.WaitGroup) + lock := new(sync.Mutex) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + var resp string + if err := concReqsBiRPC.Call(utils.SessionSv1Sleep, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &resp); err != nil { + fmt.Println(err) + lock.Lock() + failedAPIs++ + lock.Unlock() + wg.Done() + return + } + wg.Done() + }() + } + wg.Wait() + if failedAPIs < 2 { + t.Errorf("Expected at leat 2 APIs to wait") + } +} + +func testConcReqsOnBiJSONQueue(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_queue" { + t.SkipNow() + } + wg := new(sync.WaitGroup) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + var resp string + if err := concReqsBiRPC.Call(utils.SessionSv1Sleep, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &resp); err != nil { + wg.Done() + t.Error(err) + return + } + wg.Done() + }() + } + wg.Wait() +} + func testConcReqsKillEngine(t *testing.T) { time.Sleep(100 * time.Millisecond) if err := engine.KillEngine(100); err != nil { diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index 4ab290d34..a288166b9 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -19,6 +19,8 @@ along with this program. If not, see package v1 import ( + "time" + "github.com/cenkalti/rpc2" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" @@ -58,6 +60,8 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { utils.SessionSv1STIRAuthenticate: ssv1.BiRPCV1STIRAuthenticate, utils.SessionSv1STIRIdentity: ssv1.BiRPCV1STIRIdentity, + + utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism } } @@ -192,3 +196,9 @@ func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client, args *sessions.V1STIRIdentityArgs, reply *string) error { return ssv1.Ss.BiRPCv1STIRIdentity(nil, args, reply) } + +func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *DurationArgs, reply *string) error { + time.Sleep(arg.DurationTime) + *reply = utils.OK + return nil +} diff --git a/data/conf/samples/conc_reqs_busy/cgrates.json b/data/conf/samples/conc_reqs_busy/cgrates.json index 5aa2d7adc..d10fb36cc 100644 --- a/data/conf/samples/conc_reqs_busy/cgrates.json +++ b/data/conf/samples/conc_reqs_busy/cgrates.json @@ -5,32 +5,38 @@ "node_id": "ConcurrentBusyEngine", "reply_timeout": "50s", "concurrent_requests": 2, - "concurrent_strategy": "*busy", + "concurrent_strategy": "*busy" }, "listen": { "rpc_json": ":2012", "rpc_gob": ":2013", - "http": ":2080", + "http": ":2080" }, "data_db": { // database used to store runtime data (eg: accounts, cdr stats) "db_type": "redis", // data_db type: "db_port": 6379, // data_db port to reach the database - "db_name": "10", // data_db database name to connect to + "db_name": "10" // data_db database name to connect to }, "stor_db": { - "db_password": "CGRateS.org", + "db_password": "CGRateS.org" }, "apiers": { - "enabled": true, + "enabled": true }, -} +"sessions": { + "enabled": true, + "listen_bijson": "127.0.0.1:2014" +}, + + +}, diff --git a/data/conf/samples/conc_reqs_queue/cgrates.json b/data/conf/samples/conc_reqs_queue/cgrates.json index b0ecf40db..300b02506 100644 --- a/data/conf/samples/conc_reqs_queue/cgrates.json +++ b/data/conf/samples/conc_reqs_queue/cgrates.json @@ -32,4 +32,10 @@ }, +"sessions": { + "enabled": true, + "listen_bijson": "127.0.0.1:2014" +}, + + } diff --git a/utils/concreqs_bijson_codec.go b/utils/concreqs_bijson_codec.go index 4e70b6fb4..f4730c5ef 100644 --- a/utils/concreqs_bijson_codec.go +++ b/utils/concreqs_bijson_codec.go @@ -154,6 +154,9 @@ func (c *concReqsBiJSONCoded) ReadRequestBody(x interface{}) error { } func (c *concReqsBiJSONCoded) ReadResponseBody(x interface{}) error { + if err := ConReqs.Allocate(); err != nil { + return err + } if x == nil { return nil } diff --git a/utils/consts.go b/utils/consts.go index b33883b56..81151e40f 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1442,6 +1442,7 @@ const ( SessionSv1DisconnectWarning = "SessionSv1.DisconnectWarning" SessionSv1STIRAuthenticate = "SessionSv1.STIRAuthenticate" SessionSv1STIRIdentity = "SessionSv1.STIRIdentity" + SessionSv1Sleep = "SessionSv1.Sleep" ) // Responder APIs