Updated TesminateSession

This commit is contained in:
Trial97
2019-07-16 18:07:43 +03:00
committed by Dan Christian Bogos
parent c1f49a6e73
commit 6d8c83ef2f
8 changed files with 181 additions and 12 deletions

View File

@@ -88,16 +88,14 @@ func TestSSv1ItWithPrepaid(t *testing.T) {
func TestSSv1ItWithPostPaid(t *testing.T) {
sSV1RequestType = utils.META_POSTPAID
sTestSessionSv1 = append(sTestSessionSv1[:len(sTestSessionSv1)-3], testSSv1ItStopCgrEngine)
for _, stest := range sTestSessionSv1 {
for _, stest := range append(sTestSessionSv1[:len(sTestSessionSv1)-3], testSSv1ItStopCgrEngine) {
t.Run(sSV1RequestType, stest)
}
}
func TestSSv1ItWithRated(t *testing.T) {
sSV1RequestType = utils.META_RATED
sTestSessionSv1 = append(sTestSessionSv1[:len(sTestSessionSv1)-3], testSSv1ItStopCgrEngine)
for _, stest := range sTestSessionSv1 {
for _, stest := range append(sTestSessionSv1[:len(sTestSessionSv1)-3], testSSv1ItStopCgrEngine) {
t.Run(sSV1RequestType, stest)
}
}

View File

@@ -316,6 +316,7 @@ const CGRATES_CFG_JSON = `
"session_indexes": [], // index sessions based on these fields for GetActiveSessions API
"client_protocol": 1.0, // version of protocol to use when acting as JSON-PRC client <"0","1.0">
"channel_sync_interval": "0", // sync channels to detect stale sessions (0 to disable)
"terminate_attempts": 5 // attempts to get the session before terminating it
},

View File

@@ -495,6 +495,7 @@ func TestSmgJsonCfg(t *testing.T) {
Session_indexes: &[]string{},
Client_protocol: utils.Float64Pointer(1.0),
Channel_sync_interval: utils.StringPointer("0"),
Terminate_attempts: utils.IntPointer(5),
}
if cfg, err := dfCgrJsonCfg.SessionSJsonCfg(); err != nil {
t.Error(err)

View File

@@ -637,6 +637,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) {
SessionIndexes: utils.StringMap{},
ClientProtocol: 1.0,
ChannelSyncInterval: 0,
TerminateAttempts: 5,
}
if !reflect.DeepEqual(eSessionSCfg, cgrCfg.sessionSCfg) {
t.Errorf("expecting: %s, received: %s",

View File

@@ -193,6 +193,7 @@ type SessionSJsonCfg struct {
Session_indexes *[]string
Client_protocol *float64
Channel_sync_interval *string
Terminate_attempts *int
}
// FreeSWITCHAgent config section

View File

@@ -121,6 +121,7 @@ type SessionSCfg struct {
SessionIndexes utils.StringMap
ClientProtocol float64
ChannelSyncInterval time.Duration
TerminateAttempts int
}
func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) {
@@ -244,6 +245,9 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) {
return err
}
}
if jsnCfg.Terminate_attempts != nil {
self.TerminateAttempts = *jsnCfg.Terminate_attempts
}
return nil
}

View File

@@ -53,6 +53,13 @@ var (
testSes3ItProcessEvent,
testSes3ItThreshold1002After2,
testSes3ItStatMetricsAfter2,
testSes3ItAddVoiceBalance,
testSes3ItTerminatWithoutInit,
testSes3ItInitAfterTerminate,
testSes3ItBalance,
testSes3ItCDRs,
testSes3ItStopCgrEngine,
}
)
@@ -238,6 +245,155 @@ func testSes3ItStatMetricsAfter2(t *testing.T) {
}
}
func testSes3ItAddVoiceBalance(t *testing.T) {
attrSetBalance := utils.AttrSetBalance{
Tenant: "cgrates.org",
Account: "1002",
BalanceType: utils.VOICE,
BalanceID: utils.StringPointer("TestDynamicDebitBalance"),
Value: utils.Float64Pointer(5 * float64(time.Second)),
RatingSubject: utils.StringPointer("*zero5ms"),
}
var reply string
if err := ses3RPC.Call("ApierV2.SetBalance", attrSetBalance, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Received: %s", reply)
}
var acnt *engine.Account
attrs := &utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: "1002",
}
if err := ses3RPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if rply := acnt.BalanceMap[utils.VOICE].GetTotalValue(); rply != float64(5*time.Second) {
t.Errorf("Expecting: %v, received: %v",
float64(5*time.Second), rply)
}
}
func testSes3ItTerminatWithoutInit(t *testing.T) {
go func() { // used in a gorutine to not block the test
// because it needs to call initSession when the call for Teminate is still active
args := &sessions.V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSesItUpdateSession",
Event: map[string]interface{}{
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.ToR: utils.VOICE,
utils.OriginID: "TestTerminate",
utils.RequestType: utils.META_PREPAID,
utils.Account: "1002",
utils.Subject: "1001",
utils.Destination: "1001",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: 2 * time.Second,
},
},
}
var rply string
if err := ses3RPC.Call(utils.SessionSv1TerminateSession,
args, &rply); err != nil {
t.Error(err)
}
if rply != utils.OK {
t.Errorf("Unexpected reply: %s", rply)
}
}()
}
func testSes3ItInitAfterTerminate(t *testing.T) {
time.Sleep(3 * time.Millisecond)
args1 := &sessions.V1InitSessionArgs{
InitSession: true,
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSesItInitiateSession",
Event: map[string]interface{}{
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.ToR: utils.VOICE,
utils.OriginID: "TestTerminate",
utils.RequestType: utils.META_PREPAID,
utils.Account: "1002",
utils.Subject: "1001",
utils.Destination: "1001",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: 5 * time.Second,
},
},
}
var rply1 sessions.V1InitSessionReply
if err := ses3RPC.Call(utils.SessionSv1InitiateSession,
args1, &rply1); err != nil {
t.Error(err)
return
} else if *rply1.MaxUsage != 0 {
t.Errorf("Unexpected MaxUsage: %v", rply1.MaxUsage)
}
time.Sleep(5 * time.Millisecond)
aSessions := make([]*sessions.ExternalSession, 0)
if err := ses3RPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func testSes3ItBalance(t *testing.T) {
time.Sleep(10 * time.Millisecond)
var acnt *engine.Account
attrs := &utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: "1002",
}
if err := ses3RPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if rply := acnt.BalanceMap[utils.VOICE].GetTotalValue(); rply != float64(3*time.Second) {
t.Errorf("Expecting: %v, received: %v",
3*time.Second, rply)
}
}
func testSes3ItCDRs(t *testing.T) {
var reply string
if err := ses3RPC.Call(utils.SessionSv1ProcessCDR, &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSesItProccesCDR",
Event: map[string]interface{}{
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.ToR: utils.VOICE,
utils.OriginID: "TestTerminate",
utils.RequestType: utils.META_PREPAID,
utils.Account: "1002",
utils.Subject: "1001",
utils.Destination: "1001",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: 2 * time.Second,
}}, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Received reply: %s", reply)
}
time.Sleep(20 * time.Millisecond)
var cdrs []*engine.ExternalCDR
req := utils.RPCCDRsFilter{RunIDs: []string{"CustomerCharges"},
Accounts: []string{"1002"}}
if err := ses3RPC.Call(utils.ApierV2GetCDRs, req, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else if cdrs[0].Usage != "2s" {
t.Errorf("Unexpected CDR Usage received, cdr: %v %+v ", cdrs[0].Usage, cdrs[0])
}
}
func testSes3ItStopCgrEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)

View File

@@ -528,7 +528,7 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int,
return
}
for i := 0; i < 3; {
for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; {
var maxDebit time.Duration
if maxDebit, err = sS.debitSession(s, sRunIdx, dbtIvl, nil); err != nil {
utils.Logger.Warning(
@@ -2487,19 +2487,26 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
return utils.NewErrRALs(err)
}
}
ss := sS.getRelocateSessions(cgrID,
me.GetStringIgnoreErrors(utils.InitialOriginID),
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost))
var s *Session
if len(ss) == 0 {
fib := utils.Fib()
for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; i++ {
ss := sS.getRelocateSessions(cgrID,
me.GetStringIgnoreErrors(utils.InitialOriginID),
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost))
if len(ss) != 0 {
s = ss[0]
break
}
if i+1 < sS.cgrCfg.SessionSCfg().TerminateAttempts { // not last iteration
time.Sleep(time.Duration(fib()) * time.Millisecond)
continue
}
if s, err = sS.initSession(args.CGREvent.Tenant,
ev, sS.biJClntID(clnt),
me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil {
return utils.NewErrRALs(err)
}
} else {
s = ss[0]
}
if err = sS.endSession(s,
me.GetDurationPtrIgnoreErrors(utils.Usage),