From d93fc46fc2ac66e501dce76987dcd5907e32b599 Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Fri, 19 Apr 2019 14:00:44 +0300 Subject: [PATCH 1/2] Added *dispatchers to agents flags/subsystems --- agents/asterisk_event.go | 3 +++ agents/diamagent.go | 3 +++ agents/fsagent.go | 7 ++++++- agents/fsevent.go | 9 +++++++++ agents/httpagent.go | 3 +++ agents/kamagent.go | 6 +++++- agents/kamevent.go | 9 +++++++++ agents/radagent.go | 3 +++ .../samples/dispatchers/radagent/cgrates.json | 18 +++++++++--------- 9 files changed, 50 insertions(+), 11 deletions(-) diff --git a/agents/asterisk_event.go b/agents/asterisk_event.go index 047c5e815..c0f8fdcbe 100644 --- a/agents/asterisk_event.go +++ b/agents/asterisk_event.go @@ -292,6 +292,9 @@ func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) args.ProcessStats = strings.Index(smaEv.Subsystems(), utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(smaEv.Subsystems(), utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } diff --git a/agents/diamagent.go b/agents/diamagent.go index 65e3cd976..736aea040 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -255,6 +255,9 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor, } cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) argDisp := cgrEv.ConsumeArgDispatcher() + if reqProcessor.Flags.HasKey(utils.MetaDispatchers) && argDisp == nil { + argDisp = new(utils.ArgDispatcher) + } var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, diff --git a/agents/fsagent.go b/agents/fsagent.go index 83e114007..c2290feed 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "reflect" + "strings" "time" "github.com/cgrates/cgrates/config" @@ -265,8 +266,12 @@ func (sm *FSsessions) onChannelHangupComplete(fsev FSEvent, connId string) { if err != nil { return } + argsDisp := cgrEv.ConsumeArgDispatcher() + if strings.Index(fsev[VarCGRSubsystems], utils.MetaDispatchers) != -1 && argsDisp == nil { + argsDisp = new(utils.ArgDispatcher) + } if err := sm.sS.Call(utils.SessionSv1ProcessCDR, - &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrEv.ConsumeArgDispatcher()}, &reply); err != nil { + &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: argsDisp}, &reply); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed processing CGREvent: %s, error: <%s>", utils.FreeSWITCHAgent, utils.ToJSON(cgrEv), err.Error())) } diff --git a/agents/fsevent.go b/agents/fsevent.go index 57f573739..cef0d466e 100644 --- a/agents/fsevent.go +++ b/agents/fsevent.go @@ -425,6 +425,9 @@ func (fsev FSEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) { args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } @@ -448,6 +451,9 @@ func (fsev FSEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) { args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } @@ -470,6 +476,9 @@ func (fsev FSEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } diff --git a/agents/httpagent.go b/agents/httpagent.go index 2b2b4cd06..345bd0be2 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -105,6 +105,9 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor, } cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) argDisp := cgrEv.ConsumeArgDispatcher() + if reqProcessor.Flags.HasKey(utils.MetaDispatchers) && argDisp == nil { + argDisp = new(utils.ArgDispatcher) + } var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, diff --git a/agents/kamagent.go b/agents/kamagent.go index 160f58f7a..b565fa315 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -203,8 +203,12 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) { return } cgrEv.Event[utils.OriginHost] = ka.conns[connID].RemoteAddr().String() + argsDisp := cgrEv.ConsumeArgDispatcher() + if strings.Index(kev[utils.CGRSubsystems], utils.MetaDispatchers) != -1 && argsDisp == nil { + argsDisp = new(utils.ArgDispatcher) + } if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR, - &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrEv.ConsumeArgDispatcher()}, &reply); err != nil { + &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: argsDisp}, &reply); err != nil { utils.Logger.Err(fmt.Sprintf("%s> failed processing CGREvent: %s, error: %s", utils.KamailioAgent, utils.ToJSON(cgrEv), err.Error())) } diff --git a/agents/kamevent.go b/agents/kamevent.go index 1f97f67ec..0c1e3fdfb 100644 --- a/agents/kamevent.go +++ b/agents/kamevent.go @@ -192,6 +192,9 @@ func (kev KamEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) { args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } @@ -256,6 +259,9 @@ func (kev KamEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) { args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } @@ -278,6 +284,9 @@ func (kev KamEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } diff --git a/agents/radagent.go b/agents/radagent.go index 018a4df57..350b14d48 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -149,6 +149,9 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RequestProcessor, } cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) argDisp := cgrEv.ConsumeArgDispatcher() + if reqProcessor.Flags.HasKey(utils.MetaDispatchers) && argDisp == nil { + argDisp = new(utils.ArgDispatcher) + } var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, diff --git a/data/conf/samples/dispatchers/radagent/cgrates.json b/data/conf/samples/dispatchers/radagent/cgrates.json index 22297e33e..9af1a7a98 100644 --- a/data/conf/samples/dispatchers/radagent/cgrates.json +++ b/data/conf/samples/dispatchers/radagent/cgrates.json @@ -43,9 +43,9 @@ "dispatchers":{ "enabled": true, - "attributes_conns": [ - {"address": "*internal"}, - ], + // "attributes_conns": [ + // {"address": "*internal"}, + // ], }, "radius_agent": { @@ -54,11 +54,11 @@ { "id": "KamailioAuth", "filters": ["*string:~*vars.*radReqType:*radAuth"], - "flags": ["*auth", "*accounts"], + "flags": ["*auth", "*accounts","*dispatchers"], "continue_on_success": false, "request_fields":[ {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"}, - {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, + // {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*prepaid", "mandatory": true}, {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", @@ -82,11 +82,11 @@ { "id": "KamailioAccountingStart", "filters": ["*string:~*req.Acct-Status-Type:Start"], - "flags": ["*initiate", "*attributes", "*resources", "*accounts"], + "flags": ["*initiate", "*attributes", "*resources", "*accounts","*dispatchers"], "continue_on_success": false, "request_fields":[ {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"}, - {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, + // {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*prepaid", "mandatory": true}, {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", @@ -110,11 +110,11 @@ { "id": "KamailioAccountingStop", "filters": ["*string:~*req.Acct-Status-Type:Stop"], - "flags": ["*terminate", "*resources", "*accounts", "*cdrs"], + "flags": ["*terminate", "*resources", "*accounts", "*cdrs","*dispatchers"], "continue_on_success": false, "request_fields":[ {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"}, - {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, + // {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*prepaid", "mandatory": true}, {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", From 610c612f35d41d590273f8cbf022fc89efa1c49e Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Fri, 19 Apr 2019 16:41:10 +0300 Subject: [PATCH 2/2] Added sessions benchmark --- sessions/sessions.go | 6 +- sessions/sessions_bench_test.go | 154 ++++++++++++++++++++++++++++++++ 2 files changed, 155 insertions(+), 5 deletions(-) create mode 100644 sessions/sessions_bench_test.go diff --git a/sessions/sessions.go b/sessions/sessions.go index d941a331f..3f19ebcd6 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -921,11 +921,7 @@ func (sS *SessionS) asActiveSessions(fltrs map[string]string, delete(fltrs, fltrFldName) } } - var remainingSessions []*Session // Survived index matching - ss := sS.getSessions(fltrs[utils.CGRID], psv) - for _, s := range ss { - remainingSessions = append(remainingSessions, s) - } + remainingSessions := sS.getSessions(fltrs[utils.CGRID], psv) if len(fltrs) != 0 { // Still have some filters to match for _, s := range remainingSessions { for _, sr := range s.SRuns { diff --git a/sessions/sessions_bench_test.go b/sessions/sessions_bench_test.go new file mode 100644 index 000000000..c4e4a72e1 --- /dev/null +++ b/sessions/sessions_bench_test.go @@ -0,0 +1,154 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package sessions + +import ( + "fmt" + "log" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +var ( + SessionBenchmarkCfg *config.CGRConfig + SessionBenchmarkRPC *rpc.Client + ConnectOnce sync.Once + NoSessions int +) + +func startRPC() { + var err error + SessionBenchmarkCfg, err = config.NewCGRConfigFromPath(path.Join(config.CgrConfig().DataFolderPath, "conf", "samples", "tutmysql")) + if err != nil { + log.Fatal(err) + } + config.SetCgrConfig(SessionBenchmarkCfg) + if SessionBenchmarkRPC, err = jsonrpc.Dial("tcp", SessionBenchmarkCfg.ListenCfg().RPCJSONListen); err != nil { + log.Fatalf("Error at dialing rcp client:%v\n", err) + } +} + +func addBalance(SessionBenchmarkRPC *rpc.Client, sraccount string) { + attrSetBalance := utils.AttrSetBalance{ + Tenant: "cgrates.org", + Account: sraccount, + BalanceType: utils.VOICE, + BalanceID: utils.StringPointer("TestDynamicDebitBalance"), + Value: utils.Float64Pointer(5 * float64(time.Hour)), + RatingSubject: utils.StringPointer("*zero5ms"), + } + var reply string + if err := SessionBenchmarkRPC.Call("ApierV2.SetBalance", attrSetBalance, &reply); err != nil { + log.Fatal(err) + // } else if reply != utils.OK { + // log.Fatalf("Received: %s", reply) + } +} + +func addAccouns() { + var wg sync.WaitGroup + for i := 0; i < 23000; i++ { + wg.Add(1) + go func(i int, SessionBenchmarkRPC *rpc.Client) { + addBalance(SessionBenchmarkRPC, fmt.Sprintf("1001%v1002", i)) + addBalance(SessionBenchmarkRPC, fmt.Sprintf("1001%v1001", i)) + wg.Done() + }(i, SessionBenchmarkRPC) + } + wg.Wait() +} + +func sendInit() { + initArgs := &V1InitSessionArgs{ + InitSession: true, + CGREvent: utils.CGREvent{ + Tenant: "cgrates.org", + ID: "", + Event: map[string]interface{}{ + utils.EVENT_NAME: "TEST_EVENT", + utils.ToR: utils.VOICE, + utils.OriginID: "123491", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.Category: "call", + utils.Tenant: "cgrates.org", + utils.RequestType: utils.META_PREPAID, + utils.SetupTime: time.Date(2016, time.January, 5, 18, 30, 59, 0, time.UTC), + utils.AnswerTime: time.Date(2016, time.January, 5, 18, 31, 05, 0, time.UTC), + utils.Usage: "10", // 5MB + }, + }, + } + // var wg sync.WaitGroup + for i := 0; i < 23000; i++ { + // wg.Add(1) + // go func(i int, SessionBenchmarkRPC *rpc.Client) { + initArgs.ID = utils.UUIDSha1Prefix() + initArgs.Event[utils.OriginID] = utils.UUIDSha1Prefix() + initArgs.Event[utils.Account] = fmt.Sprintf("1001%v1002", i) + initArgs.Event[utils.Subject] = initArgs.Event[utils.Account] + initArgs.Event[utils.Destination] = fmt.Sprintf("1001%v1001", i) + + var initRpl *V1InitSessionReply + if err := SessionBenchmarkRPC.Call(utils.SessionSv1InitiateSession, + initArgs, &initRpl); err != nil { + log.Fatal(err) + } + // _ = getCount(SessionBenchmarkRPC) + // if c := getCount(SessionBenchmarkRPC); i+1 != c { + // log.Fatalf("Not Enough sessions %v!=%v", i+1, c) + // } + // wg.Done() + // }(i, SessionBenchmarkRPC) + } + // wg.Wait() +} + +func getCount(SessionBenchmarkRPC *rpc.Client) int { + var count int + if err := SessionBenchmarkRPC.Call(utils.SessionSv1GetActiveSessionsCount, + map[string]string{}, &count); err != nil { + log.Fatal(err) + } + return count +} + +func BenchmarkSendInitSession(b *testing.B) { + ConnectOnce.Do(func() { + startRPC() + // addAccouns() + sendInit() + // time.Sleep(3 * time.Minute) + }) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = getCount(SessionBenchmarkRPC) + // if count < 2000 { + // b.Fatal("Not Enough sessions") + // } + } +}