diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go
index 4174dbb21..0ae306aae 100644
--- a/cmd/cgr-console/cgr-console.go
+++ b/cmd/cgr-console/cgr-console.go
@@ -23,6 +23,7 @@ import (
"fmt"
"github.com/cgrates/cgrates/timespans"
"log"
+ "net/rpc"
"net/rpc/jsonrpc"
"os"
"time"
@@ -39,11 +40,18 @@ var (
start = flag.String("start", "2012-02-09T00:00:00Z", "Time start")
end = flag.String("end", "2012-02-09T00:10:00Z", "Time end")
amount = flag.Float64("amount", 100, "Amount for different operations")
+ json = flag.Bool("json", false, "Use JSON for RPC encoding.")
)
func main() {
flag.Parse()
- client, err := jsonrpc.Dial("tcp", *server)
+ var client *rpc.Client
+ var err error
+ if *json {
+ client, err = jsonrpc.Dial("tcp", *server)
+ } else {
+ client, err = rpc.Dial("tcp", *server)
+ }
if err != nil {
log.Fatal("Could not connect to server " + *server)
}
@@ -101,21 +109,6 @@ func main() {
if err = client.Call("Responder.DebitSeconds", cd, &result); err == nil {
fmt.Println(result)
}
- /*case "addvolumediscountseconds":
- var result float64
- if err = client.Call("Responder.AddVolumeDiscountSeconds", cd, &result); err == nil {
- fmt.Println(result)
- }
- case "resetvolumediscountseconds":
- var result float64
- if err = client.Call("Responder.ResetVolumeDiscountSeconds", cd, &result); err == nil {
- fmt.Println(result)
- }
- case "addrecievedcallseconds":
- var result float64
- if err = client.Call("Responder.AddRecievedCallSeconds", cd, &result); err == nil {
- fmt.Println(result)
- }*/
case "resetuserbudget":
var result float64
if err = client.Call("Responder.ResetUserBudget", cd, &result); err == nil {
@@ -133,9 +126,6 @@ func main() {
fmt.Println("\tdebitbalance")
fmt.Println("\tdebitsms")
fmt.Println("\tdebitseconds")
- // fmt.Println("\taddvolumediscountseconds")
- // fmt.Println("\tresetvolumediscountseconds")
- // fmt.Println("\taddrecievedcallseconds")
fmt.Println("\tresetuserbudget")
fmt.Println("\tstatus")
flag.PrintDefaults()
diff --git a/cmd/cgr-rater/balancer.go b/cmd/cgr-rater/balancer.go
deleted file mode 100644
index 1823a8e2f..000000000
--- a/cmd/cgr-rater/balancer.go
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
-Rating system designed to be used in VoIP Carriers World
-Copyright (C) 2012 Radu Ioan Fericean
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package main
-
-import (
- "errors"
- "flag"
- "github.com/cgrates/cgrates/balancer"
- "github.com/cgrates/cgrates/sessionmanager"
- "github.com/cgrates/cgrates/timespans"
- "log"
- "runtime"
- "time"
-)
-
-var (
- raterAddress = flag.String("rateraddr", "127.0.0.1:2000", "Rater server address (localhost:2000)")
- rpcAddress = flag.String("rpcaddr", "127.0.0.1:2001", "Json RPC server address (localhost:2001)")
- httpApiAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:8000)")
- freeswitch = flag.Bool("freeswitch", false, "connect to freeswitch server")
- freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port")
- freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port")
- js = flag.Bool("json", false, "use JSON for RPC encoding")
- bal *balancer.Balancer
- accLock = timespans.NewAccountLock()
-)
-
-/*
-The function that gets the information from the raters using balancer.
-*/
-func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost, err error) {
- err = errors.New("") //not nil value
- for err != nil {
- client := bal.Balance()
- if client == nil {
- log.Print("Waiting for raters to register...")
- time.Sleep(1 * time.Second) // wait one second and retry
- } else {
- reply = ×pans.CallCost{}
- reply, err = accLock.GuardGetCost(key.GetKey(), func() (*timespans.CallCost, error) {
- err = client.Call(method, *key, reply)
- return reply, err
- })
- if err != nil {
- log.Printf("Got en error from rater: %v", err)
- }
- }
- }
- return
-}
-
-/*
-The function that gets the information from the raters using balancer.
-*/
-func CallMethod(key *timespans.CallDescriptor, method string) (reply float64, err error) {
- err = errors.New("") //not nil value
- for err != nil {
- client := bal.Balance()
- if client == nil {
- log.Print("Waiting for raters to register...")
- time.Sleep(1 * time.Second) // wait one second and retry
- } else {
- reply, err = accLock.Guard(key.GetKey(), func() (float64, error) {
- err = client.Call(method, *key, &reply)
- return reply, err
- })
- if err != nil {
- log.Printf("Got en error from rater: %v", err)
- }
- }
- }
- return
-}
-
-func maind() {
- flag.Parse()
- runtime.GOMAXPROCS(runtime.NumCPU() - 1)
- bal = balancer.NewBalancer()
-
- go stopSingnalHandler()
- go listenToRPCRaterRequests()
- go listenToRPCRequests()
-
- if *freeswitch {
- sm := &sessionmanager.FSSessionManager{}
- sm.Connect(sessionmanager.NewRPCBalancerSessionDelegate(bal), *freeswitchsrv, *freeswitchpass)
- }
- listenToHttpRequests()
-}
diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go
index 0821462f8..1972922e7 100644
--- a/cmd/cgr-rater/cgr-rater.go
+++ b/cmd/cgr-rater/cgr-rater.go
@@ -20,10 +20,111 @@ package main
import (
"code.google.com/p/goconf/conf"
- "log"
+ "flag"
+ "fmt"
+ "github.com/cgrates/cgrates/timespans"
)
-func main() {
- c, err := conf.ReadConfigFile("something.config")
- log.Print(c.GetString("global", "redis_server")) // return something.com
+var (
+ config = flag.String("config", "/home/rif/Documents/prog/go/src/github.com/cgrates/cgrates/data/cgrates.config", "Configuration file location.")
+ redis_server = "127.0.0.1:6379" //"redis address host:port"
+ redis_db = 10 // "redis database number"
+
+ rater_standalone = false // "start standalone server (no balancer)"
+ rater_balancer_server = "127.0.0.1:2000" // "balancer address host:port"
+ rater_listen = "127.0.0.1:1234" // "listening address host:port"
+ rater_json = false // "use JSON for RPC encoding"
+
+ balancer_standalone = false // "run standalone (run as a rater)")
+ balancer_listen_rater = "127.0.0.1:2000" // "Rater server address (localhost:2000)"
+ balancer_listen_api = "127.0.0.1:2001" // "Json RPC server address (localhost:2001)"
+ balancer_json = false // "use JSON for RPC encoding"
+
+ scheduler_standalone = false // "run standalone (no other service)")
+ scheduler_json = false
+
+ sm_standalone = false // "run standalone (run as a rater)")
+ sm_api_server = "127.0.0.1:2000" // "balancer address host:port"
+ sm_freeswitchsrv = "localhost:8021" // "freeswitch address host:port"
+ sm_freeswitchpass = "ClueCon" // freeswitch address host:port"
+ sm_json = false // "use JSON for RPC encoding"
+)
+
+func readConfig() {
+ flag.Parse()
+ c, err := conf.ReadConfigFile(*config)
+ if err != nil {
+ timespans.Logger.Err(fmt.Sprintf("Could not open the configuration file: %v", err))
+ }
+ redis_server, err = c.GetString("global", "redis_server")
+ if err != nil {
+ }
+ redis_db, err = c.GetInt("global", "redis_db")
+ if err != nil {
+ }
+ rater_standalone, err = c.GetBool("rater", "standalone")
+ if err != nil {
+ }
+ rater_balancer_server, err = c.GetString("rater", "balancer_server")
+ if err != nil {
+ }
+ rater_listen, err = c.GetString("rater", "listen_api")
+ if err != nil {
+ }
+ rater_json, err = c.GetBool("rater", "json")
+ if err != nil {
+ }
+ balancer_standalone, err = c.GetBool("balancer", "standalone")
+ if err != nil {
+ }
+ balancer_listen_rater, err = c.GetString("balancer", "listen_rater")
+ if err != nil {
+ }
+ balancer_listen_api, err = c.GetString("balancer", "listen_api")
+ if err != nil {
+ }
+ balancer_json, err = c.GetBool("balancer", "json")
+ if err != nil {
+ }
+ scheduler_standalone, err = c.GetBool("scheduler", "standalone")
+ if err != nil {
+ }
+ scheduler_json, err = c.GetBool("scheduler", "json")
+ if err != nil {
+ }
+ sm_standalone, err = c.GetBool("session_manager", "standalone")
+ if err != nil {
+ }
+ sm_api_server, err = c.GetString("session_manager", "api_server")
+ if err != nil {
+ }
+ sm_freeswitchsrv, err = c.GetString("session_manager", "freeswitch_server")
+ if err != nil {
+ }
+ sm_freeswitchpass, err = c.GetString("session_manager", "freeswitch_pass")
+ if err != nil {
+ }
+ sm_json, err = c.GetBool("session_manager", "json")
+ if err != nil {
+ }
+}
+
+func resolveStandaloneConfilcts() {
+ if balancer_standalone {
+ rater_standalone = false
+ }
+ if scheduler_standalone {
+ rater_standalone = false
+ balancer_standalone = false
+ }
+ if sm_standalone {
+ rater_standalone = false
+ balancer_standalone = false
+ scheduler_standalone = false
+ }
+}
+
+func main() {
+ readConfig()
+ resolveStandaloneConfilcts()
}
diff --git a/cmd/cgr-rater/direct_responder.go b/cmd/cgr-rater/direct_responder.go
new file mode 100644
index 000000000..cc1edbd38
--- /dev/null
+++ b/cmd/cgr-rater/direct_responder.go
@@ -0,0 +1,98 @@
+/*
+Rating system designed to be used in VoIP Carriers World
+Copyright (C) 2012 Radu Ioan Fericean
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package main
+
+import (
+ "fmt"
+ "github.com/cgrates/cgrates/timespans"
+ "os"
+ "runtime"
+)
+
+type DirectResponder struct {
+ sg timespans.StorageGetter
+}
+
+/*
+RPC method providing the rating information from the storage.
+*/
+func (s *DirectResponder) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) {
+ r, e := timespans.AccLock.GuardGetCost(cd.GetUserBalanceKey(), func() (*timespans.CallCost, error) {
+ return (&cd).GetCost()
+ })
+ *reply, err = *r, e
+ return err
+}
+
+func (s *DirectResponder) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) {
+ r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
+ return (&cd).DebitCents()
+ })
+ *reply, err = r, e
+ return err
+}
+
+func (s *DirectResponder) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) {
+ r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
+ return (&cd).DebitSMS()
+ })
+ *reply, err = r, e
+ return err
+}
+
+func (s *DirectResponder) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
+ r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
+ return 0, (&cd).DebitSeconds()
+ })
+ *reply, err = r, e
+ return err
+}
+
+func (s *DirectResponder) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) {
+ r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
+ return (&cd).GetMaxSessionTime()
+ })
+ *reply, err = r, e
+ return err
+}
+
+func (s *DirectResponder) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
+ r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
+ return 0, (&cd).AddRecievedCallSeconds()
+ })
+ *reply, err = r, e
+ return err
+}
+
+func (r *DirectResponder) Status(arg timespans.CallDescriptor, replay *string) (err error) {
+ memstats := new(runtime.MemStats)
+ runtime.ReadMemStats(memstats)
+ *replay = fmt.Sprintf("memstats before GC: %dKb footprint: %dKb", memstats.HeapAlloc/1024, memstats.Sys/1024)
+ return
+}
+
+/*
+RPC method that triggers rater shutdown in case of balancer exit.
+*/
+func (s *DirectResponder) Shutdown(args string, reply *string) (err error) {
+ s.sg.Close()
+ defer os.Exit(0)
+ *reply = "Done!"
+ return nil
+}
diff --git a/cmd/cgr-rater/rater.go b/cmd/cgr-rater/rater.go
deleted file mode 100644
index 48056c303..000000000
--- a/cmd/cgr-rater/rater.go
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
-Rating system designed to be used in VoIP Carriers World
-Copyright (C) 2012 Radu Ioan Fericean
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package main
-
-import (
- "flag"
- "fmt"
- "github.com/cgrates/cgrates/sessionmanager"
- "github.com/cgrates/cgrates/timespans"
- "log"
- "net"
- "net/rpc"
- "net/rpc/jsonrpc"
- "os"
- "runtime"
-)
-
-var (
- balancer = flag.String("balancer", "127.0.0.1:2000", "balancer address host:port")
- freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port")
- freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port")
- redissrv = flag.String("redissrv", "127.0.0.1:6379", "redis address host:port")
- redisdb = flag.Int("redisdb", 10, "redis database number")
- listen = flag.String("listen", "127.0.0.1:1234", "listening address host:port")
- standalone = flag.Bool("standalone", false, "start standalone server (no balancer, default false)")
- freeswitch = flag.Bool("freeswitch", false, "connect to freeswitch server")
- json = flag.Bool("json", false, "use JSON for RPC encoding")
- storage Responder
-)
-
-type Responder struct {
- sg timespans.StorageGetter
-}
-
-func NewStorage(nsg timespans.StorageGetter) *Responder {
- return &Responder{sg: nsg}
-}
-
-/*
-RPC method providing the rating information from the storage.
-*/
-func (s *Responder) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) {
- r, e := timespans.AccLock.GuardGetCost(cd.GetUserBalanceKey(), func() (*timespans.CallCost, error) {
- return (&cd).GetCost()
- })
- *reply, err = *r, e
- return err
-}
-
-func (s *Responder) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) {
- r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
- return (&cd).DebitCents()
- })
- *reply, err = r, e
- return err
-}
-
-func (s *Responder) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) {
- r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
- return (&cd).DebitSMS()
- })
- *reply, err = r, e
- return err
-}
-
-func (s *Responder) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
- r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
- return 0, (&cd).DebitSeconds()
- })
- *reply, err = r, e
- return err
-}
-
-func (s *Responder) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) {
- r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
- return (&cd).GetMaxSessionTime()
- })
- *reply, err = r, e
- return err
-}
-
-func (s *Responder) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) {
- r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) {
- return 0, (&cd).AddRecievedCallSeconds()
- })
- *reply, err = r, e
- return err
-}
-
-/*func (s *Responder) ResetUserBudget(cd timespans.CallDescriptor, reply *float64) (err error) {
- descriptor := &cd
- e := descriptor.ResetUserBudget()
- *reply, err = 0, e
- return err
-}*/
-
-func (r *Responder) Status(arg timespans.CallDescriptor, replay *string) (err error) {
- memstats := new(runtime.MemStats)
- runtime.ReadMemStats(memstats)
- *replay = fmt.Sprintf("memstats before GC: %dKb footprint: %dKb", memstats.HeapAlloc/1024, memstats.Sys/1024)
- return
-}
-
-/*
-RPC method that triggers rater shutdown in case of balancer exit.
-*/
-func (s *Responder) Shutdown(args string, reply *string) (err error) {
- s.sg.Close()
- defer os.Exit(0)
- *reply = "Done!"
- return nil
-}
-
-func maina() {
- flag.Parse()
- //getter, err := timespans.NewKyotoStorage("storage.kch")
- getter, err := timespans.NewRedisStorage(*redissrv, *redisdb)
- defer getter.Close()
- timespans.SetStorageGetter(getter)
- if err != nil {
- log.Fatalf("Cannot open storage: %v", err)
- }
- if *freeswitch {
- sm := &sessionmanager.FSSessionManager{}
- sm.Connect(sessionmanager.NewDirectSessionDelegate(getter), *freeswitchsrv, *freeswitchpass)
- }
- if !*standalone {
- go RegisterToServer(balancer, listen)
- go StopSingnalHandler(balancer, listen, getter)
- }
- /*rpc.Register(NewStorage(getter))
- rpc.HandleHTTP()
- addr, err1 := net.ResolveTCPAddr("tcp", *listen)
- l, err2 := net.ListenTCP("tcp", addr)
- if err1 != nil || err2 != nil {
- log.Print("cannot create listener for specified address ", *listen)
- os.Exit(1)
- }
- rpc.Accept(l)*/
-
- log.Print("Starting Server...")
- l, err := net.Listen("tcp", *listen)
- defer l.Close()
- if err != nil {
- log.Fatal(err)
- }
- log.Print("listening on: ", l.Addr())
- rpc.Register(NewStorage(getter))
- log.Print("waiting for connections ...")
- for {
- conn, err := l.Accept()
- if err != nil {
- log.Printf("accept error: %s", conn)
- continue
- }
- log.Printf("connection started: %v", conn.RemoteAddr())
- if *json {
- // log.Print("json encoding")
- go jsonrpc.ServeConn(conn)
- } else {
- // log.Print("gob encoding")
- go rpc.ServeConn(conn)
- }
-
- }
-}
diff --git a/cmd/cgr-rater/registration.go b/cmd/cgr-rater/registration.go
index ae374d622..6f983888b 100644
--- a/cmd/cgr-rater/registration.go
+++ b/cmd/cgr-rater/registration.go
@@ -19,33 +19,97 @@ along with this program. If not, see
package main
import (
+ "fmt"
+ "github.com/cgrates/cgrates/balancer"
"github.com/cgrates/cgrates/timespans"
"log"
+ "net/http"
"net/rpc"
"os"
"os/signal"
"syscall"
+ "time"
)
/*
-Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from inquirer and closes the storage before exiting.
+RPC Server that handles the registering and unregistering of raters.
*/
-func StopSingnalHandler(server, listen *string, sg timespans.StorageGetter) {
+type RaterServer struct{}
+
+func listenToRPCRaterRequests() {
+ rpc.Register(new(RaterServer))
+ rpc.HandleHTTP()
+ http.ListenAndServe(*raterAddress, nil)
+}
+
+var (
+ bal = balancer.NewBalancer()
+)
+
+/*
+Listens for SIGTERM, SIGINT, SIGQUIT system signals and shuts down all the registered raters.
+*/
+func stopSingnalHandler() {
+ log.Print("Handling stop signals...")
+ c := make(chan os.Signal)
+ signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
+
+ sig := <-c
+ log.Printf("Caught signal %v, sending shutdownto raters\n", sig)
+ bal.Shutdown()
+ os.Exit(1)
+}
+
+/*
+RPC method that receives a rater address, connects to it and ads the pair to the rater list for balancing
+*/
+func (rs *RaterServer) RegisterRater(clientAddress string, replay *int) error {
+ log.Printf("Started rater %v registration...", clientAddress)
+ time.Sleep(2 * time.Second) // wait a second for Rater to start serving
+ client, err := rpc.Dial("tcp", clientAddress)
+ if err != nil {
+ log.Print("Could not connect to client!")
+ return err
+ }
+ bal.AddClient(clientAddress, client)
+ log.Printf("Rater %v registered succesfully.", clientAddress)
+ return nil
+}
+
+/*
+RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list.
+*/
+func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *int) error {
+ client, ok := bal.GetClient(clientAddress)
+ if ok {
+ client.Close()
+ bal.RemoveClient(clientAddress)
+ log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress))
+ } else {
+ log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress))
+ }
+ return nil
+}
+
+/*
+Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from balancer and closes the storage before exiting.
+*/
+func stopRaterSingnalHandler(server, listen *string, sg timespans.StorageGetter) {
log.Print("Handling stop signals...")
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
sig := <-c
log.Printf("Caught signal %v, unregistering from balancer\n", sig)
- unregisterFromServer(server, listen)
+ unregisterFromBalancer(server, listen)
sg.Close()
os.Exit(1)
}
/*
-Connects to the inquirer and calls unregister RPC method.
+Connects to the balancer and calls unregister RPC method.
*/
-func unregisterFromServer(server, listen *string) {
+func unregisterFromBalancer(server, listen *string) {
client, err := rpc.DialHTTP("tcp", *server)
if err != nil {
log.Print("Cannot contact the balancer!")
@@ -61,9 +125,9 @@ func unregisterFromServer(server, listen *string) {
}
/*
-Connects to the inquirer and rehisters the rater to the server.
+Connects to the balancer and rehisters the rater to the server.
*/
-func RegisterToServer(server, listen *string) {
+func registerToBalancer(server, listen *string) {
client, err := rpc.DialHTTP("tcp", *server)
if err != nil {
log.Print("Cannot contact the balancer!")
diff --git a/cmd/cgr-rater/rpc_responder.go b/cmd/cgr-rater/rpc_responder.go
index c72eb23db..14d4d47e3 100644
--- a/cmd/cgr-rater/rpc_responder.go
+++ b/cmd/cgr-rater/rpc_responder.go
@@ -15,9 +15,11 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
+
package main
import (
+ "errors"
"fmt"
"github.com/cgrates/cgrates/timespans"
"log"
@@ -25,66 +27,52 @@ import (
"net/rpc"
"net/rpc/jsonrpc"
"runtime"
+ "time"
)
-type Responder byte
+type RpcResponder struct{}
/*
RPC method thet provides the external RPC interface for getting the rating information.
*/
-func (r *Responder) GetCost(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) {
+func (r *RpcResponder) GetCost(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) {
rs, err := GetCallCost(&arg, "Responder.GetCost")
*replay = *rs
return
}
-func (r *Responder) Debit(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) {
+func (r *RpcResponder) Debit(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) {
rs, err := GetCallCost(&arg, "Responder.Debit")
*replay = *rs
return
}
-func (r *Responder) DebitBalance(arg timespans.CallDescriptor, replay *float64) (err error) {
+func (r *RpcResponder) DebitBalance(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay, err = CallMethod(&arg, "Responder.DebitCents")
return
}
-func (r *Responder) DebitSMS(arg timespans.CallDescriptor, replay *float64) (err error) {
+func (r *RpcResponder) DebitSMS(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay, err = CallMethod(&arg, "Responder.DebitSMS")
return
}
-func (r *Responder) DebitSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
+func (r *RpcResponder) DebitSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay, err = CallMethod(&arg, "Responder.DebitSeconds")
return
}
-func (r *Responder) GetMaxSessionTime(arg timespans.CallDescriptor, replay *float64) (err error) {
+func (r *RpcResponder) GetMaxSessionTime(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay, err = CallMethod(&arg, "Responder.GetMaxSessionTime")
return
}
-func (r *Responder) AddVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
+func (r *RpcResponder) AddVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
*replay, err = CallMethod(&arg, "Responder.AddVolumeDiscountSeconds")
return
}
-/*func (r *Responder) ResetVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
- *replay = CallMethod(&arg, "Responder.ResetVolumeDiscountSeconds")
- return
-}
-
-func (r *Responder) AddRecievedCallSeconds(arg timespans.CallDescriptor, replay *float64) (err error) {
- *replay = CallMethod(&arg, "Responder.AddRecievedCallSeconds")
- return
-}
-
-func (r *Responder) ResetUserBudget(arg timespans.CallDescriptor, replay *float64) (err error) {
- *replay = CallMethod(&arg, "Responder.ResetUserBudget")
- return
-}*/
-
-func (r *Responder) Status(arg timespans.CallDescriptor, replay *string) (err error) {
+func (r *RpcResponder) Status(arg timespans.CallDescriptor, replay *string) (err error) {
memstats := new(runtime.MemStats)
runtime.ReadMemStats(memstats)
*replay = "Connected raters:\n"
@@ -129,3 +117,50 @@ func listenToRPCRequests() {
}
}
}
+
+/*
+The function that gets the information from the raters using balancer.
+*/
+func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost, err error) {
+ err = errors.New("") //not nil value
+ for err != nil {
+ client := bal.Balance()
+ if client == nil {
+ log.Print("Waiting for raters to register...")
+ time.Sleep(1 * time.Second) // wait one second and retry
+ } else {
+ reply = ×pans.CallCost{}
+ reply, err = accLock.GuardGetCost(key.GetKey(), func() (*timespans.CallCost, error) {
+ err = client.Call(method, *key, reply)
+ return reply, err
+ })
+ if err != nil {
+ log.Printf("Got en error from rater: %v", err)
+ }
+ }
+ }
+ return
+}
+
+/*
+The function that gets the information from the raters using balancer.
+*/
+func CallMethod(key *timespans.CallDescriptor, method string) (reply float64, err error) {
+ err = errors.New("") //not nil value
+ for err != nil {
+ client := bal.Balance()
+ if client == nil {
+ log.Print("Waiting for raters to register...")
+ time.Sleep(1 * time.Second) // wait one second and retry
+ } else {
+ reply, err = accLock.Guard(key.GetKey(), func() (float64, error) {
+ err = client.Call(method, *key, &reply)
+ return reply, err
+ })
+ if err != nil {
+ log.Printf("Got en error from rater: %v", err)
+ }
+ }
+ }
+ return
+}
diff --git a/cmd/cgr-rater/balancer_test.go b/cmd/cgr-rater/rpc_responder_test.go
similarity index 100%
rename from cmd/cgr-rater/balancer_test.go
rename to cmd/cgr-rater/rpc_responder_test.go
diff --git a/cmd/cgr-rater/scheduler.go b/cmd/cgr-rater/scheduler.go
index 43282707a..f61111247 100644
--- a/cmd/cgr-rater/scheduler.go
+++ b/cmd/cgr-rater/scheduler.go
@@ -19,21 +19,13 @@ along with this program. If not, see
package main
import (
- "flag"
"github.com/cgrates/cgrates/timespans"
"log"
- "os"
- "os/signal"
"sort"
- "syscall"
"time"
)
var (
- redisserver = flag.String("redisserver", "127.0.0.1:6379", "redis server address (tcp:127.0.0.1:6379)")
- redisdb = flag.Int("rdb", 10, "redis database number (10)")
- redispass = flag.String("pass", "", "redis database password")
- httpAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:8000)")
storage timespans.StorageGetter
timer *time.Timer
restartLoop = make(chan byte)
@@ -71,20 +63,12 @@ func (s scheduler) loop() {
}
}
-// Listens for the HUP system signal and gracefuly reloads the timers from database.
-func stopSingnalHandler() {
- log.Print("Handling HUP signal...")
- for {
- c := make(chan os.Signal)
- signal.Notify(c, syscall.SIGHUP)
- sig := <-c
-
- log.Printf("Caught signal %v, reloading action timings.\n", sig)
- loadActionTimings()
- // check the tip of the queue for new actions
- restartLoop <- 1
- timer.Stop()
- }
+func reloadActionTimings() {
+ log.Print("Reloading action timings.")
+ loadActionTimings()
+ // check the tip of the queue for new actions
+ restartLoop <- 1
+ timer.Stop()
}
func loadActionTimings() {
@@ -104,18 +88,3 @@ func loadActionTimings() {
}
sort.Sort(s.queue)
}
-
-func mainb() {
- flag.Parse()
- var err error
- storage, err = timespans.NewRedisStorage(*redisserver, *redisdb)
- if err != nil {
- log.Fatalf("Could not open database connection: %v", err)
- }
- defer storage.Close()
- timespans.SetStorageGetter(storage)
- loadActionTimings()
- go stopSingnalHandler()
- // go startWebApp()
- s.loop()
-}
diff --git a/cmd/cgr-rater/sessionmanager.go b/cmd/cgr-rater/sessionmanager.go
index e56d6779b..dd2838be9 100644
--- a/cmd/cgr-rater/sessionmanager.go
+++ b/cmd/cgr-rater/sessionmanager.go
@@ -27,16 +27,6 @@ import (
"net/rpc/jsonrpc"
)
-var (
- standalone = flag.Bool("standalone", false, "run standalone (run as a rater)")
- json = flag.Bool("json", false, "use JSON for RPC encoding")
- balancer = flag.String("balancer", "127.0.0.1:2000", "balancer address host:port")
- freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port")
- freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port")
- redissrv = flag.String("redissrv", "127.0.0.1:6379", "redis address host:port")
- redisdb = flag.Int("redisdb", 10, "redis database number")
-)
-
func mainc() {
flag.Parse()
sm := &sessionmanager.FSSessionManager{}
diff --git a/data/cgrates.config b/data/cgrates.config
index 069124f10..3a01b095f 100644
--- a/data/cgrates.config
+++ b/data/cgrates.config
@@ -15,27 +15,29 @@
# along with this program. If not, see
[global]
-redis_server = "127.0.0.1:6379" #"redis address host:port"
-redis_db = 10 # "redis database number"
+redis_server = "127.0.0.1:6379" #"redis address host:port"
+redis_db = 10 # "redis database number"
[rater]
-standalone = false # "start standalone server (no balancer)"
-balancer_server = "127.0.0.1:2000" # "balancer address host:port"
-listen = "127.0.0.1:1234" # "listening address host:port"
-json = false # "use JSON for RPC encoding"
+standalone = false # "start standalone server (no balancer)"
+balancer_server = "127.0.0.1:2000" # "balancer address host:port"
+listen_api = "127.0.0.1:1234" # "listening address host:port"
+json = false # "use JSON for RPC encoding"
[balancer]
-standalone = false # "run standalone (run as a rater)")
-raterAddress = "127.0.0.1:2000" # "Rater server address (localhost:2000)"
-rpcAddress = "127.0.0.1:2001" # "Json RPC server address (localhost:2001)"
-json = false # "use JSON for RPC encoding"
+standalone = false # "run standalone (run as a rater)")
+listen_rater = "127.0.0.1:2000" # "Rater server address (localhost:2000)"
+listen_api = "127.0.0.1:2001" # "Json RPC server address (localhost:2001)"
+json = false # "use JSON for RPC encoding"
[scheduler]
-standalone = false # "run standalone (no other service)")
+standalone = false # "run standalone (no other service)")
+json = false # "use JSON for RPC encoding"
+
+[session_manager]
+standalone = false # "run standalone (run as a rater)")
+api_server = "127.0.0.1:2000" # "balancer address host:port"
+freeswitch_server = "localhost:8021" # "freeswitch address host:port"
+freeswitch_pass = "ClueCon", # freeswitch address host:port"
+json = false # "use JSON for RPC encoding"
-[session-manager]
-standalone = false # "run standalone (run as a rater)")
-balancer = "127.0.0.1:2000" # "balancer address host:port"
-freeswitchsrv = "localhost:8021" # "freeswitch address host:port"
-freeswitchpass = "ClueCon", # freeswitch address host:port"
-json = false # "use JSON for RPC encoding"
\ No newline at end of file
diff --git a/timespans/calldesc.go b/timespans/calldesc.go
index 6ebb7c0ec..a8d9e54b1 100644
--- a/timespans/calldesc.go
+++ b/timespans/calldesc.go
@@ -21,15 +21,16 @@ package timespans
import (
"errors"
"fmt"
+ "log/syslog"
"math"
"time"
)
func init() {
var err error
- logger, err = NewSyslogLogger()
+ Logger, err = syslog.New(syslog.LOG_INFO, "CGRateS")
if err != nil {
- logger = new(StdLogger)
+ Logger = new(StdLogger)
}
}
@@ -42,7 +43,7 @@ const (
var (
storageGetter StorageGetter
- logger LoggerInterface
+ Logger LoggerInterface
)
/*
@@ -282,7 +283,7 @@ func (cd *CallDescriptor) GetCost() (*CallCost, error) {
Cost: cost,
ConnectFee: connectionFee,
Timespans: timespans}
- logger.Info(fmt.Sprintf("Get Cost: %v => %v", cd, cc))
+ Logger.Info(fmt.Sprintf("Get Cost: %v => %v", cd, cc))
return cc, err
}
@@ -337,7 +338,7 @@ func (cd *CallDescriptor) GetMaxSessionTime() (seconds float64, err error) {
func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
cc, err = cd.GetCost()
if err != nil {
- logger.Err(fmt.Sprintf("error getting cost %v", err))
+ Logger.Err(fmt.Sprintf("error getting cost %v", err))
}
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
defer storageGetter.SetUserBalance(userBalance)
diff --git a/timespans/logger.go b/timespans/logger.go
index eea92e78b..09282ea17 100644
--- a/timespans/logger.go
+++ b/timespans/logger.go
@@ -20,7 +20,6 @@ package timespans
import (
"log"
- "log/syslog"
)
type LoggerInterface interface {
@@ -35,43 +34,7 @@ type LoggerInterface interface {
Warning(m string) error
}
-type SyslogLogger struct {
- *syslog.Writer
-}
-
-func NewSyslogLogger() (*SyslogLogger, error) {
- logger, err := syslog.New(syslog.LOG_INFO, "CGRateS")
- return &SyslogLogger{logger}, err
-}
-
-func (sl *SyslogLogger) Alert(m string) (err error) {
- return sl.Alert(m)
-}
-func (sl *SyslogLogger) Close() error {
- return sl.Close()
-}
-func (sl *SyslogLogger) Crit(m string) (err error) {
- return sl.Crit(m)
-}
-func (sl *SyslogLogger) Debug(m string) (err error) {
- return sl.Debug(m)
-}
-func (sl *SyslogLogger) Emerg(m string) (err error) {
- return sl.Emerg(m)
-}
-func (sl *SyslogLogger) Err(m string) (err error) {
- return sl.Err(m)
-}
-func (sl *SyslogLogger) Info(m string) (err error) {
- return sl.Info(m)
-}
-func (sl *SyslogLogger) Notice(m string) (err error) {
- return sl.Notice(m)
-}
-func (sl *SyslogLogger) Warning(m string) (err error) {
- return sl.Warning(m)
-}
-
+// Logs to standard output
type StdLogger struct{}
func (sl *StdLogger) Alert(m string) (err error) {