From d57be005954b21e37e210fcd34bed0b8e0812264 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 7 Feb 2019 09:46:18 -0500 Subject: [PATCH] Almost finishing replication test --- apier/v1/sessions.go | 5 + apier/v1/sessionsbirpc.go | 6 + data/conf/samples/smgreplcmaster/cgrates.json | 1 + data/conf/samples/smgreplcslave/cgrates.json | 8 +- sessions/sessions.go | 23 +++ sessions/sessions_rpl_it_test.go | 190 ++++++------------ utils/consts.go | 2 +- 7 files changed, 102 insertions(+), 133 deletions(-) diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index 23300c9cd..e0fb94ba3 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -109,3 +109,8 @@ func (ssv1 *SessionSv1) Ping(ign string, reply *string) error { func (ssv1 *SessionSv1) ReplicateSessions(args sessions.ArgsReplicateSessions, rply *string) error { return ssv1.Ss.BiRPCv1ReplicateSessions(nil, args, rply) } + +func (ssv1 *SessionSv1) SetPassiveSession(args *sessions.Session, + reply *string) error { + return ssv1.Ss.BiRPCv1SetPassiveSession(nil, args, reply) +} diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index 942fb0b00..0266a1c20 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -47,6 +47,7 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { utils.SessionSv1Ping: ssv1.BiRPCPing, utils.SessionSv1ReplicateSessions: ssv1.BiRPCv1ReplicateSessions, + utils.SessionSv1SetPassiveSession: ssv1.BiRPCv1SetPassiveSession, } } @@ -132,3 +133,8 @@ func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt *rpc2.Client, args sessions.ArgsReplicateSessions, reply *string) error { return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply) } + +func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt *rpc2.Client, + args *sessions.Session, reply *string) error { + return ssv1.BiRPCv1SetPassiveSession(clnt, args, reply) +} diff --git a/data/conf/samples/smgreplcmaster/cgrates.json b/data/conf/samples/smgreplcmaster/cgrates.json index 51486131d..b109fb62e 100644 --- a/data/conf/samples/smgreplcmaster/cgrates.json +++ b/data/conf/samples/smgreplcmaster/cgrates.json @@ -3,6 +3,7 @@ // Copyright (C) ITsysCOM GmbH "general": { "log_level": 7, + "node_id":"MasterReplication", }, "listen": { diff --git a/data/conf/samples/smgreplcslave/cgrates.json b/data/conf/samples/smgreplcslave/cgrates.json index fa744562a..cdb3011cf 100644 --- a/data/conf/samples/smgreplcslave/cgrates.json +++ b/data/conf/samples/smgreplcslave/cgrates.json @@ -1,7 +1,10 @@ { // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments // Copyright (C) ITsysCOM GmbH - +"general": { + "log_level": 7, + "node_id":"SlaveReplication", +}, "listen": { "rpc_json": "127.0.0.1:22012", // RPC JSON listening address @@ -36,9 +39,6 @@ "sessions": { "enabled": true, // starts SessionManager service: "listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests - "session_replication_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"}, - ], }, } diff --git a/sessions/sessions.go b/sessions/sessions.go index bfc73566d..353fd3130 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -987,6 +987,7 @@ func (sS *SessionS) getSessions(cgrID string, pSessions bool) (ss []*Session) { var i int for _, s := range ssMp { ss[i] = s + i++ } return } @@ -2110,6 +2111,28 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, me.GetDurationPtrIgnoreErrors(utils.Usage)); err != nil { return utils.NewErrRALs(err) } + //check if we have replicate connection and close the session there + if len(sS.sReplConns) != 0 { + var wg sync.WaitGroup + for _, rplConn := range sS.sReplConns { + if rplConn.Synchronous { + wg.Add(1) + } + go func(conn rpcclient.RpcClientConnection, sync bool, s *Session) { + var rply string + if err := conn.Call(utils.SessionSv1TerminateSession, + args, &rply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> cannot termainte session with id <%s>, err: %s", + utils.SessionS, s.Clone().CGRID, err.Error())) + } + if sync { + wg.Done() + } + }(rplConn.Connection, rplConn.Synchronous, s) + } + wg.Wait() // wait for synchronous replication to finish + } } if args.ReleaseResources { if sS.resS == nil { diff --git a/sessions/sessions_rpl_it_test.go b/sessions/sessions_rpl_it_test.go index 13f544287..45618da1a 100644 --- a/sessions/sessions_rpl_it_test.go +++ b/sessions/sessions_rpl_it_test.go @@ -29,7 +29,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) var smgRplcMasterCfgPath, smgRplcSlaveCfgPath string @@ -87,47 +86,41 @@ func TestSessionSRplTPFromFolder(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups + + //add a default charger + chargerProfile := &engine.ChargerProfile{ + Tenant: "cgrates.org", + ID: "Default", + RunID: "*default", + AttributeIDs: []string{"*none"}, + Weight: 20, + } + var result string + if err := smgRplcMstrRPC.Call("ApierV1.SetChargerProfile", chargerProfile, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } } func TestSessionSRplInitiate(t *testing.T) { - var pSessions []*ActiveSession - if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, - nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + var aSessions []*ActiveSession + //make sure we don't have active sessions on master and passive on slave + if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions, + nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - args := &V1TerminateSessionArgs{ - TerminateSession: true, - CGREvent: utils.CGREvent{ - Tenant: "cgrates.org", - ID: "TestSSv1ItTerminateSession", - Event: map[string]interface{}{ - utils.EVENT_NAME: "TEST_EVENT", - utils.Tenant: "cgrates.org", - utils.OriginID: "123451", - utils.ToR: utils.VOICE, - utils.RequestType: utils.META_PREPAID, - utils.Account: "1001", - utils.Subject: "1001", - utils.Destination: "1004", - utils.Category: "call", - utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), - utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), - utils.Usage: 1*time.Minute + 30*time.Second, - }, - }, - } - var reply string - if err := smgRplcMstrRPC.Call(utils.SessionSv1TerminateSession, - args, &reply); err == nil || - err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound + if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, + nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } + usage := time.Duration(1*time.Minute + 30*time.Second) argsInit := &V1InitSessionArgs{ InitSession: true, CGREvent: utils.CGREvent{ Tenant: "cgrates.org", - ID: "TestSSv1ItAuth", + ID: "TestSessionSRplInitiate", Event: map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.Tenant: "cgrates.org", @@ -140,29 +133,34 @@ func TestSessionSRplInitiate(t *testing.T) { utils.Category: "call", utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), - utils.Usage: 1*time.Minute + 30*time.Second, + utils.Usage: usage, }, }, } - var initRpl *V1InitSessionReply + var initRpl V1InitSessionReply if err := smgRplcMstrRPC.Call(utils.SessionSv1InitiateSession, argsInit, &initRpl); err != nil { t.Error(err) } - if initRpl.MaxUsage != utils.DurationPointer(time.Duration(90*time.Second)) { - t.Error("Bad max usage: ", initRpl.MaxUsage) + //compare the value + if *initRpl.MaxUsage != usage { + t.Errorf("Expecting : %+v, received: %+v", usage, initRpl.MaxUsage) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated - var aSessions []*ActiveSession + + //check if the session was createad as active session on master if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions, map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { - t.Errorf("Unexpected number of sessions received: %+v", aSessions) + t.Errorf("Unexpected number of sessions received: %+v", utils.ToIJSON(aSessions)) } else if aSessions[0].Usage != time.Duration(90)*time.Second { t.Errorf("Received usage: %v", aSessions[0].Usage) } + + //check if the session was created as passive session on slave + var pSessions []*ActiveSession if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, map[string]string{utils.OriginID: "123451"}, &pSessions); err != nil { t.Error(err) @@ -173,79 +171,12 @@ func TestSessionSRplInitiate(t *testing.T) { } } -// Update on slave -func TestSessionSRplUpdate(t *testing.T) { - args := &V1UpdateSessionArgs{ - UpdateSession: true, - CGREvent: utils.CGREvent{ - Tenant: "cgrates.org", - ID: "TestSSv1ItUpdateSession", - Event: map[string]interface{}{ - utils.EVENT_NAME: "TEST_EVENT", - utils.Tenant: "cgrates.org", - utils.OriginID: "123451", - utils.ToR: utils.VOICE, - utils.RequestType: utils.META_PREPAID, - utils.Account: "1001", - utils.Subject: "1001", - utils.Destination: "1004", - utils.Category: "call", - utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), - utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), - utils.Usage: 1 * time.Minute, - }, - }, - } - - var rply V1UpdateSessionReply - if err := smgRplcSlvRPC.Call(utils.SessionSv1UpdateSession, - args, &rply); err != nil { - t.Error(err) - } else if rply.MaxUsage != utils.DurationPointer(time.Duration(time.Minute)) { - t.Error("Bad max usage: ", rply.MaxUsage) - } - time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated - var aSessions []*ActiveSession - if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions, - map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil { - t.Error(err) - } else if len(aSessions) != 1 { - t.Errorf("Unexpected number of sessions received: %+v", aSessions) - } else if aSessions[0].Usage != time.Duration(150)*time.Second { - t.Errorf("Received usage: %v", aSessions[0].Usage) - } - var pSessions []*ActiveSession - // Make sure we don't have passive session on active host - if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil, - &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - // Master should not longer have activeSession - if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions, - map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil || - err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - cgrID := GetSetCGRID(engine.NewSafEvent(args.CGREvent.Event)) - // Make sure session was replicated - if err := smgRplcMstrRPC.Call(utils.SessionSv1GetPassiveSessions, - nil, &pSessions); err != nil { - t.Error(err) - } else if len(pSessions) != 1 { - t.Errorf("PassiveSessions: %+v", pSessions) - } else if pSessions[0].CGRID != cgrID { - t.Errorf("PassiveSession: %+v", pSessions[0]) - } else if pSessions[0].Usage != time.Duration(150*time.Second) { - t.Errorf("PassiveSession: %+v", pSessions[0]) - } -} - func TestSessionSRplTerminate(t *testing.T) { args := &V1TerminateSessionArgs{ TerminateSession: true, CGREvent: utils.CGREvent{ Tenant: "cgrates.org", - ID: "TestSSv1ItTerminateSession", + ID: "TestSessionSRplTerminate", Event: map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.Tenant: "cgrates.org", @@ -258,7 +189,7 @@ func TestSessionSRplTerminate(t *testing.T) { utils.Category: "call", utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), - utils.Usage: 3 * time.Minute, + utils.Usage: time.Duration(1*time.Minute + 30*time.Second), }, }, } @@ -268,24 +199,17 @@ func TestSessionSRplTerminate(t *testing.T) { } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated var aSessions []*ActiveSession + //check if the session was terminated on master if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions, map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } - if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions, - map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil || - err.Error() != utils.ErrNotFound.Error() { - t.Error(err, aSessions) - } - var pSessions map[string][]*Session - if err := smgRplcMstrRPC.Call(utils.SessionSv1GetPassiveSessions, - nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } + var pSessions []*ActiveSession + //check if the session was terminated on slave if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) + t.Errorf("Error: %v with len(pSessions)=%v , session : %+v", err, len(pSessions), utils.ToIJSON(pSessions)) } } @@ -297,7 +221,7 @@ func TestSessionSRplManualReplicate(t *testing.T) { if smgRplcMstrRPC, err = jsonrpc.Dial("tcp", smgRplcMasterCfg.ListenCfg().RPCJSONListen); err != nil { t.Fatal(err) } - + // create two sessions argsInit1 := &V1InitSessionArgs{ InitSession: true, CGREvent: utils.CGREvent{ @@ -324,15 +248,15 @@ func TestSessionSRplManualReplicate(t *testing.T) { InitSession: true, CGREvent: utils.CGREvent{ Tenant: "cgrates.org", - ID: "TestSSv1ItAuth", + ID: "TestSSv1ItAuth2", Event: map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.Tenant: "cgrates.org", utils.OriginID: "123481", utils.ToR: utils.VOICE, utils.RequestType: utils.META_PREPAID, - utils.Account: "utils", - utils.Subject: "utils", + utils.Account: "1001", + utils.Subject: "1001", utils.Destination: "1005", utils.Category: "call", utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), @@ -347,17 +271,17 @@ func TestSessionSRplManualReplicate(t *testing.T) { if err := smgRplcMstrRPC.Call(utils.SessionSv1InitiateSession, args, &initRpl); err != nil { t.Error(err) } - if initRpl.MaxUsage != utils.DurationPointer(time.Duration(90*time.Second)) { + if *initRpl.MaxUsage != time.Duration(90*time.Second) { t.Error("Bad max usage: ", initRpl.MaxUsage) } } - time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated + //verify if the sessions was created on master and are active var aSessions []*ActiveSession if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 2 { - t.Errorf("Unexpected number of sessions received: %+v", aSessions) - } else if aSessions[0].Usage != time.Duration(90)*time.Second { + t.Errorf("Unexpected number of sessions received: %+v", utils.ToJSON(aSessions)) + } else if aSessions[0].Usage != time.Duration(90)*time.Second && aSessions[1].Usage != time.Duration(90)*time.Second { t.Errorf("Received usage: %v", aSessions[0].Usage) } // Start slave, should not have any active session at beginning @@ -374,7 +298,9 @@ func TestSessionSRplManualReplicate(t *testing.T) { if smgRplcSlvRPC, err = jsonrpc.Dial("tcp", smgRplcSlaveCfg.ListenCfg().RPCJSONListen); err != nil { t.Fatal(err) } - if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + // when we start slave after master we expect to don't have sessions + if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil, &aSessions); err == nil || + err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } argsRepl := ArgsReplicateSessions{ @@ -383,7 +309,9 @@ func TestSessionSRplManualReplicate(t *testing.T) { Address: smgRplcSlaveCfg.ListenCfg().RPCJSONListen, Transport: utils.MetaJSONrpc, Synchronous: true}, - }} + }, + } + //replicate manually the session from master to slave var repply string if err := smgRplcMstrRPC.Call(utils.SessionSv1ReplicateSessions, argsRepl, &repply); err != nil { t.Error(err) @@ -407,6 +335,14 @@ func TestSessionSRplManualReplicate(t *testing.T) { if err := smgRplcSlvRPC.Call("Responder.Status", "", &status); err != nil { // slave should be still operational t.Error(err) } + // check if the passive sessions from slave became active + if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 2 { + t.Errorf("Unexpected number of sessions received: %+v", utils.ToJSON(aSessions)) + } else if aSessions[0].Usage != time.Duration(90)*time.Second && aSessions[1].Usage != time.Duration(90)*time.Second { + t.Errorf("Received usage: %v", aSessions[0].Usage) + } // start master if _, err := engine.StartEngine(smgRplcMasterCfgPath, *waitRater); err != nil { t.Fatal(err) @@ -423,7 +359,6 @@ func TestSessionSRplManualReplicate(t *testing.T) { } // recover passive sessions from slave argsRepl = ArgsReplicateSessions{ - Passive: true, Connections: []*config.HaPoolConfig{ { Address: smgRplcMasterCfg.ListenCfg().RPCJSONListen, @@ -445,7 +380,6 @@ func TestSessionSRplManualReplicate(t *testing.T) { } else if aSessions[0].Usage != time.Duration(90)*time.Second { t.Errorf("Received usage: %v", aSessions[0].Usage) } - } func TestSessionSRplStopCgrEngine(t *testing.T) { diff --git a/utils/consts.go b/utils/consts.go index 08389f74b..e27c30b73 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -778,7 +778,7 @@ const ( SessionSv1ForceDisconnect = "SessionSv1.ForceDisconnect" SessionSv1GetPassiveSessions = "SessionSv1.GetPassiveSessions" SessionSv1GetPassiveSessionsCount = "SessionSv1.GetPassiveSessionsCount" - SessionSv1SetPassiveSession = "SessionSV1.SetPassiveSession" + SessionSv1SetPassiveSession = "SessionSv1.SetPassiveSession" SMGenericV1InitiateSession = "SMGenericV1.InitiateSession" SMGenericV2InitiateSession = "SMGenericV2.InitiateSession" SMGenericV2UpdateSession = "SMGenericV2.UpdateSession"