Update concurrent mechanism and add integration tests for json and gob

This commit is contained in:
TeoV
2020-07-08 12:47:08 +03:00
parent 6d0eb22298
commit 8695536910
10 changed files with 266 additions and 17 deletions

View File

@@ -0,0 +1,162 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"net/rpc"
"path"
"sync"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
var (
concReqsCfgPath string
concReqsCfg *config.CGRConfig
concReqsRPC *rpc.Client
concReqsConfigDIR string //run tests for specific configuration
sTestsConcReqs = []func(t *testing.T){
testConcReqsInitCfg,
testConcReqsStartEngine,
testConcReqsRPCConn,
testConcReqsBusyAPIs,
testConcReqsQueueAPIs,
testConcReqsKillEngine,
}
)
//Test start here
func TestConcReqsBusyJSON(t *testing.T) {
concReqsConfigDIR = "conc_reqs_busy"
for _, stest := range sTestsConcReqs {
t.Run(concReqsConfigDIR, stest)
}
}
func TestConcReqsQueueJSON(t *testing.T) {
concReqsConfigDIR = "conc_reqs_queue"
for _, stest := range sTestsConcReqs {
t.Run(concReqsConfigDIR, stest)
}
}
func TestConcReqsBusyGOB(t *testing.T) {
concReqsConfigDIR = "conc_reqs_busy"
encoding = utils.StringPointer(utils.MetaGOB)
for _, stest := range sTestsConcReqs {
t.Run(concReqsConfigDIR, stest)
}
}
func TestConcReqsQueueGOB(t *testing.T) {
concReqsConfigDIR = "conc_reqs_queue"
encoding = utils.StringPointer(utils.MetaGOB)
for _, stest := range sTestsConcReqs {
t.Run(concReqsConfigDIR, stest)
}
}
func testConcReqsInitCfg(t *testing.T) {
var err error
concReqsCfgPath = path.Join(*dataDir, "conf", "samples", concReqsConfigDIR)
concReqsCfg, err = config.NewCGRConfigFromPath(concReqsCfgPath)
if err != nil {
t.Error(err)
}
concReqsCfg.DataFolderPath = *dataDir
config.SetCgrConfig(concReqsCfg)
}
// Start CGR Engine
func testConcReqsStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(concReqsCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func testConcReqsRPCConn(t *testing.T) {
var err error
concReqsRPC, err = newRPCClient(concReqsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
func testConcReqsBusyAPIs(t *testing.T) {
if concReqsConfigDIR != "conc_reqs_busy" {
t.SkipNow()
}
var failedAPIs int
wg := new(sync.WaitGroup)
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
var resp string
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
&SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
failedAPIs++
wg.Done()
return
}
wg.Done()
}()
}
wg.Wait()
if failedAPIs < 2 {
t.Errorf("Expected at leat 2 APIs to wait")
}
}
func testConcReqsQueueAPIs(t *testing.T) {
if concReqsConfigDIR != "conc_reqs_queue" {
t.SkipNow()
}
wg := new(sync.WaitGroup)
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
var resp string
if err := concReqsRPC.Call(utils.CoreSv1Sleep,
&SleepArgs{SleepTime: time.Duration(10 * time.Millisecond)},
&resp); err != nil {
wg.Done()
t.Error(err)
return
}
wg.Done()
}()
}
wg.Wait()
}
func testConcReqsKillEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -42,8 +44,19 @@ func (cS *CoreSv1) Status(arg *utils.TenantWithArgDispatcher, reply *map[string]
return cS.cS.Status(arg, reply)
}
// Ping used to detreminate if component is active
// Ping used to determinate if component is active
func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error {
*reply = utils.Pong
return nil
}
type SleepArgs struct {
SleepTime time.Duration
}
// Sleep is used to test the concurrent requests mechanism
func (cS *CoreSv1) Sleep(arg *SleepArgs, reply *string) error {
time.Sleep(arg.SleepTime)
*reply = utils.OK
return nil
}

View File

@@ -0,0 +1,36 @@
{
"general": {
"log_level": 7,
"node_id": "ConcurrentBusyEngine",
"reply_timeout": "50s",
"concurrent_requests": 2,
"concurrent_strategy": "*busy",
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
},
"stor_db": {
"db_password": "CGRateS.org",
},
"apiers": {
"enabled": true,
},
}

View File

@@ -0,0 +1,35 @@
{
"general": {
"log_level": 7,
"node_id": "ConcurrentQueueEngine",
"reply_timeout": "50s",
"concurrent_requests": 2,
"concurrent_strategy": "*queue",
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
},
"stor_db": {
"db_password": "CGRateS.org",
},
"apiers": {
"enabled": true,
},
}

