Continuation for cgr-tester

This commit is contained in:
arberkatellari
2023-08-09 10:50:42 -04:00
committed by Dan Christian Bogos
parent bbdef8921e
commit ebc5fd4a21
2 changed files with 144 additions and 63 deletions

View File

@@ -80,24 +80,22 @@ var (
minUsage = cgrTesterFlags.Duration("min_usage", 1*time.Second, "Minimum usage a session can have")
maxUsage = cgrTesterFlags.Duration("max_usage", 5*time.Second, "Maximum usage a session can have")
updateInterval = cgrTesterFlags.Duration("update_interval", 1*time.Second, "Time duration added for each session update")
timeoutDur = cgrTesterFlags.Duration("timeout", 10*time.Second, "After last call, time out after this much duration")
requestType = cgrTesterFlags.String("request_type", utils.MetaRated, "Request type of the call")
digits = cgrTesterFlags.Int("digits", 10, "Number of digits Account and Destination will have")
tor = cgrTesterFlags.String("tor", utils.MetaVoice, "The type of record to use in queries.")
category = cgrTesterFlags.String("category", "call", "The Record category to test.")
tenant = cgrTesterFlags.String("tenant", "cgrates.org", "The type of record to use in queries.")
subject = cgrTesterFlags.String("subject", "1001", "The rating subject to use in queries.")
destination = cgrTesterFlags.String("destination", "1002", "The destination to use in queries.")
json = cgrTesterFlags.Bool("json", false, "Use JSON RPC")
version = cgrTesterFlags.Bool("version", false, "Prints the application version.")
usage = cgrTesterFlags.String("usage", "1m", "The duration to use in call simulation.")
fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path")
reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file")
verbose = cgrTesterFlags.Bool(utils.VerboseCgr, false, "Enable detailed verbose logging output")
err error
iterationsPerSecond = 0
iterationsMux, terminateMx sync.Mutex
countTerminate = 0
tor = cgrTesterFlags.String("tor", utils.MetaVoice, "The type of record to use in queries.")
category = cgrTesterFlags.String("category", "call", "The Record category to test.")
tenant = cgrTesterFlags.String("tenant", "cgrates.org", "The type of record to use in queries.")
subject = cgrTesterFlags.String("subject", "1001", "The rating subject to use in queries.")
destination = cgrTesterFlags.String("destination", "1002", "The destination to use in queries.")
json = cgrTesterFlags.Bool("json", false, "Use JSON RPC")
version = cgrTesterFlags.Bool("version", false, "Prints the application version.")
usage = cgrTesterFlags.String("usage", "1m", "The duration to use in call simulation.")
fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path")
reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file")
verbose = cgrTesterFlags.Bool(utils.VerboseCgr, false, "Enable detailed verbose logging output")
err error
)
func durInternalRater(cd *engine.CallDescriptorWithAPIOpts) (time.Duration, error) {
@@ -185,6 +183,51 @@ func durRemoteRater(cd *engine.CallDescriptorWithAPIOpts) (time.Duration, error)
return time.Since(start), nil
}
func printAllDurationsSummary(authDurations, initDurations, updateDurations, terminateDurations, cdrDurations []time.Duration, reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr uint64) {
fmt.Printf("| %-15s | %-15s | %-15s | %-15s | %-15s | %-15s |\n", "Session", "Min", "Average", "Max", "Requests sent", "Replies received")
fmt.Println("|-----------------|-----------------|-----------------|-----------------|-----------------|------------------|")
processes := []string{"Authorize", "Initiate", "Update", "Terminate", "ProcessCDR"}
allDurations := [][]time.Duration{authDurations, initDurations, updateDurations, terminateDurations, cdrDurations}
reqCounts := []uint64{reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr}
for i, process := range processes {
minDur, maxDur := findMinMaxDurations(allDurations[i])
avgDur := calculateAverageDuration(allDurations[i])
reqCount := reqCounts[i]
completedRuns := len(allDurations[i])
fmt.Printf("| %-15s | %-15s | %-15s | %-15s | %-15d | %-16d |\n", process, minDur, avgDur, maxDur, reqCount, completedRuns)
}
}
func findMinMaxDurations(durations []time.Duration) (time.Duration, time.Duration) {
if len(durations) == 0 {
return 0, 0
}
minDur := durations[0]
maxDur := durations[0]
for _, dur := range durations {
if dur < minDur {
minDur = dur
}
if dur > maxDur {
maxDur = dur
}
}
return minDur, maxDur
}
func calculateAverageDuration(durations []time.Duration) time.Duration {
if len(durations) == 0 {
return 0
}
total := int64(0)
for _, dur := range durations {
total += int64(dur)
}
avg := time.Duration(total / int64(len(durations)))
return avg
}
func main() {
if err := cgrTesterFlags.Parse(os.Args[1:]); err != nil {
return
@@ -291,29 +334,68 @@ func main() {
log.Printf("Digit range: <%v - %v>", digitMin, digitMax)
}
currentCalls := 0
rplyNr := 0
for i := 0; i < *calls+int(maxUsage.Seconds()); i++ {
if currentCalls+*cps > *calls {
*cps = *calls - currentCalls
}
if *updateInterval > *maxUsage {
log.Fatal(`"update_interval" should be smaller than "max_usage"`)
} else if *maxUsage < *minUsage {
log.Fatal(`"min_usage" should be equal or smaller than "max_usage"`)
}
var wg sync.WaitGroup
authDur := make([]time.Duration, 0, *calls)
initDur := make([]time.Duration, 0, *calls)
updateDur := make([]time.Duration, 0, *calls)
terminateDur := make([]time.Duration, 0, *calls)
cdrDur := make([]time.Duration, 0, *calls)
var reqAuth uint64
var reqInit uint64
var reqUpdate uint64
var reqTerminate uint64
var reqCdr uint64
var tmpTime time.Time
timeout := time.After(*timeoutDur)
for i := 0; i < int(math.Ceil(float64(*calls)/float64(*cps))); i++ {
for j := 0; j < *cps; j++ {
currentCalls++
if *calls < currentCalls {
break
}
totalUsage := *maxUsage
if *minUsage != *maxUsage {
totalUsage = time.Duration(utils.RandomInteger(int64(*minUsage), int64(*maxUsage)))
}
wg.Add(1)
go func() {
if err := callSessions(digitMin, digitMax); err != nil {
defer wg.Done()
timeoutStamp := time.Now().Add(totalUsage + *timeoutDur)
if timeoutStamp.Compare(tmpTime) == +1 {
tmpTime = timeoutStamp
timeout = time.After(totalUsage + *timeoutDur + 140*time.Millisecond)
}
if err := callSessions(&authDur, &initDur, &updateDur, &terminateDur, &cdrDur,
&reqAuth, &reqInit, &reqUpdate, &reqTerminate, &reqCdr,
digitMin, digitMax, totalUsage); err != nil {
log.Fatal(err.Error())
}
rplyNr++
}()
}
time.Sleep(1 * time.Second)
currentCalls += *cps
log.Printf("Iteration index: <%v>, cps: <%v>, calls finished <%v>", i, iterationsPerSecond, countTerminate)
iterationsPerSecond = 0
if countTerminate == *calls {
break
}
}
log.Printf("Number of successful calls: %v", rplyNr)
completed := make(chan struct{})
go func() {
defer close(completed)
wg.Wait()
}()
select {
case to := <-timeout:
log.Printf("Timed out: %v", to.Format("2006-01-02 15:04:05"))
printAllDurationsSummary(authDur, initDur, updateDur, terminateDur, cdrDur,
reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr)
case <-completed:
printAllDurationsSummary(authDur, initDur, updateDur, terminateDur, cdrDur,
reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr)
}
case utils.MetaCost:
var timeparsed time.Duration
var err error

View File

@@ -22,6 +22,8 @@ import (
"fmt"
"log"
"math"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/rpc2"
@@ -41,14 +43,16 @@ func handleDisconnectSession(clnt *rpc2.Client,
return nil
}
func callSessions(digitMin, digitMax int64) (err error) {
func callSessions(authDur, initDur, updateDur, terminateDur, cdrDur *[]time.Duration,
reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr *uint64,
digitMin, digitMax int64, totalUsage time.Duration) (err error) {
if *digits <= 0 {
return fmt.Errorf(`"digits" should be bigger than 0`)
} else if int(math.Pow10(*digits))-1 < *cps {
return fmt.Errorf(`"digits" should amount to be more than "cps"`)
}
var appendMu sync.Mutex
acc := utils.RandomInteger(digitMin, digitMax)
dest := utils.RandomInteger(digitMin, digitMax)
@@ -67,12 +71,6 @@ func callSessions(digitMin, digitMax int64) (err error) {
APIOpts: map[string]any{},
}
if *updateInterval > *maxUsage {
return fmt.Errorf(`"update_interval" should be smaller than "max_usage"`)
} else if *maxUsage < *minUsage {
return fmt.Errorf(`"min_usage" should be equal or smaller than "max_usage"`)
}
clntHandlers := map[string]any{
utils.SessionSv1DisconnectSession: handleDisconnectSession}
brpc, err = utils.NewBiJSONrpcClient(tstCfg.SessionSCfg().ListenBijson, clntHandlers)
@@ -83,18 +81,21 @@ func callSessions(digitMin, digitMax int64) (err error) {
//
// SessionSv1AuthorizeEvent
//
event.Event[utils.SetupTime] = time.Now()
authArgs := &sessions.V1AuthorizeArgs{
GetMaxUsage: true,
CGREvent: event,
}
var authRply sessions.V1AuthorizeReply
atomic.AddUint64(reqAuth, 1)
authStartTime := time.Now()
if err = brpc.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authRply); err != nil {
return
}
iterationsMux.Lock()
iterationsPerSecond++
iterationsMux.Unlock()
appendMu.Lock()
*authDur = append(*authDur, time.Since(authStartTime))
appendMu.Unlock()
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1AuthorizeEvent reply: <%v>", acc, dest, utils.ToJSON(authRply))
}
@@ -112,9 +113,14 @@ func callSessions(digitMin, digitMax int64) (err error) {
}
var initRply sessions.V1InitSessionReply
atomic.AddUint64(reqInit, 1)
initStartTime := time.Now()
if err = brpc.Call(utils.SessionSv1InitiateSession, initArgs, &initRply); err != nil {
return
}
appendMu.Lock()
*initDur = append(*initDur, time.Since(initStartTime))
appendMu.Unlock()
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1InitiateSession reply: <%v>", acc, dest, utils.ToJSON(initRply))
}
@@ -122,12 +128,6 @@ func callSessions(digitMin, digitMax int64) (err error) {
//
// SessionSv1UpdateSession
//
var totalUsage time.Duration
if *minUsage == *maxUsage {
totalUsage = *maxUsage
} else {
totalUsage = time.Duration(utils.RandomInteger(int64(*minUsage), int64(*maxUsage)))
}
var currentUsage time.Duration
for currentUsage = time.Duration(1 * time.Second); currentUsage < totalUsage; currentUsage += *updateInterval {
@@ -139,9 +139,14 @@ func callSessions(digitMin, digitMax int64) (err error) {
CGREvent: event,
}
var upRply sessions.V1UpdateSessionReply
atomic.AddUint64(reqUpdate, 1)
updateStartTime := time.Now()
if err = brpc.Call(utils.SessionSv1UpdateSession, upArgs, &upRply); err != nil {
return
}
appendMu.Lock()
*updateDur = append(*updateDur, time.Since(updateStartTime))
appendMu.Unlock()
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1UpdateSession reply: <%v>", acc, dest, utils.ToJSON(upRply))
}
@@ -160,29 +165,18 @@ func callSessions(digitMin, digitMax int64) (err error) {
CGREvent: event,
}
var tRply string
atomic.AddUint64(reqTerminate, 1)
terminateStartTime := time.Now()
if err = brpc.Call(utils.SessionSv1TerminateSession, tArgs, &tRply); err != nil {
return
}
terminateMx.Lock()
countTerminate++
terminateMx.Unlock()
appendMu.Lock()
*terminateDur = append(*terminateDur, time.Since(terminateStartTime))
appendMu.Unlock()
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1TerminateSession reply: <%v>", acc, dest, utils.ToJSON(tRply))
}
if countTerminate == *calls {
go func() {
time.Sleep(10 * time.Second)
var sSRply string
if err = brpc.Call(utils.SessionSv1SyncSessions, tArgs, &sSRply); err != nil {
return
}
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1TerminateSession reply: <%v>", acc, dest, utils.ToJSON(sSRply))
}
}()
}
// Delay between terminate and processCDR for a more realistic case
time.Sleep(time.Duration(utils.RandomInteger(20, 40)) * time.Millisecond)
@@ -191,9 +185,14 @@ func callSessions(digitMin, digitMax int64) (err error) {
//
procArgs := event
var pRply string
atomic.AddUint64(reqCdr, 1)
cdrStartTime := time.Now()
if err = brpc.Call(utils.SessionSv1ProcessCDR, procArgs, &pRply); err != nil {
return
}
appendMu.Lock()
*cdrDur = append(*cdrDur, time.Since(cdrStartTime))
appendMu.Unlock()
if *verbose {
log.Printf("Account: <%v>, Destination: <%v>, SessionSv1ProcessCDR reply: <%v>", acc, dest, utils.ToJSON(pRply))
}