diff --git a/apier/v1/concreqs_it_test.go b/apier/v1/concreqs_it_test.go
new file mode 100644
index 000000000..088bc0788
--- /dev/null
+++ b/apier/v1/concreqs_it_test.go
@@ -0,0 +1,162 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package v1
+
+import (
+ "net/rpc"
+ "path"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/utils"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+)
+
+var (
+ concReqsCfgPath string
+ concReqsCfg *config.CGRConfig
+ concReqsRPC *rpc.Client
+ concReqsConfigDIR string //run tests for specific configuration
+
+ sTestsConcReqs = []func(t *testing.T){
+ testConcReqsInitCfg,
+ testConcReqsStartEngine,
+ testConcReqsRPCConn,
+ testConcReqsBusyAPIs,
+ testConcReqsQueueAPIs,
+ testConcReqsKillEngine,
+ }
+)
+
+//Test start here
+func TestConcReqsBusyJSON(t *testing.T) {
+ concReqsConfigDIR = "conc_reqs_busy"
+ for _, stest := range sTestsConcReqs {
+ t.Run(concReqsConfigDIR, stest)
+ }
+}
+
+func TestConcReqsQueueJSON(t *testing.T) {
+ concReqsConfigDIR = "conc_reqs_queue"
+ for _, stest := range sTestsConcReqs {
+ t.Run(concReqsConfigDIR, stest)
+ }
+}
+
+func TestConcReqsBusyGOB(t *testing.T) {
+ concReqsConfigDIR = "conc_reqs_busy"
+ encoding = utils.StringPointer(utils.MetaGOB)
+ for _, stest := range sTestsConcReqs {
+ t.Run(concReqsConfigDIR, stest)
+ }
+}
+
+func TestConcReqsQueueGOB(t *testing.T) {
+ concReqsConfigDIR = "conc_reqs_queue"
+ encoding = utils.StringPointer(utils.MetaGOB)
+ for _, stest := range sTestsConcReqs {
+ t.Run(concReqsConfigDIR, stest)
+ }
+}
+
+func testConcReqsInitCfg(t *testing.T) {
+ var err error
+ concReqsCfgPath = path.Join(*dataDir, "conf", "samples", concReqsConfigDIR)
+ concReqsCfg, err = config.NewCGRConfigFromPath(concReqsCfgPath)
+ if err != nil {
+ t.Error(err)
+ }
+ concReqsCfg.DataFolderPath = *dataDir
+ config.SetCgrConfig(concReqsCfg)
+}
+
+// Start CGR Engine
+func testConcReqsStartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(concReqsCfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Connect rpc client to rater
+func testConcReqsRPCConn(t *testing.T) {
+ var err error
+ concReqsRPC, err = newRPCClient(concReqsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testConcReqsBusyAPIs(t *testing.T) {
+ if concReqsConfigDIR != "conc_reqs_busy" {
+ t.SkipNow()
+ }
+ var failedAPIs int
+ wg := new(sync.WaitGroup)
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func() {
+ var resp string
+ if err := concReqsRPC.Call(utils.CoreSv1Sleep,
+ &SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
+ &resp); err != nil {
+ failedAPIs++
+ wg.Done()
+ return
+ }
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ if failedAPIs < 2 {
+ t.Errorf("Expected at leat 2 APIs to wait")
+ }
+}
+
+func testConcReqsQueueAPIs(t *testing.T) {
+ if concReqsConfigDIR != "conc_reqs_queue" {
+ t.SkipNow()
+ }
+ wg := new(sync.WaitGroup)
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func() {
+ var resp string
+ if err := concReqsRPC.Call(utils.CoreSv1Sleep,
+ &SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
+ &resp); err != nil {
+ wg.Done()
+ t.Error(err)
+ return
+ }
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+}
+
+func testConcReqsKillEngine(t *testing.T) {
+ if err := engine.KillEngine(100); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/apier/v1/core.go b/apier/v1/core.go
index b9dba1b8e..80e2a965a 100644
--- a/apier/v1/core.go
+++ b/apier/v1/core.go
@@ -19,6 +19,8 @@ along with this program. If not, see
package v1
import (
+ "time"
+
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -42,8 +44,19 @@ func (cS *CoreSv1) Status(arg *utils.TenantWithArgDispatcher, reply *map[string]
return cS.cS.Status(arg, reply)
}
-// Ping used to detreminate if component is active
+// Ping used to determinate if component is active
func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error {
*reply = utils.Pong
return nil
}
+
+type SleepArgs struct {
+ SleepTime time.Duration
+}
+
+// Sleep is used to test the concurrent requests mechanism
+func (cS *CoreSv1) Sleep(arg *SleepArgs, reply *string) error {
+ time.Sleep(arg.SleepTime)
+ *reply = utils.OK
+ return nil
+}
diff --git a/data/conf/samples/conc_reqs_busy/cgrates.json b/data/conf/samples/conc_reqs_busy/cgrates.json
new file mode 100644
index 000000000..5aa2d7adc
--- /dev/null
+++ b/data/conf/samples/conc_reqs_busy/cgrates.json
@@ -0,0 +1,36 @@
+{
+
+"general": {
+ "log_level": 7,
+ "node_id": "ConcurrentBusyEngine",
+ "reply_timeout": "50s",
+ "concurrent_requests": 2,
+ "concurrent_strategy": "*busy",
+},
+
+
+"listen": {
+ "rpc_json": ":2012",
+ "rpc_gob": ":2013",
+ "http": ":2080",
+},
+
+
+"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
+ "db_type": "redis", // data_db type:
+ "db_port": 6379, // data_db port to reach the database
+ "db_name": "10", // data_db database name to connect to
+},
+
+
+"stor_db": {
+ "db_password": "CGRateS.org",
+},
+
+
+"apiers": {
+ "enabled": true,
+},
+
+
+}
diff --git a/data/conf/samples/conc_reqs_queue/cgrates.json b/data/conf/samples/conc_reqs_queue/cgrates.json
new file mode 100644
index 000000000..b0ecf40db
--- /dev/null
+++ b/data/conf/samples/conc_reqs_queue/cgrates.json
@@ -0,0 +1,35 @@
+{
+
+"general": {
+ "log_level": 7,
+ "node_id": "ConcurrentQueueEngine",
+ "reply_timeout": "50s",
+ "concurrent_requests": 2,
+ "concurrent_strategy": "*queue",
+},
+
+
+"listen": {
+ "rpc_json": ":2012",
+ "rpc_gob": ":2013",
+ "http": ":2080",
+},
+
+
+"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
+ "db_type": "redis", // data_db type:
+ "db_port": 6379, // data_db port to reach the database
+ "db_name": "10", // data_db database name to connect to
+},
+
+"stor_db": {
+ "db_password": "CGRateS.org",
+},
+
+
+"apiers": {
+ "enabled": true,
+},
+
+
+}
diff --git a/utils/concreqs_bijson_codec.go b/utils/concreqs_bijson_codec.go
index 74301dc81..4e70b6fb4 100644
--- a/utils/concreqs_bijson_codec.go
+++ b/utils/concreqs_bijson_codec.go
@@ -82,9 +82,6 @@ type clientRequest struct {
}
func (c *concReqsBiJSONCoded) ReadHeader(req *rpc2.Request, resp *rpc2.Response) error {
- if err := ConReqs.Allocate(); err != nil {
- return err
- }
c.msg = message{}
if err := c.dec.Decode(&c.msg); err != nil {
return err
@@ -137,6 +134,9 @@ func (c *concReqsBiJSONCoded) ReadHeader(req *rpc2.Request, resp *rpc2.Response)
}
func (c *concReqsBiJSONCoded) ReadRequestBody(x interface{}) error {
+ if err := ConReqs.Allocate(); err != nil {
+ return err
+ }
if x == nil {
return nil
}
@@ -179,7 +179,7 @@ func (c *concReqsBiJSONCoded) WriteRequest(r *rpc2.Request, param interface{}) e
}
func (c *concReqsBiJSONCoded) WriteResponse(r *rpc2.Response, x interface{}) error {
- defer ConReqs.Deallocate()
+ defer ConReqs.Deallocate(r.Error)
c.mutex.Lock()
b, ok := c.pending[r.Seq]
if !ok {
diff --git a/utils/concureqs.go b/utils/concureqs.go
index ebccb6422..8f2029853 100644
--- a/utils/concureqs.go
+++ b/utils/concureqs.go
@@ -42,6 +42,8 @@ func NewConReqs(reqs int, strategy string) *ConcReqs {
return cR
}
+var errDeny = fmt.Errorf("denying request due to maximum active requests reached")
+
func (cR *ConcReqs) Allocate() (err error) {
if cR.nAReqs == 0 {
return
@@ -49,7 +51,7 @@ func (cR *ConcReqs) Allocate() (err error) {
switch cR.strategy {
case MetaBusy:
if len(cR.aReqs) == 0 {
- return fmt.Errorf("denying request due to maximum active requests reached")
+ return errDeny
}
fallthrough
case MetaQueue:
@@ -58,8 +60,8 @@ func (cR *ConcReqs) Allocate() (err error) {
return
}
-func (cR *ConcReqs) Deallocate() {
- if cR.nAReqs == 0 {
+func (cR *ConcReqs) Deallocate(errStr string) {
+ if cR.nAReqs == 0 || errStr == errDeny.Error() { // in case we receive denying request we don't need to put back the slot on channel because we returned error without getting it
return
}
cR.aReqs <- struct{}{}
diff --git a/utils/concureqs_gob_codec.go b/utils/concureqs_gob_codec.go
index 9f3757bb8..2ceb66a76 100644
--- a/utils/concureqs_gob_codec.go
+++ b/utils/concureqs_gob_codec.go
@@ -46,18 +46,18 @@ func NewConcReqsGobServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
}
func (c *concReqsGobServerCodec) ReadRequestHeader(r *rpc.Request) error {
- if err := ConReqs.Allocate(); err != nil {
- return err
- }
return c.dec.Decode(r)
}
func (c *concReqsGobServerCodec) ReadRequestBody(body interface{}) error {
+ if err := ConReqs.Allocate(); err != nil {
+ return err
+ }
return c.dec.Decode(body)
}
func (c *concReqsGobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
- defer ConReqs.Deallocate()
+ defer ConReqs.Deallocate(r.Error)
if err = c.enc.Encode(r); err != nil {
if c.encBuf.Flush() == nil {
// Gob couldn't encode the header. Should not happen, so if it does,
diff --git a/utils/concureqs_json_codec.go b/utils/concureqs_json_codec.go
index 5de8bc455..c85e7cd0c 100644
--- a/utils/concureqs_json_codec.go
+++ b/utils/concureqs_json_codec.go
@@ -57,9 +57,6 @@ func NewConcReqsServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
}
func (c *concReqsServerCodec) ReadRequestHeader(r *rpc.Request) error {
- if err := ConReqs.Allocate(); err != nil {
- return err
- }
c.req.reset()
if err := c.dec.Decode(&c.req); err != nil {
return err
@@ -80,6 +77,9 @@ func (c *concReqsServerCodec) ReadRequestHeader(r *rpc.Request) error {
}
func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error {
+ if err := ConReqs.Allocate(); err != nil {
+ return err
+ }
if x == nil {
return nil
}
@@ -96,7 +96,7 @@ func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error {
}
func (c *concReqsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
- defer ConReqs.Deallocate()
+ defer ConReqs.Deallocate(r.Error)
c.mutex.Lock()
b, ok := c.pending[r.Seq]
if !ok {
diff --git a/utils/consts.go b/utils/consts.go
index 1ae3d3ada..b33883b56 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -1331,6 +1331,7 @@ const (
CoreSv1 = "CoreSv1"
CoreSv1Status = "CoreSv1.Status"
CoreSv1Ping = "CoreSv1.Ping"
+ CoreSv1Sleep = "CoreSv1.Sleep"
)
// RouteS APIs
diff --git a/utils/json_codec.go b/utils/json_codec.go
index 6cdcc9370..882ee2a4b 100644
--- a/utils/json_codec.go
+++ b/utils/json_codec.go
@@ -140,7 +140,7 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error {
var null = json.RawMessage([]byte("null"))
func (c *jsonServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
- defer ConReqs.Deallocate()
+ defer ConReqs.Deallocate(r.Error)
c.mutex.Lock()
b, ok := c.pending[r.Seq]
if !ok {