diff --git a/cmd/balancer/balancer.go b/cmd/balancer/balancer.go index 161ab11dc..21712a890 100644 --- a/cmd/balancer/balancer.go +++ b/cmd/balancer/balancer.go @@ -45,7 +45,7 @@ func GetCost(key *timespans.CallDescriptor) (reply *timespans.CallCost) { time.Sleep(1 * time.Second) // wait one second and retry } else { reply = ×pans.CallCost{} - err = client.Call("Storage.GetCost", *key, reply) + err = client.Call("Responder.GetCost", *key, reply) if err != nil { log.Printf("Got en error from rater: %v", err) } diff --git a/cmd/balancer/http_responder.go b/cmd/balancer/http_responder.go index 868846086..8390072db 100644 --- a/cmd/balancer/http_responder.go +++ b/cmd/balancer/http_responder.go @@ -63,7 +63,7 @@ func debitBalanceHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount} - result := CallMethod(arg, "Storage.DebitCents") + result := CallMethod(arg, "Responder.DebitCents") enc.Encode(result) } @@ -83,7 +83,7 @@ func debitSMSHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount} - result := CallMethod(arg, "Storage.DebitSMS") + result := CallMethod(arg, "Responder.DebitSMS") enc.Encode(result) } @@ -103,7 +103,7 @@ func debitSecondsHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount} - result := CallMethod(arg, "Storage.DebitSeconds") + result := CallMethod(arg, "Responder.DebitSeconds") enc.Encode(result) } @@ -123,7 +123,7 @@ func getMaxSessionTimeHandler(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount} - result := CallMethod(arg, "Storage.GetMaxSessionTime") + result := CallMethod(arg, "Responder.GetMaxSessionTime") enc.Encode(result) } @@ -143,7 +143,7 @@ func addVolumeDiscountSeconds(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount} - result := CallMethod(arg, "Storage.AddVolumeDiscountSeconds") + result := CallMethod(arg, "Responder.AddVolumeDiscountSeconds") enc.Encode(result) } @@ -161,7 +161,7 @@ func resetVolumeDiscountSeconds(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0]} - result := CallMethod(arg, "Storage.ResetVolumeDiscountSeconds") + result := CallMethod(arg, "Responder.ResetVolumeDiscountSeconds") enc.Encode(result) } @@ -181,7 +181,7 @@ func addRecievedCallSeconds(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount} - result := CallMethod(arg, "Storage.AddRecievedCallSeconds") + result := CallMethod(arg, "Responder.AddRecievedCallSeconds") enc.Encode(result) } @@ -199,7 +199,7 @@ func resetUserBudget(w http.ResponseWriter, r *http.Request) { return } arg := ×pans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0]} - result := CallMethod(arg, "Storage.ResetUserBudget") + result := CallMethod(arg, "Responder.ResetUserBudget") enc.Encode(result) } diff --git a/cmd/balancer/jsonrpc_responder.go b/cmd/balancer/jsonrpc_responder.go index ab252df09..4633dbc11 100644 --- a/cmd/balancer/jsonrpc_responder.go +++ b/cmd/balancer/jsonrpc_responder.go @@ -36,42 +36,42 @@ func (r *Responder) GetCost(arg timespans.CallDescriptor, replay *timespans.Call } func (r *Responder) DebitBalance(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Storage.DebitCents") + *replay = CallMethod(&arg, "Responder.DebitCents") return } func (r *Responder) DebitSMS(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Storage.DebitSMS") + *replay = CallMethod(&arg, "Responder.DebitSMS") return } func (r *Responder) DebitSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Storage.DebitSeconds") + *replay = CallMethod(&arg, "Responder.DebitSeconds") return } func (r *Responder) GetMaxSessionTime(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Storage.GetMaxSessionTime") + *replay = CallMethod(&arg, "Responder.GetMaxSessionTime") return } func (r *Responder) AddVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Storage.AddVolumeDiscountSeconds") + *replay = CallMethod(&arg, "Responder.AddVolumeDiscountSeconds") return } func (r *Responder) ResetVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Storage.ResetVolumeDiscountSeconds") + *replay = CallMethod(&arg, "Responder.ResetVolumeDiscountSeconds") return } func (r *Responder) AddRecievedCallSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Storage.AddRecievedCallSeconds") + *replay = CallMethod(&arg, "Responder.AddRecievedCallSeconds") return } func (r *Responder) ResetUserBudget(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Storage.ResetUserBudget") + *replay = CallMethod(&arg, "Responder.ResetUserBudget") return } diff --git a/cmd/balancer/registration.go b/cmd/balancer/registration.go index ffccb31e5..9e4d2c789 100644 --- a/cmd/balancer/registration.go +++ b/cmd/balancer/registration.go @@ -20,7 +20,6 @@ package main import ( "fmt" "log" - "net" "net/http" "net/rpc" "os" @@ -35,36 +34,9 @@ RPC Server that handles the registering and unregistering of raters. type RaterServer byte func listenToRPCRaterRequests() { - /*rpc.Register(new(RaterServer)) - rpc.HandleHTTP() - http.ListenAndServe(*raterAddress, nil)*/ - rpc.Register(new(RaterServer)) rpc.HandleHTTP() - l, e := net.Listen("tcp", *raterAddress) - if e != nil { - log.Fatal("listen error:", e) - } - go http.Serve(l, nil) - - /*log.Print("Starting Server...") - l, err := net.Listen("tcp", *raterAddress) - defer l.Close() - if err != nil { - log.Fatal(err) - } - log.Print("listening on: ", l.Addr()) - rpc.Register(new(RaterServer)) - for { - log.Print("waiting for connections ...") - conn, err := l.Accept() - if err != nil { - log.Printf("accept error: %s", conn) - continue - } - log.Printf("connection started: %v", conn.RemoteAddr()) - go rpc.ServeConn(conn) - }*/ + http.ListenAndServe(*raterAddress, nil) } /* @@ -89,14 +61,15 @@ func StopSingnalHandler() { RPC method that receives a rater address, connects to it and ads the pair to the rater list for balancing */ func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error { - time.Sleep(1 * time.Second) // wait a second for Rater to start serving + log.Printf("Started rater %v registration...", clientAddress) + time.Sleep(2 * time.Second) // wait a second for Rater to start serving client, err := rpc.Dial("tcp", clientAddress) if err != nil { log.Print("Could not connect to client!") return err } raterList.AddClient(clientAddress, client) - log.Print(fmt.Sprintf("Rater %v registered succesfully.", clientAddress)) + log.Printf("Rater %v registered succesfully.", clientAddress) return nil } diff --git a/cmd/rater/rater.go b/cmd/rater/rater.go index 29f90d3b2..82ba6e074 100644 --- a/cmd/rater/rater.go +++ b/cmd/rater/rater.go @@ -23,27 +23,29 @@ import ( "log" "net" "net/rpc" + "net/rpc/jsonrpc" "os" ) var ( balancer = flag.String("balancer", "127.0.0.1:2000", "balancer address host:port") listen = flag.String("listen", "127.0.0.1:1234", "listening address host:port") - storage Storage + json = flag.Bool("json", false, "use json for rpc encoding") + storage Responder ) -type Storage struct { +type Responder struct { sg timespans.StorageGetter } -func NewStorage(nsg timespans.StorageGetter) *Storage { - return &Storage{sg: nsg} +func NewStorage(nsg timespans.StorageGetter) *Responder { + return &Responder{sg: nsg} } /* RPC method providing the rating information from the storage. */ -func (s *Storage) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) { +func (s *Responder) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) { descriptor := &cd descriptor.SetStorageGetter(s.sg) r, e := descriptor.GetCost() @@ -51,7 +53,7 @@ func (s *Storage) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost return err } -func (s *Storage) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) { +func (s *Responder) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) { descriptor := &cd descriptor.SetStorageGetter(s.sg) r, e := descriptor.DebitCents() @@ -59,7 +61,7 @@ func (s *Storage) DebitCents(cd timespans.CallDescriptor, reply *float64) (err e return err } -func (s *Storage) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) { +func (s *Responder) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) { descriptor := &cd descriptor.SetStorageGetter(s.sg) r, e := descriptor.DebitSMS() @@ -67,7 +69,7 @@ func (s *Storage) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err err return err } -func (s *Storage) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { +func (s *Responder) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { descriptor := &cd descriptor.SetStorageGetter(s.sg) e := descriptor.DebitSeconds() @@ -75,7 +77,7 @@ func (s *Storage) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err return err } -func (s *Storage) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) { +func (s *Responder) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) { descriptor := &cd descriptor.SetStorageGetter(s.sg) r, e := descriptor.GetMaxSessionTime() @@ -83,7 +85,7 @@ func (s *Storage) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) return err } -func (s *Storage) AddVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { +func (s *Responder) AddVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { descriptor := &cd descriptor.SetStorageGetter(s.sg) e := descriptor.AddVolumeDiscountSeconds() @@ -91,7 +93,7 @@ func (s *Storage) AddVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *f return err } -func (s *Storage) ResetVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { +func (s *Responder) ResetVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { descriptor := &cd descriptor.SetStorageGetter(s.sg) e := descriptor.ResetVolumeDiscountSeconds() @@ -99,7 +101,7 @@ func (s *Storage) ResetVolumeDiscountSeconds(cd timespans.CallDescriptor, reply return err } -func (s *Storage) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { +func (s *Responder) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { descriptor := &cd descriptor.SetStorageGetter(s.sg) e := descriptor.AddRecievedCallSeconds() @@ -107,7 +109,7 @@ func (s *Storage) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *flo return err } -func (s *Storage) ResetUserBudget(cd timespans.CallDescriptor, reply *float64) (err error) { +func (s *Responder) ResetUserBudget(cd timespans.CallDescriptor, reply *float64) (err error) { descriptor := &cd descriptor.SetStorageGetter(s.sg) e := descriptor.ResetUserBudget() @@ -118,7 +120,7 @@ func (s *Storage) ResetUserBudget(cd timespans.CallDescriptor, reply *float64) ( /* RPC method that triggers rater shutdown in case of balancer exit. */ -func (s *Storage) Shutdown(args string, reply *string) (err error) { +func (s *Responder) Shutdown(args string, reply *string) (err error) { s.sg.Close() defer os.Exit(0) *reply = "Done!" @@ -134,16 +136,43 @@ func main() { log.Printf("Cannot open storage file: %v", err) os.Exit(1) } - storage := NewStorage(getter) - rpc.Register(storage) + if !*json { + go RegisterToServer(balancer, listen) + go StopSingnalHandler(balancer, listen, getter) + } + /*rpc.Register(NewStorage(getter)) rpc.HandleHTTP() - go RegisterToServer(balancer, listen) - go StopSingnalHandler(balancer, listen, getter) addr, err1 := net.ResolveTCPAddr("tcp", *listen) l, err2 := net.ListenTCP("tcp", addr) if err1 != nil || err2 != nil { log.Print("cannot create listener for specified address ", *listen) os.Exit(1) } - rpc.Accept(l) + rpc.Accept(l)*/ + + log.Print("Starting Server...") + l, err := net.Listen("tcp", *listen) + defer l.Close() + if err != nil { + log.Fatal(err) + } + log.Print("listening on: ", l.Addr()) + rpc.Register(NewStorage(getter)) + for { + log.Print("waiting for connections ...") + conn, err := l.Accept() + if err != nil { + log.Printf("accept error: %s", conn) + continue + } + log.Printf("connection started: %v", conn.RemoteAddr()) + if *json { + log.Print("json encoding") + go jsonrpc.ServeConn(conn) + } else { + log.Print("gob encoding") + go rpc.ServeConn(conn) + } + + } } diff --git a/cmd/stress/balancerstress/balancerstress.py b/cmd/stress/balancerstress/balancerstress.py index af482fe99..de7f329a8 100644 --- a/cmd/stress/balancerstress/balancerstress.py +++ b/cmd/stress/balancerstress/balancerstress.py @@ -47,11 +47,11 @@ cd = {"Tor":0, "CstmId": "vdf", "Subject": "rif", "DestinationPrefix": "0256", " # alternative to the above s = socket.create_connection(("127.0.0.1", 2001)) -s.sendall(json.dumps(({"id": 1, "method": "Responder.Get", "params": [cd]}))) +s.sendall(json.dumps(({"id": 1, "method": "Responder.GetCost", "params": [cd]}))) print s.recv(4096) i = 0 result = "" for i in xrange(int(1e5) + 1): - result = rpc.call("Responder.Get", cd) + result = rpc.call("Responder.GetCost", cd) print i, result