better RPC session manager

This commit is contained in:
Radu Ioan Fericean
2012-05-27 20:46:24 +03:00
parent 6b3e22f132
commit 1edba8d483
8 changed files with 61 additions and 24 deletions

View File

@@ -21,10 +21,11 @@ package balancer
import (
"net/rpc"
"sync"
"log"
)
type Balancer struct {
clientAddresses []string
clientAddresses []string // we need to hold these two slices because maps fo not keep order
clientConnections []*rpc.Client
balancerIndex int
mu sync.RWMutex
@@ -96,3 +97,15 @@ func (bl *Balancer) Balance() (result *rpc.Client) {
return
}
func (bl *Balancer) Shutdown() {
var reply string
for i, client := range bl.clientConnections {
client.Call("Responder.Shutdown", "", &reply)
log.Printf("Shutdown rater %v: %v ", bl.clientAddresses[i], reply)
}
}
func (bl *Balancer) GetClientAddresses() []string {
return bl.clientAddresses
}

View File

@@ -23,6 +23,7 @@ import (
"flag"
"github.com/rif/cgrates/sessionmanager"
"github.com/rif/cgrates/timespans"
"github.com/rif/cgrates/balancer"
"log"
"runtime"
"time"
@@ -34,7 +35,7 @@ var (
httpApiAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:2002)")
freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port")
freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port")
raterList *RaterList
bal *balancer.Balancer
)
/*
@@ -43,7 +44,7 @@ The function that gets the information from the raters using balancer.
func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost) {
err := errors.New("") //not nil value
for err != nil {
client := raterList.Balance()
client := bal.Balance()
if client == nil {
log.Print("Waiting for raters to register...")
time.Sleep(1 * time.Second) // wait one second and retry
@@ -64,7 +65,7 @@ The function that gets the information from the raters using balancer.
func CallMethod(key *timespans.CallDescriptor, method string) (reply float64) {
err := errors.New("") //not nil value
for err != nil {
client := raterList.Balance()
client := bal.Balance()
if client == nil {
log.Print("Waiting for raters to register...")
time.Sleep(1 * time.Second) // wait one second and retry
@@ -81,14 +82,14 @@ func CallMethod(key *timespans.CallDescriptor, method string) (reply float64) {
func main() {
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU() - 1)
raterList = NewRaterList()
bal = balancer.NewBalancer()
go StopSingnalHandler()
go listenToRPCRaterRequests()
go listenToJsonRPCRequests()
sm := &sessionmanager.FSSessionManager{}
sm.Connect(sessionmanager.NewRPCSessionDelegate(), *freeswitchsrv, *freeswitchpass)
sm.Connect(sessionmanager.NewRPCBalancerSessionDelegate(bal), *freeswitchsrv, *freeswitchpass)
listenToHttpRequests()
}

View File

@@ -86,7 +86,7 @@ func (r *Responder) Status(arg timespans.CallDescriptor, replay *string) (err er
memstats := new(runtime.MemStats)
runtime.ReadMemStats(memstats)
*replay = "Connected raters:\n"
for _, rater := range raterList.clientAddresses {
for _, rater := range bal.GetClientAddresses() {
log.Print(rater)
*replay += fmt.Sprintf("%v\n", rater)
}

View File

@@ -49,11 +49,7 @@ func StopSingnalHandler() {
sig := <-c
log.Printf("Caught signal %v, sending shutdownto raters\n", sig)
var reply string
for i, client := range raterList.clientConnections {
client.Call("Responder.Shutdown", "", &reply)
log.Printf("Shutdown rater %v: %v ", raterList.clientAddresses[i], reply)
}
bal.Shutdown()
os.Exit(1)
}
@@ -68,7 +64,7 @@ func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error {
log.Print("Could not connect to client!")
return err
}
raterList.AddClient(clientAddress, client)
bal.AddClient(clientAddress, client)
log.Printf("Rater %v registered succesfully.", clientAddress)
return nil
}
@@ -77,10 +73,10 @@ func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error {
RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list.
*/
func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *byte) error {
client, ok := raterList.GetClient(clientAddress)
client, ok := bal.GetClient(clientAddress)
if ok {
client.Close()
raterList.RemoveClient(clientAddress)
bal.RemoveClient(clientAddress)
log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress))
} else {
log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress))

View File

