diff --git a/apier/v1/concreqs_it_test.go b/apier/v1/concreqs_it_test.go index 088bc0788..abc2e4c3a 100644 --- a/apier/v1/concreqs_it_test.go +++ b/apier/v1/concreqs_it_test.go @@ -21,8 +21,13 @@ along with this program. If not, see package v1 import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" "net/rpc" "path" + "strings" "sync" "testing" "time" @@ -45,6 +50,8 @@ var ( testConcReqsRPCConn, testConcReqsBusyAPIs, testConcReqsQueueAPIs, + testConcReqsOnHTTPBusy, + testConcReqsOnHTTPQueue, testConcReqsKillEngine, } ) @@ -113,14 +120,17 @@ func testConcReqsBusyAPIs(t *testing.T) { } var failedAPIs int wg := new(sync.WaitGroup) + lock := new(sync.Mutex) for i := 0; i < 5; i++ { wg.Add(1) go func() { var resp string if err := concReqsRPC.Call(utils.CoreSv1Sleep, - &SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)}, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, &resp); err != nil { + lock.Lock() failedAPIs++ + lock.Unlock() wg.Done() return } @@ -143,7 +153,7 @@ func testConcReqsQueueAPIs(t *testing.T) { go func() { var resp string if err := concReqsRPC.Call(utils.CoreSv1Sleep, - &SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)}, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, &resp); err != nil { wg.Done() t.Error(err) @@ -155,7 +165,67 @@ func testConcReqsQueueAPIs(t *testing.T) { wg.Wait() } +func testConcReqsOnHTTPBusy(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_busy" { + t.SkipNow() + } + var fldAPIs int64 + wg := new(sync.WaitGroup) + lock := new(sync.Mutex) + for i := 0; i < 5; i++ { + wg.Add(1) + go func(index int) { + resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index)))) + if err != nil { + wg.Done() + t.Error(err) + return + } + contents, err := ioutil.ReadAll(resp.Body) + if err != nil { + wg.Done() + t.Error(err) + return + } + resp.Body.Close() + if strings.Contains(string(contents), "denying request due to maximum active requests reached") { + lock.Lock() + fldAPIs++ + lock.Unlock() + } + wg.Done() + return + }(i) + } + wg.Wait() + if fldAPIs < 2 { + t.Errorf("Expected at leat 2 APIs to wait") + } +} + +func testConcReqsOnHTTPQueue(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_queue" { + t.SkipNow() + } + wg := new(sync.WaitGroup) + for i := 0; i < 5; i++ { + wg.Add(1) + go func(index int) { + _, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index)))) + if err != nil { + wg.Done() + t.Error(err) + return + } + wg.Done() + return + }(i) + } + wg.Wait() +} + func testConcReqsKillEngine(t *testing.T) { + time.Sleep(100 * time.Millisecond) if err := engine.KillEngine(100); err != nil { t.Error(err) } diff --git a/apier/v1/core.go b/apier/v1/core.go index 80e2a965a..88cbbe096 100644 --- a/apier/v1/core.go +++ b/apier/v1/core.go @@ -50,13 +50,13 @@ func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) err return nil } -type SleepArgs struct { - SleepTime time.Duration +type DurationArgs struct { + DurationTime time.Duration } // Sleep is used to test the concurrent requests mechanism -func (cS *CoreSv1) Sleep(arg *SleepArgs, reply *string) error { - time.Sleep(arg.SleepTime) +func (cS *CoreSv1) Sleep(arg *DurationArgs, reply *string) error { + time.Sleep(arg.DurationTime) *reply = utils.OK return nil } diff --git a/cmd/cgr-tester/parallel/parallel.go b/cmd/cgr-tester/parallel/parallel.go index e63746b61..b5c08416d 100644 --- a/cmd/cgr-tester/parallel/parallel.go +++ b/cmd/cgr-tester/parallel/parallel.go @@ -33,8 +33,8 @@ func main() { log.Print("Start!") var wg sync.WaitGroup for i := 1; i < 1002; i++ { + wg.Add(1) go func(index int) { - wg.Add(1) resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "APIerSv1.SetAccount","params": [{"Tenant":"reglo","Account":"100%d","ActionPlanId":"PACKAGE_NEW_FOR795", "ReloadScheduler":false}], "id":%d}`, index, index)))) if err != nil { log.Print("Post error: ", err)