Update call test with sync sessions

This commit is contained in:
TeoV
2018-06-19 09:49:51 -04:00
committed by Dan Christian Bogos
parent 6ee65c92ce
commit 46c17e2567
4 changed files with 54 additions and 26 deletions

View File

@@ -80,7 +80,7 @@
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"debit_interval": "10s",
"debit_interval": "5s",
"channel_sync_interval":"5s",
},
@@ -90,6 +90,9 @@
"sessions_conns": [
{"address": "*internal"}
],
"event_socket_conns":[
{"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": -1,"alias":""}
],
},

View File

@@ -75,7 +75,6 @@ var sTestsCalls = []func(t *testing.T){
testCallCheckThreshold1001After,
testCallCheckThreshold1002After,
testCallSyncSessions,
//de completat
testCallStopPjsuaListener,
testCallStopCgrEngine,
testCallStopFS,
@@ -570,22 +569,27 @@ func testCallCheckThreshold1002After(t *testing.T) {
}
func testCallSyncSessions(t *testing.T) {
var reply *[]*sessions.ActiveSession
// activeSessions shouldn't be active
if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions,
&map[string]string{}, &reply); err.Error() != utils.ErrNotFound.Error() {
t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error())
}
// 1001 call 1002 stop the call after 12 seconds
if err := engine.PjsuaCallUri(
&engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"},
"sip:1002@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(67)*time.Second, 5071); err != nil {
"sip:1002@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(120)*time.Second, 5071); err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)
// 1001 call 1003 stop the call after 11 seconds
if err := engine.PjsuaCallUri(
&engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"},
"sip:1003@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(60)*time.Second, 5073); err != nil {
"sip:1003@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(120)*time.Second, 5073); err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)
// get active sessions
var reply *[]*sessions.ActiveSession
if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions,
&map[string]string{}, &reply); err != nil {
t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error())
@@ -593,19 +597,21 @@ func testCallSyncSessions(t *testing.T) {
t.Errorf("expecting 2, received reply: %+v", utils.ToJSON(reply))
}
time.Sleep(10 * time.Second)
time.Sleep(3 * time.Second)
//stop the FS
waitTime := time.Duration(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval + 2*time.Second)
switch optConf {
case utils.Freeswitch:
engine.ForceKillProcName(utils.Freeswitch, int(waitTime.Seconds()*1e3))
engine.ForceKillProcName(utils.Freeswitch,
int(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval.Nanoseconds()/1e6))
case utils.Kamailio:
engine.ForceKillProcName(utils.Kamailio, int(waitTime.Seconds()*1e3))
engine.ForceKillProcName(utils.Kamailio,
int(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval.Nanoseconds()/1e6))
case utils.Opensips:
engine.ForceKillProcName(utils.Opensips, int(waitTime.Seconds()*1e3))
engine.ForceKillProcName(utils.Opensips,
int(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval.Nanoseconds()/1e6))
case utils.Asterisk:
engine.ForceKillProcName(utils.Asterisk, int(waitTime.Seconds()*1e3))
engine.ForceKillProcName(utils.Asterisk,
int(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval.Nanoseconds()/1e6))
default:
t.Errorf("Unsuported format")
}
@@ -615,14 +621,25 @@ func testCallSyncSessions(t *testing.T) {
&map[string]string{}, &reply); err.Error() != utils.ErrNotFound.Error() {
t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error())
}
// verify cdr
var rplCdrs []*engine.ExternalCDR
req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"1001"}}
req := utils.RPCCDRsFilter{Sources: []string{utils.MetaSessionS},
RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"1001"}}
if err := tutorialCallsRpc.Call("ApierV2.GetCdrs", req, &rplCdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(rplCdrs) != 4 { // cdr from sync session + cdr from before
} else if len(rplCdrs) != 2 { // cdr from sync session + cdr from before
t.Error("Unexpected number of CDRs returned: ", len(rplCdrs))
} else if time1, err := utils.ParseDurationWithSecs(rplCdrs[0].Usage); err != nil {
t.Error(err)
} else if time1 > time.Duration(15*time.Second) {
t.Error("Unexpected time duration : ", time1)
} else if time1, err := utils.ParseDurationWithSecs(rplCdrs[1].Usage); err != nil {
t.Error(err)
} else if time1 > time.Duration(15*time.Second) {
t.Error("Unexpected time duration : ", time1)
}
}
func testCallStopPjsuaListener(t *testing.T) {

View File

@@ -2021,20 +2021,24 @@ func (smg *SMGeneric) syncSessions() {
}
}
}
var toBeRemoved []string
smg.aSessionsMux.RLock()
for cgrid, smgSessions := range smg.activeSessions {
if _, has := queriedCGRIDs[cgrid]; has {
continue
}
for _, session := range smgSessions {
tmtr := &smgSessionTerminator{
ttlLastUsed: &session.LastUsage,
ttlUsage: &session.TotalUsage,
}
smg.ttlTerminate(session, tmtr)
for cgrid := range smg.activeSessions {
if _, has := queriedCGRIDs[cgrid]; !has {
toBeRemoved = append(toBeRemoved, cgrid)
}
}
smg.aSessionsMux.RUnlock()
for _, cgrID := range toBeRemoved {
aSessions := smg.getSessions(cgrID, false)
if len(aSessions[cgrID]) == 0 {
continue
}
terminator := &smgSessionTerminator{
ttl: time.Duration(0),
}
smg.ttlTerminate(aSessions[cgrID][0], terminator)
}
}
func (smg *SMGeneric) BiRPCv1RegisterInternalBiJSONConn(clnt rpcclient.RpcClientConnection,

View File

@@ -310,7 +310,11 @@ func (self SMGenericEvent) GetOriginatorIP(fieldName string) string {
}
func (self SMGenericEvent) GetCdrSource() string {
return utils.SMG + "_" + self.GetName()
if self.GetName() != "" {
return utils.MetaSessionS + "_" + self.GetName()
}
return utils.MetaSessionS
}
func (self SMGenericEvent) GetExtraFields() map[string]string {