From 6d8c83ef2f2136a9538642c489f7074ac285be65 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 16 Jul 2019 18:07:43 +0300 Subject: [PATCH] Updated TesminateSession --- apier/v1/sessionsv1_it_test.go | 6 +- config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/config_test.go | 1 + config/libconfig_json.go | 1 + config/smconfig.go | 4 + general_tests/session3_it_test.go | 156 ++++++++++++++++++++++++++++++ sessions/sessions.go | 23 +++-- 8 files changed, 181 insertions(+), 12 deletions(-) diff --git a/apier/v1/sessionsv1_it_test.go b/apier/v1/sessionsv1_it_test.go index d62672ee7..34b98ae4f 100644 --- a/apier/v1/sessionsv1_it_test.go +++ b/apier/v1/sessionsv1_it_test.go @@ -88,16 +88,14 @@ func TestSSv1ItWithPrepaid(t *testing.T) { func TestSSv1ItWithPostPaid(t *testing.T) { sSV1RequestType = utils.META_POSTPAID - sTestSessionSv1 = append(sTestSessionSv1[:len(sTestSessionSv1)-3], testSSv1ItStopCgrEngine) - for _, stest := range sTestSessionSv1 { + for _, stest := range append(sTestSessionSv1[:len(sTestSessionSv1)-3], testSSv1ItStopCgrEngine) { t.Run(sSV1RequestType, stest) } } func TestSSv1ItWithRated(t *testing.T) { sSV1RequestType = utils.META_RATED - sTestSessionSv1 = append(sTestSessionSv1[:len(sTestSessionSv1)-3], testSSv1ItStopCgrEngine) - for _, stest := range sTestSessionSv1 { + for _, stest := range append(sTestSessionSv1[:len(sTestSessionSv1)-3], testSSv1ItStopCgrEngine) { t.Run(sSV1RequestType, stest) } } diff --git a/config/config_defaults.go b/config/config_defaults.go index 095e06a87..cb1ab9274 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -316,6 +316,7 @@ const CGRATES_CFG_JSON = ` "session_indexes": [], // index sessions based on these fields for GetActiveSessions API "client_protocol": 1.0, // version of protocol to use when acting as JSON-PRC client <"0","1.0"> "channel_sync_interval": "0", // sync channels to detect stale sessions (0 to disable) + "terminate_attempts": 5 // attempts to get the session before terminating it }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 02d91d69b..5404bb1b7 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -495,6 +495,7 @@ func TestSmgJsonCfg(t *testing.T) { Session_indexes: &[]string{}, Client_protocol: utils.Float64Pointer(1.0), Channel_sync_interval: utils.StringPointer("0"), + Terminate_attempts: utils.IntPointer(5), } if cfg, err := dfCgrJsonCfg.SessionSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index f96368f03..9d78b03a7 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -637,6 +637,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) { SessionIndexes: utils.StringMap{}, ClientProtocol: 1.0, ChannelSyncInterval: 0, + TerminateAttempts: 5, } if !reflect.DeepEqual(eSessionSCfg, cgrCfg.sessionSCfg) { t.Errorf("expecting: %s, received: %s", diff --git a/config/libconfig_json.go b/config/libconfig_json.go index a1cbe6ed4..fb65a1e7e 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -193,6 +193,7 @@ type SessionSJsonCfg struct { Session_indexes *[]string Client_protocol *float64 Channel_sync_interval *string + Terminate_attempts *int } // FreeSWITCHAgent config section diff --git a/config/smconfig.go b/config/smconfig.go index 397bae717..414466059 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -121,6 +121,7 @@ type SessionSCfg struct { SessionIndexes utils.StringMap ClientProtocol float64 ChannelSyncInterval time.Duration + TerminateAttempts int } func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) { @@ -244,6 +245,9 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) { return err } } + if jsnCfg.Terminate_attempts != nil { + self.TerminateAttempts = *jsnCfg.Terminate_attempts + } return nil } diff --git a/general_tests/session3_it_test.go b/general_tests/session3_it_test.go index 15a2cf109..039e11736 100644 --- a/general_tests/session3_it_test.go +++ b/general_tests/session3_it_test.go @@ -53,6 +53,13 @@ var ( testSes3ItProcessEvent, testSes3ItThreshold1002After2, testSes3ItStatMetricsAfter2, + + testSes3ItAddVoiceBalance, + testSes3ItTerminatWithoutInit, + testSes3ItInitAfterTerminate, + testSes3ItBalance, + testSes3ItCDRs, + testSes3ItStopCgrEngine, } ) @@ -238,6 +245,155 @@ func testSes3ItStatMetricsAfter2(t *testing.T) { } } +func testSes3ItAddVoiceBalance(t *testing.T) { + attrSetBalance := utils.AttrSetBalance{ + Tenant: "cgrates.org", + Account: "1002", + BalanceType: utils.VOICE, + BalanceID: utils.StringPointer("TestDynamicDebitBalance"), + Value: utils.Float64Pointer(5 * float64(time.Second)), + RatingSubject: utils.StringPointer("*zero5ms"), + } + var reply string + if err := ses3RPC.Call("ApierV2.SetBalance", attrSetBalance, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Received: %s", reply) + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1002", + } + if err := ses3RPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if rply := acnt.BalanceMap[utils.VOICE].GetTotalValue(); rply != float64(5*time.Second) { + t.Errorf("Expecting: %v, received: %v", + float64(5*time.Second), rply) + } +} + +func testSes3ItTerminatWithoutInit(t *testing.T) { + go func() { // used in a gorutine to not block the test + // because it needs to call initSession when the call for Teminate is still active + args := &sessions.V1TerminateSessionArgs{ + TerminateSession: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSesItUpdateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.ToR: utils.VOICE, + utils.OriginID: "TestTerminate", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1002", + utils.Subject: "1001", + utils.Destination: "1001", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 2 * time.Second, + }, + }, + } + var rply string + if err := ses3RPC.Call(utils.SessionSv1TerminateSession, + args, &rply); err != nil { + t.Error(err) + } + if rply != utils.OK { + t.Errorf("Unexpected reply: %s", rply) + } + }() + +} + +func testSes3ItInitAfterTerminate(t *testing.T) { + time.Sleep(3 * time.Millisecond) + args1 := &sessions.V1InitSessionArgs{ + InitSession: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSesItInitiateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.ToR: utils.VOICE, + utils.OriginID: "TestTerminate", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1002", + utils.Subject: "1001", + utils.Destination: "1001", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 5 * time.Second, + }, + }, + } + var rply1 sessions.V1InitSessionReply + if err := ses3RPC.Call(utils.SessionSv1InitiateSession, + args1, &rply1); err != nil { + t.Error(err) + return + } else if *rply1.MaxUsage != 0 { + t.Errorf("Unexpected MaxUsage: %v", rply1.MaxUsage) + } + time.Sleep(5 * time.Millisecond) + aSessions := make([]*sessions.ExternalSession, 0) + if err := ses3RPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} +func testSes3ItBalance(t *testing.T) { + time.Sleep(10 * time.Millisecond) + var acnt *engine.Account + attrs := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1002", + } + if err := ses3RPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if rply := acnt.BalanceMap[utils.VOICE].GetTotalValue(); rply != float64(3*time.Second) { + t.Errorf("Expecting: %v, received: %v", + 3*time.Second, rply) + } +} + +func testSes3ItCDRs(t *testing.T) { + var reply string + if err := ses3RPC.Call(utils.SessionSv1ProcessCDR, &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSesItProccesCDR", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.ToR: utils.VOICE, + utils.OriginID: "TestTerminate", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1002", + utils.Subject: "1001", + utils.Destination: "1001", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 2 * time.Second, + }}, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Received reply: %s", reply) + } + time.Sleep(20 * time.Millisecond) + var cdrs []*engine.ExternalCDR + req := utils.RPCCDRsFilter{RunIDs: []string{"CustomerCharges"}, + Accounts: []string{"1002"}} + if err := ses3RPC.Call(utils.ApierV2GetCDRs, req, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } else if cdrs[0].Usage != "2s" { + t.Errorf("Unexpected CDR Usage received, cdr: %v %+v ", cdrs[0].Usage, cdrs[0]) + } +} func testSes3ItStopCgrEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) diff --git a/sessions/sessions.go b/sessions/sessions.go index e824ec146..5e1d5f926 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -528,7 +528,7 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, return } - for i := 0; i < 3; { + for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; { var maxDebit time.Duration if maxDebit, err = sS.debitSession(s, sRunIdx, dbtIvl, nil); err != nil { utils.Logger.Warning( @@ -2487,19 +2487,26 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrRALs(err) } } - ss := sS.getRelocateSessions(cgrID, - me.GetStringIgnoreErrors(utils.InitialOriginID), - me.GetStringIgnoreErrors(utils.OriginID), - me.GetStringIgnoreErrors(utils.OriginHost)) var s *Session - if len(ss) == 0 { + fib := utils.Fib() + for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; i++ { + ss := sS.getRelocateSessions(cgrID, + me.GetStringIgnoreErrors(utils.InitialOriginID), + me.GetStringIgnoreErrors(utils.OriginID), + me.GetStringIgnoreErrors(utils.OriginHost)) + if len(ss) != 0 { + s = ss[0] + break + } + if i+1 < sS.cgrCfg.SessionSCfg().TerminateAttempts { // not last iteration + time.Sleep(time.Duration(fib()) * time.Millisecond) + continue + } if s, err = sS.initSession(args.CGREvent.Tenant, ev, sS.biJClntID(clnt), me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil { return utils.NewErrRALs(err) } - } else { - s = ss[0] } if err = sS.endSession(s, me.GetDurationPtrIgnoreErrors(utils.Usage),