Add integration test for http concurrent requests

This commit is contained in:
TeoV
2020-07-08 15:29:57 +03:00
parent ed6886fcb3
commit f6e3744b00
3 changed files with 77 additions and 7 deletions

View File

@@ -21,8 +21,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/rpc"
"path"
"strings"
"sync"
"testing"
"time"
@@ -45,6 +50,8 @@ var (
testConcReqsRPCConn,
testConcReqsBusyAPIs,
testConcReqsQueueAPIs,
testConcReqsOnHTTPBusy,
testConcReqsOnHTTPQueue,
testConcReqsKillEngine,
}
)
@@ -113,14 +120,17 @@ func testConcReqsBusyAPIs(t *testing.T) {
}
var failedAPIs int
wg := new(sync.WaitGroup)
lock := new(sync.Mutex)
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
var resp string
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
&SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
lock.Lock()
failedAPIs++
lock.Unlock()
wg.Done()
return
}
@@ -143,7 +153,7 @@ func testConcReqsQueueAPIs(t *testing.T) {
go func() {
var resp string
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
&SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
&DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
wg.Done()
t.Error(err)
@@ -155,7 +165,67 @@ func testConcReqsQueueAPIs(t *testing.T) {
wg.Wait()
}
func testConcReqsOnHTTPBusy(t *testing.T) {
if concReqsConfigDIR != "conc_reqs_busy" {
t.SkipNow()
}
var fldAPIs int64
wg := new(sync.WaitGroup)
lock := new(sync.Mutex)
for i := 0; i < 5; i++ {
wg.Add(1)
go func(index int) {
resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index))))
if err != nil {
wg.Done()
t.Error(err)
return
}
contents, err := ioutil.ReadAll(resp.Body)
if err != nil {
wg.Done()
t.Error(err)
return
}
resp.Body.Close()
if strings.Contains(string(contents), "denying request due to maximum active requests reached") {
lock.Lock()
fldAPIs++
lock.Unlock()
}
wg.Done()
return
}(i)
}
wg.Wait()
if fldAPIs < 2 {
t.Errorf("Expected at leat 2 APIs to wait")
}
}
func testConcReqsOnHTTPQueue(t *testing.T) {
if concReqsConfigDIR != "conc_reqs_queue" {
t.SkipNow()
}
wg := new(sync.WaitGroup)
for i := 0; i < 5; i++ {
wg.Add(1)
go func(index int) {
_, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index))))
if err != nil {
wg.Done()
t.Error(err)
return
}
wg.Done()
return
}(i)
}
wg.Wait()
}
func testConcReqsKillEngine(t *testing.T) {
time.Sleep(100 * time.Millisecond)
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}

View File

@@ -50,13 +50,13 @@ func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) err
return nil
}
type SleepArgs struct {
SleepTime time.Duration
type DurationArgs struct {
DurationTime time.Duration
}
// Sleep is used to test the concurrent requests mechanism
func (cS *CoreSv1) Sleep(arg *SleepArgs, reply *string) error {
time.Sleep(arg.SleepTime)
func (cS *CoreSv1) Sleep(arg *DurationArgs, reply *string) error {
time.Sleep(arg.DurationTime)
*reply = utils.OK
return nil
}

View File

@@ -33,8 +33,8 @@ func main() {
log.Print("Start!")
var wg sync.WaitGroup
for i := 1; i < 1002; i++ {
wg.Add(1)
go func(index int) {
wg.Add(1)
resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "APIerSv1.SetAccount","params": [{"Tenant":"reglo","Account":"100%d","ActionPlanId":"PACKAGE_NEW_FOR795", "ReloadScheduler":false}], "id":%d}`, index, index))))
if err != nil {
log.Print("Post error: ", err)