Internal connection timeouts within rater start

This commit is contained in:
DanB
2015-09-05 14:27:00 +02:00
parent 0edbf9bde2
commit 93a3f89428
6 changed files with 98 additions and 42 deletions

View File

@@ -20,6 +20,7 @@ package main
import (
"fmt"
"time"
"github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/apier/v2"
@@ -71,8 +72,14 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
waitTasks = append(waitTasks, schedTaskChan)
go func() {
defer close(schedTaskChan)
sched = <-internalSchedulerChan
internalSchedulerChan <- sched
select {
case sched = <-internalSchedulerChan:
internalSchedulerChan <- sched
case <-time.After(cfg.InternalTtl):
engine.Logger.Crit("<Rater>: Internal scheduler connection timeout.")
exitChan <- true
return
}
}()
}
@@ -85,8 +92,14 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
go func() {
defer close(balTaskChan)
if cfg.RaterBalancer == utils.INTERNAL {
bal = <-internalBalancerChan
internalBalancerChan <- bal // Put it back if someone else is interested about
select {
case bal = <-internalBalancerChan:
internalBalancerChan <- bal // Put it back if someone else is interested about
case <-time.After(cfg.InternalTtl):
engine.Logger.Crit("<Rater>: Internal balancer connection timeout.")
exitChan <- true
return
}
} else {
go registerToBalancer(exitChan)
go stopRaterSignalHandler(internalCdrStatSChan, exitChan)
@@ -103,10 +116,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
go func() {
defer close(cdrstatTaskChan)
if cfg.RaterCdrStats == utils.INTERNAL {
cdrStats = <-internalCdrStatSChan
internalCdrStatSChan <- cdrStats
select {
case cdrStats = <-internalCdrStatSChan:
internalCdrStatSChan <- cdrStats
case <-time.After(cfg.InternalTtl):
engine.Logger.Crit("<Rater>: Internal cdrstats connection timeout.")
exitChan <- true
return
}
} else if cdrStats, err = engine.NewProxyStats(cfg.RaterCdrStats, cfg.ConnectAttempts, -1); err != nil {
engine.Logger.Crit(fmt.Sprintf("<CdrStats> Could not connect to the server, error: %s", err.Error()))
engine.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to cdrstats, error: %s", err.Error()))
exitChan <- true
return
}
@@ -121,10 +140,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
defer close(histTaskChan)
var scribeServer history.Scribe
if cfg.RaterHistoryServer == utils.INTERNAL {
scribeServer = <-internalHistorySChan
internalHistorySChan <- scribeServer
select {
case scribeServer = <-internalHistorySChan:
internalHistorySChan <- scribeServer
case <-time.After(cfg.InternalTtl):
engine.Logger.Crit("<Rater>: Internal historys connection timeout.")
exitChan <- true
return
}
} else if scribeServer, err = history.NewProxyScribe(cfg.RaterHistoryServer, cfg.ConnectAttempts, -1); err != nil {
engine.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not connect to the server, error: %s", err.Error()))
engine.Logger.Crit(fmt.Sprintf("<Rater> Could not connect historys, error: %s", err.Error()))
exitChan <- true
return
}
@@ -140,10 +165,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
defer close(pubsubTaskChan)
var pubSubServer engine.PublisherSubscriber
if cfg.RaterPubSubServer == utils.INTERNAL {
pubSubServer = <-internalPubSubSChan
internalPubSubSChan <- pubSubServer
select {
case pubSubServer = <-internalPubSubSChan:
internalPubSubSChan <- pubSubServer
case <-time.After(cfg.InternalTtl):
engine.Logger.Crit("<Rater>: Internal pubsub connection timeout.")
exitChan <- true
return
}
} else if pubSubServer, err = engine.NewProxyPubSub(cfg.RaterPubSubServer, cfg.ConnectAttempts, -1); err != nil {
engine.Logger.Crit(fmt.Sprintf("<PubSubServer> Could not connect to the server, error: %s", err.Error()))
engine.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to pubsubs: %s", err.Error()))
exitChan <- true
return
}
@@ -159,10 +190,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
defer close(aliasesTaskChan)
var aliasesServer engine.AliasService
if cfg.RaterAliasesServer == utils.INTERNAL {
aliasesServer = <-internalAliaseSChan
internalAliaseSChan <- aliasesServer
select {
case aliasesServer = <-internalAliaseSChan:
internalAliaseSChan <- aliasesServer
case <-time.After(cfg.InternalTtl):
engine.Logger.Crit("<Rater>: Internal aliases connection timeout.")
exitChan <- true
return
}
} else if aliasesServer, err = engine.NewProxyAliasService(cfg.RaterAliasesServer, cfg.ConnectAttempts, -1); err != nil {
engine.Logger.Crit(fmt.Sprintf("<AliasesServer> Could not connect to the server, error: %s", err.Error()))
engine.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to aliases, error: %s", err.Error()))
exitChan <- true
return
}
@@ -178,10 +215,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
go func() {
defer close(usersTaskChan)
if cfg.RaterUserServer == utils.INTERNAL {
userServer = <-internalUserSChan
internalUserSChan <- userServer
select {
case userServer = <-internalUserSChan:
internalUserSChan <- userServer
case <-time.After(cfg.InternalTtl):
engine.Logger.Crit("<Rater>: Internal users connection timeout.")
exitChan <- true
return
}
} else if userServer, err = engine.NewProxyUserService(cfg.RaterUserServer, cfg.ConnectAttempts, -1); err != nil {
engine.Logger.Crit(fmt.Sprintf("<UserServer> Could not connect to the server, error: %s", err.Error()))
engine.Logger.Crit(fmt.Sprintf("<Rater> Could not connect users, error: %s", err.Error()))
exitChan <- true
return
}

View File

@@ -189,6 +189,7 @@ type CGRConfig struct {
DefaultTimezone string // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb>
ConnectAttempts int // number of initial connection attempts before giving up
InternalTtl time.Duration // maximum duration to wait for internal connections before giving up
RoundingDecimals int // Number of decimals to round end prices at
HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate
TpExportPath string // Path towards export folder for offline Tariff Plans
@@ -563,6 +564,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
if jsnGeneralCfg.Default_timezone != nil {
self.DefaultTimezone = *jsnGeneralCfg.Default_timezone
}
if jsnGeneralCfg.Internal_ttl != nil {
if self.InternalTtl, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Internal_ttl); err != nil {
return err
}
}
}
if jsnListenCfg != nil {

View File

@@ -37,8 +37,9 @@ const CGRATES_CFG_JSON = `
"default_tenant": "cgrates.org", // default Tenant to consider when missing from requests
"default_subject": "cgrates", // default rating Subject to consider when missing from requests
"default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"connect_attempts": 3, // initial server connect attempts
"reconnects": -1, // number of retries in case of connection lost
"connect_attempts": 3, // initial server connect attempts
"reconnects": -1, // number of retries in case of connection lost
"internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
},
@@ -83,7 +84,7 @@ const CGRATES_CFG_JSON = `
"balancer": {
"enabled": false, // start Balancer service: <true|false>
"enabled": false, // start Balancer service: <true|false>
},
@@ -94,7 +95,7 @@ const CGRATES_CFG_JSON = `
"historys": "", // address where to reach the history service, empty to disable history functionality: <""|internal|x.y.z.y:1234>
"pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
"aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
"aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
},
@@ -108,11 +109,10 @@ const CGRATES_CFG_JSON = `
"extra_fields": [], // extra fields to store in CDRs for non-generic CDRs
"store_cdrs": true, // store cdrs in storDb
"rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234>
"pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
"aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
"aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
"cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
"reconnects": 5, // number of reconnect attempts to rater or cdrs
"cdr_replication":[], // replicate the raw CDR to a number of servers
},
@@ -129,7 +129,7 @@ const CGRATES_CFG_JSON = `
"field_separator": ",",
"data_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from KBytes to Bytes)
"sms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from SMS unit to call duration in some billing systems)
"generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems)
"generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems)
"cost_multiply_factor": 1, // multiply cost before export, eg: add VAT
"cost_rounding_decimals": -1, // rounding decimals for Cost values. -1 to disable rounding
"cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents)
@@ -200,7 +200,6 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts SessionManager service: <true|false>
"rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
"cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
"reconnects": 5, // number of reconnect attempts to rater or cdrs
"create_cdr": false, // create CDR out of events and sends them to CDRS component
"extra_fields": [], // extra fields to store in auth/CDRs when creating them
"debit_interval": "10s", // interval to perform debits on.
@@ -222,7 +221,6 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts SessionManager service: <true|false>
"rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013>
"cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
"reconnects": 5, // number of reconnect attempts to rater or cdrs
"create_cdr": false, // create CDR out of events and sends them to CDRS component
"debit_interval": "10s", // interval to perform debits on.
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this
@@ -259,10 +257,12 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts PubSub service: <true|false>.
},
"aliases": {
"enabled": false, // starts Aliases service: <true|false>.
},
"users": {
"enabled": false, // starts User service: <true|false>.
"indexes": [], // user profile field indexes

View File

@@ -48,7 +48,8 @@ func TestDfGeneralJsonCfg(t *testing.T) {
Default_subject: utils.StringPointer("cgrates"),
Default_timezone: utils.StringPointer("Local"),
Connect_attempts: utils.IntPointer(3),
Reconnects: utils.IntPointer(-1)}
Reconnects: utils.IntPointer(-1),
Internal_ttl: utils.StringPointer("2m")}
if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, gCfg) {

View File

@@ -31,6 +31,7 @@ type GeneralJsonCfg struct {
Default_timezone *string
Reconnects *int
Connect_attempts *int
Internal_ttl *string
}
// Listen config section

View File

@@ -16,8 +16,10 @@
// "default_category": "call", // default Type of Record to consider when missing from requests
// "default_tenant": "cgrates.org", // default Tenant to consider when missing from requests
// "default_subject": "cgrates", // default rating Subject to consider when missing from requests
// "connect_attempts": 3, // initial server connect attempts
// "reconnects": -1, // number of retries in case of connection lost
// "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
// "connect_attempts": 3, // initial server connect attempts
// "reconnects": -1, // number of retries in case of connection lost
// "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
//},
@@ -45,6 +47,7 @@
// "db_name": "11", // data_db database name to connect to
// "db_user": "", // username to use when connecting to data_db
// "db_passwd": "", // password to use when connecting to data_db
// "load_history_size": 10, // Number of records in the load history
//},
@@ -55,8 +58,8 @@
// "db_name": "cgrates", // stor database name
// "db_user": "cgrates", // username to use when connecting to stordb
// "db_passwd": "CGRateS.org", // password to use when connecting to stordb
// "max_open_conns": 0, // maximum database connections opened
// "max_idle_conns": -1, // maximum database connections idle
// "max_open_conns": 100, // maximum database connections opened
// "max_idle_conns": 10, // maximum database connections idle
//},
@@ -72,7 +75,7 @@
// "historys": "", // address where to reach the history service, empty to disable history functionality: <""|internal|x.y.z.y:1234>
// "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
// "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
//},
@@ -88,9 +91,8 @@
// "rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234>
// "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
// "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234>
// "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
// "reconnects": 5, // number of reconnect attempts to rater or cdrs
// "cdr_replication":[], // replicate the raw CDR to a number of servers
//},
@@ -107,7 +109,7 @@
// "field_separator": ",",
// "data_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from KBytes to Bytes)
// "sms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from SMS unit to call duration in some billing systems)
// "generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems)
// "generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems)
// "cost_multiply_factor": 1, // multiply cost before export, eg: add VAT
// "cost_rounding_decimals": -1, // rounding decimals for Cost values. -1 to disable rounding
// "cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents)
@@ -144,6 +146,7 @@
// "cdrs": "internal", // address where to reach CDR server. <internal|x.y.z.y:1234>
// "cdr_format": "csv", // CDR file format <csv|freeswitch_csv|fwv|opensips_flatstore>
// "field_separator": ",", // separator used in case of csv files
// "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
// "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
// "max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited
// "data_usage_multiply_factor": 1024, // conversion factor for data usage
@@ -179,7 +182,7 @@
// "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
// "reconnects": 5, // number of reconnect attempts to rater or cdrs
// "create_cdr": false, // create CDR out of events and sends them to CDRS component
// "cdr_extra_fields": [], // extra fields to store in CDRs when creating them
// "extra_fields": [], // extra fields to store in auth/CDRs when creating them
// "debit_interval": "10s", // interval to perform debits on.
// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this
// "max_call_duration": "3h", // maximum call duration a prepaid call can last
@@ -237,14 +240,16 @@
//},
//"aliases": {
// "enabled": false, // starts Aliases service: <true|false>.
//},
//"users": {
// "enabled": false, // starts User service: <true|false>.
// "indexes": [], // user profile field indexes
//},
//"aliases": {
// "enabled": false, // starts Aliases service: <true|false>.
//},
//"mailer": {
// "server": "localhost", // the server to use when sending emails out