diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index 13d7849e1..38e08768a 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -32,33 +32,35 @@ import ( "runtime" ) +const ( + DISABLED = "disabled" + INTERNAL = "internal" +) + var ( config = flag.String("config", "/home/rif/Documents/prog/go/src/github.com/cgrates/cgrates/conf/rater_standalone.config", "Configuration file location.") redis_server = "127.0.0.1:6379" // redis address host:port redis_db = 10 // redis database number - rater_enabled = false // start standalone server (no balancer) - rater_standalone = false // start standalone server (no balancer) - rater_balancer_server = "127.0.0.1:2000" // balancer address host:port - rater_listen = "127.0.0.1:1234" // listening address host:port - rater_json = false // use JSON for RPC encoding + rater_enabled = false // start standalone server (no balancer) + rater_balancer = DISABLED // balancer address host:port + rater_listen = "127.0.0.1:1234" // listening address host:port + rater_json = false // use JSON for RPC encoding - balancer_enabled = false - balancer_standalone = false // run standalone - balancer_listen_rater = "127.0.0.1:2000" // Rater server address - balancer_listen_api = "127.0.0.1:2001" // Json RPC server address - balancer_web_status_server = "127.0.0.1:8000" // Web server address - balancer_json = false // use JSON for RPC encoding + balancer_enabled = false + balancer_listen_rater = "127.0.0.1:2000" // Rater server address + balancer_listen = "127.0.0.1:2001" // Json RPC server address + balancer_json = false // use JSON for RPC encoding scheduler_enabled = false sm_enabled = false - sm_api_server = "127.0.0.1:2000" // balancer address host:port + sm_rater = INTERNAL // address where to access rater. Can be internal, direct rater address or the address of a balancer sm_freeswitch_server = "localhost:8021" // freeswitch address host:port sm_freeswitch_pass = "ClueCon" // reeswitch address host:port + sm_json = false // use JSON for RPC encoding mediator_enabled = false - mediator_standalone = false // run standalone mediator_cdr_file = "Master.csv" // Freeswitch Master CSV CDR file. mediator_result_file = "out.csv" // Generated file containing CDR and price info. mediator_host = "localhost" // The host to connect to. Values that start with / are for UNIX domain sockets. @@ -67,6 +69,9 @@ var ( mediator_user = "" // The user to sign in as. mediator_password = "" // The user's password. + stats_enabled = false + stats_listen = "127.0.0.1:8000" // Web server address (for stat reports) + bal = balancer.NewBalancer() exitChan = make(chan bool) ) @@ -81,24 +86,22 @@ func readConfig(configFn string) { redis_db, _ = c.GetInt("global", "redis_db") rater_enabled, _ = c.GetBool("rater", "enabled") - rater_standalone, _ = c.GetBool("rater", "standalone") - rater_balancer_server, _ = c.GetString("rater", "balancer_server") - rater_listen, _ = c.GetString("rater", "listen_api") + rater_balancer, _ = c.GetString("rater", "balancer") + rater_listen, _ = c.GetString("rater", "listen") rater_json, _ = c.GetBool("rater", "json") balancer_enabled, _ = c.GetBool("balancer", "enabled") - balancer_standalone, _ = c.GetBool("balancer", "standalone") balancer_listen_rater, _ = c.GetString("balancer", "listen_rater") - balancer_listen_api, _ = c.GetString("balancer", "listen_api") - balancer_web_status_server, _ = c.GetString("balancer", "web_status_server") + balancer_listen, _ = c.GetString("balancer", "listen") balancer_json, _ = c.GetBool("balancer", "json") scheduler_enabled, _ = c.GetBool("scheduler", "enabled") sm_enabled, _ = c.GetBool("session_manager", "enabled") - sm_api_server, _ = c.GetString("session_manager", "api_server") + sm_rater, _ = c.GetString("session_manager", "rater") sm_freeswitch_server, _ = c.GetString("session_manager", "freeswitch_server") sm_freeswitch_pass, _ = c.GetString("session_manager", "freeswitch_pass") + sm_json, _ = c.GetBool("session_manager", "json") mediator_enabled, _ = c.GetBool("mediator", "enabled") mediator_cdr_file, _ = c.GetString("mediator", "cdr_file") @@ -108,6 +111,9 @@ func readConfig(configFn string) { mediator_db, _ = c.GetString("mediator", "db") mediator_user, _ = c.GetString("mediator", "user") mediator_password, _ = c.GetString("mediator", "password") + + stats_enabled, _ = c.GetBool("stats_server", "enabled") + stats_listen, _ = c.GetString("stats_server", "listen") } func listenToRPCRequests(rpcResponder interface{}, rpcAddress string, json bool) { @@ -146,8 +152,19 @@ func listenToHttpRequests() { http.HandleFunc("/", statusHandler) http.HandleFunc("/getmem", memoryHandler) http.HandleFunc("/raters", ratersHandler) - timespans.Logger.Info(fmt.Sprintf("The server is listening on %s", balancer_web_status_server)) - http.ListenAndServe(balancer_web_status_server, nil) + timespans.Logger.Info(fmt.Sprintf("The server is listening on %s", stats_listen)) + http.ListenAndServe(stats_listen, nil) +} + +func checkConfigSanity() { + if sm_enabled && rater_enabled && rater_balancer != DISABLED { + timespans.Logger.Crit("The session manager must not be enabled on a worker rater (change [rater]/balancer to disabled)!") + exitChan <- true + } + if balancer_enabled && rater_enabled && rater_balancer != DISABLED { + timespans.Logger.Crit("The balancer is enabled so it cannot connect to anatoher balancer (change [rater]/balancer to disabled)!") + exitChan <- true + } } func main() { @@ -155,15 +172,7 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU()) readConfig(*config) // some consitency checks - if balancer_standalone || balancer_enabled { - balancer_enabled = true - rater_enabled = false - rater_standalone = false - } - if !rater_enabled && !balancer_enabled { - rater_enabled = true - rater_standalone = true - } + checkConfigSanity() getter, err := timespans.NewRedisStorage(redis_server, redis_db) if err != nil { @@ -173,25 +182,28 @@ func main() { defer getter.Close() timespans.SetStorageGetter(getter) - if rater_enabled && !rater_standalone && !balancer_enabled { + if rater_enabled && rater_balancer != DISABLED && !balancer_enabled { go registerToBalancer() go stopRaterSingnalHandler() } responder := ×pans.Responder{ExitChan: exitChan} - if rater_enabled { + if rater_enabled && !balancer_enabled { go listenToRPCRequests(responder, rater_listen, rater_json) } if balancer_enabled { go stopBalancerSingnalHandler() go listenToRPCRequests(new(RaterServer), balancer_listen_rater, false) responder.Bal = bal - go listenToRPCRequests(responder, balancer_listen_api, balancer_json) - go listenToHttpRequests() - if !balancer_standalone { + go listenToRPCRequests(responder, balancer_listen, balancer_json) + if rater_enabled { bal.AddClient("local", new(timespans.ResponderWorker)) } } + if stats_enabled { + go listenToHttpRequests() + } + if scheduler_enabled { go func() { loadActionTimings(getter) @@ -202,8 +214,24 @@ func main() { if sm_enabled { go func() { + var connector sessionmanager.Connector + if sm_rater == INTERNAL { + connector = responder + } else { + var client *rpc.Client + if sm_json { + client, err = jsonrpc.Dial("tcp", sm_rater) + } else { + client, err = rpc.Dial("tcp", sm_rater) + } + if err != nil { + timespans.Logger.Crit(fmt.Sprintf("Could not connect to rater: %v", err)) + exitChan <- true + } + connector = &sessionmanager.RPCClientConnector{client} + } sm := &sessionmanager.FSSessionManager{} - sm.Connect(&sessionmanager.SessionDelegate{responder}, sm_freeswitch_server, sm_freeswitch_pass) + sm.Connect(&sessionmanager.SessionDelegate{connector}, sm_freeswitch_server, sm_freeswitch_pass) }() } diff --git a/cmd/cgr-rater/registration.go b/cmd/cgr-rater/registration.go index 500af7a56..35e81a0d9 100644 --- a/cmd/cgr-rater/registration.go +++ b/cmd/cgr-rater/registration.go @@ -95,14 +95,14 @@ func stopRaterSingnalHandler() { Connects to the balancer and calls unregister RPC method. */ func unregisterFromBalancer() { - client, err := rpc.Dial("tcp", rater_balancer_server) + client, err := rpc.Dial("tcp", rater_balancer) if err != nil { log.Print("Cannot contact the balancer!") exitChan <- true return } var reply int - log.Print("Unregistering from balancer ", rater_balancer_server) + log.Print("Unregistering from balancer ", rater_balancer) client.Call("RaterServer.UnRegisterRater", rater_listen, &reply) if err := client.Close(); err != nil { log.Print("Could not close balancer unregistration!") @@ -114,14 +114,14 @@ func unregisterFromBalancer() { Connects to the balancer and rehisters the rater to the server. */ func registerToBalancer() { - client, err := rpc.Dial("tcp", rater_balancer_server) + client, err := rpc.Dial("tcp", rater_balancer) if err != nil { log.Print("Cannot contact the balancer!") exitChan <- true return } var reply int - log.Print("Registering to balancer ", rater_balancer_server) + log.Print("Registering to balancer ", rater_balancer) client.Call("RaterServer.RegisterRater", rater_listen, &reply) if err := client.Close(); err != nil { log.Print("Could not close balancer registration!") diff --git a/data/test.config b/data/test.config index 508e58c2d..1f4a8d3d8 100644 --- a/data/test.config +++ b/data/test.config @@ -21,6 +21,7 @@ redis_db = 10 # redis database number [balancer] enabled = false # Start balancer server listen = 127.0.0.1:2001 # Balancer listen interface +listen_rater = 127.0.0.1:2000 # Balancer listen interface json = false # use JSON for RPC encoding [rater] diff --git a/sessionmanager/sessiondelegate.go b/sessionmanager/sessiondelegate.go index f29bb51df..01b5a3fb1 100644 --- a/sessionmanager/sessiondelegate.go +++ b/sessionmanager/sessiondelegate.go @@ -21,6 +21,7 @@ package sessionmanager import ( "github.com/cgrates/cgrates/timespans" "log" + "net/rpc" "time" ) @@ -28,9 +29,33 @@ const ( DEBIT_PERIOD = 10 * time.Second ) +type Connector interface { + Debit(timespans.CallDescriptor, *timespans.CallCost) error + DebitCents(timespans.CallDescriptor, *float64) error + DebitSeconds(timespans.CallDescriptor, *float64) error + GetMaxSessionTime(timespans.CallDescriptor, *float64) error +} + +type RPCClientConnector struct { + Client *rpc.Client +} + +func (rcc *RPCClientConnector) Debit(cd timespans.CallDescriptor, cc *timespans.CallCost) error { + return rcc.Client.Call("Responder.Debit", cd, cc) +} +func (rcc *RPCClientConnector) DebitCents(cd timespans.CallDescriptor, resp *float64) error { + return rcc.Client.Call("Responder.DebitCents", cd, resp) +} +func (rcc *RPCClientConnector) DebitSeconds(cd timespans.CallDescriptor, resp *float64) error { + return rcc.Client.Call("Responder.DebitSeconds", cd, resp) +} +func (rcc *RPCClientConnector) GetMaxSessionTime(cd timespans.CallDescriptor, resp *float64) error { + return rcc.Client.Call("Responder.GetMaxSessionTime", cd, resp) +} + // Sample SessionDelegate calling the timespans methods through the RPC interface type SessionDelegate struct { - Responder *timespans.Responder + Connector Connector } func (rsd *SessionDelegate) OnHeartBeat(ev Event) { @@ -88,7 +113,7 @@ func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { Amount: -cost, } var response float64 - err := rsd.Responder.DebitCents(*cd, &response) + err := rsd.Connector.DebitCents(*cd, &response) if err != nil { log.Printf("Debit cents failed: %v", err) } @@ -103,7 +128,7 @@ func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { Amount: -seconds, } var response float64 - err := rsd.Responder.DebitSeconds(*cd, &response) + err := rsd.Connector.DebitSeconds(*cd, &response) if err != nil { log.Printf("Debit seconds failed: %v", err) } @@ -114,7 +139,7 @@ func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { func (rsd *SessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor) { cc := ×pans.CallCost{} - err := rsd.Responder.Debit(*cd, cc) + err := rsd.Connector.Debit(*cd, cc) if err != nil { log.Printf("Could not complete debit opperation: %v", err) } @@ -122,7 +147,7 @@ func (rsd *SessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor) log.Print(cc) cd.Amount = DEBIT_PERIOD.Seconds() var remainingSeconds float64 - err = rsd.Responder.GetMaxSessionTime(*cd, &remainingSeconds) + err = rsd.Connector.GetMaxSessionTime(*cd, &remainingSeconds) if err != nil { log.Printf("Could not get max session time: %v", err) }