From 46c17e25672abe9c8a1600834d0ea6aecb7ce17f Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 19 Jun 2018 09:49:51 -0400 Subject: [PATCH] Update call test with sync sessions --- .../cgrates/etc/cgrates/cgrates.json | 5 ++- general_tests/tutorial_calls_test.go | 45 +++++++++++++------ sessions/sessions.go | 24 +++++----- sessions/sevent.go | 6 ++- 4 files changed, 54 insertions(+), 26 deletions(-) diff --git a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json index 7d9f6f14d..446b16eb7 100644 --- a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -80,7 +80,7 @@ "thresholds_conns": [ {"address": "127.0.0.1:2012", "transport": "*json"} ], - "debit_interval": "10s", + "debit_interval": "5s", "channel_sync_interval":"5s", }, @@ -90,6 +90,9 @@ "sessions_conns": [ {"address": "*internal"} ], + "event_socket_conns":[ + {"address": "127.0.0.1:8021", "password": "ClueCon", "reconnects": -1,"alias":""} + ], }, diff --git a/general_tests/tutorial_calls_test.go b/general_tests/tutorial_calls_test.go index f53747fbd..55caddb9f 100755 --- a/general_tests/tutorial_calls_test.go +++ b/general_tests/tutorial_calls_test.go @@ -75,7 +75,6 @@ var sTestsCalls = []func(t *testing.T){ testCallCheckThreshold1001After, testCallCheckThreshold1002After, testCallSyncSessions, - //de completat testCallStopPjsuaListener, testCallStopCgrEngine, testCallStopFS, @@ -570,22 +569,27 @@ func testCallCheckThreshold1002After(t *testing.T) { } func testCallSyncSessions(t *testing.T) { + var reply *[]*sessions.ActiveSession + // activeSessions shouldn't be active + if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions, + &map[string]string{}, &reply); err.Error() != utils.ErrNotFound.Error() { + t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error()) + } // 1001 call 1002 stop the call after 12 seconds if err := engine.PjsuaCallUri( &engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"}, - "sip:1002@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(67)*time.Second, 5071); err != nil { + "sip:1002@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(120)*time.Second, 5071); err != nil { t.Fatal(err) } - time.Sleep(1 * time.Second) // 1001 call 1003 stop the call after 11 seconds if err := engine.PjsuaCallUri( &engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"}, - "sip:1003@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(60)*time.Second, 5073); err != nil { + "sip:1003@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(120)*time.Second, 5073); err != nil { t.Fatal(err) } time.Sleep(1 * time.Second) // get active sessions - var reply *[]*sessions.ActiveSession + if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions, &map[string]string{}, &reply); err != nil { t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error()) @@ -593,19 +597,21 @@ func testCallSyncSessions(t *testing.T) { t.Errorf("expecting 2, received reply: %+v", utils.ToJSON(reply)) } - time.Sleep(10 * time.Second) + time.Sleep(3 * time.Second) //stop the FS - waitTime := time.Duration(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval + 2*time.Second) - switch optConf { case utils.Freeswitch: - engine.ForceKillProcName(utils.Freeswitch, int(waitTime.Seconds()*1e3)) + engine.ForceKillProcName(utils.Freeswitch, + int(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval.Nanoseconds()/1e6)) case utils.Kamailio: - engine.ForceKillProcName(utils.Kamailio, int(waitTime.Seconds()*1e3)) + engine.ForceKillProcName(utils.Kamailio, + int(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval.Nanoseconds()/1e6)) case utils.Opensips: - engine.ForceKillProcName(utils.Opensips, int(waitTime.Seconds()*1e3)) + engine.ForceKillProcName(utils.Opensips, + int(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval.Nanoseconds()/1e6)) case utils.Asterisk: - engine.ForceKillProcName(utils.Asterisk, int(waitTime.Seconds()*1e3)) + engine.ForceKillProcName(utils.Asterisk, + int(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval.Nanoseconds()/1e6)) default: t.Errorf("Unsuported format") } @@ -615,14 +621,25 @@ func testCallSyncSessions(t *testing.T) { &map[string]string{}, &reply); err.Error() != utils.ErrNotFound.Error() { t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error()) } + // verify cdr var rplCdrs []*engine.ExternalCDR - req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"1001"}} + req := utils.RPCCDRsFilter{Sources: []string{utils.MetaSessionS}, + RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"1001"}} if err := tutorialCallsRpc.Call("ApierV2.GetCdrs", req, &rplCdrs); err != nil { t.Error("Unexpected error: ", err.Error()) - } else if len(rplCdrs) != 4 { // cdr from sync session + cdr from before + } else if len(rplCdrs) != 2 { // cdr from sync session + cdr from before t.Error("Unexpected number of CDRs returned: ", len(rplCdrs)) + } else if time1, err := utils.ParseDurationWithSecs(rplCdrs[0].Usage); err != nil { + t.Error(err) + } else if time1 > time.Duration(15*time.Second) { + t.Error("Unexpected time duration : ", time1) + } else if time1, err := utils.ParseDurationWithSecs(rplCdrs[1].Usage); err != nil { + t.Error(err) + } else if time1 > time.Duration(15*time.Second) { + t.Error("Unexpected time duration : ", time1) } + } func testCallStopPjsuaListener(t *testing.T) { diff --git a/sessions/sessions.go b/sessions/sessions.go index dbc0d3324..f0d7aa7d6 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -2021,20 +2021,24 @@ func (smg *SMGeneric) syncSessions() { } } } + var toBeRemoved []string smg.aSessionsMux.RLock() - for cgrid, smgSessions := range smg.activeSessions { - if _, has := queriedCGRIDs[cgrid]; has { - continue - } - for _, session := range smgSessions { - tmtr := &smgSessionTerminator{ - ttlLastUsed: &session.LastUsage, - ttlUsage: &session.TotalUsage, - } - smg.ttlTerminate(session, tmtr) + for cgrid := range smg.activeSessions { + if _, has := queriedCGRIDs[cgrid]; !has { + toBeRemoved = append(toBeRemoved, cgrid) } } smg.aSessionsMux.RUnlock() + for _, cgrID := range toBeRemoved { + aSessions := smg.getSessions(cgrID, false) + if len(aSessions[cgrID]) == 0 { + continue + } + terminator := &smgSessionTerminator{ + ttl: time.Duration(0), + } + smg.ttlTerminate(aSessions[cgrID][0], terminator) + } } func (smg *SMGeneric) BiRPCv1RegisterInternalBiJSONConn(clnt rpcclient.RpcClientConnection, diff --git a/sessions/sevent.go b/sessions/sevent.go index 8951806b1..bd045b8fa 100644 --- a/sessions/sevent.go +++ b/sessions/sevent.go @@ -310,7 +310,11 @@ func (self SMGenericEvent) GetOriginatorIP(fieldName string) string { } func (self SMGenericEvent) GetCdrSource() string { - return utils.SMG + "_" + self.GetName() + if self.GetName() != "" { + return utils.MetaSessionS + "_" + self.GetName() + } + return utils.MetaSessionS + } func (self SMGenericEvent) GetExtraFields() map[string]string {