Almost finishing replication test

This commit is contained in:
TeoV
2019-02-07 09:46:18 -05:00
committed by Dan Christian Bogos
parent 29429930e8
commit d57be00595
7 changed files with 102 additions and 133 deletions

View File

@@ -109,3 +109,8 @@ func (ssv1 *SessionSv1) Ping(ign string, reply *string) error {
func (ssv1 *SessionSv1) ReplicateSessions(args sessions.ArgsReplicateSessions, rply *string) error {
return ssv1.Ss.BiRPCv1ReplicateSessions(nil, args, rply)
}
func (ssv1 *SessionSv1) SetPassiveSession(args *sessions.Session,
reply *string) error {
return ssv1.Ss.BiRPCv1SetPassiveSession(nil, args, reply)
}

View File

@@ -47,6 +47,7 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} {
utils.SessionSv1Ping: ssv1.BiRPCPing,
utils.SessionSv1ReplicateSessions: ssv1.BiRPCv1ReplicateSessions,
utils.SessionSv1SetPassiveSession: ssv1.BiRPCv1SetPassiveSession,
}
}
@@ -132,3 +133,8 @@ func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt *rpc2.Client,
args sessions.ArgsReplicateSessions, reply *string) error {
return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply)
}
func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt *rpc2.Client,
args *sessions.Session, reply *string) error {
return ssv1.BiRPCv1SetPassiveSession(clnt, args, reply)
}

View File

@@ -3,6 +3,7 @@
// Copyright (C) ITsysCOM GmbH
"general": {
"log_level": 7,
"node_id":"MasterReplication",
},
"listen": {

View File

@@ -1,7 +1,10 @@
{
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
"general": {
"log_level": 7,
"node_id":"SlaveReplication",
},
"listen": {
"rpc_json": "127.0.0.1:22012", // RPC JSON listening address
@@ -36,9 +39,6 @@
"sessions": {
"enabled": true, // starts SessionManager service: <true|false>
"listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests
"session_replication_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"},
],
},
}

View File

