diff --git a/ees/httppost.go b/ees/httppost.go index ad13ea5eb..e9ba2d338 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -20,6 +20,7 @@ package ees import ( "fmt" + "net/http" "net/url" "strings" "sync" @@ -75,6 +76,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int64) + 1 urlVals := url.Values{} + hdr := http.Header{} if len(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()) == 0 { for k, v := range cgrEv.Event { urlVals.Set(k, utils.IfaceAsString(v)) @@ -104,13 +106,19 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) } urlVals.Set(strings.Join(itm.Path, utils.NestingSep), utils.IfaceAsString(itm.Data)) } + if hdr, err = httpPost.composeHeader(); err != nil { + return + } } updateEEMetrics(httpPost.dc, cgrEv.Event, httpPost.cgrCfg.GeneralCfg().DefaultTimezone) - if err = httpPost.httpPoster.PostValues(urlVals); err != nil && + if err = httpPost.httpPoster.PostValues(urlVals, hdr); err != nil && httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath, - httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, urlVals, - httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Opts) + httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, + engine.HTTPPosterRequest{ + Header: hdr, + Body: urlVals, + }, httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Opts) } return } @@ -118,3 +126,28 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error) func (httpPost *HTTPPost) GetMetrics() utils.MapStorage { return httpPost.dc.Clone() } + +// Compose and cache the header +func (httpPost *HTTPPost) composeHeader() (hdr http.Header, err error) { + hdr = make(http.Header) + if len(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].HeaderFields()) == 0 { + return + } + eeReq := NewEventExporterRequest(nil, httpPost.dc, nil, + httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Tenant, + httpPost.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone, + httpPost.cgrCfg.GeneralCfg().DefaultTimezone), + httpPost.filterS) + if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].HeaderFields()); err != nil { + return + } + for el := eeReq.hdr.GetFirstElement(); el != nil; el = el.Next() { + var strVal string + if strVal, err = eeReq.hdr.FieldAsString(el.Value.Slice()); err != nil { + return + } + hdr.Set(strings.TrimPrefix(el.Value.String(), utils.MetaHdr), strVal) + } + return +} diff --git a/engine/action.go b/engine/action.go index a0556be0a..5d013b017 100644 --- a/engine/action.go +++ b/engine/action.go @@ -25,6 +25,7 @@ import ( "fmt" "html/template" "net" + "net/http" "net/smtp" "reflect" "sort" @@ -389,7 +390,7 @@ func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error { if err != nil { return err } - err = pstr.PostValues(body) + err = pstr.PostValues(body, make(http.Header)) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{})) err = nil @@ -409,7 +410,7 @@ func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) er return err } go func() { - err := pstr.PostValues(body) + err := pstr.PostValues(body, make(http.Header)) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{})) } @@ -952,7 +953,7 @@ func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error if err != nil { return err } - err = pstr.PostValues(body) + err = pstr.PostValues(body, make(http.Header)) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{})) err = nil diff --git a/engine/libcdre.go b/engine/libcdre.go index 68c86faff..350df9150 100644 --- a/engine/libcdre.go +++ b/engine/libcdre.go @@ -164,9 +164,10 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export return expEv, err } for _, ev := range expEv.Events { - err = pstr.PostValues(ev) + req := ev.(HTTPPosterRequest) + err = pstr.PostValues(req.Body, req.Header) if err != nil { - failedEvents.AddEvent(ev) + failedEvents.AddEvent(req) } } if len(failedEvents.Events) > 0 { diff --git a/engine/pstr_http.go b/engine/pstr_http.go index 100f68072..a7939e857 100644 --- a/engine/pstr_http.go +++ b/engine/pstr_http.go @@ -21,14 +21,21 @@ package engine import ( "bytes" "fmt" + "io" "io/ioutil" "net/http" "net/url" + "strings" "time" "github.com/cgrates/cgrates/utils" ) +type HTTPPosterRequest struct { + Header http.Header + Body interface{} +} + // HTTPPostJSON posts without automatic failover func HTTPPostJSON(url string, content []byte) (respBody []byte, err error) { client := &http.Client{Transport: httpPstrTransport} @@ -70,14 +77,14 @@ type HTTPPoster struct { } // PostValues will post the event -func (pstr *HTTPPoster) PostValues(content interface{}) (err error) { - _, err = pstr.GetResponse(content) +func (pstr *HTTPPoster) PostValues(content interface{}, hdr http.Header) (err error) { + _, err = pstr.GetResponse(content, hdr) return } // Post will post the event func (pstr *HTTPPoster) Post(content []byte, _ string) (err error) { - _, err = pstr.GetResponse(content) + _, err = pstr.GetResponse(content, make(http.Header)) return } @@ -85,51 +92,59 @@ func (pstr *HTTPPoster) Post(content []byte, _ string) (err error) { func (*HTTPPoster) Close() {} // GetResponse will post the event and return the response -func (pstr *HTTPPoster) GetResponse(content interface{}) (respBody []byte, err error) { - var body []byte // Used to write in file and send over http - var urlVals url.Values // Used when posting form - if pstr.contentType == utils.CONTENT_FORM { - urlVals = content.(url.Values) - } else { - body = content.([]byte) - } +func (pstr *HTTPPoster) GetResponse(content interface{}, hdr http.Header) (respBody []byte, err error) { fib := utils.Fib() - bodyType := "application/x-www-form-urlencoded" - if pstr.contentType == utils.CONTENT_JSON { - bodyType = "application/json" - } for i := 0; i < pstr.attempts; i++ { - var resp *http.Response - if pstr.contentType == utils.CONTENT_FORM { - resp, err = pstr.httpClient.PostForm(pstr.addr, urlVals) - } else { - resp, err = pstr.httpClient.Post(pstr.addr, bodyType, bytes.NewBuffer(body)) + var req *http.Request + if req, err = pstr.getRequest(content, hdr); err != nil { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error creating request: <%s>", pstr.addr, err.Error())) + return } - if err != nil { - utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", pstr.addr, err.Error())) + if respBody, err = pstr.do(req); err != nil { if i+1 < pstr.attempts { time.Sleep(time.Duration(fib()) * time.Second) } continue } - respBody, err = ioutil.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", pstr.addr, err.Error())) - if i+1 < pstr.attempts { - time.Sleep(time.Duration(fib()) * time.Second) - } - continue - } - if resp.StatusCode > 299 { - utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", pstr.addr, resp.StatusCode)) - err = utils.ErrServerError - if i+1 < pstr.attempts { - time.Sleep(time.Duration(fib()) * time.Second) - } - continue - } - return respBody, nil + return } return } + +func (pstr *HTTPPoster) do(req *http.Request) (respBody []byte, err error) { + var resp *http.Response + if resp, err = pstr.httpClient.Do(req); err != nil { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", pstr.addr, err.Error())) + return + } + respBody, err = ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", pstr.addr, err.Error())) + return + } + if resp.StatusCode > 299 { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", pstr.addr, resp.StatusCode)) + return + } + return +} + +func (pstr *HTTPPoster) getRequest(content interface{}, hdr http.Header) (req *http.Request, err error) { + var body io.Reader + if pstr.contentType == utils.CONTENT_FORM { + body = strings.NewReader(content.(url.Values).Encode()) + } else { + body = bytes.NewBuffer(content.([]byte)) + } + contentType := "application/x-www-form-urlencoded" + if pstr.contentType == utils.CONTENT_JSON { + contentType = "application/json" + } + hdr.Set("Content-Type", contentType) + if req, err = http.NewRequest(http.MethodPost, pstr.addr, body); err != nil { + return + } + req.Header = hdr + return +} diff --git a/engine/z_poster_it_test.go b/engine/z_poster_it_test.go index 1d6836777..c39717c6b 100644 --- a/engine/z_poster_it_test.go +++ b/engine/z_poster_it_test.go @@ -23,6 +23,7 @@ import ( "context" "encoding/json" "flag" + "net/http" "path/filepath" "reflect" "testing" @@ -71,7 +72,7 @@ func TestHttpJsonPoster(t *testing.T) { if err != nil { t.Error(err) } - if err = pstr.PostValues(jsn); err == nil { + if err = pstr.PostValues(jsn, make(http.Header)); err == nil { t.Error("Expected error") } AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn, make(map[string]interface{})) @@ -104,7 +105,7 @@ func TestHttpBytesPoster(t *testing.T) { if err != nil { t.Error(err) } - if err = pstr.PostValues(content); err == nil { + if err = pstr.PostValues(content, make(http.Header)); err == nil { t.Error("Expected error") } AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content, make(map[string]interface{}))