Continue sessions cgr-tester implementation

This commit is contained in:
arberkatellari
2023-07-12 11:13:18 -04:00
committed by Dan Christian Bogos
parent 20a91fd889
commit c2c65c0b87
3 changed files with 89 additions and 74 deletions

View File

@@ -28,6 +28,7 @@ import (
"os"
"runtime"
"runtime/pprof"
"strings"
"sync"
"time"
@@ -46,7 +47,8 @@ var (
cfgPath = cgrTesterFlags.String("config_path", "",
"Configuration directory path.")
exec = cgrTesterFlags.String(utils.ExecCgr, utils.EmptyString, "Pick what you want to test "+
"<*cps|*get_cost>")
parallel = cgrTesterFlags.Int("parallel", 0, "run n requests in parallel")
datadbType = cgrTesterFlags.String("datadb_type", cgrConfig.DataDbCfg().Type, "The type of the DataDb database <redis>")
datadbHost = cgrTesterFlags.String("datadb_host", cgrConfig.DataDbCfg().Host, "The DataDb host to connect to.")
@@ -79,6 +81,7 @@ var (
minUsage = cgrTesterFlags.Duration("min_usage", 1*time.Second, "Minimum usage a session can have")
maxUsage = cgrTesterFlags.Duration("max_usage", 5*time.Second, "Maximum usage a session can have")
updateInterval = cgrTesterFlags.Duration("update_interval", 1*time.Second, "Time duration added for each session update")
requestType = cgrTesterFlags.String("request_type", utils.MetaRated, "Request type of the call")
tor = cgrTesterFlags.String("tor", utils.MetaVoice, "The type of record to use in queries.")
category = cgrTesterFlags.String("category", "call", "The Record category to test.")
@@ -91,6 +94,7 @@ var (
fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path")
reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file")
mu sync.RWMutex
err error
)
@@ -270,49 +274,68 @@ func main() {
return
}
var timeparsed time.Duration
var err error
tstart := time.Now()
timeparsed, err = utils.ParseDurationWithNanosecs(*usage)
if err != nil {
return
}
tend := tstart.Add(timeparsed)
cd := &engine.CallDescriptorWithAPIOpts{
CallDescriptor: &engine.CallDescriptor{
TimeStart: tstart,
TimeEnd: tend,
DurationIndex: 60 * time.Second,
ToR: *tor,
Category: *category,
Tenant: *tenant,
Subject: *subject,
Destination: *destination,
},
}
var duration time.Duration
if len(*raterAddress) == 0 {
duration, err = durInternalRater(cd)
} else {
duration, err = durRemoteRater(cd)
}
if err != nil {
log.Fatal(err.Error())
} else {
log.Printf("Elapsed: %s resulted: %f req/s.", duration, float64(*runs)/duration.Seconds())
}
if exec != nil && *exec != utils.EmptyString {
tasks := strings.Split(*exec, utils.FieldsSep)
for _, taskID := range tasks {
switch taskID {
default: // unsupported taskID
err = fmt.Errorf("task <%s> is not a supported tester task", taskID)
case utils.MetaCPS:
var wg sync.WaitGroup
rplyNr := 0
for i := 0; i < *cps; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if err := callSession(); err != nil {
log.Fatal(err.Error())
}
mu.Lock()
rplyNr++
mu.Unlock()
var wg sync.WaitGroup
for i := 0; i < *cps; i++ {
wg.Add(1)
log.Println("run num: ", i)
go func() {
defer wg.Done()
if err = callSession(); err != nil {
log.Fatal(err.Error())
}()
}
wg.Wait()
log.Printf("Number of successful calls: %v", rplyNr)
case utils.MetaGetCost:
var timeparsed time.Duration
var err error
tstart := time.Now()
timeparsed, err = utils.ParseDurationWithNanosecs(*usage)
if err != nil {
return
}
tend := tstart.Add(timeparsed)
cd := &engine.CallDescriptorWithAPIOpts{
CallDescriptor: &engine.CallDescriptor{
TimeStart: tstart,
TimeEnd: tend,
DurationIndex: 60 * time.Second,
ToR: *tor,
Category: *category,
Tenant: *tenant,
Subject: *subject,
Destination: *destination,
},
}
var duration time.Duration
if len(*raterAddress) == 0 {
duration, err = durInternalRater(cd)
} else {
duration, err = durRemoteRater(cd)
}
if err != nil {
log.Fatal(err.Error())
} else {
log.Printf("Elapsed: %s resulted: %f req/s.", duration, float64(*runs)/duration.Seconds())
}
}
}()
if err != nil {
log.Fatal(err)
break
}
}
}
wg.Wait()
}

View File

@@ -20,7 +20,6 @@ package main
import (
"fmt"
"log"
"time"
"github.com/cenkalti/rpc2"
@@ -31,7 +30,6 @@ import (
var (
brpc *rpc2.Client
disconnectEvChan = make(chan *utils.AttrDisconnectSession, 1)
cpsCounter int
)
func handleDisconnectSession(clnt *rpc2.Client,
@@ -43,11 +41,6 @@ func handleDisconnectSession(clnt *rpc2.Client,
func callSession() (err error) {
var currentUsage time.Duration
st := time.Now().Add(2 * time.Second)
at := time.Now().Add(10 * time.Second)
event := &utils.CGREvent{
Tenant: *tenant,
ID: "TheEventID100000",
@@ -55,26 +48,20 @@ func callSession() (err error) {
Event: map[string]any{
utils.AccountField: *subject,
utils.Destination: *destination,
utils.OriginHost: "local",
utils.RequestType: utils.MetaRated,
utils.SetupTime: st,
utils.Source: "cgr_tester",
utils.OriginHost: utils.Local,
utils.RequestType: *requestType,
utils.Source: utils.CGRTester,
utils.OriginID: utils.GenUUID(),
},
APIOpts: map[string]any{},
}
cpsCounter += 1
log.Printf("current call number: %+v", cpsCounter)
if *updateInterval > *maxUsage {
return fmt.Errorf(`"update_interval" should be smaller than "max_usage"`)
} else if *maxUsage <= *minUsage {
return fmt.Errorf(`"min_usage" should be smaller than "max_usage"`)
}
tstCfg.SessionSCfg().DebitInterval = 0
clntHandlers := map[string]any{utils.SessionSv1DisconnectSession: handleDisconnectSession}
brpc, err = utils.NewBiJSONrpcClient(tstCfg.SessionSCfg().ListenBijson, clntHandlers)
if err != nil {
@@ -84,6 +71,7 @@ func callSession() (err error) {
//
// SessionSv1AuthorizeEvent
//
event.Event[utils.SetupTime] = time.Now()
authArgs := &sessions.V1AuthorizeArgs{
GetMaxUsage: true,
CGREvent: event,
@@ -92,13 +80,14 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authRply); err != nil {
return
}
// log.Printf("auth: %+v", utils.ToJSON(authRply))
// Delay between authorize and initiation for a more realistic case
time.Sleep(time.Duration(utils.RandomInteger(1, 5)) * time.Second)
//
// SessionSv1InitiateSession
//
event.Event[utils.AnswerTime] = at
event.Event[utils.RequestType] = utils.MetaRated
event.Event[utils.AnswerTime] = time.Now()
initArgs := &sessions.V1InitSessionArgs{
InitSession: true,
@@ -109,21 +98,14 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1InitiateSession, initArgs, &initRply); err != nil {
return
}
// log.Printf("init: %+v.", utils.ToJSON(initRply))
//
// SessionSv1UpdateSession
//
totalUsage := time.Duration(utils.RandomInteger(int64(*minUsage), int64(*maxUsage)))
log.Println("randUsage", totalUsage)
for currentUsage < totalUsage {
// log.Println("currentUsage", currentUsage)
currentUsage += *updateInterval
if currentUsage >= totalUsage {
break
}
event.Event[utils.Usage] = currentUsage.String()
for currentUsage := time.Duration(0); currentUsage < totalUsage; currentUsage += *updateInterval {
event.Event[utils.Usage] = currentUsage.String()
upArgs := &sessions.V1UpdateSessionArgs{
GetAttributes: true,
UpdateSession: true,
@@ -133,9 +115,11 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1UpdateSession, upArgs, &upRply); err != nil {
return
}
// log.Printf("update: %+v.", utils.ToJSON(upRply))
}
// Delay between last update and termination for a more realistic case
time.Sleep(time.Duration(utils.RandomInteger(10, 20)) * time.Millisecond)
//
// SessionSv1TerminateSession
//
@@ -149,7 +133,9 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1TerminateSession, tArgs, &tRply); err != nil {
return
}
// log.Printf("terminate: %+v.", utils.ToJSON(tRply))
// Delay between terminate and processCDR for a more realistic case
time.Sleep(time.Duration(utils.RandomInteger(1, 3)) * time.Millisecond)
//
// SessionSv1ProcessCDR
@@ -159,7 +145,6 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1ProcessCDR, procArgs, &pRply); err != nil {
return
}
// log.Printf("process: %+v.", utils.ToJSON(pRply))
return
}