From e83bf2514baeabd399719d90b22a33ffddf770ed Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 30 Jan 2012 19:04:45 +0200 Subject: [PATCH] started timeslots package --- cmd/inquirer/inquirer.go | 55 +++++++++++++++++++++++++++++++--- cmd/inquirer/responder.go | 2 +- cmd/rater/kyoto_storage.go | 27 ----------------- cmd/rater/rater.go | 11 +++---- cmd/rater/redis_storage.go | 28 ----------------- cmd/rater/registration.go | 5 ++-- cmd/rater/storage_interface.go | 9 ------ cmd/rater/timestamps.go | 33 -------------------- cmd/stresstest/stresstest.go | 19 ++++++++---- cmd/stresstest/stresstest.py | 2 +- 10 files changed, 76 insertions(+), 115 deletions(-) delete mode 100644 cmd/rater/kyoto_storage.go delete mode 100644 cmd/rater/redis_storage.go delete mode 100644 cmd/rater/storage_interface.go delete mode 100644 cmd/rater/timestamps.go diff --git a/cmd/inquirer/inquirer.go b/cmd/inquirer/inquirer.go index da9d1fc0d..0a69804cd 100644 --- a/cmd/inquirer/inquirer.go +++ b/cmd/inquirer/inquirer.go @@ -9,21 +9,68 @@ import ( "net/rpc/jsonrpc" "errors" "time" + "runtime" + "sync" ) -var raterList *RaterList +const NCPU = 4 + +var ( + raterList *RaterList + inChannels [NCPU]chan string + outChannels [NCPU]chan string + multiplexerIndex int + mu sync.Mutex + sem = make(chan int, NCPU) +) + + /* Handler for the statistics web client */ func handler(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "
    ") - for addr, _ := range raterList.clientAddresses { - fmt.Fprint(w, fmt.Sprintf("
  1. %s
  2. ", addr)) + for _, addr := range raterList.clientAddresses { + fmt.Fprint(w, fmt.Sprintf("
  3. Client: %v
  4. ", addr)) } + fmt.Fprint(w, fmt.Sprintf("
  5. Gorutines: %v
  6. ", runtime.Goroutines())) fmt.Fprint(w, "
") } +/* +Creates a gorutine for every cpu core and the multiplexses the calls to each of them. +*/ +func initThreadedCallRater(){ + multiplexerIndex = 0 + runtime.GOMAXPROCS(NCPU) + fmt.Println(runtime.GOMAXPROCS(NCPU)) + for i:= 0; i< NCPU; i++ { + inChannels[i] = make(chan string) + outChannels[i] = make(chan string) + go func(in, out chan string){ + for { + key := <- in + out <- CallRater(key) + } + }(inChannels[i], outChannels[i]) + } +} + +/* +*/ +func ThreadedCallRater(key string) (replay string) { + mu.Lock() + defer mu.Unlock() + if multiplexerIndex >= NCPU { + multiplexerIndex = 0 + } + inChannels[multiplexerIndex] <- key + replay = <- outChannels[multiplexerIndex] + multiplexerIndex++ + return +} + /* The function that gets the information from the raters using balancer. */ @@ -77,7 +124,7 @@ func main() { go StopSingnalHandler() go listenToTheWorld() - + //initThreadedCallRater() http.HandleFunc("/", handler) log.Print("The server is listening...") http.ListenAndServe(":2000", nil) diff --git a/cmd/inquirer/responder.go b/cmd/inquirer/responder.go index 627ea6a51..1eb8dbd9c 100644 --- a/cmd/inquirer/responder.go +++ b/cmd/inquirer/responder.go @@ -5,7 +5,7 @@ type Responder byte /* RPC method thet provides the external RPC interface for getting the rating information. */ -func (r *Responder) Get(args string, replay *string) error { +func (r *Responder) Get(args string, replay *string) error { *replay = CallRater(args) return nil } diff --git a/cmd/rater/kyoto_storage.go b/cmd/rater/kyoto_storage.go deleted file mode 100644 index 7e915ffa5..000000000 --- a/cmd/rater/kyoto_storage.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import ( - "log" - "github.com/fsouza/gokabinet/kc" -) - -type KyotoStorage struct { - db *kc.DB -} - -func NewKyotoStorage(filaName string) (*KyotoStorage, error) { - ndb, err := kc.Open(filaName, kc.READ) - log.Print("Starting kyoto storage") - return &KyotoStorage{db: ndb}, err -} - - -func (ks *KyotoStorage) Close() { - log.Print("Closing kyoto storage") - ks.db.Close() -} - -func (ks *KyotoStorage) Get(key string) (value string, err error) { - return ks.db.Get(key) -} - diff --git a/cmd/rater/rater.go b/cmd/rater/rater.go index 7009ea2ad..d14ca18c4 100644 --- a/cmd/rater/rater.go +++ b/cmd/rater/rater.go @@ -6,6 +6,7 @@ import ( "net" "net/rpc" "os" + "github.com/rif/cgrates/timeslots" ) var ( @@ -15,18 +16,18 @@ var ( ) type Storage struct { - sg StorageGetter + sg timeslots.StorageGetter } -func NewStorage(nsg StorageGetter) *Storage{ +func NewStorage(nsg timeslots.StorageGetter) *Storage{ return &Storage{sg: nsg} } /* RPC method providing the rating information from the storage. */ -func (s *Storage) Get(args string, reply *string) (err error) { - *reply, err = s.sg.Get(args) +func (s *Storage) GetCost(in *timeslots.CallDescription, reply *string) (err error) { + *reply, err = timeslots.GetCost(in, s.sg) return err } @@ -42,7 +43,7 @@ func (s *Storage) Shutdown(args string, reply *string) (err error) { func main() { flag.Parse() - getter, err := NewKyotoStorage("storage.kch") + getter, err := timeslots.NewKyotoStorage("storage.kch") //getter, err := NewRedisStorage("tcp:127.0.0.1:6379") //defer getter.Close() if err != nil { diff --git a/cmd/rater/redis_storage.go b/cmd/rater/redis_storage.go deleted file mode 100644 index de1176b9c..000000000 --- a/cmd/rater/redis_storage.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "log" - "github.com/simonz05/godis" -) - -type RedisStorage struct { - db *godis.Client -} - -func NewRedisStorage(address string) (*RedisStorage, error) { - ndb:= godis.New(address, 10, "") - log.Print("Starting redis storage") - return &RedisStorage{db: ndb}, nil -} - - -func (rs *RedisStorage) Close() { - log.Print("Closing redis storage") - rs.db.Quit() -} - -func (rs *RedisStorage) Get(key string) (string, error) { - elem, err := rs.db.Get(key) - return elem.String(), err -} - diff --git a/cmd/rater/registration.go b/cmd/rater/registration.go index faaaef1df..5e557382c 100644 --- a/cmd/rater/registration.go +++ b/cmd/rater/registration.go @@ -6,12 +6,13 @@ import ( "os" "os/signal" "syscall" + "github.com/rif/cgrates/timeslots" ) /* Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from inquirer and closes the storage before exiting. */ -func StopSingnalHandler(server, listen *string, getter *KyotoStorage) { +func StopSingnalHandler(server, listen *string, sg timeslots.StorageGetter) { log.Print("Handling stop signals...") sig := <-signal.Incoming if usig, ok := sig.(os.UnixSignal); ok { @@ -19,7 +20,7 @@ func StopSingnalHandler(server, listen *string, getter *KyotoStorage) { case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT: log.Printf("Caught signal %v, unregistering from server\n", usig) unregisterFromServer(server, listen) - getter.Close() + sg.Close() os.Exit(1) } } diff --git a/cmd/rater/storage_interface.go b/cmd/rater/storage_interface.go deleted file mode 100644 index 87054eb22..000000000 --- a/cmd/rater/storage_interface.go +++ /dev/null @@ -1,9 +0,0 @@ -package main - -/* -Interface for storage providers. -*/ -type StorageGetter interface { - Close() - Get(key string) (string, error) -} diff --git a/cmd/rater/timestamps.go b/cmd/rater/timestamps.go deleted file mode 100644 index f950045a5..000000000 --- a/cmd/rater/timestamps.go +++ /dev/null @@ -1,33 +0,0 @@ -package main - -import ( - "time" -) - -type BilingUnit int - -type RatingProfile struct { - StartTime time.Time - ConnectFee float32 - Price float32 - BillingUnit BilingUnit -} - -type ActivationPeriod struct { - ActivationTime time.Time - RatingProfiles []RatingProfile -} - -type Customer struct { - Id string - Prefix string - ActivationPeriods []ActivationPeriod -} - -const ( - SECONDS =iota - COUNT - BYTES -) - - diff --git a/cmd/stresstest/stresstest.go b/cmd/stresstest/stresstest.go index 72e89d6a6..3a494d2f5 100644 --- a/cmd/stresstest/stresstest.go +++ b/cmd/stresstest/stresstest.go @@ -2,20 +2,29 @@ package main import ( "net/rpc/jsonrpc" - "fmt" + "log" //"time" ) func main(){ client, _ := jsonrpc.Dial("tcp", "localhost:5090") - var reply string + runs := int(5 * 10e3); i:= 0 - for ; i < 5 * 10e3; i++ { - client.Call("Responder.Get", "test", &reply) + c := make(chan string) + for ; i < runs; i++ { + go func(){ + var reply string + client.Call("Responder.Get", "test", &reply) + c <- reply + }() //time.Sleep(1*time.Second) } - fmt.Println(i, reply) + for j:=0; j < runs; j++ { + <-c + } + log.Print(i) + client.Close() } diff --git a/cmd/stresstest/stresstest.py b/cmd/stresstest/stresstest.py index 4be423008..f263e0f7b 100644 --- a/cmd/stresstest/stresstest.py +++ b/cmd/stresstest/stresstest.py @@ -49,6 +49,6 @@ print s.recv(4096) i = 0 result = "" -for i in xrange(5 * int(10e3) + 1): +for i in xrange(5 * int(10e4) + 1): result = rpc.call("Responder.Get", "test") print i, result