HTTPPoster with reply timeout and connection caching for CDR replication

This commit is contained in:
DanB
2016-10-13 17:54:24 +02:00
parent 39b601780a
commit 7f44dcba69
4 changed files with 33 additions and 27 deletions

View File

@@ -422,7 +422,8 @@ func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error
}
cfg := config.CgrConfig()
fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID()))
_, _, err = utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, config.CgrConfig().HttpPosterAttempts, fallbackPath, false)
_, err = utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn, config.CgrConfig().HttpPosterAttempts, fallbackPath)
return err
}
@@ -441,7 +442,8 @@ func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions)
}
cfg := config.CgrConfig()
fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID()))
go utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, jsn, utils.CONTENT_JSON, config.CgrConfig().HttpPosterAttempts, fallbackPath, false)
go utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn, config.CgrConfig().HttpPosterAttempts, fallbackPath)
return nil
}

View File

@@ -86,7 +86,8 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dataDB AccountingS
stats = nil
}
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dataDB: dataDB,
rals: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil
rals: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian,
httpPoster: utils.NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil
}
type CdrServer struct {
@@ -100,6 +101,7 @@ type CdrServer struct {
stats rpcclient.RpcClientConnection
guard *GuardianLock
responseCache *cache.ResponseCache
httpPoster *utils.HTTPPoster // used for replication
}
func (self *CdrServer) Timezone() string {
@@ -477,10 +479,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
fallbackPath := path.Join(
self.cgrCfg.HttpFailedDir,
rplCfg.FallbackFileName())
_, _, err := utils.HttpPoster(
rplCfg.Address, self.cgrCfg.HttpSkipTlsVerify, body,
content, rplCfg.Attempts, fallbackPath, false) // ToDo: Review caching here after we are sure that the connection leak is gone
if err != nil {
if _, err := self.httpPoster.Post(rplCfg.Address, content, body, rplCfg.Attempts, fallbackPath); err != nil {
utils.Logger.Err(fmt.Sprintf(
"<CDRReplicator> Replicating CDR: %+v, got error: %s", cdr, err.Error()))
if rplCfg.Synchronous {

View File

@@ -68,11 +68,22 @@ func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error
return respBody, nil
}
func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration) *HTTPPoster {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify},
}
return &HTTPPoster{httpClient: &http.Client{Transport: tr, Timeout: replyTimeout}}
}
type HTTPPoster struct {
httpClient *http.Client
}
// Post with built-in failover
// Returns also reference towards client so we can close it's connections when done
func HttpPoster(addr string, skipTlsVerify bool, content interface{}, contentType string, attempts int, fallbackFilePath string, cacheIdleConns bool) ([]byte, *http.Client, error) {
func (poster *HTTPPoster) Post(addr string, contentType string, content interface{}, attempts int, fallbackFilePath string) ([]byte, error) {
if !IsSliceMember([]string{CONTENT_JSON, CONTENT_FORM, CONTENT_TEXT}, contentType) {
return nil, nil, fmt.Errorf("Unsupported ContentType: %s", contentType)
return nil, fmt.Errorf("unsupported ContentType: %s", contentType)
}
var body []byte // Used to write in file and send over http
var urlVals url.Values // Used when posting form
@@ -82,13 +93,6 @@ func HttpPoster(addr string, skipTlsVerify bool, content interface{}, contentTyp
urlVals = content.(url.Values)
body = []byte(urlVals.Encode())
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerify},
}
if !cacheIdleConns {
tr.DisableKeepAlives = true
}
client := &http.Client{Transport: tr}
delay := Fib()
bodyType := "application/x-www-form-urlencoded"
if contentType == CONTENT_JSON {
@@ -98,37 +102,37 @@ func HttpPoster(addr string, skipTlsVerify bool, content interface{}, contentTyp
for i := 0; i < attempts; i++ {
var resp *http.Response
if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) {
resp, err = client.Post(addr, bodyType, bytes.NewBuffer(body))
resp, err = poster.httpClient.Post(addr, bodyType, bytes.NewBuffer(body))
} else if contentType == CONTENT_FORM {
resp, err = client.PostForm(addr, urlVals)
resp, err = poster.httpClient.PostForm(addr, urlVals)
}
if err != nil {
Logger.Warning(fmt.Sprintf("<HttpPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(delay())
continue
}
defer resp.Body.Close()
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
Logger.Warning(fmt.Sprintf("<HttpPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(delay())
continue
}
if resp.StatusCode > 299 {
Logger.Warning(fmt.Sprintf("<HttpPoster> Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
time.Sleep(delay())
continue
}
return respBody, client, nil
return respBody, nil
}
// If we got that far, post was not possible, write it on disk
fileOut, err := os.Create(fallbackFilePath)
if err != nil {
return nil, client, err
return nil, err
}
defer fileOut.Close()
if _, err := fileOut.Write(body); err != nil {
return nil, client, err
return nil, err
}
return nil, client, nil
return nil, nil
}

View File

@@ -24,6 +24,7 @@ import (
"os"
"reflect"
"testing"
"time"
)
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
@@ -40,7 +41,7 @@ func TestHttpJsonPoster(t *testing.T) {
content := &TestContent{Var1: "Val1", Var2: "Val2"}
jsn, _ := json.Marshal(content)
filePath := "/tmp/cgr_test_http_poster.json"
if _, _, err := HttpPoster("http://localhost:8080/invalid", true, jsn, CONTENT_JSON, 3, filePath, false); err != nil {
if _, err := NewHTTPPoster(true, time.Duration(2*time.Second)).Post("http://localhost:8080/invalid", CONTENT_JSON, jsn, 3, filePath); err != nil {
t.Error(err)
}
if readBytes, err := ioutil.ReadFile(filePath); err != nil {
@@ -61,7 +62,7 @@ func TestHttpBytesPoster(t *testing.T) {
Test2
`)
filePath := "/tmp/test_http_poster.http"
if _, _, err := HttpPoster("http://localhost:8080/invalid", true, content, CONTENT_TEXT, 3, filePath, false); err != nil {
if _, err := NewHTTPPoster(true, time.Duration(2*time.Second)).Post("http://localhost:8080/invalid", CONTENT_TEXT, content, 3, filePath); err != nil {
t.Error(err)
}
if readBytes, err := ioutil.ReadFile(filePath); err != nil {