diff --git a/cmd/cgr-tester/callproc.go b/cmd/cgr-tester/callproc.go new file mode 100644 index 000000000..a64c0aed6 --- /dev/null +++ b/cmd/cgr-tester/callproc.go @@ -0,0 +1,165 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package main + +import ( + "fmt" + "log" + "time" + + "github.com/cenkalti/rpc2" + "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/utils" +) + +var ( + brpc *rpc2.Client + disconnectEvChan = make(chan *utils.AttrDisconnectSession, 1) + cpsCounter int +) + +func handleDisconnectSession(clnt *rpc2.Client, + args *utils.AttrDisconnectSession, reply *string) error { + disconnectEvChan <- args + *reply = utils.OK + return nil +} + +func callSession() (err error) { + + var currentUsage time.Duration + + st := time.Now().Add(2 * time.Second) + at := time.Now().Add(10 * time.Second) + + event := &utils.CGREvent{ + Tenant: *tenant, + ID: "TheEventID100000", + Time: utils.TimePointer(time.Now()), + Event: map[string]any{ + utils.AccountField: *subject, + utils.Destination: *destination, + utils.OriginHost: "local", + utils.RequestType: utils.MetaRated, + utils.SetupTime: st, + utils.Source: "cgr_tester", + utils.OriginID: utils.GenUUID(), + }, + APIOpts: map[string]any{}, + } + + cpsCounter += 1 + log.Printf("current call number: %+v", cpsCounter) + + if *updateInterval > *maxUsage { + return fmt.Errorf(`"update_interval" should be smaller than "max_usage"`) + } else if *maxUsage <= *minUsage { + return fmt.Errorf(`"min_usage" should be smaller than "max_usage"`) + } + + tstCfg.SessionSCfg().DebitInterval = 0 + + clntHandlers := map[string]any{utils.SessionSv1DisconnectSession: handleDisconnectSession} + brpc, err = utils.NewBiJSONrpcClient(tstCfg.SessionSCfg().ListenBijson, clntHandlers) + if err != nil { + return + } + + // + // SessionSv1AuthorizeEvent + // + authArgs := &sessions.V1AuthorizeArgs{ + GetMaxUsage: true, + CGREvent: event, + } + var authRply sessions.V1AuthorizeReply + if err = brpc.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authRply); err != nil { + return + } + // log.Printf("auth: %+v", utils.ToJSON(authRply)) + + // + // SessionSv1InitiateSession + // + event.Event[utils.AnswerTime] = at + event.Event[utils.RequestType] = utils.MetaRated + + initArgs := &sessions.V1InitSessionArgs{ + InitSession: true, + CGREvent: event, + } + + var initRply sessions.V1InitSessionReply + if err = brpc.Call(utils.SessionSv1InitiateSession, initArgs, &initRply); err != nil { + return + } + // log.Printf("init: %+v.", utils.ToJSON(initRply)) + + // + // SessionSv1UpdateSession + // + totalUsage := time.Duration(utils.RandomInteger(int64(*minUsage), int64(*maxUsage))) + log.Println("randUsage", totalUsage) + for currentUsage < totalUsage { + // log.Println("currentUsage", currentUsage) + currentUsage += *updateInterval + if currentUsage >= totalUsage { + break + } + event.Event[utils.Usage] = currentUsage.String() + + upArgs := &sessions.V1UpdateSessionArgs{ + GetAttributes: true, + UpdateSession: true, + CGREvent: event, + } + var upRply sessions.V1UpdateSessionReply + if err = brpc.Call(utils.SessionSv1UpdateSession, upArgs, &upRply); err != nil { + return + } + // log.Printf("update: %+v.", utils.ToJSON(upRply)) + } + + // + // SessionSv1TerminateSession + // + event.Event[utils.Usage] = totalUsage.String() + + tArgs := &sessions.V1TerminateSessionArgs{ + TerminateSession: true, + CGREvent: event, + } + var tRply string + if err = brpc.Call(utils.SessionSv1TerminateSession, tArgs, &tRply); err != nil { + return + } + // log.Printf("terminate: %+v.", utils.ToJSON(tRply)) + + // + // SessionSv1ProcessCDR + // + procArgs := event + var pRply string + if err = brpc.Call(utils.SessionSv1ProcessCDR, procArgs, &pRply); err != nil { + return + } + // log.Printf("process: %+v.", utils.ToJSON(pRply)) + + return +} diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 036c908d4..ce912818b 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -28,6 +28,7 @@ import ( "os" "runtime" "runtime/pprof" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -73,17 +74,22 @@ var ( "The amount of wait time until timeout for reading operations") dbRedisWriteTimeout = cgrTesterFlags.Duration(utils.RedisWriteTimeoutCfg, cgrConfig.DataDbCfg().Opts.RedisWriteTimeout, "The amount of wait time until timeout for writing operations") - raterAddress = cgrTesterFlags.String("rater_address", "", "Rater address for remote tests. Empty for internal rater.") - tor = cgrTesterFlags.String("tor", utils.MetaVoice, "The type of record to use in queries.") - category = cgrTesterFlags.String("category", "call", "The Record category to test.") - tenant = cgrTesterFlags.String("tenant", "cgrates.org", "The type of record to use in queries.") - subject = cgrTesterFlags.String("subject", "1001", "The rating subject to use in queries.") - destination = cgrTesterFlags.String("destination", "1002", "The destination to use in queries.") - json = cgrTesterFlags.Bool("json", false, "Use JSON RPC") - version = cgrTesterFlags.Bool("version", false, "Prints the application version.") - usage = cgrTesterFlags.String("usage", "1m", "The duration to use in call simulation.") - fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path") - reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file") + raterAddress = cgrTesterFlags.String("rater_address", "", "Rater address for remote tests. Empty for internal rater.") + cps = cgrTesterFlags.Int("cps", 100, "Number of calls to be initiated/second.") + minUsage = cgrTesterFlags.Duration("min_usage", 1*time.Second, "Minimum usage a session can have") + maxUsage = cgrTesterFlags.Duration("max_usage", 5*time.Second, "Maximum usage a session can have") + updateInterval = cgrTesterFlags.Duration("update_interval", 1*time.Second, "Time duration added for each session update") + + tor = cgrTesterFlags.String("tor", utils.MetaVoice, "The type of record to use in queries.") + category = cgrTesterFlags.String("category", "call", "The Record category to test.") + tenant = cgrTesterFlags.String("tenant", "cgrates.org", "The type of record to use in queries.") + subject = cgrTesterFlags.String("subject", "1001", "The rating subject to use in queries.") + destination = cgrTesterFlags.String("destination", "1002", "The destination to use in queries.") + json = cgrTesterFlags.Bool("json", false, "Use JSON RPC") + version = cgrTesterFlags.Bool("version", false, "Prints the application version.") + usage = cgrTesterFlags.String("usage", "1m", "The duration to use in call simulation.") + fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path") + reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file") err error ) @@ -108,6 +114,9 @@ func durInternalRater(cd *engine.CallDescriptorWithAPIOpts) (time.Duration, erro start := time.Now() for i := 0; i < *runs; i++ { result, err = cd.GetCost() + if err != nil { + return 0, err + } if *memprofile != "" { runtime.MemProfileRate = 1 runtime.GC() @@ -265,6 +274,9 @@ func main() { var err error tstart := time.Now() timeparsed, err = utils.ParseDurationWithNanosecs(*usage) + if err != nil { + return + } tend := tstart.Add(timeparsed) cd := &engine.CallDescriptorWithAPIOpts{ CallDescriptor: &engine.CallDescriptor{ @@ -289,4 +301,18 @@ func main() { } else { log.Printf("Elapsed: %s resulted: %f req/s.", duration, float64(*runs)/duration.Seconds()) } + + var wg sync.WaitGroup + for i := 0; i < *cps; i++ { + wg.Add(1) + log.Println("run num: ", i) + + go func() { + defer wg.Done() + if err = callSession(); err != nil { + log.Fatal(err.Error()) + } + }() + } + wg.Wait() } diff --git a/utils/coreutils.go b/utils/coreutils.go index a9321f8e6..99aa1e09e 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -981,9 +981,9 @@ func CastRPCErr(err error) error { return err } -// RandomInteger returns a random integer between min and max values -func RandomInteger(min, max int) int { - return math_rand.Intn(max-min) + min +// RandomInteger returns a random 64-bit integer between min and max values +func RandomInteger(min, max int64) int64 { + return math_rand.Int63n(max-min) + min } type LoadIDsWithAPIOpts struct { diff --git a/utils/dataconverter.go b/utils/dataconverter.go index c60db5814..251c93366 100644 --- a/utils/dataconverter.go +++ b/utils/dataconverter.go @@ -453,7 +453,7 @@ func (rC *RandomConverter) Convert(in any) ( if rC.end == 0 { return rand.Int() + rC.begin, nil } else { - return RandomInteger(rC.begin, rC.end), nil + return int(RandomInteger(int64(rC.begin), int64(rC.end))), nil } } }