diff --git a/data/conf/samples/smgreplcslave/cgrates.json b/data/conf/samples/smgreplcslave/cgrates.json
index cdb3011cf..1f286dddb 100644
--- a/data/conf/samples/smgreplcslave/cgrates.json
+++ b/data/conf/samples/smgreplcslave/cgrates.json
@@ -39,6 +39,12 @@
 "sessions": {
     "enabled": true,                        // starts SessionManager service:
     "listen_bijson": "127.0.0.1:22014",     // address where to listen for bidirectional JSON-RPC requests
+    "session_replication_conns": [
+        {"address": "127.0.0.1:2012", "transport": "*json"},
+    ],
+    "rals_conns": [
+        {"address": "127.0.0.1:22012", "transport": "*json"},
+    ],
 },
}
diff --git a/sessions/session.go b/sessions/session.go
index bcd5997b4..77d9ed8a1 100644
--- a/sessions/session.go
+++ b/sessions/session.go
@@ -193,6 +193,7 @@ type SRun struct {
 // Clone returns the cloned version of SRun
 func (sr *SRun) Clone() *SRun {
     return &SRun{
+        Event:         sr.Event.Clone(),
         CD:            sr.CD.Clone(),
         EventCost:     sr.EventCost.Clone(),
         ExtraDuration: sr.ExtraDuration,
diff --git a/sessions/sessions.go b/sessions/sessions.go
index 353fd3130..7e7ba3256 100644
--- a/sessions/sessions.go
+++ b/sessions/sessions.go
@@ -662,7 +662,8 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC
     }
     ss := sS.getSessions(cgrID, psv)
     if len(ss) == 0 {
-        ss = []*Session{&Session{CGRID: cgrID}} // session scheduled to be removed from remote
+        // session scheduled to be removed from remote (initialize EventStart as well to avoid a panic)
+        ss = []*Session{&Session{CGRID: cgrID, EventStart: engine.NewSafEvent(nil)}}
     }
     var wg sync.WaitGroup
     for _, rplConn := range sS.sReplConns {
@@ -674,7 +675,7 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC
             sCln := s.Clone()
             var rply string
             if err := conn.Call(utils.SessionSv1SetPassiveSession,
-                s.Clone(), &rply); err != nil {
+                sCln, &rply); err != nil {
                 utils.Logger.Warning(
                     fmt.Sprintf("<%s> cannot replicate session with id <%s>, err: %s",
                         utils.SessionS, sCln.CGRID, err.Error()))
@@ -1195,6 +1196,7 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage
     var maxUsageSet bool // so we know if we have set the 0 on purpose
     prepaidReqs := []string{utils.META_PREPAID, utils.META_PSEUDOPREPAID}
     for i, sr := range s.SRuns {
+        var rplyMaxUsage time.Duration
         if !utils.IsSliceMember(prepaidReqs,
             sr.Event.GetStringIgnoreErrors(utils.RequestType)) {
@@ -1216,6 +1218,9 @@
 // endSession will end a session from outside
 func (sS *SessionS) endSession(s *Session, tUsage *time.Duration) (err error) {
+    // if we have replication connections, close the session there as well
+    defer sS.replicateSessions(s.CGRID, true, sS.sReplConns)
+
     s.Lock() // no need to release it untill end since the session should be anyway closed
     sS.unregisterSession(s.CGRID, false)
     for sRunIdx, sr := range s.SRuns {
@@ -1416,6 +1421,12 @@ func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.RpcClientConnection,
             return utils.ErrServerError
         }
     } else {
+        // if we have an active session with the same CGRID,
+        // we unregister it first and then register the new one
+        if len(sS.getSessions(s.CGRID, false)) != 0 {
+            sS.unregisterSession(s.CGRID, false)
+
+        }
         sS.registerSession(s, true)
     }
     *reply = utils.OK
@@ -2111,28 +2122,6 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
             me.GetDurationPtrIgnoreErrors(utils.Usage)); err != nil {
             return utils.NewErrRALs(err)
         }
-        //check if we have replicate connection and close the session there
-        if len(sS.sReplConns) != 0 {
-            var wg sync.WaitGroup
-            for _, rplConn := range sS.sReplConns {
-                if rplConn.Synchronous {
-                    wg.Add(1)
-                }
-                go func(conn rpcclient.RpcClientConnection, sync bool, s *Session) {
-                    var rply string
-                    if err := conn.Call(utils.SessionSv1TerminateSession,
-                        args, &rply); err != nil {
-                        utils.Logger.Warning(
-                            fmt.Sprintf("<%s> cannot termainte session with id <%s>, err: %s",
-                                utils.SessionS, s.Clone().CGRID, err.Error()))
-                    }
-                    if sync {
-                        wg.Done()
-                    }
-                }(rplConn.Connection, rplConn.Synchronous, s)
-            }
-            wg.Wait() // wait for synchronous replication to finish
-        }
     }
     if args.ReleaseResources {
         if sS.resS == nil {
diff --git a/sessions/sessions_it_test.go b/sessions/sessions_it_test.go
new file mode 100644
index 000000000..33008b3f2
--- /dev/null
+++ b/sessions/sessions_it_test.go
@@ -0,0 +1,5 @@
+package sessions
+
+// terminate for a passive session
+
+// for update without a session and terminate without a session
diff --git a/sessions/sessions_rpl_it_test.go b/sessions/sessions_rpl_it_test.go
index 45618da1a..4013a4245 100644
--- a/sessions/sessions_rpl_it_test.go
+++ b/sessions/sessions_rpl_it_test.go
@@ -156,7 +156,7 @@ func TestSessionSRplInitiate(t *testing.T) {
     } else if len(aSessions) != 1 {
         t.Errorf("Unexpected number of sessions received: %+v", utils.ToIJSON(aSessions))
     } else if aSessions[0].Usage != time.Duration(90)*time.Second {
-        t.Errorf("Received usage: %v", aSessions[0].Usage)
+        t.Errorf("Expecting : %+v, received: %+v", time.Duration(90)*time.Second, aSessions[0].Usage)
     }
     //check if the session was created as passive session on slave
@@ -167,7 +167,79 @@
     } else if len(pSessions) != 1 {
         t.Errorf("PassiveSessions: %+v", pSessions)
     } else if pSessions[0].Usage != time.Duration(90*time.Second) {
+        t.Errorf("Expecting : %+v, received: %+v", time.Duration(90)*time.Second, pSessions[0].Usage)
+    }
+}
+
+func TestSessionSRplUpdate(t *testing.T) {
+    // update the session on the slave so the session should become active
+    usage := time.Duration(1 * time.Minute)
+    argsUpdate := &V1UpdateSessionArgs{
+        UpdateSession: true,
+        CGREvent: utils.CGREvent{
+            Tenant: "cgrates.org",
+            ID:     "TestSessionSRplUpdate",
+            Event: map[string]interface{}{
+                utils.EVENT_NAME:  "TEST_EVENT",
+                utils.Tenant:      "cgrates.org",
+                utils.OriginID:    "123451",
+                utils.ToR:         utils.VOICE,
+                utils.RequestType: utils.META_PREPAID,
+                utils.Account:     "1001",
+                utils.Subject:     "1001",
+                utils.Destination: "1004",
+                utils.Category:    "call",
+                utils.SetupTime:   time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
+                utils.AnswerTime:  time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
+                utils.Usage:       usage,
+            },
+        },
+    }
+    var updtRpl V1UpdateSessionReply
+    if err := smgRplcSlvRPC.Call(utils.SessionSv1UpdateSession,
+        argsUpdate, &updtRpl); err != nil {
+        t.Error(err)
+    }
+    if *updtRpl.MaxUsage != usage {
+        t.Errorf("Expecting : %+v, received: %+v", usage, *updtRpl.MaxUsage)
+    }
+
+    time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
+    var aSessions []*ActiveSession
+    if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions,
+        map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil {
+        t.Error(err)
+    } else if len(aSessions) != 1 {
+        t.Errorf("Unexpected number of sessions received: %+v", aSessions)
+    } else if aSessions[0].Usage != time.Duration(150)*time.Second {
+        t.Errorf("Expecting : %+v, received: %+v", time.Duration(150)*time.Second, aSessions[0].Usage)
+    }
+
+    var pSessions []*ActiveSession
+    // Make sure we don't have passive sessions on the active host
+    if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil,
+        &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
+        t.Error(err)
+    }
+
+    // Master should no longer have the active session
+    if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions,
+        map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil ||
+        err.Error() != utils.ErrNotFound.Error() {
+        t.Errorf("Error: %v with len(aSessions)=%v , session : %+v", err, len(aSessions), utils.ToJSON(aSessions))
+    }
+
+    cgrID := GetSetCGRID(engine.NewSafEvent(argsUpdate.Event))
+    // Make sure the session was replicated
+    if err := smgRplcMstrRPC.Call(utils.SessionSv1GetPassiveSessions,
+        nil, &pSessions); err != nil {
+        t.Error(err)
+    } else if len(pSessions) != 1 {
+        t.Errorf("PassiveSessions: %+v", pSessions)
+    } else if pSessions[0].CGRID != cgrID {
         t.Errorf("PassiveSession: %+v", pSessions[0])
+    } else if pSessions[0].Usage != time.Duration(150*time.Second) {
+        t.Errorf("Expecting : %+v, received: %+v", time.Duration(150)*time.Second, pSessions[0].Usage)
     }
 }
@@ -189,7 +261,7 @@
             utils.Category:    "call",
             utils.SetupTime:   time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
             utils.AnswerTime:  time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
-            utils.Usage:       time.Duration(1*time.Minute + 30*time.Second),
+            utils.Usage:       time.Duration(2*time.Minute + 30*time.Second),
         },
     },
 }
