uniformizing json and gob encodings for RPC

This commit is contained in:
Radu Ioan Fericean
2012-03-26 23:28:01 +03:00
parent 2b421ec5eb
commit 595a631310
6 changed files with 71 additions and 69 deletions

View File

@@ -45,7 +45,7 @@ func GetCost(key *timespans.CallDescriptor) (reply *timespans.CallCost) {
time.Sleep(1 * time.Second) // wait one second and retry
} else {
reply = &timespans.CallCost{}
err = client.Call("Storage.GetCost", *key, reply)
err = client.Call("Responder.GetCost", *key, reply)
if err != nil {
log.Printf("Got en error from rater: %v", err)
}

View File

@@ -63,7 +63,7 @@ func debitBalanceHandler(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount}
result := CallMethod(arg, "Storage.DebitCents")
result := CallMethod(arg, "Responder.DebitCents")
enc.Encode(result)
}
@@ -83,7 +83,7 @@ func debitSMSHandler(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount}
result := CallMethod(arg, "Storage.DebitSMS")
result := CallMethod(arg, "Responder.DebitSMS")
enc.Encode(result)
}
@@ -103,7 +103,7 @@ func debitSecondsHandler(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount}
result := CallMethod(arg, "Storage.DebitSeconds")
result := CallMethod(arg, "Responder.DebitSeconds")
enc.Encode(result)
}
@@ -123,7 +123,7 @@ func getMaxSessionTimeHandler(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount}
result := CallMethod(arg, "Storage.GetMaxSessionTime")
result := CallMethod(arg, "Responder.GetMaxSessionTime")
enc.Encode(result)
}
@@ -143,7 +143,7 @@ func addVolumeDiscountSeconds(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount}
result := CallMethod(arg, "Storage.AddVolumeDiscountSeconds")
result := CallMethod(arg, "Responder.AddVolumeDiscountSeconds")
enc.Encode(result)
}
@@ -161,7 +161,7 @@ func resetVolumeDiscountSeconds(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0]}
result := CallMethod(arg, "Storage.ResetVolumeDiscountSeconds")
result := CallMethod(arg, "Responder.ResetVolumeDiscountSeconds")
enc.Encode(result)
}
@@ -181,7 +181,7 @@ func addRecievedCallSeconds(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0], Amount: amount}
result := CallMethod(arg, "Storage.AddRecievedCallSeconds")
result := CallMethod(arg, "Responder.AddRecievedCallSeconds")
enc.Encode(result)
}
@@ -199,7 +199,7 @@ func resetUserBudget(w http.ResponseWriter, r *http.Request) {
return
}
arg := &timespans.CallDescriptor{CstmId: cstmid[0], Subject: subj[0], DestinationPrefix: dest[0]}
result := CallMethod(arg, "Storage.ResetUserBudget")
result := CallMethod(arg, "Responder.ResetUserBudget")
enc.Encode(result)
}

View File

@@ -36,42 +36,42 @@ func (r *Responder) GetCost(arg timespans.CallDescriptor, replay *timespans.Call
}
func (r *Responder) DebitBalance(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Storage.DebitCents")
*replay = CallMethod(&arg, "Responder.DebitCents")
return
}
func (r *Responder) DebitSMS(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Storage.DebitSMS")
*replay = CallMethod(&arg, "Responder.DebitSMS")
return
}
func (r *Responder) DebitSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Storage.DebitSeconds")
*replay = CallMethod(&arg, "Responder.DebitSeconds")
return
}
func (r *Responder) GetMaxSessionTime(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Storage.GetMaxSessionTime")
*replay = CallMethod(&arg, "Responder.GetMaxSessionTime")
return
}
func (r *Responder) AddVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Storage.AddVolumeDiscountSeconds")
*replay = CallMethod(&arg, "Responder.AddVolumeDiscountSeconds")
return
}
func (r *Responder) ResetVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Storage.ResetVolumeDiscountSeconds")
*replay = CallMethod(&arg, "Responder.ResetVolumeDiscountSeconds")
return
}
func (r *Responder) AddRecievedCallSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Storage.AddRecievedCallSeconds")
*replay = CallMethod(&arg, "Responder.AddRecievedCallSeconds")
return
}
func (r *Responder) ResetUserBudget(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay = CallMethod(&arg, "Storage.ResetUserBudget")
*replay = CallMethod(&arg, "Responder.ResetUserBudget")
return
}

