diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index cee1f2eeb..cafad76e2 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -5,6 +5,7 @@ "general": { "log_level": 7, + "reply_timeout": "20s", }, @@ -28,7 +29,7 @@ "rals": { "enabled": true, "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"}, + {"address": "*internal"}, ], }, @@ -36,7 +37,7 @@ "scheduler": { "enabled": true, "cdrs_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"}, + {"address": "*internal"}, ], }, @@ -44,7 +45,7 @@ "cdrs": { "enabled": true, "chargers_conns":[ - {"address": "127.0.0.1:2012", "transport":"*json"}, + {"address": "*internal"}, ], }, @@ -198,7 +199,7 @@ "chargers": { "enabled": true, "attributes_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"}, + {"address": "*internal"}, ], }, @@ -233,7 +234,7 @@ {"address": "*internal"}, ], "resources_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"}, + {"address": "*internal"}, ], }, @@ -250,13 +251,13 @@ {"address": "*internal"} ], "rals_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], "cdrs_conns": [ {"address": "*internal"} ], "chargers_conns": [ - {"address": "*internal"} + {"address": "*internal"}, ], }, diff --git a/sessions/sessions.go b/sessions/sessions.go index 3f19ebcd6..fb15889bd 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -910,6 +910,21 @@ func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string]string, func (sS *SessionS) asActiveSessions(fltrs map[string]string, count, psv bool) (aSs []*ActiveSession, counter int, err error) { aSs = make([]*ActiveSession, 0) // Make sure we return at least empty list and not nil + + if len(fltrs) == 0 { // no filters applied + ss := sS.getSessions(utils.EmptyString, psv) + if count { + return nil, len(ss), nil + } + aSs = make([]*ActiveSession, len(ss)) + for _, s := range ss { + aSs = append(aSs, + s.AsActiveSessions(sS.cgrCfg.GeneralCfg().DefaultTimezone, + sS.cgrCfg.GeneralCfg().NodeID)...) // Expensive for large number of sessions + } + return + } + // Check first based on indexes so we can downsize the list of matching sessions matchingSessionIDs, checkedFilters := sS.getSessionIDsMatchingIndexes(fltrs, psv) if len(matchingSessionIDs) == 0 && len(checkedFilters) != 0 { diff --git a/sessions/sessions_bench_test.go b/sessions/sessions_bench_test.go index 4b23eda4a..e491ac5af 100644 --- a/sessions/sessions_bench_test.go +++ b/sessions/sessions_bench_test.go @@ -30,6 +30,7 @@ import ( "time" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -38,6 +39,8 @@ var ( sBenchRPC *rpc.Client connOnce sync.Once initRuns = flag.Int("init_runs", 25000, "number of loops to run in init") + cps = flag.Int("cps", 2000, "number of loops to run in init") + maxCps = make(chan struct{}, *cps) ) func startRPC() { @@ -53,6 +56,23 @@ func startRPC() { } } +func loadTP() { + for i := 0; i < *cps; i++ { // init CPS limitation + maxCps <- struct{}{} + } + if err := engine.InitDataDb(sBenchCfg); err != nil { + log.Fatal(err) + } + attrs := &utils.AttrLoadTpFromFolder{ + FolderPath: path.Join(config.CgrConfig().DataFolderPath, "tariffplans", "tutorial")} + var tpLoadInst utils.LoadInstance + if err := sBenchRPC.Call("ApierV2.LoadTariffPlanFromFolder", + attrs, &tpLoadInst); err != nil { + log.Fatal(err) + } + time.Sleep(time.Duration(100) * time.Millisecond) // Give time for scheduler to execute topups +} + func addBalance(sBenchRPC *rpc.Client, sraccount string) { attrSetBalance := utils.AttrSetBalance{ Tenant: "cgrates.org", @@ -72,35 +92,38 @@ func addAccouns() { var wg sync.WaitGroup for i := 0; i < *initRuns; i++ { wg.Add(1) - go func(i int, sBenchRPC *rpc.Client) { + go func(i int) { + oneCps := <-maxCps // queue here for maxCps + defer func() { maxCps <- oneCps }() addBalance(sBenchRPC, fmt.Sprintf("1001%v", i)) - addBalance(sBenchRPC, fmt.Sprintf("1002%v", i)) wg.Done() - }(i, sBenchRPC) + }(i) } wg.Wait() } func sendInit() { - initArgs := &V1InitSessionArgs{ - InitSession: true, - CGREvent: utils.CGREvent{ - Tenant: "cgrates.org", - ID: "", - Event: map[string]interface{}{ - utils.EVENT_NAME: "TEST_EVENT", - utils.ToR: utils.VOICE, - utils.Category: "call", - utils.Tenant: "cgrates.org", - utils.RequestType: utils.META_PREPAID, - utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC), - }, - }, - } var wg sync.WaitGroup for i := 0; i < *initRuns; i++ { wg.Add(1) go func(i int) { + oneCps := <-maxCps // queue here for maxCps + defer func() { maxCps <- oneCps }() + initArgs := &V1InitSessionArgs{ + InitSession: true, + CGREvent: utils.CGREvent{ + Tenant: "cgrates.org", + ID: "", + Event: map[string]interface{}{ + utils.EVENT_NAME: "TEST_EVENT", + utils.ToR: utils.VOICE, + utils.Category: "call", + utils.Tenant: "cgrates.org", + utils.RequestType: utils.META_PREPAID, + utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC), + }, + }, + } initArgs.ID = utils.UUIDSha1Prefix() initArgs.Event[utils.OriginID] = utils.UUIDSha1Prefix() initArgs.Event[utils.Account] = fmt.Sprintf("1001%v", i) @@ -130,7 +153,8 @@ func getCount() int { func BenchmarkSendInitSession(b *testing.B) { connOnce.Do(func() { startRPC() - // addAccouns() + loadTP() + addAccouns() sendInit() // time.Sleep(3 * time.Minute) })