diff --git a/engine/action.go b/engine/action.go index bf6fbaccc..44e867e60 100644 --- a/engine/action.go +++ b/engine/action.go @@ -422,7 +422,8 @@ func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error } cfg := config.CgrConfig() fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID())) - _, _, err = utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, config.CgrConfig().HttpPosterAttempts, fallbackPath, false) + _, err = utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify, + config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn, config.CgrConfig().HttpPosterAttempts, fallbackPath) return err } @@ -441,7 +442,8 @@ func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) } cfg := config.CgrConfig() fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID())) - go utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, config.CgrConfig().HttpPosterAttempts, fallbackPath, false) + go utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify, + config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn, config.CgrConfig().HttpPosterAttempts, fallbackPath) return nil } diff --git a/engine/cdrs.go b/engine/cdrs.go index 4c31281b3..f446ad326 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -86,7 +86,8 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dataDB AccountingS stats = nil } return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dataDB: dataDB, - rals: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil + rals: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian, + httpPoster: utils.NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil } type CdrServer struct { @@ -100,6 +101,7 @@ type CdrServer struct { stats rpcclient.RpcClientConnection guard *GuardianLock responseCache *cache.ResponseCache + httpPoster *utils.HTTPPoster // used for replication } func (self *CdrServer) Timezone() string { @@ -477,10 +479,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { fallbackPath := path.Join( self.cgrCfg.HttpFailedDir, rplCfg.FallbackFileName()) - _, _, err := utils.HttpPoster( - rplCfg.Address, self.cgrCfg.HttpSkipTlsVerify, body, - content, rplCfg.Attempts, fallbackPath, false) // ToDo: Review caching here after we are sure that the connection leak is gone - if err != nil { + if _, err := self.httpPoster.Post(rplCfg.Address, content, body, rplCfg.Attempts, fallbackPath); err != nil { utils.Logger.Err(fmt.Sprintf( " Replicating CDR: %+v, got error: %s", cdr, err.Error())) if rplCfg.Synchronous { diff --git a/utils/httpclient.go b/utils/httpclient.go index 54729477e..c860a7bf8 100644 --- a/utils/httpclient.go +++ b/utils/httpclient.go @@ -68,11 +68,22 @@ func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error return respBody, nil } +func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration) *HTTPPoster { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify}, + } + return &HTTPPoster{httpClient: &http.Client{Transport: tr, Timeout: replyTimeout}} +} + +type HTTPPoster struct { + httpClient *http.Client +} + // Post with built-in failover // Returns also reference towards client so we can close it's connections when done -func HttpPoster(addr string, skipTlsVerify bool, content interface{}, contentType string, attempts int, fallbackFilePath string, cacheIdleConns bool) ([]byte, *http.Client, error) { +func (poster *HTTPPoster) Post(addr string, contentType string, content interface{}, attempts int, fallbackFilePath string) ([]byte, error) { if !IsSliceMember([]string{CONTENT_JSON, CONTENT_FORM, CONTENT_TEXT}, contentType) { - return nil, nil, fmt.Errorf("Unsupported ContentType: %s", contentType) + return nil, fmt.Errorf("unsupported ContentType: %s", contentType) } var body []byte // Used to write in file and send over http var urlVals url.Values // Used when posting form @@ -82,13 +93,6 @@ func HttpPoster(addr string, skipTlsVerify bool, content interface{}, contentTyp urlVals = content.(url.Values) body = []byte(urlVals.Encode()) } - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerify}, - } - if !cacheIdleConns { - tr.DisableKeepAlives = true - } - client := &http.Client{Transport: tr} delay := Fib() bodyType := "application/x-www-form-urlencoded" if contentType == CONTENT_JSON { @@ -98,37 +102,37 @@ func HttpPoster(addr string, skipTlsVerify bool, content interface{}, contentTyp for i := 0; i < attempts; i++ { var resp *http.Response if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) { - resp, err = client.Post(addr, bodyType, bytes.NewBuffer(body)) + resp, err = poster.httpClient.Post(addr, bodyType, bytes.NewBuffer(body)) } else if contentType == CONTENT_FORM { - resp, err = client.PostForm(addr, urlVals) + resp, err = poster.httpClient.PostForm(addr, urlVals) } if err != nil { - Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) + Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) time.Sleep(delay()) continue } defer resp.Body.Close() respBody, err := ioutil.ReadAll(resp.Body) if err != nil { - Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) + Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) time.Sleep(delay()) continue } if resp.StatusCode > 299 { - Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) + Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) time.Sleep(delay()) continue } - return respBody, client, nil + return respBody, nil } // If we got that far, post was not possible, write it on disk fileOut, err := os.Create(fallbackFilePath) if err != nil { - return nil, client, err + return nil, err } defer fileOut.Close() if _, err := fileOut.Write(body); err != nil { - return nil, client, err + return nil, err } - return nil, client, nil + return nil, nil } diff --git a/utils/httpclient_local_test.go b/utils/httpclient_local_test.go index 41b8fe0c2..6f67619f8 100644 --- a/utils/httpclient_local_test.go +++ b/utils/httpclient_local_test.go @@ -24,6 +24,7 @@ import ( "os" "reflect" "testing" + "time" ) var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args @@ -40,7 +41,7 @@ func TestHttpJsonPoster(t *testing.T) { content := &TestContent{Var1: "Val1", Var2: "Val2"} jsn, _ := json.Marshal(content) filePath := "/tmp/cgr_test_http_poster.json" - if _, _, err := HttpPoster("http://localhost:8080/invalid", true, jsn, CONTENT_JSON, 3, filePath, false); err != nil { + if _, err := NewHTTPPoster(true, time.Duration(2*time.Second)).Post("http://localhost:8080/invalid", CONTENT_JSON, jsn, 3, filePath); err != nil { t.Error(err) } if readBytes, err := ioutil.ReadFile(filePath); err != nil { @@ -61,7 +62,7 @@ func TestHttpBytesPoster(t *testing.T) { Test2 `) filePath := "/tmp/test_http_poster.http" - if _, _, err := HttpPoster("http://localhost:8080/invalid", true, content, CONTENT_TEXT, 3, filePath, false); err != nil { + if _, err := NewHTTPPoster(true, time.Duration(2*time.Second)).Post("http://localhost:8080/invalid", CONTENT_TEXT, content, 3, filePath); err != nil { t.Error(err) } if readBytes, err := ioutil.ReadFile(filePath); err != nil {