From d7f2330c6bceb4f5e287e1b0fb46570f790e65e4 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 30 May 2016 21:13:42 +0200 Subject: [PATCH] New config options: httpposter_attempts, connect_timeout, reply_timeout, locking_timeout, fix connection drop detection in case of HA --- cmd/cgr-console/cgr-console.go | 3 +- cmd/cgr-engine/cgr-engine.go | 32 +++---- cmd/cgr-engine/rater.go | 10 +- cmd/cgr-loader/cgr-loader.go | 3 +- cmd/cgr-tester/cdr_repl/process_cdr.go | 2 +- config/config.go | 52 ++++++++--- config/config_defaults.go | 4 + config/config_json_test.go | 6 +- config/libconfig_json.go | 4 + data/conf/cgrates/cgrates.json | 5 + engine/action.go | 6 +- engine/libengine.go | 9 +- engine/pubsub.go | 4 +- general_tests/cdrs_replication_it_test.go | 4 +- general_tests/rpcclient_it_test.go | 106 +++++++++++++++++++++- glide.lock | 2 +- sessionmanager/smgeneric.go | 2 +- 17 files changed, 196 insertions(+), 58 deletions(-) diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index d4723a4d7..2f9286d76 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -26,6 +26,7 @@ import ( "log" "os" "strings" + "time" "github.com/cgrates/cgrates/console" "github.com/cgrates/cgrates/utils" @@ -109,7 +110,7 @@ func main() { return } var err error - client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, *rpc_encoding, nil) + client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), *rpc_encoding, nil) if err != nil { flag.PrintDefaults() log.Fatal("Could not connect to server " + *server) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 88f3a2401..6c5463192 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -109,7 +109,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break } - cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cdrcCfg.CdrsConns, internalCdrSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %s", err.Error())) @@ -132,7 +132,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal utils.Logger.Info("Starting CGRateS SMGeneric service.") var ralsConns, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmGenericConfig.RALsConns) != 0 { - ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.SmGenericConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RALs: %s", err.Error())) @@ -141,7 +141,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal } } if len(cfg.SmGenericConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.SmGenericConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RALs: %s", err.Error())) @@ -172,7 +172,7 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC utils.Logger.Info("Starting CGRateS DiameterAgent service.") var smgConn, pubsubConn *rpcclient.RpcClientPool if len(cfg.DiameterAgentCfg().SMGenericConns) != 0 { - smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMG: %s", err.Error())) @@ -181,7 +181,7 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC } } if len(cfg.DiameterAgentCfg().PubSubConns) != 0 { - pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) @@ -205,7 +205,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli utils.Logger.Info("Starting CGRateS SMFreeSWITCH service.") var ralsConn, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmFsConfig.RALsConns) != 0 { - ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -214,7 +214,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli } } if len(cfg.SmFsConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.SmFsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -234,7 +234,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien utils.Logger.Info("Starting CGRateS SMKamailio service.") var ralsConn, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmKamConfig.RALsConns) != 0 { - ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.SmKamConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -243,7 +243,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien } } if len(cfg.SmKamConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.SmKamConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -263,7 +263,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien utils.Logger.Info("Starting CGRateS SMOpenSIPS service.") var ralsConn, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmOsipsConfig.RALsConns) != 0 { - ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.SmOsipsConfig.RALsConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RALs: %s", err.Error())) @@ -272,7 +272,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien } } if len(cfg.SmOsipsConfig.CDRsConns) != 0 { - cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.SmOsipsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRs: %s", err.Error())) @@ -295,7 +295,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine utils.Logger.Info("Starting CGRateS CDRS service.") var ralConn, pubSubConn, usersConn, aliasesConn, statsConn *rpcclient.RpcClientPool if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL - ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) @@ -304,7 +304,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine } } if len(cfg.CDRSPubSubSConns) != 0 { // Pubsub connection init - pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubSystem: %s", err.Error())) @@ -313,7 +313,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine } } if len(cfg.CDRSUserSConns) != 0 { // Users connection init - usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to UserS: %s", err.Error())) @@ -322,7 +322,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine } } if len(cfg.CDRSAliaseSConns) != 0 { // Aliases connection init - aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS: %s", err.Error())) @@ -332,7 +332,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine } if len(cfg.CDRSStatSConns) != 0 { // Stats connection init - statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 8fe1ee27d..e15a413a0 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -113,7 +113,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, cdrstatTaskChan) go func() { defer close(cdrstatTaskChan) - cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.RALsCDRStatSConns, internalCdrStatSChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRStatS, error: %s", err.Error())) @@ -127,7 +127,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, histTaskChan) go func() { defer close(histTaskChan) - if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.RALsHistorySConns, internalHistorySChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect HistoryS, error: %s", err.Error())) exitChan <- true @@ -142,7 +142,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, pubsubTaskChan) go func() { defer close(pubsubTaskChan) - if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.RALsPubSubSConns, internalPubSubSChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) exitChan <- true @@ -157,7 +157,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, aliasesTaskChan) go func() { defer close(aliasesTaskChan) - if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS, error: %s", err.Error())) exitChan <- true @@ -173,7 +173,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, usersTaskChan) go func() { defer close(usersTaskChan) - if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, + if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB, cfg.RALsUserSConns, internalUserSChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect UserS, error: %s", err.Error())) exitChan <- true diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 409918918..269a6c026 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -26,6 +26,7 @@ import ( "path" "strconv" "strings" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -287,7 +288,7 @@ func main() { return } if *historyServer != "" { // Init scribeAgent so we can store the differences - if scribeAgent, err := rpcclient.NewRpcClient("tcp", *historyServer, 3, 3, utils.GOB, nil); err != nil { + if scribeAgent, err := rpcclient.NewRpcClient("tcp", *historyServer, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.GOB, nil); err != nil { log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error()) return } else { diff --git a/cmd/cgr-tester/cdr_repl/process_cdr.go b/cmd/cgr-tester/cdr_repl/process_cdr.go index 77166a747..8c0594412 100644 --- a/cmd/cgr-tester/cdr_repl/process_cdr.go +++ b/cmd/cgr-tester/cdr_repl/process_cdr.go @@ -25,7 +25,7 @@ func main() { if cdrsMasterCfg, err = config.NewCGRConfigFromFolder(cdrsMasterCfgPath); err != nil { log.Fatal("Got config error: ", err.Error()) } - cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, "json", nil) + cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil) if err != nil { log.Fatal("Could not connect to rater: ", err.Error()) } diff --git a/config/config.go b/config/config.go index 736923a6c..41b66ae32 100644 --- a/config/config.go +++ b/config/config.go @@ -191,23 +191,27 @@ type CGRConfig struct { StorDBMaxOpenConns int // Maximum database connections opened StorDBMaxIdleConns int // Maximum idle connections to keep opened StorDBCDRSIndexes []string - DBDataEncoding string // The encoding used to store object data in strings: - RPCJSONListen string // RPC JSON listening address - RPCGOBListen string // RPC GOB listening address - HTTPListen string // HTTP listening address - DefaultReqType string // Use this request type if not defined on top - DefaultCategory string // set default type of record - DefaultTenant string // set default tenant - DefaultTimezone string // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> - Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb> - ConnectAttempts int // number of initial connection attempts before giving up - ResponseCacheTTL time.Duration // the life span of a cached response - InternalTtl time.Duration // maximum duration to wait for internal connections before giving up - RoundingDecimals int // Number of decimals to round end prices at - HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate - TpExportPath string // Path towards export folder for offline Tariff Plans + DBDataEncoding string // The encoding used to store object data in strings: + RPCJSONListen string // RPC JSON listening address + RPCGOBListen string // RPC GOB listening address + HTTPListen string // HTTP listening address + DefaultReqType string // Use this request type if not defined on top + DefaultCategory string // set default type of record + DefaultTenant string // set default tenant + DefaultTimezone string // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> + Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb> + ConnectTimeout time.Duration // timeout for RPC connection attempts + ReplyTimeout time.Duration // timeout replies if not reaching back + ConnectAttempts int // number of initial connection attempts before giving up + ResponseCacheTTL time.Duration // the life span of a cached response + InternalTtl time.Duration // maximum duration to wait for internal connections before giving up + RoundingDecimals int // Number of decimals to round end prices at + HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate + TpExportPath string // Path towards export folder for offline Tariff Plans + HttpPosterAttempts int HttpFailedDir string // Directory path where we store failed http requests MaxCallDuration time.Duration // The maximum call duration (used by responder when querying DerivedCharging) // ToDo: export it in configuration file + LockingTimeout time.Duration // locking mechanism timeout to avoid deadlocks RALsEnabled bool // start standalone server (no balancer) RALsBalancer string // balancer address host:port RALsCDRStatSConns []*HaPoolConfig // address where to reach the cdrstats service. Empty to disable stats gathering <""|internal|x.y.z.y:1234> @@ -657,6 +661,16 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnGeneralCfg.Reconnects != nil { self.Reconnects = *jsnGeneralCfg.Reconnects } + if jsnGeneralCfg.Connect_timeout != nil { + if self.ConnectTimeout, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Connect_timeout); err != nil { + return err + } + } + if jsnGeneralCfg.Reply_timeout != nil { + if self.ReplyTimeout, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Reply_timeout); err != nil { + return err + } + } if jsnGeneralCfg.Rounding_decimals != nil { self.RoundingDecimals = *jsnGeneralCfg.Rounding_decimals } @@ -666,6 +680,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnGeneralCfg.Tpexport_dir != nil { self.TpExportPath = *jsnGeneralCfg.Tpexport_dir } + if jsnGeneralCfg.Httpposter_attempts != nil { + self.HttpPosterAttempts = *jsnGeneralCfg.Httpposter_attempts + } if jsnGeneralCfg.Http_failed_dir != nil { self.HttpFailedDir = *jsnGeneralCfg.Http_failed_dir } @@ -677,6 +694,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { return err } } + if jsnGeneralCfg.Locking_timeout != nil { + if self.LockingTimeout, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Locking_timeout); err != nil { + return err + } + } } if jsnListenCfg != nil { diff --git a/config/config_defaults.go b/config/config_defaults.go index e9809266b..5b1ea8175 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -32,6 +32,7 @@ const CGRATES_CFG_JSON = ` "rounding_decimals": 5, // system level precision for floats "dbdata_encoding": "msgpack", // encoding used to store object data in strings: "tpexport_dir": "/var/log/cgrates/tpe", // path towards export folder for offline Tariff Plans + "httpposter_attempts": 3, // number of http attempts before considering request failed (eg: *call_url) "http_failed_dir": "/var/log/cgrates/http_failed", // directory path where we store failed http requests "default_request_type": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated> "default_category": "call", // default category to consider when missing from requests @@ -39,8 +40,11 @@ const CGRATES_CFG_JSON = ` "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> "connect_attempts": 3, // initial server connect attempts "reconnects": -1, // number of retries in case of connection lost + "connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature + "reply_timeout": "2s", // consider connection down for replies taking longer than this value "response_cache_ttl": "0s", // the life span of a cached response "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up + "locking_timeout": "5s", // timeout internal locks to avoid deadlocks }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 0dcde2f8e..2d84fe02f 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -43,6 +43,7 @@ func TestDfGeneralJsonCfg(t *testing.T) { Rounding_decimals: utils.IntPointer(5), Dbdata_encoding: utils.StringPointer("msgpack"), Tpexport_dir: utils.StringPointer("/var/log/cgrates/tpe"), + Httpposter_attempts: utils.IntPointer(3), Http_failed_dir: utils.StringPointer("/var/log/cgrates/http_failed"), Default_request_type: utils.StringPointer(utils.META_RATED), Default_category: utils.StringPointer("call"), @@ -50,8 +51,11 @@ func TestDfGeneralJsonCfg(t *testing.T) { Default_timezone: utils.StringPointer("Local"), Connect_attempts: utils.IntPointer(3), Reconnects: utils.IntPointer(-1), + Connect_timeout: utils.StringPointer("1s"), + Reply_timeout: utils.StringPointer("2s"), Response_cache_ttl: utils.StringPointer("0s"), - Internal_ttl: utils.StringPointer("2m")} + Internal_ttl: utils.StringPointer("2m"), + Locking_timeout: utils.StringPointer("5s")} if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, gCfg) { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index a6bae87eb..e80e50cee 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -24,6 +24,7 @@ type GeneralJsonCfg struct { Rounding_decimals *int Dbdata_encoding *string Tpexport_dir *string + Httpposter_attempts *int Http_failed_dir *string Default_request_type *string Default_category *string @@ -31,8 +32,11 @@ type GeneralJsonCfg struct { Default_timezone *string Connect_attempts *int Reconnects *int + Connect_timeout *string + Reply_timeout *string Response_cache_ttl *string Internal_ttl *string + Locking_timeout *string } // Listen config section diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index b08f4d751..53e3e8910 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -11,6 +11,7 @@ // "rounding_decimals": 5, // system level precision for floats // "dbdata_encoding": "msgpack", // encoding used to store object data in strings: // "tpexport_dir": "/var/log/cgrates/tpe", // path towards export folder for offline Tariff Plans +// "httpposter_attempts": 3, // number of http attempts before considering request failed (eg: *call_url) // "http_failed_dir": "/var/log/cgrates/http_failed", // directory path where we store failed http requests // "default_request_type": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated> // "default_category": "call", // default category to consider when missing from requests @@ -18,8 +19,11 @@ // "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> // "connect_attempts": 3, // initial server connect attempts // "reconnects": -1, // number of retries in case of connection lost +// "connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature +// "reply_timeout": "2s", // consider connection down for replies taking longer than this value // "response_cache_ttl": "0s", // the life span of a cached response // "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up +// "locking_timeout": "5s", // timeout internal locks to avoid deadlocks // }, @@ -162,6 +166,7 @@ // "cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored // "cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved // "failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records +// "cdr_path": "", // path towards one CDR element in case of XML CDRs // "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database // "cdr_filter": "", // filter CDR records to import // "continue_on_success": false, // continue to the next template if executed diff --git a/engine/action.go b/engine/action.go index fd9e2b295..aefd9cf34 100644 --- a/engine/action.go +++ b/engine/action.go @@ -423,7 +423,7 @@ func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error } cfg := config.CgrConfig() fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID())) - _, _, err = utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, 1, fallbackPath, false) + _, _, err = utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, config.CgrConfig().HttpPosterAttempts, fallbackPath, false) return err } @@ -442,7 +442,7 @@ func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) } cfg := config.CgrConfig() fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID())) - go utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, 3, fallbackPath, false) + go utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, config.CgrConfig().HttpPosterAttempts, fallbackPath, false) return nil } @@ -686,7 +686,7 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti } var client rpcclient.RpcClientConnection if req.Address != utils.MetaInternal { - if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, req.Transport, nil); err != nil { + if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, config.CgrConfig().ConnectTimeout, config.CgrConfig().ReplyTimeout, req.Transport, nil); err != nil { return err } } else { diff --git a/engine/libengine.go b/engine/libengine.go index 11c9f6f38..531b89397 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -27,7 +27,7 @@ import ( "github.com/cgrates/rpcclient" ) -func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, codec string, +func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, codec string, rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) { var rpcClient *rpcclient.RpcClient var err error @@ -42,13 +42,10 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, codec str case <-time.After(ttl): return nil, errors.New("TTL triggered") } - rpcClient, err = rpcclient.NewRpcClient("", "", 0, 0, rpcclient.INTERNAL_RPC, internalConn) + rpcClient, err = rpcclient.NewRpcClient("", "", connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn) } else { - rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, codec, nil) + rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil) } - //if err != nil { // Commented so we pass the last error instead of first - // break - //} if err == nil { atLestOneConnected = true } diff --git a/engine/pubsub.go b/engine/pubsub.go index fa031d6aa..6d6d6c59d 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -213,8 +213,8 @@ type ProxyPubSub struct { Client *rpcclient.RpcClient } -func NewProxyPubSub(addr string, attempts, reconnects int) (*ProxyPubSub, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) +func NewProxyPubSub(addr string, attempts, reconnects int, connectTimeout, replyTimeout time.Duration) (*ProxyPubSub, error) { + client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil) if err != nil { return nil, err } diff --git a/general_tests/cdrs_replication_it_test.go b/general_tests/cdrs_replication_it_test.go index e5df836cc..1a496cd0b 100644 --- a/general_tests/cdrs_replication_it_test.go +++ b/general_tests/cdrs_replication_it_test.go @@ -91,7 +91,7 @@ func TestCdrsHttpCdrReplication(t *testing.T) { if !*testIntegration { return } - cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, "json", nil) + cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -108,7 +108,7 @@ func TestCdrsHttpCdrReplication(t *testing.T) { t.Error("Unexpected reply received: ", reply) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, "json", nil) + cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } diff --git a/general_tests/rpcclient_it_test.go b/general_tests/rpcclient_it_test.go index 454c7aaf3..f478f5024 100644 --- a/general_tests/rpcclient_it_test.go +++ b/general_tests/rpcclient_it_test.go @@ -19,6 +19,8 @@ along with this program. If not, see package general_tests import ( + "flag" + "fmt" "os/exec" "path" "testing" @@ -37,7 +39,9 @@ var rpcRAL1, rpcRAL2 *rpcclient.RpcClient var rpcPoolFirst *rpcclient.RpcClientPool var ral1, ral2 *exec.Cmd var err error -var ral1ID, ral2ID string +var ral1ID, ral2ID, ralRmtID string + +var testRemoteRALs = flag.Bool("remote_rals", false, "Perform the tests in integration mode, not by default.") // This flag will be passed here via "go test -local" args func TestRPCITInitCfg(t *testing.T) { if !*testIntegration { @@ -71,12 +75,12 @@ func TestRPCITRpcConnPool(t *testing.T) { return } rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST) - rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1, rpcclient.JSON_RPC, nil) + rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) if err == nil { t.Fatal("Should receive cannot connect error here") } rpcPoolFirst.AddClient(rpcRAL1) - rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, 3, 1, rpcclient.JSON_RPC, nil) + rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) if err != nil { t.Fatal(err) } @@ -195,3 +199,99 @@ func TestRPCITDirectedRPC(t *testing.T) { t.Errorf("Received sessions: %+v", sessions) } } + +// Special tests involving remote server (manually set) +// The server network will be manually disconnected without TCP close +func TestRPCITRmtRpcConnPool(t *testing.T) { + if !*testIntegration { + return + } + rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST) + rpcRALRmt, err := rpcclient.NewRpcClient("tcp", "172.16.254.83:2012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) + if err != nil { + t.Fatal(err) + } + rpcPoolFirst.AddClient(rpcRALRmt) + rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil) + if err != nil { + t.Fatal(err) + } + rpcPoolFirst.AddClient(rpcRAL1) +} + +func TestRPCITRmtStatusFirstInitial(t *testing.T) { + if !*testRemoteRALs { + return + } + var status map[string]interface{} + if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } else if status[utils.InstanceID].(string) == ral1ID { + t.Fatal("Should receive ralID different than first one") + } else { + ralRmtID = status[utils.InstanceID].(string) + } + if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { // Make sure second time we land on the same instance + t.Error(err) + } else if status[utils.InstanceID].(string) != ralRmtID { + t.Errorf("Expecting: %s, received: %s", ralRmtID, status[utils.InstanceID].(string)) + } +} + +func TestRPCITRmtStatusFirstFailover(t *testing.T) { + if !*testRemoteRALs { + return + } + fmt.Println("Ready for doing failover") + remaining := 5 + for i := 0; i < remaining; i++ { + fmt.Printf("\n\t%d", remaining-i) + time.Sleep(1 * time.Second) + } + fmt.Println("\n\nExecuting query ...") + var status map[string]interface{} + if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } else if status[utils.InstanceID].(string) != ral1ID { + t.Fatal("Did not do failover") + } + if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } else if status[utils.InstanceID].(string) != ral1ID { + t.Fatal("Did not do failover") + } +} + +func TestRPCITRmtStatusFirstFailback(t *testing.T) { + if !*testRemoteRALs { + return + } + fmt.Println("Ready for doing failback") + remaining := 10 + for i := 0; i < remaining; i++ { + fmt.Printf("\n\t%d", remaining-i) + time.Sleep(1 * time.Second) + } + fmt.Println("\n\nExecuting query ...") + var status map[string]interface{} + if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } else if status[utils.InstanceID].(string) != ralRmtID { + t.Fatal("Did not do failback") + } + if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { + t.Error(err) + } else if status[utils.InstanceID].(string) == "" { + t.Error("Empty InstanceID received") + } else if status[utils.InstanceID].(string) != ralRmtID { + t.Fatal("Did not do failback") + } +} diff --git a/glide.lock b/glide.lock index 0ebb9e3ba..7157b9e78 100644 --- a/glide.lock +++ b/glide.lock @@ -12,7 +12,7 @@ imports: - name: github.com/cgrates/osipsdagram version: 3d6beed663452471dec3ca194137a30d379d9e8f - name: github.com/cgrates/rpcclient - version: 9a6185f8a2093ce10f1a08242b0d757f24795800 + version: d9a94e52e08bf98a288c9460ce6adb661a6c089b - name: github.com/ChrisTrenkamp/goxpath version: 4aad8d0161aae7d17df4755d2c1e86cd1fcaaab6 subpackages: diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 53c0d66a7..eb4873bd8 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -190,7 +190,7 @@ func (self *SMGeneric) sessionStart(evStart SMGenericEvent, connId string) error } } return true, nil - }, time.Duration(2)*time.Second, sessionId) + }, self.cgrCfg.LockingTimeout, sessionId) if processed == nil || processed == false { utils.Logger.Err(" Cannot start session, empty reply") return utils.ErrServerError