diff --git a/apier/v1/concreqs_it_test.go b/apier/v1/concreqs_it_test.go
index 088bc0788..9d56f5724 100644
--- a/apier/v1/concreqs_it_test.go
+++ b/apier/v1/concreqs_it_test.go
@@ -21,22 +21,29 @@ along with this program. If not, see
package v1
import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net/http"
"net/rpc"
"path"
+ "strings"
"sync"
"testing"
"time"
- "github.com/cgrates/cgrates/utils"
+ "github.com/cenkalti/rpc2"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
)
var (
concReqsCfgPath string
concReqsCfg *config.CGRConfig
concReqsRPC *rpc.Client
+ concReqsBiRPC *rpc2.Client
concReqsConfigDIR string //run tests for specific configuration
sTestsConcReqs = []func(t *testing.T){
@@ -45,6 +52,10 @@ var (
testConcReqsRPCConn,
testConcReqsBusyAPIs,
testConcReqsQueueAPIs,
+ testConcReqsOnHTTPBusy,
+ testConcReqsOnHTTPQueue,
+ testConcReqsOnBiJSONBusy,
+ testConcReqsOnBiJSONQueue,
testConcReqsKillEngine,
}
)
@@ -98,6 +109,12 @@ func testConcReqsStartEngine(t *testing.T) {
}
}
+func handlePing(clnt *rpc2.Client, arg *DurationArgs, reply *string) error {
+ time.Sleep(arg.DurationTime)
+ *reply = utils.OK
+ return nil
+}
+
// Connect rpc client to rater
func testConcReqsRPCConn(t *testing.T) {
var err error
@@ -105,6 +122,10 @@ func testConcReqsRPCConn(t *testing.T) {
if err != nil {
t.Fatal(err)
}
+ if concReqsBiRPC, err = utils.NewBiJSONrpcClient(concReqsCfg.SessionSCfg().ListenBijson,
+ nil); err != nil {
+ t.Fatal(err)
+ }
}
func testConcReqsBusyAPIs(t *testing.T) {
@@ -113,14 +134,17 @@ func testConcReqsBusyAPIs(t *testing.T) {
}
var failedAPIs int
wg := new(sync.WaitGroup)
+ lock := new(sync.Mutex)
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
var resp string
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
- &SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
+ &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
+ lock.Lock()
failedAPIs++
+ lock.Unlock()
wg.Done()
return
}
@@ -143,7 +167,118 @@ func testConcReqsQueueAPIs(t *testing.T) {
go func() {
var resp string
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
- &SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
+ &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
+ &resp); err != nil {
+ wg.Done()
+ t.Error(err)
+ return
+ }
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+}
+
+func testConcReqsOnHTTPBusy(t *testing.T) {
+ if concReqsConfigDIR != "conc_reqs_busy" {
+ t.SkipNow()
+ }
+ var fldAPIs int64
+ wg := new(sync.WaitGroup)
+ lock := new(sync.Mutex)
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func(index int) {
+ resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index))))
+ if err != nil {
+ wg.Done()
+ t.Error(err)
+ return
+ }
+ contents, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ wg.Done()
+ t.Error(err)
+ return
+ }
+ resp.Body.Close()
+ if strings.Contains(string(contents), "denying request due to maximum active requests reached") {
+ lock.Lock()
+ fldAPIs++
+ lock.Unlock()
+ }
+ wg.Done()
+ return
+ }(i)
+ }
+ wg.Wait()
+ if fldAPIs < 2 {
+ t.Errorf("Expected at leat 2 APIs to wait")
+ }
+}
+
+func testConcReqsOnHTTPQueue(t *testing.T) {
+ if concReqsConfigDIR != "conc_reqs_queue" {
+ t.SkipNow()
+ }
+ wg := new(sync.WaitGroup)
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func(index int) {
+ _, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index))))
+ if err != nil {
+ wg.Done()
+ t.Error(err)
+ return
+ }
+ wg.Done()
+ return
+ }(i)
+ }
+ wg.Wait()
+}
+
+func testConcReqsOnBiJSONBusy(t *testing.T) {
+ if concReqsConfigDIR != "conc_reqs_busy" {
+ t.SkipNow()
+ }
+ var failedAPIs int
+ wg := new(sync.WaitGroup)
+ lock := new(sync.Mutex)
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func() {
+ var resp string
+ if err := concReqsBiRPC.Call(utils.SessionSv1Sleep,
+ &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
+ &resp); err != nil {
+ fmt.Println(err)
+ lock.Lock()
+ failedAPIs++
+ lock.Unlock()
+ wg.Done()
+ return
+ }
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ if failedAPIs < 2 {
+ t.Errorf("Expected at leat 2 APIs to wait")
+ }
+}
+
+func testConcReqsOnBiJSONQueue(t *testing.T) {
+ if concReqsConfigDIR != "conc_reqs_queue" {
+ t.SkipNow()
+ }
+ wg := new(sync.WaitGroup)
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func() {
+ var resp string
+ if err := concReqsBiRPC.Call(utils.SessionSv1Sleep,
+ &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
wg.Done()
t.Error(err)
@@ -156,6 +291,7 @@ func testConcReqsQueueAPIs(t *testing.T) {
}
func testConcReqsKillEngine(t *testing.T) {
+ time.Sleep(100 * time.Millisecond)
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
diff --git a/apier/v1/core.go b/apier/v1/core.go
index 80e2a965a..88cbbe096 100644
--- a/apier/v1/core.go
+++ b/apier/v1/core.go
@@ -50,13 +50,13 @@ func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) err
return nil
}
-type SleepArgs struct {
- SleepTime time.Duration
+type DurationArgs struct {
+ DurationTime time.Duration
}
// Sleep is used to test the concurrent requests mechanism
-func (cS *CoreSv1) Sleep(arg *SleepArgs, reply *string) error {
- time.Sleep(arg.SleepTime)
+func (cS *CoreSv1) Sleep(arg *DurationArgs, reply *string) error {
+ time.Sleep(arg.DurationTime)
*reply = utils.OK
return nil
}
diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go
index 4ab290d34..a288166b9 100644
--- a/apier/v1/sessionsbirpc.go
+++ b/apier/v1/sessionsbirpc.go
@@ -19,6 +19,8 @@ along with this program. If not, see
package v1
import (
+ "time"
+
"github.com/cenkalti/rpc2"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
@@ -58,6 +60,8 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} {
utils.SessionSv1STIRAuthenticate: ssv1.BiRPCV1STIRAuthenticate,
utils.SessionSv1STIRIdentity: ssv1.BiRPCV1STIRIdentity,
+
+ utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism
}
}
@@ -192,3 +196,9 @@ func (ssv1 *SessionSv1) BiRPCV1STIRIdentity(clnt *rpc2.Client,
args *sessions.V1STIRIdentityArgs, reply *string) error {
return ssv1.Ss.BiRPCv1STIRIdentity(nil, args, reply)
}
+
+func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *DurationArgs, reply *string) error {
+ time.Sleep(arg.DurationTime)
+ *reply = utils.OK
+ return nil
+}
diff --git a/cmd/cgr-tester/parallel/parallel.go b/cmd/cgr-tester/parallel/parallel.go
index e63746b61..b5c08416d 100644
--- a/cmd/cgr-tester/parallel/parallel.go
+++ b/cmd/cgr-tester/parallel/parallel.go
@@ -33,8 +33,8 @@ func main() {
log.Print("Start!")
var wg sync.WaitGroup
for i := 1; i < 1002; i++ {
+ wg.Add(1)
go func(index int) {
- wg.Add(1)
resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "APIerSv1.SetAccount","params": [{"Tenant":"reglo","Account":"100%d","ActionPlanId":"PACKAGE_NEW_FOR795", "ReloadScheduler":false}], "id":%d}`, index, index))))
if err != nil {
log.Print("Post error: ", err)
diff --git a/data/conf/samples/conc_reqs_busy/cgrates.json b/data/conf/samples/conc_reqs_busy/cgrates.json
index 5aa2d7adc..d10fb36cc 100644
--- a/data/conf/samples/conc_reqs_busy/cgrates.json
+++ b/data/conf/samples/conc_reqs_busy/cgrates.json
@@ -5,32 +5,38 @@
"node_id": "ConcurrentBusyEngine",
"reply_timeout": "50s",
"concurrent_requests": 2,
- "concurrent_strategy": "*busy",
+ "concurrent_strategy": "*busy"
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
- "http": ":2080",
+ "http": ":2080"
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type:
"db_port": 6379, // data_db port to reach the database
- "db_name": "10", // data_db database name to connect to
+ "db_name": "10" // data_db database name to connect to
},
"stor_db": {
- "db_password": "CGRateS.org",
+ "db_password": "CGRateS.org"
},
"apiers": {
- "enabled": true,
+ "enabled": true
},
-}
+"sessions": {
+ "enabled": true,
+ "listen_bijson": "127.0.0.1:2014"
+},
+
+
+},
diff --git a/data/conf/samples/conc_reqs_queue/cgrates.json b/data/conf/samples/conc_reqs_queue/cgrates.json
index b0ecf40db..300b02506 100644
--- a/data/conf/samples/conc_reqs_queue/cgrates.json
+++ b/data/conf/samples/conc_reqs_queue/cgrates.json
@@ -32,4 +32,10 @@
},
+"sessions": {
+ "enabled": true,
+ "listen_bijson": "127.0.0.1:2014"
+},
+
+
}
diff --git a/utils/concreqs_bijson_codec.go b/utils/concreqs_bijson_codec.go
index 4e70b6fb4..f4730c5ef 100644
--- a/utils/concreqs_bijson_codec.go
+++ b/utils/concreqs_bijson_codec.go
@@ -154,6 +154,9 @@ func (c *concReqsBiJSONCoded) ReadRequestBody(x interface{}) error {
}
func (c *concReqsBiJSONCoded) ReadResponseBody(x interface{}) error {
+ if err := ConReqs.Allocate(); err != nil {
+ return err
+ }
if x == nil {
return nil
}
diff --git a/utils/consts.go b/utils/consts.go
index b33883b56..81151e40f 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -1442,6 +1442,7 @@ const (
SessionSv1DisconnectWarning = "SessionSv1.DisconnectWarning"
SessionSv1STIRAuthenticate = "SessionSv1.STIRAuthenticate"
SessionSv1STIRIdentity = "SessionSv1.STIRIdentity"
+ SessionSv1Sleep = "SessionSv1.Sleep"
)
// Responder APIs