mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-25 00:58:45 +05:00
Tests for SMG PassiveSessionsReplication, small typo fix in API
This commit is contained in:
@@ -95,7 +95,7 @@ func (self *SMGenericV1) ReplicateActiveSessions(args sessionmanager.ArgsReplica
|
||||
}
|
||||
|
||||
func (self *SMGenericV1) ReplicatePassiveSessions(args sessionmanager.ArgsReplicateSessions, reply *string) error {
|
||||
return self.sm.BiRPCV1ReplicateActiveSessions(nil, args, reply)
|
||||
return self.sm.BiRPCV1ReplicatePassiveSessions(nil, args, reply)
|
||||
}
|
||||
|
||||
// rpcclient.RpcClientConnection interface
|
||||
|
||||
@@ -6,9 +6,9 @@
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2012",
|
||||
"rpc_gob": ":2013",
|
||||
"http": ":2080",
|
||||
"rpc_json": "127.0.0.1:2012",
|
||||
"rpc_gob": "127.0.0.1:2013",
|
||||
"http": "127.0.0.1:2080",
|
||||
},
|
||||
|
||||
"rals": {
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":22012", // RPC JSON listening address
|
||||
"rpc_gob": ":22013", // RPC GOB listening address
|
||||
"http": ":22080", // HTTP listening address
|
||||
"rpc_json": "127.0.0.1:22012", // RPC JSON listening address
|
||||
"rpc_gob": "127.0.0.1:22013", // RPC GOB listening address
|
||||
"http": "127.0.0.1:22080", // HTTP listening address
|
||||
},
|
||||
|
||||
"rals": {
|
||||
|
||||
@@ -460,7 +460,7 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro
|
||||
|
||||
// replicateSessions will replicate session based on configuration
|
||||
func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool, smgReplConns []*SMGReplicationConn) (err error) {
|
||||
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 && !passiveSessions { // Replicating active
|
||||
if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 && !passiveSessions { // Replicating active not supported
|
||||
return
|
||||
}
|
||||
ssMux := &smg.aSessionsMux
|
||||
|
||||
@@ -215,7 +215,8 @@ func TestSMGRplcTerminate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSMGRplcManualReplicate(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(smgRplcMasterCfgPath, *waitRater); err != nil { // Kill both and start Master
|
||||
masterProc, err := engine.StopStartEngine(smgRplcMasterCfgPath, *waitRater)
|
||||
if err != nil { // Kill both and start Master
|
||||
t.Fatal(err)
|
||||
}
|
||||
if smgRplcMstrRPC, err = jsonrpc.Dial("tcp", smgRplcMasterCfg.RPCJSONListen); err != nil {
|
||||
@@ -282,8 +283,8 @@ func TestSMGRplcManualReplicate(t *testing.T) {
|
||||
}
|
||||
argsRepl := ArgsReplicateSessions{Connections: []*config.HaPoolConfig{
|
||||
&config.HaPoolConfig{
|
||||
Address: smgRplcMasterCfg.SmGenericConfig.SMGReplicationConns[0].Address,
|
||||
Transport: smgRplcMasterCfg.SmGenericConfig.SMGReplicationConns[0].Transport,
|
||||
Address: smgRplcSlaveCfg.RPCJSONListen,
|
||||
Transport: utils.MetaJSONrpc,
|
||||
Synchronous: true},
|
||||
}}
|
||||
var repply string
|
||||
@@ -298,6 +299,54 @@ func TestSMGRplcManualReplicate(t *testing.T) {
|
||||
} else if aSessions[0].Usage != time.Duration(90)*time.Second {
|
||||
t.Errorf("Received usage: %v", aSessions[0].Usage)
|
||||
}
|
||||
// kill master
|
||||
if err := masterProc.Process.Kill(); err != nil {
|
||||
t.Errorf("Failed to kill process, error: %v", err.Error())
|
||||
}
|
||||
var status map[string]interface{}
|
||||
if err := smgRplcMstrRPC.Call("Responder.Status", "", &status); err == nil { // master should not longer be reachable
|
||||
t.Error(err, status)
|
||||
}
|
||||
if err := smgRplcSlvRPC.Call("Responder.Status", "", &status); err != nil { // slave should be still operational
|
||||
t.Error(err)
|
||||
}
|
||||
// start master
|
||||
if _, err := engine.StartEngine(smgRplcMasterCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if smgRplcMstrRPC, err = jsonrpc.Dial("tcp", smgRplcMasterCfg.RPCJSONListen); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Master should have no session active/passive
|
||||
if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err, aSessions)
|
||||
}
|
||||
if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err, aSessions)
|
||||
}
|
||||
// recover passive sessions from slave
|
||||
argsRepl = ArgsReplicateSessions{Connections: []*config.HaPoolConfig{
|
||||
&config.HaPoolConfig{
|
||||
Address: smgRplcMasterCfg.RPCJSONListen,
|
||||
Transport: utils.MetaJSONrpc,
|
||||
Synchronous: true},
|
||||
}}
|
||||
if err := smgRplcSlvRPC.Call("SMGenericV1.ReplicatePassiveSessions", argsRepl, &repply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
|
||||
// Master should have no session active/passive
|
||||
if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err, aSessions)
|
||||
}
|
||||
if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", nil, &aSessions); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(aSessions) != 2 {
|
||||
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
|
||||
} else if aSessions[0].Usage != time.Duration(90)*time.Second {
|
||||
t.Errorf("Received usage: %v", aSessions[0].Usage)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSMGRplcStopCgrEngine(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user