@@ -203,12 +275,21 @@
     if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions,
         map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil ||
         err.Error() != utils.ErrNotFound.Error() {
-        t.Error(err, aSessions)
+        t.Errorf("Error: %v with len(aSessions)=%v , session : %+v", err, len(aSessions), utils.ToIJSON(aSessions))
     }
-    var pSessions []*ActiveSession
     //check if the session was terminated on slave
-    if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions,
-        nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
+    if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions,
+        nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
+        t.Errorf("Error: %v with len(aSessions)=%v , session : %+v", err, len(aSessions), utils.ToIJSON(aSessions))
+    }
+    // check that we don't have passive sessions on slave or master
+    var pSessions []*ActiveSession
+    if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil,
+        &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
+        t.Errorf("Error: %v with len(pSessions)=%v , session : %+v", err, len(pSessions), utils.ToIJSON(pSessions))
+    }
+    if err := smgRplcMstrRPC.Call(utils.SessionSv1GetPassiveSessions, nil,
+        &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
         t.Errorf("Error: %v with len(pSessions)=%v , session : %+v", err, len(pSessions), utils.ToIJSON(pSessions))
     }
 }
@@ -335,14 +416,6 @@ func TestSessionSRplManualReplicate(t *testing.T) {
     if err := smgRplcSlvRPC.Call("Responder.Status", "", &status); err != nil { // slave should be still operational
         t.Error(err)
     }
-    // check if the passive sessions from slave became active
-    if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil {
-        t.Error(err)
-    } else if len(aSessions) != 2 {
-        t.Errorf("Unexpected number of sessions received: %+v", utils.ToJSON(aSessions))
-    } else if aSessions[0].Usage != time.Duration(90)*time.Second && aSessions[1].Usage != time.Duration(90)*time.Second {
-        t.Errorf("Received usage: %v", aSessions[0].Usage)
-    }
     // start master
     if _, err := engine.StartEngine(smgRplcMasterCfgPath, *waitRater); err != nil {
         t.Fatal(err)
     }
@@ -359,6 +432,7 @@
     }
     // recover passive sessions from slave
     argsRepl = ArgsReplicateSessions{
+        Passive: true,
         Connections: []*config.HaPoolConfig{
             {
                 Address: smgRplcMasterCfg.ListenCfg().RPCJSONListen,