New config options: httpposter_attempts, connect_timeout, reply_timeout, locking_timeout, fix connection drop detection in case of HA

This commit is contained in:
DanB
2016-05-30 21:13:42 +02:00
parent 35c8536d7c
commit d7f2330c6b
17 changed files with 196 additions and 58 deletions

View File

@@ -26,6 +26,7 @@ import (
"log"
"os"
"strings"
"time"
"github.com/cgrates/cgrates/console"
"github.com/cgrates/cgrates/utils"
@@ -109,7 +110,7 @@ func main() {
return
}
var err error
client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, *rpc_encoding, nil)
client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), *rpc_encoding, nil)
if err != nil {
flag.PrintDefaults()
log.Fatal("Could not connect to server " + *server)

View File

@@ -109,7 +109,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
}
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cdrcCfg.CdrsConns, internalCdrSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %s", err.Error()))
@@ -132,7 +132,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
utils.Logger.Info("Starting CGRateS SMGeneric service.")
var ralsConns, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmGenericConfig.RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.SmGenericConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to RALs: %s", err.Error()))
@@ -141,7 +141,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
}
}
if len(cfg.SmGenericConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.SmGenericConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to RALs: %s", err.Error()))
@@ -172,7 +172,7 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC
utils.Logger.Info("Starting CGRateS DiameterAgent service.")
var smgConn, pubsubConn *rpcclient.RpcClientPool
if len(cfg.DiameterAgentCfg().SMGenericConns) != 0 {
smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to SMG: %s", err.Error()))
@@ -181,7 +181,7 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC
}
}
if len(cfg.DiameterAgentCfg().PubSubConns) != 0 {
pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to PubSubS: %s", err.Error()))
@@ -205,7 +205,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli
utils.Logger.Info("Starting CGRateS SMFreeSWITCH service.")
var ralsConn, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmFsConfig.RALsConns) != 0 {
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RAL: %s", err.Error()))
@@ -214,7 +214,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli
}
}
if len(cfg.SmFsConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.SmFsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RAL: %s", err.Error()))
@@ -234,7 +234,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
utils.Logger.Info("Starting CGRateS SMKamailio service.")
var ralsConn, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmKamConfig.RALsConns) != 0 {
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.SmKamConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to RAL: %s", err.Error()))
@@ -243,7 +243,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
}
}
if len(cfg.SmKamConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.SmKamConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to RAL: %s", err.Error()))
@@ -263,7 +263,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
utils.Logger.Info("Starting CGRateS SMOpenSIPS service.")
var ralsConn, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmOsipsConfig.RALsConns) != 0 {
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.SmOsipsConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMOpenSIPS> Could not connect to RALs: %s", err.Error()))
@@ -272,7 +272,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
}
}
if len(cfg.SmOsipsConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.SmOsipsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMOpenSIPS> Could not connect to CDRs: %s", err.Error()))
@@ -295,7 +295,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
utils.Logger.Info("Starting CGRateS CDRS service.")
var ralConn, pubSubConn, usersConn, aliasesConn, statsConn *rpcclient.RpcClientPool
if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to RAL: %s", err.Error()))
@@ -304,7 +304,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
}
}
if len(cfg.CDRSPubSubSConns) != 0 { // Pubsub connection init
pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to PubSubSystem: %s", err.Error()))
@@ -313,7 +313,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
}
}
if len(cfg.CDRSUserSConns) != 0 { // Users connection init
usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to UserS: %s", err.Error()))
@@ -322,7 +322,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
}
}
if len(cfg.CDRSAliaseSConns) != 0 { // Aliases connection init
aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to AliaseS: %s", err.Error()))
@@ -332,7 +332,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
}
if len(cfg.CDRSStatSConns) != 0 { // Stats connection init
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))

View File

