Adding full set of events for concurrent sessions tests

This commit is contained in:
DanB
2020-01-05 21:05:29 +01:00
parent 084af48329
commit df1ff276b7
13 changed files with 287 additions and 51 deletions

View File

@@ -0,0 +1,94 @@
{
// CGRateS Configuration file
//
"general": {
"log_level": 7,
"reply_timeout": "5m",
},
"data_db": {
"db_type": "*internal",
},
"stor_db": {
"db_type": "*internal",
},
"rals": {
"enabled": true,
"thresholds_conns": ["*localhost"],
},
"schedulers": {
"enabled": true,
"cdrs_conns": ["*localhost"],
},
"cdrs": {
"enabled": true,
"chargers_conns":["*localhost"],
},
"attributes": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": ["*localhost"],
},
"resources": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*localhost"]
},
"stats": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*localhost"],
},
"thresholds": {
"enabled": true,
"store_interval": "-1",
},
"suppliers": {
"enabled": true,
"prefix_indexed_fields":["Destination"],
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
},
"sessions": {
"enabled": true,
"suppliers_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"attributes_conns": ["*localhost"],
"rals_conns": ["*localhost"],
"cdrs_conns": ["*localhost"],
"chargers_conns": ["*localhost"],
},
"apier": {
"scheduler_conns": ["*localhost"],
},
}

View File

@@ -0,0 +1,2 @@
#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight
cgrates.org,DEFAULT,,,*default,*none,0
1 #Tenant ID FilterIDs ActivationInterval RunID AttributeIDs Weight
2 cgrates.org DEFAULT *default *none 0

View File

@@ -0,0 +1,2 @@
#ID,DestinationsID,RatesID,RoundingMethod,RoundingDecimals,MaxCost,MaxCostStrategy
DR_1CNT,*any,RT_1CNT,*up,4,,
1 #ID DestinationsID RatesID RoundingMethod RoundingDecimals MaxCost MaxCostStrategy
2 DR_1CNT *any RT_1CNT *up 4

View File

@@ -0,0 +1,2 @@
#ID,ConnectFee,Rate,RateUnit,RateIncrement,GroupIntervalStart
RT_1CNT,0,0.01,1s,1s,0
1 #ID ConnectFee Rate RateUnit RateIncrement GroupIntervalStart
2 RT_1CNT 0 0.01 1s 1s 0

View File

@@ -0,0 +1,2 @@
#ID,DestinationRatesID,TimingID,Weight
RP_1CNT,DR_1CNT,*any,0
1 #ID DestinationRatesID TimingID Weight
2 RP_1CNT DR_1CNT *any 0

View File

@@ -0,0 +1,2 @@
#Tenant,Category,Subject,ActivationTime,RatingPlanID,FallbackSubject
cgrates.org,call,*any,2020-01-01T00:00:00Z,RP_1CNT,
1 #Tenant Category Subject ActivationTime RatingPlanID FallbackSubject
2 cgrates.org call *any 2020-01-01T00:00:00Z RP_1CNT

View File

