Concurrent sessions with sub-sub tests

This commit is contained in:
DanB
2020-01-08 19:30:58 +01:00
parent fb15513bb2
commit 2c93ef6611
3 changed files with 57 additions and 58 deletions

View File

@@ -19,7 +19,7 @@ package general_tests
import (
"errors"
"flag"
//"flag"
"net/rpc"
"net/rpc/jsonrpc"
@@ -28,14 +28,17 @@ import (
)
var (
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication")
err error
//dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
//waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
waitRater = 100
//encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication")
encoding = utils.MetaJSON
err error
dataDir = "/usr/share/cgrates"
)
func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
switch *encoding {
switch encoding {
case utils.MetaJSON:
return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen)
case utils.MetaGOB:

View File

@@ -29,7 +29,7 @@ import (
"time"
"github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
@@ -45,13 +45,15 @@ var (
sCncrCps = flag.Int("cps", 50000, "maximum requests per second sent out")
cpsPool = make(chan struct{}, *sCncrCps)
acntIDs = make(chan string, 1)
wg sync.WaitGroup
)
// Tests starting here
func TestSCncrInternal(t *testing.T) {
sCncrCfgDIR = "sessinternal"
for _, tst := range sTestsSCncrIT {
t.Run(sCncrCfgDIR, tst)
t.Run("InternalConn", tst)
}
}
@@ -59,7 +61,7 @@ func TestSCncrInternal(t *testing.T) {
func TestSCncrJSON(t *testing.T) {
sCncrCfgDIR = "sessintjson"
for _, tst := range sTestsSCncrIT {
t.Run(sCncrCfgDIR, tst)
t.Run("JSONConn", tst)
}
}
@@ -76,7 +78,7 @@ var sTestsSCncrIT = []func(t *testing.T){
}
func testSCncrInitConfig(t *testing.T) {
sCncrCfgPath = path.Join(*dataDir, "conf", "samples", sCncrCfgDIR)
sCncrCfgPath = path.Join(dataDir, "conf", "samples", sCncrCfgDIR)
if sCncrCfg, err = config.NewCGRConfigFromPath(sCncrCfgPath); err != nil {
t.Fatal(err)
}
@@ -96,7 +98,7 @@ func testSCncrInitStorDB(t *testing.T) {
}
func testSCncrStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(sCncrCfgPath, *waitRater); err != nil {
if _, err := engine.StopStartEngine(sCncrCfgPath, waitRater); err != nil {
t.Fatal(err)
}
}
@@ -111,7 +113,7 @@ func testSCncrRPCConn(t *testing.T) {
}
func testSCncrKillEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
if err := engine.KillEngine(waitRater); err != nil {
t.Error(err)
}
}
@@ -120,14 +122,14 @@ func testSCncrLoadTP(t *testing.T) {
var loadInst string
if err := sCncrRPC.Call(utils.ApierV1LoadTariffPlanFromFolder,
&utils.AttrLoadTpFromFolder{FolderPath: path.Join(
*dataDir, "tariffplans", "tp1cnt")}, &loadInst); err != nil {
dataDir, "tariffplans", "tp1cnt")}, &loadInst); err != nil {
t.Error(err)
}
attrPrfl := &v2.AttributeWithCache{
ExternalAttributeProfile: &engine.ExternalAttributeProfile{
Tenant: "cgrates.org",
ID: "AttrConcurrentSessions",
Contexts: []string{utils.ANY},
Tenant: "cgrates.org",
ID: "AttrConcurrentSessions",
Contexts: []string{utils.ANY},
Attributes: []*engine.ExternalAttribute{
{
FieldName: "TestType",
@@ -146,12 +148,11 @@ func testSCncrLoadTP(t *testing.T) {
}
func testSCncrRunSessions(t *testing.T) {
acntIDs := utils.NewStringSet(nil)
acntIDsSet := utils.NewStringSet(nil)
bufferTopup := time.Duration(8760) * time.Hour
var wg sync.WaitGroup
for i := 0; i < *sCncrSessions; i++ {
acntID := fmt.Sprintf("100%d", utils.RandomInteger(100, 200))
if !acntIDs.Has(acntID) {
if !acntIDsSet.Has(acntID) {
// Special balance BUFFER to cover concurrency on MAIN one
argsAddBalance := &v1.AttrAddBalance{
Tenant: "cgrates.org",
@@ -168,24 +169,14 @@ func testSCncrRunSessions(t *testing.T) {
} else if addBlcRply != utils.OK {
t.Errorf("received: <%s>", addBlcRply)
}
acntIDs.Add(acntID)
acntIDsSet.Add(acntID)
}
acntIDs <- acntID
wg.Add(1)
go func(acntID string) {
cpsPool <- struct{}{} // push here up to cps
go func() { // allow more requests after a second
time.Sleep(time.Duration(time.Second))
<-cpsPool
}()
err := runSession(acntID)
wg.Done()
if err != nil {
t.Error(err)
}
}(acntID)
go t.Run(fmt.Sprintf("RunSession#%d", i), testRunSession)
}
wg.Wait()
for acntID := range acntIDs.Data() {
for acntID := range acntIDsSet.Data() {
// make sure the account was properly refunded
var acnt *engine.Account
acntAttrs := &utils.AttrGetAccount{
@@ -202,9 +193,15 @@ func testSCncrRunSessions(t *testing.T) {
}
// runSession runs one session
func runSession(acntID string) (err error) {
func testRunSession(t *testing.T) {
defer wg.Done() // decrease group counter one out from test
cpsPool <- struct{}{} // push here up to cps
go func() { // allow more requests after a second
time.Sleep(time.Duration(time.Second))
<-cpsPool
}()
acntID := <-acntIDs
originID := utils.GenUUID() // each test with it's own OriginID
// topup as much as we know we need for one session
mainTopup := time.Duration(90) * time.Second
var addBlcRply string
@@ -219,9 +216,9 @@ func runSession(acntID string) (err error) {
},
}
if err = sCncrRPC.Call(utils.ApierV1AddBalance, argsAddBalance, &addBlcRply); err != nil {
return
t.Error(err)
} else if addBlcRply != utils.OK {
return fmt.Errorf("received: <%s> to ApierV1.AddBalance", addBlcRply)
t.Errorf("received: <%s> to ApierV1.AddBalance", addBlcRply)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond) // randomize between tests
@@ -246,7 +243,7 @@ func runSession(acntID string) (err error) {
}
var rplyAuth sessions.V1AuthorizeReply
if err := sCncrRPC.Call(utils.SessionSv1AuthorizeEvent, authArgs, &rplyAuth); err != nil {
return err
t.Error(err)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond)
@@ -254,8 +251,8 @@ func runSession(acntID string) (err error) {
// Init the session
initUsage := 1 * time.Minute
initArgs := &sessions.V1InitSessionArgs{
InitSession: true,
GetAttributes: true,
InitSession: true,
GetAttributes: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TestSCncrInit%s", originID),
@@ -272,9 +269,9 @@ func runSession(acntID string) (err error) {
var rplyInit sessions.V1InitSessionReply
if err := sCncrRPC.Call(utils.SessionSv1InitiateSession,
initArgs, &rplyInit); err != nil {
return err
t.Error(err)
} else if rplyInit.MaxUsage == 0 {
return fmt.Errorf("unexpected MaxUsage at init: %v", rplyInit.MaxUsage)
t.Errorf("unexpected MaxUsage at init: %v", rplyInit.MaxUsage)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond)
@@ -297,11 +294,11 @@ func runSession(acntID string) (err error) {
},
}
var rplyUpdt sessions.V1UpdateSessionReply
if err = sCncrRPC.Call(utils.SessionSv1UpdateSession,
if err := sCncrRPC.Call(utils.SessionSv1UpdateSession,
updtArgs, &rplyUpdt); err != nil {
return
t.Error(err)
} else if rplyUpdt.MaxUsage == 0 {
return fmt.Errorf("unexpected MaxUsage at update: %v", rplyUpdt.MaxUsage)
t.Errorf("unexpected MaxUsage at update: %v", rplyUpdt.MaxUsage)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond)
@@ -319,11 +316,11 @@ func runSession(acntID string) (err error) {
},
}
var rplyTrmnt string
if err = sCncrRPC.Call(utils.SessionSv1TerminateSession,
if err := sCncrRPC.Call(utils.SessionSv1TerminateSession,
trmntArgs, &rplyTrmnt); err != nil {
return
t.Error(err)
} else if rplyTrmnt != utils.OK {
return fmt.Errorf("received: <%s> to SessionSv1.Terminate", rplyTrmnt)
t.Errorf("received: <%s> to SessionSv1.Terminate", rplyTrmnt)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond)
@@ -339,21 +336,20 @@ func runSession(acntID string) (err error) {
},
}
var rplyCDR string
if err = sCncrRPC.Call(utils.SessionSv1ProcessCDR,
if err := sCncrRPC.Call(utils.SessionSv1ProcessCDR,
argsCDR, &rplyCDR); err != nil {
return
t.Error(err)
} else if rplyCDR != utils.OK {
return fmt.Errorf("received: <%s> to ProcessCDR", rplyCDR)
t.Errorf("received: <%s> to ProcessCDR", rplyCDR)
}
time.Sleep(time.Duration(20) * time.Millisecond)
var cdrs []*engine.ExternalCDR
argCDRs := utils.RPCCDRsFilter{OriginIDs: []string{originID}}
if err = sCncrRPC.Call(utils.ApierV2GetCDRs, argCDRs, &cdrs); err != nil {
return
if err := sCncrRPC.Call(utils.ApierV2GetCDRs, argCDRs, &cdrs); err != nil {
t.Error(err)
} else if len(cdrs) != 1 {
return fmt.Errorf("unexpected number of CDRs returned: %d", len(cdrs))
} else if cdrs[0].Usage != "1m30s" {
return fmt.Errorf("unexpected usage of CDR: %+v", cdrs[0])
t.Errorf("unexpected number of CDRs returned: %d", len(cdrs))
} else if cdrs[0].Usage != "2m30s" {
t.Errorf("unexpected usage of CDR: %+v", cdrs[0])
}
return
}

View File

@@ -2974,7 +2974,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
rplyAttr, err := sS.processAttributes(args.CGREvent, args.ArgDispatcher,
argsFlagsWithParams.ParamsSlice(utils.MetaAttributes))
if err == nil {
args.CGREvent = rplyAttr.CGREvent
args.CGREvent = rplyAttr.CGREvent.Clone()
if tntIface, has := args.CGREvent.Event[utils.MetaTenant]; has {
// special case when we want to overwrite the tenant
args.CGREvent.Tenant = tntIface.(string)