started timeslots package

This commit is contained in:
Radu Ioan Fericean
2012-01-30 19:04:45 +02:00
parent 9f206bd22e
commit e83bf2514b
10 changed files with 76 additions and 115 deletions

View File

@@ -9,21 +9,68 @@ import (
"net/rpc/jsonrpc"
"errors"
"time"
"runtime"
"sync"
)
var raterList *RaterList
const NCPU = 4
var (
raterList *RaterList
inChannels [NCPU]chan string
outChannels [NCPU]chan string
multiplexerIndex int
mu sync.Mutex
sem = make(chan int, NCPU)
)
/*
Handler for the statistics web client
*/
func handler(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "<html><body><ol>")
for addr, _ := range raterList.clientAddresses {
fmt.Fprint(w, fmt.Sprintf("<li>%s</li>", addr))
for _, addr := range raterList.clientAddresses {
fmt.Fprint(w, fmt.Sprintf("<li>Client: %v</li>", addr))
}
fmt.Fprint(w, fmt.Sprintf("<li>Gorutines: %v</li>", runtime.Goroutines()))
fmt.Fprint(w, "</ol></body></html>")
}
/*
Creates a gorutine for every cpu core and the multiplexses the calls to each of them.
*/
func initThreadedCallRater(){
multiplexerIndex = 0
runtime.GOMAXPROCS(NCPU)
fmt.Println(runtime.GOMAXPROCS(NCPU))
for i:= 0; i< NCPU; i++ {
inChannels[i] = make(chan string)
outChannels[i] = make(chan string)
go func(in, out chan string){
for {
key := <- in
out <- CallRater(key)
}
}(inChannels[i], outChannels[i])
}
}
/*
*/
func ThreadedCallRater(key string) (replay string) {
mu.Lock()
defer mu.Unlock()
if multiplexerIndex >= NCPU {
multiplexerIndex = 0
}
inChannels[multiplexerIndex] <- key
replay = <- outChannels[multiplexerIndex]
multiplexerIndex++
return
}
/*
The function that gets the information from the raters using balancer.
*/
@@ -77,7 +124,7 @@ func main() {
go StopSingnalHandler()
go listenToTheWorld()
//initThreadedCallRater()
http.HandleFunc("/", handler)
log.Print("The server is listening...")
http.ListenAndServe(":2000", nil)

View File

@@ -5,7 +5,7 @@ type Responder byte
/*
RPC method thet provides the external RPC interface for getting the rating information.
*/
func (r *Responder) Get(args string, replay *string) error {
func (r *Responder) Get(args string, replay *string) error {
*replay = CallRater(args)
return nil
}

View File

@@ -1,27 +0,0 @@
package main
import (
"log"
"github.com/fsouza/gokabinet/kc"
)
type KyotoStorage struct {
db *kc.DB
}
func NewKyotoStorage(filaName string) (*KyotoStorage, error) {
ndb, err := kc.Open(filaName, kc.READ)
log.Print("Starting kyoto storage")
return &KyotoStorage{db: ndb}, err
}
func (ks *KyotoStorage) Close() {
log.Print("Closing kyoto storage")
ks.db.Close()
}
func (ks *KyotoStorage) Get(key string) (value string, err error) {
return ks.db.Get(key)
}

View File

@@ -6,6 +6,7 @@ import (
"net"
"net/rpc"
"os"
"github.com/rif/cgrates/timeslots"
)
var (
@@ -15,18 +16,18 @@ var (
)
type Storage struct {
sg StorageGetter
sg timeslots.StorageGetter
}
func NewStorage(nsg StorageGetter) *Storage{
func NewStorage(nsg timeslots.StorageGetter) *Storage{
return &Storage{sg: nsg}
}
/*
RPC method providing the rating information from the storage.
*/
func (s *Storage) Get(args string, reply *string) (err error) {
*reply, err = s.sg.Get(args)
func (s *Storage) GetCost(in *timeslots.CallDescription, reply *string) (err error) {
*reply, err = timeslots.GetCost(in, s.sg)
return err
}
@@ -42,7 +43,7 @@ func (s *Storage) Shutdown(args string, reply *string) (err error) {
func main() {
flag.Parse()
getter, err := NewKyotoStorage("storage.kch")
getter, err := timeslots.NewKyotoStorage("storage.kch")
//getter, err := NewRedisStorage("tcp:127.0.0.1:6379")
//defer getter.Close()
if err != nil {

View File

@@ -1,28 +0,0 @@
package main
import (
"log"
"github.com/simonz05/godis"
)
type RedisStorage struct {
db *godis.Client
}
func NewRedisStorage(address string) (*RedisStorage, error) {
ndb:= godis.New(address, 10, "")
log.Print("Starting redis storage")
return &RedisStorage{db: ndb}, nil
}
func (rs *RedisStorage) Close() {
log.Print("Closing redis storage")
rs.db.Quit()
}
func (rs *RedisStorage) Get(key string) (string, error) {
elem, err := rs.db.Get(key)
return elem.String(), err
}

View File

@@ -6,12 +6,13 @@ import (
"os"
"os/signal"
"syscall"
"github.com/rif/cgrates/timeslots"
)
/*
Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from inquirer and closes the storage before exiting.
*/
func StopSingnalHandler(server, listen *string, getter *KyotoStorage) {
func StopSingnalHandler(server, listen *string, sg timeslots.StorageGetter) {
log.Print("Handling stop signals...")
sig := <-signal.Incoming
if usig, ok := sig.(os.UnixSignal); ok {
@@ -19,7 +20,7 @@ func StopSingnalHandler(server, listen *string, getter *KyotoStorage) {
case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT:
log.Printf("Caught signal %v, unregistering from server\n", usig)
unregisterFromServer(server, listen)
getter.Close()
sg.Close()
os.Exit(1)
}
}

View File

@@ -1,9 +0,0 @@
package main
/*
Interface for storage providers.
*/
type StorageGetter interface {
Close()
Get(key string) (string, error)
}

View File

@@ -1,33 +0,0 @@
package main
import (
"time"
)
type BilingUnit int
type RatingProfile struct {
StartTime time.Time
ConnectFee float32
Price float32
BillingUnit BilingUnit
}
type ActivationPeriod struct {
ActivationTime time.Time
RatingProfiles []RatingProfile
}
type Customer struct {
Id string
Prefix string
ActivationPeriods []ActivationPeriod
}
const (
SECONDS =iota
COUNT
BYTES
)

View File

@@ -2,20 +2,29 @@ package main
import (
"net/rpc/jsonrpc"
"fmt"
"log"
//"time"
)
func main(){
client, _ := jsonrpc.Dial("tcp", "localhost:5090")
var reply string
runs := int(5 * 10e3);
i:= 0
for ; i < 5 * 10e3; i++ {
client.Call("Responder.Get", "test", &reply)
c := make(chan string)
for ; i < runs; i++ {
go func(){
var reply string
client.Call("Responder.Get", "test", &reply)
c <- reply
}()
//time.Sleep(1*time.Second)
}
fmt.Println(i, reply)
for j:=0; j < runs; j++ {
<-c
}
log.Print(i)
client.Close()
}

View File

@@ -49,6 +49,6 @@ print s.recv(4096)
i = 0
result = ""
for i in xrange(5 * int(10e3) + 1):
for i in xrange(5 * int(10e4) + 1):
result = rpc.call("Responder.Get", "test")
print i, result