@@ -152,6 +152,21 @@ func (me MapEvent) GetTimeIgnoreErrors(fldName string, tmz string) (t time.Time)
return
}
// GetTimePtr returns a pointer towards time or error
func (me MapEvent) GetTimePtr(fldName, tmz string) (t *time.Time, err error) {
var tm time.Time
if tm, err = me.GetTime(fldName, tmz); err != nil {
return
}
return utils.TimePointer(tm), nil
}
// GetTimePtrIgnoreErrors returns a pointer towards time or nil if errors
func (me MapEvent) GetTimePtrIgnoreErrors(fldName, tmz string) (t *time.Time) {
t, _ = me.GetTimePtr(fldName, tmz)
return
}
// Clone returns the cloned map
func (me MapEvent) Clone() (mp MapEvent) {
if me == nil {

View File

@@ -21,11 +21,8 @@ package general_tests
import (
"encoding/json"
"errors"
"flag"
"fmt"
"net/rpc"
"net/rpc/jsonrpc"
"path"
"sync"
"testing"
@@ -44,20 +41,8 @@ var (
a1CfgPath string
a1Cfg *config.CGRConfig
a1rpc *rpc.Client
encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication")
)
func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
switch *encoding {
case utils.MetaJSON:
return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen)
case utils.MetaGOB:
return rpc.Dial(utils.TCP, cfg.RPCGOBListen)
default:
return nil, errors.New("UNSUPPORTED_RPC")
}
}
var sTestsA1it = []func(t *testing.T){
testA1itLoadConfig,
testA1itResetDataDB,

46
general_tests/libtest.go Normal file
View File

@@ -0,0 +1,46 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
import (
"errors"
"flag"
"net/rpc"
"net/rpc/jsonrpc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
var (
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication")
err error
)
func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
switch *encoding {
case utils.MetaJSON:
return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen)
case utils.MetaGOB:
return rpc.Dial(utils.TCP, cfg.RPCGOBListen)
default:
return nil, errors.New("UNSUPPORTED_RPC")
}
}

View File

@@ -40,8 +40,6 @@ var (
rater *rpc.Client
testCalls = flag.Bool("calls", false, "Run test calls simulation, not by default.")
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
sTestMCDRC = []func(t *testing.T){
testMCDRCLoadConfig,

View File

@@ -43,7 +43,6 @@ var ( // shared vars
rpcRAL1, rpcRAL2 *rpcclient.RPCClient
rpcPoolFirst, rpcPoolBroadcast *rpcclient.RPCPool
ral1, ral2 *exec.Cmd
err error
node1 = "node1"
node2 = "node2"
)

View File

@@ -1,4 +1,4 @@
// +build integration
// +build performance
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
@@ -28,6 +28,7 @@ import (
"testing"
"time"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
@@ -39,7 +40,7 @@ var (
sCncrCfg *config.CGRConfig
sCncrRPC *rpc.Client
sCncrSessions = flag.Int("concurrent_sessions", 500000, "maximum concurrent sessions created")
sCncrSessions = flag.Int("sessions", 500000, "maximum concurrent sessions created")
sCncrCps = flag.Int("cps", 50000, "maximum requests per second sent out")
cpsPool = make(chan struct{}, *sCncrCps)
@@ -53,6 +54,14 @@ func TestSCncrInternal(t *testing.T) {
}
}
// Tests starting here
func TestSCncrJSON(t *testing.T) {
sCncrCfgDIR = "sessintjson"
for _, tst := range sTestsSCncrIT {
t.Run(sCncrCfgDIR, tst)
}
}
// subtests to be executed
var sTestsSCncrIT = []func(t *testing.T){
testSCncrInitConfig,
@@ -110,7 +119,7 @@ func testSCncrLoadTP(t *testing.T) {
var loadInst string
if err := sCncrRPC.Call(utils.ApierV1LoadTariffPlanFromFolder,
&utils.AttrLoadTpFromFolder{FolderPath: path.Join(
*dataDir, "tariffplans", "testit")}, &loadInst); err != nil {
*dataDir, "tariffplans", "tp1cnt")}, &loadInst); err != nil {
t.Error(err)
}
}
@@ -139,26 +148,30 @@ func testSCncrRunSessions(t *testing.T) {
func runSession(acntID string) (err error) {
originID := utils.GenUUID() // each test with it's own OriginID
// topup as much as we know we need
topupDur := time.Duration(13000) * time.Hour
attrSetBalance := utils.AttrSetBalance{
// topup as much as we know we need for one session
topupDur := time.Duration(90) * time.Hour
var addBlcRply string
argsAddBalance := &v1.AttrAddBalance{
Tenant: "cgrates.org",
Account: acntID,
BalanceType: utils.VOICE,
Balance: map[string]interface{}{
utils.ID: "testSCncr",
utils.Value: topupDur.Nanoseconds(),
utils.Weight: 20,
},
}
var reply string
if err = sCncrRPC.Call(utils.ApierV1SetBalance,
attrSetBalance, &reply); err != nil {
Value: float64(topupDur.Nanoseconds())}
if err = sCncrRPC.Call(utils.ApierV1AddBalance, argsAddBalance, &addBlcRply); err != nil {
return
} else if reply != utils.OK {
return fmt.Errorf("received: <%s> to ApierV1.SetBalance", reply)
} else if addBlcRply != utils.OK {
return fmt.Errorf("received: <%s> to ApierV1.AddBalance", addBlcRply)
}
/*
var acnt *engine.Account
acntAttrs := &utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: acntID}
if err = sCncrRPC.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil {
return
} else if vcBlnc := acnt.BalanceMap[utils.VOICE].GetTotalValue(); vcBlnc != float64(topupDur.Nanoseconds()) {
return fmt.Errorf("unexpected voice balance received: %+v", utils.ToIJSON(acnt))
}
*/
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond) // randomize between tests
@@ -184,21 +197,17 @@ func runSession(acntID string) (err error) {
if err := sCncrRPC.Call(utils.SessionSv1AuthorizeEvent, authArgs, &rplyAuth); err != nil {
return err
}
if rplyAuth.MaxUsage != authDur {
return fmt.Errorf("unexpected MaxUsage: %v to auth", rplyAuth.MaxUsage)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond) // randomize between tests
utils.RandomInteger(0, 100)) * time.Millisecond)
// Init the session
initUsage := 90 * time.Second
initUsage := 1 * time.Minute
initArgs := &sessions.V1InitSessionArgs{
InitSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TestSCncrInit%s", originID),
Event: map[string]interface{}{
utils.Tenant: "cgrates.org",
utils.OriginID: originID,
utils.RequestType: utils.META_PREPAID,
utils.Account: acntID,
@@ -212,8 +221,89 @@ func runSession(acntID string) (err error) {
if err := sCncrRPC.Call(utils.SessionSv1InitiateSession,
initArgs, &rplyInit); err != nil {
return err
} else if rplyInit.MaxUsage != initUsage {
} else if rplyInit.MaxUsage == 0 {
return fmt.Errorf("unexpected MaxUsage at init: %v", rplyInit.MaxUsage)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond)
// Update the session
updtUsage := 1 * time.Minute
updtArgs := &sessions.V1UpdateSessionArgs{
UpdateSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TestSCncrUpdate%s", originID),
Event: map[string]interface{}{
utils.OriginID: originID,
utils.Usage: updtUsage,
},
},
}
var rplyUpdt sessions.V1UpdateSessionReply
if err = sCncrRPC.Call(utils.SessionSv1UpdateSession,
updtArgs, &rplyUpdt); err != nil {
return
} else if rplyUpdt.MaxUsage == 0 {
return fmt.Errorf("unexpected MaxUsage at update: %v", rplyUpdt.MaxUsage)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond)
// Terminate the session
trmntArgs := &sessions.V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TestSCncrTerminate%s", originID),
Event: map[string]interface{}{
utils.OriginID: originID,
utils.Usage: time.Duration(90 * time.Second),
},
},
}
var rplyTrmnt string
if err = sCncrRPC.Call(utils.SessionSv1TerminateSession,
trmntArgs, &rplyTrmnt); err != nil {
return
} else if rplyTrmnt != utils.OK {
return fmt.Errorf("received: <%s> to SessionSv1.Terminate", rplyTrmnt)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond)
// processCDR
argsCDR := &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: fmt.Sprintf("TestSCncrCDR%s", originID),
Event: map[string]interface{}{
utils.OriginID: originID,
},
},
}
var rplyCDR string
if err = sCncrRPC.Call(utils.SessionSv1ProcessCDR,
argsCDR, &rplyCDR); err != nil {
return
} else if rplyCDR != utils.OK {
return fmt.Errorf("received: <%s> to ProcessCDR", rplyCDR)
}
time.Sleep(time.Duration(
utils.RandomInteger(0, 100)) * time.Millisecond)
/*
// make sure the account was properly refunded
var acnt *engine.Account
acntAttrs := &utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: acntID}
if err = sCncrRPC.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil {
return
} else if vcBlnc := acnt.BalanceMap[utils.VOICE].GetTotalValue(); vcBlnc != 0 {
return fmt.Errorf("unexpected voice balance received: %+v", utils.ToIJSON(acnt))
} else if mnBlnc := acnt.BalanceMap[utils.MONETARY].GetTotalValue(); mnBlnc != 0 {
return fmt.Errorf("unexpected voice balance received: %+v", utils.ToIJSON(acnt))
}
*/
return
}

View File

@@ -1467,8 +1467,9 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration,
sr.CD.DurationIndex += notCharged
cc := new(engine.CallCost)
if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().RALsConns, nil, utils.ResponderDebit,
&engine.CallDescriptorWithArgDispatcher{CallDescriptor: sr.CD,
ArgDispatcher: s.ArgDispatcher}, cc); err == nil {
&engine.CallDescriptorWithArgDispatcher{
CallDescriptor: sr.CD,
ArgDispatcher: s.ArgDispatcher}, cc); err == nil {
sr.EventCost.Merge(
engine.NewEventCostFromCallCost(cc, s.CGRID,
sr.Event.GetStringIgnoreErrors(utils.RunID)))
@@ -2518,8 +2519,8 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector,
if err = sS.terminateSession(s,
ev.GetDurationPtrIgnoreErrors(utils.Usage),
ev.GetDurationPtrIgnoreErrors(utils.LastUsed),
utils.TimePointer(ev.GetTimeIgnoreErrors(utils.AnswerTime,
utils.EmptyString)), false); err != nil {
ev.GetTimePtrIgnoreErrors(utils.AnswerTime, utils.EmptyString),
false); err != nil {
return utils.NewErrRALs(err)
}
}
@@ -2636,7 +2637,6 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector,
if cgrEvs, err = s.asCGREvents(); err != nil {
return utils.NewErrServerError(err)
}
var withErrors bool
for _, cgrEv := range cgrEvs {
argsProc := &engine.ArgV1ProcessEvent{
@@ -3160,9 +3160,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
if err = sS.terminateSession(s,
ev.GetDurationPtrIgnoreErrors(utils.Usage),
ev.GetDurationPtrIgnoreErrors(utils.LastUsed),
utils.TimePointer(
ev.GetTimeIgnoreErrors(utils.AnswerTime,
utils.EmptyString)), false); err != nil {
ev.GetTimePtrIgnoreErrors(utils.AnswerTime, utils.EmptyString),
false); err != nil {
return utils.NewErrRALs(err)
}
}