Update calls test with sync session

This commit is contained in:
TeoV
2018-06-18 11:25:14 -04:00
committed by Dan Christian Bogos
parent f93c0238ac
commit 6ee65c92ce
8 changed files with 129 additions and 50 deletions

View File

@@ -21,7 +21,6 @@ package agents
import (
"errors"
"fmt"
//"net"
"time"
"github.com/cgrates/cgrates/config"
@@ -248,6 +247,7 @@ func (sm *FSsessions) onChannelHangupComplete(fsev FSEvent, connId string) {
return
}
var reply string
fsev[VarCGROriginHost] = sm.conns[connId].cfg.Alias
if fsev[VarAnswerEpoch] != "0" { // call was answered
if err := sm.smg.Call(utils.SessionSv1TerminateSession,
fsev.V1TerminateSessionArgs(), &reply); err != nil {

View File

@@ -306,7 +306,7 @@ func (fsev FSEvent) ParseEventValue(rsrFld *utils.RSRField, timezone string) (pa
case utils.OriginID:
return rsrFld.Parse(fsev.GetUUID())
case utils.OriginHost:
return rsrFld.Parse(fsev["FreeSWITCH-IPv4"])
return rsrFld.Parse(utils.FirstNonEmpty(fsev[VarCGROriginHost], fsev[FS_IPv4]))
case utils.Source:
return rsrFld.Parse("FS_EVENT")
case utils.RequestType:

View File

@@ -43,8 +43,8 @@ func TestCgrCfgLoadWithDefaults(t *testing.T) {
"freeswitch_agent": {
"enabled": true, // starts SessionManager service: <true|false>
"event_socket_conns":[ // instantiate connections to multiple FreeSWITCH servers
{"address": "1.2.3.4:8021", "password": "ClueCon", "reconnects": 3, "alias":""},
{"address": "1.2.3.5:8021", "password": "ClueCon", "reconnects": 5, "alias":""}
{"address": "1.2.3.4:8021", "password": "ClueCon", "reconnects": 3, "alias":"123"},
{"address": "1.2.3.5:8021", "password": "ClueCon", "reconnects": 5, "alias":"124"}
],
},
@@ -55,8 +55,8 @@ func TestCgrCfgLoadWithDefaults(t *testing.T) {
}
eCgrCfg.fsAgentCfg.Enabled = true
eCgrCfg.fsAgentCfg.EventSocketConns = []*FsConnConfig{
&FsConnConfig{Address: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 3, Alias: ""},
&FsConnConfig{Address: "1.2.3.5:8021", Password: "ClueCon", Reconnects: 5, Alias: ""},
&FsConnConfig{Address: "1.2.3.4:8021", Password: "ClueCon", Reconnects: 3, Alias: "123"},
&FsConnConfig{Address: "1.2.3.5:8021", Password: "ClueCon", Reconnects: 5, Alias: "124"},
}
if cgrCfg, err := NewCGRConfigFromJsonStringWithDefaults(JSN_CFG); err != nil {
t.Error(err)
@@ -714,7 +714,7 @@ func TestCgrCfgJSONDefaultsFsAgentConfig(t *testing.T) {
MaxWaitConnection: 2 * time.Second,
EventSocketConns: []*FsConnConfig{
&FsConnConfig{Address: "127.0.0.1:8021",
Password: "ClueCon", Reconnects: 5, Alias: ""}},
Password: "ClueCon", Reconnects: 5, Alias: "127.0.0.1:8021"}},
}
if !reflect.DeepEqual(cgrCfg.fsAgentCfg, eFsAgentCfg) {

View File

@@ -87,7 +87,7 @@ func (self *FsConnConfig) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error {
self.Reconnects = *jsnCfg.Reconnects
}
self.Alias = self.Address
if jsnCfg.Alias != nil {
if jsnCfg.Alias != nil && *jsnCfg.Alias != "" {
self.Alias = *jsnCfg.Alias
}

View File

@@ -81,6 +81,7 @@
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"debit_interval": "10s",
"channel_sync_interval":"5s",
},

View File

@@ -209,6 +209,14 @@ func KillProcName(procName string, waitMs int) error {
return nil
}
func ForceKillProcName(procName string, waitMs int) error {
if err := exec.Command("pkill", "-9", procName).Run(); err != nil {
return err
}
time.Sleep(time.Duration(waitMs) * time.Millisecond)
return nil
}
func CallScript(scriptPath string, subcommand string, waitMs int) error {
if err := exec.Command(scriptPath, subcommand).Run(); err != nil {
return err

View File

@@ -74,6 +74,7 @@ var sTestsCalls = []func(t *testing.T){
testCallCheckResourceRelease,
testCallCheckThreshold1001After,
testCallCheckThreshold1002After,
testCallSyncSessions,
//de completat
testCallStopPjsuaListener,
testCallStopCgrEngine,
@@ -114,26 +115,29 @@ func TestAsteriskCalls(t *testing.T) {
func testCallInitCfg(t *testing.T) {
// Init config first
var err error
if optConf == utils.Freeswitch {
switch optConf {
case utils.Freeswitch:
tutorialCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*fsConfig, "cgrates", "etc", "cgrates"))
if err != nil {
t.Error(err)
}
} else if optConf == utils.Kamailio {
case utils.Kamailio:
tutorialCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*kamConfig, "cgrates", "etc", "cgrates"))
if err != nil {
t.Error(err)
}
} else if optConf == utils.Opensips {
case utils.Opensips:
tutorialCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*oSipsConfig, "cgrates", "etc", "cgrates"))
if err != nil {
t.Error(err)
}
} else if optConf == utils.Asterisk {
case utils.Asterisk:
tutorialCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*ariConf, "cgrates", "etc", "cgrates"))
if err != nil {
t.Error(err)
}
default:
t.Error("Invalid option")
}
tutorialCallsCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
@@ -156,69 +160,82 @@ func testCallResetStorDb(t *testing.T) {
// start FS server
func testCallStartFS(t *testing.T) {
if optConf == utils.Freeswitch {
switch optConf {
case utils.Freeswitch:
engine.KillProcName(utils.Freeswitch, 5000)
if err := engine.CallScript(path.Join(*fsConfig, "freeswitch", "etc", "init.d", "freeswitch"), "start", 3000); err != nil {
t.Fatal(err)
}
} else if optConf == utils.Kamailio {
case utils.Kamailio:
engine.KillProcName(utils.Kamailio, 5000)
if err := engine.CallScript(path.Join(*kamConfig, "kamailio", "etc", "init.d", "kamailio"), "start", 3000); err != nil {
t.Fatal(err)
}
} else if optConf == utils.Opensips {
case utils.Opensips:
engine.KillProcName(utils.Kamailio, 5000)
if err := engine.CallScript(path.Join(*oSipsConfig, "opensips", "etc", "init.d", "opensips"), "start", 3000); err != nil {
t.Fatal(err)
}
} else if optConf == utils.Asterisk {
case utils.Asterisk:
engine.KillProcName(utils.Asterisk, 5000)
if err := engine.CallScript(path.Join(*ariConf, "asterisk", "etc", "init.d", "asterisk"), "start", 3000); err != nil {
t.Fatal(err)
}
default:
t.Error("Invalid option")
}
}
// Start CGR Engine
func testCallStartEngine(t *testing.T) {
engine.KillProcName("cgr-engine", *waitRater)
if optConf == utils.Freeswitch {
switch optConf {
case utils.Freeswitch:
if err := engine.CallScript(path.Join(*fsConfig, "cgrates", "etc", "init.d", "cgrates"), "start", 100); err != nil {
t.Fatal(err)
}
} else if optConf == utils.Kamailio {
case utils.Kamailio:
if err := engine.CallScript(path.Join(*kamConfig, "cgrates", "etc", "init.d", "cgrates"), "start", 100); err != nil {
t.Fatal(err)
}
} else if optConf == utils.Opensips {
case utils.Opensips:
if err := engine.CallScript(path.Join(*oSipsConfig, "cgrates", "etc", "init.d", "cgrates"), "start", 100); err != nil {
t.Fatal(err)
}
} else if optConf == utils.Asterisk {
case utils.Asterisk:
if err := engine.CallScript(path.Join(*ariConf, "cgrates", "etc", "init.d", "cgrates"), "start", 100); err != nil {
t.Fatal(err)
}
default:
t.Error("invalid option")
}
}
// Restart FS so we make sure reconnects are working
func testCallRestartFS(t *testing.T) {
if optConf == utils.Freeswitch {
if err := engine.CallScript(path.Join(*fsConfig, "freeswitch", "etc", "init.d", "freeswitch"), "restart", 5000); err != nil {
switch optConf {
case utils.Freeswitch:
engine.KillProcName(utils.Freeswitch, 5000)
if err := engine.CallScript(path.Join(*fsConfig, "freeswitch", "etc", "init.d", "freeswitch"), "restart", 3000); err != nil {
t.Fatal(err)
}
} else if optConf == utils.Kamailio {
if err := engine.CallScript(path.Join(*kamConfig, "kamailio", "etc", "init.d", "kamailio"), "restart", 5000); err != nil {
case utils.Kamailio:
engine.KillProcName(utils.Kamailio, 5000)
if err := engine.CallScript(path.Join(*kamConfig, "kamailio", "etc", "init.d", "kamailio"), "restart", 3000); err != nil {
t.Fatal(err)
}
} else if optConf == utils.Opensips {
if err := engine.CallScript(path.Join(*oSipsConfig, "opensips", "etc", "init.d", "opensips"), "restart", 5000); err != nil {
case utils.Opensips:
engine.KillProcName(utils.Kamailio, 5000)
if err := engine.CallScript(path.Join(*oSipsConfig, "opensips", "etc", "init.d", "opensips"), "restart", 3000); err != nil {
t.Fatal(err)
}
} else if optConf == utils.Asterisk {
if err := engine.CallScript(path.Join(*ariConf, "asterisk", "etc", "init.d", "asterisk"), "restart", 5000); err != nil {
case utils.Asterisk:
engine.KillProcName(utils.Asterisk, 5000)
if err := engine.CallScript(path.Join(*ariConf, "asterisk", "etc", "init.d", "asterisk"), "restart", 3000); err != nil {
t.Fatal(err)
}
default:
t.Error("Invalid option")
}
}
@@ -552,6 +569,62 @@ func testCallCheckThreshold1002After(t *testing.T) {
}
}
func testCallSyncSessions(t *testing.T) {
// 1001 call 1002 stop the call after 12 seconds
if err := engine.PjsuaCallUri(
&engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"},
"sip:1002@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(67)*time.Second, 5071); err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)
// 1001 call 1003 stop the call after 11 seconds
if err := engine.PjsuaCallUri(
&engine.PjsuaAccount{Id: "sip:1001@127.0.0.1", Username: "1001", Password: "CGRateS.org", Realm: "*"},
"sip:1003@127.0.0.1", "sip:127.0.0.1:5080", time.Duration(60)*time.Second, 5073); err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)
// get active sessions
var reply *[]*sessions.ActiveSession
if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions,
&map[string]string{}, &reply); err != nil {
t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error())
} else if len(*reply) != 2 {
t.Errorf("expecting 2, received reply: %+v", utils.ToJSON(reply))
}
time.Sleep(10 * time.Second)
//stop the FS
waitTime := time.Duration(tutorialCallsCfg.SessionSCfg().ChannelSyncInterval + 2*time.Second)
switch optConf {
case utils.Freeswitch:
engine.ForceKillProcName(utils.Freeswitch, int(waitTime.Seconds()*1e3))
case utils.Kamailio:
engine.ForceKillProcName(utils.Kamailio, int(waitTime.Seconds()*1e3))
case utils.Opensips:
engine.ForceKillProcName(utils.Opensips, int(waitTime.Seconds()*1e3))
case utils.Asterisk:
engine.ForceKillProcName(utils.Asterisk, int(waitTime.Seconds()*1e3))
default:
t.Errorf("Unsuported format")
}
// activeSessions shouldn't be active
if err := tutorialCallsRpc.Call(utils.SessionSv1GetActiveSessions,
&map[string]string{}, &reply); err.Error() != utils.ErrNotFound.Error() {
t.Error("Got error on SessionSv1.GetActiveSessions: ", err.Error())
}
// verify cdr
var rplCdrs []*engine.ExternalCDR
req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, Accounts: []string{"1001"}}
if err := tutorialCallsRpc.Call("ApierV2.GetCdrs", req, &rplCdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(rplCdrs) != 4 { // cdr from sync session + cdr from before
t.Error("Unexpected number of CDRs returned: ", len(rplCdrs))
}
}
func testCallStopPjsuaListener(t *testing.T) {
tutorialCallsPjSuaListener.Write([]byte("q\n")) // Close pjsua
time.Sleep(time.Duration(1) * time.Second) // Allow pjsua to finish it's tasks, eg un-REGISTER
@@ -564,13 +637,16 @@ func testCallStopCgrEngine(t *testing.T) {
}
func testCallStopFS(t *testing.T) {
if optConf == utils.Freeswitch {
engine.KillProcName(utils.Freeswitch, 1000)
} else if optConf == utils.Kamailio {
engine.KillProcName(utils.Kamailio, 1000)
} else if optConf == utils.Opensips {
engine.KillProcName(utils.Opensips, 1000)
} else if optConf == utils.Asterisk {
engine.KillProcName(utils.Asterisk, 1000)
switch optConf {
case utils.Freeswitch:
engine.ForceKillProcName(utils.Freeswitch, 1000)
case utils.Kamailio:
engine.ForceKillProcName(utils.Kamailio, 1000)
case utils.Opensips:
engine.ForceKillProcName(utils.Opensips, 1000)
case utils.Asterisk:
engine.ForceKillProcName(utils.Asterisk, 1000)
default:
t.Errorf("Unsuported format")
}
}

View File

@@ -2007,7 +2007,6 @@ func (smg *SMGeneric) syncSessions() {
rpcClnts = append(rpcClnts, conn)
}
queriedCGRIDs := make(utils.StringMap)
utils.Logger.Info("Enter on sync --------------")
for _, conn := range rpcClnts {
var queriedSessionIDs []*SessionID
if conn != nil {
@@ -2017,28 +2016,23 @@ func (smg *SMGeneric) syncSessions() {
fmt.Sprintf("error quering session ids : %+v", err))
continue
}
utils.Logger.Info(fmt.Sprintf("queriedSessionIDs : %+v", utils.ToJSON(queriedSessionIDs)))
for _, sessionID := range queriedSessionIDs {
queriedCGRIDs[sessionID.CGRID()] = true
}
}
}
utils.Logger.Info(fmt.Sprintf("queriedCGRIDs : %+v", queriedCGRIDs))
smg.aSessionsMux.RLock()
utils.Logger.Info(fmt.Sprintf("smg.activeSessions : %+v", smg.activeSessions))
for cgrid, _ := range smg.activeSessions {
for cgrid, smgSessions := range smg.activeSessions {
if _, has := queriedCGRIDs[cgrid]; has {
utils.Logger.Info("gaseste CGRID")
continue
}
utils.Logger.Info("nu gaseste CGRID")
// for _, session := range smgSessions {
// tmtr := &smgSessionTerminator{
// ttlLastUsed: &session.LastUsage,
// ttlUsage: &session.TotalUsage,
// }
// smg.ttlTerminate(session, tmtr)
// }
for _, session := range smgSessions {
tmtr := &smgSessionTerminator{
ttlLastUsed: &session.LastUsage,
ttlUsage: &session.TotalUsage,
}
smg.ttlTerminate(session, tmtr)
}
}
smg.aSessionsMux.RUnlock()
}