added tests and method comments

This commit is contained in:
Radu Ioan Fericean
2012-01-28 18:54:48 +02:00
parent a80a234467
commit 4875228926
7 changed files with 261 additions and 6 deletions

5
.gitignore vendored
View File

@@ -3,8 +3,3 @@ a.out
*.6
*.pyc
.settings
inquirer
rater
kyoto_loader
redis_loader
test

80
cmd/inquirer/balancer.go Normal file
View File

@@ -0,0 +1,80 @@
package main
import (
"net/rpc"
"sync"
)
type RaterList struct {
clientAddresses []string
clientConnections []*rpc.Client
balancerIndex int
mu sync.RWMutex
}
/*
Constructor for RateList holding one slice for addreses and one slice for connections.
*/
func NewRaterList() *RaterList {
r:= &RaterList{balancerIndex: 0} // leaving both slices to nil
return r
}
/*
Adds a client to the two internal slices.
*/
func (rl *RaterList) AddClient(address string, client *rpc.Client){
rl.clientAddresses = append(rl.clientAddresses, address)
rl.clientConnections = append(rl.clientConnections, client)
return
}
/*
Removes a client from the slices locking the readers and reseting the balancer index.
*/
func (rl *RaterList) RemoveClient(address string){
index := -1
for i, v := range rl.clientAddresses {
if v == address {
index = i
break
}
}
if index == -1 {
return
}
rl.mu.RLock()
defer rl.mu.RUnlock()
rl.clientAddresses = append(rl.clientAddresses[:index], rl.clientAddresses[index+1:]...)
rl.clientConnections = append(rl.clientConnections[:index], rl.clientConnections[index+1:]...)
rl.balancerIndex = 0
}
/*
Returns a client for the specifed address.
*/
func (rl *RaterList) GetClient(address string) (*rpc.Client, bool){
for i, v := range rl.clientAddresses {
if v == address {
return rl.clientConnections[i], true
}
}
return nil, false
}
/*
Returns the next available connection at each call looping at the end of connections.
*/
func (rl *RaterList) Balance() (result *rpc.Client) {
rl.mu.Lock()
defer rl.mu.Unlock()
if rl.balancerIndex >= len(rl.clientAddresses) {
rl.balancerIndex = 0
}
if len(rl.clientAddresses) > 0 {
result = rl.clientConnections[rl.balancerIndex]
rl.balancerIndex++
}
return
}

View File

@@ -0,0 +1,74 @@
package main
import (
"testing"
"net/rpc"
)
func BenchmarkBalance(b *testing.B) {
raterlist:= NewRaterList()
raterlist.AddClient("client 1", new(rpc.Client))
raterlist.AddClient("client 2", new(rpc.Client))
raterlist.AddClient("client 3", new(rpc.Client))
for i := 0; i < b.N; i++ {
raterlist.Balance()
}
}
func TestRemoving(t *testing.T) {
raterlist:= NewRaterList()
c1:= new(rpc.Client)
c2:= new(rpc.Client)
c3:= new(rpc.Client)
raterlist.AddClient("client 1", c1)
raterlist.AddClient("client 2", c2)
raterlist.AddClient("client 3", c3)
raterlist.RemoveClient("client 2")
if raterlist.clientConnections[0] != c1 ||
raterlist.clientConnections[1] != c3 ||
len(raterlist.clientConnections) != 2 {
t.Error("Failed removing rater")
}
}
func TestGet(t *testing.T) {
raterlist:= NewRaterList()
c1:= new(rpc.Client)
raterlist.AddClient("client 1", c1)
result, ok:= raterlist.GetClient("client 1")
if !ok || c1 != result {
t.Error("Get failed")
}
}
func TestOneBalancer(t *testing.T) {
raterlist:= NewRaterList()
raterlist.AddClient("client 1", new(rpc.Client))
c1:= raterlist.Balance()
c2:= raterlist.Balance()
if c1 != c2 {
t.Error("With only one rater these shoud be equal")
}
}
func Test100Balancer(t *testing.T) {
raterlist:= NewRaterList()
var clients []*rpc.Client
for i:=0; i< 100; i++ {
c:= new(rpc.Client)
clients = append(clients, c)
raterlist.AddClient("client 1", c)
}
for i:=0; i< 100; i++ {
c:=raterlist.Balance()
if c != clients[i] {
t.Error("Balance did not iterate all the available clients")
}
}
c:=raterlist.Balance()
if c != clients[0] {
t.Error("Balance did not lopped from the begining")
}
}

View File

@@ -25,7 +25,7 @@ func CallRater(key string) (reply string) {
client:= raterList.Balance()
if client == nil {
log.Print("Waiting for raters to register...")
time.Sleep(1 * time.Second) // white one second and retry
time.Sleep(1 * time.Second) // wait one second and retry
} else {
err = client.Call("Storage.Get", key, &reply)
if err != nil {

View File

@@ -0,0 +1,31 @@
package main
import (
"net/rpc"
"testing"
)
func BenchmarkRPCGet(b *testing.B) {
b.StopTimer()
client, _ := rpc.DialHTTPPath("tcp", "localhost:2000", "/rpc")
b.StartTimer()
var reply string
for i := 0; i < b.N; i++ {
client.Call("Responder.Get", "test", &reply)
}
}
func TestRPCGet(t *testing.T) {
client, err := rpc.DialHTTPPath("tcp", "localhost:2000", "/rpc")
if err != nil {
t.Error("Inquirer server not started!")
t.FailNow()
}
var reply string
client.Call("Responder.Get", "test", &reply)
const expect = "12223"
if reply != expect {
t.Errorf("replay == %q, want %q", reply, expect)
}
}

View File

@@ -0,0 +1,66 @@
package main
import (
"fmt"
"log"
"net/rpc"
"time"
"os/signal"
"os"
"syscall"
)
/*
RPC Server that handles the registering and unregistering of raters.
*/
type RaterServer byte
/*
Listens for SIGTERM, SIGINT, SIGQUIT system signals and shuts down all the registered raters.
*/
func StopSingnalHandler() {
log.Print("Handling stop signals...")
sig := <-signal.Incoming
if usig, ok := sig.(os.UnixSignal); ok {
switch usig {
case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT:
log.Printf("Caught signal %v, sending shutdownto raters\n", usig)
var reply string
for i,client:= range raterList.clientConnections {
client.Call("Storage.Shutdown", "", &reply)
log.Printf("Shutdown rater %v: %v ", raterList.clientAddresses[i], reply)
}
os.Exit(1)
}
}
}
/*
RPC method that receives a rater address, connects to it and ads the pair to the rater list for balancing
*/
func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error {
time.Sleep(1 * time.Second) // wait a second for Rater to start serving
client, err := rpc.Dial("tcp", clientAddress)
if err != nil {
log.Print("Could not connect to client!")
return err
}
raterList.AddClient(clientAddress, client)
log.Print(fmt.Sprintf("Rater %v registered succesfully.", clientAddress))
return nil
}
/*
RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list.
*/
func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *byte) error {
client, ok := raterList.GetClient(clientAddress)
if ok {
client.Close()
raterList.RemoveClient(clientAddress)
log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress))
} else {
log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress))
}
return nil
}

View File

@@ -0,0 +1,9 @@
package main
type Responder int
func (r *Responder) Get(args string, replay *string) error {
*replay = CallRater(args)
return nil
}