From 6ee65c92ce45274a217df52b8a6f76999b1a458c Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 18 Jun 2018 11:25:14 -0400 Subject: [PATCH] Update calls test with sync session --- agents/fsagent.go | 2 +- agents/fsevent.go | 2 +- config/config_test.go | 10 +- config/smconfig.go | 2 +- .../cgrates/etc/cgrates/cgrates.json | 1 + engine/libtest.go | 8 ++ general_tests/tutorial_calls_test.go | 132 ++++++++++++++---- sessions/sessions.go | 22 ++- 8 files changed, 129 insertions(+), 50 deletions(-) diff --git a/agents/fsagent.go b/agents/fsagent.go index e47839241..d6b33e18a 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -21,7 +21,6 @@ package agents import ( "errors" "fmt" - //"net" "time" "github.com/cgrates/cgrates/config" @@ -248,6 +247,7 @@ func (sm *FSsessions) onChannelHangupComplete(fsev FSEvent, connId string) { return } var reply string + fsev[VarCGROriginHost] = sm.conns[connId].cfg.Alias if fsev[VarAnswerEpoch] != "0" { // call was answered if err := sm.smg.Call(utils.SessionSv1TerminateSession, fsev.V1TerminateSessionArgs(), &reply); err != nil { diff --git a/agents/fsevent.go b/agents/fsevent.go index c2265a33e..e49c8c430 100644 --- a/agents/fsevent.go +++ b/agents/fsevent.go @@ -306,7 +306,7 @@ func (fsev FSEvent) ParseEventValue(rsrFld *utils.RSRField, timezone string) (pa case utils.OriginID: return rsrFld.Parse(fsev.GetUUID()) case utils.OriginHost: - return rsrFld.Parse(fsev["FreeSWITCH-IPv4"]) + return rsrFld.Parse(utils.FirstNonEmpty(fsev[VarCGROriginHost], fsev[FS_IPv4])) case utils.Source: return rsrFld.Parse("FS_EVENT") case utils.RequestType: diff --git a/config/config_test.go b/config/config_test.go index c3d2eea7c..9b6435984 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -43,8 +43,8 @@ func TestCgrCfgLoadWithDefaults(t *testing.T) { "freeswitch_agent": { "enabled": true, // starts SessionManager service: "event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers - {"address": "1.2.3.4:8021", "password": "ClueCon", "reconnects": 3, "alias":""}, - {"address": "1.2.3.5:8021", "password": "ClueCon", "reconnects": 5, "alias":""} + {"address": "1.2.3.4:8021", "password": "ClueCon", "reconnects": 3, "alias":"123"}, + {"address": "1.2.3.5:8021", "password": "ClueCon", "reconnects": 5, "alias":"124"} ], }, @@ -55,8 +55,8 @@ func TestCgrCfgLoadWithDefaults(t *testing.T) { } eCgrCfg.fsAgentCfg.Enabled = true eCgrCfg.fsAgentCfg.EventSocketConns = []*FsConnConfig{ - &FsConnConfig{Address: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 3, Alias: ""}, - &FsConnConfig{Address: "1.2.3.5:8021", Password: "ClueCon", Reconnects: 5, Alias: ""}, + &FsConnConfig{Address: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 3, Alias: "123"}, + &FsConnConfig{Address: "1.2.3.5:8021", Password: "ClueCon", Reconnects: 5, Alias: "124"}, } if cgrCfg, err := NewCGRConfigFromJsonStringWithDefaults(JSN_CFG); err != nil { t.Error(err) @@ -714,7 +714,7 @@ func TestCgrCfgJSONDefaultsFsAgentConfig(t *testing.T) { MaxWaitConnection: 2 * time.Second, EventSocketConns: []*FsConnConfig{ &FsConnConfig{Address: "127.0.0.1:8021", - Password: "ClueCon", Reconnects: 5, Alias: ""}}, + Password: "ClueCon", Reconnects: 5, Alias: "127.0.0.1:8021"}}, } if !reflect.DeepEqual(cgrCfg.fsAgentCfg, eFsAgentCfg) { diff --git a/config/smconfig.go b/config/smconfig.go index 1c9488952..cc96b9300 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -87,7 +87,7 @@ func (self *FsConnConfig) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error { self.Reconnects = *jsnCfg.Reconnects } self.Alias = self.Address - if jsnCfg.Alias != nil { + if jsnCfg.Alias != nil && *jsnCfg.Alias != "" { self.Alias = *jsnCfg.Alias } diff --git a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json index d08afa23e..7d9f6f14d 100644 --- a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -81,6 +81,7 @@ {"address": "127.0.0.1:2012", "transport": "*json"} ], "debit_interval": "10s", + "channel_sync_interval":"5s", }, diff --git a/engine/libtest.go b/engine/libtest.go index 52a6ac602..1be8cb8a5 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -209,6 +209,14 @@ func KillProcName(procName string, waitMs int) error { return nil } +func ForceKillProcName(procName string, waitMs int) error { + if err := exec.Command("pkill", "-9", procName).Run(); err != nil { + return err + } + time.Sleep(time.Duration(waitMs) * time.Millisecond) + return nil +} + func CallScript(scriptPath string, subcommand string, waitMs int) error { if err := exec.Command(scriptPath, subcommand).Run(); err != nil { return err diff --git a/general_tests/tutorial_calls_test.go b/general_tests/tutorial_calls_test.go index 9307c311d..f53747fbd 100755 --- a/general_tests/tutorial_calls_test.go +++ b/general_tests/tutorial_calls_test.go @@ -74,6 +74,7 @@ var sTestsCalls = []func(t *testing.T){ testCallCheckResourceRelease, testCallCheckThreshold1001After, testCallCheckThreshold1002After, + testCallSyncSessions, //de completat testCallStopPjsuaListener, testCallStopCgrEngine, @@ -114,26 +115,29 @@ func TestAsteriskCalls(t *testing.T) { func testCallInitCfg(t *testing.T) { // Init config first var err error - if optConf == utils.Freeswitch { + switch optConf { + case utils.Freeswitch: tutorialCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*fsConfig, "cgrates", "etc", "cgrates")) if err != nil { t.Error(err) } - } else if optConf == utils.Kamailio { + case utils.Kamailio: tutorialCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*kamConfig, "cgrates", "etc", "cgrates")) if err != nil { t.Error(err) } - } else if optConf == utils.Opensips { + case utils.Opensips: tutorialCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*oSipsConfig, "cgrates", "etc", "cgrates")) if err != nil { t.Error(err) } - } else if optConf == utils.Asterisk { + case utils.Asterisk: tutorialCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*ariConf, "cgrates", "etc", "cgrates")) if err != nil { t.Error(err) } + default: + t.Error("Invalid option") } tutorialCallsCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() @@ -156,69 +160,82 @@ func testCallResetStorDb(t *testing.T) { // start FS server func testCallStartFS(t *testing.T) { - if optConf == utils.Freeswitch { + switch optConf { + case utils.Freeswitch: engine.KillProcName(utils.Freeswitch, 5000) if err := engine.CallScript(path.Join(*fsConfig, "freeswitch", "etc", "init.d", "freeswitch"), "start", 3000); err != nil { t.Fatal(err) } - } else if optConf == utils.Kamailio { + case utils.Kamailio: engine.KillProcName(utils.Kamailio, 5000) if err := engine.CallScript(path.Join(*kamConfig, "kamailio", "etc", "init.d", "kamailio"), "start", 3000); err != nil { t.Fatal(err) } - } else if optConf == utils.Opensips { + case utils.Opensips: engine.KillProcName(utils.Kamailio, 5000) if err := engine.CallScript(path.Join(*oSipsConfig, "opensips", "etc", "init.d", "opensips"), "start", 3000); err != nil { t.Fatal(err) } - } else if optConf == utils.Asterisk { + case utils.Asterisk: engine.KillProcName(utils.Asterisk, 5000) if err := engine.CallScript(path.Join(*ariConf, "asterisk", "etc", "init.d", "asterisk"), "start", 3000); err != nil { t.Fatal(err) } + default: + t.Error("Invalid option") } } // Start CGR Engine func testCallStartEngine(t *testing.T) { engine.KillProcName("cgr-engine", *waitRater) - if optConf == utils.Freeswitch { + switch optConf { + case utils.Freeswitch: if err := engine.CallScript(path.Join(*fsConfig, "cgrates", "etc", "init.d", "cgrates"), "start", 100); err != nil { t.Fatal(err) } - } else if optConf == utils.Kamailio { + case utils.Kamailio: if err := engine.CallScript(path.Join(*kamConfig, "cgrates", "etc", "init.d", "cgrates"), "start", 100); err != nil { t.Fatal(err) } - } else if optConf == utils.Opensips { + case utils.Opensips: if err := engine.CallScript(path.Join(*oSipsConfig, "cgrates", "etc", "init.d", "cgrates"), "start", 100); err != nil { t.Fatal(err) } - } else if optConf == utils.Asterisk { + case utils.Asterisk: if err := engine.CallScript(path.Join(*ariConf, "cgrates", "etc", "init.d", "cgrates"), "start", 100); err != nil { t.Fatal(err) } + default: + t.Error("invalid option") } } // Restart FS so we make sure reconnects are working func testCallRestartFS(t *testing.T) { - if optConf == utils.Freeswitch { - if err := engine.CallScript(path.Join(*fsConfig, "freeswitch", "etc", "init.d", "freeswitch"), "restart", 5000); err != nil { + switch optConf { + case utils.Freeswitch: + engine.KillProcName(utils.Freeswitch, 5000) + if err := engine.CallScript(path.Join(*fsConfig, "freeswitch", "etc", "init.d", "freeswitch"), "restart", 3000); err != nil { t.Fatal(err) } - } else if optConf == utils.Kamailio { - if err := engine.CallScript(path.Join(*kamConfig, "kamailio", "etc", "init.d", "kamailio"), "restart", 5000); err != nil { + case utils.Kamailio: + engine.KillProcName(utils.Kamailio, 5000) + if err := engine.CallScript(path.Join(*kamConfig, "kamailio", "etc", "init.d", "kamailio"), "restart", 3000); err != nil { t.Fatal(err) } - } else if optConf == utils.Opensips { - if err := engine.CallScript(path.Join(*oSipsConfig, "opensips", "etc", "init.d", "opensips"), "restart", 5000); err != nil { + case utils.Opensips: + engine.KillProcName(utils.Kamailio, 5000) + if err := engine.CallScript(path.Join(*oSipsConfig, "opensips", "etc", "init.d", "opensips"), "restart", 3000); err != nil { t.Fatal(err) } - } else if optConf == utils.Asterisk { - if err := engine.CallScript(path.Join(*ariConf, "asterisk", "etc", "init.d", "asterisk"), "restart", 5000); err != nil { + case utils.Asterisk: + engine.KillProcName(utils.Asterisk, 5000) + if err := engine.CallScript(path.Join(*ariConf, "asterisk", "etc", "init.d", "asterisk"), "restart", 3000); err != nil { t.Fatal(err) } + default: + t.Error("Invalid option") } } @@ -552,6 +569,62 @@ func testCallCheckThreshold1002After(t *testing.T) { } } +func testCallSyncSessions(t *testing.T) { + // 1001 call 1002 stop the call after 12 seconds + if err := engine.PjsuaCallUri( + &engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"}, + "sip:1002@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(67)*time.Second, 5071); err != nil { + t.Fatal(err) + } + time.Sleep(1 * time.Second) + // 1001 call 1003 stop the call after 11 seconds + if err := engine.PjsuaCallUri( + &engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"}, + "sip:1003@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(60)*time.Second, 5073); err != nil { + t.Fatal(err) + } + time.Sleep(1 * time.Second) + // get active sessions + var reply *[]*sessions.ActiveSession + if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions, + &map[string]string{}, &reply); err != nil { + t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error()) + } else if len(*reply) != 2 { + t.Errorf("expecting 2, received reply: %+v", utils.ToJSON(reply)) + } + + time.Sleep(10 * time.Second) + //stop the FS + waitTime := time.Duration(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval + 2*time.Second) + + switch optConf { + case utils.Freeswitch: + engine.ForceKillProcName(utils.Freeswitch, int(waitTime.Seconds()*1e3)) + case utils.Kamailio: + engine.ForceKillProcName(utils.Kamailio, int(waitTime.Seconds()*1e3)) + case utils.Opensips: + engine.ForceKillProcName(utils.Opensips, int(waitTime.Seconds()*1e3)) + case utils.Asterisk: + engine.ForceKillProcName(utils.Asterisk, int(waitTime.Seconds()*1e3)) + default: + t.Errorf("Unsuported format") + } + + // activeSessions shouldn't be active + if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions, + &map[string]string{}, &reply); err.Error() != utils.ErrNotFound.Error() { + t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error()) + } + // verify cdr + var rplCdrs []*engine.ExternalCDR + req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"1001"}} + if err := tutorialCallsRpc.Call("ApierV2.GetCdrs", req, &rplCdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(rplCdrs) != 4 { // cdr from sync session + cdr from before + t.Error("Unexpected number of CDRs returned: ", len(rplCdrs)) + } +} + func testCallStopPjsuaListener(t *testing.T) { tutorialCallsPjSuaListener.Write([]byte("q\n")) // Close pjsua time.Sleep(time.Duration(1) * time.Second) // Allow pjsua to finish it's tasks, eg un-REGISTER @@ -564,13 +637,16 @@ func testCallStopCgrEngine(t *testing.T) { } func testCallStopFS(t *testing.T) { - if optConf == utils.Freeswitch { - engine.KillProcName(utils.Freeswitch, 1000) - } else if optConf == utils.Kamailio { - engine.KillProcName(utils.Kamailio, 1000) - } else if optConf == utils.Opensips { - engine.KillProcName(utils.Opensips, 1000) - } else if optConf == utils.Asterisk { - engine.KillProcName(utils.Asterisk, 1000) + switch optConf { + case utils.Freeswitch: + engine.ForceKillProcName(utils.Freeswitch, 1000) + case utils.Kamailio: + engine.ForceKillProcName(utils.Kamailio, 1000) + case utils.Opensips: + engine.ForceKillProcName(utils.Opensips, 1000) + case utils.Asterisk: + engine.ForceKillProcName(utils.Asterisk, 1000) + default: + t.Errorf("Unsuported format") } } diff --git a/sessions/sessions.go b/sessions/sessions.go index 3290e5eb4..dbc0d3324 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -2007,7 +2007,6 @@ func (smg *SMGeneric) syncSessions() { rpcClnts = append(rpcClnts, conn) } queriedCGRIDs := make(utils.StringMap) - utils.Logger.Info("Enter on sync --------------") for _, conn := range rpcClnts { var queriedSessionIDs []*SessionID if conn != nil { @@ -2017,28 +2016,23 @@ func (smg *SMGeneric) syncSessions() { fmt.Sprintf("error quering session ids : %+v", err)) continue } - utils.Logger.Info(fmt.Sprintf("queriedSessionIDs : %+v", utils.ToJSON(queriedSessionIDs))) for _, sessionID := range queriedSessionIDs { queriedCGRIDs[sessionID.CGRID()] = true } } } - utils.Logger.Info(fmt.Sprintf("queriedCGRIDs : %+v", queriedCGRIDs)) smg.aSessionsMux.RLock() - utils.Logger.Info(fmt.Sprintf("smg.activeSessions : %+v", smg.activeSessions)) - for cgrid, _ := range smg.activeSessions { + for cgrid, smgSessions := range smg.activeSessions { if _, has := queriedCGRIDs[cgrid]; has { - utils.Logger.Info("gaseste CGRID") continue } - utils.Logger.Info("nu gaseste CGRID") - // for _, session := range smgSessions { - // tmtr := &smgSessionTerminator{ - // ttlLastUsed: &session.LastUsage, - // ttlUsage: &session.TotalUsage, - // } - // smg.ttlTerminate(session, tmtr) - // } + for _, session := range smgSessions { + tmtr := &smgSessionTerminator{ + ttlLastUsed: &session.LastUsage, + ttlUsage: &session.TotalUsage, + } + smg.ttlTerminate(session, tmtr) + } } smg.aSessionsMux.RUnlock() }