From 616f7b283c7b43e28d79d373f2bd4df5b6fe0f01 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 16 Oct 2020 11:41:07 +0300 Subject: [PATCH] Added GlobalVarS as service to manage the reload for the global variables --- cmd/cgr-engine/cgr-engine.go | 7 +- config/config.go | 1 + config/config_defaults.go | 28 ++--- config/httpcfg.go | 123 +------------------- config/httpcfg_test.go | 119 +------------------- data/conf/cgrates/cgrates.json | 39 ++++--- ees/httpjsonmap.go | 7 +- ees/httppost.go | 7 +- engine/account_test.go | 3 - engine/action.go | 9 +- engine/calldesc.go | 32 ------ engine/cdr.go | 13 +-- engine/cdr_test.go | 43 +++---- engine/dispatcherprfl.go | 5 +- engine/globalvars.go | 179 ++++++++++++++++++++++++++++++ engine/globalvars_test.go | 172 ++++++++++++++++++++++++++++ engine/libcdre.go | 3 +- engine/pstr_http.go | 10 +- engine/suretax.go | 2 +- engine/tpexporter.go | 4 - engine/z_cdr_it_test.go | 3 +- engine/z_poster_it_test.go | 4 +- ers/partial_csv.go | 4 +- general_tests/cacherpl_it_test.go | 3 +- services/globalvars.go | 82 ++++++++++++++ services/globalvars_test.go | 56 ++++++++++ servmanager/servmanager.go | 2 + utils/consts.go | 29 ++--- 28 files changed, 599 insertions(+), 390 deletions(-) create mode 100644 engine/globalvars.go create mode 100644 engine/globalvars_test.go create mode 100644 services/globalvars.go create mode 100644 services/globalvars_test.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c98688add..6d932a18f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -484,7 +484,10 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatcherh): internalDispatcherHChan, }) - + gvService := services.NewGlobalVarS(cfg) + if err = gvService.Start(); err != nil { + return + } dmService := services.NewDataDBService(cfg, connManager) if dmService.ShouldRun() { // Some services can run without db, ie: ERs if err = dmService.Start(); err != nil { @@ -566,7 +569,7 @@ func main() { anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan) - srvManager.AddServices(attrS, chrS, tS, stS, reS, routeS, schS, rals, + srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals, rals.GetResponder(), apiSv1, apiSv2, cdrS, smg, services.NewEventReaderService(cfg, filterSChan, exitChan, connManager), services.NewDNSAgent(cfg, filterSChan, exitChan, connManager), diff --git a/config/config.go b/config/config.go index ddc23b04b..da728a8b6 100755 --- a/config/config.go +++ b/config/config.go @@ -1569,6 +1569,7 @@ func (cfg *CGRConfig) reloadSections(sections ...string) (err error) { case TlsCfgJson: // nothing to reload case APIBanCfgJson: // nothing to reload case HTTP_JSN: + cfg.rldChans[HTTP_JSN] <- struct{}{} case SCHEDULER_JSN: cfg.rldChans[SCHEDULER_JSN] <- struct{}{} case RALS_JSN: diff --git a/config/config_defaults.go b/config/config_defaults.go index ef881b876..f35410ce3 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -189,22 +189,22 @@ const CGRATES_CFG_JSON = ` "use_basic_auth": false, // use basic authentication "auth_users": {}, // basic authentication usernames and base64-encoded passwords (eg: { "username1": "cGFzc3dvcmQ=", "username2": "cGFzc3dvcmQy "}) "client_opts":{ - "skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate + "skipTlsVerify": false, // if enabled Http Client will accept any TLS certificate // the options to configure the http.Transport - "tls_handshake_timeout": "10s", - "disable_keep_alives": false, - "disable_compression": false, - "max_idle_conns": 100, - "max_idle_conns_per_host": 2, - "max_conns_per_host": 0, - "idle_conn_timeout": "90s", - "response_header_timeout": "0", - "expect_continue_timeout": "0", - "force_attempt_http2": true, + "tlsHandshakeTimeout": "10s", + "disableKeepAlives": false, + "disableCompression": false, + "maxIdleConns": 100, + "maxIdleConnsPerHost": 2, + "maxConnsPerHost": 0, + "idleConnTimeout": "90s", + "responseHeaderTimeout": "0", + "expectContinueTimeout": "0", + "forceAttemptHttp2": true, // the optins to configure the net.Dialer - "dial_timeout": "30s", - "dial_fallback_delay": "300ms", - "dial_keep_alive": "30s", + "dialTimeout": "30s", + "dialFallbackDelay": "300ms", + "dialKeepAlive": "30s", }, }, diff --git a/config/httpcfg.go b/config/httpcfg.go index 6da2a086c..8caec6dd7 100644 --- a/config/httpcfg.go +++ b/config/httpcfg.go @@ -19,11 +19,6 @@ along with this program. If not, see package config import ( - "crypto/tls" - "net" - "net/http" - "time" - "github.com/cgrates/cgrates/utils" ) @@ -37,7 +32,6 @@ type HTTPCfg struct { HTTPUseBasicAuth bool // Use basic auth for HTTP API HTTPAuthUsers map[string]string // Basic auth user:password map (base64 passwords) ClientOpts map[string]interface{} - transport *http.Transport } // loadFromJsonCfg loads Database config from JsonCfg @@ -71,7 +65,7 @@ func (httpcfg *HTTPCfg) loadFromJsonCfg(jsnHttpCfg *HTTPJsonCfg) (err error) { httpcfg.ClientOpts[k] = v } } - return httpcfg.initTransport() + return nil } func (httpcfg *HTTPCfg) AsMapInterface() (initialMP map[string]interface{}) { @@ -87,118 +81,3 @@ func (httpcfg *HTTPCfg) AsMapInterface() (initialMP map[string]interface{}) { } return } - -func (httpcfg *HTTPCfg) initTransport() (err error) { - trsp := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - } - dial := &net.Dialer{ - DualStack: true, - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientTLSClientConfigCfg]; has { - var skipTLSVerify bool - if skipTLSVerify, err = utils.IfaceAsBool(val); err != nil { - return - } - trsp.TLSClientConfig = &tls.Config{InsecureSkipVerify: skipTLSVerify} - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientTLSHandshakeTimeoutCfg]; has { - var tlsHndTimeout time.Duration - if tlsHndTimeout, err = utils.IfaceAsDuration(val); err != nil { - return - } - trsp.TLSHandshakeTimeout = tlsHndTimeout - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientDisableKeepAlivesCfg]; has { - var disKeepAlives bool - if disKeepAlives, err = utils.IfaceAsBool(val); err != nil { - return - } - trsp.DisableKeepAlives = disKeepAlives - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientDisableCompressionCfg]; has { - var disCmp bool - if disCmp, err = utils.IfaceAsBool(val); err != nil { - return - } - trsp.DisableCompression = disCmp - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientMaxIdleConnsCfg]; has { - var maxIdleConns int64 - if maxIdleConns, err = utils.IfaceAsTInt64(val); err != nil { - return - } - trsp.MaxIdleConns = int(maxIdleConns) - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientMaxIdleConnsPerHostCfg]; has { - var maxIdleConns int64 - if maxIdleConns, err = utils.IfaceAsTInt64(val); err != nil { - return - } - trsp.MaxIdleConnsPerHost = int(maxIdleConns) - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientMaxConnsPerHostCfg]; has { - var maxConns int64 - if maxConns, err = utils.IfaceAsTInt64(val); err != nil { - return - } - trsp.MaxConnsPerHost = int(maxConns) - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientIdleConnTimeoutCfg]; has { - var idleTimeout time.Duration - if idleTimeout, err = utils.IfaceAsDuration(val); err != nil { - return - } - trsp.IdleConnTimeout = idleTimeout - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientResponseHeaderTimeoutCfg]; has { - var responseTimeout time.Duration - if responseTimeout, err = utils.IfaceAsDuration(val); err != nil { - return - } - trsp.ResponseHeaderTimeout = responseTimeout - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientExpectContinueTimeoutCfg]; has { - var continueTimeout time.Duration - if continueTimeout, err = utils.IfaceAsDuration(val); err != nil { - return - } - trsp.ExpectContinueTimeout = continueTimeout - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientForceAttemptHTTP2Cfg]; has { - var forceHTTP2 bool - if forceHTTP2, err = utils.IfaceAsBool(val); err != nil { - return - } - trsp.ForceAttemptHTTP2 = forceHTTP2 - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientDialTimeoutCfg]; has { - var timeout time.Duration - if timeout, err = utils.IfaceAsDuration(val); err != nil { - return - } - dial.Timeout = timeout - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientDialFallbackDelayCfg]; has { - var fallDelay time.Duration - if fallDelay, err = utils.IfaceAsDuration(val); err != nil { - return - } - dial.FallbackDelay = fallDelay - } - if val, has := httpcfg.ClientOpts[utils.HTTPClientDialKeepAliveCfg]; has { - var keepAlive time.Duration - if keepAlive, err = utils.IfaceAsDuration(val); err != nil { - return - } - dial.KeepAlive = keepAlive - } - trsp.DialContext = dial.DialContext - httpcfg.transport = trsp - return -} - -// GetDefaultHTTPTransort returns the transport initialized when the config was loaded -func (httpcfg *HTTPCfg) GetDefaultHTTPTransort() *http.Transport { - return httpcfg.transport -} diff --git a/config/httpcfg_test.go b/config/httpcfg_test.go index 1a73e5380..77b9be70c 100644 --- a/config/httpcfg_test.go +++ b/config/httpcfg_test.go @@ -18,11 +18,8 @@ along with this program. If not, see package config import ( - "crypto/tls" - "net/http" "reflect" "testing" - "time" "github.com/cgrates/cgrates/utils" ) @@ -66,7 +63,7 @@ func TestHTTPCfgloadFromJsonCfg(t *testing.T) { t.Error(err) } else if err = cfgJsn.httpCfg.loadFromJsonCfg(cfgJSONStr); err != nil { t.Error(err) - } else if cfgJsn.httpCfg.transport = nil; !reflect.DeepEqual(expected, cfgJsn.httpCfg) { + } else if !reflect.DeepEqual(expected, cfgJsn.httpCfg) { t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expected), utils.ToJSON(cfgJsn.httpCfg)) } } @@ -150,117 +147,3 @@ func TestHTTPCfgAsMapInterface1(t *testing.T) { t.Errorf("Expected %+v, received %+v", eMap, rcv) } } - -func TestHTTPCfgGetDefaultHTTPTransort(t *testing.T) { - httpCfg := new(HTTPCfg) - if rply := httpCfg.GetDefaultHTTPTransort(); rply != nil { - t.Errorf("Expected %+v, received %+v", nil, rply) - } -} - -func TestHTTPCfgInitTransport(t *testing.T) { - httpCfg := &HTTPCfg{ - ClientOpts: map[string]interface{}{ - utils.HTTPClientTLSClientConfigCfg: false, - utils.HTTPClientTLSHandshakeTimeoutCfg: "10s", - utils.HTTPClientDisableKeepAlivesCfg: false, - utils.HTTPClientDisableCompressionCfg: false, - utils.HTTPClientMaxIdleConnsCfg: 100., - utils.HTTPClientMaxIdleConnsPerHostCfg: 2., - utils.HTTPClientMaxConnsPerHostCfg: 0., - utils.HTTPClientIdleConnTimeoutCfg: "90s", - utils.HTTPClientResponseHeaderTimeoutCfg: "0", - utils.HTTPClientExpectContinueTimeoutCfg: "0", - utils.HTTPClientForceAttemptHTTP2Cfg: true, - utils.HTTPClientDialTimeoutCfg: "30s", - utils.HTTPClientDialFallbackDelayCfg: "300ms", - utils.HTTPClientDialKeepAliveCfg: "30s", - }, - } - // the dial options are not included - checkTransport := func(t1, t2 *http.Transport) bool { - return t1 != nil && t2 != nil && - t1.TLSClientConfig.InsecureSkipVerify == t2.TLSClientConfig.InsecureSkipVerify && - t1.TLSHandshakeTimeout == t2.TLSHandshakeTimeout && - t1.DisableKeepAlives == t2.DisableKeepAlives && - t1.DisableCompression == t2.DisableCompression && - t1.MaxIdleConns == t2.MaxIdleConns && - t1.MaxIdleConnsPerHost == t2.MaxIdleConnsPerHost && - t1.MaxConnsPerHost == t2.MaxConnsPerHost && - t1.IdleConnTimeout == t2.IdleConnTimeout && - t1.ResponseHeaderTimeout == t2.ResponseHeaderTimeout && - t1.ExpectContinueTimeout == t2.ExpectContinueTimeout && - t1.ForceAttemptHTTP2 == t2.ForceAttemptHTTP2 - } - expTransport := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: false}, - TLSHandshakeTimeout: 10 * time.Second, - MaxIdleConns: 100, - MaxIdleConnsPerHost: 2, - MaxConnsPerHost: 0, - IdleConnTimeout: 90 * time.Second, - ForceAttemptHTTP2: true, - } - if err := httpCfg.initTransport(); err != nil { - t.Fatal(err) - } else if !checkTransport(expTransport, httpCfg.GetDefaultHTTPTransort()) { - t.Errorf("Expected %+v, received %+v", expTransport, httpCfg.GetDefaultHTTPTransort()) - } - - httpCfg.ClientOpts[utils.HTTPClientDialKeepAliveCfg] = "30as" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientDialFallbackDelayCfg] = "300ams" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientDialTimeoutCfg] = "30as" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientForceAttemptHTTP2Cfg] = "string" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientExpectContinueTimeoutCfg] = "0a" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientResponseHeaderTimeoutCfg] = "0a" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientIdleConnTimeoutCfg] = "90as" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientMaxConnsPerHostCfg] = "not a number" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientMaxIdleConnsPerHostCfg] = "not a number" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientMaxIdleConnsCfg] = "not a number" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientDisableCompressionCfg] = "string" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientDisableKeepAlivesCfg] = "string" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientTLSHandshakeTimeoutCfg] = "10as" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } - httpCfg.ClientOpts[utils.HTTPClientTLSClientConfigCfg] = "string" - if err := httpCfg.initTransport(); err == nil { - t.Error("Expected error but the transport was builded succesfully") - } -} diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 6a632149c..23dcc2394 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -85,7 +85,10 @@ // "redis_cluster_sync": "5s", // the sync interval for the redis cluster // "redis_cluster_ondown_delay": "0", // the delay before executing the commands if the redis cluster is in the CLUSTERDOWN state // "query_timeout":"10s", -// "redis_tls": false, // if true it will use a tls connection and use the client certificate, key and ca_certificate for tls connection +// "redis_tls": false, // if true it will use a tls connection and use the redis_client_certificate certificate, redis_client_key and redis_ca_certificate for tls connection +// "redis_client_certificate":"", // path to client certificate +// "redis_client_key":"", // path to client key +// "redis_ca_certificate":"", // path to CA certificate (populate for self-signed certificate otherwise let it empty) // } // }, @@ -165,22 +168,22 @@ // "use_basic_auth": false, // use basic authentication // "auth_users": {}, // basic authentication usernames and base64-encoded passwords (eg: { "username1": "cGFzc3dvcmQ=", "username2": "cGFzc3dvcmQy "}) // "client_opts":{ -// "skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate +// "skipTlsVerify": false, // if enabled Http Client will accept any TLS certificate // // the options to configure the http.Transport -// "tls_handshake_timeout": "10s", -// "disable_keep_alives": false, -// "disable_compression": false, -// "max_idle_conns": 100, -// "max_idle_conns_per_host": 2, -// "max_conns_per_host": 0, -// "idle_conn_timeout": "90s", -// "response_header_timeout": "0", -// "expect_continue_timeout": "0", -// "force_attempt_http2": true, +// "tlsHandshakeTimeout": "10s", +// "disableKeepAlives": false, +// "disableCompression": false, +// "maxIdleConns": 100, +// "maxIdleConnsPerHost": 2, +// "maxConnsPerHost": 0, +// "idleConnTimeout": "90s", +// "responseHeaderTimeout": "0", +// "expectContinueTimeout": "0", +// "forceAttemptHttp2": true, // // the optins to configure the net.Dialer -// "dial_timeout": "30s", -// "dial_fallback_delay": "300ms", -// "dial_keep_alive": "30s", +// "dialTimeout": "30s", +// "dialFallbackDelay": "300ms", +// "dialKeepAlive": "30s", // }, // }, @@ -869,7 +872,11 @@ // "redis_sentinel": "", // "redis_cluster": false, // "redis_cluster_sync": "5s", -// "redis_cluster_ondown_delay": "0", +// "redis_cluster_ondown_delay": "0", +// "redis_tls": false, // if true it will use a tls connection and use the redis_client_certificate, redis_client_key and redis_ca_certificate for tls connection +// "redis_client_certificate":"", // path to client certificate +// "redis_client_key":"", // path to client key +// "redis_ca_certificate":"", // path to CA certificate (populate for self-signed certificate otherwise let it empty) // }, // "out_stordb_opts":{}, // }, diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index da0829520..685f8aa24 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -40,9 +40,10 @@ func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi } switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type { case utils.MetaHTTPjsonMap: - pstrJSON.poster, err = engine.NewHTTPPoster(cgrCfg.HTTPCfg().GetDefaultHTTPTransort(), - cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, - utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) + pstrJSON.poster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout, + cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, + utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], + cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) case utils.MetaAMQPjsonMap: pstrJSON.poster = engine.NewAMQPPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts) diff --git a/ees/httppost.go b/ees/httppost.go index 8312b7cbc..ad13ea5eb 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -33,9 +33,10 @@ func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS dc utils.MapStorage) (httpPost *HTTPPost, err error) { httpPost = &HTTPPost{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} - httpPost.httpPoster, err = engine.NewHTTPPoster(cgrCfg.HTTPCfg().GetDefaultHTTPTransort(), - cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, - utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) + httpPost.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout, + cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, + utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], + cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) return } diff --git a/engine/account_test.go b/engine/account_test.go index 3e9f35958..b6c3eef2a 100644 --- a/engine/account_test.go +++ b/engine/account_test.go @@ -162,9 +162,6 @@ func TestGetSpecialPricedSeconds(t *testing.T) { } func TestAccountStorageStore(t *testing.T) { - if DB == "mongo" { - return // mongo will have a problem with null and {} so the Equal will not work - } b1 := &Balance{Value: 10, Weight: 10, DestinationIDs: utils.StringMap{"NAT": true}} b2 := &Balance{Value: 100, Weight: 20, DestinationIDs: utils.StringMap{"RET": true}} diff --git a/engine/action.go b/engine/action.go index 7929202b8..c4e4d5c37 100644 --- a/engine/action.go +++ b/engine/action.go @@ -385,8 +385,7 @@ func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error { if err != nil { return err } - pstr, err := NewHTTPPoster(config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), - config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters, + pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters, utils.CONTENT_JSON, config.CgrConfig().GeneralCfg().PosterAttempts) if err != nil { return err @@ -405,8 +404,7 @@ func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) er if err != nil { return err } - pstr, err := NewHTTPPoster(config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), - config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters, + pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters, utils.CONTENT_JSON, config.CgrConfig().GeneralCfg().PosterAttempts) if err != nil { return err @@ -974,8 +972,7 @@ func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error if err != nil { return err } - pstr, err := NewHTTPPoster(config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), - config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters, + pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters, utils.CONTENT_JSON, config.CgrConfig().GeneralCfg().PosterAttempts) if err != nil { return err diff --git a/engine/calldesc.go b/engine/calldesc.go index 3a1672cb5..f9411b772 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -35,42 +35,15 @@ const ( RECURSION_MAX_DEPTH = 3 MIN_PREFIX_MATCH = 1 FALLBACK_SUBJECT = utils.ANY - DB = "map" ) -func init() { - var data DataDB - switch DB { - case "map": - if cgrCfg := config.CgrConfig(); cgrCfg == nil { - cgrCfg, _ = config.NewDefaultCGRConfig() - config.SetCgrConfig(cgrCfg) - } - data = NewInternalDB(nil, nil, true) - } - dm = NewDataManager(data, config.CgrConfig().CacheCfg(), connMgr) -} - var ( - dm *DataManager - cdrStorage CdrStorage debitPeriod = 10 * time.Second globalRoundingDecimals = 6 - connMgr *ConnManager rpSubjectPrefixMatching bool rpSubjectPrefixMatchingMutex sync.RWMutex // used to reload rpSubjectPrefixMatching ) -// SetDataStorage is the exported method to set the storage getter. -func SetDataStorage(dm2 *DataManager) { - dm = dm2 -} - -// SetConnManager is the exported method to set the connectionManager used when operate on an account. -func SetConnManager(conMgr *ConnManager) { - connMgr = conMgr -} - // SetRoundingDecimals sets the global rounding method and decimal precision for GetCost method func SetRoundingDecimals(rd int) { globalRoundingDecimals = rd @@ -91,11 +64,6 @@ func getRpSubjectPrefixMatching() (flag bool) { return } -// SetCdrStorage sets the database for CDR storing, used by *cdrlog in first place -func SetCdrStorage(cStorage CdrStorage) { - cdrStorage = cStorage -} - // NewCallDescriptorFromCGREvent converts a CGREvent into CallDescriptor func NewCallDescriptorFromCGREvent(cgrEv *utils.CGREvent, timezone string) (cd *CallDescriptor, err error) { diff --git a/engine/cdr.go b/engine/cdr.go index 8b819d0aa..7d3104d03 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -22,7 +22,6 @@ import ( "encoding/json" "fmt" "math" - "net/http" "strconv" "strings" "time" @@ -334,8 +333,8 @@ func (cdr *CDR) exportFieldValue(cfgCdrFld *config.FCTemplate, filterS *FilterS) return } -func (cdr *CDR) formatField(cfgFld *config.FCTemplate, pstrTransport *http.Transport, - groupedCDRs []*CDR, filterS *FilterS) (outVal string, err error) { +func (cdr *CDR) formatField(cfgFld *config.FCTemplate, groupedCDRs []*CDR, + filterS *FilterS) (outVal string, err error) { switch cfgFld.Type { case utils.META_FILLER: outVal, err = cfgFld.Value.ParseValue(utils.EmptyString) @@ -364,7 +363,7 @@ func (cdr *CDR) formatField(cfgFld *config.FCTemplate, pstrTransport *http.Trans } if len(httpAddr) == 0 { err = fmt.Errorf("Empty http address for field %s type %s", cfgFld.Tag, cfgFld.Type) - } else if outValByte, err = HTTPPostJSON(httpAddr, pstrTransport, jsn); err == nil { + } else if outValByte, err = HTTPPostJSON(httpAddr, jsn); err == nil { outVal = string(outValByte) if len(outVal) == 0 && cfgFld.Mandatory { err = fmt.Errorf("Empty result for http_post field: %s", cfgFld.Tag) @@ -390,8 +389,8 @@ func (cdr *CDR) formatField(cfgFld *config.FCTemplate, pstrTransport *http.Trans // AsExportRecord is used in place where we need to export the CDR based on an export template // ExportRecord is a []string to keep it compatible with encoding/csv Writer -func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate, - pstrTransport *http.Transport, groupedCDRs []*CDR, filterS *FilterS) (expRecord []string, err error) { +func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate, groupedCDRs []*CDR, + filterS *FilterS) (expRecord []string, err error) { nM := utils.MapStorage{ utils.MetaReq: cdr.AsMapStringIface(), utils.MetaEC: cdr.CostDetails, @@ -407,7 +406,7 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate, continue } var fmtOut string - if fmtOut, err = cdr.formatField(cfgFld, pstrTransport, groupedCDRs, filterS); err != nil { + if fmtOut, err = cdr.formatField(cfgFld, groupedCDRs, filterS); err != nil { utils.Logger.Warning(fmt.Sprintf(" error: %s exporting field: %s, CDR: %s\n", err.Error(), utils.ToJSON(cfgFld), utils.ToJSON(cdr))) return nil, err diff --git a/engine/cdr_test.go b/engine/cdr_test.go index 6f6d3b763..8ee2fb7cb 100644 --- a/engine/cdr_test.go +++ b/engine/cdr_test.go @@ -641,8 +641,7 @@ func TestCDRAsExportRecord(t *testing.T) { Value: prsr, Timezone: "UTC", } - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err != nil { t.Error(err) } else if expRecord[0] != cdr.Destination { t.Errorf("Expecting:\n%s\nReceived:\n%s", cdr.Destination, expRecord) @@ -661,8 +660,7 @@ func TestCDRAsExportRecord(t *testing.T) { MaskLen: 3, } eDst := "+4986517174***" - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err != nil { t.Error(err) } else if expRecord[0] != eDst { t.Errorf("Expecting:\n%s\nReceived:\n%s", eDst, expRecord[0]) @@ -675,8 +673,7 @@ func TestCDRAsExportRecord(t *testing.T) { Value: prsr, MaskDestID: "MASKED_DESTINATIONS", } - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err != nil { t.Error(err) } else if expRecord[0] != "1" { t.Errorf("Expecting:\n%s\nReceived:\n%s", "1", expRecord[0]) @@ -692,8 +689,7 @@ func TestCDRAsExportRecord(t *testing.T) { Filters: []string{"*string:~*req.Tenant:itsyscom.com"}, Timezone: "UTC", } - if rcrd, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, &FilterS{dm: dmForCDR, cfg: defaultCfg}); err != nil { + if rcrd, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, &FilterS{dm: dmForCDR, cfg: defaultCfg}); err != nil { t.Error(err) } else if len(rcrd) != 0 { t.Error("failed using filter") @@ -710,8 +706,7 @@ func TestCDRAsExportRecord(t *testing.T) { Layout: layout, Timezone: "UTC", } - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, &FilterS{dm: dmForCDR, cfg: defaultCfg}); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, &FilterS{dm: dmForCDR, cfg: defaultCfg}); err != nil { t.Error(err) } else if expRecord[0] != "2014-06-11 19:19:00" { t.Error("Expecting: 2014-06-11 19:19:00, got: ", expRecord[0]) @@ -727,8 +722,7 @@ func TestCDRAsExportRecord(t *testing.T) { Layout: layout, Timezone: "UTC", } - if rcrd, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, &FilterS{dm: dmForCDR, cfg: defaultCfg}); err != nil { + if rcrd, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, &FilterS{dm: dmForCDR, cfg: defaultCfg}); err != nil { t.Error(err) } else if len(rcrd) != 0 { t.Error("failed using filter") @@ -743,8 +737,7 @@ func TestCDRAsExportRecord(t *testing.T) { Layout: layout, Timezone: "UTC"} // Test time parse error - if _, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err == nil { + if _, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err == nil { t.Error("Should give error here, got none.") } @@ -755,8 +748,7 @@ func TestCDRAsExportRecord(t *testing.T) { Path: "*exp.CGRIDFromCostDetails", Value: prsr, } - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err != nil { t.Error(err) } else if expRecord[0] != cdr.CostDetails.CGRID { t.Errorf("Expecting:\n%s\nReceived:\n%s", cdr.CostDetails.CGRID, expRecord) @@ -768,8 +760,7 @@ func TestCDRAsExportRecord(t *testing.T) { Path: "*exp.CustomAccountID", Value: prsr, } - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err != nil { t.Error(err) } else if expRecord[0] != cdr.CostDetails.AccountSummary.ID { t.Errorf("Expecting:\n%s\nReceived:\n%s", cdr.CostDetails.AccountSummary.ID, expRecord) @@ -783,8 +774,7 @@ func TestCDRAsExportRecord(t *testing.T) { Path: "*exp.CustomDestinationID", Value: prsr, } - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err != nil { t.Error(err) } else if expRecord[0] != expected { t.Errorf("Expecting: <%q>,\n Received: <%q>", expected, expRecord[0]) @@ -798,8 +788,7 @@ func TestCDRAsExportRecord(t *testing.T) { Path: "*exp.CustomDestinationID", Value: prsr, } - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err != nil { t.Error(err) } else if expRecord[0] != expected { t.Errorf("Expecting: <%q>,\n Received: <%q>", expected, expRecord[0]) @@ -813,8 +802,7 @@ func TestCDRAsExportRecord(t *testing.T) { Path: "*exp.CustomDestinationID", Value: prsr, } - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err != nil { t.Error(err) } else if expRecord[0] != expected { t.Errorf("Expecting: <%q>,\n Received: <%q>", expected, expRecord[0]) @@ -828,8 +816,7 @@ func TestCDRAsExportRecord(t *testing.T) { Path: "*exp.CustomDestinationID", Value: prsr, } - if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, - config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), nil, nil); err != nil { + if expRecord, err := cdr.AsExportRecord([]*config.FCTemplate{cfgCdrFld}, nil, nil); err != nil { t.Error(err) } else if expRecord[0] != expected { t.Errorf("Expecting: <%q>,\n Received: <%q>", expected, expRecord[0]) @@ -887,7 +874,7 @@ func TestCDRAsCDRsql(t *testing.T) { func TestCDRNewCDRFromSQL(t *testing.T) { extraFields := map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"} - cdrSql := &CDRsql{ + cdrSQL := &CDRsql{ ID: 123, Cgrid: "abecd993d06672714c4218a6dcf8278e0589a171", RunID: utils.MetaDefault, @@ -928,7 +915,7 @@ func TestCDRNewCDRFromSQL(t *testing.T) { ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, } - if eCDR, err := NewCDRFromSQL(cdrSql); err != nil { + if eCDR, err := NewCDRFromSQL(cdrSQL); err != nil { t.Error(err) } else if !reflect.DeepEqual(cdr, eCDR) { t.Errorf("Expecting: %+v, received: %+v", cdr, eCDR) diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go index 318b874b6..ce209763d 100644 --- a/engine/dispatcherprfl.go +++ b/engine/dispatcherprfl.go @@ -150,9 +150,8 @@ func (dH *DispatcherHost) TenantID() string { } // GetRPCConnection builds or returns the cached connection -func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply interface{}) error { +func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply interface{}) (err error) { if dH.rpcConn == nil { - var err error // connect the rpcConn cfg := config.CgrConfig() if dH.rpcConn, err = NewRPCPool( @@ -162,7 +161,7 @@ func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply int cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, dH.Conns, IntRPC.GetInternalChanel(), false); err != nil { - return err + return } } diff --git a/engine/globalvars.go b/engine/globalvars.go new file mode 100644 index 000000000..d2a19f379 --- /dev/null +++ b/engine/globalvars.go @@ -0,0 +1,179 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "crypto/tls" + "net" + "net/http" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +// this file will contain all the global variable that are used by other subsystems + +var ( + httpPstrTransport *http.Transport + dm *DataManager + cdrStorage CdrStorage + connMgr *ConnManager +) + +func init() { + dm = NewDataManager(NewInternalDB(nil, nil, true), config.CgrConfig().CacheCfg(), connMgr) + httpPstrTransport, _ = NewHTTPTransport(config.CgrConfig().HTTPCfg().ClientOpts) +} + +// SetDataStorage is the exported method to set the storage getter. +func SetDataStorage(dm2 *DataManager) { + dm = dm2 +} + +// SetConnManager is the exported method to set the connectionManager used when operate on an account. +func SetConnManager(conMgr *ConnManager) { + connMgr = conMgr +} + +// SetCdrStorage sets the database for CDR storing, used by *cdrlog in first place +func SetCdrStorage(cStorage CdrStorage) { + cdrStorage = cStorage +} + +// SetHTTPPstrTransport sets the http transport to be used by the HTTP Poster +func SetHTTPPstrTransport(pstrTransport *http.Transport) { + httpPstrTransport = pstrTransport +} + +// NewHTTPTransport will create a new transport for HTTP client +func NewHTTPTransport(opts map[string]interface{}) (trsp *http.Transport, err error) { + trsp = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + } + if val, has := opts[utils.HTTPClientTLSClientConfigCfg]; has { + var skipTLSVerify bool + if skipTLSVerify, err = utils.IfaceAsBool(val); err != nil { + return + } + trsp.TLSClientConfig = &tls.Config{InsecureSkipVerify: skipTLSVerify} + } + if val, has := opts[utils.HTTPClientTLSHandshakeTimeoutCfg]; has { + var tlsHndTimeout time.Duration + if tlsHndTimeout, err = utils.IfaceAsDuration(val); err != nil { + return + } + trsp.TLSHandshakeTimeout = tlsHndTimeout + } + if val, has := opts[utils.HTTPClientDisableKeepAlivesCfg]; has { + var disKeepAlives bool + if disKeepAlives, err = utils.IfaceAsBool(val); err != nil { + return + } + trsp.DisableKeepAlives = disKeepAlives + } + if val, has := opts[utils.HTTPClientDisableCompressionCfg]; has { + var disCmp bool + if disCmp, err = utils.IfaceAsBool(val); err != nil { + return + } + trsp.DisableCompression = disCmp + } + if val, has := opts[utils.HTTPClientMaxIdleConnsCfg]; has { + var maxIdleConns int64 + if maxIdleConns, err = utils.IfaceAsTInt64(val); err != nil { + return + } + trsp.MaxIdleConns = int(maxIdleConns) + } + if val, has := opts[utils.HTTPClientMaxIdleConnsPerHostCfg]; has { + var maxIdleConns int64 + if maxIdleConns, err = utils.IfaceAsTInt64(val); err != nil { + return + } + trsp.MaxIdleConnsPerHost = int(maxIdleConns) + } + if val, has := opts[utils.HTTPClientMaxConnsPerHostCfg]; has { + var maxConns int64 + if maxConns, err = utils.IfaceAsTInt64(val); err != nil { + return + } + trsp.MaxConnsPerHost = int(maxConns) + } + if val, has := opts[utils.HTTPClientIdleConnTimeoutCfg]; has { + var idleTimeout time.Duration + if idleTimeout, err = utils.IfaceAsDuration(val); err != nil { + return + } + trsp.IdleConnTimeout = idleTimeout + } + if val, has := opts[utils.HTTPClientResponseHeaderTimeoutCfg]; has { + var responseTimeout time.Duration + if responseTimeout, err = utils.IfaceAsDuration(val); err != nil { + return + } + trsp.ResponseHeaderTimeout = responseTimeout + } + if val, has := opts[utils.HTTPClientExpectContinueTimeoutCfg]; has { + var continueTimeout time.Duration + if continueTimeout, err = utils.IfaceAsDuration(val); err != nil { + return + } + trsp.ExpectContinueTimeout = continueTimeout + } + if val, has := opts[utils.HTTPClientForceAttemptHTTP2Cfg]; has { + var forceHTTP2 bool + if forceHTTP2, err = utils.IfaceAsBool(val); err != nil { + return + } + trsp.ForceAttemptHTTP2 = forceHTTP2 + } + var dial *net.Dialer + if dial, err = newDialer(opts); err != nil { + return + } + trsp.DialContext = dial.DialContext + return +} + +// newDialer returns the objects that creates the DialContext function +func newDialer(opts map[string]interface{}) (dial *net.Dialer, err error) { + dial = &net.Dialer{ + DualStack: true, + } + if val, has := opts[utils.HTTPClientDialTimeoutCfg]; has { + var timeout time.Duration + if timeout, err = utils.IfaceAsDuration(val); err != nil { + return + } + dial.Timeout = timeout + } + if val, has := opts[utils.HTTPClientDialFallbackDelayCfg]; has { + var fallDelay time.Duration + if fallDelay, err = utils.IfaceAsDuration(val); err != nil { + return + } + dial.FallbackDelay = fallDelay + } + if val, has := opts[utils.HTTPClientDialKeepAliveCfg]; has { + var keepAlive time.Duration + if keepAlive, err = utils.IfaceAsDuration(val); err != nil { + return + } + dial.KeepAlive = keepAlive + } + return +} diff --git a/engine/globalvars_test.go b/engine/globalvars_test.go new file mode 100644 index 000000000..98e23291e --- /dev/null +++ b/engine/globalvars_test.go @@ -0,0 +1,172 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "crypto/tls" + "net" + "net/http" + "testing" + "time" + + "github.com/cgrates/cgrates/utils" +) + +func TestNewHTTPTransport(t *testing.T) { + opts := map[string]interface{}{ + utils.HTTPClientTLSClientConfigCfg: false, + utils.HTTPClientTLSHandshakeTimeoutCfg: "10s", + utils.HTTPClientDisableKeepAlivesCfg: false, + utils.HTTPClientDisableCompressionCfg: false, + utils.HTTPClientMaxIdleConnsCfg: 100., + utils.HTTPClientMaxIdleConnsPerHostCfg: 2., + utils.HTTPClientMaxConnsPerHostCfg: 0., + utils.HTTPClientIdleConnTimeoutCfg: "90s", + utils.HTTPClientResponseHeaderTimeoutCfg: "0", + utils.HTTPClientExpectContinueTimeoutCfg: "0", + utils.HTTPClientForceAttemptHTTP2Cfg: true, + utils.HTTPClientDialTimeoutCfg: "30s", + utils.HTTPClientDialFallbackDelayCfg: "300ms", + utils.HTTPClientDialKeepAliveCfg: "30s", + } + + expDialer := &net.Dialer{ + DualStack: true, + Timeout: 30 * time.Second, + FallbackDelay: 300 * time.Millisecond, + KeepAlive: 30 * time.Second, + } + if dial, err := newDialer(opts); err != nil { + t.Fatal(err) + } else if !(expDialer != nil && dial != nil && + expDialer.DualStack == dial.DualStack && + expDialer.Timeout == dial.Timeout && + expDialer.FallbackDelay == dial.FallbackDelay && + expDialer.KeepAlive == dial.KeepAlive) { + t.Errorf("Expected %+v, received %+v", expDialer, dial) + } + expTransport := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: false}, + TLSHandshakeTimeout: 10 * time.Second, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 2, + MaxConnsPerHost: 0, + IdleConnTimeout: 90 * time.Second, + ForceAttemptHTTP2: true, + } + if trsp, err := NewHTTPTransport(opts); err != nil { + t.Fatal(err) + } else if !(expTransport != nil && trsp != nil && // the dial options are not included + expTransport.TLSClientConfig.InsecureSkipVerify == trsp.TLSClientConfig.InsecureSkipVerify && + expTransport.TLSHandshakeTimeout == trsp.TLSHandshakeTimeout && + expTransport.DisableKeepAlives == trsp.DisableKeepAlives && + expTransport.DisableCompression == trsp.DisableCompression && + expTransport.MaxIdleConns == trsp.MaxIdleConns && + expTransport.MaxIdleConnsPerHost == trsp.MaxIdleConnsPerHost && + expTransport.MaxConnsPerHost == trsp.MaxConnsPerHost && + expTransport.IdleConnTimeout == trsp.IdleConnTimeout && + expTransport.ResponseHeaderTimeout == trsp.ResponseHeaderTimeout && + expTransport.ExpectContinueTimeout == trsp.ExpectContinueTimeout && + expTransport.ForceAttemptHTTP2 == trsp.ForceAttemptHTTP2) { + t.Errorf("Expected %+v, received %+v", expTransport, trsp) + } + + opts[utils.HTTPClientDialKeepAliveCfg] = "30as" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientDialFallbackDelayCfg] = "300ams" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientDialTimeoutCfg] = "30as" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientForceAttemptHTTP2Cfg] = "string" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientExpectContinueTimeoutCfg] = "0a" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientResponseHeaderTimeoutCfg] = "0a" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientIdleConnTimeoutCfg] = "90as" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientMaxConnsPerHostCfg] = "not a number" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientMaxIdleConnsPerHostCfg] = "not a number" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientMaxIdleConnsCfg] = "not a number" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientDisableCompressionCfg] = "string" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientDisableKeepAlivesCfg] = "string" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientTLSHandshakeTimeoutCfg] = "10as" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } + opts[utils.HTTPClientTLSClientConfigCfg] = "string" + if _, err := NewHTTPTransport(opts); err == nil { + t.Error("Expected error but the transport was builded succesfully") + } +} + +func TestSetHTTPPstrTransport(t *testing.T) { + tmp := httpPstrTransport + SetHTTPPstrTransport(nil) + if httpPstrTransport != nil { + t.Error("Expected the transport to be nil", httpPstrTransport) + } + httpPstrTransport = tmp +} + +func TestSetCdrStorage(t *testing.T) { + tmp := cdrStorage + SetCdrStorage(nil) + if cdrStorage != nil { + t.Error("Expected the cdrStorage to be nil", cdrStorage) + } + cdrStorage = tmp +} + +func TestSetDataStorage(t *testing.T) { + tmp := dm + SetDataStorage(nil) + if dm != nil { + t.Error("Expected the dm to be nil", dm) + } + dm = tmp +} diff --git a/engine/libcdre.go b/engine/libcdre.go index 4a1ed801c..68c86faff 100644 --- a/engine/libcdre.go +++ b/engine/libcdre.go @@ -157,8 +157,7 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export switch expEv.Format { case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.MetaHTTPPost: var pstr *HTTPPoster - pstr, err = NewHTTPPoster(config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), - config.CgrConfig().GeneralCfg().ReplyTimeout, expEv.Path, + pstr, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().ReplyTimeout, expEv.Path, utils.PosterTransportContentTypes[expEv.Format], config.CgrConfig().GeneralCfg().PosterAttempts) if err != nil { diff --git a/engine/pstr_http.go b/engine/pstr_http.go index 5e5df447d..100f68072 100644 --- a/engine/pstr_http.go +++ b/engine/pstr_http.go @@ -30,8 +30,8 @@ import ( ) // HTTPPostJSON posts without automatic failover -func HTTPPostJSON(url string, posterTransport *http.Transport, content []byte) (respBody []byte, err error) { - client := &http.Client{Transport: posterTransport} +func HTTPPostJSON(url string, content []byte) (respBody []byte, err error) { + client := &http.Client{Transport: httpPstrTransport} var resp *http.Response if resp, err = client.Post(url, "application/json", bytes.NewBuffer(content)); err != nil { return @@ -48,13 +48,13 @@ func HTTPPostJSON(url string, posterTransport *http.Transport, content []byte) ( } // NewHTTPPoster return a new HTTP poster -func NewHTTPPoster(posterTransport *http.Transport, replyTimeout time.Duration, - addr, contentType string, attempts int) (httposter *HTTPPoster, err error) { +func NewHTTPPoster(replyTimeout time.Duration, addr, contentType string, + attempts int) (httposter *HTTPPoster, err error) { if !utils.SliceHasMember([]string{utils.CONTENT_FORM, utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { return nil, fmt.Errorf("unsupported ContentType: %s", contentType) } return &HTTPPoster{ - httpClient: &http.Client{Transport: posterTransport, Timeout: replyTimeout}, + httpClient: &http.Client{Transport: httpPstrTransport, Timeout: replyTimeout}, addr: addr, contentType: contentType, attempts: attempts, diff --git a/engine/suretax.go b/engine/suretax.go index 1600badab..5515bc0f8 100644 --- a/engine/suretax.go +++ b/engine/suretax.go @@ -182,7 +182,7 @@ func SureTaxProcessCdr(cdr *CDR) error { } if sureTaxClient == nil { // First time used, init the client here sureTaxClient = &http.Client{ - Transport: config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), + Transport: httpPstrTransport, } } req, err := NewSureTaxRequest(cdr, stCfg) diff --git a/engine/tpexporter.go b/engine/tpexporter.go index bbd908aca..19c9c439d 100644 --- a/engine/tpexporter.go +++ b/engine/tpexporter.go @@ -32,10 +32,6 @@ import ( "github.com/cgrates/cgrates/utils" ) -// var ( -// TPExportFormats = []string{utils.CSV} -// ) - func NewTPExporter(storDb LoadStorage, tpID, expPath, fileFormat, sep string, compress bool) (*TPExporter, error) { if len(tpID) == 0 { return nil, errors.New("Missing TPid") diff --git a/engine/z_cdr_it_test.go b/engine/z_cdr_it_test.go index 80c0d5ece..b89e16c1d 100644 --- a/engine/z_cdr_it_test.go +++ b/engine/z_cdr_it_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -39,7 +38,7 @@ func TestHttpJsonPost(t *testing.T) { Usage: "0.00000001", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, } jsn, _ := json.Marshal(cdrOut) - if _, err := HTTPPostJSON("http://localhost:8000", config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), jsn); err == nil { + if _, err := HTTPPostJSON("http://localhost:8000", jsn); err == nil { t.Error(err) } } diff --git a/engine/z_poster_it_test.go b/engine/z_poster_it_test.go index c32239830..4d205fbd1 100644 --- a/engine/z_poster_it_test.go +++ b/engine/z_poster_it_test.go @@ -67,7 +67,7 @@ func TestHttpJsonPoster(t *testing.T) { config.CgrConfig().GeneralCfg().FailedPostsDir = "/tmp" content := &TestContent{Var1: "Val1", Var2: "Val2"} jsn, _ := json.Marshal(content) - pstr, err := NewHTTPPoster(config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), time.Duration(2*time.Second), "http://localhost:8080/invalid", utils.CONTENT_JSON, 3) + pstr, err := NewHTTPPoster(time.Duration(2*time.Second), "http://localhost:8080/invalid", utils.CONTENT_JSON, 3) if err != nil { t.Error(err) } @@ -100,7 +100,7 @@ func TestHttpBytesPoster(t *testing.T) { content := []byte(`Test Test2 `) - pstr, err := NewHTTPPoster(config.CgrConfig().HTTPCfg().GetDefaultHTTPTransort(), time.Duration(2*time.Second), "http://localhost:8080/invalid", utils.CONTENT_TEXT, 3) + pstr, err := NewHTTPPoster(time.Duration(2*time.Second), "http://localhost:8080/invalid", utils.CONTENT_TEXT, 3) if err != nil { t.Error(err) } diff --git a/ers/partial_csv.go b/ers/partial_csv.go index 9e0b018f4..65ab31c4a 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -313,7 +313,7 @@ func (rdr *PartialCSVFileER) dumpToFile(itmID string, value interface{}) { utils.ERs, utils.ToJSON(origCgrEvs[0].Event), err.Error())) return } - record, err := cdr.AsExportRecord(rdr.Config().CacheDumpFields, rdr.cgrCfg.HTTPCfg().GetDefaultHTTPTransort(), nil, rdr.fltrS) + record, err := cdr.AsExportRecord(rdr.Config().CacheDumpFields, nil, rdr.fltrS) if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", @@ -345,7 +345,7 @@ func (rdr *PartialCSVFileER) dumpToFile(itmID string, value interface{}) { utils.ERs, utils.ToJSON(origCgrEv.Event), err.Error())) return } - record, err = cdr.AsExportRecord(rdr.Config().CacheDumpFields, rdr.cgrCfg.HTTPCfg().GetDefaultHTTPTransort(), nil, rdr.fltrS) + record, err = cdr.AsExportRecord(rdr.Config().CacheDumpFields, nil, rdr.fltrS) if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", diff --git a/general_tests/cacherpl_it_test.go b/general_tests/cacherpl_it_test.go index 7ca31215b..ad3dce0e2 100644 --- a/general_tests/cacherpl_it_test.go +++ b/general_tests/cacherpl_it_test.go @@ -459,13 +459,13 @@ func testCacheRplAACheckLoadReplication(t *testing.T) { t.Error(err) } - var rpl []*engine.ChrgSProcessEventReply var wgDisp1 sync.WaitGroup var wgDisp2 sync.WaitGroup for i := 0; i < 10; i++ { wgDisp1.Add(1) wgDisp2.Add(1) go func() { + var rpl []*engine.ChrgSProcessEventReply if err := dspEngine1RPC.Call(utils.ChargerSv1ProcessEvent, &utils.CGREventWithOpts{ CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", @@ -487,6 +487,7 @@ func testCacheRplAACheckLoadReplication(t *testing.T) { wgDisp1.Done() }() go func() { + var rpl []*engine.ChrgSProcessEventReply if err := dspEngine2RPC.Call(utils.ChargerSv1ProcessEvent, &utils.CGREventWithOpts{ CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", diff --git a/services/globalvars.go b/services/globalvars.go new file mode 100644 index 000000000..d035336cb --- /dev/null +++ b/services/globalvars.go @@ -0,0 +1,82 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "net/http" + + "github.com/cgrates/cgrates/engine" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +// NewGlobalVarS . +func NewGlobalVarS(cfg *config.CGRConfig) servmanager.Service { + return &GlobalVarS{ + cfg: cfg, + } +} + +// GlobalVarS implements Agent interface +type GlobalVarS struct { + cfg *config.CGRConfig +} + +// Start should handle the sercive start +func (gv *GlobalVarS) Start() (err error) { + return gv.initHTTPTransport() +} + +// Reload handles the change of config +func (gv *GlobalVarS) Reload() (err error) { + return gv.initHTTPTransport() +} + +// Shutdown stops the service +func (gv *GlobalVarS) Shutdown() (err error) { + return +} + +// IsRunning returns if the service is running +func (gv *GlobalVarS) IsRunning() bool { + return true +} + +// ServiceName returns the service name +func (gv *GlobalVarS) ServiceName() string { + return utils.GlobalVarS +} + +// ShouldRun returns if the service should be running +func (gv *GlobalVarS) ShouldRun() bool { + return true +} + +func (gv *GlobalVarS) initHTTPTransport() (err error) { + var trsp *http.Transport + if trsp, err = engine.NewHTTPTransport(gv.cfg.HTTPCfg().ClientOpts); err != nil { + utils.Logger.Crit(fmt.Sprintf("Could not configure the http transport: %s exiting!", err)) + return + } + engine.SetHTTPPstrTransport(trsp) + return +} diff --git a/services/globalvars_test.go b/services/globalvars_test.go new file mode 100644 index 000000000..cb4ba287d --- /dev/null +++ b/services/globalvars_test.go @@ -0,0 +1,56 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package services + +import ( + "reflect" + "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +func TestGlobalVarS(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + exp := &GlobalVarS{cfg: cfg} + if gv := NewGlobalVarS(cfg); !reflect.DeepEqual(gv, exp) { + t.Errorf("Expected %+v, received %+v", exp, gv) + } + if exp.ServiceName() != utils.GlobalVarS { + t.Errorf("Unexpected service name %q", exp.ServiceName()) + } + if !exp.ShouldRun() { + t.Errorf("This service should allways run") + } + if err := exp.Start(); err != nil { + t.Fatal(err) + } + if !exp.IsRunning() { + t.Errorf("This service needs to be running") + } + cfg.HTTPCfg().ClientOpts[utils.HTTPClientDialTimeoutCfg] = "30as" + if err := exp.Reload(); err == nil { + t.Error("Expected reload to not be succesfully finished") + } + if err = exp.Shutdown(); err != nil { + t.Fatal(err) + } +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index ad9ce7f6f..dab6f7983 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -251,6 +251,8 @@ func (srvMngr *ServiceManager) handleReload() { go srvMngr.reloadService(utils.SIPAgent) case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherHJson): go srvMngr.reloadService(utils.DispatcherH) + case <-srvMngr.GetConfig().GetReloadChan(config.HTTP_JSN): + go srvMngr.reloadService(utils.GlobalVarS) } // handle RPC server } diff --git a/utils/consts.go b/utils/consts.go index 25a18ecb0..1f415dac0 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -547,6 +547,7 @@ const ( StatS = "Stats" LoadIDsVrs = "LoadIDs" RALService = "RALs" + GlobalVarS = "GlobalVarS" CostSource = "CostSource" ExtraInfo = "ExtraInfo" Meta = "*" @@ -1975,20 +1976,20 @@ const ( HTTPClientOptsCfg = "client_opts" ConfigsURL = "configs_url" - HTTPClientTLSClientConfigCfg = "skip_tls_verify" - HTTPClientTLSHandshakeTimeoutCfg = "tls_handshake_timeout" - HTTPClientDisableKeepAlivesCfg = "disable_keep_alives" - HTTPClientDisableCompressionCfg = "disable_compression" - HTTPClientMaxIdleConnsCfg = "max_idle_conns" - HTTPClientMaxIdleConnsPerHostCfg = "max_idle_conns_per_host" - HTTPClientMaxConnsPerHostCfg = "max_conns_per_host" - HTTPClientIdleConnTimeoutCfg = "idle_conn_timeout" - HTTPClientResponseHeaderTimeoutCfg = "response_header_timeout" - HTTPClientExpectContinueTimeoutCfg = "expect_continue_timeout" - HTTPClientForceAttemptHTTP2Cfg = "force_attempt_http2" - HTTPClientDialTimeoutCfg = "dial_timeout" - HTTPClientDialFallbackDelayCfg = "dial_fallback_delay" - HTTPClientDialKeepAliveCfg = "dial_keep_alive" + HTTPClientTLSClientConfigCfg = "skipTlsVerify" + HTTPClientTLSHandshakeTimeoutCfg = "tlsHandshakeTimeout" + HTTPClientDisableKeepAlivesCfg = "disableKeepAlives" + HTTPClientDisableCompressionCfg = "disableCompression" + HTTPClientMaxIdleConnsCfg = "maxIdleConns" + HTTPClientMaxIdleConnsPerHostCfg = "maxIdleConnsPerHost" + HTTPClientMaxConnsPerHostCfg = "maxConnsPerHost" + HTTPClientIdleConnTimeoutCfg = "idleConnTimeout" + HTTPClientResponseHeaderTimeoutCfg = "responseHeaderTimeout" + HTTPClientExpectContinueTimeoutCfg = "expectContinueTimeout" + HTTPClientForceAttemptHTTP2Cfg = "forceAttemptHttp2" + HTTPClientDialTimeoutCfg = "dialTimeout" + HTTPClientDialFallbackDelayCfg = "dialFallbackDelay" + HTTPClientDialKeepAliveCfg = "dialKeepAlive" ) // FilterSCfg