whole chain working again

This commit is contained in:
Radu Ioan Fericean
2012-02-08 13:24:47 +02:00
parent b9ced45f52
commit 573ffee878
11 changed files with 107 additions and 90 deletions

View File

@@ -6,24 +6,24 @@ import (
)
type RaterList struct {
clientAddresses []string
clientAddresses []string
clientConnections []*rpc.Client
balancerIndex int
mu sync.RWMutex
balancerIndex int
mu sync.RWMutex
}
/*
Constructor for RateList holding one slice for addreses and one slice for connections.
*/
func NewRaterList() *RaterList {
r:= &RaterList{balancerIndex: 0} // leaving both slices to nil
r := &RaterList{balancerIndex: 0} // leaving both slices to nil
return r
}
/*
Adds a client to the two internal slices.
*/
func (rl *RaterList) AddClient(address string, client *rpc.Client){
func (rl *RaterList) AddClient(address string, client *rpc.Client) {
rl.clientAddresses = append(rl.clientAddresses, address)
rl.clientConnections = append(rl.clientConnections, client)
return
@@ -32,7 +32,7 @@ func (rl *RaterList) AddClient(address string, client *rpc.Client){
/*
Removes a client from the slices locking the readers and reseting the balancer index.
*/
func (rl *RaterList) RemoveClient(address string){
func (rl *RaterList) RemoveClient(address string) {
index := -1
for i, v := range rl.clientAddresses {
if v == address {
@@ -53,7 +53,7 @@ func (rl *RaterList) RemoveClient(address string){
/*
Returns a client for the specifed address.
*/
func (rl *RaterList) GetClient(address string) (*rpc.Client, bool){
func (rl *RaterList) GetClient(address string) (*rpc.Client, bool) {
for i, v := range rl.clientAddresses {
if v == address {
return rl.clientConnections[i], true
@@ -74,7 +74,7 @@ func (rl *RaterList) Balance() (result *rpc.Client) {
if len(rl.clientAddresses) > 0 {
result = rl.clientConnections[rl.balancerIndex]
rl.balancerIndex++
}
return
}
return
}

View File

@@ -1,12 +1,12 @@
package main
import (
"testing"
"net/rpc"
"testing"
)
func BenchmarkBalance(b *testing.B) {
raterlist:= NewRaterList()
raterlist := NewRaterList()
raterlist.AddClient("client 1", new(rpc.Client))
raterlist.AddClient("client 2", new(rpc.Client))
raterlist.AddClient("client 3", new(rpc.Client))
@@ -15,12 +15,11 @@ func BenchmarkBalance(b *testing.B) {
}
}
func TestRemoving(t *testing.T) {
raterlist:= NewRaterList()
c1:= new(rpc.Client)
c2:= new(rpc.Client)
c3:= new(rpc.Client)
raterlist := NewRaterList()
c1 := new(rpc.Client)
c2 := new(rpc.Client)
c3 := new(rpc.Client)
raterlist.AddClient("client 1", c1)
raterlist.AddClient("client 2", c2)
raterlist.AddClient("client 3", c3)
@@ -33,42 +32,42 @@ func TestRemoving(t *testing.T) {
}
func TestGet(t *testing.T) {
raterlist:= NewRaterList()
c1:= new(rpc.Client)
raterlist := NewRaterList()
c1 := new(rpc.Client)
raterlist.AddClient("client 1", c1)
result, ok:= raterlist.GetClient("client 1")
result, ok := raterlist.GetClient("client 1")
if !ok || c1 != result {
t.Error("Get failed")
}
}
func TestOneBalancer(t *testing.T) {
raterlist:= NewRaterList()
raterlist := NewRaterList()
raterlist.AddClient("client 1", new(rpc.Client))
c1:= raterlist.Balance()
c2:= raterlist.Balance()
c1 := raterlist.Balance()
c2 := raterlist.Balance()
if c1 != c2 {
t.Error("With only one rater these shoud be equal")
}
}
func Test100Balancer(t *testing.T) {
raterlist:= NewRaterList()
raterlist := NewRaterList()
var clients []*rpc.Client
for i:=0; i< 100; i++ {
c:= new(rpc.Client)
for i := 0; i < 100; i++ {
c := new(rpc.Client)
clients = append(clients, c)
raterlist.AddClient("client 1", c)
}
for i:=0; i< 100; i++ {
c:=raterlist.Balance()
for i := 0; i < 100; i++ {
c := raterlist.Balance()
if c != clients[i] {
t.Error("Balance did not iterate all the available clients")
}
}
c:=raterlist.Balance()
c := raterlist.Balance()
if c != clients[0] {
t.Error("Balance did not lopped from the begining")
}
}

View File

@@ -1,31 +1,29 @@
package main
import (
"errors"
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
"errors"
"time"
"runtime"
"sync"
"time"
"github.com/rif/cgrates/timespans"
)
var (
nCPU = runtime.NumCPU()
raterList *RaterList
inChannels []chan string
outChannels []chan string
multiplexerIndex int
mu sync.Mutex
sem = make(chan int, nCPU)
nCPU = runtime.NumCPU()
raterList *RaterList
inChannels []chan *timespans.CallDescriptor
outChannels []chan *timespans.CallCost
multiplexerIndex int
mu sync.Mutex
sem = make(chan int, nCPU)
)
/*
Handler for the statistics web client
*/
@@ -41,33 +39,33 @@ func handler(w http.ResponseWriter, r *http.Request) {
/*
Creates a gorutine for every cpu core and the multiplexses the calls to each of them.
*/
func initThreadedCallRater(){
func initThreadedCallRater() {
multiplexerIndex = 0
runtime.GOMAXPROCS(nCPU)
inChannels = make([]chan string, nCPU)
outChannels = make([]chan string, nCPU)
for i:= 0; i< nCPU; i++ {
inChannels[i] = make(chan string)
outChannels[i] = make(chan string)
go func(in, out chan string){
inChannels = make([]chan *timespans.CallDescriptor, nCPU)
outChannels = make([]chan *timespans.CallCost, nCPU)
for i := 0; i < nCPU; i++ {
inChannels[i] = make(chan *timespans.CallDescriptor)
outChannels[i] = make(chan *timespans.CallCost)
go func(in chan *timespans.CallDescriptor, out chan *timespans.CallCost) {
for {
key := <- in
key := <-in
out <- CallRater(key)
}
}(inChannels[i], outChannels[i])
}
}
}
/*
*/
func ThreadedCallRater(key string) (replay string) {
*/
func ThreadedCallRater(key *timespans.CallDescriptor) (reply *timespans.CallCost) {
mu.Lock()
defer mu.Unlock()
if multiplexerIndex >= nCPU {
multiplexerIndex = 0
}
inChannels[multiplexerIndex] <- key
replay = <- outChannels[multiplexerIndex]
reply = <-outChannels[multiplexerIndex]
multiplexerIndex++
return
}
@@ -75,21 +73,22 @@ func ThreadedCallRater(key string) (replay string) {
/*
The function that gets the information from the raters using balancer.
*/
func CallRater(key string) (reply string) {
func CallRater(key *timespans.CallDescriptor) (reply *timespans.CallCost) {
err := errors.New("") //not nil value
for err != nil {
client:= raterList.Balance()
client := raterList.Balance()
if client == nil {
log.Print("Waiting for raters to register...")
time.Sleep(1 * time.Second) // wait one second and retry
} else {
err = client.Call("Storage.Get", key, &reply)
reply = &timespans.CallCost{}
err = client.Call("Storage.GetCost", *key, reply)
if err != nil {
log.Printf("Got en error from rater: %v", err)
}
}
}
}
return
return
}
func listenToTheWorld() {
@@ -106,7 +105,7 @@ func listenToTheWorld() {
rpc.Register(responder)
for {
c, err := l.Accept()
c, err := l.Accept()
if err != nil {
log.Printf("accept error: %s", c)
continue
@@ -122,11 +121,11 @@ func main() {
raterServer := new(RaterServer)
rpc.Register(raterServer)
rpc.HandleHTTP()
go StopSingnalHandler()
go StopSingnalHandler()
go listenToTheWorld()
//initThreadedCallRater()
http.HandleFunc("/", handler)
http.HandleFunc("/", handler)
log.Print("The server is listening...")
http.ListenAndServe(":2000", nil)
}

View File

@@ -28,4 +28,3 @@ func TestRPCGet(t *testing.T) {
t.Errorf("replay == %q, want %q", reply, expect)
}
}

View File

@@ -4,10 +4,10 @@ import (
"fmt"
"log"
"net/rpc"
"time"
"os/signal"
"os"
"exp/signal"
"syscall"
"time"
)
/*
@@ -26,7 +26,7 @@ func StopSingnalHandler() {
case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT:
log.Printf("Caught signal %v, sending shutdownto raters\n", usig)
var reply string
for i,client:= range raterList.clientConnections {
for i, client := range raterList.clientConnections {
client.Call("Storage.Shutdown", "", &reply)
log.Printf("Shutdown rater %v: %v ", raterList.clientAddresses[i], reply)
}
@@ -53,14 +53,14 @@ func (rs *RaterServer) RegisterRater(clientAddress string, replay *byte) error {
/*
RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list.
*/
func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *byte) error {
func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *byte) error {
client, ok := raterList.GetClient(clientAddress)
if ok {
client.Close()
client.Close()
raterList.RemoveClient(clientAddress)
log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress))
log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress))
} else {
log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress))
log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress))
}
return nil
}

View File

@@ -1,12 +1,15 @@
package main
import (
"github.com/rif/cgrates/timespans"
)
type Responder byte
/*
RPC method thet provides the external RPC interface for getting the rating information.
*/
func (r *Responder) Get(args string, replay *string) error {
*replay = CallRater(args)
func (r *Responder) Get(arg timespans.CallDescriptor, replay *timespans.CallCost) error {
*replay = *CallRater(&arg)
return nil
}

View File

@@ -2,16 +2,16 @@ package main
import (
"flag"
"github.com/rif/cgrates/timespans"
"log"
"net"
"net/rpc"
"os"
"github.com/rif/cgrates/timespans"
)
var (
server = flag.String("server", "127.0.0.1:2000", "target host:port")
listen = flag.String("listen", "127.0.0.1:1234", "target host:port")
server = flag.String("server", "127.0.0.1:2000", "target host:port")
listen = flag.String("listen", "127.0.0.1:1234", "target host:port")
storage Storage
)
@@ -19,15 +19,16 @@ type Storage struct {
sg timespans.StorageGetter
}
func NewStorage(nsg timespans.StorageGetter) *Storage{
func NewStorage(nsg timespans.StorageGetter) *Storage {
return &Storage{sg: nsg}
}
/*
RPC method providing the rating information from the storage.
*/
func (s *Storage) GetCost(in *timespans.CallDescriptor, reply *timespans.CallCost) (err error) {
r, e := in.GetCost(s.sg)
func (s *Storage) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) {
descriptor := &cd
r, e := descriptor.GetCost(s.sg)
*reply, err = *r, e
return nil
}
@@ -42,7 +43,7 @@ func (s *Storage) Shutdown(args string, reply *string) (err error) {
return nil
}
func main() {
func main() {
flag.Parse()
getter, err := timespans.NewKyotoStorage("storage.kch")
//getter, err := NewRedisStorage("tcp:127.0.0.1:6379", 10)

View File

@@ -1,12 +1,12 @@
package main
import (
"exp/signal"
"github.com/rif/cgrates/timespans"
"log"
"net/rpc"
"os"
"exp/signal"
"syscall"
"github.com/rif/cgrates/timespans"
)
/*

View File

@@ -3,11 +3,15 @@ package main
import (
"net/rpc/jsonrpc"
"log"
//"time"
"github.com/rif/cgrates/timespans"
"time"
)
func main(){
t1 := time.Date(2012, time.February, 02, 17, 30, 0, 0, time.UTC)
t2 := time.Date(2012, time.February, 02, 18, 30, 0, 0, time.UTC)
cd := timespans.CallDescriptor{CstmId: "vdf", Subject: "rif", DestinationPrefix: "0256", TimeStart: t1, TimeEnd: t2}
result := timespans.CallCost{}
client, _ := jsonrpc.Dial("tcp", "localhost:5090")
runs := int(5 * 1e4);
i:= 0
@@ -15,7 +19,7 @@ func main(){
for ; i < runs; i++ {
go func(){
var reply string
client.Call("Responder.Get", "test", &reply)
client.Call("Responder.Get", cd, &result)
c <- reply
}()
//time.Sleep(1*time.Second)
@@ -23,7 +27,8 @@ func main(){
for j:=0; j < runs; j++ {
<-c
}
log.Print(i)
log.Println(result)
log.Println(i)
client.Close()
}

View File

@@ -3,6 +3,7 @@
# Written by Stephen Day
# Modified by Bruce Eckel to work with both Python 2 & 3
import json, socket, itertools
from datetime import datetime
class JSONClient(object):
@@ -42,13 +43,15 @@ class JSONClient(object):
rpc =JSONClient(("127.0.0.1", 5090))
cd = {"Tor":0, "CstmId": "vdf", "Subject": "rif", "DestinationPrefix": "0256", "TimeStart": "2012-02-02T17:30:00Z", "TimeEnd": "2012-02-02T18:30:00Z"}
# alternative to the above
s = socket.create_connection(("127.0.0.1", 5090))
s.sendall(json.dumps(({"id": 1, "method": "Responder.Get", "params": ["test"]})))
s.sendall(json.dumps(({"id": 1, "method": "Responder.Get", "params": [cd]})))
print s.recv(4096)
i = 0
result = ""
for i in xrange(5 * int(1e4) + 1):
result = rpc.call("Responder.Get", "test")
result = rpc.call("Responder.Get", cd)
print i, result

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"strings"
"time"
//"log"
"log"
)
/*
@@ -49,6 +49,10 @@ func (cd *CallDescriptor) GetKey() string {
Finds the activation periods applicable to the call descriptior.
*/
func (cd *CallDescriptor) getActivePeriods() (is []*ActivationPeriod) {
if len(cd.ActivationPeriods) == 0{
log.Print("No activation periods available!", cd)
return
}
bestTime := cd.ActivationPeriods[0].ActivationTime
is = append(is, cd.ActivationPeriods[0])
@@ -68,6 +72,10 @@ func (cd *CallDescriptor) getActivePeriods() (is []*ActivationPeriod) {
Splits the call timespan into sub time spans accordin to the activation periods intervals.
*/
func (cd *CallDescriptor) splitInTimeSpans(aps []*ActivationPeriod) (timespans []*TimeSpan) {
if len(aps) == 0 {
log.Print("Nothing to split, move along...")
return
}
ts1 := &TimeSpan{TimeStart: cd.TimeStart, TimeEnd: cd.TimeEnd}
ts1.ActivationPeriod = aps[0] // first activation period starts before the timespan