From 6b3e22f132d31c64cd9df4379daf4da3c088908b Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sun, 27 May 2012 13:44:42 +0300 Subject: [PATCH] balancer intergarted in rpc delegate --- .../raterlist.go => balancer/balancer.go | 52 +++++++++---------- .../balancer_test.go | 50 +++++++++--------- sessionmanager/sessiondelegate.go | 18 ++++--- 3 files changed, 61 insertions(+), 59 deletions(-) rename cmd/cgr-balancer/raterlist.go => balancer/balancer.go (56%) rename cmd/cgr-balancer/raterlist_test.go => balancer/balancer_test.go (63%) diff --git a/cmd/cgr-balancer/raterlist.go b/balancer/balancer.go similarity index 56% rename from cmd/cgr-balancer/raterlist.go rename to balancer/balancer.go index 54092e8c3..cec2ae0dd 100644 --- a/cmd/cgr-balancer/raterlist.go +++ b/balancer/balancer.go @@ -1,5 +1,5 @@ /* -Rating system designed to be used in VoIP Carriers World +Rating system designed to be used in VoIP Carriers Wobld Copyright (C) 2012 Radu Ioan Fericean This program is free software: you can redistribute it and/or modify @@ -16,14 +16,14 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package main +package balancer import ( "net/rpc" "sync" ) -type RaterList struct { +type Balancer struct { clientAddresses []string clientConnections []*rpc.Client balancerIndex int @@ -33,26 +33,26 @@ type RaterList struct { /* Constructor for RateList holding one slice for addreses and one slice for connections. */ -func NewRaterList() *RaterList { - r := &RaterList{balancerIndex: 0} // leaving both slices to nil +func NewBalancer() *Balancer { + r := &Balancer{balancerIndex: 0} // leaving both slices to nil return r } /* Adds a client to the two internal slices. */ -func (rl *RaterList) AddClient(address string, client *rpc.Client) { - rl.clientAddresses = append(rl.clientAddresses, address) - rl.clientConnections = append(rl.clientConnections, client) +func (bl *Balancer) AddClient(address string, client *rpc.Client) { + bl.clientAddresses = append(bl.clientAddresses, address) + bl.clientConnections = append(bl.clientConnections, client) return } /* Removes a client from the slices locking the readers and reseting the balancer index. */ -func (rl *RaterList) RemoveClient(address string) { +func (bl *Balancer) RemoveClient(address string) { index := -1 - for i, v := range rl.clientAddresses { + for i, v := range bl.clientAddresses { if v == address { index = i break @@ -61,20 +61,20 @@ func (rl *RaterList) RemoveClient(address string) { if index == -1 { return } - rl.mu.RLock() - defer rl.mu.RUnlock() - rl.clientAddresses = append(rl.clientAddresses[:index], rl.clientAddresses[index+1:]...) - rl.clientConnections = append(rl.clientConnections[:index], rl.clientConnections[index+1:]...) - rl.balancerIndex = 0 + bl.mu.RLock() + defer bl.mu.RUnlock() + bl.clientAddresses = append(bl.clientAddresses[:index], bl.clientAddresses[index+1:]...) + bl.clientConnections = append(bl.clientConnections[:index], bl.clientConnections[index+1:]...) + bl.balancerIndex = 0 } /* Returns a client for the specifed address. */ -func (rl *RaterList) GetClient(address string) (*rpc.Client, bool) { - for i, v := range rl.clientAddresses { +func (bl *Balancer) GetClient(address string) (*rpc.Client, bool) { + for i, v := range bl.clientAddresses { if v == address { - return rl.clientConnections[i], true + return bl.clientConnections[i], true } } return nil, false @@ -83,15 +83,15 @@ func (rl *RaterList) GetClient(address string) (*rpc.Client, bool) { /* Returns the next available connection at each call looping at the end of connections. */ -func (rl *RaterList) Balance() (result *rpc.Client) { - rl.mu.Lock() - defer rl.mu.Unlock() - if rl.balancerIndex >= len(rl.clientAddresses) { - rl.balancerIndex = 0 +func (bl *Balancer) Balance() (result *rpc.Client) { + bl.mu.Lock() + defer bl.mu.Unlock() + if bl.balancerIndex >= len(bl.clientAddresses) { + bl.balancerIndex = 0 } - if len(rl.clientAddresses) > 0 { - result = rl.clientConnections[rl.balancerIndex] - rl.balancerIndex++ + if len(bl.clientAddresses) > 0 { + result = bl.clientConnections[bl.balancerIndex] + bl.balancerIndex++ } return diff --git a/cmd/cgr-balancer/raterlist_test.go b/balancer/balancer_test.go similarity index 63% rename from cmd/cgr-balancer/raterlist_test.go rename to balancer/balancer_test.go index 98ab9556d..a74470c6d 100644 --- a/cmd/cgr-balancer/raterlist_test.go +++ b/balancer/balancer_test.go @@ -15,7 +15,7 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ -package main +package balancer import ( "net/rpc" @@ -23,66 +23,66 @@ import ( ) func BenchmarkBalance(b *testing.B) { - raterlist := NewRaterList() - raterlist.AddClient("client 1", new(rpc.Client)) - raterlist.AddClient("client 2", new(rpc.Client)) - raterlist.AddClient("client 3", new(rpc.Client)) + balancer := NewBalancer() + balancer.AddClient("client 1", new(rpc.Client)) + balancer.AddClient("client 2", new(rpc.Client)) + balancer.AddClient("client 3", new(rpc.Client)) for i := 0; i < b.N; i++ { - raterlist.Balance() + balancer.Balance() } } func TestRemoving(t *testing.T) { - raterlist := NewRaterList() + balancer := NewBalancer() c1 := new(rpc.Client) c2 := new(rpc.Client) c3 := new(rpc.Client) - raterlist.AddClient("client 1", c1) - raterlist.AddClient("client 2", c2) - raterlist.AddClient("client 3", c3) - raterlist.RemoveClient("client 2") - if raterlist.clientConnections[0] != c1 || - raterlist.clientConnections[1] != c3 || - len(raterlist.clientConnections) != 2 { + balancer.AddClient("client 1", c1) + balancer.AddClient("client 2", c2) + balancer.AddClient("client 3", c3) + balancer.RemoveClient("client 2") + if balancer.clientConnections[0] != c1 || + balancer.clientConnections[1] != c3 || + len(balancer.clientConnections) != 2 { t.Error("Failed removing rater") } } func TestGet(t *testing.T) { - raterlist := NewRaterList() + balancer := NewBalancer() c1 := new(rpc.Client) - raterlist.AddClient("client 1", c1) - result, ok := raterlist.GetClient("client 1") + balancer.AddClient("client 1", c1) + result, ok := balancer.GetClient("client 1") if !ok || c1 != result { t.Error("Get failed") } } func TestOneBalancer(t *testing.T) { - raterlist := NewRaterList() - raterlist.AddClient("client 1", new(rpc.Client)) - c1 := raterlist.Balance() - c2 := raterlist.Balance() + balancer := NewBalancer() + balancer.AddClient("client 1", new(rpc.Client)) + c1 := balancer.Balance() + c2 := balancer.Balance() if c1 != c2 { t.Error("With only one rater these shoud be equal") } } func Test100Balancer(t *testing.T) { - raterlist := NewRaterList() + balancer := NewBalancer() var clients []*rpc.Client for i := 0; i < 100; i++ { c := new(rpc.Client) clients = append(clients, c) - raterlist.AddClient("client 1", c) + balancer.AddClient("client 1", c) } for i := 0; i < 100; i++ { - c := raterlist.Balance() + c := balancer.Balance() if c != clients[i] { t.Error("Balance did not iterate all the available clients") } } - c := raterlist.Balance() + c := balancer.Balance() if c != clients[0] { t.Error("Balance did not lopped from the begining") } diff --git a/sessionmanager/sessiondelegate.go b/sessionmanager/sessiondelegate.go index 4ce24363a..2e33b8182 100644 --- a/sessionmanager/sessiondelegate.go +++ b/sessionmanager/sessiondelegate.go @@ -21,7 +21,7 @@ package sessionmanager import ( "github.com/rif/cgrates/timespans" "log" - "net/rpc" + "github.com/rif/cgrates/balancer" "time" ) @@ -152,11 +152,11 @@ func (dsd *DirectSessionDelegate) GetDebitPeriod() time.Duration { // Sample SessionDelegate calling the timespans methods through the RPC interface type RPCSessionDelegate struct { - client *rpc.Client + balancer *balancer.Balancer } -func NewRPCSessionDelegate(client *rpc.Client) (rpc *RPCSessionDelegate) { - return &RPCSessionDelegate{client} +func NewRPCSessionDelegate(balancer *balancer.Balancer) (rpc *RPCSessionDelegate) { + return &RPCSessionDelegate{balancer} } func (rsd *RPCSessionDelegate) OnHeartBeat(ev Event) { @@ -169,6 +169,7 @@ func (rsd *RPCSessionDelegate) OnChannelAnswer(ev Event, s *Session, sm SessionM func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { lastCC := s.CallCosts[len(s.CallCosts)-1] + client := rsd.balancer.Balance() // put credit back start := time.Now() end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd @@ -211,7 +212,7 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { Amount: -cost, } var response float64 - err := rsd.client.Call("Responder.DebitCents", cd, &response) + err := client.Call("Responder.DebitCents", cd, &response) if err != nil { log.Printf("Debit cents failed: %v", err) } @@ -224,7 +225,7 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { Amount: -seconds, } var response float64 - err := rsd.client.Call("Responder.DebitSeconds", cd, &response) + err := client.Call("Responder.DebitSeconds", cd, &response) if err != nil { log.Printf("Debit seconds failed: %v", err) } @@ -235,7 +236,8 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { func (rsd *RPCSessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor) { cc := ×pans.CallCost{} - err := rsd.client.Call("Responder.Debit", cd, cc) + client := rsd.balancer.Balance() + err := client.Call("Responder.Debit", cd, cc) if err != nil { log.Printf("Could not complete debit opperation: %v", err) } @@ -243,7 +245,7 @@ func (rsd *RPCSessionDelegate) LoopAction(s *Session, cd *timespans.CallDescript log.Print(cc) cd.Amount = DEBIT_PERIOD.Seconds() var remainingSeconds float64 - err = rsd.client.Call("Responder.GetMaxSessionTime", cd, &remainingSeconds) + err = client.Call("Responder.GetMaxSessionTime", cd, &remainingSeconds) if err != nil { log.Printf("Could not get max session time: %v", err) }