diff --git a/cmd/inquirer/inquirer.go b/cmd/inquirer/inquirer.go
index da9d1fc0d..0a69804cd 100644
--- a/cmd/inquirer/inquirer.go
+++ b/cmd/inquirer/inquirer.go
@@ -9,21 +9,68 @@ import (
"net/rpc/jsonrpc"
"errors"
"time"
+ "runtime"
+ "sync"
)
-var raterList *RaterList
+const NCPU = 4
+
+var (
+ raterList *RaterList
+ inChannels [NCPU]chan string
+ outChannels [NCPU]chan string
+ multiplexerIndex int
+ mu sync.Mutex
+ sem = make(chan int, NCPU)
+)
+
+
/*
Handler for the statistics web client
*/
func handler(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "
")
- for addr, _ := range raterList.clientAddresses {
- fmt.Fprint(w, fmt.Sprintf("- %s
", addr))
+ for _, addr := range raterList.clientAddresses {
+ fmt.Fprint(w, fmt.Sprintf("- Client: %v
", addr))
}
+ fmt.Fprint(w, fmt.Sprintf("- Gorutines: %v
", runtime.Goroutines()))
fmt.Fprint(w, "
")
}
+/*
+Creates a gorutine for every cpu core and the multiplexses the calls to each of them.
+*/
+func initThreadedCallRater(){
+ multiplexerIndex = 0
+ runtime.GOMAXPROCS(NCPU)
+ fmt.Println(runtime.GOMAXPROCS(NCPU))
+ for i:= 0; i< NCPU; i++ {
+ inChannels[i] = make(chan string)
+ outChannels[i] = make(chan string)
+ go func(in, out chan string){
+ for {
+ key := <- in
+ out <- CallRater(key)
+ }
+ }(inChannels[i], outChannels[i])
+ }
+}
+
+/*
+*/
+func ThreadedCallRater(key string) (replay string) {
+ mu.Lock()
+ defer mu.Unlock()
+ if multiplexerIndex >= NCPU {
+ multiplexerIndex = 0
+ }
+ inChannels[multiplexerIndex] <- key
+ replay = <- outChannels[multiplexerIndex]
+ multiplexerIndex++
+ return
+}
+
/*
The function that gets the information from the raters using balancer.
*/
@@ -77,7 +124,7 @@ func main() {
go StopSingnalHandler()
go listenToTheWorld()
-
+ //initThreadedCallRater()
http.HandleFunc("/", handler)
log.Print("The server is listening...")
http.ListenAndServe(":2000", nil)
diff --git a/cmd/inquirer/responder.go b/cmd/inquirer/responder.go
index 627ea6a51..1eb8dbd9c 100644
--- a/cmd/inquirer/responder.go
+++ b/cmd/inquirer/responder.go
@@ -5,7 +5,7 @@ type Responder byte
/*
RPC method thet provides the external RPC interface for getting the rating information.
*/
-func (r *Responder) Get(args string, replay *string) error {
+func (r *Responder) Get(args string, replay *string) error {
*replay = CallRater(args)
return nil
}
diff --git a/cmd/rater/kyoto_storage.go b/cmd/rater/kyoto_storage.go
deleted file mode 100644
index 7e915ffa5..000000000
--- a/cmd/rater/kyoto_storage.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package main
-
-import (
- "log"
- "github.com/fsouza/gokabinet/kc"
-)
-
-type KyotoStorage struct {
- db *kc.DB
-}
-
-func NewKyotoStorage(filaName string) (*KyotoStorage, error) {
- ndb, err := kc.Open(filaName, kc.READ)
- log.Print("Starting kyoto storage")
- return &KyotoStorage{db: ndb}, err
-}
-
-
-func (ks *KyotoStorage) Close() {
- log.Print("Closing kyoto storage")
- ks.db.Close()
-}
-
-func (ks *KyotoStorage) Get(key string) (value string, err error) {
- return ks.db.Get(key)
-}
-
diff --git a/cmd/rater/rater.go b/cmd/rater/rater.go
index 7009ea2ad..d14ca18c4 100644
--- a/cmd/rater/rater.go
+++ b/cmd/rater/rater.go
@@ -6,6 +6,7 @@ import (
"net"
"net/rpc"
"os"
+ "github.com/rif/cgrates/timeslots"
)
var (
@@ -15,18 +16,18 @@ var (
)
type Storage struct {
- sg StorageGetter
+ sg timeslots.StorageGetter
}
-func NewStorage(nsg StorageGetter) *Storage{
+func NewStorage(nsg timeslots.StorageGetter) *Storage{
return &Storage{sg: nsg}
}
/*
RPC method providing the rating information from the storage.
*/
-func (s *Storage) Get(args string, reply *string) (err error) {
- *reply, err = s.sg.Get(args)
+func (s *Storage) GetCost(in *timeslots.CallDescription, reply *string) (err error) {
+ *reply, err = timeslots.GetCost(in, s.sg)
return err
}
@@ -42,7 +43,7 @@ func (s *Storage) Shutdown(args string, reply *string) (err error) {
func main() {
flag.Parse()
- getter, err := NewKyotoStorage("storage.kch")
+ getter, err := timeslots.NewKyotoStorage("storage.kch")
//getter, err := NewRedisStorage("tcp:127.0.0.1:6379")
//defer getter.Close()
if err != nil {
diff --git a/cmd/rater/redis_storage.go b/cmd/rater/redis_storage.go
deleted file mode 100644
index de1176b9c..000000000
--- a/cmd/rater/redis_storage.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package main
-
-import (
- "log"
- "github.com/simonz05/godis"
-)
-
-type RedisStorage struct {
- db *godis.Client
-}
-
-func NewRedisStorage(address string) (*RedisStorage, error) {
- ndb:= godis.New(address, 10, "")
- log.Print("Starting redis storage")
- return &RedisStorage{db: ndb}, nil
-}
-
-
-func (rs *RedisStorage) Close() {
- log.Print("Closing redis storage")
- rs.db.Quit()
-}
-
-func (rs *RedisStorage) Get(key string) (string, error) {
- elem, err := rs.db.Get(key)
- return elem.String(), err
-}
-
diff --git a/cmd/rater/registration.go b/cmd/rater/registration.go
index faaaef1df..5e557382c 100644
--- a/cmd/rater/registration.go
+++ b/cmd/rater/registration.go
@@ -6,12 +6,13 @@ import (
"os"
"os/signal"
"syscall"
+ "github.com/rif/cgrates/timeslots"
)
/*
Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from inquirer and closes the storage before exiting.
*/
-func StopSingnalHandler(server, listen *string, getter *KyotoStorage) {
+func StopSingnalHandler(server, listen *string, sg timeslots.StorageGetter) {
log.Print("Handling stop signals...")
sig := <-signal.Incoming
if usig, ok := sig.(os.UnixSignal); ok {
@@ -19,7 +20,7 @@ func StopSingnalHandler(server, listen *string, getter *KyotoStorage) {
case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT:
log.Printf("Caught signal %v, unregistering from server\n", usig)
unregisterFromServer(server, listen)
- getter.Close()
+ sg.Close()
os.Exit(1)
}
}
diff --git a/cmd/rater/storage_interface.go b/cmd/rater/storage_interface.go
deleted file mode 100644
index 87054eb22..000000000
--- a/cmd/rater/storage_interface.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package main
-
-/*
-Interface for storage providers.
-*/
-type StorageGetter interface {
- Close()
- Get(key string) (string, error)
-}
diff --git a/cmd/rater/timestamps.go b/cmd/rater/timestamps.go
deleted file mode 100644
index f950045a5..000000000
--- a/cmd/rater/timestamps.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package main
-
-import (
- "time"
-)
-
-type BilingUnit int
-
-type RatingProfile struct {
- StartTime time.Time
- ConnectFee float32
- Price float32
- BillingUnit BilingUnit
-}
-
-type ActivationPeriod struct {
- ActivationTime time.Time
- RatingProfiles []RatingProfile
-}
-
-type Customer struct {
- Id string
- Prefix string
- ActivationPeriods []ActivationPeriod
-}
-
-const (
- SECONDS =iota
- COUNT
- BYTES
-)
-
-
diff --git a/cmd/stresstest/stresstest.go b/cmd/stresstest/stresstest.go
index 72e89d6a6..3a494d2f5 100644
--- a/cmd/stresstest/stresstest.go
+++ b/cmd/stresstest/stresstest.go
@@ -2,20 +2,29 @@ package main
import (
"net/rpc/jsonrpc"
- "fmt"
+ "log"
//"time"
)
func main(){
client, _ := jsonrpc.Dial("tcp", "localhost:5090")
- var reply string
+ runs := int(5 * 10e3);
i:= 0
- for ; i < 5 * 10e3; i++ {
- client.Call("Responder.Get", "test", &reply)
+ c := make(chan string)
+ for ; i < runs; i++ {
+ go func(){
+ var reply string
+ client.Call("Responder.Get", "test", &reply)
+ c <- reply
+ }()
//time.Sleep(1*time.Second)
}
- fmt.Println(i, reply)
+ for j:=0; j < runs; j++ {
+ <-c
+ }
+ log.Print(i)
+ client.Close()
}
diff --git a/cmd/stresstest/stresstest.py b/cmd/stresstest/stresstest.py
index 4be423008..f263e0f7b 100644
--- a/cmd/stresstest/stresstest.py
+++ b/cmd/stresstest/stresstest.py
@@ -49,6 +49,6 @@ print s.recv(4096)
i = 0
result = ""
-for i in xrange(5 * int(10e3) + 1):
+for i in xrange(5 * int(10e4) + 1):
result = rpc.call("Responder.Get", "test")
print i, result