From 1edba8d483d7df77d775f3052eee45e204720e71 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sun, 27 May 2012 20:46:24 +0300 Subject: [PATCH] better RPC session manager --- balancer/balancer.go | 15 +++++++++++- cmd/cgr-balancer/cgr-balanncer.go | 11 +++++---- cmd/cgr-balancer/jsonrpc_responder.go | 2 +- cmd/cgr-balancer/registration.go | 12 ++++------ cmd/cgr-balancer/status_responder.go | 4 ++-- cmd/cgr-sessionmanager/cgr-sessionmanager.go | 12 +++++++++- .../cgr-balancerstress/cgr-balancerstress.go | 6 ++++- sessionmanager/sessiondelegate.go | 23 +++++++++++++++---- 8 files changed, 61 insertions(+), 24 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index cec2ae0dd..8f991f810 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -21,10 +21,11 @@ package balancer import ( "net/rpc" "sync" + "log" ) type Balancer struct { - clientAddresses []string + clientAddresses []string // we need to hold these two slices because maps fo not keep order clientConnections []*rpc.Client balancerIndex int mu sync.RWMutex @@ -96,3 +97,15 @@ func (bl *Balancer) Balance() (result *rpc.Client) { return } + +func (bl *Balancer) Shutdown() { + var reply string + for i, client := range bl.clientConnections { + client.Call("Responder.Shutdown", "", &reply) + log.Printf("Shutdown rater %v: %v ", bl.clientAddresses[i], reply) + } +} + +func (bl *Balancer) GetClientAddresses() []string { + return bl.clientAddresses +} diff --git a/cmd/cgr-balancer/cgr-balanncer.go b/cmd/cgr-balancer/cgr-balanncer.go index a522ac1c1..4b6784586 100644 --- a/cmd/cgr-balancer/cgr-balanncer.go +++ b/cmd/cgr-balancer/cgr-balanncer.go @@ -23,6 +23,7 @@ import ( "flag" "github.com/rif/cgrates/sessionmanager" "github.com/rif/cgrates/timespans" + "github.com/rif/cgrates/balancer" "log" "runtime" "time" @@ -34,7 +35,7 @@ var ( httpApiAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:2002)") freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port") freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port") - raterList *RaterList + bal *balancer.Balancer ) /* @@ -43,7 +44,7 @@ The function that gets the information from the raters using balancer. func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost) { err := errors.New("") //not nil value for err != nil { - client := raterList.Balance() + client := bal.Balance() if client == nil { log.Print("Waiting for raters to register...") time.Sleep(1 * time.Second) // wait one second and retry @@ -64,7 +65,7 @@ The function that gets the information from the raters using balancer. func CallMethod(key *timespans.CallDescriptor, method string) (reply float64) { err := errors.New("") //not nil value for err != nil { - client := raterList.Balance() + client := bal.Balance() if client == nil { log.Print("Waiting for raters to register...") time.Sleep(1 * time.Second) // wait one second and retry @@ -81,14 +82,14 @@ func CallMethod(key *timespans.CallDescriptor, method string) (reply float64) { func main() { flag.Parse() runtime.GOMAXPROCS(runtime.NumCPU() - 1) - raterList = NewRaterList() + bal = balancer.NewBalancer() go StopSingnalHandler() go listenToRPCRaterRequests() go listenToJsonRPCRequests() sm := &sessionmanager.FSSessionManager{} - sm.Connect(sessionmanager.NewRPCSessionDelegate(), *freeswitchsrv, *freeswitchpass) + sm.Connect(sessionmanager.NewRPCBalancerSessionDelegate(bal), *freeswitchsrv, *freeswitchpass) listenToHttpRequests() } diff --git a/cmd/cgr-balancer/jsonrpc_responder.go b/cmd/cgr-balancer/jsonrpc_responder.go index dc6bc4af0..49225cc1d 100644 --- a/cmd/cgr-balancer/jsonrpc_responder.go +++ b/cmd/cgr-balancer/jsonrpc_responder.go @@ -86,7 +86,7 @@ func (r *Responder) Status(arg timespans.CallDescriptor, replay *string) (err er memstats := new(runtime.MemStats) runtime.ReadMemStats(memstats) *replay = "Connected raters:\n" - for _, rater := range raterList.clientAddresses { + for _, rater := range bal.GetClientAddresses() { log.Print(rater) *replay += fmt.Sprintf("%v\n", rater) } diff --git a/cmd/cgr-balancer/registration.go b/cmd/cgr-balancer/registration.go index a13215bee..0a54b8140 100644 --- a/cmd/cgr-balancer/registration.go +++ b/cmd/cgr-balancer/registration.go @@ -49,11 +49,7 @@ func StopSingnalHandler() { sig := <-c log.Printf("Caught signal %v, sending shutdownto raters\n", sig) - var reply string - for i, client := range raterList.clientConnections { - client.Call("Responder.Shutdown", "", &reply) - log.Printf("Shutdown rater %v: %v ", raterList.clientAddresses[i], reply) - } + bal.Shutdown() os.Exit(1) } @@ -68,7 +64,7 @@ func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error { log.Print("Could not connect to client!") return err } - raterList.AddClient(clientAddress, client) + bal.AddClient(clientAddress, client) log.Printf("Rater %v registered succesfully.", clientAddress) return nil } @@ -77,10 +73,10 @@ func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error { RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list. */ func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *byte) error { - client, ok := raterList.GetClient(clientAddress) + client, ok := bal.GetClient(clientAddress) if ok { client.Close() - raterList.RemoveClient(clientAddress) + bal.RemoveClient(clientAddress) log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress)) } else { log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress)) diff --git a/cmd/cgr-balancer/status_responder.go b/cmd/cgr-balancer/status_responder.go index ebfa22165..c9e15cfb5 100644 --- a/cmd/cgr-balancer/status_responder.go +++ b/cmd/cgr-balancer/status_responder.go @@ -30,7 +30,7 @@ Handler for the statistics web client */ func statusHandler(w http.ResponseWriter, r *http.Request) { if t, err := template.ParseFiles("templates/status.html"); err == nil { - t.Execute(w, raterList.clientAddresses) + t.Execute(w, bal.GetClientAddresses()) } else { log.Print("Error rendering status: ", err) } @@ -41,7 +41,7 @@ Ajax Handler for the connected raters */ func ratersHandler(w http.ResponseWriter, r *http.Request) { enc := json.NewEncoder(w) - enc.Encode(raterList.clientAddresses) + enc.Encode(bal.GetClientAddresses()) } /* diff --git a/cmd/cgr-sessionmanager/cgr-sessionmanager.go b/cmd/cgr-sessionmanager/cgr-sessionmanager.go index 63fc541f2..80a2e664e 100644 --- a/cmd/cgr-sessionmanager/cgr-sessionmanager.go +++ b/cmd/cgr-sessionmanager/cgr-sessionmanager.go @@ -23,9 +23,11 @@ import ( "github.com/rif/cgrates/sessionmanager" "github.com/rif/cgrates/timespans" "log" + "net/rpc/jsonrpc" ) var ( + standalone = flag.Bool("standalone", false, "run standalone (run as a rater)") balancer = flag.String("balancer", "127.0.0.1:2000", "balancer address host:port") freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port") freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port") @@ -41,7 +43,15 @@ func main() { if err != nil { log.Fatalf("Cannot open storage: %v", err) } - sm.Connect(sessionmanager.NewDirectSessionDelegate(getter), *freeswitchsrv, *freeswitchpass) + if *standalone { + sm.Connect(sessionmanager.NewDirectSessionDelegate(getter), *freeswitchsrv, *freeswitchpass) + } else { + client, err := jsonrpc.Dial("tcp", *balancer) + if err != nil { + log.Fatalf("could not connect to balancer: %v", err) + } + sm.Connect(sessionmanager.NewRPCClientSessionDelegate(client), *freeswitchsrv, *freeswitchpass) + } waitChan := make(<-chan byte) log.Print("CGRateS is listening!") <-waitChan diff --git a/cmd/stress/cgr-balancerstress/cgr-balancerstress.go b/cmd/stress/cgr-balancerstress/cgr-balancerstress.go index 50f654b1d..c86e7ed7e 100644 --- a/cmd/stress/cgr-balancerstress/cgr-balancerstress.go +++ b/cmd/stress/cgr-balancerstress/cgr-balancerstress.go @@ -27,6 +27,7 @@ import ( ) var ( + balancer = flag.String("balancer", "localhost:2001", "balancer server address") runs = flag.Int("runs", 10000, "stress cycle number") parallel = flag.Bool("parallel", false, "run requests in parallel") ) @@ -37,7 +38,10 @@ func main() { t2 := time.Date(2012, time.February, 02, 18, 30, 0, 0, time.UTC) cd := timespans.CallDescriptor{CstmId: "vdf", Subject: "rif", DestinationPrefix: "0256", TimeStart: t1, TimeEnd: t2} result := timespans.CallCost{} - client, _ := jsonrpc.Dial("tcp", "localhost:2001") + client, err := jsonrpc.Dial("tcp", *balancer) + if err != nil { + log.Fatalf("could not connect to balancer: %v", err) + } if *parallel { var divCall *rpc.Call for i := 0; i < *runs; i++ { diff --git a/sessionmanager/sessiondelegate.go b/sessionmanager/sessiondelegate.go index 2e33b8182..aac6fb3f8 100644 --- a/sessionmanager/sessiondelegate.go +++ b/sessionmanager/sessiondelegate.go @@ -22,6 +22,7 @@ import ( "github.com/rif/cgrates/timespans" "log" "github.com/rif/cgrates/balancer" + "net/rpc" "time" ) @@ -153,23 +154,35 @@ func (dsd *DirectSessionDelegate) GetDebitPeriod() time.Duration { // Sample SessionDelegate calling the timespans methods through the RPC interface type RPCSessionDelegate struct { balancer *balancer.Balancer + client *rpc.Client } -func NewRPCSessionDelegate(balancer *balancer.Balancer) (rpc *RPCSessionDelegate) { - return &RPCSessionDelegate{balancer} +func NewRPCBalancerSessionDelegate(balancer *balancer.Balancer) (rpc *RPCSessionDelegate) { + return &RPCSessionDelegate{balancer: balancer} +} + +func NewRPCClientSessionDelegate(client *rpc.Client) (rpc *RPCSessionDelegate) { + return &RPCSessionDelegate{client: client} +} + +func (rsd *RPCSessionDelegate) getClient() *rpc.Client { + if rsd.client == nil { + return rsd.balancer.Balance() + } + return rsd.client } func (rsd *RPCSessionDelegate) OnHeartBeat(ev Event) { log.Print("rpc hearbeat") } -func (rsd *RPCSessionDelegate) OnChannelAnswer(ev Event, s *Session, sm SessionManager) { +func (rsd *RPCSessionDelegate) OnChannelAnswer(ev Event, s *Session) { log.Print("rpc answer") } func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { lastCC := s.CallCosts[len(s.CallCosts)-1] - client := rsd.balancer.Balance() + client := rsd.getClient() // put credit back start := time.Now() end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd @@ -236,7 +249,7 @@ func (rsd *RPCSessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { func (rsd *RPCSessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor) { cc := ×pans.CallCost{} - client := rsd.balancer.Balance() + client := rsd.getClient() err := client.Call("Responder.Debit", cd, cc) if err != nil { log.Printf("Could not complete debit opperation: %v", err)