HA tesitng for BROADCAST pool, rpcclient version upgrade

This commit is contained in:
DanB
2016-05-31 20:57:56 +02:00
parent b27c588eff
commit 8e05e19cc3
3 changed files with 156 additions and 8 deletions

View File

@@ -31,7 +31,7 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTi
rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) {
var rpcClient *rpcclient.RpcClient
var err error
rpcPool := rpcclient.NewRpcClientPool(dispatchStrategy)
rpcPool := rpcclient.NewRpcClientPool(dispatchStrategy, replyTimeout)
atLestOneConnected := false // If one connected we don't longer return errors
for _, rpcConnCfg := range rpcConnCfgs {
if rpcConnCfg.Address == utils.MetaInternal {

View File

@@ -36,7 +36,7 @@ import (
var rpcITCfgPath1, rpcITCfgPath2 string
var rpcITCfg1, rpcITCfg2 *config.CGRConfig
var rpcRAL1, rpcRAL2 *rpcclient.RpcClient
var rpcPoolFirst *rpcclient.RpcClientPool
var rpcPoolFirst, rpcPoolBroadcast *rpcclient.RpcClientPool
var ral1, ral2 *exec.Cmd
var err error
var ral1ID, ral2ID, ralRmtID string
@@ -49,7 +49,6 @@ func TestRPCITInitCfg(t *testing.T) {
}
rpcITCfgPath1 = path.Join(*dataDir, "conf", "samples", "multiral1")
rpcITCfgPath2 = path.Join(*dataDir, "conf", "samples", "multiral2")
// Init config first
rpcITCfg1, err = config.NewCGRConfigFromFolder(rpcITCfgPath1)
if err != nil {
t.Error(err)
@@ -58,6 +57,9 @@ func TestRPCITInitCfg(t *testing.T) {
if err != nil {
t.Error(err)
}
if err := engine.InitDataDb(rpcITCfg1); err != nil {
t.Fatal(err)
}
}
func TestRPCITStartSecondEngine(t *testing.T) {
@@ -70,11 +72,11 @@ func TestRPCITStartSecondEngine(t *testing.T) {
}
// Connect rpc client to rater
func TestRPCITRpcConnPool(t *testing.T) {
func TestRPCITRpcConnPoolFirst(t *testing.T) {
if !*testIntegration {
return
}
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
if err == nil {
t.Fatal("Should receive cannot connect error here")
@@ -203,10 +205,10 @@ func TestRPCITDirectedRPC(t *testing.T) {
// Special tests involving remote server (manually set)
// The server network will be manually disconnected without TCP close
func TestRPCITRmtRpcConnPool(t *testing.T) {
if !*testIntegration {
if !*testRemoteRALs {
return
}
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0)
rpcRALRmt, err := rpcclient.NewRpcClient("tcp", "172.16.254.83:2012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
if err != nil {
t.Fatal(err)
@@ -295,3 +297,149 @@ func TestRPCITRmtStatusFirstFailback(t *testing.T) {
t.Fatal("Did not do failback")
}
}
// Connect rpc client to rater
func TestRPCITRpcConnPoolBcast(t *testing.T) {
if !*testIntegration {
return
}
rpcPoolBroadcast = rpcclient.NewRpcClientPool(rpcclient.POOL_BROADCAST, time.Duration(2*time.Second))
rpcPoolBroadcast.AddClient(rpcRAL1)
rpcPoolBroadcast.AddClient(rpcRAL2)
}
func TestRPCITBcastStatusInitial(t *testing.T) {
if !*testIntegration {
return
}
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
}
}
func TestRPCITBcastStatusNoRals1(t *testing.T) {
if !*testIntegration {
return
}
if err := ral1.Process.Kill(); err != nil { // Kill the first RAL
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
}
}
func TestRPCITBcastStatusBcastNoRals(t *testing.T) {
if !*testIntegration {
return
}
if err := ral2.Process.Kill(); err != nil { // Kill the first RAL
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err == nil {
t.Error("Should get error")
}
}
func TestRPCITBcastStatusRALs2Up(t *testing.T) {
if !*testIntegration {
return
}
if ral2, err = engine.StartEngine(rpcITCfgPath2, *waitRater); err != nil {
t.Fatal(err)
}
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
}
}
func TestRPCITStatusBcastRALs1Up(t *testing.T) {
if !*testIntegration {
return
}
if ral1, err = engine.StartEngine(rpcITCfgPath1, *waitRater); err != nil {
t.Fatal(err)
}
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
}
}
func TestRPCITStatusBcastCmd(t *testing.T) {
if !*testIntegration {
return
}
var stats utils.CacheStats
if err := rpcRAL1.Call("ApierV2.GetCacheStats", utils.AttrCacheStats{}, &stats); err != nil {
t.Error(err)
} else if stats.LastLoadId != utils.NOT_AVAILABLE {
t.Errorf("Received unexpected stats: %+v", stats)
}
var loadInst engine.LoadInstance
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
if err := rpcRAL1.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil {
t.Error(err)
} else if loadInst.LoadId == "" {
t.Errorf("Empty loadId received, loadInstance: %+v", loadInst)
}
var reply string
if err := rpcPoolBroadcast.Call("ApierV1.ReloadCache", utils.AttrReloadCache{}, &reply); err != nil {
t.Error("Got error on ApierV1.ReloadCache: ", err.Error())
} else if reply != utils.OK {
t.Error("Calling ApierV1.ReloadCache got reply: ", reply)
}
if err := rpcRAL1.Call("ApierV2.GetCacheStats", utils.AttrCacheStats{}, &stats); err != nil {
t.Error(err)
} else if stats.LastLoadId != loadInst.LoadId {
t.Errorf("Received unexpected stats: %+v", stats)
}
if err := rpcRAL2.Call("ApierV2.GetCacheStats", utils.AttrCacheStats{}, &stats); err != nil {
t.Error(err)
} else if stats.LastLoadId != loadInst.LoadId {
t.Errorf("Received unexpected stats: %+v", stats)
}
}
func TestRPCITStopCgrEngine(t *testing.T) {
if !*testIntegration {
return
}
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}

2
glide.lock generated
View File

@@ -12,7 +12,7 @@ imports:
- name: github.com/cgrates/osipsdagram
version: 3d6beed663452471dec3ca194137a30d379d9e8f
- name: github.com/cgrates/rpcclient
version: d9a94e52e08bf98a288c9460ce6adb661a6c089b
version: 1d54b34b6a5531e1124728b489c44ec6a8ac231e
- name: github.com/ChrisTrenkamp/goxpath
version: 4aad8d0161aae7d17df4755d2c1e86cd1fcaaab6
subpackages: