Implement reply_timeout opt for RPCPool

This commit is contained in:
ionutboangiu
2022-09-19 18:30:06 +03:00
committed by Dan Christian Bogos
parent cf144cee74
commit 9dc0133207
4 changed files with 125 additions and 93 deletions

View File

@@ -87,23 +87,26 @@ const CGRATES_CFG_JSON = `
"rpc_conns": {
//"*localhost": {
//"conns": [
//{
//"address": "127.0.0.1:2012",
//"transport":"*json",
//"connect_attempts": 5,
//"reconnects": -1,
//"max_reconnect_interval": ""
//"connect_timeout":"1s",
//"reply_timeout":"2s",
//"tls":false,
//"client_key":"",
//"client_certificate":"",
//"ca_certificate":""
//}
//],
//},
// "*localhost": {
// "strategy": "*first"
// "poolsize": 0,
// "reply_timeout": "2s",
// "conns": [
// {
// "address": "127.0.0.1:2012",
// "transport":"*json",
// "connect_attempts": 5,
// "reconnects": -1,
// "max_reconnect_interval": ""
// "connect_timeout":"1s",
// "reply_timeout":"2s",
// "tls":false,
// "client_key":"",
// "client_certificate":"",
// "ca_certificate":""
// }
// ],
// },
}, // rpc connections definitions

View File

@@ -105,12 +105,13 @@ func (rC RPCConns) Clone() (cln RPCConns) {
// RPCConn the connection pool config
type RPCConn struct {
Strategy string
PoolSize int
Conns []*RemoteHost
Strategy string
PoolSize int
ReplyTimeout time.Duration
Conns []*RemoteHost
}
func (rC *RPCConn) loadFromJSONCfg(jsnCfg *RPCConnJson) {
func (rC *RPCConn) loadFromJSONCfg(jsnCfg *RPCConnJson) (err error) {
if jsnCfg == nil {
return
}
@@ -120,6 +121,11 @@ func (rC *RPCConn) loadFromJSONCfg(jsnCfg *RPCConnJson) {
if jsnCfg.PoolSize != nil {
rC.PoolSize = *jsnCfg.PoolSize
}
if jsnCfg.Reply_timeout != nil {
if rC.ReplyTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.Reply_timeout); err != nil {
return
}
}
if jsnCfg.Conns != nil {
rC.Conns = make([]*RemoteHost, len(*jsnCfg.Conns))
for idx, jsnHaCfg := range *jsnCfg.Conns {
@@ -127,20 +133,24 @@ func (rC *RPCConn) loadFromJSONCfg(jsnCfg *RPCConnJson) {
rC.Conns[idx].loadFromJSONCfg(jsnHaCfg) //To review if the function signature changes
}
}
return
}
// AsMapInterface returns the config as a map[string]interface{}
func (rC *RPCConn) AsMapInterface() (initialMP map[string]interface{}) {
initialMP = map[string]interface{}{
func (rC *RPCConn) AsMapInterface() (mp map[string]interface{}) {
mp = map[string]interface{}{
utils.StrategyCfg: rC.Strategy,
utils.PoolSize: rC.PoolSize,
}
if rC.ReplyTimeout != 0 {
mp[utils.ReplyTimeoutCfg] = rC.ReplyTimeout
}
if rC.Conns != nil {
conns := make([]map[string]interface{}, len(rC.Conns))
for i, item := range rC.Conns {
conns[i] = item.AsMapInterface()
}
initialMP[utils.Conns] = conns
mp[utils.Conns] = conns
}
return
}
@@ -148,8 +158,9 @@ func (rC *RPCConn) AsMapInterface() (initialMP map[string]interface{}) {
// Clone returns a deep copy of RPCConn
func (rC RPCConn) Clone() (cln *RPCConn) {
cln = &RPCConn{
Strategy: rC.Strategy,
PoolSize: rC.PoolSize,
Strategy: rC.Strategy,
PoolSize: rC.PoolSize,
ReplyTimeout: rC.ReplyTimeout,
}
if rC.Conns != nil {
cln.Conns = make([]*RemoteHost, len(rC.Conns))
@@ -396,9 +407,10 @@ func diffRemoteHostJson(v1, v2 *RemoteHost) (d *RemoteHostJson) {
}
type RPCConnJson struct {
Strategy *string
PoolSize *int
Conns *[]*RemoteHostJson
Strategy *string
PoolSize *int
Reply_timeout *string
Conns *[]*RemoteHostJson
}
func diffRPCConnJson(d *RPCConnJson, v1, v2 *RPCConn) *RPCConnJson {
@@ -411,6 +423,9 @@ func diffRPCConnJson(d *RPCConnJson, v1, v2 *RPCConn) *RPCConnJson {
if v1.PoolSize != v2.PoolSize {
d.PoolSize = utils.IntPointer(v2.PoolSize)
}
if v1.ReplyTimeout != v2.ReplyTimeout {
d.Reply_timeout = utils.StringPointer(v2.ReplyTimeout.String())
}
if v2.Conns != nil {
conns := make([]*RemoteHostJson, len(v2.Conns))
dft := getDftRemHstCfg()
@@ -450,6 +465,7 @@ func equalsRPCConn(v1, v2 *RPCConn) bool {
(v1 != nil && v2 != nil &&
v1.Strategy == v2.Strategy &&
v1.PoolSize == v2.PoolSize &&
v1.ReplyTimeout == v2.ReplyTimeout &&
equalsRemoteHosts(v1.Conns, v2.Conns))
}

View File

@@ -65,23 +65,26 @@
// "rpc_conns": {
// //"*localhost": {
// //"conns": [
// //{
// //"address": "127.0.0.1:2012",
// //"transport":"*json",
// //"connect_attempts": 5,
// //"reconnects": -1,
// //"max_reconnect_interval": ""
// //"connect_timeout":"1s",
// //"reply_timeout":"2s",
// //"tls":false,
// //"client_key":"",
// //"client_certificate":"",
// //"ca_certificate":""
// //}
// //],
// //},
// // "*localhost": {
// // "strategy": "*first"
// // "poolsize": 0,
// // "reply_timeout": "2s",
// // "conns": [
// // {
// // "address": "127.0.0.1:2012",
// // "transport":"*json",
// // "connect_attempts": 5,
// // "reconnects": -1,
// // "max_reconnect_interval": ""
// // "connect_timeout":"1s",
// // "reply_timeout":"2s",
// // "tls":false,
// // "client_key":"",
// // "client_certificate":"",
// // "ca_certificate":""
// // }
// // ],
// // },
// }, // rpc connections definitions
@@ -203,51 +206,52 @@
// "caches":{
// "partitions": {
// "*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control resource profiles caching
// "*resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control resources caching
// "*event_resources": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // matching resources to events
// "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // statqueue profiles
// "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // statqueues with metrics
// "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control threshold profiles caching
// "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control thresholds caching
// "*filters": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control filters caching
// "*route_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control route profile caching
// "*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control attribute profile caching
// "*charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control charger profile caching
// "*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control dispatcher profile caching
// "*dispatcher_hosts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control dispatcher hosts caching
// "*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control rate profile caching
// "*action_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control action profile caching
// "*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control account profile caching
// "*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control resource filter indexes caching
// "*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control stat filter indexes caching
// "*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control threshold filter indexes caching
// "*route_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control route filter indexes caching
// "*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control attribute filter indexes caching
// "*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control charger filter indexes caching
// "*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher filter indexes caching
// "*rate_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control rate profile filter indexes caching
// "*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control rate filter indexes caching
// "*action_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control action profile filter indexes caching
// "*account_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control coount profile filter indexes caching
// "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control reverse filter indexes caching used only for set and remove filters
// "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher routes caching
// "*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher load( in case of *ratio ConnParams is present)
// "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher interface
// "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // diameter messages caching
// "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "replicate": false}, // RPC responses caching
// "*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false, "replicate": false}, // closed sessions cached for CDRs
// "*event_charges": {"limit": 0, "ttl": "10s", "static_ttl": false, "replicate": false}, // events proccessed by ChargerS
// "*cdr_ids": {"limit": -1, "ttl": "10m", "static_ttl": false, "replicate": false}, // protects CDRs against double-charging
// "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control the load_ids for items
// "*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // RPC connections caching
// "*uch": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // User cache
// "*stir": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // stirShaken cache keys
// "*apiban":{"limit": -1, "ttl": "2m", "static_ttl": false, "replicate": false},
// "*caps_events": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // caps cached samples
// "*replication_hosts": {"limit": 0, "ttl": "", "static_ttl": false, "replicate": false}, // the replication hosts cache(used when replication_filtered is enbled)
// "*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control resource profiles caching
// "*resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control resources caching
// "*event_resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // matching resources to events
// "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // statqueue profiles
// "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // statqueues with metrics
// "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control threshold profiles caching
// "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control thresholds caching
// "*filters": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control filters caching
// "*route_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control route profile caching
// "*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control attribute profile caching
// "*charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control charger profile caching
// "*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control dispatcher profile caching
// "*dispatcher_hosts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control dispatcher hosts caching
// "*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control rate profile caching
// "*action_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control action profile caching
// "*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control account profile caching
// "*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control resource filter indexes caching
// "*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control stat filter indexes caching
// "*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control threshold filter indexes caching
// "*route_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control route filter indexes caching
// "*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control attribute filter indexes caching
// "*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control charger filter indexes caching
// "*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control dispatcher filter indexes caching
// "*rate_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control rate profile filter indexes caching
// "*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control rate filter indexes caching
// "*action_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control action profile filter indexes caching
// "*account_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control coount profile filter indexes caching
// "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control reverse filter indexes caching used only for set and remove filters
// "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control dispatcher routes caching
// "*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control dispatcher load( in case of *ratio ConnParams is present)
// "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control dispatcher interface
// "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "remote":false, "replicate": false}, // diameter messages caching
// "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "remote":false, "replicate": false}, // RPC responses caching
// "*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false, "remote":false, "replicate": false}, // closed sessions cached for CDRs
// "*event_charges": {"limit": 0, "ttl": "10s", "static_ttl": false, "remote":false, "replicate": false}, // events proccessed by ChargerS
// "*cdr_ids": {"limit": -1, "ttl": "10m", "static_ttl": false, "remote":false, "replicate": false}, // protects CDRs against double-charging
// "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control the load_ids for items
// "*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // RPC connections caching
// "*uch": {"limit": -1, "ttl": "3h", "static_ttl": false, "remote":false, "replicate": false}, // User cache
// "*stir": {"limit": -1, "ttl": "3h", "static_ttl": false, "remote":false, "replicate": false}, // stirShaken cache keys
// "*apiban":{"limit": -1, "ttl": "2m", "static_ttl": false, "remote":false, "replicate": false},
// "*caps_events": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // caps cached samples
// "*replication_hosts": {"limit": 0, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // the replication hosts cache(used when replication_filtered is enbled)
// },
// "replication_conns": [],
// "remote_conns": [], // the conns that are queried when the items are not found in cache
// },
@@ -373,8 +377,14 @@
// // "kafkaTopic": "cgrates", // the topic from were the events are read
// // "kafkaGroupID": "cgrates", // the group that reads the events
// // "kafkaMaxWait": "1ms", // the maximum amount of time to wait for new data to come
// // "kafkaTLS": false, // if set to true it will try to authenticate the server
// // "kafkaCAPath": "",
// // "kafkaSkipTLSVerify": false,
// // "kafkaTopicProcessed": "", the topic were the events are sent after they are processed
// // "kafkaTopicProcessed": "", //the topic were the events are sent after they are processed
// // "kafkaTLSProcessed": false,
// // "kafkaCAPathProcessed": "",
// // "kafkaSkipTLSVerifyProcessed": false,
// // SQL
// // "sqlDBName": "cgrates", // the name of the database from were the events are read
@@ -503,6 +513,7 @@
// // Kafka
// // "kafkaTopic": "cgrates", // the topic from where the events are exported
// // "kafkaTLS": false, // if set to true it will try to authenticate the server
// // "kafkaCAPath": "", // path to certificate authority pem file
// // "kafkaSkipTLSVerify": false, // if set to true it will skip certificate verification

View File

@@ -124,7 +124,8 @@ func (cM *ConnManager) getConnWithConfig(ctx *context.Context, connID string, co
cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate,
cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects,
cM.cfg.GeneralCfg().MaxReconnectInterval, cM.cfg.GeneralCfg().ConnectTimeout,
cM.cfg.GeneralCfg().ReplyTimeout, connCfg.Conns, intChan, false, ctx.Client,
utils.FirstDurationNonEmpty(connCfg.ReplyTimeout, cM.cfg.GeneralCfg().ReplyTimeout),
connCfg.Conns, intChan, false, ctx.Client,
connID, cM.connCache); err != nil {
return
}
@@ -178,8 +179,9 @@ func (cM *ConnManager) CallWithConnIDs(connIDs []string, ctx *context.Context, s
// recreate the config with only conns that are needed
connCfg := cM.cfg.RPCConns()[connID]
newCfg := &config.RPCConn{
Strategy: connCfg.Strategy,
PoolSize: connCfg.PoolSize,
Strategy: connCfg.Strategy,
PoolSize: connCfg.PoolSize,
ReplyTimeout: connCfg.ReplyTimeout,
// alloc for all connection in order to not increase the size later
Conns: make([]*config.RemoteHost, 0, len(connCfg.Conns)),
}