single listen address for balancer

This commit is contained in:
Radu Ioan Fericean
2012-07-27 18:37:53 +03:00
parent 07dcefd593
commit 92f23c3c87
12 changed files with 54 additions and 67 deletions

View File

@@ -52,7 +52,6 @@ var (
rater_rpc_encoding = GOB // use JSON for RPC encoding
balancer_enabled = false
balancer_listen_rater = "127.0.0.1:2000" // Rater server address
balancer_listen = "127.0.0.1:2001" // Json RPC server address
balancer_rpc_encoding = GOB // use JSON for RPC encoding
@@ -99,7 +98,6 @@ func readConfig(configFn string) {
rater_rpc_encoding, _ = c.GetString("rater", "rpc_encoding")
balancer_enabled, _ = c.GetBool("balancer", "enabled")
balancer_listen_rater, _ = c.GetString("balancer", "listen_rater")
balancer_listen, _ = c.GetString("balancer", "listen")
balancer_rpc_encoding, _ = c.GetString("balancer", "rpc_encoding")
@@ -258,7 +256,6 @@ func main() {
}
if balancer_enabled {
go stopBalancerSingnalHandler()
go listenToRPCRequests(new(RaterServer), balancer_listen_rater, GOB)
responder.Bal = bal
go listenToRPCRequests(responder, balancer_listen, balancer_rpc_encoding)
if rater_enabled {

View File

@@ -34,7 +34,6 @@ func TestConfig(t *testing.T) {
rater_rpc_encoding != "test" ||
balancer_enabled != true ||
balancer_listen_rater != "test" ||
balancer_listen != "test" ||
balancer_rpc_encoding != "test" ||
@@ -67,7 +66,6 @@ func TestConfig(t *testing.T) {
t.Log(rater_listen)
t.Log(rater_rpc_encoding)
t.Log(balancer_enabled)
t.Log(balancer_listen_rater)
t.Log(balancer_listen)
t.Log(balancer_rpc_encoding)
t.Log(scheduler_enabled)

View File

@@ -27,42 +27,8 @@ import (
"os"
"os/signal"
"syscall"
"time"
)
type RaterServer struct{}
/*
RPC method that receives a rater address, connects to it and ads the pair to the rater list for balancing
*/
func (rs *RaterServer) RegisterRater(clientAddress string, replay *int) error {
log.Printf("Started rater %v registration...", clientAddress)
time.Sleep(2 * time.Second) // wait a second for Rater to start serving
client, err := rpc.Dial("tcp", clientAddress)
if err != nil {
log.Print("Could not connect to client!")
return err
}
bal.AddClient(clientAddress, client)
log.Printf("Rater %v registered succesfully.", clientAddress)
return nil
}
/*
RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list.
*/
func (rs *RaterServer) UnRegisterRater(clientAddress string, replay *int) error {
client, ok := bal.GetClient(clientAddress)
if ok {
client.Close()
bal.RemoveClient(clientAddress)
log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress))
} else {
log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress))
}
return nil
}
/*
Listens for SIGTERM, SIGINT, SIGQUIT system signals and shuts down all the registered raters.
*/
@@ -103,7 +69,7 @@ func unregisterFromBalancer() {
}
var reply int
log.Print("Unregistering from balancer ", rater_balancer)
client.Call("RaterServer.UnRegisterRater", rater_listen, &reply)
client.Call("Responder.UnRegisterRater", rater_listen, &reply)
if err := client.Close(); err != nil {
log.Print("Could not close balancer unregistration!")
exitChan <- true
@@ -122,7 +88,7 @@ func registerToBalancer() {
}
var reply int
log.Print("Registering to balancer ", rater_balancer)
client.Call("RaterServer.RegisterRater", rater_listen, &reply)
client.Call("Responder.RegisterRater", rater_listen, &reply)
if err := client.Close(); err != nil {
log.Print("Could not close balancer registration!")
exitChan <- true

View File

@@ -21,12 +21,11 @@ redis_db = 10 # redis database number
[balancer]
enabled = true # Start balancer server
listen = 127.0.0.1:2001 # Balancer listen interface
listen_rater = 127.0.0.1:2000 # Balancer listen interface
rpc_encoding = gob # use JSON for RPC encoding
[rater]
enabled = true
listen = 127.0.0.1:2001 # listening address hostort, internal for internal communication only
listen = 127.0.0.1:1234 # listening address hostort, internal for internal communication only
balancer = disabled # if defined it will register to balancer as worker
rpc_encoding = gob # use JSON for RPC encoding

View File

@@ -21,14 +21,11 @@ redis_db = 10 # redis database number
[balancer]
enabled = true # Start balancer server
listen = 127.0.0.1:2001 # Balancer listen interface
listen_rater = 127.0.0.1:2000 # Balancer listen interface
rpc_encoding = gob # use JSON for RPC encoding
[rater]
enabled = false
listen = 127.0.0.1:2001 # listening address hostort, internal for internal communication only
balancer = 127.0.0.1:2000 # if defined it will register to balancer as worker
rpc_encoding = gob # use JSON for RPC encoding
[stats_server]
enabled = true

View File

@@ -15,18 +15,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>
[global]
redis_server = 127.0.0.1:6379 #redis address hostort
redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[balancer]
enabled = false # Start balancer server
listen = 127.0.0.1:2001 # Balancer listen interface
listen_rater = 127.0.0.1:2000 # Balancer listen interface
rpc_encoding = gob # use JSON for RPC encoding
[rater]
enabled = true
listen = 127.0.0.1:2001 # listening address hostort, internal for internal communication only
listen = 127.0.0.1:2001 # listening address host:port, internal for internal communication only
balancer = disabled # if defined it will register to balancer as worker
rpc_encoding = gob # use JSON for RPC encoding
@@ -50,8 +49,8 @@ enabled = true
[session_manager]
enabled = true
rater = 127.0.0.1:2000 #address where to access rater. Can be internal, direct rater address or the address of a balancer
freeswitch_server = localhost:8021 # freeswitch address hostort
freeswitch_pass = ClueCon # freeswitch address hostort
freeswitch_server = localhost:8021 # freeswitch address host:port
freeswitch_pass = ClueCon # freeswitch address host:port
rpc_encoding = gob # use JSON for RPC encoding
[stats_server]

View File

@@ -20,6 +20,6 @@ redis_db = 10 # redis database number
[rater]
enabled = true
listen = 127.0.0.1:2001 # listening address hostort, internal for internal communication only
balancer = 127.0.0.1:2000 # if defined it will register to balancer as worker
listen = 127.0.0.1:1234 # listening address host:port, internal for internal communication only
balancer = 127.0.0.1:2001 # if defined it will register to balancer as worker
rpc_encoding = gob # use JSON for RPC encoding

View File

@@ -15,12 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>
[global]
redis_server = 127.0.0.1:6379 #redis address hostort
redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
enabled = true
listen = 127.0.0.1:2001 # listening address hostort, internal for internal communication only
listen = 127.0.0.1:2001 # listening address host:port, internal for internal communication only
balancer = disabled # if defined it will register to balancer as worker
rpc_encoding = gob # use JSON for RPC encoding

View File

@@ -15,12 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>
[global]
redis_server = 127.0.0.1:6379 #redis address hostort
redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
enabled = true
listen = 127.0.0.1:2001 # listening address hostort, internal for internal communication only
listen = 127.0.0.1:2001 # listening address host:port, internal for internal communication only
balancer = disabled # if defined it will register to balancer as worker
rpc_encoding = gob # use JSON for RPC encoding

View File

@@ -15,19 +15,19 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>
[global]
redis_server = 127.0.0.1:6379 #redis address hostort
redis_server = 127.0.0.1:6379 #redis address host:port
redis_db = 10 # redis database number
[rater]
enable = true
listen = 127.0.0.1:2001 # listening address hostort, internal for internal communication only
listen = 127.0.0.1:2001 # listening address host:port, internal for internal communication only
balancer = disabled # if defined it will register to balancer as worker
rpc_encoding = gob # use JSON for RPC encoding
[session_manager]
enabled = true
rater = internal #address where to access rater. Can be internal, direct rater address or the address of a balancer
freeswitch_server = localhost:8021 # freeswitch address hostort
freeswitch_pass = ClueCon # freeswitch address hostort
freeswitch_server = localhost:8021 # freeswitch address host:port
freeswitch_pass = ClueCon # freeswitch address host:port
rpc_encoding = gob # use JSON for RPC encoding

View File

@@ -15,18 +15,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>
[global]
redis_server = test #redis address hostort
redis_server = test #redis address host:port
redis_db = 1 # redis database number
[balancer]
enabled = true # Start balancer server
listen = test # Balancer listen interface
listen_rater = test # Balancer listen interface
rpc_encoding = test # use JSON for RPC encoding
[rater]
enabled = true
listen = test # listening address hostort, internal for internal communication only
listen = test # listening address host:port, internal for internal communication only
balancer = test # if defined it will register to balancer as worker
rpc_encoding = test # use JSON for RPC encoding
@@ -50,8 +49,8 @@ enabled = true
[session_manager]
enabled = true
rater = test #address where to access rater. Can be internal, direct rater address or the address of a balancer
freeswitch_server = test # freeswitch address hostort
freeswitch_pass = test # freeswitch address hostort
freeswitch_server = test # freeswitch address host:port
freeswitch_pass = test # freeswitch address host:port
rpc_encoding = test # use JSON for RPC encoding
[stats_server]

View File

@@ -23,6 +23,7 @@ import (
"fmt"
"github.com/cgrates/cgrates/balancer"
"log"
"net/rpc"
"reflect"
"runtime"
"strings"
@@ -198,6 +199,37 @@ func (rs *Responder) callMethod(key *CallDescriptor, method string) (reply float
return
}
/*
RPC method that receives a rater address, connects to it and ads the pair to the rater list for balancing
*/
func (rs *Responder) RegisterRater(clientAddress string, replay *int) error {
log.Printf("Started rater %v registration...", clientAddress)
time.Sleep(2 * time.Second) // wait a second for Rater to start serving
client, err := rpc.Dial("tcp", clientAddress)
if err != nil {
log.Print("Could not connect to client!")
return err
}
rs.Bal.AddClient(clientAddress, client)
log.Printf("Rater %v registered succesfully.", clientAddress)
return nil
}
/*
RPC method that recives a rater addres gets the connections and closes it and removes the pair from rater list.
*/
func (rs *Responder) UnRegisterRater(clientAddress string, replay *int) error {
client, ok := rs.Bal.GetClient(clientAddress)
if ok {
client.Close()
rs.Bal.RemoveClient(clientAddress)
log.Print(fmt.Sprintf("Rater %v unregistered succesfully.", clientAddress))
} else {
log.Print(fmt.Sprintf("Server %v was not on my watch!", clientAddress))
}
return nil
}
// Reflection worker type for not standalone balancer
type ResponderWorker struct{}