diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go index 6e368cb67..29607b1d5 100644 --- a/apier/v1/smgenericv1.go +++ b/apier/v1/smgenericv1.go @@ -95,7 +95,7 @@ func (self *SMGenericV1) ReplicateActiveSessions(args sessionmanager.ArgsReplica } func (self *SMGenericV1) ReplicatePassiveSessions(args sessionmanager.ArgsReplicateSessions, reply *string) error { - return self.sm.BiRPCV1ReplicateActiveSessions(nil, args, reply) + return self.sm.BiRPCV1ReplicatePassiveSessions(nil, args, reply) } // rpcclient.RpcClientConnection interface diff --git a/data/conf/samples/smgreplcmaster/cgrates.json b/data/conf/samples/smgreplcmaster/cgrates.json index 4f8e8f981..b16c257f6 100644 --- a/data/conf/samples/smgreplcmaster/cgrates.json +++ b/data/conf/samples/smgreplcmaster/cgrates.json @@ -6,9 +6,9 @@ }, "listen": { - "rpc_json": ":2012", - "rpc_gob": ":2013", - "http": ":2080", + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", }, "rals": { diff --git a/data/conf/samples/smgreplcslave/cgrates.json b/data/conf/samples/smgreplcslave/cgrates.json index 0502a9554..086cd6ca4 100644 --- a/data/conf/samples/smgreplcslave/cgrates.json +++ b/data/conf/samples/smgreplcslave/cgrates.json @@ -4,9 +4,9 @@ "listen": { - "rpc_json": ":22012", // RPC JSON listening address - "rpc_gob": ":22013", // RPC GOB listening address - "http": ":22080", // HTTP listening address + "rpc_json": "127.0.0.1:22012", // RPC JSON listening address + "rpc_gob": "127.0.0.1:22013", // RPC GOB listening address + "http": "127.0.0.1:22080", // HTTP listening address }, "rals": { diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 91aa1a0d7..7f53c4316 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -460,7 +460,7 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro // replicateSessions will replicate session based on configuration func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool, smgReplConns []*SMGReplicationConn) (err error) { - if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 && !passiveSessions { // Replicating active + if smg.cgrCfg.SmGenericConfig.DebitInterval != 0 && !passiveSessions { // Replicating active not supported return } ssMux := &smg.aSessionsMux diff --git a/sessionmanager/smgreplc_it_test.go b/sessionmanager/smgreplc_it_test.go index 7e2fffef3..b800061b2 100644 --- a/sessionmanager/smgreplc_it_test.go +++ b/sessionmanager/smgreplc_it_test.go @@ -215,7 +215,8 @@ func TestSMGRplcTerminate(t *testing.T) { } func TestSMGRplcManualReplicate(t *testing.T) { - if _, err := engine.StopStartEngine(smgRplcMasterCfgPath, *waitRater); err != nil { // Kill both and start Master + masterProc, err := engine.StopStartEngine(smgRplcMasterCfgPath, *waitRater) + if err != nil { // Kill both and start Master t.Fatal(err) } if smgRplcMstrRPC, err = jsonrpc.Dial("tcp", smgRplcMasterCfg.RPCJSONListen); err != nil { @@ -282,8 +283,8 @@ func TestSMGRplcManualReplicate(t *testing.T) { } argsRepl := ArgsReplicateSessions{Connections: []*config.HaPoolConfig{ &config.HaPoolConfig{ - Address: smgRplcMasterCfg.SmGenericConfig.SMGReplicationConns[0].Address, - Transport: smgRplcMasterCfg.SmGenericConfig.SMGReplicationConns[0].Transport, + Address: smgRplcSlaveCfg.RPCJSONListen, + Transport: utils.MetaJSONrpc, Synchronous: true}, }} var repply string @@ -298,6 +299,54 @@ func TestSMGRplcManualReplicate(t *testing.T) { } else if aSessions[0].Usage != time.Duration(90)*time.Second { t.Errorf("Received usage: %v", aSessions[0].Usage) } + // kill master + if err := masterProc.Process.Kill(); err != nil { + t.Errorf("Failed to kill process, error: %v", err.Error()) + } + var status map[string]interface{} + if err := smgRplcMstrRPC.Call("Responder.Status", "", &status); err == nil { // master should not longer be reachable + t.Error(err, status) + } + if err := smgRplcSlvRPC.Call("Responder.Status", "", &status); err != nil { // slave should be still operational + t.Error(err) + } + // start master + if _, err := engine.StartEngine(smgRplcMasterCfgPath, *waitRater); err != nil { + t.Fatal(err) + } + if smgRplcMstrRPC, err = jsonrpc.Dial("tcp", smgRplcMasterCfg.RPCJSONListen); err != nil { + t.Fatal(err) + } + // Master should have no session active/passive + if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err, aSessions) + } + if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err, aSessions) + } + // recover passive sessions from slave + argsRepl = ArgsReplicateSessions{Connections: []*config.HaPoolConfig{ + &config.HaPoolConfig{ + Address: smgRplcMasterCfg.RPCJSONListen, + Transport: utils.MetaJSONrpc, + Synchronous: true}, + }} + if err := smgRplcSlvRPC.Call("SMGenericV1.ReplicatePassiveSessions", argsRepl, &repply); err != nil { + t.Error(err) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated + // Master should have no session active/passive + if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err, aSessions) + } + if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", nil, &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 2 { + t.Errorf("Unexpected number of sessions received: %+v", aSessions) + } else if aSessions[0].Usage != time.Duration(90)*time.Second { + t.Errorf("Received usage: %v", aSessions[0].Usage) + } + } func TestSMGRplcStopCgrEngine(t *testing.T) {