Merge branch 'master' of github.com:cgrates/cgrates

This commit is contained in:
Radu Ioan Fericean
2013-05-10 18:28:45 +03:00
3 changed files with 64 additions and 22 deletions

View File

@@ -98,9 +98,21 @@ func startMediator(responder *rater.Responder, loggerDb rater.DataStorage) {
var client *rpc.Client
var err error
if cfg.MediatorRPCEncoding == JSON {
client, err = jsonrpc.Dial("tcp", cfg.MediatorRater)
for i := 0; i < cfg.MediatorRaterReconnects; i++ {
client, err = jsonrpc.Dial("tcp", cfg.MediatorRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
}
} else {
client, err = rpc.Dial("tcp", cfg.MediatorRater)
for i := 0; i < cfg.MediatorRaterReconnects; i++ {
client, err = rpc.Dial("tcp", cfg.MediatorRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
}
}
if err != nil {
rater.Logger.Crit(fmt.Sprintf("Could not connect to rater: %v", err))
@@ -142,9 +154,23 @@ func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage)
var client *rpc.Client
var err error
if cfg.SMRPCEncoding == JSON {
client, err = jsonrpc.Dial("tcp", cfg.SMRater)
// We attempt to reconnect more times
for i := 0; i < cfg.SMRaterReconnects; i++ {
client, err = jsonrpc.Dial("tcp", cfg.SMRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
}
} else {
client, err = rpc.Dial("tcp", cfg.SMRater)
for i := 0; i < cfg.SMRaterReconnects; i++ {
client, err = rpc.Dial("tcp", cfg.SMRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
}
}
if err != nil {
rater.Logger.Crit(fmt.Sprintf("Could not connect to rater: %v", err))
@@ -167,14 +193,14 @@ func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage)
exitChan <- true
}
func checkConfigSanity() {
func checkConfigSanity() error {
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED {
rater.Logger.Crit("The session manager must not be enabled on a worker rater (change [rater]/balancer to disabled)!")
exitChan <- true
return errors.New("SessionManager on Worker")
}
if cfg.BalancerEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED {
rater.Logger.Crit("The balancer is enabled so it cannot connect to anatoher balancer (change [rater]/balancer to disabled)!")
exitChan <- true
return errors.New("Improperly configured balancer")
}
// check if the session manager or mediator is connectting via loopback
@@ -185,13 +211,13 @@ func checkConfigSanity() {
if cfg.BalancerEnabled {
if cfg.BalancerRPCEncoding != cfg.SMRPCEncoding {
rater.Logger.Crit("If you are connecting the session manager via the loopback to the balancer use the same type of rpc encoding!")
exitChan <- true
return errors.New("Balancer and SessionManager using different encoding")
}
}
if cfg.RaterEnabled {
if cfg.RaterRPCEncoding != cfg.SMRPCEncoding {
rater.Logger.Crit("If you are connecting the session manager via the loopback to the arter use the same type of rpc encoding!")
exitChan <- true
return errors.New("Rater and SessionManager using different encoding")
}
}
}
@@ -199,16 +225,17 @@ func checkConfigSanity() {
if cfg.BalancerEnabled {
if cfg.BalancerRPCEncoding != cfg.MediatorRPCEncoding {
rater.Logger.Crit("If you are connecting the mediator via the loopback to the balancer use the same type of rpc encoding!")
exitChan <- true
return errors.New("Balancer and Mediator using different encoding")
}
}
if cfg.RaterEnabled {
if cfg.RaterRPCEncoding != cfg.MediatorRPCEncoding {
rater.Logger.Crit("If you are connecting the mediator via the loopback to the arter use the same type of rpc encoding!")
exitChan <- true
return errors.New("Rater and Mediator using different encoding")
}
}
}
return nil
}
func configureDatabase(db_type, host, port, name, user, pass string) (getter rater.DataStorage, err error) {
@@ -218,7 +245,7 @@ func configureDatabase(db_type, host, port, name, user, pass string) (getter rat
db_nb, err = strconv.Atoi(name)
if err != nil {
rater.Logger.Crit("Redis db name must be an integer!")
exitChan <- true
return nil, err
}
if port != "" {
host += ":" + port
@@ -230,15 +257,13 @@ func configureDatabase(db_type, host, port, name, user, pass string) (getter rat
getter, err = rater.NewPostgresStorage(host, port, name, user, pass)
default:
err = errors.New("unknown db")
rater.Logger.Crit("Unknown db type, exiting!")
exitChan <- true
return nil, err
}
if err != nil {
rater.Logger.Crit(fmt.Sprintf("Could not connect to db: %v, exiting!", err))
exitChan <- true
return nil, err
}
return
return getter, nil
}
func main() {
@@ -255,7 +280,11 @@ func main() {
return
}
// some consitency checks
go checkConfigSanity()
errCfg := checkConfigSanity()
if errCfg != nil {
rater.Logger.Crit( errCfg.Error() )
return
}
var getter, loggerDb rater.DataStorage
getter, err = configureDatabase(cfg.DataDBType, cfg.DataDBHost, cfg.DataDBPort, cfg.DataDBName, cfg.DataDBUser, cfg.DataDBPass)
@@ -269,10 +298,10 @@ func main() {
loggerDb = getter
} else {
loggerDb, err = configureDatabase(cfg.LogDBType, cfg.LogDBHost, cfg.LogDBPort, cfg.LogDBName, cfg.LogDBUser, cfg.LogDBPass)
}
if err != nil { // Cannot configure logger database, show stopper
rater.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
return
if err != nil { // Cannot configure logger database, show stopper
rater.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
return
}
}
defer loggerDb.Close()
rater.SetStorageLogger(loggerDb)
@@ -283,6 +312,7 @@ func main() {
}
}
// Async starts here
if cfg.RaterEnabled && cfg.RaterBalancer != DISABLED && !cfg.BalancerEnabled {
go registerToBalancer()
go stopRaterSingnalHandler()

View File

@@ -61,6 +61,7 @@ type CGRConfig struct {
SMEnabled bool
SMSwitchType string
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
SMRaterReconnects int // Number of reconnect attempts to rater
SMDebitInterval int // the period to be debited in advanced during a call (in seconds)
SMRPCEncoding string // use JSON for RPC encoding
SMDefaultReqType string // Use this request type if not defined on top
@@ -72,6 +73,7 @@ type CGRConfig struct {
MediatorCDRInDir string // Freeswitch Master CSV CDR path.
MediatorCDROutDir string // Freeswitch Master CSV CDR output path.
MediatorRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
MediatorRaterReconnects int // Number of reconnect attempts to rater
MediatorRPCEncoding string // use JSON for RPC encoding
MediatorSkipDB bool
MediatorPseudoprepaid bool
@@ -193,6 +195,10 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) {
if hasOpt = c.HasOption("mediator", "rater"); hasOpt {
cfg.MediatorRater, _ = c.GetString("mediator", "rater")
}
cfg.MediatorRaterReconnects = 3
if hasOpt = c.HasOption("mediator", "rater_reconnects"); hasOpt {
cfg.MediatorRaterReconnects, _ = c.GetInt("mediator", "rater_reconnects")
}
cfg.MediatorRPCEncoding = GOB
if hasOpt = c.HasOption("mediator", "rpc_encoding"); hasOpt {
cfg.MediatorRPCEncoding, _ = c.GetString("mediator", "rpc_encoding")
@@ -221,6 +227,10 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) {
if hasOpt = c.HasOption("session_manager", "rater"); hasOpt {
cfg.SMRater, _ = c.GetString("session_manager", "rater")
}
cfg.SMRaterReconnects = 3
if hasOpt = c.HasOption("session_manager", "rater_reconnects"); hasOpt {
cfg.SMRaterReconnects, _ = c.GetInt("session_manager", "rater_reconnects")
}
cfg.SMDebitInterval = 10
if hasOpt = c.HasOption("session_manager", "debit_interval"); hasOpt {
cfg.SMDebitInterval, _ = c.GetInt("session_manager", "debit_interval")

View File

@@ -35,6 +35,7 @@
[mediator]
# enabled = false # Starts Mediator service: <true|false>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# rpc_encoding = gob # RPC encoding used when talking to Rater: <gob|json>.
# skipdb = false # Skips database checks for previous recorded prices: <true|false>.
# pseudoprepaid = false # Execute debits together with pricing: <true|false>.
@@ -46,6 +47,7 @@
# enabled = false # Starts SessionManager service: <true|false>.
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# debit_interval = 5 # Interval to perform debits on.
# rpc_encoding = gob # RPC encoding used when talking to Rater: <gob|json>.
# default_reqtype = # Default request type to consider when missing from requests: <""|prepaid|postpaid>.