mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-12 18:46:24 +05:00
Merge pull request #2272 from TeoV/master
Add integration test for http concurrent requests
This commit is contained in:
@@ -21,22 +21,29 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package v1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/rpc"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cenkalti/rpc2"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
concReqsCfgPath string
|
||||
concReqsCfg *config.CGRConfig
|
||||
concReqsRPC *rpc.Client
|
||||
concReqsBiRPC *rpc2.Client
|
||||
concReqsConfigDIR string //run tests for specific configuration
|
||||
|
||||
sTestsConcReqs = []func(t *testing.T){
|
||||
@@ -45,6 +52,10 @@ var (
|
||||
testConcReqsRPCConn,
|
||||
testConcReqsBusyAPIs,
|
||||
testConcReqsQueueAPIs,
|
||||
testConcReqsOnHTTPBusy,
|
||||
testConcReqsOnHTTPQueue,
|
||||
testConcReqsOnBiJSONBusy,
|
||||
testConcReqsOnBiJSONQueue,
|
||||
testConcReqsKillEngine,
|
||||
}
|
||||
)
|
||||
@@ -98,6 +109,12 @@ func testConcReqsStartEngine(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func handlePing(clnt *rpc2.Client, arg *DurationArgs, reply *string) error {
|
||||
time.Sleep(arg.DurationTime)
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connect rpc client to rater
|
||||
func testConcReqsRPCConn(t *testing.T) {
|
||||
var err error
|
||||
@@ -105,6 +122,10 @@ func testConcReqsRPCConn(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if concReqsBiRPC, err = utils.NewBiJSONrpcClient(concReqsCfg.SessionSCfg().ListenBijson,
|
||||
nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testConcReqsBusyAPIs(t *testing.T) {
|
||||
@@ -113,14 +134,17 @@ func testConcReqsBusyAPIs(t *testing.T) {
|
||||
}
|
||||
var failedAPIs int
|
||||
wg := new(sync.WaitGroup)
|
||||
lock := new(sync.Mutex)
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
var resp string
|
||||
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
|
||||
&SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
|
||||
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
|
||||
&resp); err != nil {
|
||||
lock.Lock()
|
||||
failedAPIs++
|
||||
lock.Unlock()
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
@@ -143,7 +167,118 @@ func testConcReqsQueueAPIs(t *testing.T) {
|
||||
go func() {
|
||||
var resp string
|
||||
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
|
||||
&SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
|
||||
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
|
||||
&resp); err != nil {
|
||||
wg.Done()
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func testConcReqsOnHTTPBusy(t *testing.T) {
|
||||
if concReqsConfigDIR != "conc_reqs_busy" {
|
||||
t.SkipNow()
|
||||
}
|
||||
var fldAPIs int64
|
||||
wg := new(sync.WaitGroup)
|
||||
lock := new(sync.Mutex)
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
go func(index int) {
|
||||
resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index))))
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
contents, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
resp.Body.Close()
|
||||
if strings.Contains(string(contents), "denying request due to maximum active requests reached") {
|
||||
lock.Lock()
|
||||
fldAPIs++
|
||||
lock.Unlock()
|
||||
}
|
||||
wg.Done()
|
||||
return
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
if fldAPIs < 2 {
|
||||
t.Errorf("Expected at leat 2 APIs to wait")
|
||||
}
|
||||
}
|
||||
|
||||
func testConcReqsOnHTTPQueue(t *testing.T) {
|
||||
if concReqsConfigDIR != "conc_reqs_queue" {
|
||||
t.SkipNow()
|
||||
}
|
||||
wg := new(sync.WaitGroup)
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
go func(index int) {
|
||||
_, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index))))
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
wg.Done()
|
||||
return
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func testConcReqsOnBiJSONBusy(t *testing.T) {
|
||||
if concReqsConfigDIR != "conc_reqs_busy" {
|
||||
t.SkipNow()
|
||||
}
|
||||
var failedAPIs int
|
||||
wg := new(sync.WaitGroup)
|
||||
lock := new(sync.Mutex)
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
var resp string
|
||||
if err := concReqsBiRPC.Call(utils.SessionSv1Sleep,
|
||||
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
|
||||
&resp); err != nil {
|
||||
fmt.Println(err)
|
||||
lock.Lock()
|
||||
failedAPIs++
|
||||
lock.Unlock()
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if failedAPIs < 2 {
|
||||
t.Errorf("Expected at leat 2 APIs to wait")
|
||||
}
|
||||
}
|
||||
|
||||
func testConcReqsOnBiJSONQueue(t *testing.T) {
|
||||
if concReqsConfigDIR != "conc_reqs_queue" {
|
||||
t.SkipNow()
|
||||
}
|
||||
wg := new(sync.WaitGroup)
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
var resp string
|
||||
if err := concReqsBiRPC.Call(utils.SessionSv1Sleep,
|
||||
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
|
||||
&resp); err != nil {
|
||||
wg.Done()
|
||||
t.Error(err)
|
||||
@@ -156,6 +291,7 @@ func testConcReqsQueueAPIs(t *testing.T) {
|
||||
}
|
||||
|
||||
func testConcReqsKillEngine(t *testing.T) {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if err := engine.KillEngine(100); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -50,13 +50,13 @@ func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) err
|
||||
return nil
|
||||
}
|
||||
|
||||
type SleepArgs struct {
|
||||
SleepTime time.Duration
|
||||
type DurationArgs struct {
|
||||
DurationTime time.Duration
|
||||
}
|
||||
|
||||
// Sleep is used to test the concurrent requests mechanism
|
||||
func (cS *CoreSv1) Sleep(arg *SleepArgs, reply *string) error {
|
||||
time.Sleep(arg.SleepTime)
|
||||
func (cS *CoreSv1) Sleep(arg *DurationArgs, reply *string) error {
|
||||
time.Sleep(arg.DurationTime)
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package v1
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/rpc2"
|
||||
"github.com/cgrates/cgrates/sessions"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -58,6 +60,8 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} {
|
||||
|
||||
utils.SessionSv1STIRAuthenticate: ssv1.BiRPCV1STIRAuthenticate,
|
||||
utils.SessionSv1STIRIdentity: ssv1.BiRPCV1STIRIdentity,
|
||||
|
||||
utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,3 +196,9 @@ func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client,
|
||||
args *sessions.V1STIRIdentityArgs, reply *string) error {
|
||||
return ssv1.Ss.BiRPCv1STIRIdentity(nil, args, reply)
|
||||
}
|
||||
|
||||
func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *DurationArgs, reply *string) error {
|
||||
time.Sleep(arg.DurationTime)
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -33,8 +33,8 @@ func main() {
|
||||
log.Print("Start!")
|
||||
var wg sync.WaitGroup
|
||||
for i := 1; i < 1002; i++ {
|
||||
wg.Add(1)
|
||||
go func(index int) {
|
||||
wg.Add(1)
|
||||
resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "APIerSv1.SetAccount","params": [{"Tenant":"reglo","Account":"100%d","ActionPlanId":"PACKAGE_NEW_FOR795", "ReloadScheduler":false}], "id":%d}`, index, index))))
|
||||
if err != nil {
|
||||
log.Print("Post error: ", err)
|
||||
|
||||
@@ -5,32 +5,38 @@
|
||||
"node_id": "ConcurrentBusyEngine",
|
||||
"reply_timeout": "50s",
|
||||
"concurrent_requests": 2,
|
||||
"concurrent_strategy": "*busy",
|
||||
"concurrent_strategy": "*busy"
|
||||
},
|
||||
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2012",
|
||||
"rpc_gob": ":2013",
|
||||
"http": ":2080",
|
||||
"http": ":2080"
|
||||
},
|
||||
|
||||
|
||||
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
|
||||
"db_type": "redis", // data_db type: <redis|mongo>
|
||||
"db_port": 6379, // data_db port to reach the database
|
||||
"db_name": "10", // data_db database name to connect to
|
||||
"db_name": "10" // data_db database name to connect to
|
||||
},
|
||||
|
||||
|
||||
"stor_db": {
|
||||
"db_password": "CGRateS.org",
|
||||
"db_password": "CGRateS.org"
|
||||
},
|
||||
|
||||
|
||||
"apiers": {
|
||||
"enabled": true,
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
|
||||
}
|
||||
"sessions": {
|
||||
"enabled": true,
|
||||
"listen_bijson": "127.0.0.1:2014"
|
||||
},
|
||||
|
||||
|
||||
},
|
||||
|
||||
@@ -32,4 +32,10 @@
|
||||
},
|
||||
|
||||
|
||||
"sessions": {
|
||||
"enabled": true,
|
||||
"listen_bijson": "127.0.0.1:2014"
|
||||
},
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -154,6 +154,9 @@ func (c *concReqsBiJSONCoded) ReadRequestBody(x interface{}) error {
|
||||
}
|
||||
|
||||
func (c *concReqsBiJSONCoded) ReadResponseBody(x interface{}) error {
|
||||
if err := ConReqs.Allocate(); err != nil {
|
||||
return err
|
||||
}
|
||||
if x == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1442,6 +1442,7 @@ const (
|
||||
SessionSv1DisconnectWarning = "SessionSv1.DisconnectWarning"
|
||||
SessionSv1STIRAuthenticate = "SessionSv1.STIRAuthenticate"
|
||||
SessionSv1STIRIdentity = "SessionSv1.STIRIdentity"
|
||||
SessionSv1Sleep = "SessionSv1.Sleep"
|
||||
)
|
||||
|
||||
// Responder APIs
|
||||
|
||||
Reference in New Issue
Block a user