diff --git a/config/config_defaults.go b/config/config_defaults.go index d2ee0cbb6..98fd8c2cc 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -87,23 +87,26 @@ const CGRATES_CFG_JSON = ` "rpc_conns": { - //"*localhost": { - //"conns": [ - //{ - //"address": "127.0.0.1:2012", - //"transport":"*json", - //"connect_attempts": 5, - //"reconnects": -1, - //"max_reconnect_interval": "" - //"connect_timeout":"1s", - //"reply_timeout":"2s", - //"tls":false, - //"client_key":"", - //"client_certificate":"", - //"ca_certificate":"" - //} - //], - //}, + // "*localhost": { + // "strategy": "*first" + // "poolsize": 0, + // "reply_timeout": "2s", + // "conns": [ + // { + // "address": "127.0.0.1:2012", + // "transport":"*json", + // "connect_attempts": 5, + // "reconnects": -1, + // "max_reconnect_interval": "" + // "connect_timeout":"1s", + // "reply_timeout":"2s", + // "tls":false, + // "client_key":"", + // "client_certificate":"", + // "ca_certificate":"" + // } + // ], + // }, }, // rpc connections definitions diff --git a/config/rpcconn.go b/config/rpcconn.go index 2401b9db9..3a3506bb6 100644 --- a/config/rpcconn.go +++ b/config/rpcconn.go @@ -105,12 +105,13 @@ func (rC RPCConns) Clone() (cln RPCConns) { // RPCConn the connection pool config type RPCConn struct { - Strategy string - PoolSize int - Conns []*RemoteHost + Strategy string + PoolSize int + ReplyTimeout time.Duration + Conns []*RemoteHost } -func (rC *RPCConn) loadFromJSONCfg(jsnCfg *RPCConnJson) { +func (rC *RPCConn) loadFromJSONCfg(jsnCfg *RPCConnJson) (err error) { if jsnCfg == nil { return } @@ -120,6 +121,11 @@ func (rC *RPCConn) loadFromJSONCfg(jsnCfg *RPCConnJson) { if jsnCfg.PoolSize != nil { rC.PoolSize = *jsnCfg.PoolSize } + if jsnCfg.Reply_timeout != nil { + if rC.ReplyTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.Reply_timeout); err != nil { + return + } + } if jsnCfg.Conns != nil { rC.Conns = make([]*RemoteHost, len(*jsnCfg.Conns)) for idx, jsnHaCfg := range *jsnCfg.Conns { @@ -127,20 +133,24 @@ func (rC *RPCConn) loadFromJSONCfg(jsnCfg *RPCConnJson) { rC.Conns[idx].loadFromJSONCfg(jsnHaCfg) //To review if the function signature changes } } + return } // AsMapInterface returns the config as a map[string]interface{} -func (rC *RPCConn) AsMapInterface() (initialMP map[string]interface{}) { - initialMP = map[string]interface{}{ +func (rC *RPCConn) AsMapInterface() (mp map[string]interface{}) { + mp = map[string]interface{}{ utils.StrategyCfg: rC.Strategy, utils.PoolSize: rC.PoolSize, } + if rC.ReplyTimeout != 0 { + mp[utils.ReplyTimeoutCfg] = rC.ReplyTimeout + } if rC.Conns != nil { conns := make([]map[string]interface{}, len(rC.Conns)) for i, item := range rC.Conns { conns[i] = item.AsMapInterface() } - initialMP[utils.Conns] = conns + mp[utils.Conns] = conns } return } @@ -148,8 +158,9 @@ func (rC *RPCConn) AsMapInterface() (initialMP map[string]interface{}) { // Clone returns a deep copy of RPCConn func (rC RPCConn) Clone() (cln *RPCConn) { cln = &RPCConn{ - Strategy: rC.Strategy, - PoolSize: rC.PoolSize, + Strategy: rC.Strategy, + PoolSize: rC.PoolSize, + ReplyTimeout: rC.ReplyTimeout, } if rC.Conns != nil { cln.Conns = make([]*RemoteHost, len(rC.Conns)) @@ -396,9 +407,10 @@ func diffRemoteHostJson(v1, v2 *RemoteHost) (d *RemoteHostJson) { } type RPCConnJson struct { - Strategy *string - PoolSize *int - Conns *[]*RemoteHostJson + Strategy *string + PoolSize *int + Reply_timeout *string + Conns *[]*RemoteHostJson } func diffRPCConnJson(d *RPCConnJson, v1, v2 *RPCConn) *RPCConnJson { @@ -411,6 +423,9 @@ func diffRPCConnJson(d *RPCConnJson, v1, v2 *RPCConn) *RPCConnJson { if v1.PoolSize != v2.PoolSize { d.PoolSize = utils.IntPointer(v2.PoolSize) } + if v1.ReplyTimeout != v2.ReplyTimeout { + d.Reply_timeout = utils.StringPointer(v2.ReplyTimeout.String()) + } if v2.Conns != nil { conns := make([]*RemoteHostJson, len(v2.Conns)) dft := getDftRemHstCfg() @@ -450,6 +465,7 @@ func equalsRPCConn(v1, v2 *RPCConn) bool { (v1 != nil && v2 != nil && v1.Strategy == v2.Strategy && v1.PoolSize == v2.PoolSize && + v1.ReplyTimeout == v2.ReplyTimeout && equalsRemoteHosts(v1.Conns, v2.Conns)) } diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 043e03c73..a4cbcb9a2 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -65,23 +65,26 @@ // "rpc_conns": { -// //"*localhost": { -// //"conns": [ -// //{ -// //"address": "127.0.0.1:2012", -// //"transport":"*json", -// //"connect_attempts": 5, -// //"reconnects": -1, -// //"max_reconnect_interval": "" -// //"connect_timeout":"1s", -// //"reply_timeout":"2s", -// //"tls":false, -// //"client_key":"", -// //"client_certificate":"", -// //"ca_certificate":"" -// //} -// //], -// //}, +// // "*localhost": { +// // "strategy": "*first" +// // "poolsize": 0, +// // "reply_timeout": "2s", +// // "conns": [ +// // { +// // "address": "127.0.0.1:2012", +// // "transport":"*json", +// // "connect_attempts": 5, +// // "reconnects": -1, +// // "max_reconnect_interval": "" +// // "connect_timeout":"1s", +// // "reply_timeout":"2s", +// // "tls":false, +// // "client_key":"", +// // "client_certificate":"", +// // "ca_certificate":"" +// // } +// // ], +// // }, // }, // rpc connections definitions @@ -203,51 +206,52 @@ // "caches":{ // "partitions": { -// "*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control resource profiles caching -// "*resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control resources caching -// "*event_resources": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // matching resources to events -// "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // statqueue profiles -// "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // statqueues with metrics -// "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control threshold profiles caching -// "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control thresholds caching -// "*filters": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control filters caching -// "*route_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control route profile caching -// "*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control attribute profile caching -// "*charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control charger profile caching -// "*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control dispatcher profile caching -// "*dispatcher_hosts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control dispatcher hosts caching -// "*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control rate profile caching -// "*action_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control action profile caching -// "*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control account profile caching -// "*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control resource filter indexes caching -// "*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control stat filter indexes caching -// "*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control threshold filter indexes caching -// "*route_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control route filter indexes caching -// "*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control attribute filter indexes caching -// "*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control charger filter indexes caching -// "*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher filter indexes caching -// "*rate_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control rate profile filter indexes caching -// "*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control rate filter indexes caching -// "*action_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control action profile filter indexes caching -// "*account_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control coount profile filter indexes caching -// "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control reverse filter indexes caching used only for set and remove filters -// "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher routes caching -// "*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher load( in case of *ratio ConnParams is present) -// "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher interface -// "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // diameter messages caching -// "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "replicate": false}, // RPC responses caching -// "*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false, "replicate": false}, // closed sessions cached for CDRs -// "*event_charges": {"limit": 0, "ttl": "10s", "static_ttl": false, "replicate": false}, // events proccessed by ChargerS -// "*cdr_ids": {"limit": -1, "ttl": "10m", "static_ttl": false, "replicate": false}, // protects CDRs against double-charging -// "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control the load_ids for items -// "*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // RPC connections caching -// "*uch": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // User cache -// "*stir": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // stirShaken cache keys -// "*apiban":{"limit": -1, "ttl": "2m", "static_ttl": false, "replicate": false}, -// "*caps_events": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // caps cached samples -// "*replication_hosts": {"limit": 0, "ttl": "", "static_ttl": false, "replicate": false}, // the replication hosts cache(used when replication_filtered is enbled) +// "*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control resource profiles caching +// "*resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control resources caching +// "*event_resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // matching resources to events +// "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // statqueue profiles +// "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // statqueues with metrics +// "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control threshold profiles caching +// "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control thresholds caching +// "*filters": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control filters caching +// "*route_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control route profile caching +// "*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control attribute profile caching +// "*charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control charger profile caching +// "*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control dispatcher profile caching +// "*dispatcher_hosts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control dispatcher hosts caching +// "*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control rate profile caching +// "*action_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control action profile caching +// "*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control account profile caching +// "*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control resource filter indexes caching +// "*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control stat filter indexes caching +// "*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control threshold filter indexes caching +// "*route_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control route filter indexes caching +// "*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control attribute filter indexes caching +// "*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control charger filter indexes caching +// "*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control dispatcher filter indexes caching +// "*rate_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control rate profile filter indexes caching +// "*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control rate filter indexes caching +// "*action_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control action profile filter indexes caching +// "*account_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control coount profile filter indexes caching +// "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control reverse filter indexes caching used only for set and remove filters +// "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control dispatcher routes caching +// "*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control dispatcher load( in case of *ratio ConnParams is present) +// "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // control dispatcher interface +// "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "remote":false, "replicate": false}, // diameter messages caching +// "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "remote":false, "replicate": false}, // RPC responses caching +// "*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false, "remote":false, "replicate": false}, // closed sessions cached for CDRs +// "*event_charges": {"limit": 0, "ttl": "10s", "static_ttl": false, "remote":false, "replicate": false}, // events proccessed by ChargerS +// "*cdr_ids": {"limit": -1, "ttl": "10m", "static_ttl": false, "remote":false, "replicate": false}, // protects CDRs against double-charging +// "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "remote":false, "replicate": false}, // control the load_ids for items +// "*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // RPC connections caching +// "*uch": {"limit": -1, "ttl": "3h", "static_ttl": false, "remote":false, "replicate": false}, // User cache +// "*stir": {"limit": -1, "ttl": "3h", "static_ttl": false, "remote":false, "replicate": false}, // stirShaken cache keys +// "*apiban":{"limit": -1, "ttl": "2m", "static_ttl": false, "remote":false, "replicate": false}, +// "*caps_events": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // caps cached samples +// "*replication_hosts": {"limit": 0, "ttl": "", "static_ttl": false, "remote":false, "replicate": false}, // the replication hosts cache(used when replication_filtered is enbled) // }, // "replication_conns": [], +// "remote_conns": [], // the conns that are queried when the items are not found in cache // }, @@ -373,8 +377,14 @@ // // "kafkaTopic": "cgrates", // the topic from were the events are read // // "kafkaGroupID": "cgrates", // the group that reads the events // // "kafkaMaxWait": "1ms", // the maximum amount of time to wait for new data to come +// // "kafkaTLS": false, // if set to true it will try to authenticate the server +// // "kafkaCAPath": "", +// // "kafkaSkipTLSVerify": false, -// // "kafkaTopicProcessed": "", the topic were the events are sent after they are processed +// // "kafkaTopicProcessed": "", //the topic were the events are sent after they are processed +// // "kafkaTLSProcessed": false, +// // "kafkaCAPathProcessed": "", +// // "kafkaSkipTLSVerifyProcessed": false, // // SQL // // "sqlDBName": "cgrates", // the name of the database from were the events are read @@ -503,6 +513,7 @@ // // Kafka // // "kafkaTopic": "cgrates", // the topic from where the events are exported +// // "kafkaTLS": false, // if set to true it will try to authenticate the server // // "kafkaCAPath": "", // path to certificate authority pem file // // "kafkaSkipTLSVerify": false, // if set to true it will skip certificate verification diff --git a/engine/connmanager.go b/engine/connmanager.go index b68e4ccfa..bdd87dfb1 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -124,7 +124,8 @@ func (cM *ConnManager) getConnWithConfig(ctx *context.Context, connID string, co cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().MaxReconnectInterval, cM.cfg.GeneralCfg().ConnectTimeout, - cM.cfg.GeneralCfg().ReplyTimeout, connCfg.Conns, intChan, false, ctx.Client, + utils.FirstDurationNonEmpty(connCfg.ReplyTimeout, cM.cfg.GeneralCfg().ReplyTimeout), + connCfg.Conns, intChan, false, ctx.Client, connID, cM.connCache); err != nil { return } @@ -178,8 +179,9 @@ func (cM *ConnManager) CallWithConnIDs(connIDs []string, ctx *context.Context, s // recreate the config with only conns that are needed connCfg := cM.cfg.RPCConns()[connID] newCfg := &config.RPCConn{ - Strategy: connCfg.Strategy, - PoolSize: connCfg.PoolSize, + Strategy: connCfg.Strategy, + PoolSize: connCfg.PoolSize, + ReplyTimeout: connCfg.ReplyTimeout, // alloc for all connection in order to not increase the size later Conns: make([]*config.RemoteHost, 0, len(connCfg.Conns)), }