@@ -113,7 +113,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, cdrstatTaskChan)
go func() {
defer close(cdrstatTaskChan)
cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.RALsCDRStatSConns, internalCdrStatSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to CDRStatS, error: %s", err.Error()))
@@ -127,7 +127,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, histTaskChan)
go func() {
defer close(histTaskChan)
if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.RALsHistorySConns, internalHistorySChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect HistoryS, error: %s", err.Error()))
exitChan <- true
@@ -142,7 +142,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, pubsubTaskChan)
go func() {
defer close(pubsubTaskChan)
if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.RALsPubSubSConns, internalPubSubSChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to PubSubS: %s", err.Error()))
exitChan <- true
@@ -157,7 +157,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, aliasesTaskChan)
go func() {
defer close(aliasesTaskChan)
if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to AliaseS, error: %s", err.Error()))
exitChan <- true
@@ -173,7 +173,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, usersTaskChan)
go func() {
defer close(usersTaskChan)
if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cfg.RALsUserSConns, internalUserSChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect UserS, error: %s", err.Error()))
exitChan <- true

View File

@@ -26,6 +26,7 @@ import (
"path"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -287,7 +288,7 @@ func main() {
return
}
if *historyServer != "" { // Init scribeAgent so we can store the differences
if scribeAgent, err := rpcclient.NewRpcClient("tcp", *historyServer, 3, 3, utils.GOB, nil); err != nil {
if scribeAgent, err := rpcclient.NewRpcClient("tcp", *historyServer, 3, 3, time.Duration(1*time.Second), time.Duration(5*time.Minute), utils.GOB, nil); err != nil {
log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error())
return
} else {

View File

@@ -25,7 +25,7 @@ func main() {
if cdrsMasterCfg, err = config.NewCGRConfigFromFolder(cdrsMasterCfgPath); err != nil {
log.Fatal("Got config error: ", err.Error())
}
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, "json", nil)
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil)
if err != nil {
log.Fatal("Could not connect to rater: ", err.Error())
}

View File

@@ -191,23 +191,27 @@ type CGRConfig struct {
StorDBMaxOpenConns int // Maximum database connections opened
StorDBMaxIdleConns int // Maximum idle connections to keep opened
StorDBCDRSIndexes []string
DBDataEncoding string // The encoding used to store object data in strings: <msgpack|json>
RPCJSONListen string // RPC JSON listening address
RPCGOBListen string // RPC GOB listening address
HTTPListen string // HTTP listening address
DefaultReqType string // Use this request type if not defined on top
DefaultCategory string // set default type of record
DefaultTenant string // set default tenant
DefaultTimezone string // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb>
ConnectAttempts int // number of initial connection attempts before giving up
ResponseCacheTTL time.Duration // the life span of a cached response
InternalTtl time.Duration // maximum duration to wait for internal connections before giving up
RoundingDecimals int // Number of decimals to round end prices at
HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate
TpExportPath string // Path towards export folder for offline Tariff Plans
DBDataEncoding string // The encoding used to store object data in strings: <msgpack|json>
RPCJSONListen string // RPC JSON listening address
RPCGOBListen string // RPC GOB listening address
HTTPListen string // HTTP listening address
DefaultReqType string // Use this request type if not defined on top
DefaultCategory string // set default type of record
DefaultTenant string // set default tenant
DefaultTimezone string // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb>
ConnectTimeout time.Duration // timeout for RPC connection attempts
ReplyTimeout time.Duration // timeout replies if not reaching back
ConnectAttempts int // number of initial connection attempts before giving up
ResponseCacheTTL time.Duration // the life span of a cached response
InternalTtl time.Duration // maximum duration to wait for internal connections before giving up
RoundingDecimals int // Number of decimals to round end prices at
HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate
TpExportPath string // Path towards export folder for offline Tariff Plans
HttpPosterAttempts int
HttpFailedDir string // Directory path where we store failed http requests
MaxCallDuration time.Duration // The maximum call duration (used by responder when querying DerivedCharging) // ToDo: export it in configuration file
LockingTimeout time.Duration // locking mechanism timeout to avoid deadlocks
RALsEnabled bool // start standalone server (no balancer)
RALsBalancer string // balancer address host:port
RALsCDRStatSConns []*HaPoolConfig // address where to reach the cdrstats service. Empty to disable stats gathering <""|internal|x.y.z.y:1234>
@@ -657,6 +661,16 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
if jsnGeneralCfg.Reconnects != nil {
self.Reconnects = *jsnGeneralCfg.Reconnects
}
if jsnGeneralCfg.Connect_timeout != nil {
if self.ConnectTimeout, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Connect_timeout); err != nil {
return err
}
}
if jsnGeneralCfg.Reply_timeout != nil {
if self.ReplyTimeout, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Reply_timeout); err != nil {
return err
}
}
if jsnGeneralCfg.Rounding_decimals != nil {
self.RoundingDecimals = *jsnGeneralCfg.Rounding_decimals
}
@@ -666,6 +680,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
if jsnGeneralCfg.Tpexport_dir != nil {
self.TpExportPath = *jsnGeneralCfg.Tpexport_dir
}
if jsnGeneralCfg.Httpposter_attempts != nil {
self.HttpPosterAttempts = *jsnGeneralCfg.Httpposter_attempts
}
if jsnGeneralCfg.Http_failed_dir != nil {
self.HttpFailedDir = *jsnGeneralCfg.Http_failed_dir
}
@@ -677,6 +694,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
return err
}
}
if jsnGeneralCfg.Locking_timeout != nil {
if self.LockingTimeout, err = utils.ParseDurationWithSecs(*jsnGeneralCfg.Locking_timeout); err != nil {
return err
}
}
}
if jsnListenCfg != nil {

View File

@@ -32,6 +32,7 @@ const CGRATES_CFG_JSON = `
"rounding_decimals": 5, // system level precision for floats
"dbdata_encoding": "msgpack", // encoding used to store object data in strings: <msgpack|json>
"tpexport_dir": "/var/log/cgrates/tpe", // path towards export folder for offline Tariff Plans
"httpposter_attempts": 3, // number of http attempts before considering request failed (eg: *call_url)
"http_failed_dir": "/var/log/cgrates/http_failed", // directory path where we store failed http requests
"default_request_type": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated>
"default_category": "call", // default category to consider when missing from requests
@@ -39,8 +40,11 @@ const CGRATES_CFG_JSON = `
"default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"connect_attempts": 3, // initial server connect attempts
"reconnects": -1, // number of retries in case of connection lost
"connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature
"reply_timeout": "2s", // consider connection down for replies taking longer than this value
"response_cache_ttl": "0s", // the life span of a cached response
"internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
"locking_timeout": "5s", // timeout internal locks to avoid deadlocks
},

