diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index fbbfd6730..301d387dd 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -20,6 +20,7 @@ package main import ( "fmt" + "time" "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/apier/v2" @@ -71,8 +72,14 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c waitTasks = append(waitTasks, schedTaskChan) go func() { defer close(schedTaskChan) - sched = <-internalSchedulerChan - internalSchedulerChan <- sched + select { + case sched = <-internalSchedulerChan: + internalSchedulerChan <- sched + case <-time.After(cfg.InternalTtl): + engine.Logger.Crit(": Internal scheduler connection timeout.") + exitChan <- true + return + } }() } @@ -85,8 +92,14 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c go func() { defer close(balTaskChan) if cfg.RaterBalancer == utils.INTERNAL { - bal = <-internalBalancerChan - internalBalancerChan <- bal // Put it back if someone else is interested about + select { + case bal = <-internalBalancerChan: + internalBalancerChan <- bal // Put it back if someone else is interested about + case <-time.After(cfg.InternalTtl): + engine.Logger.Crit(": Internal balancer connection timeout.") + exitChan <- true + return + } } else { go registerToBalancer(exitChan) go stopRaterSignalHandler(internalCdrStatSChan, exitChan) @@ -103,10 +116,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c go func() { defer close(cdrstatTaskChan) if cfg.RaterCdrStats == utils.INTERNAL { - cdrStats = <-internalCdrStatSChan - internalCdrStatSChan <- cdrStats + select { + case cdrStats = <-internalCdrStatSChan: + internalCdrStatSChan <- cdrStats + case <-time.After(cfg.InternalTtl): + engine.Logger.Crit(": Internal cdrstats connection timeout.") + exitChan <- true + return + } } else if cdrStats, err = engine.NewProxyStats(cfg.RaterCdrStats, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) + engine.Logger.Crit(fmt.Sprintf(" Could not connect to cdrstats, error: %s", err.Error())) exitChan <- true return } @@ -121,10 +140,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c defer close(histTaskChan) var scribeServer history.Scribe if cfg.RaterHistoryServer == utils.INTERNAL { - scribeServer = <-internalHistorySChan - internalHistorySChan <- scribeServer + select { + case scribeServer = <-internalHistorySChan: + internalHistorySChan <- scribeServer + case <-time.After(cfg.InternalTtl): + engine.Logger.Crit(": Internal historys connection timeout.") + exitChan <- true + return + } } else if scribeServer, err = history.NewProxyScribe(cfg.RaterHistoryServer, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) + engine.Logger.Crit(fmt.Sprintf(" Could not connect historys, error: %s", err.Error())) exitChan <- true return } @@ -140,10 +165,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c defer close(pubsubTaskChan) var pubSubServer engine.PublisherSubscriber if cfg.RaterPubSubServer == utils.INTERNAL { - pubSubServer = <-internalPubSubSChan - internalPubSubSChan <- pubSubServer + select { + case pubSubServer = <-internalPubSubSChan: + internalPubSubSChan <- pubSubServer + case <-time.After(cfg.InternalTtl): + engine.Logger.Crit(": Internal pubsub connection timeout.") + exitChan <- true + return + } } else if pubSubServer, err = engine.NewProxyPubSub(cfg.RaterPubSubServer, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) + engine.Logger.Crit(fmt.Sprintf(" Could not connect to pubsubs: %s", err.Error())) exitChan <- true return } @@ -159,10 +190,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c defer close(aliasesTaskChan) var aliasesServer engine.AliasService if cfg.RaterAliasesServer == utils.INTERNAL { - aliasesServer = <-internalAliaseSChan - internalAliaseSChan <- aliasesServer + select { + case aliasesServer = <-internalAliaseSChan: + internalAliaseSChan <- aliasesServer + case <-time.After(cfg.InternalTtl): + engine.Logger.Crit(": Internal aliases connection timeout.") + exitChan <- true + return + } } else if aliasesServer, err = engine.NewProxyAliasService(cfg.RaterAliasesServer, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) + engine.Logger.Crit(fmt.Sprintf(" Could not connect to aliases, error: %s", err.Error())) exitChan <- true return } @@ -178,10 +215,16 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c go func() { defer close(usersTaskChan) if cfg.RaterUserServer == utils.INTERNAL { - userServer = <-internalUserSChan - internalUserSChan <- userServer + select { + case userServer = <-internalUserSChan: + internalUserSChan <- userServer + case <-time.After(cfg.InternalTtl): + engine.Logger.Crit(": Internal users connection timeout.") + exitChan <- true + return + } } else if userServer, err = engine.NewProxyUserService(cfg.RaterUserServer, cfg.ConnectAttempts, -1); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to the server, error: %s", err.Error())) + engine.Logger.Crit(fmt.Sprintf(" Could not connect users, error: %s", err.Error())) exitChan <- true return } diff --git a/config/config.go b/config/config.go index 41f64d958..563650c6e 100644 --- a/config/config.go +++ b/config/config.go @@ -189,6 +189,7 @@ type CGRConfig struct { DefaultTimezone string // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb> ConnectAttempts int // number of initial connection attempts before giving up + InternalTtl time.Duration // maximum duration to wait for internal connections before giving up RoundingDecimals int // Number of decimals to round end prices at HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate TpExportPath string // Path towards export folder for offline Tariff Plans @@ -563,6 +564,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnGeneralCfg.Default_timezone != nil { self.DefaultTimezone = *jsnGeneralCfg.Default_timezone } + if jsnGeneralCfg.Internal_ttl != nil { + if self.InternalTtl, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Internal_ttl); err != nil { + return err + } + } } if jsnListenCfg != nil { diff --git a/config/config_defaults.go b/config/config_defaults.go index 39382f811..41edf9703 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -37,8 +37,9 @@ const CGRATES_CFG_JSON = ` "default_tenant": "cgrates.org", // default Tenant to consider when missing from requests "default_subject": "cgrates", // default rating Subject to consider when missing from requests "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> - "connect_attempts": 3, // initial server connect attempts - "reconnects": -1, // number of retries in case of connection lost + "connect_attempts": 3, // initial server connect attempts + "reconnects": -1, // number of retries in case of connection lost + "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up }, @@ -83,7 +84,7 @@ const CGRATES_CFG_JSON = ` "balancer": { - "enabled": false, // start Balancer service: + "enabled": false, // start Balancer service: }, @@ -94,7 +95,7 @@ const CGRATES_CFG_JSON = ` "historys": "", // address where to reach the history service, empty to disable history functionality: <""|internal|x.y.z.y:1234> "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> - "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> + "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> }, @@ -108,11 +109,10 @@ const CGRATES_CFG_JSON = ` "extra_fields": [], // extra fields to store in CDRs for non-generic CDRs "store_cdrs": true, // store cdrs in storDb "rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234> - "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> + "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> - "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> + "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> - "reconnects": 5, // number of reconnect attempts to rater or cdrs "cdr_replication":[], // replicate the raw CDR to a number of servers }, @@ -129,7 +129,7 @@ const CGRATES_CFG_JSON = ` "field_separator": ",", "data_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from KBytes to Bytes) "sms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from SMS unit to call duration in some billing systems) - "generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems) + "generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems) "cost_multiply_factor": 1, // multiply cost before export, eg: add VAT "cost_rounding_decimals": -1, // rounding decimals for Cost values. -1 to disable rounding "cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents) @@ -200,7 +200,6 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts SessionManager service: "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> - "reconnects": 5, // number of reconnect attempts to rater or cdrs "create_cdr": false, // create CDR out of events and sends them to CDRS component "extra_fields": [], // extra fields to store in auth/CDRs when creating them "debit_interval": "10s", // interval to perform debits on. @@ -222,7 +221,6 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts SessionManager service: "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> - "reconnects": 5, // number of reconnect attempts to rater or cdrs "create_cdr": false, // create CDR out of events and sends them to CDRS component "debit_interval": "10s", // interval to perform debits on. "min_call_duration": "0s", // only authorize calls with allowed duration higher than this @@ -259,10 +257,12 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts PubSub service: . }, + "aliases": { "enabled": false, // starts Aliases service: . }, + "users": { "enabled": false, // starts User service: . "indexes": [], // user profile field indexes diff --git a/config/config_json_test.go b/config/config_json_test.go index 864663323..27fae03bb 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -48,7 +48,8 @@ func TestDfGeneralJsonCfg(t *testing.T) { Default_subject: utils.StringPointer("cgrates"), Default_timezone: utils.StringPointer("Local"), Connect_attempts: utils.IntPointer(3), - Reconnects: utils.IntPointer(-1)} + Reconnects: utils.IntPointer(-1), + Internal_ttl: utils.StringPointer("2m")} if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, gCfg) { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 8e3fe2d6a..34529fb03 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -31,6 +31,7 @@ type GeneralJsonCfg struct { Default_timezone *string Reconnects *int Connect_attempts *int + Internal_ttl *string } // Listen config section diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index afe5206e1..c44004943 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -16,8 +16,10 @@ // "default_category": "call", // default Type of Record to consider when missing from requests // "default_tenant": "cgrates.org", // default Tenant to consider when missing from requests // "default_subject": "cgrates", // default rating Subject to consider when missing from requests -// "connect_attempts": 3, // initial server connect attempts -// "reconnects": -1, // number of retries in case of connection lost +// "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> +// "connect_attempts": 3, // initial server connect attempts +// "reconnects": -1, // number of retries in case of connection lost +// "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up //}, @@ -45,6 +47,7 @@ // "db_name": "11", // data_db database name to connect to // "db_user": "", // username to use when connecting to data_db // "db_passwd": "", // password to use when connecting to data_db +// "load_history_size": 10, // Number of records in the load history //}, @@ -55,8 +58,8 @@ // "db_name": "cgrates", // stor database name // "db_user": "cgrates", // username to use when connecting to stordb // "db_passwd": "CGRateS.org", // password to use when connecting to stordb -// "max_open_conns": 0, // maximum database connections opened -// "max_idle_conns": -1, // maximum database connections idle +// "max_open_conns": 100, // maximum database connections opened +// "max_idle_conns": 10, // maximum database connections idle //}, @@ -72,7 +75,7 @@ // "historys": "", // address where to reach the history service, empty to disable history functionality: <""|internal|x.y.z.y:1234> // "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> // "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> -// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> +// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> //}, @@ -88,9 +91,8 @@ // "rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234> // "pubsubs": "", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234> // "users": "", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234> -// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> +// "aliases": "", // address where to reach the aliases service, empty to disable aliases functionality: <""|internal|x.y.z.y:1234> // "cdrstats": "", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234> -// "reconnects": 5, // number of reconnect attempts to rater or cdrs // "cdr_replication":[], // replicate the raw CDR to a number of servers //}, @@ -107,7 +109,7 @@ // "field_separator": ",", // "data_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from KBytes to Bytes) // "sms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from SMS unit to call duration in some billing systems) -// "generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems) +// "generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems) // "cost_multiply_factor": 1, // multiply cost before export, eg: add VAT // "cost_rounding_decimals": -1, // rounding decimals for Cost values. -1 to disable rounding // "cost_shift_digits": 0, // shift digits in the cost on export (eg: convert from EUR to cents) @@ -144,6 +146,7 @@ // "cdrs": "internal", // address where to reach CDR server. // "cdr_format": "csv", // CDR file format // "field_separator": ",", // separator used in case of csv files +// "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> // "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify // "max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited // "data_usage_multiply_factor": 1024, // conversion factor for data usage @@ -179,7 +182,7 @@ // "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> // "reconnects": 5, // number of reconnect attempts to rater or cdrs // "create_cdr": false, // create CDR out of events and sends them to CDRS component -// "cdr_extra_fields": [], // extra fields to store in CDRs when creating them +// "extra_fields": [], // extra fields to store in auth/CDRs when creating them // "debit_interval": "10s", // interval to perform debits on. // "min_call_duration": "0s", // only authorize calls with allowed duration higher than this // "max_call_duration": "3h", // maximum call duration a prepaid call can last @@ -237,14 +240,16 @@ //}, +//"aliases": { +// "enabled": false, // starts Aliases service: . +//}, + + //"users": { // "enabled": false, // starts User service: . // "indexes": [], // user profile field indexes //}, -//"aliases": { -// "enabled": false, // starts Aliases service: . -//}, //"mailer": { // "server": "localhost", // the server to use when sending emails out