From 58c9e16e62ab5486584d3d87b6f3b0686973809b Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 29 May 2012 23:15:09 +0300 Subject: [PATCH] using channel for balancer --- balancer/balancer.go | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 8f991f810..f11806dd1 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -27,7 +27,7 @@ import ( type Balancer struct { clientAddresses []string // we need to hold these two slices because maps fo not keep order clientConnections []*rpc.Client - balancerIndex int + balancerChannel chan *rpc.Client mu sync.RWMutex } @@ -35,7 +35,18 @@ type Balancer struct { Constructor for RateList holding one slice for addreses and one slice for connections. */ func NewBalancer() *Balancer { - r := &Balancer{balancerIndex: 0} // leaving both slices to nil + r := &Balancer{balancerChannel: make(chan *rpc.Client)} // leaving both slices to nil + go func() { + for { + if len(r.clientConnections) > 0 { + for _, c := range r.clientConnections { + r.balancerChannel <- c + } + } else { + r.balancerChannel <- nil + } + } + }() return r } @@ -43,6 +54,8 @@ func NewBalancer() *Balancer { Adds a client to the two internal slices. */ func (bl *Balancer) AddClient(address string, client *rpc.Client) { + bl.mu.Lock() + defer bl.mu.Unlock() bl.clientAddresses = append(bl.clientAddresses, address) bl.clientConnections = append(bl.clientConnections, client) return @@ -62,17 +75,19 @@ func (bl *Balancer) RemoveClient(address string) { if index == -1 { return } - bl.mu.RLock() - defer bl.mu.RUnlock() + bl.mu.Lock() + defer bl.mu.Unlock() bl.clientAddresses = append(bl.clientAddresses[:index], bl.clientAddresses[index+1:]...) bl.clientConnections = append(bl.clientConnections[:index], bl.clientConnections[index+1:]...) - bl.balancerIndex = 0 + <-bl.balancerChannel } /* Returns a client for the specifed address. */ func (bl *Balancer) GetClient(address string) (*rpc.Client, bool) { + bl.mu.RLock() + defer bl.mu.RUnlock() for i, v := range bl.clientAddresses { if v == address { return bl.clientConnections[i], true @@ -85,20 +100,15 @@ func (bl *Balancer) GetClient(address string) (*rpc.Client, bool) { Returns the next available connection at each call looping at the end of connections. */ func (bl *Balancer) Balance() (result *rpc.Client) { - bl.mu.Lock() - defer bl.mu.Unlock() - if bl.balancerIndex >= len(bl.clientAddresses) { - bl.balancerIndex = 0 - } - if len(bl.clientAddresses) > 0 { - result = bl.clientConnections[bl.balancerIndex] - bl.balancerIndex++ - } + bl.mu.RLock() + defer bl.mu.RUnlock() - return + return <-bl.balancerChannel } func (bl *Balancer) Shutdown() { + bl.mu.Lock() + defer bl.mu.Unlock() var reply string for i, client := range bl.clientConnections { client.Call("Responder.Shutdown", "", &reply) @@ -107,5 +117,7 @@ func (bl *Balancer) Shutdown() { } func (bl *Balancer) GetClientAddresses() []string { + bl.mu.RLock() + defer bl.mu.RUnlock() return bl.clientAddresses }