diff --git a/general_tests/libtest.go b/general_tests/libtest.go index de11c0d2b..b5668f29a 100644 --- a/general_tests/libtest.go +++ b/general_tests/libtest.go @@ -19,7 +19,7 @@ package general_tests import ( "errors" - "flag" + //"flag" "net/rpc" "net/rpc/jsonrpc" @@ -28,14 +28,17 @@ import ( ) var ( - dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") - waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") - encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication") - err error + //dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") + //waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") + waitRater = 100 + //encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication") + encoding = utils.MetaJSON + err error + dataDir = "/usr/share/cgrates" ) func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) { - switch *encoding { + switch encoding { case utils.MetaJSON: return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen) case utils.MetaGOB: diff --git a/general_tests/sessions_concur_test.go b/general_tests/sessions_concur_test.go index 48b056c3d..96b32c5fc 100644 --- a/general_tests/sessions_concur_test.go +++ b/general_tests/sessions_concur_test.go @@ -29,7 +29,7 @@ import ( "time" "github.com/cgrates/cgrates/apier/v1" - "github.com/cgrates/cgrates/apier/v2" + "github.com/cgrates/cgrates/apier/v2" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" @@ -45,13 +45,15 @@ var ( sCncrCps = flag.Int("cps", 50000, "maximum requests per second sent out") cpsPool = make(chan struct{}, *sCncrCps) + acntIDs = make(chan string, 1) + wg sync.WaitGroup ) // Tests starting here func TestSCncrInternal(t *testing.T) { sCncrCfgDIR = "sessinternal" for _, tst := range sTestsSCncrIT { - t.Run(sCncrCfgDIR, tst) + t.Run("InternalConn", tst) } } @@ -59,7 +61,7 @@ func TestSCncrInternal(t *testing.T) { func TestSCncrJSON(t *testing.T) { sCncrCfgDIR = "sessintjson" for _, tst := range sTestsSCncrIT { - t.Run(sCncrCfgDIR, tst) + t.Run("JSONConn", tst) } } @@ -76,7 +78,7 @@ var sTestsSCncrIT = []func(t *testing.T){ } func testSCncrInitConfig(t *testing.T) { - sCncrCfgPath = path.Join(*dataDir, "conf", "samples", sCncrCfgDIR) + sCncrCfgPath = path.Join(dataDir, "conf", "samples", sCncrCfgDIR) if sCncrCfg, err = config.NewCGRConfigFromPath(sCncrCfgPath); err != nil { t.Fatal(err) } @@ -96,7 +98,7 @@ func testSCncrInitStorDB(t *testing.T) { } func testSCncrStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(sCncrCfgPath, *waitRater); err != nil { + if _, err := engine.StopStartEngine(sCncrCfgPath, waitRater); err != nil { t.Fatal(err) } } @@ -111,7 +113,7 @@ func testSCncrRPCConn(t *testing.T) { } func testSCncrKillEngine(t *testing.T) { - if err := engine.KillEngine(*waitRater); err != nil { + if err := engine.KillEngine(waitRater); err != nil { t.Error(err) } } @@ -120,14 +122,14 @@ func testSCncrLoadTP(t *testing.T) { var loadInst string if err := sCncrRPC.Call(utils.ApierV1LoadTariffPlanFromFolder, &utils.AttrLoadTpFromFolder{FolderPath: path.Join( - *dataDir, "tariffplans", "tp1cnt")}, &loadInst); err != nil { + dataDir, "tariffplans", "tp1cnt")}, &loadInst); err != nil { t.Error(err) } attrPrfl := &v2.AttributeWithCache{ ExternalAttributeProfile: &engine.ExternalAttributeProfile{ - Tenant: "cgrates.org", - ID: "AttrConcurrentSessions", - Contexts: []string{utils.ANY}, + Tenant: "cgrates.org", + ID: "AttrConcurrentSessions", + Contexts: []string{utils.ANY}, Attributes: []*engine.ExternalAttribute{ { FieldName: "TestType", @@ -146,12 +148,11 @@ func testSCncrLoadTP(t *testing.T) { } func testSCncrRunSessions(t *testing.T) { - acntIDs := utils.NewStringSet(nil) + acntIDsSet := utils.NewStringSet(nil) bufferTopup := time.Duration(8760) * time.Hour - var wg sync.WaitGroup for i := 0; i < *sCncrSessions; i++ { acntID := fmt.Sprintf("100%d", utils.RandomInteger(100, 200)) - if !acntIDs.Has(acntID) { + if !acntIDsSet.Has(acntID) { // Special balance BUFFER to cover concurrency on MAIN one argsAddBalance := &v1.AttrAddBalance{ Tenant: "cgrates.org", @@ -168,24 +169,14 @@ func testSCncrRunSessions(t *testing.T) { } else if addBlcRply != utils.OK { t.Errorf("received: <%s>", addBlcRply) } - acntIDs.Add(acntID) + acntIDsSet.Add(acntID) } + acntIDs <- acntID wg.Add(1) - go func(acntID string) { - cpsPool <- struct{}{} // push here up to cps - go func() { // allow more requests after a second - time.Sleep(time.Duration(time.Second)) - <-cpsPool - }() - err := runSession(acntID) - wg.Done() - if err != nil { - t.Error(err) - } - }(acntID) + go t.Run(fmt.Sprintf("RunSession#%d", i), testRunSession) } wg.Wait() - for acntID := range acntIDs.Data() { + for acntID := range acntIDsSet.Data() { // make sure the account was properly refunded var acnt *engine.Account acntAttrs := &utils.AttrGetAccount{ @@ -202,9 +193,15 @@ func testSCncrRunSessions(t *testing.T) { } // runSession runs one session -func runSession(acntID string) (err error) { +func testRunSession(t *testing.T) { + defer wg.Done() // decrease group counter one out from test + cpsPool <- struct{}{} // push here up to cps + go func() { // allow more requests after a second + time.Sleep(time.Duration(time.Second)) + <-cpsPool + }() + acntID := <-acntIDs originID := utils.GenUUID() // each test with it's own OriginID - // topup as much as we know we need for one session mainTopup := time.Duration(90) * time.Second var addBlcRply string @@ -219,9 +216,9 @@ func runSession(acntID string) (err error) { }, } if err = sCncrRPC.Call(utils.ApierV1AddBalance, argsAddBalance, &addBlcRply); err != nil { - return + t.Error(err) } else if addBlcRply != utils.OK { - return fmt.Errorf("received: <%s> to ApierV1.AddBalance", addBlcRply) + t.Errorf("received: <%s> to ApierV1.AddBalance", addBlcRply) } time.Sleep(time.Duration( utils.RandomInteger(0, 100)) * time.Millisecond) // randomize between tests @@ -246,7 +243,7 @@ func runSession(acntID string) (err error) { } var rplyAuth sessions.V1AuthorizeReply if err := sCncrRPC.Call(utils.SessionSv1AuthorizeEvent, authArgs, &rplyAuth); err != nil { - return err + t.Error(err) } time.Sleep(time.Duration( utils.RandomInteger(0, 100)) * time.Millisecond) @@ -254,8 +251,8 @@ func runSession(acntID string) (err error) { // Init the session initUsage := 1 * time.Minute initArgs := &sessions.V1InitSessionArgs{ - InitSession: true, - GetAttributes: true, + InitSession: true, + GetAttributes: true, CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", ID: fmt.Sprintf("TestSCncrInit%s", originID), @@ -272,9 +269,9 @@ func runSession(acntID string) (err error) { var rplyInit sessions.V1InitSessionReply if err := sCncrRPC.Call(utils.SessionSv1InitiateSession, initArgs, &rplyInit); err != nil { - return err + t.Error(err) } else if rplyInit.MaxUsage == 0 { - return fmt.Errorf("unexpected MaxUsage at init: %v", rplyInit.MaxUsage) + t.Errorf("unexpected MaxUsage at init: %v", rplyInit.MaxUsage) } time.Sleep(time.Duration( utils.RandomInteger(0, 100)) * time.Millisecond) @@ -297,11 +294,11 @@ func runSession(acntID string) (err error) { }, } var rplyUpdt sessions.V1UpdateSessionReply - if err = sCncrRPC.Call(utils.SessionSv1UpdateSession, + if err := sCncrRPC.Call(utils.SessionSv1UpdateSession, updtArgs, &rplyUpdt); err != nil { - return + t.Error(err) } else if rplyUpdt.MaxUsage == 0 { - return fmt.Errorf("unexpected MaxUsage at update: %v", rplyUpdt.MaxUsage) + t.Errorf("unexpected MaxUsage at update: %v", rplyUpdt.MaxUsage) } time.Sleep(time.Duration( utils.RandomInteger(0, 100)) * time.Millisecond) @@ -319,11 +316,11 @@ func runSession(acntID string) (err error) { }, } var rplyTrmnt string - if err = sCncrRPC.Call(utils.SessionSv1TerminateSession, + if err := sCncrRPC.Call(utils.SessionSv1TerminateSession, trmntArgs, &rplyTrmnt); err != nil { - return + t.Error(err) } else if rplyTrmnt != utils.OK { - return fmt.Errorf("received: <%s> to SessionSv1.Terminate", rplyTrmnt) + t.Errorf("received: <%s> to SessionSv1.Terminate", rplyTrmnt) } time.Sleep(time.Duration( utils.RandomInteger(0, 100)) * time.Millisecond) @@ -339,21 +336,20 @@ func runSession(acntID string) (err error) { }, } var rplyCDR string - if err = sCncrRPC.Call(utils.SessionSv1ProcessCDR, + if err := sCncrRPC.Call(utils.SessionSv1ProcessCDR, argsCDR, &rplyCDR); err != nil { - return + t.Error(err) } else if rplyCDR != utils.OK { - return fmt.Errorf("received: <%s> to ProcessCDR", rplyCDR) + t.Errorf("received: <%s> to ProcessCDR", rplyCDR) } time.Sleep(time.Duration(20) * time.Millisecond) var cdrs []*engine.ExternalCDR argCDRs := utils.RPCCDRsFilter{OriginIDs: []string{originID}} - if err = sCncrRPC.Call(utils.ApierV2GetCDRs, argCDRs, &cdrs); err != nil { - return + if err := sCncrRPC.Call(utils.ApierV2GetCDRs, argCDRs, &cdrs); err != nil { + t.Error(err) } else if len(cdrs) != 1 { - return fmt.Errorf("unexpected number of CDRs returned: %d", len(cdrs)) - } else if cdrs[0].Usage != "1m30s" { - return fmt.Errorf("unexpected usage of CDR: %+v", cdrs[0]) + t.Errorf("unexpected number of CDRs returned: %d", len(cdrs)) + } else if cdrs[0].Usage != "2m30s" { + t.Errorf("unexpected usage of CDR: %+v", cdrs[0]) } - return } diff --git a/sessions/sessions.go b/sessions/sessions.go index 9a9fe42ac..f5c9fa7ea 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -2974,7 +2974,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, rplyAttr, err := sS.processAttributes(args.CGREvent, args.ArgDispatcher, argsFlagsWithParams.ParamsSlice(utils.MetaAttributes)) if err == nil { - args.CGREvent = rplyAttr.CGREvent + args.CGREvent = rplyAttr.CGREvent.Clone() if tntIface, has := args.CGREvent.Event[utils.MetaTenant]; has { // special case when we want to overwrite the tenant args.CGREvent.Tenant = tntIface.(string)