View File

@@ -20,7 +20,6 @@ package main
import (
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"os"
@@ -35,36 +34,9 @@ RPC Server that handles the registering and unregistering of raters.
type RaterServer byte
func listenToRPCRaterRequests() {
/*rpc.Register(new(RaterServer))
rpc.HandleHTTP()
http.ListenAndServe(*raterAddress, nil)*/
rpc.Register(new(RaterServer))
rpc.HandleHTTP()
l, e := net.Listen("tcp", *raterAddress)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
/*log.Print("Starting Server...")
l, err := net.Listen("tcp", *raterAddress)
defer l.Close()
if err != nil {
log.Fatal(err)
}
log.Print("listening on: ", l.Addr())
rpc.Register(new(RaterServer))
for {
log.Print("waiting for connections ...")
conn, err := l.Accept()
if err != nil {
log.Printf("accept error: %s", conn)
continue
}
log.Printf("connection started: %v", conn.RemoteAddr())
go rpc.ServeConn(conn)
}*/
http.ListenAndServe(*raterAddress, nil)
}
/*
@@ -89,14 +61,15 @@ func StopSingnalHandler() {
RPC method that receives a rater address, connects to it and ads the pair to the rater list for balancing
*/
func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error {
time.Sleep(1 * time.Second) // wait a second for Rater to start serving
log.Printf("Started rater %v registration...", clientAddress)
time.Sleep(2 * time.Second) // wait a second for Rater to start serving
client, err := rpc.Dial("tcp", clientAddress)
if err != nil {
log.Print("Could not connect to client!")
return err
}
raterList.AddClient(clientAddress, client)
log.Print(fmt.Sprintf("Rater %v registered succesfully.", clientAddress))
log.Printf("Rater %v registered succesfully.", clientAddress)
return nil
}

View File

@@ -23,27 +23,29 @@ import (
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"os"
)
var (
balancer = flag.String("balancer", "127.0.0.1:2000", "balancer address host:port")
listen = flag.String("listen", "127.0.0.1:1234", "listening address host:port")
storage Storage
json = flag.Bool("json", false, "use json for rpc encoding")
storage Responder
)
type Storage struct {
type Responder struct {
sg timespans.StorageGetter
}
func NewStorage(nsg timespans.StorageGetter) *Storage {
return &Storage{sg: nsg}
func NewStorage(nsg timespans.StorageGetter) *Responder {
return &Responder{sg: nsg}
}
/*
RPC method providing the rating information from the storage.
*/
func (s *Storage) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) {
func (s *Responder) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) {
descriptor := &cd
descriptor.SetStorageGetter(s.sg)
r, e := descriptor.GetCost()
@@ -51,7 +53,7 @@ func (s *Storage) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost
return err
}
func (s *Storage) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) {
func (s *Responder) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) {
descriptor := &cd
descriptor.SetStorageGetter(s.sg)
r, e := descriptor.DebitCents()
@@ -59,7 +61,7 @@ func (s *Storage) DebitCents(cd timespans.CallDescriptor, reply *float64) (err e
return err
}
func (s *Storage) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) {
func (s *Responder) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) {
descriptor := &cd
descriptor.SetStorageGetter(s.sg)
r, e := descriptor.DebitSMS()
@@ -67,7 +69,7 @@ func (s *Storage) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err err
return err
}
func (s *Storage) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
func (s *Responder) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
descriptor := &cd
descriptor.SetStorageGetter(s.sg)
e := descriptor.DebitSeconds()
@@ -75,7 +77,7 @@ func (s *Storage) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err
return err
}
func (s *Storage) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) {
func (s *Responder) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) {
descriptor := &cd
descriptor.SetStorageGetter(s.sg)
r, e := descriptor.GetMaxSessionTime()
@@ -83,7 +85,7 @@ func (s *Storage) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64)
return err
}
func (s *Storage) AddVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
func (s *Responder) AddVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
descriptor := &cd
descriptor.SetStorageGetter(s.sg)
e := descriptor.AddVolumeDiscountSeconds()
@@ -91,7 +93,7 @@ func (s *Storage) AddVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *f
return err
}
func (s *Storage) ResetVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
func (s *Responder) ResetVolumeDiscountSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
descriptor := &cd
descriptor.SetStorageGetter(s.sg)
e := descriptor.ResetVolumeDiscountSeconds()
@@ -99,7 +101,7 @@ func (s *Storage) ResetVolumeDiscountSeconds(cd timespans.CallDescriptor, reply
return err
}
func (s *Storage) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
func (s *Responder) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
descriptor := &cd
descriptor.SetStorageGetter(s.sg)
e := descriptor.AddRecievedCallSeconds()
@@ -107,7 +109,7 @@ func (s *Storage) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *flo
return err
}
func (s *Storage) ResetUserBudget(cd timespans.CallDescriptor, reply *float64) (err error) {
func (s *Responder) ResetUserBudget(cd timespans.CallDescriptor, reply *float64) (err error) {
descriptor := &cd
descriptor.SetStorageGetter(s.sg)
e := descriptor.ResetUserBudget()
@@ -118,7 +120,7 @@ func (s *Storage) ResetUserBudget(cd timespans.CallDescriptor, reply *float64) (
/*
RPC method that triggers rater shutdown in case of balancer exit.
*/
func (s *Storage) Shutdown(args string, reply *string) (err error) {
func (s *Responder) Shutdown(args string, reply *string) (err error) {
s.sg.Close()
defer os.Exit(0)
*reply = "Done!"
@@ -134,16 +136,43 @@ func main() {
log.Printf("Cannot open storage file: %v", err)
os.Exit(1)
}
storage := NewStorage(getter)
rpc.Register(storage)
if !*json {
go RegisterToServer(balancer, listen)
go StopSingnalHandler(balancer, listen, getter)
}
/*rpc.Register(NewStorage(getter))
rpc.HandleHTTP()
go RegisterToServer(balancer, listen)
go StopSingnalHandler(balancer, listen, getter)
addr, err1 := net.ResolveTCPAddr("tcp", *listen)
l, err2 := net.ListenTCP("tcp", addr)
if err1 != nil || err2 != nil {
log.Print("cannot create listener for specified address ", *listen)
os.Exit(1)
}
rpc.Accept(l)
rpc.Accept(l)*/
log.Print("Starting Server...")
l, err := net.Listen("tcp", *listen)
defer l.Close()
if err != nil {
log.Fatal(err)
}
log.Print("listening on: ", l.Addr())
rpc.Register(NewStorage(getter))
for {
log.Print("waiting for connections ...")
conn, err := l.Accept()
if err != nil {
log.Printf("accept error: %s", conn)
continue
}
log.Printf("connection started: %v", conn.RemoteAddr())
if *json {
log.Print("json encoding")
go jsonrpc.ServeConn(conn)
} else {
log.Print("gob encoding")
go rpc.ServeConn(conn)
}
}
}

View File

@@ -47,11 +47,11 @@ cd = {"Tor":0, "CstmId": "vdf", "Subject": "rif", "DestinationPrefix": "0256", "
# alternative to the above
s = socket.create_connection(("127.0.0.1", 2001))
s.sendall(json.dumps(({"id": 1, "method": "Responder.Get", "params": [cd]})))
s.sendall(json.dumps(({"id": 1, "method": "Responder.GetCost", "params": [cd]})))
print s.recv(4096)
i = 0
result = ""
for i in xrange(int(1e5) + 1):
result = rpc.call("Responder.Get", cd)
result = rpc.call("Responder.GetCost", cd)
print i, result