View File

@@ -43,6 +43,7 @@ func TestDfGeneralJsonCfg(t *testing.T) {
Rounding_decimals: utils.IntPointer(5),
Dbdata_encoding: utils.StringPointer("msgpack"),
Tpexport_dir: utils.StringPointer("/var/log/cgrates/tpe"),
Httpposter_attempts: utils.IntPointer(3),
Http_failed_dir: utils.StringPointer("/var/log/cgrates/http_failed"),
Default_request_type: utils.StringPointer(utils.META_RATED),
Default_category: utils.StringPointer("call"),
@@ -50,8 +51,11 @@ func TestDfGeneralJsonCfg(t *testing.T) {
Default_timezone: utils.StringPointer("Local"),
Connect_attempts: utils.IntPointer(3),
Reconnects: utils.IntPointer(-1),
Connect_timeout: utils.StringPointer("1s"),
Reply_timeout: utils.StringPointer("2s"),
Response_cache_ttl: utils.StringPointer("0s"),
Internal_ttl: utils.StringPointer("2m")}
Internal_ttl: utils.StringPointer("2m"),
Locking_timeout: utils.StringPointer("5s")}
if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, gCfg) {

View File

@@ -24,6 +24,7 @@ type GeneralJsonCfg struct {
Rounding_decimals *int
Dbdata_encoding *string
Tpexport_dir *string
Httpposter_attempts *int
Http_failed_dir *string
Default_request_type *string
Default_category *string
@@ -31,8 +32,11 @@ type GeneralJsonCfg struct {
Default_timezone *string
Connect_attempts *int
Reconnects *int
Connect_timeout *string
Reply_timeout *string
Response_cache_ttl *string
Internal_ttl *string
Locking_timeout *string
}
// Listen config section

View File

@@ -11,6 +11,7 @@
// "rounding_decimals": 5, // system level precision for floats
// "dbdata_encoding": "msgpack", // encoding used to store object data in strings: <msgpack|json>
// "tpexport_dir": "/var/log/cgrates/tpe", // path towards export folder for offline Tariff Plans
// "httpposter_attempts": 3, // number of http attempts before considering request failed (eg: *call_url)
// "http_failed_dir": "/var/log/cgrates/http_failed", // directory path where we store failed http requests
// "default_request_type": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated>
// "default_category": "call", // default category to consider when missing from requests
@@ -18,8 +19,11 @@
// "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
// "connect_attempts": 3, // initial server connect attempts
// "reconnects": -1, // number of retries in case of connection lost
// "connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature
// "reply_timeout": "2s", // consider connection down for replies taking longer than this value
// "response_cache_ttl": "0s", // the life span of a cached response
// "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
// "locking_timeout": "5s", // timeout internal locks to avoid deadlocks
// },
@@ -162,6 +166,7 @@
// "cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
// "cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
// "failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records
// "cdr_path": "", // path towards one CDR element in case of XML CDRs
// "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
// "cdr_filter": "", // filter CDR records to import
// "continue_on_success": false, // continue to the next template if executed

View File

@@ -423,7 +423,7 @@ func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error
}
cfg := config.CgrConfig()
fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID()))
_, _, err = utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, 1, fallbackPath, false)
_, _, err = utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, config.CgrConfig().HttpPosterAttempts, fallbackPath, false)
return err
}
@@ -442,7 +442,7 @@ func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions)
}
cfg := config.CgrConfig()
fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID()))
go utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, 3, fallbackPath, false)
go utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, config.CgrConfig().HttpPosterAttempts, fallbackPath, false)
return nil
}
@@ -686,7 +686,7 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti
}
var client rpcclient.RpcClientConnection
if req.Address != utils.MetaInternal {
if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, req.Transport, nil); err != nil {
if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, config.CgrConfig().ConnectTimeout, config.CgrConfig().ReplyTimeout, req.Transport, nil); err != nil {
return err
}
} else {

View File

@@ -27,7 +27,7 @@ import (
"github.com/cgrates/rpcclient"
)
func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, codec string,
func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, codec string,
rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) {
var rpcClient *rpcclient.RpcClient
var err error
@@ -42,13 +42,10 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, codec str
case <-time.After(ttl):
return nil, errors.New("TTL triggered")
}
rpcClient, err = rpcclient.NewRpcClient("", "", 0, 0, rpcclient.INTERNAL_RPC, internalConn)
rpcClient, err = rpcclient.NewRpcClient("", "", connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn)
} else {
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, codec, nil)
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil)
}
//if err != nil { // Commented so we pass the last error instead of first
// break
//}
if err == nil {
atLestOneConnected = true
}

