From 33dcd6e3efb74789bd9180b3f8a87ec04bd2017c Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 20 Jul 2012 16:55:22 +0300 Subject: [PATCH] loaded configs --- cmd/cgr-console/cgr-console.go | 28 +-- cmd/cgr-rater/balancer.go | 105 ---------- cmd/cgr-rater/cgr-rater.go | 109 ++++++++++- cmd/cgr-rater/direct_responder.go | 98 ++++++++++ cmd/cgr-rater/rater.go | 182 ------------------ cmd/cgr-rater/registration.go | 78 +++++++- cmd/cgr-rater/rpc_responder.go | 83 +++++--- ...balancer_test.go => rpc_responder_test.go} | 0 cmd/cgr-rater/scheduler.go | 43 +---- cmd/cgr-rater/sessionmanager.go | 10 - data/cgrates.config | 36 ++-- timespans/calldesc.go | 11 +- timespans/logger.go | 39 +--- 13 files changed, 374 insertions(+), 448 deletions(-) delete mode 100644 cmd/cgr-rater/balancer.go create mode 100644 cmd/cgr-rater/direct_responder.go delete mode 100644 cmd/cgr-rater/rater.go rename cmd/cgr-rater/{balancer_test.go => rpc_responder_test.go} (100%) diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index 4174dbb21..0ae306aae 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -23,6 +23,7 @@ import ( "fmt" "github.com/cgrates/cgrates/timespans" "log" + "net/rpc" "net/rpc/jsonrpc" "os" "time" @@ -39,11 +40,18 @@ var ( start = flag.String("start", "2012-02-09T00:00:00Z", "Time start") end = flag.String("end", "2012-02-09T00:10:00Z", "Time end") amount = flag.Float64("amount", 100, "Amount for different operations") + json = flag.Bool("json", false, "Use JSON for RPC encoding.") ) func main() { flag.Parse() - client, err := jsonrpc.Dial("tcp", *server) + var client *rpc.Client + var err error + if *json { + client, err = jsonrpc.Dial("tcp", *server) + } else { + client, err = rpc.Dial("tcp", *server) + } if err != nil { log.Fatal("Could not connect to server " + *server) } @@ -101,21 +109,6 @@ func main() { if err = client.Call("Responder.DebitSeconds", cd, &result); err == nil { fmt.Println(result) } - /*case "addvolumediscountseconds": - var result float64 - if err = client.Call("Responder.AddVolumeDiscountSeconds", cd, &result); err == nil { - fmt.Println(result) - } - case "resetvolumediscountseconds": - var result float64 - if err = client.Call("Responder.ResetVolumeDiscountSeconds", cd, &result); err == nil { - fmt.Println(result) - } - case "addrecievedcallseconds": - var result float64 - if err = client.Call("Responder.AddRecievedCallSeconds", cd, &result); err == nil { - fmt.Println(result) - }*/ case "resetuserbudget": var result float64 if err = client.Call("Responder.ResetUserBudget", cd, &result); err == nil { @@ -133,9 +126,6 @@ func main() { fmt.Println("\tdebitbalance") fmt.Println("\tdebitsms") fmt.Println("\tdebitseconds") - // fmt.Println("\taddvolumediscountseconds") - // fmt.Println("\tresetvolumediscountseconds") - // fmt.Println("\taddrecievedcallseconds") fmt.Println("\tresetuserbudget") fmt.Println("\tstatus") flag.PrintDefaults() diff --git a/cmd/cgr-rater/balancer.go b/cmd/cgr-rater/balancer.go deleted file mode 100644 index 1823a8e2f..000000000 --- a/cmd/cgr-rater/balancer.go +++ /dev/null @@ -1,105 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012 Radu Ioan Fericean - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package main - -import ( - "errors" - "flag" - "github.com/cgrates/cgrates/balancer" - "github.com/cgrates/cgrates/sessionmanager" - "github.com/cgrates/cgrates/timespans" - "log" - "runtime" - "time" -) - -var ( - raterAddress = flag.String("rateraddr", "127.0.0.1:2000", "Rater server address (localhost:2000)") - rpcAddress = flag.String("rpcaddr", "127.0.0.1:2001", "Json RPC server address (localhost:2001)") - httpApiAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:8000)") - freeswitch = flag.Bool("freeswitch", false, "connect to freeswitch server") - freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port") - freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port") - js = flag.Bool("json", false, "use JSON for RPC encoding") - bal *balancer.Balancer - accLock = timespans.NewAccountLock() -) - -/* -The function that gets the information from the raters using balancer. -*/ -func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost, err error) { - err = errors.New("") //not nil value - for err != nil { - client := bal.Balance() - if client == nil { - log.Print("Waiting for raters to register...") - time.Sleep(1 * time.Second) // wait one second and retry - } else { - reply = ×pans.CallCost{} - reply, err = accLock.GuardGetCost(key.GetKey(), func() (*timespans.CallCost, error) { - err = client.Call(method, *key, reply) - return reply, err - }) - if err != nil { - log.Printf("Got en error from rater: %v", err) - } - } - } - return -} - -/* -The function that gets the information from the raters using balancer. -*/ -func CallMethod(key *timespans.CallDescriptor, method string) (reply float64, err error) { - err = errors.New("") //not nil value - for err != nil { - client := bal.Balance() - if client == nil { - log.Print("Waiting for raters to register...") - time.Sleep(1 * time.Second) // wait one second and retry - } else { - reply, err = accLock.Guard(key.GetKey(), func() (float64, error) { - err = client.Call(method, *key, &reply) - return reply, err - }) - if err != nil { - log.Printf("Got en error from rater: %v", err) - } - } - } - return -} - -func maind() { - flag.Parse() - runtime.GOMAXPROCS(runtime.NumCPU() - 1) - bal = balancer.NewBalancer() - - go stopSingnalHandler() - go listenToRPCRaterRequests() - go listenToRPCRequests() - - if *freeswitch { - sm := &sessionmanager.FSSessionManager{} - sm.Connect(sessionmanager.NewRPCBalancerSessionDelegate(bal), *freeswitchsrv, *freeswitchpass) - } - listenToHttpRequests() -} diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index 0821462f8..1972922e7 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -20,10 +20,111 @@ package main import ( "code.google.com/p/goconf/conf" - "log" + "flag" + "fmt" + "github.com/cgrates/cgrates/timespans" ) -func main() { - c, err := conf.ReadConfigFile("something.config") - log.Print(c.GetString("global", "redis_server")) // return something.com +var ( + config = flag.String("config", "/home/rif/Documents/prog/go/src/github.com/cgrates/cgrates/data/cgrates.config", "Configuration file location.") + redis_server = "127.0.0.1:6379" //"redis address host:port" + redis_db = 10 // "redis database number" + + rater_standalone = false // "start standalone server (no balancer)" + rater_balancer_server = "127.0.0.1:2000" // "balancer address host:port" + rater_listen = "127.0.0.1:1234" // "listening address host:port" + rater_json = false // "use JSON for RPC encoding" + + balancer_standalone = false // "run standalone (run as a rater)") + balancer_listen_rater = "127.0.0.1:2000" // "Rater server address (localhost:2000)" + balancer_listen_api = "127.0.0.1:2001" // "Json RPC server address (localhost:2001)" + balancer_json = false // "use JSON for RPC encoding" + + scheduler_standalone = false // "run standalone (no other service)") + scheduler_json = false + + sm_standalone = false // "run standalone (run as a rater)") + sm_api_server = "127.0.0.1:2000" // "balancer address host:port" + sm_freeswitchsrv = "localhost:8021" // "freeswitch address host:port" + sm_freeswitchpass = "ClueCon" // freeswitch address host:port" + sm_json = false // "use JSON for RPC encoding" +) + +func readConfig() { + flag.Parse() + c, err := conf.ReadConfigFile(*config) + if err != nil { + timespans.Logger.Err(fmt.Sprintf("Could not open the configuration file: %v", err)) + } + redis_server, err = c.GetString("global", "redis_server") + if err != nil { + } + redis_db, err = c.GetInt("global", "redis_db") + if err != nil { + } + rater_standalone, err = c.GetBool("rater", "standalone") + if err != nil { + } + rater_balancer_server, err = c.GetString("rater", "balancer_server") + if err != nil { + } + rater_listen, err = c.GetString("rater", "listen_api") + if err != nil { + } + rater_json, err = c.GetBool("rater", "json") + if err != nil { + } + balancer_standalone, err = c.GetBool("balancer", "standalone") + if err != nil { + } + balancer_listen_rater, err = c.GetString("balancer", "listen_rater") + if err != nil { + } + balancer_listen_api, err = c.GetString("balancer", "listen_api") + if err != nil { + } + balancer_json, err = c.GetBool("balancer", "json") + if err != nil { + } + scheduler_standalone, err = c.GetBool("scheduler", "standalone") + if err != nil { + } + scheduler_json, err = c.GetBool("scheduler", "json") + if err != nil { + } + sm_standalone, err = c.GetBool("session_manager", "standalone") + if err != nil { + } + sm_api_server, err = c.GetString("session_manager", "api_server") + if err != nil { + } + sm_freeswitchsrv, err = c.GetString("session_manager", "freeswitch_server") + if err != nil { + } + sm_freeswitchpass, err = c.GetString("session_manager", "freeswitch_pass") + if err != nil { + } + sm_json, err = c.GetBool("session_manager", "json") + if err != nil { + } +} + +func resolveStandaloneConfilcts() { + if balancer_standalone { + rater_standalone = false + } + if scheduler_standalone { + rater_standalone = false + balancer_standalone = false + } + if sm_standalone { + rater_standalone = false + balancer_standalone = false + scheduler_standalone = false + } +} + +func main() { + readConfig() + resolveStandaloneConfilcts() } diff --git a/cmd/cgr-rater/direct_responder.go b/cmd/cgr-rater/direct_responder.go new file mode 100644 index 000000000..cc1edbd38 --- /dev/null +++ b/cmd/cgr-rater/direct_responder.go @@ -0,0 +1,98 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012 Radu Ioan Fericean + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package main + +import ( + "fmt" + "github.com/cgrates/cgrates/timespans" + "os" + "runtime" +) + +type DirectResponder struct { + sg timespans.StorageGetter +} + +/* +RPC method providing the rating information from the storage. +*/ +func (s *DirectResponder) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) { + r, e := timespans.AccLock.GuardGetCost(cd.GetUserBalanceKey(), func() (*timespans.CallCost, error) { + return (&cd).GetCost() + }) + *reply, err = *r, e + return err +} + +func (s *DirectResponder) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) { + r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { + return (&cd).DebitCents() + }) + *reply, err = r, e + return err +} + +func (s *DirectResponder) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) { + r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { + return (&cd).DebitSMS() + }) + *reply, err = r, e + return err +} + +func (s *DirectResponder) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { + r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { + return 0, (&cd).DebitSeconds() + }) + *reply, err = r, e + return err +} + +func (s *DirectResponder) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) { + r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { + return (&cd).GetMaxSessionTime() + }) + *reply, err = r, e + return err +} + +func (s *DirectResponder) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { + r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { + return 0, (&cd).AddRecievedCallSeconds() + }) + *reply, err = r, e + return err +} + +func (r *DirectResponder) Status(arg timespans.CallDescriptor, replay *string) (err error) { + memstats := new(runtime.MemStats) + runtime.ReadMemStats(memstats) + *replay = fmt.Sprintf("memstats before GC: %dKb footprint: %dKb", memstats.HeapAlloc/1024, memstats.Sys/1024) + return +} + +/* +RPC method that triggers rater shutdown in case of balancer exit. +*/ +func (s *DirectResponder) Shutdown(args string, reply *string) (err error) { + s.sg.Close() + defer os.Exit(0) + *reply = "Done!" + return nil +} diff --git a/cmd/cgr-rater/rater.go b/cmd/cgr-rater/rater.go deleted file mode 100644 index 48056c303..000000000 --- a/cmd/cgr-rater/rater.go +++ /dev/null @@ -1,182 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012 Radu Ioan Fericean - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package main - -import ( - "flag" - "fmt" - "github.com/cgrates/cgrates/sessionmanager" - "github.com/cgrates/cgrates/timespans" - "log" - "net" - "net/rpc" - "net/rpc/jsonrpc" - "os" - "runtime" -) - -var ( - balancer = flag.String("balancer", "127.0.0.1:2000", "balancer address host:port") - freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port") - freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port") - redissrv = flag.String("redissrv", "127.0.0.1:6379", "redis address host:port") - redisdb = flag.Int("redisdb", 10, "redis database number") - listen = flag.String("listen", "127.0.0.1:1234", "listening address host:port") - standalone = flag.Bool("standalone", false, "start standalone server (no balancer, default false)") - freeswitch = flag.Bool("freeswitch", false, "connect to freeswitch server") - json = flag.Bool("json", false, "use JSON for RPC encoding") - storage Responder -) - -type Responder struct { - sg timespans.StorageGetter -} - -func NewStorage(nsg timespans.StorageGetter) *Responder { - return &Responder{sg: nsg} -} - -/* -RPC method providing the rating information from the storage. -*/ -func (s *Responder) GetCost(cd timespans.CallDescriptor, reply *timespans.CallCost) (err error) { - r, e := timespans.AccLock.GuardGetCost(cd.GetUserBalanceKey(), func() (*timespans.CallCost, error) { - return (&cd).GetCost() - }) - *reply, err = *r, e - return err -} - -func (s *Responder) DebitCents(cd timespans.CallDescriptor, reply *float64) (err error) { - r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { - return (&cd).DebitCents() - }) - *reply, err = r, e - return err -} - -func (s *Responder) DebitSMS(cd timespans.CallDescriptor, reply *float64) (err error) { - r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { - return (&cd).DebitSMS() - }) - *reply, err = r, e - return err -} - -func (s *Responder) DebitSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { - r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { - return 0, (&cd).DebitSeconds() - }) - *reply, err = r, e - return err -} - -func (s *Responder) GetMaxSessionTime(cd timespans.CallDescriptor, reply *float64) (err error) { - r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { - return (&cd).GetMaxSessionTime() - }) - *reply, err = r, e - return err -} - -func (s *Responder) AddRecievedCallSeconds(cd timespans.CallDescriptor, reply *float64) (err error) { - r, e := timespans.AccLock.Guard(cd.GetUserBalanceKey(), func() (float64, error) { - return 0, (&cd).AddRecievedCallSeconds() - }) - *reply, err = r, e - return err -} - -/*func (s *Responder) ResetUserBudget(cd timespans.CallDescriptor, reply *float64) (err error) { - descriptor := &cd - e := descriptor.ResetUserBudget() - *reply, err = 0, e - return err -}*/ - -func (r *Responder) Status(arg timespans.CallDescriptor, replay *string) (err error) { - memstats := new(runtime.MemStats) - runtime.ReadMemStats(memstats) - *replay = fmt.Sprintf("memstats before GC: %dKb footprint: %dKb", memstats.HeapAlloc/1024, memstats.Sys/1024) - return -} - -/* -RPC method that triggers rater shutdown in case of balancer exit. -*/ -func (s *Responder) Shutdown(args string, reply *string) (err error) { - s.sg.Close() - defer os.Exit(0) - *reply = "Done!" - return nil -} - -func maina() { - flag.Parse() - //getter, err := timespans.NewKyotoStorage("storage.kch") - getter, err := timespans.NewRedisStorage(*redissrv, *redisdb) - defer getter.Close() - timespans.SetStorageGetter(getter) - if err != nil { - log.Fatalf("Cannot open storage: %v", err) - } - if *freeswitch { - sm := &sessionmanager.FSSessionManager{} - sm.Connect(sessionmanager.NewDirectSessionDelegate(getter), *freeswitchsrv, *freeswitchpass) - } - if !*standalone { - go RegisterToServer(balancer, listen) - go StopSingnalHandler(balancer, listen, getter) - } - /*rpc.Register(NewStorage(getter)) - rpc.HandleHTTP() - addr, err1 := net.ResolveTCPAddr("tcp", *listen) - l, err2 := net.ListenTCP("tcp", addr) - if err1 != nil || err2 != nil { - log.Print("cannot create listener for specified address ", *listen) - os.Exit(1) - } - rpc.Accept(l)*/ - - log.Print("Starting Server...") - l, err := net.Listen("tcp", *listen) - defer l.Close() - if err != nil { - log.Fatal(err) - } - log.Print("listening on: ", l.Addr()) - rpc.Register(NewStorage(getter)) - log.Print("waiting for connections ...") - for { - conn, err := l.Accept() - if err != nil { - log.Printf("accept error: %s", conn) - continue - } - log.Printf("connection started: %v", conn.RemoteAddr()) - if *json { - // log.Print("json encoding") - go jsonrpc.ServeConn(conn) - } else { - // log.Print("gob encoding") - go rpc.ServeConn(conn) - } - - } -} diff --git a/cmd/cgr-rater/registration.go b/cmd/cgr-rater/registration.go index ae374d622..6f983888b 100644 --- a/cmd/cgr-rater/registration.go +++ b/cmd/cgr-rater/registration.go @@ -19,33 +19,97 @@ along with this program. If not, see package main import ( + "fmt" + "github.com/cgrates/cgrates/balancer" "github.com/cgrates/cgrates/timespans" "log" + "net/http" "net/rpc" "os" "os/signal" "syscall" + "time" ) /* -Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from inquirer and closes the storage before exiting. +RPC Server that handles the registering and unregistering of raters. */ -func StopSingnalHandler(server, listen *string, sg timespans.StorageGetter) { +type RaterServer struct{} + +func listenToRPCRaterRequests() { + rpc.Register(new(RaterServer)) + rpc.HandleHTTP() + http.ListenAndServe(*raterAddress, nil) +} + +var ( + bal = balancer.NewBalancer() +) + +/* +Listens for SIGTERM, SIGINT, SIGQUIT system signals and shuts down all the registered raters. +*/ +func stopSingnalHandler() { + log.Print("Handling stop signals...") + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + + sig := <-c + log.Printf("Caught signal %v, sending shutdownto raters\n", sig) + bal.Shutdown() + os.Exit(1) +} + +/* +RPC method that receives a rater address, connects to it and ads the pair to the rater list for balancing +*/ +func (rs *RaterServer) RegisterRater(clientAddress string, replay *int) error { + log.Printf("Started rater %v registration...", clientAddress) + time.Sleep(2 * time.Second) // wait a second for Rater to start serving + client, err := rpc.Dial("tcp", clientAddress) + if err != nil { + log.Print("Could not connect to client!") + return err + } + bal.AddClient(clientAddress, client) + log.Printf("Rater %v registered succesfully.", clientAddress) + return nil +} + +/* +RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list. +*/ +func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *int) error { + client, ok := bal.GetClient(clientAddress) + if ok { + client.Close() + bal.RemoveClient(clientAddress) + log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress)) + } else { + log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress)) + } + return nil +} + +/* +Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from balancer and closes the storage before exiting. +*/ +func stopRaterSingnalHandler(server, listen *string, sg timespans.StorageGetter) { log.Print("Handling stop signals...") c := make(chan os.Signal) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) sig := <-c log.Printf("Caught signal %v, unregistering from balancer\n", sig) - unregisterFromServer(server, listen) + unregisterFromBalancer(server, listen) sg.Close() os.Exit(1) } /* -Connects to the inquirer and calls unregister RPC method. +Connects to the balancer and calls unregister RPC method. */ -func unregisterFromServer(server, listen *string) { +func unregisterFromBalancer(server, listen *string) { client, err := rpc.DialHTTP("tcp", *server) if err != nil { log.Print("Cannot contact the balancer!") @@ -61,9 +125,9 @@ func unregisterFromServer(server, listen *string) { } /* -Connects to the inquirer and rehisters the rater to the server. +Connects to the balancer and rehisters the rater to the server. */ -func RegisterToServer(server, listen *string) { +func registerToBalancer(server, listen *string) { client, err := rpc.DialHTTP("tcp", *server) if err != nil { log.Print("Cannot contact the balancer!") diff --git a/cmd/cgr-rater/rpc_responder.go b/cmd/cgr-rater/rpc_responder.go index c72eb23db..14d4d47e3 100644 --- a/cmd/cgr-rater/rpc_responder.go +++ b/cmd/cgr-rater/rpc_responder.go @@ -15,9 +15,11 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ + package main import ( + "errors" "fmt" "github.com/cgrates/cgrates/timespans" "log" @@ -25,66 +27,52 @@ import ( "net/rpc" "net/rpc/jsonrpc" "runtime" + "time" ) -type Responder byte +type RpcResponder struct{} /* RPC method thet provides the external RPC interface for getting the rating information. */ -func (r *Responder) GetCost(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) { +func (r *RpcResponder) GetCost(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) { rs, err := GetCallCost(&arg, "Responder.GetCost") *replay = *rs return } -func (r *Responder) Debit(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) { +func (r *RpcResponder) Debit(arg timespans.CallDescriptor, replay *timespans.CallCost) (err error) { rs, err := GetCallCost(&arg, "Responder.Debit") *replay = *rs return } -func (r *Responder) DebitBalance(arg timespans.CallDescriptor, replay *float64) (err error) { +func (r *RpcResponder) DebitBalance(arg timespans.CallDescriptor, replay *float64) (err error) { *replay, err = CallMethod(&arg, "Responder.DebitCents") return } -func (r *Responder) DebitSMS(arg timespans.CallDescriptor, replay *float64) (err error) { +func (r *RpcResponder) DebitSMS(arg timespans.CallDescriptor, replay *float64) (err error) { *replay, err = CallMethod(&arg, "Responder.DebitSMS") return } -func (r *Responder) DebitSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { +func (r *RpcResponder) DebitSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { *replay, err = CallMethod(&arg, "Responder.DebitSeconds") return } -func (r *Responder) GetMaxSessionTime(arg timespans.CallDescriptor, replay *float64) (err error) { +func (r *RpcResponder) GetMaxSessionTime(arg timespans.CallDescriptor, replay *float64) (err error) { *replay, err = CallMethod(&arg, "Responder.GetMaxSessionTime") return } -func (r *Responder) AddVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { +func (r *RpcResponder) AddVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { *replay, err = CallMethod(&arg, "Responder.AddVolumeDiscountSeconds") return } -/*func (r *Responder) ResetVolumeDiscountSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Responder.ResetVolumeDiscountSeconds") - return -} - -func (r *Responder) AddRecievedCallSeconds(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Responder.AddRecievedCallSeconds") - return -} - -func (r *Responder) ResetUserBudget(arg timespans.CallDescriptor, replay *float64) (err error) { - *replay = CallMethod(&arg, "Responder.ResetUserBudget") - return -}*/ - -func (r *Responder) Status(arg timespans.CallDescriptor, replay *string) (err error) { +func (r *RpcResponder) Status(arg timespans.CallDescriptor, replay *string) (err error) { memstats := new(runtime.MemStats) runtime.ReadMemStats(memstats) *replay = "Connected raters:\n" @@ -129,3 +117,50 @@ func listenToRPCRequests() { } } } + +/* +The function that gets the information from the raters using balancer. +*/ +func GetCallCost(key *timespans.CallDescriptor, method string) (reply *timespans.CallCost, err error) { + err = errors.New("") //not nil value + for err != nil { + client := bal.Balance() + if client == nil { + log.Print("Waiting for raters to register...") + time.Sleep(1 * time.Second) // wait one second and retry + } else { + reply = ×pans.CallCost{} + reply, err = accLock.GuardGetCost(key.GetKey(), func() (*timespans.CallCost, error) { + err = client.Call(method, *key, reply) + return reply, err + }) + if err != nil { + log.Printf("Got en error from rater: %v", err) + } + } + } + return +} + +/* +The function that gets the information from the raters using balancer. +*/ +func CallMethod(key *timespans.CallDescriptor, method string) (reply float64, err error) { + err = errors.New("") //not nil value + for err != nil { + client := bal.Balance() + if client == nil { + log.Print("Waiting for raters to register...") + time.Sleep(1 * time.Second) // wait one second and retry + } else { + reply, err = accLock.Guard(key.GetKey(), func() (float64, error) { + err = client.Call(method, *key, &reply) + return reply, err + }) + if err != nil { + log.Printf("Got en error from rater: %v", err) + } + } + } + return +} diff --git a/cmd/cgr-rater/balancer_test.go b/cmd/cgr-rater/rpc_responder_test.go similarity index 100% rename from cmd/cgr-rater/balancer_test.go rename to cmd/cgr-rater/rpc_responder_test.go diff --git a/cmd/cgr-rater/scheduler.go b/cmd/cgr-rater/scheduler.go index 43282707a..f61111247 100644 --- a/cmd/cgr-rater/scheduler.go +++ b/cmd/cgr-rater/scheduler.go @@ -19,21 +19,13 @@ along with this program. If not, see package main import ( - "flag" "github.com/cgrates/cgrates/timespans" "log" - "os" - "os/signal" "sort" - "syscall" "time" ) var ( - redisserver = flag.String("redisserver", "127.0.0.1:6379", "redis server address (tcp:127.0.0.1:6379)") - redisdb = flag.Int("rdb", 10, "redis database number (10)") - redispass = flag.String("pass", "", "redis database password") - httpAddress = flag.String("httpapiaddr", "127.0.0.1:8000", "Http API server address (localhost:8000)") storage timespans.StorageGetter timer *time.Timer restartLoop = make(chan byte) @@ -71,20 +63,12 @@ func (s scheduler) loop() { } } -// Listens for the HUP system signal and gracefuly reloads the timers from database. -func stopSingnalHandler() { - log.Print("Handling HUP signal...") - for { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGHUP) - sig := <-c - - log.Printf("Caught signal %v, reloading action timings.\n", sig) - loadActionTimings() - // check the tip of the queue for new actions - restartLoop <- 1 - timer.Stop() - } +func reloadActionTimings() { + log.Print("Reloading action timings.") + loadActionTimings() + // check the tip of the queue for new actions + restartLoop <- 1 + timer.Stop() } func loadActionTimings() { @@ -104,18 +88,3 @@ func loadActionTimings() { } sort.Sort(s.queue) } - -func mainb() { - flag.Parse() - var err error - storage, err = timespans.NewRedisStorage(*redisserver, *redisdb) - if err != nil { - log.Fatalf("Could not open database connection: %v", err) - } - defer storage.Close() - timespans.SetStorageGetter(storage) - loadActionTimings() - go stopSingnalHandler() - // go startWebApp() - s.loop() -} diff --git a/cmd/cgr-rater/sessionmanager.go b/cmd/cgr-rater/sessionmanager.go index e56d6779b..dd2838be9 100644 --- a/cmd/cgr-rater/sessionmanager.go +++ b/cmd/cgr-rater/sessionmanager.go @@ -27,16 +27,6 @@ import ( "net/rpc/jsonrpc" ) -var ( - standalone = flag.Bool("standalone", false, "run standalone (run as a rater)") - json = flag.Bool("json", false, "use JSON for RPC encoding") - balancer = flag.String("balancer", "127.0.0.1:2000", "balancer address host:port") - freeswitchsrv = flag.String("freeswitchsrv", "localhost:8021", "freeswitch address host:port") - freeswitchpass = flag.String("freeswitchpass", "ClueCon", "freeswitch address host:port") - redissrv = flag.String("redissrv", "127.0.0.1:6379", "redis address host:port") - redisdb = flag.Int("redisdb", 10, "redis database number") -) - func mainc() { flag.Parse() sm := &sessionmanager.FSSessionManager{} diff --git a/data/cgrates.config b/data/cgrates.config index 069124f10..3a01b095f 100644 --- a/data/cgrates.config +++ b/data/cgrates.config @@ -15,27 +15,29 @@ # along with this program. If not, see [global] -redis_server = "127.0.0.1:6379" #"redis address host:port" -redis_db = 10 # "redis database number" +redis_server = "127.0.0.1:6379" #"redis address host:port" +redis_db = 10 # "redis database number" [rater] -standalone = false # "start standalone server (no balancer)" -balancer_server = "127.0.0.1:2000" # "balancer address host:port" -listen = "127.0.0.1:1234" # "listening address host:port" -json = false # "use JSON for RPC encoding" +standalone = false # "start standalone server (no balancer)" +balancer_server = "127.0.0.1:2000" # "balancer address host:port" +listen_api = "127.0.0.1:1234" # "listening address host:port" +json = false # "use JSON for RPC encoding" [balancer] -standalone = false # "run standalone (run as a rater)") -raterAddress = "127.0.0.1:2000" # "Rater server address (localhost:2000)" -rpcAddress = "127.0.0.1:2001" # "Json RPC server address (localhost:2001)" -json = false # "use JSON for RPC encoding" +standalone = false # "run standalone (run as a rater)") +listen_rater = "127.0.0.1:2000" # "Rater server address (localhost:2000)" +listen_api = "127.0.0.1:2001" # "Json RPC server address (localhost:2001)" +json = false # "use JSON for RPC encoding" [scheduler] -standalone = false # "run standalone (no other service)") +standalone = false # "run standalone (no other service)") +json = false # "use JSON for RPC encoding" + +[session_manager] +standalone = false # "run standalone (run as a rater)") +api_server = "127.0.0.1:2000" # "balancer address host:port" +freeswitch_server = "localhost:8021" # "freeswitch address host:port" +freeswitch_pass = "ClueCon", # freeswitch address host:port" +json = false # "use JSON for RPC encoding" -[session-manager] -standalone = false # "run standalone (run as a rater)") -balancer = "127.0.0.1:2000" # "balancer address host:port" -freeswitchsrv = "localhost:8021" # "freeswitch address host:port" -freeswitchpass = "ClueCon", # freeswitch address host:port" -json = false # "use JSON for RPC encoding" \ No newline at end of file diff --git a/timespans/calldesc.go b/timespans/calldesc.go index 6ebb7c0ec..a8d9e54b1 100644 --- a/timespans/calldesc.go +++ b/timespans/calldesc.go @@ -21,15 +21,16 @@ package timespans import ( "errors" "fmt" + "log/syslog" "math" "time" ) func init() { var err error - logger, err = NewSyslogLogger() + Logger, err = syslog.New(syslog.LOG_INFO, "CGRateS") if err != nil { - logger = new(StdLogger) + Logger = new(StdLogger) } } @@ -42,7 +43,7 @@ const ( var ( storageGetter StorageGetter - logger LoggerInterface + Logger LoggerInterface ) /* @@ -282,7 +283,7 @@ func (cd *CallDescriptor) GetCost() (*CallCost, error) { Cost: cost, ConnectFee: connectionFee, Timespans: timespans} - logger.Info(fmt.Sprintf("Get Cost: %v => %v", cd, cc)) + Logger.Info(fmt.Sprintf("Get Cost: %v => %v", cd, cc)) return cc, err } @@ -337,7 +338,7 @@ func (cd *CallDescriptor) GetMaxSessionTime() (seconds float64, err error) { func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { cc, err = cd.GetCost() if err != nil { - logger.Err(fmt.Sprintf("error getting cost %v", err)) + Logger.Err(fmt.Sprintf("error getting cost %v", err)) } if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { defer storageGetter.SetUserBalance(userBalance) diff --git a/timespans/logger.go b/timespans/logger.go index eea92e78b..09282ea17 100644 --- a/timespans/logger.go +++ b/timespans/logger.go @@ -20,7 +20,6 @@ package timespans import ( "log" - "log/syslog" ) type LoggerInterface interface { @@ -35,43 +34,7 @@ type LoggerInterface interface { Warning(m string) error } -type SyslogLogger struct { - *syslog.Writer -} - -func NewSyslogLogger() (*SyslogLogger, error) { - logger, err := syslog.New(syslog.LOG_INFO, "CGRateS") - return &SyslogLogger{logger}, err -} - -func (sl *SyslogLogger) Alert(m string) (err error) { - return sl.Alert(m) -} -func (sl *SyslogLogger) Close() error { - return sl.Close() -} -func (sl *SyslogLogger) Crit(m string) (err error) { - return sl.Crit(m) -} -func (sl *SyslogLogger) Debug(m string) (err error) { - return sl.Debug(m) -} -func (sl *SyslogLogger) Emerg(m string) (err error) { - return sl.Emerg(m) -} -func (sl *SyslogLogger) Err(m string) (err error) { - return sl.Err(m) -} -func (sl *SyslogLogger) Info(m string) (err error) { - return sl.Info(m) -} -func (sl *SyslogLogger) Notice(m string) (err error) { - return sl.Notice(m) -} -func (sl *SyslogLogger) Warning(m string) (err error) { - return sl.Warning(m) -} - +// Logs to standard output type StdLogger struct{} func (sl *StdLogger) Alert(m string) (err error) {