diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index b033f6dd3..ee8bfad5a 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -80,24 +80,22 @@ var ( minUsage = cgrTesterFlags.Duration("min_usage", 1*time.Second, "Minimum usage a session can have") maxUsage = cgrTesterFlags.Duration("max_usage", 5*time.Second, "Maximum usage a session can have") updateInterval = cgrTesterFlags.Duration("update_interval", 1*time.Second, "Time duration added for each session update") + timeoutDur = cgrTesterFlags.Duration("timeout", 10*time.Second, "After last call, time out after this much duration") requestType = cgrTesterFlags.String("request_type", utils.MetaRated, "Request type of the call") digits = cgrTesterFlags.Int("digits", 10, "Number of digits Account and Destination will have") - tor = cgrTesterFlags.String("tor", utils.MetaVoice, "The type of record to use in queries.") - category = cgrTesterFlags.String("category", "call", "The Record category to test.") - tenant = cgrTesterFlags.String("tenant", "cgrates.org", "The type of record to use in queries.") - subject = cgrTesterFlags.String("subject", "1001", "The rating subject to use in queries.") - destination = cgrTesterFlags.String("destination", "1002", "The destination to use in queries.") - json = cgrTesterFlags.Bool("json", false, "Use JSON RPC") - version = cgrTesterFlags.Bool("version", false, "Prints the application version.") - usage = cgrTesterFlags.String("usage", "1m", "The duration to use in call simulation.") - fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path") - reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file") - verbose = cgrTesterFlags.Bool(utils.VerboseCgr, false, "Enable detailed verbose logging output") - err error - iterationsPerSecond = 0 - iterationsMux, terminateMx sync.Mutex - countTerminate = 0 + tor = cgrTesterFlags.String("tor", utils.MetaVoice, "The type of record to use in queries.") + category = cgrTesterFlags.String("category", "call", "The Record category to test.") + tenant = cgrTesterFlags.String("tenant", "cgrates.org", "The type of record to use in queries.") + subject = cgrTesterFlags.String("subject", "1001", "The rating subject to use in queries.") + destination = cgrTesterFlags.String("destination", "1002", "The destination to use in queries.") + json = cgrTesterFlags.Bool("json", false, "Use JSON RPC") + version = cgrTesterFlags.Bool("version", false, "Prints the application version.") + usage = cgrTesterFlags.String("usage", "1m", "The duration to use in call simulation.") + fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path") + reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file") + verbose = cgrTesterFlags.Bool(utils.VerboseCgr, false, "Enable detailed verbose logging output") + err error ) func durInternalRater(cd *engine.CallDescriptorWithAPIOpts) (time.Duration, error) { @@ -185,6 +183,51 @@ func durRemoteRater(cd *engine.CallDescriptorWithAPIOpts) (time.Duration, error) return time.Since(start), nil } +func printAllDurationsSummary(authDurations, initDurations, updateDurations, terminateDurations, cdrDurations []time.Duration, reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr uint64) { + fmt.Printf("| %-15s | %-15s | %-15s | %-15s | %-15s | %-15s |\n", "Session", "Min", "Average", "Max", "Requests sent", "Replies received") + fmt.Println("|-----------------|-----------------|-----------------|-----------------|-----------------|------------------|") + + processes := []string{"Authorize", "Initiate", "Update", "Terminate", "ProcessCDR"} + allDurations := [][]time.Duration{authDurations, initDurations, updateDurations, terminateDurations, cdrDurations} + reqCounts := []uint64{reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr} + + for i, process := range processes { + minDur, maxDur := findMinMaxDurations(allDurations[i]) + avgDur := calculateAverageDuration(allDurations[i]) + reqCount := reqCounts[i] + completedRuns := len(allDurations[i]) + + fmt.Printf("| %-15s | %-15s | %-15s | %-15s | %-15d | %-16d |\n", process, minDur, avgDur, maxDur, reqCount, completedRuns) + } +} +func findMinMaxDurations(durations []time.Duration) (time.Duration, time.Duration) { + if len(durations) == 0 { + return 0, 0 + } + minDur := durations[0] + maxDur := durations[0] + for _, dur := range durations { + if dur < minDur { + minDur = dur + } + if dur > maxDur { + maxDur = dur + } + } + return minDur, maxDur +} +func calculateAverageDuration(durations []time.Duration) time.Duration { + if len(durations) == 0 { + return 0 + } + total := int64(0) + for _, dur := range durations { + total += int64(dur) + } + avg := time.Duration(total / int64(len(durations))) + return avg +} + func main() { if err := cgrTesterFlags.Parse(os.Args[1:]); err != nil { return @@ -291,29 +334,68 @@ func main() { log.Printf("Digit range: <%v - %v>", digitMin, digitMax) } currentCalls := 0 - rplyNr := 0 - - for i := 0; i < *calls+int(maxUsage.Seconds()); i++ { - if currentCalls+*cps > *calls { - *cps = *calls - currentCalls - } + if *updateInterval > *maxUsage { + log.Fatal(`"update_interval" should be smaller than "max_usage"`) + } else if *maxUsage < *minUsage { + log.Fatal(`"min_usage" should be equal or smaller than "max_usage"`) + } + var wg sync.WaitGroup + authDur := make([]time.Duration, 0, *calls) + initDur := make([]time.Duration, 0, *calls) + updateDur := make([]time.Duration, 0, *calls) + terminateDur := make([]time.Duration, 0, *calls) + cdrDur := make([]time.Duration, 0, *calls) + var reqAuth uint64 + var reqInit uint64 + var reqUpdate uint64 + var reqTerminate uint64 + var reqCdr uint64 + var tmpTime time.Time + timeout := time.After(*timeoutDur) + for i := 0; i < int(math.Ceil(float64(*calls)/float64(*cps))); i++ { for j := 0; j < *cps; j++ { + currentCalls++ + if *calls < currentCalls { + break + } + totalUsage := *maxUsage + if *minUsage != *maxUsage { + totalUsage = time.Duration(utils.RandomInteger(int64(*minUsage), int64(*maxUsage))) + } + wg.Add(1) go func() { - if err := callSessions(digitMin, digitMax); err != nil { + defer wg.Done() + timeoutStamp := time.Now().Add(totalUsage + *timeoutDur) + if timeoutStamp.Compare(tmpTime) == +1 { + tmpTime = timeoutStamp + timeout = time.After(totalUsage + *timeoutDur + 140*time.Millisecond) + } + if err := callSessions(&authDur, &initDur, &updateDur, &terminateDur, &cdrDur, + &reqAuth, &reqInit, &reqUpdate, &reqTerminate, &reqCdr, + digitMin, digitMax, totalUsage); err != nil { log.Fatal(err.Error()) } - rplyNr++ }() + } time.Sleep(1 * time.Second) - currentCalls += *cps - log.Printf("Iteration index: <%v>, cps: <%v>, calls finished <%v>", i, iterationsPerSecond, countTerminate) - iterationsPerSecond = 0 - if countTerminate == *calls { - break - } } - log.Printf("Number of successful calls: %v", rplyNr) + completed := make(chan struct{}) + go func() { + defer close(completed) + wg.Wait() + + }() + + select { + case to := <-timeout: + log.Printf("Timed out: %v", to.Format("2006-01-02 15:04:05")) + printAllDurationsSummary(authDur, initDur, updateDur, terminateDur, cdrDur, + reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr) + case <-completed: + printAllDurationsSummary(authDur, initDur, updateDur, terminateDur, cdrDur, + reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr) + } case utils.MetaCost: var timeparsed time.Duration var err error diff --git a/cmd/cgr-tester/sessions.go b/cmd/cgr-tester/sessions.go index 232314c7e..1ee8453d7 100644 --- a/cmd/cgr-tester/sessions.go +++ b/cmd/cgr-tester/sessions.go @@ -22,6 +22,8 @@ import ( "fmt" "log" "math" + "sync" + "sync/atomic" "time" "github.com/cenkalti/rpc2" @@ -41,14 +43,16 @@ func handleDisconnectSession(clnt *rpc2.Client, return nil } -func callSessions(digitMin, digitMax int64) (err error) { +func callSessions(authDur, initDur, updateDur, terminateDur, cdrDur *[]time.Duration, + reqAuth, reqInit, reqUpdate, reqTerminate, reqCdr *uint64, + digitMin, digitMax int64, totalUsage time.Duration) (err error) { if *digits <= 0 { return fmt.Errorf(`"digits" should be bigger than 0`) } else if int(math.Pow10(*digits))-1 < *cps { return fmt.Errorf(`"digits" should amount to be more than "cps"`) } - + var appendMu sync.Mutex acc := utils.RandomInteger(digitMin, digitMax) dest := utils.RandomInteger(digitMin, digitMax) @@ -67,12 +71,6 @@ func callSessions(digitMin, digitMax int64) (err error) { APIOpts: map[string]any{}, } - if *updateInterval > *maxUsage { - return fmt.Errorf(`"update_interval" should be smaller than "max_usage"`) - } else if *maxUsage < *minUsage { - return fmt.Errorf(`"min_usage" should be equal or smaller than "max_usage"`) - } - clntHandlers := map[string]any{ utils.SessionSv1DisconnectSession: handleDisconnectSession} brpc, err = utils.NewBiJSONrpcClient(tstCfg.SessionSCfg().ListenBijson, clntHandlers) @@ -83,18 +81,21 @@ func callSessions(digitMin, digitMax int64) (err error) { // // SessionSv1AuthorizeEvent // + event.Event[utils.SetupTime] = time.Now() authArgs := &sessions.V1AuthorizeArgs{ GetMaxUsage: true, CGREvent: event, } var authRply sessions.V1AuthorizeReply + atomic.AddUint64(reqAuth, 1) + authStartTime := time.Now() if err = brpc.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authRply); err != nil { return } - iterationsMux.Lock() - iterationsPerSecond++ - iterationsMux.Unlock() + appendMu.Lock() + *authDur = append(*authDur, time.Since(authStartTime)) + appendMu.Unlock() if *verbose { log.Printf("Account: <%v>, Destination: <%v>, SessionSv1AuthorizeEvent reply: <%v>", acc, dest, utils.ToJSON(authRply)) } @@ -112,9 +113,14 @@ func callSessions(digitMin, digitMax int64) (err error) { } var initRply sessions.V1InitSessionReply + atomic.AddUint64(reqInit, 1) + initStartTime := time.Now() if err = brpc.Call(utils.SessionSv1InitiateSession, initArgs, &initRply); err != nil { return } + appendMu.Lock() + *initDur = append(*initDur, time.Since(initStartTime)) + appendMu.Unlock() if *verbose { log.Printf("Account: <%v>, Destination: <%v>, SessionSv1InitiateSession reply: <%v>", acc, dest, utils.ToJSON(initRply)) } @@ -122,12 +128,6 @@ func callSessions(digitMin, digitMax int64) (err error) { // // SessionSv1UpdateSession // - var totalUsage time.Duration - if *minUsage == *maxUsage { - totalUsage = *maxUsage - } else { - totalUsage = time.Duration(utils.RandomInteger(int64(*minUsage), int64(*maxUsage))) - } var currentUsage time.Duration for currentUsage = time.Duration(1 * time.Second); currentUsage < totalUsage; currentUsage += *updateInterval { @@ -139,9 +139,14 @@ func callSessions(digitMin, digitMax int64) (err error) { CGREvent: event, } var upRply sessions.V1UpdateSessionReply + atomic.AddUint64(reqUpdate, 1) + updateStartTime := time.Now() if err = brpc.Call(utils.SessionSv1UpdateSession, upArgs, &upRply); err != nil { return } + appendMu.Lock() + *updateDur = append(*updateDur, time.Since(updateStartTime)) + appendMu.Unlock() if *verbose { log.Printf("Account: <%v>, Destination: <%v>, SessionSv1UpdateSession reply: <%v>", acc, dest, utils.ToJSON(upRply)) } @@ -160,29 +165,18 @@ func callSessions(digitMin, digitMax int64) (err error) { CGREvent: event, } var tRply string + atomic.AddUint64(reqTerminate, 1) + terminateStartTime := time.Now() if err = brpc.Call(utils.SessionSv1TerminateSession, tArgs, &tRply); err != nil { return } - terminateMx.Lock() - countTerminate++ - terminateMx.Unlock() + appendMu.Lock() + *terminateDur = append(*terminateDur, time.Since(terminateStartTime)) + appendMu.Unlock() if *verbose { log.Printf("Account: <%v>, Destination: <%v>, SessionSv1TerminateSession reply: <%v>", acc, dest, utils.ToJSON(tRply)) } - if countTerminate == *calls { - go func() { - time.Sleep(10 * time.Second) - var sSRply string - if err = brpc.Call(utils.SessionSv1SyncSessions, tArgs, &sSRply); err != nil { - return - } - if *verbose { - log.Printf("Account: <%v>, Destination: <%v>, SessionSv1TerminateSession reply: <%v>", acc, dest, utils.ToJSON(sSRply)) - } - }() - } - // Delay between terminate and processCDR for a more realistic case time.Sleep(time.Duration(utils.RandomInteger(20, 40)) * time.Millisecond) @@ -191,9 +185,14 @@ func callSessions(digitMin, digitMax int64) (err error) { // procArgs := event var pRply string + atomic.AddUint64(reqCdr, 1) + cdrStartTime := time.Now() if err = brpc.Call(utils.SessionSv1ProcessCDR, procArgs, &pRply); err != nil { return } + appendMu.Lock() + *cdrDur = append(*cdrDur, time.Since(cdrStartTime)) + appendMu.Unlock() if *verbose { log.Printf("Account: <%v>, Destination: <%v>, SessionSv1ProcessCDR reply: <%v>", acc, dest, utils.ToJSON(pRply)) }