View File

@@ -213,8 +213,8 @@ type ProxyPubSub struct {
Client *rpcclient.RpcClient
}
func NewProxyPubSub(addr string, attempts, reconnects int) (*ProxyPubSub, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
func NewProxyPubSub(addr string, attempts, reconnects int, connectTimeout, replyTimeout time.Duration) (*ProxyPubSub, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, connectTimeout, replyTimeout, utils.GOB, nil)
if err != nil {
return nil, err
}

View File

@@ -91,7 +91,7 @@ func TestCdrsHttpCdrReplication(t *testing.T) {
if !*testIntegration {
return
}
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, "json", nil)
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
@@ -108,7 +108,7 @@ func TestCdrsHttpCdrReplication(t *testing.T) {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, "json", nil)
cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), "json", nil)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package general_tests
import (
"flag"
"fmt"
"os/exec"
"path"
"testing"
@@ -37,7 +39,9 @@ var rpcRAL1, rpcRAL2 *rpcclient.RpcClient
var rpcPoolFirst *rpcclient.RpcClientPool
var ral1, ral2 *exec.Cmd
var err error
var ral1ID, ral2ID string
var ral1ID, ral2ID, ralRmtID string
var testRemoteRALs = flag.Bool("remote_rals", false, "Perform the tests in integration mode, not by default.") // This flag will be passed here via "go test -local" args
func TestRPCITInitCfg(t *testing.T) {
if !*testIntegration {
@@ -71,12 +75,12 @@ func TestRPCITRpcConnPool(t *testing.T) {
return
}
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1, rpcclient.JSON_RPC, nil)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
if err == nil {
t.Fatal("Should receive cannot connect error here")
}
rpcPoolFirst.AddClient(rpcRAL1)
rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, 3, 1, rpcclient.JSON_RPC, nil)
rpcRAL2, err = rpcclient.NewRpcClient("tcp", rpcITCfg2.RPCJSONListen, 3, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
if err != nil {
t.Fatal(err)
}
@@ -195,3 +199,99 @@ func TestRPCITDirectedRPC(t *testing.T) {
t.Errorf("Received sessions: %+v", sessions)
}
}
// Special tests involving remote server (manually set)
// The server network will be manually disconnected without TCP close
func TestRPCITRmtRpcConnPool(t *testing.T) {
if !*testIntegration {
return
}
rpcPoolFirst = rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
rpcRALRmt, err := rpcclient.NewRpcClient("tcp", "172.16.254.83:2012", 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
if err != nil {
t.Fatal(err)
}
rpcPoolFirst.AddClient(rpcRALRmt)
rpcRAL1, err = rpcclient.NewRpcClient("tcp", rpcITCfg1.RPCJSONListen, 1, 1, time.Duration(1*time.Second), time.Duration(2*time.Second), rpcclient.JSON_RPC, nil)
if err != nil {
t.Fatal(err)
}
rpcPoolFirst.AddClient(rpcRAL1)
}
func TestRPCITRmtStatusFirstInitial(t *testing.T) {
if !*testRemoteRALs {
return
}
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
} else if status[utils.InstanceID].(string) == ral1ID {
t.Fatal("Should receive ralID different than first one")
} else {
ralRmtID = status[utils.InstanceID].(string)
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.InstanceID].(string) != ralRmtID {
t.Errorf("Expecting: %s, received: %s", ralRmtID, status[utils.InstanceID].(string))
}
}
func TestRPCITRmtStatusFirstFailover(t *testing.T) {
if !*testRemoteRALs {
return
}
fmt.Println("Ready for doing failover")
remaining := 5
for i := 0; i < remaining; i++ {
fmt.Printf("\n\t%d", remaining-i)
time.Sleep(1 * time.Second)
}
fmt.Println("\n\nExecuting query ...")
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
} else if status[utils.InstanceID].(string) != ral1ID {
t.Fatal("Did not do failover")
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
} else if status[utils.InstanceID].(string) != ral1ID {
t.Fatal("Did not do failover")
}
}
func TestRPCITRmtStatusFirstFailback(t *testing.T) {
if !*testRemoteRALs {
return
}
fmt.Println("Ready for doing failback")
remaining := 10
for i := 0; i < remaining; i++ {
fmt.Printf("\n\t%d", remaining-i)
time.Sleep(1 * time.Second)
}
fmt.Println("\n\nExecuting query ...")
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
} else if status[utils.InstanceID].(string) != ralRmtID {
t.Fatal("Did not do failback")
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
t.Error(err)
} else if status[utils.InstanceID].(string) == "" {
t.Error("Empty InstanceID received")
} else if status[utils.InstanceID].(string) != ralRmtID {
t.Fatal("Did not do failback")
}
}

2
glide.lock generated
View File

@@ -12,7 +12,7 @@ imports:
- name: github.com/cgrates/osipsdagram
version: 3d6beed663452471dec3ca194137a30d379d9e8f
- name: github.com/cgrates/rpcclient
version: 9a6185f8a2093ce10f1a08242b0d757f24795800
version: d9a94e52e08bf98a288c9460ce6adb661a6c089b
- name: github.com/ChrisTrenkamp/goxpath
version: 4aad8d0161aae7d17df4755d2c1e86cd1fcaaab6
subpackages:

View File

@@ -190,7 +190,7 @@ func (self *SMGeneric) sessionStart(evStart SMGenericEvent, connId string) error
}
}
return true, nil
}, time.Duration(2)*time.Second, sessionId)
}, self.cgrCfg.LockingTimeout, sessionId)
if processed == nil || processed == false {
utils.Logger.Err("<SMGeneric> Cannot start session, empty reply")
return utils.ErrServerError