diff --git a/cmd/inquirer/balancer.go b/cmd/inquirer/balancer.go index 562ff6cfb..33bb8e052 100644 --- a/cmd/inquirer/balancer.go +++ b/cmd/inquirer/balancer.go @@ -6,24 +6,24 @@ import ( ) type RaterList struct { - clientAddresses []string + clientAddresses []string clientConnections []*rpc.Client - balancerIndex int - mu sync.RWMutex + balancerIndex int + mu sync.RWMutex } /* Constructor for RateList holding one slice for addreses and one slice for connections. */ func NewRaterList() *RaterList { - r:= &RaterList{balancerIndex: 0} // leaving both slices to nil + r := &RaterList{balancerIndex: 0} // leaving both slices to nil return r } /* Adds a client to the two internal slices. */ -func (rl *RaterList) AddClient(address string, client *rpc.Client){ +func (rl *RaterList) AddClient(address string, client *rpc.Client) { rl.clientAddresses = append(rl.clientAddresses, address) rl.clientConnections = append(rl.clientConnections, client) return @@ -32,7 +32,7 @@ func (rl *RaterList) AddClient(address string, client *rpc.Client){ /* Removes a client from the slices locking the readers and reseting the balancer index. */ -func (rl *RaterList) RemoveClient(address string){ +func (rl *RaterList) RemoveClient(address string) { index := -1 for i, v := range rl.clientAddresses { if v == address { @@ -53,7 +53,7 @@ func (rl *RaterList) RemoveClient(address string){ /* Returns a client for the specifed address. */ -func (rl *RaterList) GetClient(address string) (*rpc.Client, bool){ +func (rl *RaterList) GetClient(address string) (*rpc.Client, bool) { for i, v := range rl.clientAddresses { if v == address { return rl.clientConnections[i], true @@ -74,7 +74,7 @@ func (rl *RaterList) Balance() (result *rpc.Client) { if len(rl.clientAddresses) > 0 { result = rl.clientConnections[rl.balancerIndex] rl.balancerIndex++ - } - - return + } + + return } diff --git a/cmd/inquirer/balancer_test.go b/cmd/inquirer/balancer_test.go index 84bd007ed..c8b4f5f32 100644 --- a/cmd/inquirer/balancer_test.go +++ b/cmd/inquirer/balancer_test.go @@ -1,12 +1,12 @@ package main import ( - "testing" "net/rpc" + "testing" ) func BenchmarkBalance(b *testing.B) { - raterlist:= NewRaterList() + raterlist := NewRaterList() raterlist.AddClient("client 1", new(rpc.Client)) raterlist.AddClient("client 2", new(rpc.Client)) raterlist.AddClient("client 3", new(rpc.Client)) @@ -15,12 +15,11 @@ func BenchmarkBalance(b *testing.B) { } } - func TestRemoving(t *testing.T) { - raterlist:= NewRaterList() - c1:= new(rpc.Client) - c2:= new(rpc.Client) - c3:= new(rpc.Client) + raterlist := NewRaterList() + c1 := new(rpc.Client) + c2 := new(rpc.Client) + c3 := new(rpc.Client) raterlist.AddClient("client 1", c1) raterlist.AddClient("client 2", c2) raterlist.AddClient("client 3", c3) @@ -33,42 +32,42 @@ func TestRemoving(t *testing.T) { } func TestGet(t *testing.T) { - raterlist:= NewRaterList() - c1:= new(rpc.Client) + raterlist := NewRaterList() + c1 := new(rpc.Client) raterlist.AddClient("client 1", c1) - result, ok:= raterlist.GetClient("client 1") + result, ok := raterlist.GetClient("client 1") if !ok || c1 != result { t.Error("Get failed") } } func TestOneBalancer(t *testing.T) { - raterlist:= NewRaterList() + raterlist := NewRaterList() raterlist.AddClient("client 1", new(rpc.Client)) - c1:= raterlist.Balance() - c2:= raterlist.Balance() + c1 := raterlist.Balance() + c2 := raterlist.Balance() if c1 != c2 { t.Error("With only one rater these shoud be equal") } } func Test100Balancer(t *testing.T) { - raterlist:= NewRaterList() + raterlist := NewRaterList() var clients []*rpc.Client - for i:=0; i< 100; i++ { - c:= new(rpc.Client) + for i := 0; i < 100; i++ { + c := new(rpc.Client) clients = append(clients, c) raterlist.AddClient("client 1", c) } - for i:=0; i< 100; i++ { - c:=raterlist.Balance() + for i := 0; i < 100; i++ { + c := raterlist.Balance() if c != clients[i] { t.Error("Balance did not iterate all the available clients") } } - c:=raterlist.Balance() + c := raterlist.Balance() if c != clients[0] { t.Error("Balance did not lopped from the begining") } - + } diff --git a/cmd/inquirer/inquirer.go b/cmd/inquirer/inquirer.go index e04a4b913..6e4fa258c 100644 --- a/cmd/inquirer/inquirer.go +++ b/cmd/inquirer/inquirer.go @@ -1,31 +1,29 @@ package main import ( + "errors" "fmt" "log" "net" "net/http" "net/rpc" "net/rpc/jsonrpc" - "errors" - "time" "runtime" "sync" + "time" + "github.com/rif/cgrates/timespans" ) - var ( - nCPU = runtime.NumCPU() - raterList *RaterList - inChannels []chan string - outChannels []chan string - multiplexerIndex int - mu sync.Mutex - sem = make(chan int, nCPU) + nCPU = runtime.NumCPU() + raterList *RaterList + inChannels []chan *timespans.CallDescriptor + outChannels []chan *timespans.CallCost + multiplexerIndex int + mu sync.Mutex + sem = make(chan int, nCPU) ) - - /* Handler for the statistics web client */ @@ -41,33 +39,33 @@ func handler(w http.ResponseWriter, r *http.Request) { /* Creates a gorutine for every cpu core and the multiplexses the calls to each of them. */ -func initThreadedCallRater(){ +func initThreadedCallRater() { multiplexerIndex = 0 runtime.GOMAXPROCS(nCPU) - inChannels = make([]chan string, nCPU) - outChannels = make([]chan string, nCPU) - for i:= 0; i< nCPU; i++ { - inChannels[i] = make(chan string) - outChannels[i] = make(chan string) - go func(in, out chan string){ + inChannels = make([]chan *timespans.CallDescriptor, nCPU) + outChannels = make([]chan *timespans.CallCost, nCPU) + for i := 0; i < nCPU; i++ { + inChannels[i] = make(chan *timespans.CallDescriptor) + outChannels[i] = make(chan *timespans.CallCost) + go func(in chan *timespans.CallDescriptor, out chan *timespans.CallCost) { for { - key := <- in + key := <-in out <- CallRater(key) } }(inChannels[i], outChannels[i]) - } + } } /* -*/ -func ThreadedCallRater(key string) (replay string) { + */ +func ThreadedCallRater(key *timespans.CallDescriptor) (reply *timespans.CallCost) { mu.Lock() defer mu.Unlock() if multiplexerIndex >= nCPU { multiplexerIndex = 0 } inChannels[multiplexerIndex] <- key - replay = <- outChannels[multiplexerIndex] + reply = <-outChannels[multiplexerIndex] multiplexerIndex++ return } @@ -75,21 +73,22 @@ func ThreadedCallRater(key string) (replay string) { /* The function that gets the information from the raters using balancer. */ -func CallRater(key string) (reply string) { +func CallRater(key *timespans.CallDescriptor) (reply *timespans.CallCost) { err := errors.New("") //not nil value for err != nil { - client:= raterList.Balance() + client := raterList.Balance() if client == nil { log.Print("Waiting for raters to register...") time.Sleep(1 * time.Second) // wait one second and retry } else { - err = client.Call("Storage.Get", key, &reply) + reply = ×pans.CallCost{} + err = client.Call("Storage.GetCost", *key, reply) if err != nil { log.Printf("Got en error from rater: %v", err) } - } + } } - return + return } func listenToTheWorld() { @@ -106,7 +105,7 @@ func listenToTheWorld() { rpc.Register(responder) for { - c, err := l.Accept() + c, err := l.Accept() if err != nil { log.Printf("accept error: %s", c) continue @@ -122,11 +121,11 @@ func main() { raterServer := new(RaterServer) rpc.Register(raterServer) rpc.HandleHTTP() - - go StopSingnalHandler() + + go StopSingnalHandler() go listenToTheWorld() //initThreadedCallRater() - http.HandleFunc("/", handler) + http.HandleFunc("/", handler) log.Print("The server is listening...") http.ListenAndServe(":2000", nil) } diff --git a/cmd/inquirer/inquirer_test.go b/cmd/inquirer/inquirer_test.go index ab19482d8..b9f8fa220 100644 --- a/cmd/inquirer/inquirer_test.go +++ b/cmd/inquirer/inquirer_test.go @@ -28,4 +28,3 @@ func TestRPCGet(t *testing.T) { t.Errorf("replay == %q, want %q", reply, expect) } } - diff --git a/cmd/inquirer/registration.go b/cmd/inquirer/registration.go index e10a1dd60..dca223448 100644 --- a/cmd/inquirer/registration.go +++ b/cmd/inquirer/registration.go @@ -4,10 +4,10 @@ import ( "fmt" "log" "net/rpc" - "time" - "os/signal" "os" + "exp/signal" "syscall" + "time" ) /* @@ -26,7 +26,7 @@ func StopSingnalHandler() { case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT: log.Printf("Caught signal %v, sending shutdownto raters\n", usig) var reply string - for i,client:= range raterList.clientConnections { + for i, client := range raterList.clientConnections { client.Call("Storage.Shutdown", "", &reply) log.Printf("Shutdown rater %v: %v ", raterList.clientAddresses[i], reply) } @@ -53,14 +53,14 @@ func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error { /* RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list. */ -func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *byte) error { +func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *byte) error { client, ok := raterList.GetClient(clientAddress) if ok { - client.Close() + client.Close() raterList.RemoveClient(clientAddress) - log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress)) + log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress)) } else { - log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress)) + log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress)) } return nil } diff --git a/cmd/inquirer/responder.go b/cmd/inquirer/responder.go index 1eb8dbd9c..2b7c6aff0 100644 --- a/cmd/inquirer/responder.go +++ b/cmd/inquirer/responder.go @@ -1,12 +1,15 @@ package main +import ( + "github.com/rif/cgrates/timespans" +) + type Responder byte /* RPC method thet provides the external RPC interface for getting the rating information. */ -func (r *Responder) Get(args string, replay *string) error { - *replay = CallRater(args) +func (r *Responder) Get(arg timespans.CallDescriptor, replay *timespans.CallCost) error { + *replay = *CallRater(&arg) return nil } - diff --git a/cmd/rater/rater.go b/cmd/rater/rater.go index 63dbef9ad..ebcc86554 100644 --- a/cmd/rater/rater.go +++ b/cmd/rater/rater.go @@ -2,16 +2,16 @@ package main import ( "flag" + "github.com/rif/cgrates/timespans" "log" "net" "net/rpc" "os" - "github.com/rif/cgrates/timespans" ) var ( - server = flag.String("server", "127.0.0.1:2000", "target host:port") - listen = flag.String("listen", "127.0.0.1:1234", "target host:port") + server = flag.String("server", "127.0.0.1:2000", "target host:port") + listen = flag.String("listen", "127.0.0.1:1234", "target host:port") storage Storage ) @@ -19,15 +19,16 @@ type Storage struct { sg timespans.StorageGetter } -func NewStorage(nsg timespans.StorageGetter) *Storage{ +func NewStorage(nsg timespans.StorageGetter) *Storage { return &Storage{sg: nsg} } /* RPC method providing the rating information from the storage. */ -func (s *Storage) GetCost(in *timespans.CallDescriptor, reply *timespans.CallCost) (err error) { - r, e := in.GetCost(s.sg) +func (s *Storage) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) { + descriptor := &cd + r, e := descriptor.GetCost(s.sg) *reply, err = *r, e return nil } @@ -42,7 +43,7 @@ func (s *Storage) Shutdown(args string, reply *string) (err error) { return nil } -func main() { +func main() { flag.Parse() getter, err := timespans.NewKyotoStorage("storage.kch") //getter, err := NewRedisStorage("tcp:127.0.0.1:6379", 10) diff --git a/cmd/rater/registration.go b/cmd/rater/registration.go index 2bf27b489..ea31747d6 100644 --- a/cmd/rater/registration.go +++ b/cmd/rater/registration.go @@ -1,12 +1,12 @@ package main import ( + "exp/signal" + "github.com/rif/cgrates/timespans" "log" "net/rpc" "os" - "exp/signal" "syscall" - "github.com/rif/cgrates/timespans" ) /* diff --git a/cmd/stresstest/stresstest.go b/cmd/stresstest/stresstest.go index 0464229ed..9bf05448a 100644 --- a/cmd/stresstest/stresstest.go +++ b/cmd/stresstest/stresstest.go @@ -3,11 +3,15 @@ package main import ( "net/rpc/jsonrpc" "log" - //"time" + "github.com/rif/cgrates/timespans" + "time" ) - func main(){ + t1 := time.Date(2012, time.February, 02, 17, 30, 0, 0, time.UTC) + t2 := time.Date(2012, time.February, 02, 18, 30, 0, 0, time.UTC) + cd := timespans.CallDescriptor{CstmId: "vdf", Subject: "rif", DestinationPrefix: "0256", TimeStart: t1, TimeEnd: t2} + result := timespans.CallCost{} client, _ := jsonrpc.Dial("tcp", "localhost:5090") runs := int(5 * 1e4); i:= 0 @@ -15,7 +19,7 @@ func main(){ for ; i < runs; i++ { go func(){ var reply string - client.Call("Responder.Get", "test", &reply) + client.Call("Responder.Get", cd, &result) c <- reply }() //time.Sleep(1*time.Second) @@ -23,7 +27,8 @@ func main(){ for j:=0; j < runs; j++ { <-c } - log.Print(i) + log.Println(result) + log.Println(i) client.Close() } diff --git a/cmd/stresstest/stresstest.py b/cmd/stresstest/stresstest.py index 5497f993d..b4632425d 100644 --- a/cmd/stresstest/stresstest.py +++ b/cmd/stresstest/stresstest.py @@ -3,6 +3,7 @@ # Written by Stephen Day # Modified by Bruce Eckel to work with both Python 2 & 3 import json, socket, itertools +from datetime import datetime class JSONClient(object): @@ -42,13 +43,15 @@ class JSONClient(object): rpc =JSONClient(("127.0.0.1", 5090)) +cd = {"Tor":0, "CstmId": "vdf", "Subject": "rif", "DestinationPrefix": "0256", "TimeStart": "2012-02-02T17:30:00Z", "TimeEnd": "2012-02-02T18:30:00Z"} + # alternative to the above s = socket.create_connection(("127.0.0.1", 5090)) -s.sendall(json.dumps(({"id": 1, "method": "Responder.Get", "params": ["test"]}))) +s.sendall(json.dumps(({"id": 1, "method": "Responder.Get", "params": [cd]}))) print s.recv(4096) i = 0 result = "" for i in xrange(5 * int(1e4) + 1): - result = rpc.call("Responder.Get", "test") + result = rpc.call("Responder.Get", cd) print i, result diff --git a/timespans/calldesc.go b/timespans/calldesc.go index 28f97a5f6..a15ceebf9 100644 --- a/timespans/calldesc.go +++ b/timespans/calldesc.go @@ -4,7 +4,7 @@ import ( "fmt" "strings" "time" - //"log" + "log" ) /* @@ -49,6 +49,10 @@ func (cd *CallDescriptor) GetKey() string { Finds the activation periods applicable to the call descriptior. */ func (cd *CallDescriptor) getActivePeriods() (is []*ActivationPeriod) { + if len(cd.ActivationPeriods) == 0{ + log.Print("No activation periods available!", cd) + return + } bestTime := cd.ActivationPeriods[0].ActivationTime is = append(is, cd.ActivationPeriods[0]) @@ -68,6 +72,10 @@ func (cd *CallDescriptor) getActivePeriods() (is []*ActivationPeriod) { Splits the call timespan into sub time spans accordin to the activation periods intervals. */ func (cd *CallDescriptor) splitInTimeSpans(aps []*ActivationPeriod) (timespans []*TimeSpan) { + if len(aps) == 0 { + log.Print("Nothing to split, move along...") + return + } ts1 := &TimeSpan{TimeStart: cd.TimeStart, TimeEnd: cd.TimeEnd} ts1.ActivationPeriod = aps[0] // first activation period starts before the timespan