From c2c65c0b87090e8cf5caa674bb4ee408bd2a16a8 Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Wed, 12 Jul 2023 11:13:18 -0400 Subject: [PATCH] Continue sessions cgr-tester implementation --- cmd/cgr-tester/cgr-tester.go | 109 ++++++++++++-------- cmd/cgr-tester/{callproc.go => sessions.go} | 47 +++------ utils/consts.go | 7 ++ 3 files changed, 89 insertions(+), 74 deletions(-) rename cmd/cgr-tester/{callproc.go => sessions.go} (78%) diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index ce912818b..f8d713b11 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -28,6 +28,7 @@ import ( "os" "runtime" "runtime/pprof" + "strings" "sync" "time" @@ -46,7 +47,8 @@ var ( cfgPath = cgrTesterFlags.String("config_path", "", "Configuration directory path.") - + exec = cgrTesterFlags.String(utils.ExecCgr, utils.EmptyString, "Pick what you want to test "+ + "<*cps|*get_cost>") parallel = cgrTesterFlags.Int("parallel", 0, "run n requests in parallel") datadbType = cgrTesterFlags.String("datadb_type", cgrConfig.DataDbCfg().Type, "The type of the DataDb database ") datadbHost = cgrTesterFlags.String("datadb_host", cgrConfig.DataDbCfg().Host, "The DataDb host to connect to.") @@ -79,6 +81,7 @@ var ( minUsage = cgrTesterFlags.Duration("min_usage", 1*time.Second, "Minimum usage a session can have") maxUsage = cgrTesterFlags.Duration("max_usage", 5*time.Second, "Maximum usage a session can have") updateInterval = cgrTesterFlags.Duration("update_interval", 1*time.Second, "Time duration added for each session update") + requestType = cgrTesterFlags.String("request_type", utils.MetaRated, "Request type of the call") tor = cgrTesterFlags.String("tor", utils.MetaVoice, "The type of record to use in queries.") category = cgrTesterFlags.String("category", "call", "The Record category to test.") @@ -91,6 +94,7 @@ var ( fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path") reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file") + mu sync.RWMutex err error ) @@ -270,49 +274,68 @@ func main() { return } - var timeparsed time.Duration - var err error - tstart := time.Now() - timeparsed, err = utils.ParseDurationWithNanosecs(*usage) - if err != nil { - return - } - tend := tstart.Add(timeparsed) - cd := &engine.CallDescriptorWithAPIOpts{ - CallDescriptor: &engine.CallDescriptor{ - TimeStart: tstart, - TimeEnd: tend, - DurationIndex: 60 * time.Second, - ToR: *tor, - Category: *category, - Tenant: *tenant, - Subject: *subject, - Destination: *destination, - }, - } - var duration time.Duration - if len(*raterAddress) == 0 { - duration, err = durInternalRater(cd) - } else { - duration, err = durRemoteRater(cd) - } - if err != nil { - log.Fatal(err.Error()) - } else { - log.Printf("Elapsed: %s resulted: %f req/s.", duration, float64(*runs)/duration.Seconds()) - } + if exec != nil && *exec != utils.EmptyString { + tasks := strings.Split(*exec, utils.FieldsSep) + for _, taskID := range tasks { + switch taskID { + default: // unsupported taskID + err = fmt.Errorf("task <%s> is not a supported tester task", taskID) + case utils.MetaCPS: + var wg sync.WaitGroup + rplyNr := 0 + for i := 0; i < *cps; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if err := callSession(); err != nil { + log.Fatal(err.Error()) + } + mu.Lock() + rplyNr++ + mu.Unlock() - var wg sync.WaitGroup - for i := 0; i < *cps; i++ { - wg.Add(1) - log.Println("run num: ", i) - - go func() { - defer wg.Done() - if err = callSession(); err != nil { - log.Fatal(err.Error()) + }() + } + wg.Wait() + log.Printf("Number of successful calls: %v", rplyNr) + case utils.MetaGetCost: + var timeparsed time.Duration + var err error + tstart := time.Now() + timeparsed, err = utils.ParseDurationWithNanosecs(*usage) + if err != nil { + return + } + tend := tstart.Add(timeparsed) + cd := &engine.CallDescriptorWithAPIOpts{ + CallDescriptor: &engine.CallDescriptor{ + TimeStart: tstart, + TimeEnd: tend, + DurationIndex: 60 * time.Second, + ToR: *tor, + Category: *category, + Tenant: *tenant, + Subject: *subject, + Destination: *destination, + }, + } + var duration time.Duration + if len(*raterAddress) == 0 { + duration, err = durInternalRater(cd) + } else { + duration, err = durRemoteRater(cd) + } + if err != nil { + log.Fatal(err.Error()) + } else { + log.Printf("Elapsed: %s resulted: %f req/s.", duration, float64(*runs)/duration.Seconds()) + } } - }() + if err != nil { + log.Fatal(err) + break + } + } } - wg.Wait() + } diff --git a/cmd/cgr-tester/callproc.go b/cmd/cgr-tester/sessions.go similarity index 78% rename from cmd/cgr-tester/callproc.go rename to cmd/cgr-tester/sessions.go index a64c0aed6..8801465f2 100644 --- a/cmd/cgr-tester/callproc.go +++ b/cmd/cgr-tester/sessions.go @@ -20,7 +20,6 @@ package main import ( "fmt" - "log" "time" "github.com/cenkalti/rpc2" @@ -31,7 +30,6 @@ import ( var ( brpc *rpc2.Client disconnectEvChan = make(chan *utils.AttrDisconnectSession, 1) - cpsCounter int ) func handleDisconnectSession(clnt *rpc2.Client, @@ -43,11 +41,6 @@ func handleDisconnectSession(clnt *rpc2.Client, func callSession() (err error) { - var currentUsage time.Duration - - st := time.Now().Add(2 * time.Second) - at := time.Now().Add(10 * time.Second) - event := &utils.CGREvent{ Tenant: *tenant, ID: "TheEventID100000", @@ -55,26 +48,20 @@ func callSession() (err error) { Event: map[string]any{ utils.AccountField: *subject, utils.Destination: *destination, - utils.OriginHost: "local", - utils.RequestType: utils.MetaRated, - utils.SetupTime: st, - utils.Source: "cgr_tester", + utils.OriginHost: utils.Local, + utils.RequestType: *requestType, + utils.Source: utils.CGRTester, utils.OriginID: utils.GenUUID(), }, APIOpts: map[string]any{}, } - cpsCounter += 1 - log.Printf("current call number: %+v", cpsCounter) - if *updateInterval > *maxUsage { return fmt.Errorf(`"update_interval" should be smaller than "max_usage"`) } else if *maxUsage <= *minUsage { return fmt.Errorf(`"min_usage" should be smaller than "max_usage"`) } - tstCfg.SessionSCfg().DebitInterval = 0 - clntHandlers := map[string]any{utils.SessionSv1DisconnectSession: handleDisconnectSession} brpc, err = utils.NewBiJSONrpcClient(tstCfg.SessionSCfg().ListenBijson, clntHandlers) if err != nil { @@ -84,6 +71,7 @@ func callSession() (err error) { // // SessionSv1AuthorizeEvent // + event.Event[utils.SetupTime] = time.Now() authArgs := &sessions.V1AuthorizeArgs{ GetMaxUsage: true, CGREvent: event, @@ -92,13 +80,14 @@ func callSession() (err error) { if err = brpc.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authRply); err != nil { return } - // log.Printf("auth: %+v", utils.ToJSON(authRply)) + + // Delay between authorize and initiation for a more realistic case + time.Sleep(time.Duration(utils.RandomInteger(1, 5)) * time.Second) // // SessionSv1InitiateSession // - event.Event[utils.AnswerTime] = at - event.Event[utils.RequestType] = utils.MetaRated + event.Event[utils.AnswerTime] = time.Now() initArgs := &sessions.V1InitSessionArgs{ InitSession: true, @@ -109,21 +98,14 @@ func callSession() (err error) { if err = brpc.Call(utils.SessionSv1InitiateSession, initArgs, &initRply); err != nil { return } - // log.Printf("init: %+v.", utils.ToJSON(initRply)) // // SessionSv1UpdateSession // totalUsage := time.Duration(utils.RandomInteger(int64(*minUsage), int64(*maxUsage))) - log.Println("randUsage", totalUsage) - for currentUsage < totalUsage { - // log.Println("currentUsage", currentUsage) - currentUsage += *updateInterval - if currentUsage >= totalUsage { - break - } - event.Event[utils.Usage] = currentUsage.String() + for currentUsage := time.Duration(0); currentUsage < totalUsage; currentUsage += *updateInterval { + event.Event[utils.Usage] = currentUsage.String() upArgs := &sessions.V1UpdateSessionArgs{ GetAttributes: true, UpdateSession: true, @@ -133,9 +115,11 @@ func callSession() (err error) { if err = brpc.Call(utils.SessionSv1UpdateSession, upArgs, &upRply); err != nil { return } - // log.Printf("update: %+v.", utils.ToJSON(upRply)) } + // Delay between last update and termination for a more realistic case + time.Sleep(time.Duration(utils.RandomInteger(10, 20)) * time.Millisecond) + // // SessionSv1TerminateSession // @@ -149,7 +133,9 @@ func callSession() (err error) { if err = brpc.Call(utils.SessionSv1TerminateSession, tArgs, &tRply); err != nil { return } - // log.Printf("terminate: %+v.", utils.ToJSON(tRply)) + + // Delay between terminate and processCDR for a more realistic case + time.Sleep(time.Duration(utils.RandomInteger(1, 3)) * time.Millisecond) // // SessionSv1ProcessCDR @@ -159,7 +145,6 @@ func callSession() (err error) { if err = brpc.Call(utils.SessionSv1ProcessCDR, procArgs, &pRply); err != nil { return } - // log.Printf("process: %+v.", utils.ToJSON(pRply)) return } diff --git a/utils/consts.go b/utils/consts.go index 8604cb323..afb82e54b 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1086,6 +1086,13 @@ const ( CapStatQueues = "StatQueues" ) +// cgr-tester Metas +const ( + MetaGetCost = "*get_cost" + MetaCPS = "*cps" + CGRTester = "CGRTester" +) + const ( TpRatingPlans = "TpRatingPlans" TpFilters = "TpFilters"