SessionS - optimize GetActiveSessions when no filters present

This commit is contained in:
DanB
2019-04-25 14:30:25 +02:00
parent a350907892
commit da8d971f52
3 changed files with 66 additions and 26 deletions

View File

@@ -5,6 +5,7 @@
"general": {
"log_level": 7,
"reply_timeout": "20s",
},
@@ -28,7 +29,7 @@
"rals": {
"enabled": true,
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
{"address": "*internal"},
],
},
@@ -36,7 +37,7 @@
"scheduler": {
"enabled": true,
"cdrs_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
{"address": "*internal"},
],
},
@@ -44,7 +45,7 @@
"cdrs": {
"enabled": true,
"chargers_conns":[
{"address": "127.0.0.1:2012", "transport":"*json"},
{"address": "*internal"},
],
},
@@ -198,7 +199,7 @@
"chargers": {
"enabled": true,
"attributes_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
{"address": "*internal"},
],
},
@@ -233,7 +234,7 @@
{"address": "*internal"},
],
"resources_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
{"address": "*internal"},
],
},
@@ -250,13 +251,13 @@
{"address": "*internal"}
],
"rals_conns": [
{"address": "*internal"}
{"address": "*internal"},
],
"cdrs_conns": [
{"address": "*internal"}
],
"chargers_conns": [
{"address": "*internal"}
{"address": "*internal"},
],
},

View File

@@ -910,6 +910,21 @@ func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string]string,
func (sS *SessionS) asActiveSessions(fltrs map[string]string,
count, psv bool) (aSs []*ActiveSession, counter int, err error) {
aSs = make([]*ActiveSession, 0) // Make sure we return at least empty list and not nil
if len(fltrs) == 0 { // no filters applied
ss := sS.getSessions(utils.EmptyString, psv)
if count {
return nil, len(ss), nil
}
aSs = make([]*ActiveSession, len(ss))
for _, s := range ss {
aSs = append(aSs,
s.AsActiveSessions(sS.cgrCfg.GeneralCfg().DefaultTimezone,
sS.cgrCfg.GeneralCfg().NodeID)...) // Expensive for large number of sessions
}
return
}
// Check first based on indexes so we can downsize the list of matching sessions
matchingSessionIDs, checkedFilters := sS.getSessionIDsMatchingIndexes(fltrs, psv)
if len(matchingSessionIDs) == 0 && len(checkedFilters) != 0 {

View File

@@ -30,6 +30,7 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -38,6 +39,8 @@ var (
sBenchRPC *rpc.Client
connOnce sync.Once
initRuns = flag.Int("init_runs", 25000, "number of loops to run in init")
cps = flag.Int("cps", 2000, "number of loops to run in init")
maxCps = make(chan struct{}, *cps)
)
func startRPC() {
@@ -53,6 +56,23 @@ func startRPC() {
}
}
func loadTP() {
for i := 0; i < *cps; i++ { // init CPS limitation
maxCps <- struct{}{}
}
if err := engine.InitDataDb(sBenchCfg); err != nil {
log.Fatal(err)
}
attrs := &utils.AttrLoadTpFromFolder{
FolderPath: path.Join(config.CgrConfig().DataFolderPath, "tariffplans", "tutorial")}
var tpLoadInst utils.LoadInstance
if err := sBenchRPC.Call("ApierV2.LoadTariffPlanFromFolder",
attrs, &tpLoadInst); err != nil {
log.Fatal(err)
}
time.Sleep(time.Duration(100) * time.Millisecond) // Give time for scheduler to execute topups
}
func addBalance(sBenchRPC *rpc.Client, sraccount string) {
attrSetBalance := utils.AttrSetBalance{
Tenant: "cgrates.org",
@@ -72,35 +92,38 @@ func addAccouns() {
var wg sync.WaitGroup
for i := 0; i < *initRuns; i++ {
wg.Add(1)
go func(i int, sBenchRPC *rpc.Client) {
go func(i int) {
oneCps := <-maxCps // queue here for maxCps
defer func() { maxCps <- oneCps }()
addBalance(sBenchRPC, fmt.Sprintf("1001%v", i))
addBalance(sBenchRPC, fmt.Sprintf("1002%v", i))
wg.Done()
}(i, sBenchRPC)
}(i)
}
wg.Wait()
}
func sendInit() {
initArgs := &V1InitSessionArgs{
InitSession: true,
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "",
Event: map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: utils.VOICE,
utils.Category: "call",
utils.Tenant: "cgrates.org",
utils.RequestType: utils.META_PREPAID,
utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
},
},
}
var wg sync.WaitGroup
for i := 0; i < *initRuns; i++ {
wg.Add(1)
go func(i int) {
oneCps := <-maxCps // queue here for maxCps
defer func() { maxCps <- oneCps }()
initArgs := &V1InitSessionArgs{
InitSession: true,
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "",
Event: map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: utils.VOICE,
utils.Category: "call",
utils.Tenant: "cgrates.org",
utils.RequestType: utils.META_PREPAID,
utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC),
},
},
}
initArgs.ID = utils.UUIDSha1Prefix()
initArgs.Event[utils.OriginID] = utils.UUIDSha1Prefix()
initArgs.Event[utils.Account] = fmt.Sprintf("1001%v", i)
@@ -130,7 +153,8 @@ func getCount() int {
func BenchmarkSendInitSession(b *testing.B) {
connOnce.Do(func() {
startRPC()
// addAccouns()
loadTP()
addAccouns()
sendInit()
// time.Sleep(3 * time.Minute)
})