@@ -30,7 +30,7 @@ Handler for the statistics web client
*/
func statusHandler(w http.ResponseWriter, r *http.Request) {
if t, err := template.ParseFiles("templates/status.html"); err == nil {
t.Execute(w, raterList.clientAddresses)
t.Execute(w, bal.GetClientAddresses())
} else {
log.Print("Error rendering status: ", err)
}
@@ -41,7 +41,7 @@ Ajax Handler for the connected raters
*/
func ratersHandler(w http.ResponseWriter, r *http.Request) {
enc := json.NewEncoder(w)
enc.Encode(raterList.clientAddresses)
enc.Encode(bal.GetClientAddresses())
}
/*

View File

@@ -23,9 +23,11 @@ import (
"github.com/rif/cgrates/sessionmanager"
"github.com/rif/cgrates/timespans"
"log"
"net/rpc/jsonrpc"
)
var (
standalone = flag.Bool("standalone", false, "run standalone (run as a rater)")
balancer = flag.String("balancer", "127.0.0.1:2000", "balancer address host:port")
freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port")
freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port")
@@ -41,7 +43,15 @@ func main() {
if err != nil {
log.Fatalf("Cannot open storage: %v", err)
}
sm.Connect(sessionmanager.NewDirectSessionDelegate(getter), *freeswitchsrv, *freeswitchpass)
if *standalone {
sm.Connect(sessionmanager.NewDirectSessionDelegate(getter), *freeswitchsrv, *freeswitchpass)
} else {
client, err := jsonrpc.Dial("tcp", *balancer)
if err != nil {
log.Fatalf("could not connect to balancer: %v", err)
}
sm.Connect(sessionmanager.NewRPCClientSessionDelegate(client), *freeswitchsrv, *freeswitchpass)
}
waitChan := make(<-chan byte)
log.Print("CGRateS is listening!")
<-waitChan

View File

@@ -27,6 +27,7 @@ import (
)
var (
balancer = flag.String("balancer", "localhost:2001", "balancer server address")
runs = flag.Int("runs", 10000, "stress cycle number")
parallel = flag.Bool("parallel", false, "run requests in parallel")
)
@@ -37,7 +38,10 @@ func main() {
t2 := time.Date(2012, time.February, 02, 18, 30, 0, 0, time.UTC)
cd := timespans.CallDescriptor{CstmId: "vdf", Subject: "rif", DestinationPrefix: "0256", TimeStart: t1, TimeEnd: t2}
result := timespans.CallCost{}
client, _ := jsonrpc.Dial("tcp", "localhost:2001")
client, err := jsonrpc.Dial("tcp", *balancer)
if err != nil {
log.Fatalf("could not connect to balancer: %v", err)
}
if *parallel {
var divCall *rpc.Call
for i := 0; i < *runs; i++ {

View File

@@ -22,6 +22,7 @@ import (
"github.com/rif/cgrates/timespans"
"log"
"github.com/rif/cgrates/balancer"
"net/rpc"
"time"
)
@@ -153,23 +154,35 @@ func (dsd *DirectSessionDelegate) GetDebitPeriod() time.Duration {
// Sample SessionDelegate calling the timespans methods through the RPC interface
type RPCSessionDelegate struct {
balancer *balancer.Balancer
client *rpc.Client
}
func NewRPCSessionDelegate(balancer *balancer.Balancer) (rpc *RPCSessionDelegate) {
return &RPCSessionDelegate{balancer}
func NewRPCBalancerSessionDelegate(balancer *balancer.Balancer) (rpc *RPCSessionDelegate) {
return &RPCSessionDelegate{balancer: balancer}
}
func NewRPCClientSessionDelegate(client *rpc.Client) (rpc *RPCSessionDelegate) {
return &RPCSessionDelegate{client: client}
}
func (rsd *RPCSessionDelegate) getClient() *rpc.Client {
if rsd.client == nil {
return rsd.balancer.Balance()
}
return rsd.client
}
func (rsd *RPCSessionDelegate) OnHeartBeat(ev Event) {
log.Print("rpc hearbeat")
}
func (rsd *RPCSessionDelegate) OnChannelAnswer(ev Event, s *Session, sm SessionManager) {
func (rsd *RPCSessionDelegate) OnChannelAnswer(ev Event, s *Session) {
log.Print("rpc answer")
}
func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
lastCC := s.CallCosts[len(s.CallCosts)-1]
client := rsd.balancer.Balance()
client := rsd.getClient()
// put credit back
start := time.Now()
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
@@ -236,7 +249,7 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
func (rsd *RPCSessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor) {
cc := &timespans.CallCost{}
client := rsd.balancer.Balance()
client := rsd.getClient()
err := client.Call("Responder.Debit", cd, cc)
if err != nil {
log.Printf("Could not complete debit opperation: %v", err)