Add digit and verbose flags for cgr-tester

This commit is contained in:
arberkatellari
2023-07-14 11:00:41 -04:00
committed by Dan Christian Bogos
parent c2c65c0b87
commit 13506235ae
12 changed files with 167 additions and 83 deletions

View File

@@ -28,7 +28,6 @@ import (
"os"
"runtime"
"runtime/pprof"
"strings"
"sync"
"time"
@@ -48,8 +47,8 @@ var (
cfgPath = cgrTesterFlags.String("config_path", "",
"Configuration directory path.")
exec = cgrTesterFlags.String(utils.ExecCgr, utils.EmptyString, "Pick what you want to test "+
"<*cps|*get_cost>")
parallel = cgrTesterFlags.Int("parallel", 0, "run n requests in parallel")
"<*sessions|*cost>")
cps = cgrTesterFlags.Int("cps", 100, "run n requests in parallel")
datadbType = cgrTesterFlags.String("datadb_type", cgrConfig.DataDbCfg().Type, "The type of the DataDb database <redis>")
datadbHost = cgrTesterFlags.String("datadb_host", cgrConfig.DataDbCfg().Host, "The DataDb host to connect to.")
datadbPort = cgrTesterFlags.String("datadb_port", cgrConfig.DataDbCfg().Port, "The DataDb port to bind to.")
@@ -77,11 +76,11 @@ var (
dbRedisWriteTimeout = cgrTesterFlags.Duration(utils.RedisWriteTimeoutCfg, cgrConfig.DataDbCfg().Opts.RedisWriteTimeout,
"The amount of wait time until timeout for writing operations")
raterAddress = cgrTesterFlags.String("rater_address", "", "Rater address for remote tests. Empty for internal rater.")
cps = cgrTesterFlags.Int("cps", 100, "Number of calls to be initiated/second.")
minUsage = cgrTesterFlags.Duration("min_usage", 1*time.Second, "Minimum usage a session can have")
maxUsage = cgrTesterFlags.Duration("max_usage", 5*time.Second, "Maximum usage a session can have")
updateInterval = cgrTesterFlags.Duration("update_interval", 1*time.Second, "Time duration added for each session update")
requestType = cgrTesterFlags.String("request_type", utils.MetaRated, "Request type of the call")
digits = cgrTesterFlags.Int("digits", 10, "Number of digits Account and Destination will have")
tor = cgrTesterFlags.String("tor", utils.MetaVoice, "The type of record to use in queries.")
category = cgrTesterFlags.String("category", "call", "The Record category to test.")
@@ -93,9 +92,9 @@ var (
usage = cgrTesterFlags.String("usage", "1m", "The duration to use in call simulation.")
fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path")
reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file")
mu sync.RWMutex
err error
verbose = cgrTesterFlags.Bool(utils.VerboseCgr, false, "Enable detailed verbose logging output")
replyCount sync.RWMutex
err error
)
func durInternalRater(cd *engine.CallDescriptorWithAPIOpts) (time.Duration, error) {
@@ -157,9 +156,9 @@ func durRemoteRater(cd *engine.CallDescriptorWithAPIOpts) (time.Duration, error)
}
defer client.Close()
start := time.Now()
if *parallel > 0 {
if *cps > 0 {
// var divCall *rpc.Call
var sem = make(chan int, *parallel)
var sem = make(chan int, *cps)
var finish = make(chan int)
for i := 0; i < *runs; i++ {
go func() {
@@ -264,7 +263,7 @@ func main() {
}
if *fPath != "" {
frt, err := NewFileReaderTester(*fPath, *raterAddress,
*parallel, *runs, []byte(*reqSep))
*cps, *runs, []byte(*reqSep))
if err != nil {
log.Fatal(err)
}
@@ -274,67 +273,64 @@ func main() {
return
}
if exec != nil && *exec != utils.EmptyString {
tasks := strings.Split(*exec, utils.FieldsSep)
for _, taskID := range tasks {
switch taskID {
default: // unsupported taskID
err = fmt.Errorf("task <%s> is not a supported tester task", taskID)
case utils.MetaCPS:
var wg sync.WaitGroup
rplyNr := 0
for i := 0; i < *cps; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if err := callSession(); err != nil {
log.Fatal(err.Error())
}
mu.Lock()
rplyNr++
mu.Unlock()
}()
}
wg.Wait()
log.Printf("Number of successful calls: %v", rplyNr)
case utils.MetaGetCost:
var timeparsed time.Duration
var err error
tstart := time.Now()
timeparsed, err = utils.ParseDurationWithNanosecs(*usage)
if err != nil {
return
}
tend := tstart.Add(timeparsed)
cd := &engine.CallDescriptorWithAPIOpts{
CallDescriptor: &engine.CallDescriptor{
TimeStart: tstart,
TimeEnd: tend,
DurationIndex: 60 * time.Second,
ToR: *tor,
Category: *category,
Tenant: *tenant,
Subject: *subject,
Destination: *destination,
},
}
var duration time.Duration
if len(*raterAddress) == 0 {
duration, err = durInternalRater(cd)
} else {
duration, err = durRemoteRater(cd)
}
if err != nil {
switch *exec {
default: // unsupported task
log.Fatalf("task <%s> is not a supported tester task", *exec)
return
case utils.MetaSessionS:
digitMin := int64(math.Pow10(*digits - 1))
digitMax := int64(math.Pow10(*digits)) - 1
if *verbose {
log.Printf("Digit range: <%v - %v>", digitMin, digitMax)
}
var wg sync.WaitGroup
rplyNr := 0
for i := 0; i < *cps; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if err := callSessions(digitMin, digitMax); err != nil {
log.Fatal(err.Error())
} else {
log.Printf("Elapsed: %s resulted: %f req/s.", duration, float64(*runs)/duration.Seconds())
}
}
if err != nil {
log.Fatal(err)
break
}
replyCount.Lock()
rplyNr++
replyCount.Unlock()
}()
}
wg.Wait()
log.Printf("Number of successful calls: %v", rplyNr)
case utils.MetaCost:
var timeparsed time.Duration
var err error
tstart := time.Now()
timeparsed, err = utils.ParseDurationWithNanosecs(*usage)
if err != nil {
return
}
tend := tstart.Add(timeparsed)
cd := &engine.CallDescriptorWithAPIOpts{
CallDescriptor: &engine.CallDescriptor{
TimeStart: tstart,
TimeEnd: tend,
DurationIndex: 60 * time.Second,
ToR: *tor,
Category: *category,
Tenant: *tenant,
Subject: *subject,
Destination: *destination,
},
}
var duration time.Duration
if len(*raterAddress) == 0 {
duration, err = durInternalRater(cd)
} else {
duration, err = durRemoteRater(cd)
}
if err != nil {
log.Fatal(err.Error())
} else {
log.Printf("Elapsed: %s resulted: %f req/s.", duration, float64(*runs)/duration.Seconds())
}
}

View File

@@ -20,6 +20,8 @@ package main
import (
"fmt"
"log"
"math"
"time"
"github.com/cenkalti/rpc2"
@@ -39,15 +41,24 @@ func handleDisconnectSession(clnt *rpc2.Client,
return nil
}
func callSession() (err error) {
func callSessions(digitMin, digitMax int64) (err error) {
if *digits <= 0 {
return fmt.Errorf(`"digits" should be bigger than 0`)
} else if int(math.Pow10(*digits))-1 < *cps {
return fmt.Errorf(`"digits" should amount to be more than "cps"`)
}
acc := utils.RandomInteger(digitMin, digitMax)
dest := utils.RandomInteger(digitMin, digitMax)
event := &utils.CGREvent{
Tenant: *tenant,
ID: "TheEventID100000",
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.AccountField: *subject,
utils.Destination: *destination,
utils.AccountField: acc,
utils.Destination: dest,
utils.OriginHost: utils.Local,
utils.RequestType: *requestType,
utils.Source: utils.CGRTester,
@@ -62,7 +73,8 @@ func callSession() (err error) {
return fmt.Errorf(`"min_usage" should be smaller than "max_usage"`)
}
clntHandlers := map[string]any{utils.SessionSv1DisconnectSession: handleDisconnectSession}
clntHandlers := map[string]any{
utils.SessionSv1DisconnectSession: handleDisconnectSession}
brpc, err = utils.NewBiJSONrpcClient(tstCfg.SessionSCfg().ListenBijson, clntHandlers)
if err != nil {
return
@@ -80,15 +92,17 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authRply); err != nil {
return
}
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1AuthorizeEvent reply: <%v>", acc, dest, utils.ToJSON(authRply))
}
// Delay between authorize and initiation for a more realistic case
time.Sleep(time.Duration(utils.RandomInteger(1, 5)) * time.Second)
time.Sleep(time.Duration(utils.RandomInteger(50, 100)) * time.Millisecond)
//
// SessionSv1InitiateSession
//
event.Event[utils.AnswerTime] = time.Now()
initArgs := &sessions.V1InitSessionArgs{
InitSession: true,
CGREvent: event,
@@ -98,16 +112,18 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1InitiateSession, initArgs, &initRply); err != nil {
return
}
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1InitiateSession reply: <%v>", acc, dest, utils.ToJSON(initRply))
}
//
// SessionSv1UpdateSession
//
totalUsage := time.Duration(utils.RandomInteger(int64(*minUsage), int64(*maxUsage)))
for currentUsage := time.Duration(0); currentUsage < totalUsage; currentUsage += *updateInterval {
for currentUsage := time.Duration(1 * time.Second); currentUsage < totalUsage; currentUsage += *updateInterval {
event.Event[utils.Usage] = currentUsage.String()
upArgs := &sessions.V1UpdateSessionArgs{
GetAttributes: true,
UpdateSession: true,
CGREvent: event,
}
@@ -115,6 +131,9 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1UpdateSession, upArgs, &upRply); err != nil {
return
}
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1UpdateSession reply: <%v>", acc, dest, utils.ToJSON(upRply))
}
}
// Delay between last update and termination for a more realistic case
@@ -133,9 +152,12 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1TerminateSession, tArgs, &tRply); err != nil {
return
}
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1TerminateSession reply: <%v>", acc, dest, utils.ToJSON(tRply))
}
// Delay between terminate and processCDR for a more realistic case
time.Sleep(time.Duration(utils.RandomInteger(1, 3)) * time.Millisecond)
time.Sleep(time.Duration(utils.RandomInteger(20, 40)) * time.Millisecond)
//
// SessionSv1ProcessCDR
@@ -145,6 +167,8 @@ func callSession() (err error) {
if err = brpc.Call(utils.SessionSv1ProcessCDR, procArgs, &pRply); err != nil {
return
}
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1ProcessCDR reply: <%v>", acc, dest, utils.ToJSON(pRply))
}
return
}