From 4875228926e5ddee2807fa73db0571a3a4af15cb Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sat, 28 Jan 2012 18:54:48 +0200 Subject: [PATCH] added tests and method comments --- .gitignore | 5 --- cmd/inquirer/balancer.go | 80 +++++++++++++++++++++++++++++++++++ cmd/inquirer/balancer_test.go | 74 ++++++++++++++++++++++++++++++++ cmd/inquirer/inquirer.go | 2 +- cmd/inquirer/inquirer_test.go | 31 ++++++++++++++ cmd/inquirer/registration.go | 66 +++++++++++++++++++++++++++++ cmd/inquirer/responder.go | 9 ++++ 7 files changed, 261 insertions(+), 6 deletions(-) create mode 100644 cmd/inquirer/balancer.go create mode 100644 cmd/inquirer/balancer_test.go create mode 100644 cmd/inquirer/inquirer_test.go create mode 100644 cmd/inquirer/registration.go create mode 100644 cmd/inquirer/responder.go diff --git a/.gitignore b/.gitignore index e1249f85b..bd414288a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,8 +3,3 @@ a.out *.6 *.pyc .settings -inquirer -rater -kyoto_loader -redis_loader -test \ No newline at end of file diff --git a/cmd/inquirer/balancer.go b/cmd/inquirer/balancer.go new file mode 100644 index 000000000..562ff6cfb --- /dev/null +++ b/cmd/inquirer/balancer.go @@ -0,0 +1,80 @@ +package main + +import ( + "net/rpc" + "sync" +) + +type RaterList struct { + clientAddresses []string + clientConnections []*rpc.Client + balancerIndex int + mu sync.RWMutex +} + +/* +Constructor for RateList holding one slice for addreses and one slice for connections. +*/ +func NewRaterList() *RaterList { + r:= &RaterList{balancerIndex: 0} // leaving both slices to nil + return r +} + +/* +Adds a client to the two internal slices. +*/ +func (rl *RaterList) AddClient(address string, client *rpc.Client){ + rl.clientAddresses = append(rl.clientAddresses, address) + rl.clientConnections = append(rl.clientConnections, client) + return +} + +/* +Removes a client from the slices locking the readers and reseting the balancer index. +*/ +func (rl *RaterList) RemoveClient(address string){ + index := -1 + for i, v := range rl.clientAddresses { + if v == address { + index = i + break + } + } + if index == -1 { + return + } + rl.mu.RLock() + defer rl.mu.RUnlock() + rl.clientAddresses = append(rl.clientAddresses[:index], rl.clientAddresses[index+1:]...) + rl.clientConnections = append(rl.clientConnections[:index], rl.clientConnections[index+1:]...) + rl.balancerIndex = 0 +} + +/* +Returns a client for the specifed address. +*/ +func (rl *RaterList) GetClient(address string) (*rpc.Client, bool){ + for i, v := range rl.clientAddresses { + if v == address { + return rl.clientConnections[i], true + } + } + return nil, false +} + +/* +Returns the next available connection at each call looping at the end of connections. +*/ +func (rl *RaterList) Balance() (result *rpc.Client) { + rl.mu.Lock() + defer rl.mu.Unlock() + if rl.balancerIndex >= len(rl.clientAddresses) { + rl.balancerIndex = 0 + } + if len(rl.clientAddresses) > 0 { + result = rl.clientConnections[rl.balancerIndex] + rl.balancerIndex++ + } + + return +} diff --git a/cmd/inquirer/balancer_test.go b/cmd/inquirer/balancer_test.go new file mode 100644 index 000000000..84bd007ed --- /dev/null +++ b/cmd/inquirer/balancer_test.go @@ -0,0 +1,74 @@ +package main + +import ( + "testing" + "net/rpc" +) + +func BenchmarkBalance(b *testing.B) { + raterlist:= NewRaterList() + raterlist.AddClient("client 1", new(rpc.Client)) + raterlist.AddClient("client 2", new(rpc.Client)) + raterlist.AddClient("client 3", new(rpc.Client)) + for i := 0; i < b.N; i++ { + raterlist.Balance() + } +} + + +func TestRemoving(t *testing.T) { + raterlist:= NewRaterList() + c1:= new(rpc.Client) + c2:= new(rpc.Client) + c3:= new(rpc.Client) + raterlist.AddClient("client 1", c1) + raterlist.AddClient("client 2", c2) + raterlist.AddClient("client 3", c3) + raterlist.RemoveClient("client 2") + if raterlist.clientConnections[0] != c1 || + raterlist.clientConnections[1] != c3 || + len(raterlist.clientConnections) != 2 { + t.Error("Failed removing rater") + } +} + +func TestGet(t *testing.T) { + raterlist:= NewRaterList() + c1:= new(rpc.Client) + raterlist.AddClient("client 1", c1) + result, ok:= raterlist.GetClient("client 1") + if !ok || c1 != result { + t.Error("Get failed") + } +} + +func TestOneBalancer(t *testing.T) { + raterlist:= NewRaterList() + raterlist.AddClient("client 1", new(rpc.Client)) + c1:= raterlist.Balance() + c2:= raterlist.Balance() + if c1 != c2 { + t.Error("With only one rater these shoud be equal") + } +} + +func Test100Balancer(t *testing.T) { + raterlist:= NewRaterList() + var clients []*rpc.Client + for i:=0; i< 100; i++ { + c:= new(rpc.Client) + clients = append(clients, c) + raterlist.AddClient("client 1", c) + } + for i:=0; i< 100; i++ { + c:=raterlist.Balance() + if c != clients[i] { + t.Error("Balance did not iterate all the available clients") + } + } + c:=raterlist.Balance() + if c != clients[0] { + t.Error("Balance did not lopped from the begining") + } + +} diff --git a/cmd/inquirer/inquirer.go b/cmd/inquirer/inquirer.go index 09e24acf4..87e62f643 100644 --- a/cmd/inquirer/inquirer.go +++ b/cmd/inquirer/inquirer.go @@ -25,7 +25,7 @@ func CallRater(key string) (reply string) { client:= raterList.Balance() if client == nil { log.Print("Waiting for raters to register...") - time.Sleep(1 * time.Second) // white one second and retry + time.Sleep(1 * time.Second) // wait one second and retry } else { err = client.Call("Storage.Get", key, &reply) if err != nil { diff --git a/cmd/inquirer/inquirer_test.go b/cmd/inquirer/inquirer_test.go new file mode 100644 index 000000000..ab19482d8 --- /dev/null +++ b/cmd/inquirer/inquirer_test.go @@ -0,0 +1,31 @@ +package main + +import ( + "net/rpc" + "testing" +) + +func BenchmarkRPCGet(b *testing.B) { + b.StopTimer() + client, _ := rpc.DialHTTPPath("tcp", "localhost:2000", "/rpc") + b.StartTimer() + var reply string + for i := 0; i < b.N; i++ { + client.Call("Responder.Get", "test", &reply) + } +} + +func TestRPCGet(t *testing.T) { + client, err := rpc.DialHTTPPath("tcp", "localhost:2000", "/rpc") + if err != nil { + t.Error("Inquirer server not started!") + t.FailNow() + } + var reply string + client.Call("Responder.Get", "test", &reply) + const expect = "12223" + if reply != expect { + t.Errorf("replay == %q, want %q", reply, expect) + } +} + diff --git a/cmd/inquirer/registration.go b/cmd/inquirer/registration.go new file mode 100644 index 000000000..e10a1dd60 --- /dev/null +++ b/cmd/inquirer/registration.go @@ -0,0 +1,66 @@ +package main + +import ( + "fmt" + "log" + "net/rpc" + "time" + "os/signal" + "os" + "syscall" +) + +/* +RPC Server that handles the registering and unregistering of raters. +*/ +type RaterServer byte + +/* +Listens for SIGTERM, SIGINT, SIGQUIT system signals and shuts down all the registered raters. +*/ +func StopSingnalHandler() { + log.Print("Handling stop signals...") + sig := <-signal.Incoming + if usig, ok := sig.(os.UnixSignal); ok { + switch usig { + case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT: + log.Printf("Caught signal %v, sending shutdownto raters\n", usig) + var reply string + for i,client:= range raterList.clientConnections { + client.Call("Storage.Shutdown", "", &reply) + log.Printf("Shutdown rater %v: %v ", raterList.clientAddresses[i], reply) + } + os.Exit(1) + } + } +} + +/* +RPC method that receives a rater address, connects to it and ads the pair to the rater list for balancing +*/ +func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error { + time.Sleep(1 * time.Second) // wait a second for Rater to start serving + client, err := rpc.Dial("tcp", clientAddress) + if err != nil { + log.Print("Could not connect to client!") + return err + } + raterList.AddClient(clientAddress, client) + log.Print(fmt.Sprintf("Rater %v registered succesfully.", clientAddress)) + return nil +} + +/* +RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list. +*/ +func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *byte) error { + client, ok := raterList.GetClient(clientAddress) + if ok { + client.Close() + raterList.RemoveClient(clientAddress) + log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress)) + } else { + log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress)) + } + return nil +} diff --git a/cmd/inquirer/responder.go b/cmd/inquirer/responder.go new file mode 100644 index 000000000..eef1f6899 --- /dev/null +++ b/cmd/inquirer/responder.go @@ -0,0 +1,9 @@ +package main + +type Responder int + +func (r *Responder) Get(args string, replay *string) error { + *replay = CallRater(args) + return nil +} +