View File

@@ -82,9 +82,6 @@ type clientRequest struct {
}
func (c *concReqsBiJSONCoded) ReadHeader(req *rpc2.Request, resp *rpc2.Response) error {
if err := ConReqs.Allocate(); err != nil {
return err
}
c.msg = message{}
if err := c.dec.Decode(&c.msg); err != nil {
return err
@@ -137,6 +134,9 @@ func (c *concReqsBiJSONCoded) ReadHeader(req *rpc2.Request, resp *rpc2.Response)
}
func (c *concReqsBiJSONCoded) ReadRequestBody(x interface{}) error {
if err := ConReqs.Allocate(); err != nil {
return err
}
if x == nil {
return nil
}
@@ -179,7 +179,7 @@ func (c *concReqsBiJSONCoded) WriteRequest(r *rpc2.Request, param interface{}) e
}
func (c *concReqsBiJSONCoded) WriteResponse(r *rpc2.Response, x interface{}) error {
defer ConReqs.Deallocate()
defer ConReqs.Deallocate(r.Error)
c.mutex.Lock()
b, ok := c.pending[r.Seq]
if !ok {

View File

@@ -42,6 +42,8 @@ func NewConReqs(reqs int, strategy string) *ConcReqs {
return cR
}
var errDeny = fmt.Errorf("denying request due to maximum active requests reached")
func (cR *ConcReqs) Allocate() (err error) {
if cR.nAReqs == 0 {
return
@@ -49,7 +51,7 @@ func (cR *ConcReqs) Allocate() (err error) {
switch cR.strategy {
case MetaBusy:
if len(cR.aReqs) == 0 {
return fmt.Errorf("denying request due to maximum active requests reached")
return errDeny
}
fallthrough
case MetaQueue:
@@ -58,8 +60,8 @@ func (cR *ConcReqs) Allocate() (err error) {
return
}
func (cR *ConcReqs) Deallocate() {
if cR.nAReqs == 0 {
func (cR *ConcReqs) Deallocate(errStr string) {
if cR.nAReqs == 0 || errStr == errDeny.Error() { // in case we receive denying request we don't need to put back the slot on channel because we returned error without getting it
return
}
cR.aReqs <- struct{}{}

View File

@@ -46,18 +46,18 @@ func NewConcReqsGobServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
}
func (c *concReqsGobServerCodec) ReadRequestHeader(r *rpc.Request) error {
if err := ConReqs.Allocate(); err != nil {
return err
}
return c.dec.Decode(r)
}
func (c *concReqsGobServerCodec) ReadRequestBody(body interface{}) error {
if err := ConReqs.Allocate(); err != nil {
return err
}
return c.dec.Decode(body)
}
func (c *concReqsGobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
defer ConReqs.Deallocate()
defer ConReqs.Deallocate(r.Error)
if err = c.enc.Encode(r); err != nil {
if c.encBuf.Flush() == nil {
// Gob couldn't encode the header. Should not happen, so if it does,

View File

@@ -57,9 +57,6 @@ func NewConcReqsServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
}
func (c *concReqsServerCodec) ReadRequestHeader(r *rpc.Request) error {
if err := ConReqs.Allocate(); err != nil {
return err
}
c.req.reset()
if err := c.dec.Decode(&c.req); err != nil {
return err
@@ -80,6 +77,9 @@ func (c *concReqsServerCodec) ReadRequestHeader(r *rpc.Request) error {
}
func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error {
if err := ConReqs.Allocate(); err != nil {
return err
}
if x == nil {
return nil
}
@@ -96,7 +96,7 @@ func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error {
}
func (c *concReqsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
defer ConReqs.Deallocate()
defer ConReqs.Deallocate(r.Error)
c.mutex.Lock()
b, ok := c.pending[r.Seq]
if !ok {

View File

@@ -1331,6 +1331,7 @@ const (
CoreSv1 = "CoreSv1"
CoreSv1Status = "CoreSv1.Status"
CoreSv1Ping = "CoreSv1.Ping"
CoreSv1Sleep = "CoreSv1.Sleep"
)
// RouteS APIs

View File

@@ -140,7 +140,7 @@ func (c *jsonServerCodec) ReadRequestBody(x interface{}) error {
var null = json.RawMessage([]byte("null"))
func (c *jsonServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
defer ConReqs.Deallocate()
defer ConReqs.Deallocate(r.Error)
c.mutex.Lock()
b, ok := c.pending[r.Seq]
if !ok {