From 8e05e19cc3a91daea29bfa51d52202e459c5601f Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 31 May 2016 20:57:56 +0200 Subject: [PATCH] HA tesitng for BROADCAST pool, rpcclient version upgrade --- engine/libengine.go | 2 +- general_tests/rpcclient_it_test.go | 160 +++++++++++++++++++++++++++-- glide.lock | 2 +- 3 files changed, 156 insertions(+), 8 deletions(-) diff --git a/engine/libengine.go b/engine/libengine.go index 531b89397..65d1371ec 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -31,7 +31,7 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTi rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) { var rpcClient *rpcclient.RpcClient var err error - rpcPool := rpcclient.NewRpcClientPool(dispatchStrategy) + rpcPool := rpcclient.NewRpcClientPool(dispatchStrategy, replyTimeout) atLestOneConnected := false // If one connected we don't longer return errors for _, rpcConnCfg := range rpcConnCfgs { if rpcConnCfg.Address == utils.MetaInternal { diff --git a/general_tests/rpcclient_it_test.go b/general_tests/rpcclient_it_test.go index f478f5024..41747d73f 100644 --- a/general_tests/rpcclient_it_test.go +++ b/general_tests/rpcclient_it_test.go @@ -36,7 +36,7 @@ import ( var rpcITCfgPath1, rpcITCfgPath2 string var rpcITCfg1, rpcITCfg2 *config.CGRConfig var rpcRAL1, rpcRAL2 *rpcclient.RpcClient -var rpcPoolFirst *rpcclient.RpcClientPool +var rpcPoolFirst, rpcPoolBroadcast *rpcclient.RpcClientPool var ral1, ral2 *exec.Cmd var err error var ral1ID, ral2ID, ralRmtID string @@ -49,7 +49,6 @@ func TestRPCITInitCfg(t *testing.T) { } rpcITCfgPath1 = path.Join(*dataDir, "conf", "samples", "multiral1") rpcITCfgPath2 = path.Join(*dataDir, "conf", "samples", "multiral2") - // Init config first rpcITCfg1, err = config.NewCGRConfigFromFolder(rpcITCfgPath1) if err != nil { t.Error(err) @@ -58,6 +57,9 @@ func TestRPCITInitCfg(t *testing.T) { if err != nil { t.Error(err) } + if err := engine.InitDataDb(rpcITCfg1); err != nil { + t.Fatal(err) + } } func TestRPCITStartSecondEngine(t *testing.T) { @@ -70,11 +72,11 @@ func TestRPCITStartSecondEngine(t *testing.T) { } // Connect rpc client to rater -func TestRPCITRpcConnPool(t *testing.T) { +func TestRPCITRpcConnPoolFirst(t *testing.T) { if !*testIntegration { return } - rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST) + rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0) rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) if err == nil { t.Fatal("Should receive cannot connect error here") @@ -203,10 +205,10 @@ func TestRPCITDirectedRPC(t *testing.T) { // Special tests involving remote server (manually set) // The server network will be manually disconnected without TCP close func TestRPCITRmtRpcConnPool(t *testing.T) { - if !*testIntegration { + if !*testRemoteRALs { return } - rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST) + rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST, 0) rpcRALRmt, err := rpcclient.NewRpcClient("tcp", "172.16.254.83:2012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) if err != nil { t.Fatal(err) @@ -295,3 +297,149 @@ func TestRPCITRmtStatusFirstFailback(t *testing.T) { t.Fatal("Did not do failback") } } + +// Connect rpc client to rater +func TestRPCITRpcConnPoolBcast(t *testing.T) { + if !*testIntegration { + return + } + rpcPoolBroadcast = rpcclient.NewRpcClientPool(rpcclient.POOL_BROADCAST, time.Duration(2*time.Second)) + rpcPoolBroadcast.AddClient(rpcRAL1) + rpcPoolBroadcast.AddClient(rpcRAL2) +} + +func TestRPCITBcastStatusInitial(t *testing.T) { + if !*testIntegration { + return + } + var status map[string]interface{} + if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } + if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } +} + +func TestRPCITBcastStatusNoRals1(t *testing.T) { + if !*testIntegration { + return + } + if err := ral1.Process.Kill(); err != nil { // Kill the first RAL + t.Error(err) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) + var status map[string]interface{} + if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } + if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } +} + +func TestRPCITBcastStatusBcastNoRals(t *testing.T) { + if !*testIntegration { + return + } + if err := ral2.Process.Kill(); err != nil { // Kill the first RAL + t.Error(err) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) + var status map[string]interface{} + if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err == nil { + t.Error("Should get error") + } +} + +func TestRPCITBcastStatusRALs2Up(t *testing.T) { + if !*testIntegration { + return + } + if ral2, err = engine.StartEngine(rpcITCfgPath2, *waitRater); err != nil { + t.Fatal(err) + } + var status map[string]interface{} + if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } + if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } +} + +func TestRPCITStatusBcastRALs1Up(t *testing.T) { + if !*testIntegration { + return + } + if ral1, err = engine.StartEngine(rpcITCfgPath1, *waitRater); err != nil { + t.Fatal(err) + } + var status map[string]interface{} + if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } + if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } +} + +func TestRPCITStatusBcastCmd(t *testing.T) { + if !*testIntegration { + return + } + var stats utils.CacheStats + if err := rpcRAL1.Call("ApierV2.GetCacheStats", utils.AttrCacheStats{}, &stats); err != nil { + t.Error(err) + } else if stats.LastLoadId != utils.NOT_AVAILABLE { + t.Errorf("Received unexpected stats: %+v", stats) + } + var loadInst engine.LoadInstance + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + if err := rpcRAL1.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil { + t.Error(err) + } else if loadInst.LoadId == "" { + t.Errorf("Empty loadId received, loadInstance: %+v", loadInst) + } + var reply string + if err := rpcPoolBroadcast.Call("ApierV1.ReloadCache", utils.AttrReloadCache{}, &reply); err != nil { + t.Error("Got error on ApierV1.ReloadCache: ", err.Error()) + } else if reply != utils.OK { + t.Error("Calling ApierV1.ReloadCache got reply: ", reply) + } + if err := rpcRAL1.Call("ApierV2.GetCacheStats", utils.AttrCacheStats{}, &stats); err != nil { + t.Error(err) + } else if stats.LastLoadId != loadInst.LoadId { + t.Errorf("Received unexpected stats: %+v", stats) + } + if err := rpcRAL2.Call("ApierV2.GetCacheStats", utils.AttrCacheStats{}, &stats); err != nil { + t.Error(err) + } else if stats.LastLoadId != loadInst.LoadId { + t.Errorf("Received unexpected stats: %+v", stats) + } +} + +func TestRPCITStopCgrEngine(t *testing.T) { + if !*testIntegration { + return + } + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/glide.lock b/glide.lock index 7157b9e78..5b8f5cddc 100644 --- a/glide.lock +++ b/glide.lock @@ -12,7 +12,7 @@ imports: - name: github.com/cgrates/osipsdagram version: 3d6beed663452471dec3ca194137a30d379d9e8f - name: github.com/cgrates/rpcclient - version: d9a94e52e08bf98a288c9460ce6adb661a6c089b + version: 1d54b34b6a5531e1124728b489c44ec6a8ac231e - name: github.com/ChrisTrenkamp/goxpath version: 4aad8d0161aae7d17df4755d2c1e86cd1fcaaab6 subpackages: