Finish replication test for sessions

This commit is contained in:
TeoV
2019-02-08 10:37:14 -05:00
committed by Dan Christian Bogos
parent d57be00595
commit 20872bc18a
5 changed files with 113 additions and 38 deletions

View File

@@ -39,6 +39,12 @@
"sessions": {
"enabled": true, // starts SessionManager service: <true|false>
"listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests
"session_replication_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"},
],
"rals_conns": [
{"address": "127.0.0.1:22012", "transport": "*json"},
],
},
}

View File

@@ -193,6 +193,7 @@ type SRun struct {
// Clone returns the cloned version of SRun
func (sr *SRun) Clone() *SRun {
return &SRun{
Event: sr.Event.Clone(),
CD: sr.CD.Clone(),
EventCost: sr.EventCost.Clone(),
ExtraDuration: sr.ExtraDuration,

View File

@@ -662,7 +662,8 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC
}
ss := sS.getSessions(cgrID, psv)
if len(ss) == 0 {
ss = []*Session{&Session{CGRID: cgrID}} // session scheduled to be removed from remote
// session scheduled to be removed from remote (initiate also the EventStart to avoid the panic)
ss = []*Session{&Session{CGRID: cgrID, EventStart: engine.NewSafEvent(nil)}}
}
var wg sync.WaitGroup
for _, rplConn := range sS.sReplConns {
@@ -674,7 +675,7 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC
sCln := s.Clone()
var rply string
if err := conn.Call(utils.SessionSv1SetPassiveSession,
s.Clone(), &rply); err != nil {
sCln, &rply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> cannot replicate session with id <%s>, err: %s",
utils.SessionS, sCln.CGRID, err.Error()))
@@ -1195,6 +1196,7 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage
var maxUsageSet bool // so we know if we have set the 0 on purpose
prepaidReqs := []string{utils.META_PREPAID, utils.META_PSEUDOPREPAID}
for i, sr := range s.SRuns {
var rplyMaxUsage time.Duration
if !utils.IsSliceMember(prepaidReqs,
sr.Event.GetStringIgnoreErrors(utils.RequestType)) {
@@ -1216,6 +1218,9 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage
// endSession will end a session from outside
func (sS *SessionS) endSession(s *Session, tUsage *time.Duration) (err error) {
//check if we have replicate connection and close the session there
defer sS.replicateSessions(s.CGRID, true, sS.sReplConns)
s.Lock() // no need to release it untill end since the session should be anyway closed
sS.unregisterSession(s.CGRID, false)
for sRunIdx, sr := range s.SRuns {
@@ -1416,6 +1421,12 @@ func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.RpcClientConnection,
return utils.ErrServerError
}
} else {
//if we have an active session with the same CGRID
//we unregister it first then regiser the new one
if len(sS.getSessions(s.CGRID, false)) != 0 {
sS.unregisterSession(s.CGRID, false)
}
sS.registerSession(s, true)
}
*reply = utils.OK
@@ -2111,28 +2122,6 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
me.GetDurationPtrIgnoreErrors(utils.Usage)); err != nil {
return utils.NewErrRALs(err)
}
//check if we have replicate connection and close the session there
if len(sS.sReplConns) != 0 {
var wg sync.WaitGroup
for _, rplConn := range sS.sReplConns {
if rplConn.Synchronous {
wg.Add(1)
}
go func(conn rpcclient.RpcClientConnection, sync bool, s *Session) {
var rply string
if err := conn.Call(utils.SessionSv1TerminateSession,
args, &rply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> cannot termainte session with id <%s>, err: %s",
utils.SessionS, s.Clone().CGRID, err.Error()))
}
if sync {
wg.Done()
}
}(rplConn.Connection, rplConn.Synchronous, s)
}
wg.Wait() // wait for synchronous replication to finish
}
}
if args.ReleaseResources {
if sS.resS == nil {

View File

@@ -0,0 +1,5 @@
package sessions
//terminate pentru o sesiune pasiva
//pentru update fara sesiune si terminate fara sesiune

View File

@@ -156,7 +156,7 @@ func TestSessionSRplInitiate(t *testing.T) {
} else if len(aSessions) != 1 {
t.Errorf("Unexpected number of sessions received: %+v", utils.ToIJSON(aSessions))
} else if aSessions[0].Usage != time.Duration(90)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
t.Errorf("Expecting : %+v, received: %+v", time.Duration(90)*time.Second, aSessions[0].Usage)
}
//check if the session was created as passive session on slave
@@ -167,7 +167,79 @@ func TestSessionSRplInitiate(t *testing.T) {
} else if len(pSessions) != 1 {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if pSessions[0].Usage != time.Duration(90*time.Second) {
t.Errorf("Expecting : %+v, received: %+v", time.Duration(90)*time.Second, pSessions[0].Usage)
}
}
func TestSessionSRplUpdate(t *testing.T) {
//update the session on slave so the session should became active
usage := time.Duration(1 * time.Minute)
argsUpdate := &V1UpdateSessionArgs{
UpdateSession: true,
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSessionSRplUpdate",
Event: map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.Tenant: "cgrates.org",
utils.OriginID: "123451",
utils.ToR: utils.VOICE,
utils.RequestType: utils.META_PREPAID,
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1004",
utils.Category: "call",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: usage,
},
},
}
var updtRpl V1UpdateSessionReply
if err := smgRplcSlvRPC.Call(utils.SessionSv1UpdateSession,
argsUpdate, &updtRpl); err != nil {
t.Error(err)
}
if *updtRpl.MaxUsage != usage {
t.Errorf("Expecting : %+v, received: %+v", usage, *updtRpl.MaxUsage)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
var aSessions []*ActiveSession
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions,
map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
} else if aSessions[0].Usage != time.Duration(150)*time.Second {
t.Errorf("Expecting : %+v, received: %+v", time.Duration(150)*time.Second, aSessions[0].Usage)
}
var pSessions []*ActiveSession
// Make sure we don't have passive session on active host
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil,
&pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
// Master should not longer have activeSession
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions,
map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Error: %v with len(aSessions)=%v , session : %+v", err, len(aSessions), utils.ToJSON(aSessions))
}
cgrID := GetSetCGRID(engine.NewSafEvent(argsUpdate.Event))
// Make sure session was replicated
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetPassiveSessions,
nil, &pSessions); err != nil {
t.Error(err)
} else if len(pSessions) != 1 {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if pSessions[0].CGRID != cgrID {
t.Errorf("PassiveSession: %+v", pSessions[0])
} else if pSessions[0].Usage != time.Duration(150*time.Second) {
t.Errorf("Expecting : %+v, received: %+v", time.Duration(150)*time.Second, pSessions[0].Usage)
}
}
@@ -189,7 +261,7 @@ func TestSessionSRplTerminate(t *testing.T) {
utils.Category: "call",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: time.Duration(1*time.Minute + 30*time.Second),
utils.Usage: time.Duration(2*time.Minute + 30*time.Second),
},
},
}
@@ -203,12 +275,21 @@ func TestSessionSRplTerminate(t *testing.T) {
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions,
map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
t.Errorf("Error: %v with len(aSessions)=%v , session : %+v", err, len(aSessions), utils.ToIJSON(aSessions))
}
var pSessions []*ActiveSession
//check if the session was terminated on slave
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions,
nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions,
nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Error: %v with len(aSessions)=%v , session : %+v", err, len(aSessions), utils.ToIJSON(aSessions))
}
// check to don't have passive session on master and slave
var pSessions []*ActiveSession
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil,
&pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Error: %v with len(pSessions)=%v , session : %+v", err, len(pSessions), utils.ToIJSON(pSessions))
}
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil,
&pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Error: %v with len(pSessions)=%v , session : %+v", err, len(pSessions), utils.ToIJSON(pSessions))
}
}
@@ -335,14 +416,6 @@ func TestSessionSRplManualReplicate(t *testing.T) {
if err := smgRplcSlvRPC.Call("Responder.Status", "", &status); err != nil { // slave should be still operational
t.Error(err)
}
// check if the passive sessions from slave became active
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 2 {
t.Errorf("Unexpected number of sessions received: %+v", utils.ToJSON(aSessions))
} else if aSessions[0].Usage != time.Duration(90)*time.Second && aSessions[1].Usage != time.Duration(90)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
}
// start master
if _, err := engine.StartEngine(smgRplcMasterCfgPath, *waitRater); err != nil {
t.Fatal(err)
@@ -359,6 +432,7 @@ func TestSessionSRplManualReplicate(t *testing.T) {
}
// recover passive sessions from slave
argsRepl = ArgsReplicateSessions{
Passive: true,
Connections: []*config.HaPoolConfig{
{
Address: smgRplcMasterCfg.ListenCfg().RPCJSONListen,