mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Partially added support for HTTP Headers for *http_post exporter
This commit is contained in:
committed by
Dan Christian Bogos
parent
5927e96437
commit
a64e58278d
@@ -20,6 +20,7 @@ package ees
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -75,6 +76,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error)
|
||||
httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int64) + 1
|
||||
|
||||
urlVals := url.Values{}
|
||||
hdr := http.Header{}
|
||||
if len(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()) == 0 {
|
||||
for k, v := range cgrEv.Event {
|
||||
urlVals.Set(k, utils.IfaceAsString(v))
|
||||
@@ -104,13 +106,19 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error)
|
||||
}
|
||||
urlVals.Set(strings.Join(itm.Path, utils.NestingSep), utils.IfaceAsString(itm.Data))
|
||||
}
|
||||
if hdr, err = httpPost.composeHeader(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
updateEEMetrics(httpPost.dc, cgrEv.Event, httpPost.cgrCfg.GeneralCfg().DefaultTimezone)
|
||||
if err = httpPost.httpPoster.PostValues(urlVals); err != nil &&
|
||||
if err = httpPost.httpPoster.PostValues(urlVals, hdr); err != nil &&
|
||||
httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath,
|
||||
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, urlVals,
|
||||
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Opts)
|
||||
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS,
|
||||
engine.HTTPPosterRequest{
|
||||
Header: hdr,
|
||||
Body: urlVals,
|
||||
}, httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Opts)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -118,3 +126,28 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREventWithOpts) (err error)
|
||||
func (httpPost *HTTPPost) GetMetrics() utils.MapStorage {
|
||||
return httpPost.dc.Clone()
|
||||
}
|
||||
|
||||
// Compose and cache the header
|
||||
func (httpPost *HTTPPost) composeHeader() (hdr http.Header, err error) {
|
||||
hdr = make(http.Header)
|
||||
if len(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].HeaderFields()) == 0 {
|
||||
return
|
||||
}
|
||||
eeReq := NewEventExporterRequest(nil, httpPost.dc, nil,
|
||||
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Tenant,
|
||||
httpPost.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone,
|
||||
httpPost.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
httpPost.filterS)
|
||||
if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].HeaderFields()); err != nil {
|
||||
return
|
||||
}
|
||||
for el := eeReq.hdr.GetFirstElement(); el != nil; el = el.Next() {
|
||||
var strVal string
|
||||
if strVal, err = eeReq.hdr.FieldAsString(el.Value.Slice()); err != nil {
|
||||
return
|
||||
}
|
||||
hdr.Set(strings.TrimPrefix(el.Value.String(), utils.MetaHdr), strVal)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"fmt"
|
||||
"html/template"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/smtp"
|
||||
"reflect"
|
||||
"sort"
|
||||
@@ -389,7 +390,7 @@ func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = pstr.PostValues(body)
|
||||
err = pstr.PostValues(body, make(http.Header))
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{}))
|
||||
err = nil
|
||||
@@ -409,7 +410,7 @@ func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) er
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
err := pstr.PostValues(body)
|
||||
err := pstr.PostValues(body, make(http.Header))
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{}))
|
||||
}
|
||||
@@ -952,7 +953,7 @@ func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = pstr.PostValues(body)
|
||||
err = pstr.PostValues(body, make(http.Header))
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{}))
|
||||
err = nil
|
||||
|
||||
@@ -164,9 +164,10 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export
|
||||
return expEv, err
|
||||
}
|
||||
for _, ev := range expEv.Events {
|
||||
err = pstr.PostValues(ev)
|
||||
req := ev.(HTTPPosterRequest)
|
||||
err = pstr.PostValues(req.Body, req.Header)
|
||||
if err != nil {
|
||||
failedEvents.AddEvent(ev)
|
||||
failedEvents.AddEvent(req)
|
||||
}
|
||||
}
|
||||
if len(failedEvents.Events) > 0 {
|
||||
|
||||
@@ -21,14 +21,21 @@ package engine
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
type HTTPPosterRequest struct {
|
||||
Header http.Header
|
||||
Body interface{}
|
||||
}
|
||||
|
||||
// HTTPPostJSON posts without automatic failover
|
||||
func HTTPPostJSON(url string, content []byte) (respBody []byte, err error) {
|
||||
client := &http.Client{Transport: httpPstrTransport}
|
||||
@@ -70,14 +77,14 @@ type HTTPPoster struct {
|
||||
}
|
||||
|
||||
// PostValues will post the event
|
||||
func (pstr *HTTPPoster) PostValues(content interface{}) (err error) {
|
||||
_, err = pstr.GetResponse(content)
|
||||
func (pstr *HTTPPoster) PostValues(content interface{}, hdr http.Header) (err error) {
|
||||
_, err = pstr.GetResponse(content, hdr)
|
||||
return
|
||||
}
|
||||
|
||||
// Post will post the event
|
||||
func (pstr *HTTPPoster) Post(content []byte, _ string) (err error) {
|
||||
_, err = pstr.GetResponse(content)
|
||||
_, err = pstr.GetResponse(content, make(http.Header))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -85,51 +92,59 @@ func (pstr *HTTPPoster) Post(content []byte, _ string) (err error) {
|
||||
func (*HTTPPoster) Close() {}
|
||||
|
||||
// GetResponse will post the event and return the response
|
||||
func (pstr *HTTPPoster) GetResponse(content interface{}) (respBody []byte, err error) {
|
||||
var body []byte // Used to write in file and send over http
|
||||
var urlVals url.Values // Used when posting form
|
||||
if pstr.contentType == utils.CONTENT_FORM {
|
||||
urlVals = content.(url.Values)
|
||||
} else {
|
||||
body = content.([]byte)
|
||||
}
|
||||
func (pstr *HTTPPoster) GetResponse(content interface{}, hdr http.Header) (respBody []byte, err error) {
|
||||
fib := utils.Fib()
|
||||
bodyType := "application/x-www-form-urlencoded"
|
||||
if pstr.contentType == utils.CONTENT_JSON {
|
||||
bodyType = "application/json"
|
||||
}
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
var resp *http.Response
|
||||
if pstr.contentType == utils.CONTENT_FORM {
|
||||
resp, err = pstr.httpClient.PostForm(pstr.addr, urlVals)
|
||||
} else {
|
||||
resp, err = pstr.httpClient.Post(pstr.addr, bodyType, bytes.NewBuffer(body))
|
||||
var req *http.Request
|
||||
if req, err = pstr.getRequest(content, hdr); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error creating request: <%s>", pstr.addr, err.Error()))
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", pstr.addr, err.Error()))
|
||||
if respBody, err = pstr.do(req); err != nil {
|
||||
if i+1 < pstr.attempts {
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
continue
|
||||
}
|
||||
respBody, err = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", pstr.addr, err.Error()))
|
||||
if i+1 < pstr.attempts {
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode > 299 {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", pstr.addr, resp.StatusCode))
|
||||
err = utils.ErrServerError
|
||||
if i+1 < pstr.attempts {
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return respBody, nil
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *HTTPPoster) do(req *http.Request) (respBody []byte, err error) {
|
||||
var resp *http.Response
|
||||
if resp, err = pstr.httpClient.Do(req); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", pstr.addr, err.Error()))
|
||||
return
|
||||
}
|
||||
respBody, err = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", pstr.addr, err.Error()))
|
||||
return
|
||||
}
|
||||
if resp.StatusCode > 299 {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", pstr.addr, resp.StatusCode))
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *HTTPPoster) getRequest(content interface{}, hdr http.Header) (req *http.Request, err error) {
|
||||
var body io.Reader
|
||||
if pstr.contentType == utils.CONTENT_FORM {
|
||||
body = strings.NewReader(content.(url.Values).Encode())
|
||||
} else {
|
||||
body = bytes.NewBuffer(content.([]byte))
|
||||
}
|
||||
contentType := "application/x-www-form-urlencoded"
|
||||
if pstr.contentType == utils.CONTENT_JSON {
|
||||
contentType = "application/json"
|
||||
}
|
||||
hdr.Set("Content-Type", contentType)
|
||||
if req, err = http.NewRequest(http.MethodPost, pstr.addr, body); err != nil {
|
||||
return
|
||||
}
|
||||
req.Header = hdr
|
||||
return
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
@@ -71,7 +72,7 @@ func TestHttpJsonPoster(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err = pstr.PostValues(jsn); err == nil {
|
||||
if err = pstr.PostValues(jsn, make(http.Header)); err == nil {
|
||||
t.Error("Expected error")
|
||||
}
|
||||
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn, make(map[string]interface{}))
|
||||
@@ -104,7 +105,7 @@ func TestHttpBytesPoster(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err = pstr.PostValues(content); err == nil {
|
||||
if err = pstr.PostValues(content, make(http.Header)); err == nil {
|
||||
t.Error("Expected error")
|
||||
}
|
||||
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content, make(map[string]interface{}))
|
||||
|
||||
Reference in New Issue
Block a user