balancer intergarted in rpc delegate

This commit is contained in:
Radu Ioan Fericean
2012-05-27 13:44:42 +03:00
parent 198ade98b4
commit 6b3e22f132
3 changed files with 61 additions and 59 deletions

View File

@@ -1,5 +1,5 @@
/*
Rating system designed to be used in VoIP Carriers World
Rating system designed to be used in VoIP Carriers Wobld
Copyright (C) 2012 Radu Ioan Fericean
This program is free software: you can redistribute it and/or modify
@@ -16,14 +16,14 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package main
package balancer
import (
"net/rpc"
"sync"
)
type RaterList struct {
type Balancer struct {
clientAddresses []string
clientConnections []*rpc.Client
balancerIndex int
@@ -33,26 +33,26 @@ type RaterList struct {
/*
Constructor for RateList holding one slice for addreses and one slice for connections.
*/
func NewRaterList() *RaterList {
r := &RaterList{balancerIndex: 0} // leaving both slices to nil
func NewBalancer() *Balancer {
r := &Balancer{balancerIndex: 0} // leaving both slices to nil
return r
}
/*
Adds a client to the two internal slices.
*/
func (rl *RaterList) AddClient(address string, client *rpc.Client) {
rl.clientAddresses = append(rl.clientAddresses, address)
rl.clientConnections = append(rl.clientConnections, client)
func (bl *Balancer) AddClient(address string, client *rpc.Client) {
bl.clientAddresses = append(bl.clientAddresses, address)
bl.clientConnections = append(bl.clientConnections, client)
return
}
/*
Removes a client from the slices locking the readers and reseting the balancer index.
*/
func (rl *RaterList) RemoveClient(address string) {
func (bl *Balancer) RemoveClient(address string) {
index := -1
for i, v := range rl.clientAddresses {
for i, v := range bl.clientAddresses {
if v == address {
index = i
break
@@ -61,20 +61,20 @@ func (rl *RaterList) RemoveClient(address string) {
if index == -1 {
return
}
rl.mu.RLock()
defer rl.mu.RUnlock()
rl.clientAddresses = append(rl.clientAddresses[:index], rl.clientAddresses[index+1:]...)
rl.clientConnections = append(rl.clientConnections[:index], rl.clientConnections[index+1:]...)
rl.balancerIndex = 0
bl.mu.RLock()
defer bl.mu.RUnlock()
bl.clientAddresses = append(bl.clientAddresses[:index], bl.clientAddresses[index+1:]...)
bl.clientConnections = append(bl.clientConnections[:index], bl.clientConnections[index+1:]...)
bl.balancerIndex = 0
}
/*
Returns a client for the specifed address.
*/
func (rl *RaterList) GetClient(address string) (*rpc.Client, bool) {
for i, v := range rl.clientAddresses {
func (bl *Balancer) GetClient(address string) (*rpc.Client, bool) {
for i, v := range bl.clientAddresses {
if v == address {
return rl.clientConnections[i], true
return bl.clientConnections[i], true
}
}
return nil, false
@@ -83,15 +83,15 @@ func (rl *RaterList) GetClient(address string) (*rpc.Client, bool) {
/*
Returns the next available connection at each call looping at the end of connections.
*/
func (rl *RaterList) Balance() (result *rpc.Client) {
rl.mu.Lock()
defer rl.mu.Unlock()
if rl.balancerIndex >= len(rl.clientAddresses) {
rl.balancerIndex = 0
func (bl *Balancer) Balance() (result *rpc.Client) {
bl.mu.Lock()
defer bl.mu.Unlock()
if bl.balancerIndex >= len(bl.clientAddresses) {
bl.balancerIndex = 0
}
if len(rl.clientAddresses) > 0 {
result = rl.clientConnections[rl.balancerIndex]
rl.balancerIndex++
if len(bl.clientAddresses) > 0 {
result = bl.clientConnections[bl.balancerIndex]
bl.balancerIndex++
}
return

View File

@@ -15,7 +15,7 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package main
package balancer
import (
"net/rpc"
@@ -23,66 +23,66 @@ import (
)
func BenchmarkBalance(b *testing.B) {
raterlist := NewRaterList()
raterlist.AddClient("client 1", new(rpc.Client))
raterlist.AddClient("client 2", new(rpc.Client))
raterlist.AddClient("client 3", new(rpc.Client))
balancer := NewBalancer()
balancer.AddClient("client 1", new(rpc.Client))
balancer.AddClient("client 2", new(rpc.Client))
balancer.AddClient("client 3", new(rpc.Client))
for i := 0; i < b.N; i++ {
raterlist.Balance()
balancer.Balance()
}
}
func TestRemoving(t *testing.T) {
raterlist := NewRaterList()
balancer := NewBalancer()
c1 := new(rpc.Client)
c2 := new(rpc.Client)
c3 := new(rpc.Client)
raterlist.AddClient("client 1", c1)
raterlist.AddClient("client 2", c2)
raterlist.AddClient("client 3", c3)
raterlist.RemoveClient("client 2")
if raterlist.clientConnections[0] != c1 ||
raterlist.clientConnections[1] != c3 ||
len(raterlist.clientConnections) != 2 {
balancer.AddClient("client 1", c1)
balancer.AddClient("client 2", c2)
balancer.AddClient("client 3", c3)
balancer.RemoveClient("client 2")
if balancer.clientConnections[0] != c1 ||
balancer.clientConnections[1] != c3 ||
len(balancer.clientConnections) != 2 {
t.Error("Failed removing rater")
}
}
func TestGet(t *testing.T) {
raterlist := NewRaterList()
balancer := NewBalancer()
c1 := new(rpc.Client)
raterlist.AddClient("client 1", c1)
result, ok := raterlist.GetClient("client 1")
balancer.AddClient("client 1", c1)
result, ok := balancer.GetClient("client 1")
if !ok || c1 != result {
t.Error("Get failed")
}
}
func TestOneBalancer(t *testing.T) {
raterlist := NewRaterList()
raterlist.AddClient("client 1", new(rpc.Client))
c1 := raterlist.Balance()
c2 := raterlist.Balance()
balancer := NewBalancer()
balancer.AddClient("client 1", new(rpc.Client))
c1 := balancer.Balance()
c2 := balancer.Balance()
if c1 != c2 {
t.Error("With only one rater these shoud be equal")
}
}
func Test100Balancer(t *testing.T) {
raterlist := NewRaterList()
balancer := NewBalancer()
var clients []*rpc.Client
for i := 0; i < 100; i++ {
c := new(rpc.Client)
clients = append(clients, c)
raterlist.AddClient("client 1", c)
balancer.AddClient("client 1", c)
}
for i := 0; i < 100; i++ {
c := raterlist.Balance()
c := balancer.Balance()
if c != clients[i] {
t.Error("Balance did not iterate all the available clients")
}
}
c := raterlist.Balance()
c := balancer.Balance()
if c != clients[0] {
t.Error("Balance did not lopped from the begining")
}

View File

@@ -21,7 +21,7 @@ package sessionmanager
import (
"github.com/rif/cgrates/timespans"
"log"
"net/rpc"
"github.com/rif/cgrates/balancer"
"time"
)
@@ -152,11 +152,11 @@ func (dsd *DirectSessionDelegate) GetDebitPeriod() time.Duration {
// Sample SessionDelegate calling the timespans methods through the RPC interface
type RPCSessionDelegate struct {
client *rpc.Client
balancer *balancer.Balancer
}
func NewRPCSessionDelegate(client *rpc.Client) (rpc *RPCSessionDelegate) {
return &RPCSessionDelegate{client}
func NewRPCSessionDelegate(balancer *balancer.Balancer) (rpc *RPCSessionDelegate) {
return &RPCSessionDelegate{balancer}
}
func (rsd *RPCSessionDelegate) OnHeartBeat(ev Event) {
@@ -169,6 +169,7 @@ func (rsd *RPCSessionDelegate) OnChannelAnswer(ev Event, s *Session, sm SessionM
func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
lastCC := s.CallCosts[len(s.CallCosts)-1]
client := rsd.balancer.Balance()
// put credit back
start := time.Now()
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
@@ -211,7 +212,7 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
Amount: -cost,
}
var response float64
err := rsd.client.Call("Responder.DebitCents", cd, &response)
err := client.Call("Responder.DebitCents", cd, &response)
if err != nil {
log.Printf("Debit cents failed: %v", err)
}
@@ -224,7 +225,7 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
Amount: -seconds,
}
var response float64
err := rsd.client.Call("Responder.DebitSeconds", cd, &response)
err := client.Call("Responder.DebitSeconds", cd, &response)
if err != nil {
log.Printf("Debit seconds failed: %v", err)
}
@@ -235,7 +236,8 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
func (rsd *RPCSessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor) {
cc := &timespans.CallCost{}
err := rsd.client.Call("Responder.Debit", cd, cc)
client := rsd.balancer.Balance()
err := client.Call("Responder.Debit", cd, cc)
if err != nil {
log.Printf("Could not complete debit opperation: %v", err)
}
@@ -243,7 +245,7 @@ func (rsd *RPCSessionDelegate) LoopAction(s *Session, cd *timespans.CallDescript
log.Print(cc)
cd.Amount = DEBIT_PERIOD.Seconds()
var remainingSeconds float64
err = rsd.client.Call("Responder.GetMaxSessionTime", cd, &remainingSeconds)
err = client.Call("Responder.GetMaxSessionTime", cd, &remainingSeconds)
if err != nil {
log.Printf("Could not get max session time: %v", err)
}