diff --git a/data/conf/samples/sessintjson/cgrates.json b/data/conf/samples/sessintjson/cgrates.json new file mode 100644 index 000000000..ad87241a2 --- /dev/null +++ b/data/conf/samples/sessintjson/cgrates.json @@ -0,0 +1,94 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "log_level": 7, + "reply_timeout": "5m", +}, + + +"data_db": { + "db_type": "*internal", +}, + + +"stor_db": { + "db_type": "*internal", +}, + + +"rals": { + "enabled": true, + "thresholds_conns": ["*localhost"], +}, + + +"schedulers": { + "enabled": true, + "cdrs_conns": ["*localhost"], +}, + + +"cdrs": { + "enabled": true, + "chargers_conns":["*localhost"], +}, + + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*localhost"], +}, + + +"resources": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["*localhost"] +}, + + +"stats": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["*localhost"], +}, + +"thresholds": { + "enabled": true, + "store_interval": "-1", +}, + + +"suppliers": { + "enabled": true, + "prefix_indexed_fields":["Destination"], + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], +}, + + +"sessions": { + "enabled": true, + "suppliers_conns": ["*localhost"], + "resources_conns": ["*localhost"], + "attributes_conns": ["*localhost"], + "rals_conns": ["*localhost"], + "cdrs_conns": ["*localhost"], + "chargers_conns": ["*localhost"], +}, + + +"apier": { + "scheduler_conns": ["*localhost"], +}, + + +} diff --git a/data/tariffplans/tp1cnt/Chargers.csv b/data/tariffplans/tp1cnt/Chargers.csv new file mode 100644 index 000000000..c270b5867 --- /dev/null +++ b/data/tariffplans/tp1cnt/Chargers.csv @@ -0,0 +1,2 @@ +#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight +cgrates.org,DEFAULT,,,*default,*none,0 \ No newline at end of file diff --git a/data/tariffplans/tp1cnt/DestinationRates.csv b/data/tariffplans/tp1cnt/DestinationRates.csv new file mode 100644 index 000000000..e447c4e86 --- /dev/null +++ b/data/tariffplans/tp1cnt/DestinationRates.csv @@ -0,0 +1,2 @@ +#ID,DestinationsID,RatesID,RoundingMethod,RoundingDecimals,MaxCost,MaxCostStrategy +DR_1CNT,*any,RT_1CNT,*up,4,, diff --git a/data/tariffplans/tp1cnt/Rates.csv b/data/tariffplans/tp1cnt/Rates.csv new file mode 100644 index 000000000..669c6d0f1 --- /dev/null +++ b/data/tariffplans/tp1cnt/Rates.csv @@ -0,0 +1,2 @@ +#ID,ConnectFee,Rate,RateUnit,RateIncrement,GroupIntervalStart +RT_1CNT,0,0.01,1s,1s,0 \ No newline at end of file diff --git a/data/tariffplans/tp1cnt/RatingPlans.csv b/data/tariffplans/tp1cnt/RatingPlans.csv new file mode 100644 index 000000000..d0e541ee8 --- /dev/null +++ b/data/tariffplans/tp1cnt/RatingPlans.csv @@ -0,0 +1,2 @@ +#ID,DestinationRatesID,TimingID,Weight +RP_1CNT,DR_1CNT,*any,0 diff --git a/data/tariffplans/tp1cnt/RatingProfiles.csv b/data/tariffplans/tp1cnt/RatingProfiles.csv new file mode 100644 index 000000000..9b7b7c523 --- /dev/null +++ b/data/tariffplans/tp1cnt/RatingProfiles.csv @@ -0,0 +1,2 @@ +#Tenant,Category,Subject,ActivationTime,RatingPlanID,FallbackSubject +cgrates.org,call,*any,2020-01-01T00:00:00Z,RP_1CNT, diff --git a/engine/mapevent.go b/engine/mapevent.go index 55958906e..8160cf06f 100644 --- a/engine/mapevent.go +++ b/engine/mapevent.go @@ -152,6 +152,21 @@ func (me MapEvent) GetTimeIgnoreErrors(fldName string, tmz string) (t time.Time) return } +// GetTimePtr returns a pointer towards time or error +func (me MapEvent) GetTimePtr(fldName, tmz string) (t *time.Time, err error) { + var tm time.Time + if tm, err = me.GetTime(fldName, tmz); err != nil { + return + } + return utils.TimePointer(tm), nil +} + +// GetTimePtrIgnoreErrors returns a pointer towards time or nil if errors +func (me MapEvent) GetTimePtrIgnoreErrors(fldName, tmz string) (t *time.Time) { + t, _ = me.GetTimePtr(fldName, tmz) + return +} + // Clone returns the cloned map func (me MapEvent) Clone() (mp MapEvent) { if me == nil { diff --git a/general_tests/a1_it_test.go b/general_tests/a1_it_test.go index f86904b1f..16f1fad60 100644 --- a/general_tests/a1_it_test.go +++ b/general_tests/a1_it_test.go @@ -21,11 +21,8 @@ package general_tests import ( "encoding/json" - "errors" - "flag" "fmt" "net/rpc" - "net/rpc/jsonrpc" "path" "sync" "testing" @@ -44,20 +41,8 @@ var ( a1CfgPath string a1Cfg *config.CGRConfig a1rpc *rpc.Client - encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication") ) -func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) { - switch *encoding { - case utils.MetaJSON: - return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen) - case utils.MetaGOB: - return rpc.Dial(utils.TCP, cfg.RPCGOBListen) - default: - return nil, errors.New("UNSUPPORTED_RPC") - } -} - var sTestsA1it = []func(t *testing.T){ testA1itLoadConfig, testA1itResetDataDB, diff --git a/general_tests/libtest.go b/general_tests/libtest.go new file mode 100644 index 000000000..de11c0d2b --- /dev/null +++ b/general_tests/libtest.go @@ -0,0 +1,46 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "errors" + "flag" + "net/rpc" + "net/rpc/jsonrpc" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +var ( + dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") + waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") + encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication") + err error +) + +func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) { + switch *encoding { + case utils.MetaJSON: + return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen) + case utils.MetaGOB: + return rpc.Dial(utils.TCP, cfg.RPCGOBListen) + default: + return nil, errors.New("UNSUPPORTED_RPC") + } +} diff --git a/general_tests/multiplecdrc_it_test.go b/general_tests/multiplecdrc_it_test.go index e2c9259a8..3a83ef680 100644 --- a/general_tests/multiplecdrc_it_test.go +++ b/general_tests/multiplecdrc_it_test.go @@ -40,8 +40,6 @@ var ( rater *rpc.Client testCalls = flag.Bool("calls", false, "Run test calls simulation, not by default.") - dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") - waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") sTestMCDRC = []func(t *testing.T){ testMCDRCLoadConfig, diff --git a/general_tests/rpcclient_it_test.go b/general_tests/rpcclient_it_test.go index 679d68f78..568fac0a7 100644 --- a/general_tests/rpcclient_it_test.go +++ b/general_tests/rpcclient_it_test.go @@ -43,7 +43,6 @@ var ( // shared vars rpcRAL1, rpcRAL2 *rpcclient.RPCClient rpcPoolFirst, rpcPoolBroadcast *rpcclient.RPCPool ral1, ral2 *exec.Cmd - err error node1 = "node1" node2 = "node2" ) diff --git a/general_tests/sessions_concur_test.go b/general_tests/sessions_concur_test.go index ea392be9a..de8a13da9 100644 --- a/general_tests/sessions_concur_test.go +++ b/general_tests/sessions_concur_test.go @@ -1,4 +1,4 @@ -// +build integration +// +build performance /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments @@ -28,6 +28,7 @@ import ( "testing" "time" + v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" @@ -39,7 +40,7 @@ var ( sCncrCfg *config.CGRConfig sCncrRPC *rpc.Client - sCncrSessions = flag.Int("concurrent_sessions", 500000, "maximum concurrent sessions created") + sCncrSessions = flag.Int("sessions", 500000, "maximum concurrent sessions created") sCncrCps = flag.Int("cps", 50000, "maximum requests per second sent out") cpsPool = make(chan struct{}, *sCncrCps) @@ -53,6 +54,14 @@ func TestSCncrInternal(t *testing.T) { } } +// Tests starting here +func TestSCncrJSON(t *testing.T) { + sCncrCfgDIR = "sessintjson" + for _, tst := range sTestsSCncrIT { + t.Run(sCncrCfgDIR, tst) + } +} + // subtests to be executed var sTestsSCncrIT = []func(t *testing.T){ testSCncrInitConfig, @@ -110,7 +119,7 @@ func testSCncrLoadTP(t *testing.T) { var loadInst string if err := sCncrRPC.Call(utils.ApierV1LoadTariffPlanFromFolder, &utils.AttrLoadTpFromFolder{FolderPath: path.Join( - *dataDir, "tariffplans", "testit")}, &loadInst); err != nil { + *dataDir, "tariffplans", "tp1cnt")}, &loadInst); err != nil { t.Error(err) } } @@ -139,26 +148,30 @@ func testSCncrRunSessions(t *testing.T) { func runSession(acntID string) (err error) { originID := utils.GenUUID() // each test with it's own OriginID - // topup as much as we know we need - topupDur := time.Duration(13000) * time.Hour - attrSetBalance := utils.AttrSetBalance{ + // topup as much as we know we need for one session + topupDur := time.Duration(90) * time.Hour + var addBlcRply string + argsAddBalance := &v1.AttrAddBalance{ Tenant: "cgrates.org", Account: acntID, BalanceType: utils.VOICE, - Balance: map[string]interface{}{ - utils.ID: "testSCncr", - utils.Value: topupDur.Nanoseconds(), - utils.Weight: 20, - }, - } - var reply string - if err = sCncrRPC.Call(utils.ApierV1SetBalance, - attrSetBalance, &reply); err != nil { + Value: float64(topupDur.Nanoseconds())} + if err = sCncrRPC.Call(utils.ApierV1AddBalance, argsAddBalance, &addBlcRply); err != nil { return - } else if reply != utils.OK { - return fmt.Errorf("received: <%s> to ApierV1.SetBalance", reply) + } else if addBlcRply != utils.OK { + return fmt.Errorf("received: <%s> to ApierV1.AddBalance", addBlcRply) } - + /* + var acnt *engine.Account + acntAttrs := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: acntID} + if err = sCncrRPC.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil { + return + } else if vcBlnc := acnt.BalanceMap[utils.VOICE].GetTotalValue(); vcBlnc != float64(topupDur.Nanoseconds()) { + return fmt.Errorf("unexpected voice balance received: %+v", utils.ToIJSON(acnt)) + } + */ time.Sleep(time.Duration( utils.RandomInteger(0, 100)) * time.Millisecond) // randomize between tests @@ -184,21 +197,17 @@ func runSession(acntID string) (err error) { if err := sCncrRPC.Call(utils.SessionSv1AuthorizeEvent, authArgs, &rplyAuth); err != nil { return err } - if rplyAuth.MaxUsage != authDur { - return fmt.Errorf("unexpected MaxUsage: %v to auth", rplyAuth.MaxUsage) - } time.Sleep(time.Duration( - utils.RandomInteger(0, 100)) * time.Millisecond) // randomize between tests + utils.RandomInteger(0, 100)) * time.Millisecond) // Init the session - initUsage := 90 * time.Second + initUsage := 1 * time.Minute initArgs := &sessions.V1InitSessionArgs{ InitSession: true, CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", ID: fmt.Sprintf("TestSCncrInit%s", originID), Event: map[string]interface{}{ - utils.Tenant: "cgrates.org", utils.OriginID: originID, utils.RequestType: utils.META_PREPAID, utils.Account: acntID, @@ -212,8 +221,89 @@ func runSession(acntID string) (err error) { if err := sCncrRPC.Call(utils.SessionSv1InitiateSession, initArgs, &rplyInit); err != nil { return err - } else if rplyInit.MaxUsage != initUsage { + } else if rplyInit.MaxUsage == 0 { return fmt.Errorf("unexpected MaxUsage at init: %v", rplyInit.MaxUsage) } + time.Sleep(time.Duration( + utils.RandomInteger(0, 100)) * time.Millisecond) + + // Update the session + updtUsage := 1 * time.Minute + updtArgs := &sessions.V1UpdateSessionArgs{ + UpdateSession: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TestSCncrUpdate%s", originID), + Event: map[string]interface{}{ + utils.OriginID: originID, + utils.Usage: updtUsage, + }, + }, + } + var rplyUpdt sessions.V1UpdateSessionReply + if err = sCncrRPC.Call(utils.SessionSv1UpdateSession, + updtArgs, &rplyUpdt); err != nil { + return + } else if rplyUpdt.MaxUsage == 0 { + return fmt.Errorf("unexpected MaxUsage at update: %v", rplyUpdt.MaxUsage) + } + time.Sleep(time.Duration( + utils.RandomInteger(0, 100)) * time.Millisecond) + + // Terminate the session + trmntArgs := &sessions.V1TerminateSessionArgs{ + TerminateSession: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TestSCncrTerminate%s", originID), + Event: map[string]interface{}{ + utils.OriginID: originID, + utils.Usage: time.Duration(90 * time.Second), + }, + }, + } + var rplyTrmnt string + if err = sCncrRPC.Call(utils.SessionSv1TerminateSession, + trmntArgs, &rplyTrmnt); err != nil { + return + } else if rplyTrmnt != utils.OK { + return fmt.Errorf("received: <%s> to SessionSv1.Terminate", rplyTrmnt) + } + time.Sleep(time.Duration( + utils.RandomInteger(0, 100)) * time.Millisecond) + + // processCDR + argsCDR := &utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("TestSCncrCDR%s", originID), + Event: map[string]interface{}{ + utils.OriginID: originID, + }, + }, + } + var rplyCDR string + if err = sCncrRPC.Call(utils.SessionSv1ProcessCDR, + argsCDR, &rplyCDR); err != nil { + return + } else if rplyCDR != utils.OK { + return fmt.Errorf("received: <%s> to ProcessCDR", rplyCDR) + } + time.Sleep(time.Duration( + utils.RandomInteger(0, 100)) * time.Millisecond) + /* + // make sure the account was properly refunded + var acnt *engine.Account + acntAttrs := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: acntID} + if err = sCncrRPC.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil { + return + } else if vcBlnc := acnt.BalanceMap[utils.VOICE].GetTotalValue(); vcBlnc != 0 { + return fmt.Errorf("unexpected voice balance received: %+v", utils.ToIJSON(acnt)) + } else if mnBlnc := acnt.BalanceMap[utils.MONETARY].GetTotalValue(); mnBlnc != 0 { + return fmt.Errorf("unexpected voice balance received: %+v", utils.ToIJSON(acnt)) + } + */ return } diff --git a/sessions/sessions.go b/sessions/sessions.go index 120282c75..05af43e54 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1467,8 +1467,9 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration, sr.CD.DurationIndex += notCharged cc := new(engine.CallCost) if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().RALsConns, nil, utils.ResponderDebit, - &engine.CallDescriptorWithArgDispatcher{CallDescriptor: sr.CD, - ArgDispatcher: s.ArgDispatcher}, cc); err == nil { + &engine.CallDescriptorWithArgDispatcher{ + CallDescriptor: sr.CD, + ArgDispatcher: s.ArgDispatcher}, cc); err == nil { sr.EventCost.Merge( engine.NewEventCostFromCallCost(cc, s.CGRID, sr.Event.GetStringIgnoreErrors(utils.RunID))) @@ -2518,8 +2519,8 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector, if err = sS.terminateSession(s, ev.GetDurationPtrIgnoreErrors(utils.Usage), ev.GetDurationPtrIgnoreErrors(utils.LastUsed), - utils.TimePointer(ev.GetTimeIgnoreErrors(utils.AnswerTime, - utils.EmptyString)), false); err != nil { + ev.GetTimePtrIgnoreErrors(utils.AnswerTime, utils.EmptyString), + false); err != nil { return utils.NewErrRALs(err) } } @@ -2636,7 +2637,6 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector, if cgrEvs, err = s.asCGREvents(); err != nil { return utils.NewErrServerError(err) } - var withErrors bool for _, cgrEv := range cgrEvs { argsProc := &engine.ArgV1ProcessEvent{ @@ -3160,9 +3160,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, if err = sS.terminateSession(s, ev.GetDurationPtrIgnoreErrors(utils.Usage), ev.GetDurationPtrIgnoreErrors(utils.LastUsed), - utils.TimePointer( - ev.GetTimeIgnoreErrors(utils.AnswerTime, - utils.EmptyString)), false); err != nil { + ev.GetTimePtrIgnoreErrors(utils.AnswerTime, utils.EmptyString), + false); err != nil { return utils.NewErrRALs(err) } }