@@ -987,6 +987,7 @@ func (sS *SessionS) getSessions(cgrID string, pSessions bool) (ss []*Session) {
var i int
for _, s := range ssMp {
ss[i] = s
i++
}
return
}
@@ -2110,6 +2111,28 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection,
me.GetDurationPtrIgnoreErrors(utils.Usage)); err != nil {
return utils.NewErrRALs(err)
}
//check if we have replicate connection and close the session there
if len(sS.sReplConns) != 0 {
var wg sync.WaitGroup
for _, rplConn := range sS.sReplConns {
if rplConn.Synchronous {
wg.Add(1)
}
go func(conn rpcclient.RpcClientConnection, sync bool, s *Session) {
var rply string
if err := conn.Call(utils.SessionSv1TerminateSession,
args, &rply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> cannot termainte session with id <%s>, err: %s",
utils.SessionS, s.Clone().CGRID, err.Error()))
}
if sync {
wg.Done()
}
}(rplConn.Connection, rplConn.Synchronous, s)
}
wg.Wait() // wait for synchronous replication to finish
}
}
if args.ReleaseResources {
if sS.resS == nil {

View File

@@ -29,7 +29,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
var smgRplcMasterCfgPath, smgRplcSlaveCfgPath string
@@ -87,47 +86,41 @@ func TestSessionSRplTPFromFolder(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
//add a default charger
chargerProfile := &engine.ChargerProfile{
Tenant: "cgrates.org",
ID: "Default",
RunID: "*default",
AttributeIDs: []string{"*none"},
Weight: 20,
}
var result string
if err := smgRplcMstrRPC.Call("ApierV1.SetChargerProfile", chargerProfile, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
}
func TestSessionSRplInitiate(t *testing.T) {
var pSessions []*ActiveSession
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions,
nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
var aSessions []*ActiveSession
//make sure we don't have active sessions on master and passive on slave
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions,
nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
args := &V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSSv1ItTerminateSession",
Event: map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.Tenant: "cgrates.org",
utils.OriginID: "123451",
utils.ToR: utils.VOICE,
utils.RequestType: utils.META_PREPAID,
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1004",
utils.Category: "call",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: 1*time.Minute + 30*time.Second,
},
},
}
var reply string
if err := smgRplcMstrRPC.Call(utils.SessionSv1TerminateSession,
args, &reply); err == nil ||
err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions,
nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
usage := time.Duration(1*time.Minute + 30*time.Second)
argsInit := &V1InitSessionArgs{
InitSession: true,
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSSv1ItAuth",
ID: "TestSessionSRplInitiate",
Event: map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.Tenant: "cgrates.org",
@@ -140,29 +133,34 @@ func TestSessionSRplInitiate(t *testing.T) {
utils.Category: "call",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: 1*time.Minute + 30*time.Second,
utils.Usage: usage,
},
},
}
var initRpl *V1InitSessionReply
var initRpl V1InitSessionReply
if err := smgRplcMstrRPC.Call(utils.SessionSv1InitiateSession,
argsInit, &initRpl); err != nil {
t.Error(err)
}
if initRpl.MaxUsage != utils.DurationPointer(time.Duration(90*time.Second)) {
t.Error("Bad max usage: ", initRpl.MaxUsage)
//compare the value
if *initRpl.MaxUsage != usage {
t.Errorf("Expecting : %+v, received: %+v", usage, initRpl.MaxUsage)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
var aSessions []*ActiveSession
//check if the session was createad as active session on master
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions,
map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
t.Errorf("Unexpected number of sessions received: %+v", utils.ToIJSON(aSessions))
} else if aSessions[0].Usage != time.Duration(90)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
}
//check if the session was created as passive session on slave
var pSessions []*ActiveSession
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions,
map[string]string{utils.OriginID: "123451"}, &pSessions); err != nil {
t.Error(err)
@@ -173,79 +171,12 @@ func TestSessionSRplInitiate(t *testing.T) {
}
}
// Update on slave
func TestSessionSRplUpdate(t *testing.T) {
args := &V1UpdateSessionArgs{
UpdateSession: true,
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSSv1ItUpdateSession",
Event: map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.Tenant: "cgrates.org",
utils.OriginID: "123451",
utils.ToR: utils.VOICE,
utils.RequestType: utils.META_PREPAID,
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1004",
utils.Category: "call",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: 1 * time.Minute,
},
},
}
var rply V1UpdateSessionReply
if err := smgRplcSlvRPC.Call(utils.SessionSv1UpdateSession,
args, &rply); err != nil {
t.Error(err)
} else if rply.MaxUsage != utils.DurationPointer(time.Duration(time.Minute)) {
t.Error("Bad max usage: ", rply.MaxUsage)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
var aSessions []*ActiveSession
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions,
map[string]string{utils.OriginID: "123451"}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
} else if aSessions[0].Usage != time.Duration(150)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
}
var pSessions []*ActiveSession
// Make sure we don't have passive session on active host
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil,
&pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
// Master should not longer have activeSession
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions,
map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
cgrID := GetSetCGRID(engine.NewSafEvent(args.CGREvent.Event))
// Make sure session was replicated
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetPassiveSessions,
nil, &pSessions); err != nil {
t.Error(err)
} else if len(pSessions) != 1 {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if pSessions[0].CGRID != cgrID {
t.Errorf("PassiveSession: %+v", pSessions[0])
} else if pSessions[0].Usage != time.Duration(150*time.Second) {
t.Errorf("PassiveSession: %+v", pSessions[0])
}
}
func TestSessionSRplTerminate(t *testing.T) {
args := &V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSSv1ItTerminateSession",
ID: "TestSessionSRplTerminate",
Event: map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.Tenant: "cgrates.org",
@@ -258,7 +189,7 @@ func TestSessionSRplTerminate(t *testing.T) {
utils.Category: "call",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: 3 * time.Minute,
utils.Usage: time.Duration(1*time.Minute + 30*time.Second),
},
},
}
@@ -268,24 +199,17 @@ func TestSessionSRplTerminate(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
var aSessions []*ActiveSession
//check if the session was terminated on master
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions,
map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions,
map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
var pSessions map[string][]*Session
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetPassiveSessions,
nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
var pSessions []*ActiveSession
//check if the session was terminated on slave
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions,
nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
t.Errorf("Error: %v with len(pSessions)=%v , session : %+v", err, len(pSessions), utils.ToIJSON(pSessions))
}
}
@@ -297,7 +221,7 @@ func TestSessionSRplManualReplicate(t *testing.T) {
if smgRplcMstrRPC, err = jsonrpc.Dial("tcp", smgRplcMasterCfg.ListenCfg().RPCJSONListen); err != nil {
t.Fatal(err)
}
// create two sessions
argsInit1 := &V1InitSessionArgs{
InitSession: true,
CGREvent: utils.CGREvent{
@@ -324,15 +248,15 @@ func TestSessionSRplManualReplicate(t *testing.T) {
InitSession: true,
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
ID: "TestSSv1ItAuth",
ID: "TestSSv1ItAuth2",
Event: map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.Tenant: "cgrates.org",
utils.OriginID: "123481",
utils.ToR: utils.VOICE,
utils.RequestType: utils.META_PREPAID,
utils.Account: "utils",
utils.Subject: "utils",
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1005",
utils.Category: "call",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
@@ -347,17 +271,17 @@ func TestSessionSRplManualReplicate(t *testing.T) {
if err := smgRplcMstrRPC.Call(utils.SessionSv1InitiateSession, args, &initRpl); err != nil {
t.Error(err)
}
if initRpl.MaxUsage != utils.DurationPointer(time.Duration(90*time.Second)) {
if *initRpl.MaxUsage != time.Duration(90*time.Second) {
t.Error("Bad max usage: ", initRpl.MaxUsage)
}
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
//verify if the sessions was created on master and are active
var aSessions []*ActiveSession
if err := smgRplcMstrRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 2 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
} else if aSessions[0].Usage != time.Duration(90)*time.Second {
t.Errorf("Unexpected number of sessions received: %+v", utils.ToJSON(aSessions))
} else if aSessions[0].Usage != time.Duration(90)*time.Second && aSessions[1].Usage != time.Duration(90)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
}
// Start slave, should not have any active session at beginning
@@ -374,7 +298,9 @@ func TestSessionSRplManualReplicate(t *testing.T) {
if smgRplcSlvRPC, err = jsonrpc.Dial("tcp", smgRplcSlaveCfg.ListenCfg().RPCJSONListen); err != nil {
t.Fatal(err)
}
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
// when we start slave after master we expect to don't have sessions
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetPassiveSessions, nil, &aSessions); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
argsRepl := ArgsReplicateSessions{
@@ -383,7 +309,9 @@ func TestSessionSRplManualReplicate(t *testing.T) {
Address: smgRplcSlaveCfg.ListenCfg().RPCJSONListen,
Transport: utils.MetaJSONrpc,
Synchronous: true},
}}
},
}
//replicate manually the session from master to slave
var repply string
if err := smgRplcMstrRPC.Call(utils.SessionSv1ReplicateSessions, argsRepl, &repply); err != nil {
t.Error(err)
@@ -407,6 +335,14 @@ func TestSessionSRplManualReplicate(t *testing.T) {
if err := smgRplcSlvRPC.Call("Responder.Status", "", &status); err != nil { // slave should be still operational
t.Error(err)
}
// check if the passive sessions from slave became active
if err := smgRplcSlvRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 2 {
t.Errorf("Unexpected number of sessions received: %+v", utils.ToJSON(aSessions))
} else if aSessions[0].Usage != time.Duration(90)*time.Second && aSessions[1].Usage != time.Duration(90)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
}
// start master
if _, err := engine.StartEngine(smgRplcMasterCfgPath, *waitRater); err != nil {
t.Fatal(err)
@@ -423,7 +359,6 @@ func TestSessionSRplManualReplicate(t *testing.T) {
}
// recover passive sessions from slave
argsRepl = ArgsReplicateSessions{
Passive: true,
Connections: []*config.HaPoolConfig{
{
Address: smgRplcMasterCfg.ListenCfg().RPCJSONListen,
@@ -445,7 +380,6 @@ func TestSessionSRplManualReplicate(t *testing.T) {
} else if aSessions[0].Usage != time.Duration(90)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
}
}
func TestSessionSRplStopCgrEngine(t *testing.T) {

View File

@@ -778,7 +778,7 @@ const (
SessionSv1ForceDisconnect = "SessionSv1.ForceDisconnect"
SessionSv1GetPassiveSessions = "SessionSv1.GetPassiveSessions"
SessionSv1GetPassiveSessionsCount = "SessionSv1.GetPassiveSessionsCount"
SessionSv1SetPassiveSession = "SessionSV1.SetPassiveSession"
SessionSv1SetPassiveSession = "SessionSv1.SetPassiveSession"
SMGenericV1InitiateSession = "SMGenericV1.InitiateSession"
SMGenericV2InitiateSession = "SMGenericV2.InitiateSession"
SMGenericV2UpdateSession = "SMGenericV2.UpdateSession"