using channel for balancer

This commit is contained in:
Radu Ioan Fericean
2012-05-29 23:15:09 +03:00
parent 9d5046566c
commit 58c9e16e62

View File

@@ -27,7 +27,7 @@ import (
type Balancer struct {
clientAddresses []string // we need to hold these two slices because maps fo not keep order
clientConnections []*rpc.Client
balancerIndex int
balancerChannel chan *rpc.Client
mu sync.RWMutex
}
@@ -35,7 +35,18 @@ type Balancer struct {
Constructor for RateList holding one slice for addreses and one slice for connections.
*/
func NewBalancer() *Balancer {
r := &Balancer{balancerIndex: 0} // leaving both slices to nil
r := &Balancer{balancerChannel: make(chan *rpc.Client)} // leaving both slices to nil
go func() {
for {
if len(r.clientConnections) > 0 {
for _, c := range r.clientConnections {
r.balancerChannel <- c
}
} else {
r.balancerChannel <- nil
}
}
}()
return r
}
@@ -43,6 +54,8 @@ func NewBalancer() *Balancer {
Adds a client to the two internal slices.
*/
func (bl *Balancer) AddClient(address string, client *rpc.Client) {
bl.mu.Lock()
defer bl.mu.Unlock()
bl.clientAddresses = append(bl.clientAddresses, address)
bl.clientConnections = append(bl.clientConnections, client)
return
@@ -62,17 +75,19 @@ func (bl *Balancer) RemoveClient(address string) {
if index == -1 {
return
}
bl.mu.RLock()
defer bl.mu.RUnlock()
bl.mu.Lock()
defer bl.mu.Unlock()
bl.clientAddresses = append(bl.clientAddresses[:index], bl.clientAddresses[index+1:]...)
bl.clientConnections = append(bl.clientConnections[:index], bl.clientConnections[index+1:]...)
bl.balancerIndex = 0
<-bl.balancerChannel
}
/*
Returns a client for the specifed address.
*/
func (bl *Balancer) GetClient(address string) (*rpc.Client, bool) {
bl.mu.RLock()
defer bl.mu.RUnlock()
for i, v := range bl.clientAddresses {
if v == address {
return bl.clientConnections[i], true
@@ -85,20 +100,15 @@ func (bl *Balancer) GetClient(address string) (*rpc.Client, bool) {
Returns the next available connection at each call looping at the end of connections.
*/
func (bl *Balancer) Balance() (result *rpc.Client) {
bl.mu.Lock()
defer bl.mu.Unlock()
if bl.balancerIndex >= len(bl.clientAddresses) {
bl.balancerIndex = 0
}
if len(bl.clientAddresses) > 0 {
result = bl.clientConnections[bl.balancerIndex]
bl.balancerIndex++
}
bl.mu.RLock()
defer bl.mu.RUnlock()
return
return <-bl.balancerChannel
}
func (bl *Balancer) Shutdown() {
bl.mu.Lock()
defer bl.mu.Unlock()
var reply string
for i, client := range bl.clientConnections {
client.Call("Responder.Shutdown", "", &reply)
@@ -107,5 +117,7 @@ func (bl *Balancer) Shutdown() {
}
func (bl *Balancer) GetClientAddresses() []string {
bl.mu.RLock()
defer bl.mu.RUnlock()
return bl.clientAddresses
}