remote connecting session manager

This commit is contained in:
Radu Ioan Fericean
2012-07-25 18:12:55 +03:00
parent baf4cda9fa
commit a677489029
4 changed files with 100 additions and 46 deletions

View File

@@ -32,33 +32,35 @@ import (
"runtime"
)
const (
DISABLED = "disabled"
INTERNAL = "internal"
)
var (
config = flag.String("config", "/home/rif/Documents/prog/go/src/github.com/cgrates/cgrates/conf/rater_standalone.config", "Configuration file location.")
redis_server = "127.0.0.1:6379" // redis address host:port
redis_db = 10 // redis database number
rater_enabled = false // start standalone server (no balancer)
rater_standalone = false // start standalone server (no balancer)
rater_balancer_server = "127.0.0.1:2000" // balancer address host:port
rater_listen = "127.0.0.1:1234" // listening address host:port
rater_json = false // use JSON for RPC encoding
rater_enabled = false // start standalone server (no balancer)
rater_balancer = DISABLED // balancer address host:port
rater_listen = "127.0.0.1:1234" // listening address host:port
rater_json = false // use JSON for RPC encoding
balancer_enabled = false
balancer_standalone = false // run standalone
balancer_listen_rater = "127.0.0.1:2000" // Rater server address
balancer_listen_api = "127.0.0.1:2001" // Json RPC server address
balancer_web_status_server = "127.0.0.1:8000" // Web server address
balancer_json = false // use JSON for RPC encoding
balancer_enabled = false
balancer_listen_rater = "127.0.0.1:2000" // Rater server address
balancer_listen = "127.0.0.1:2001" // Json RPC server address
balancer_json = false // use JSON for RPC encoding
scheduler_enabled = false
sm_enabled = false
sm_api_server = "127.0.0.1:2000" // balancer address host:port
sm_rater = INTERNAL // address where to access rater. Can be internal, direct rater address or the address of a balancer
sm_freeswitch_server = "localhost:8021" // freeswitch address host:port
sm_freeswitch_pass = "ClueCon" // reeswitch address host:port
sm_json = false // use JSON for RPC encoding
mediator_enabled = false
mediator_standalone = false // run standalone
mediator_cdr_file = "Master.csv" // Freeswitch Master CSV CDR file.
mediator_result_file = "out.csv" // Generated file containing CDR and price info.
mediator_host = "localhost" // The host to connect to. Values that start with / are for UNIX domain sockets.
@@ -67,6 +69,9 @@ var (
mediator_user = "" // The user to sign in as.
mediator_password = "" // The user's password.
stats_enabled = false
stats_listen = "127.0.0.1:8000" // Web server address (for stat reports)
bal = balancer.NewBalancer()
exitChan = make(chan bool)
)
@@ -81,24 +86,22 @@ func readConfig(configFn string) {
redis_db, _ = c.GetInt("global", "redis_db")
rater_enabled, _ = c.GetBool("rater", "enabled")
rater_standalone, _ = c.GetBool("rater", "standalone")
rater_balancer_server, _ = c.GetString("rater", "balancer_server")
rater_listen, _ = c.GetString("rater", "listen_api")
rater_balancer, _ = c.GetString("rater", "balancer")
rater_listen, _ = c.GetString("rater", "listen")
rater_json, _ = c.GetBool("rater", "json")
balancer_enabled, _ = c.GetBool("balancer", "enabled")
balancer_standalone, _ = c.GetBool("balancer", "standalone")
balancer_listen_rater, _ = c.GetString("balancer", "listen_rater")
balancer_listen_api, _ = c.GetString("balancer", "listen_api")
balancer_web_status_server, _ = c.GetString("balancer", "web_status_server")
balancer_listen, _ = c.GetString("balancer", "listen")
balancer_json, _ = c.GetBool("balancer", "json")
scheduler_enabled, _ = c.GetBool("scheduler", "enabled")
sm_enabled, _ = c.GetBool("session_manager", "enabled")
sm_api_server, _ = c.GetString("session_manager", "api_server")
sm_rater, _ = c.GetString("session_manager", "rater")
sm_freeswitch_server, _ = c.GetString("session_manager", "freeswitch_server")
sm_freeswitch_pass, _ = c.GetString("session_manager", "freeswitch_pass")
sm_json, _ = c.GetBool("session_manager", "json")
mediator_enabled, _ = c.GetBool("mediator", "enabled")
mediator_cdr_file, _ = c.GetString("mediator", "cdr_file")
@@ -108,6 +111,9 @@ func readConfig(configFn string) {
mediator_db, _ = c.GetString("mediator", "db")
mediator_user, _ = c.GetString("mediator", "user")
mediator_password, _ = c.GetString("mediator", "password")
stats_enabled, _ = c.GetBool("stats_server", "enabled")
stats_listen, _ = c.GetString("stats_server", "listen")
}
func listenToRPCRequests(rpcResponder interface{}, rpcAddress string, json bool) {
@@ -146,8 +152,19 @@ func listenToHttpRequests() {
http.HandleFunc("/", statusHandler)
http.HandleFunc("/getmem", memoryHandler)
http.HandleFunc("/raters", ratersHandler)
timespans.Logger.Info(fmt.Sprintf("The server is listening on %s", balancer_web_status_server))
http.ListenAndServe(balancer_web_status_server, nil)
timespans.Logger.Info(fmt.Sprintf("The server is listening on %s", stats_listen))
http.ListenAndServe(stats_listen, nil)
}
func checkConfigSanity() {
if sm_enabled && rater_enabled && rater_balancer != DISABLED {
timespans.Logger.Crit("The session manager must not be enabled on a worker rater (change [rater]/balancer to disabled)!")
exitChan <- true
}
if balancer_enabled && rater_enabled && rater_balancer != DISABLED {
timespans.Logger.Crit("The balancer is enabled so it cannot connect to anatoher balancer (change [rater]/balancer to disabled)!")
exitChan <- true
}
}
func main() {
@@ -155,15 +172,7 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
readConfig(*config)
// some consitency checks
if balancer_standalone || balancer_enabled {
balancer_enabled = true
rater_enabled = false
rater_standalone = false
}
if !rater_enabled && !balancer_enabled {
rater_enabled = true
rater_standalone = true
}
checkConfigSanity()
getter, err := timespans.NewRedisStorage(redis_server, redis_db)
if err != nil {
@@ -173,25 +182,28 @@ func main() {
defer getter.Close()
timespans.SetStorageGetter(getter)
if rater_enabled && !rater_standalone && !balancer_enabled {
if rater_enabled && rater_balancer != DISABLED && !balancer_enabled {
go registerToBalancer()
go stopRaterSingnalHandler()
}
responder := &timespans.Responder{ExitChan: exitChan}
if rater_enabled {
if rater_enabled && !balancer_enabled {
go listenToRPCRequests(responder, rater_listen, rater_json)
}
if balancer_enabled {
go stopBalancerSingnalHandler()
go listenToRPCRequests(new(RaterServer), balancer_listen_rater, false)
responder.Bal = bal
go listenToRPCRequests(responder, balancer_listen_api, balancer_json)
go listenToHttpRequests()
if !balancer_standalone {
go listenToRPCRequests(responder, balancer_listen, balancer_json)
if rater_enabled {
bal.AddClient("local", new(timespans.ResponderWorker))
}
}
if stats_enabled {
go listenToHttpRequests()
}
if scheduler_enabled {
go func() {
loadActionTimings(getter)
@@ -202,8 +214,24 @@ func main() {
if sm_enabled {
go func() {
var connector sessionmanager.Connector
if sm_rater == INTERNAL {
connector = responder
} else {
var client *rpc.Client
if sm_json {
client, err = jsonrpc.Dial("tcp", sm_rater)
} else {
client, err = rpc.Dial("tcp", sm_rater)
}
if err != nil {
timespans.Logger.Crit(fmt.Sprintf("Could not connect to rater: %v", err))
exitChan <- true
}
connector = &sessionmanager.RPCClientConnector{client}
}
sm := &sessionmanager.FSSessionManager{}
sm.Connect(&sessionmanager.SessionDelegate{responder}, sm_freeswitch_server, sm_freeswitch_pass)
sm.Connect(&sessionmanager.SessionDelegate{connector}, sm_freeswitch_server, sm_freeswitch_pass)
}()
}

View File

@@ -95,14 +95,14 @@ func stopRaterSingnalHandler() {
Connects to the balancer and calls unregister RPC method.
*/
func unregisterFromBalancer() {
client, err := rpc.Dial("tcp", rater_balancer_server)
client, err := rpc.Dial("tcp", rater_balancer)
if err != nil {
log.Print("Cannot contact the balancer!")
exitChan <- true
return
}
var reply int
log.Print("Unregistering from balancer ", rater_balancer_server)
log.Print("Unregistering from balancer ", rater_balancer)
client.Call("RaterServer.UnRegisterRater", rater_listen, &reply)
if err := client.Close(); err != nil {
log.Print("Could not close balancer unregistration!")
@@ -114,14 +114,14 @@ func unregisterFromBalancer() {
Connects to the balancer and rehisters the rater to the server.
*/
func registerToBalancer() {
client, err := rpc.Dial("tcp", rater_balancer_server)
client, err := rpc.Dial("tcp", rater_balancer)
if err != nil {
log.Print("Cannot contact the balancer!")
exitChan <- true
return
}
var reply int
log.Print("Registering to balancer ", rater_balancer_server)
log.Print("Registering to balancer ", rater_balancer)
client.Call("RaterServer.RegisterRater", rater_listen, &reply)
if err := client.Close(); err != nil {
log.Print("Could not close balancer registration!")

View File

@@ -21,6 +21,7 @@ redis_db = 10 # redis database number
[balancer]
enabled = false # Start balancer server
listen = 127.0.0.1:2001 # Balancer listen interface
listen_rater = 127.0.0.1:2000 # Balancer listen interface
json = false # use JSON for RPC encoding
[rater]

View File

@@ -21,6 +21,7 @@ package sessionmanager
import (
"github.com/cgrates/cgrates/timespans"
"log"
"net/rpc"
"time"
)
@@ -28,9 +29,33 @@ const (
DEBIT_PERIOD = 10 * time.Second
)
type Connector interface {
Debit(timespans.CallDescriptor, *timespans.CallCost) error
DebitCents(timespans.CallDescriptor, *float64) error
DebitSeconds(timespans.CallDescriptor, *float64) error
GetMaxSessionTime(timespans.CallDescriptor, *float64) error
}
type RPCClientConnector struct {
Client *rpc.Client
}
func (rcc *RPCClientConnector) Debit(cd timespans.CallDescriptor, cc *timespans.CallCost) error {
return rcc.Client.Call("Responder.Debit", cd, cc)
}
func (rcc *RPCClientConnector) DebitCents(cd timespans.CallDescriptor, resp *float64) error {
return rcc.Client.Call("Responder.DebitCents", cd, resp)
}
func (rcc *RPCClientConnector) DebitSeconds(cd timespans.CallDescriptor, resp *float64) error {
return rcc.Client.Call("Responder.DebitSeconds", cd, resp)
}
func (rcc *RPCClientConnector) GetMaxSessionTime(cd timespans.CallDescriptor, resp *float64) error {
return rcc.Client.Call("Responder.GetMaxSessionTime", cd, resp)
}
// Sample SessionDelegate calling the timespans methods through the RPC interface
type SessionDelegate struct {
Responder *timespans.Responder
Connector Connector
}
func (rsd *SessionDelegate) OnHeartBeat(ev Event) {
@@ -88,7 +113,7 @@ func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
Amount: -cost,
}
var response float64
err := rsd.Responder.DebitCents(*cd, &response)
err := rsd.Connector.DebitCents(*cd, &response)
if err != nil {
log.Printf("Debit cents failed: %v", err)
}
@@ -103,7 +128,7 @@ func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
Amount: -seconds,
}
var response float64
err := rsd.Responder.DebitSeconds(*cd, &response)
err := rsd.Connector.DebitSeconds(*cd, &response)
if err != nil {
log.Printf("Debit seconds failed: %v", err)
}
@@ -114,7 +139,7 @@ func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
func (rsd *SessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor) {
cc := &timespans.CallCost{}
err := rsd.Responder.Debit(*cd, cc)
err := rsd.Connector.Debit(*cd, cc)
if err != nil {
log.Printf("Could not complete debit opperation: %v", err)
}
@@ -122,7 +147,7 @@ func (rsd *SessionDelegate) LoopAction(s *Session, cd *timespans.CallDescriptor)
log.Print(cc)
cd.Amount = DEBIT_PERIOD.Seconds()
var remainingSeconds float64
err = rsd.Responder.GetMaxSessionTime(*cd, &remainingSeconds)
err = rsd.Connector.GetMaxSessionTime(*cd, &remainingSeconds)
if err != nil {
log.Printf("Could not get max session time: %v", err)
}