new balancer passes tests

This commit is contained in:
Radu Ioan Fericean
2012-05-29 23:28:42 +03:00
parent 58c9e16e62
commit 1da37912ca
2 changed files with 29 additions and 38 deletions

View File

@@ -25,21 +25,20 @@ import (
)
type Balancer struct {
clientAddresses []string // we need to hold these two slices because maps fo not keep order
clientConnections []*rpc.Client
balancerChannel chan *rpc.Client
mu sync.RWMutex
clients map[string]*rpc.Client
balancerChannel chan *rpc.Client
mu sync.RWMutex
}
/*
Constructor for RateList holding one slice for addreses and one slice for connections.
*/
func NewBalancer() *Balancer {
r := &Balancer{balancerChannel: make(chan *rpc.Client)} // leaving both slices to nil
r := &Balancer{clients: make(map[string]*rpc.Client), balancerChannel: make(chan *rpc.Client)} // leaving both slices to nil
go func() {
for {
if len(r.clientConnections) > 0 {
for _, c := range r.clientConnections {
if len(r.clients) > 0 {
for _, c := range r.clients {
r.balancerChannel <- c
}
} else {
@@ -56,8 +55,7 @@ Adds a client to the two internal slices.
func (bl *Balancer) AddClient(address string, client *rpc.Client) {
bl.mu.Lock()
defer bl.mu.Unlock()
bl.clientAddresses = append(bl.clientAddresses, address)
bl.clientConnections = append(bl.clientConnections, client)
bl.clients[address] = client
return
}
@@ -65,35 +63,20 @@ func (bl *Balancer) AddClient(address string, client *rpc.Client) {
Removes a client from the slices locking the readers and reseting the balancer index.
*/
func (bl *Balancer) RemoveClient(address string) {
index := -1
for i, v := range bl.clientAddresses {
if v == address {
index = i
break
}
}
if index == -1 {
return
}
bl.mu.Lock()
defer bl.mu.Unlock()
bl.clientAddresses = append(bl.clientAddresses[:index], bl.clientAddresses[index+1:]...)
bl.clientConnections = append(bl.clientConnections[:index], bl.clientConnections[index+1:]...)
delete(bl.clients, address)
<-bl.balancerChannel
}
/*
Returns a client for the specifed address.
*/
func (bl *Balancer) GetClient(address string) (*rpc.Client, bool) {
func (bl *Balancer) GetClient(address string) (c *rpc.Client, exists bool) {
bl.mu.RLock()
defer bl.mu.RUnlock()
for i, v := range bl.clientAddresses {
if v == address {
return bl.clientConnections[i], true
}
}
return nil, false
c, exists = bl.clients[address]
return
}
/*
@@ -110,14 +93,18 @@ func (bl *Balancer) Shutdown() {
bl.mu.Lock()
defer bl.mu.Unlock()
var reply string
for i, client := range bl.clientConnections {
for address, client := range bl.clients {
client.Call("Responder.Shutdown", "", &reply)
log.Printf("Shutdown rater %v: %v ", bl.clientAddresses[i], reply)
log.Printf("Shutdown rater %v: %v ", address, reply)
}
}
func (bl *Balancer) GetClientAddresses() []string {
bl.mu.RLock()
defer bl.mu.RUnlock()
return bl.clientAddresses
var addresses []string
for a, _ := range bl.clients {
addresses = append(addresses, a)
}
return addresses
}

View File

@@ -20,6 +20,7 @@ package balancer
import (
"net/rpc"
"testing"
"fmt"
)
func BenchmarkBalance(b *testing.B) {
@@ -41,9 +42,9 @@ func TestRemoving(t *testing.T) {
balancer.AddClient("client 2", c2)
balancer.AddClient("client 3", c3)
balancer.RemoveClient("client 2")
if balancer.clientConnections[0] != c1 ||
balancer.clientConnections[1] != c3 ||
len(balancer.clientConnections) != 2 {
if balancer.clients["client 1"] != c1 ||
balancer.clients["client 3"] != c3 ||
len(balancer.clients) != 2 {
t.Error("Failed removing rater")
}
}
@@ -73,14 +74,17 @@ func Test100Balancer(t *testing.T) {
var clients []*rpc.Client
for i := 0; i < 100; i++ {
c := new(rpc.Client)
clients = append(clients, c)
balancer.AddClient("client 1", c)
balancer.AddClient(fmt.Sprintf("client%v", i), c)
}
for i := 0; i < 100; i++ {
c := balancer.Balance()
if c != clients[i] {
t.Error("Balance did not iterate all the available clients")
for _, o := range clients {
if c == o {
t.Error("Balance did not iterate all the available clients")
break
}
}
clients = append(clients, c)
}
c := balancer.Balance()
if c != clients[0] {