Added Opts to EEs

This commit is contained in:
Trial97
2020-09-01 17:08:44 +03:00
committed by Dan Christian Bogos
parent 6eb4be05c2
commit 8f638b5b85
20 changed files with 184 additions and 255 deletions

View File

@@ -373,6 +373,7 @@ const CGRATES_CFG_JSON = `
"id": "*default", // identifier of the EventReader profile
"type": "*none", // exporter type
"export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed
"opts": {}, // extra options for exporter
"tenant": "", // tenant used in filterS.Pass
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"filters": [], // limit parsing based on the filters

View File

@@ -1848,6 +1848,7 @@ func TestDfEventExporterCfg(t *testing.T) {
Synchronous: utils.BoolPointer(false),
Attempts: utils.IntPointer(1),
Fields: &eContentFlds,
Opts: make(map[string]interface{}),
},
},
}

View File

@@ -2174,6 +2174,7 @@ func TestCgrCdfEventExporter(t *testing.T) {
},
headerFields: []*FCTemplate{},
trailerFields: []*FCTemplate{},
Opts: make(map[string]interface{}),
},
},
}
@@ -2453,6 +2454,7 @@ func TestCgrCfgEventExporterDefault(t *testing.T) {
},
headerFields: []*FCTemplate{},
trailerFields: []*FCTemplate{},
Opts: make(map[string]interface{}),
}
for _, v := range eCfg.Fields {
v.ComputePath()

View File

@@ -66,28 +66,27 @@ func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, msgTem
return
}
for _, jsnExp := range *exporters {
exp := new(EventExporterCfg)
if dfltExpCfg != nil {
exp = dfltExpCfg.Clone()
}
var haveID bool
var exp *EventExporterCfg
if jsnExp.Id != nil {
for _, exporter := range eeS.Exporters {
if exporter.ID == *jsnExp.Id {
exp = exporter
haveID = true
break
}
}
}
if err := exp.loadFromJsonCfg(jsnExp, msgTemplates, separator); err != nil {
return err
}
if !haveID {
if exp == nil {
if dfltExpCfg != nil {
exp = dfltExpCfg.Clone()
} else {
exp = new(EventExporterCfg)
exp.Opts = make(map[string]interface{})
}
eeS.Exporters = append(eeS.Exporters, exp)
}
if err = exp.loadFromJsonCfg(jsnExp, msgTemplates, separator); err != nil {
return
}
}
return
}
@@ -123,6 +122,7 @@ type EventExporterCfg struct {
ID string
Type string
ExportPath string
Opts map[string]interface{}
Tenant RSRParsers
Timezone string
Filters []string
@@ -213,6 +213,11 @@ func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, msgTe
}
}
}
if jsnEec.Opts != nil {
for k, v := range jsnEec.Opts {
eeC.Opts[k] = v
}
}
return
}
@@ -275,6 +280,10 @@ func (eeC *EventExporterCfg) Clone() (cln *EventExporterCfg) {
for idx, fld := range eeC.trailerFields {
cln.trailerFields[idx] = fld.Clone()
}
cln.Opts = make(map[string]interface{})
for k, v := range eeC.Opts {
cln.Opts[k] = v
}
return
}
@@ -308,5 +317,6 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) map[string]interfa
utils.AttemptsCfg: eeC.Attempts,
utils.FieldSeparatorCfg: eeC.FieldSep,
utils.FieldsCfg: fields,
utils.OptsCfg: eeC.Opts,
}
}

View File

@@ -67,6 +67,7 @@ func TestEventExporterClone(t *testing.T) {
},
headerFields: []*FCTemplate{},
trailerFields: []*FCTemplate{},
Opts: make(map[string]interface{}),
}
for _, v := range orig.Fields {
v.ComputePath()
@@ -118,6 +119,7 @@ func TestEventExporterClone(t *testing.T) {
},
headerFields: []*FCTemplate{},
trailerFields: []*FCTemplate{},
Opts: make(map[string]interface{}),
}
for _, v := range initialOrig.Fields {
v.ComputePath()
@@ -365,6 +367,7 @@ func TestEventExporterSameID(t *testing.T) {
},
headerFields: []*FCTemplate{},
trailerFields: []*FCTemplate{},
Opts: make(map[string]interface{}),
},
{
ID: "file_exporter1",
@@ -386,6 +389,7 @@ func TestEventExporterSameID(t *testing.T) {
},
headerFields: []*FCTemplate{},
trailerFields: []*FCTemplate{},
Opts: make(map[string]interface{}),
},
},
}

View File

@@ -195,6 +195,7 @@ type EventExporterJsonCfg struct {
Id *string
Type *string
Export_path *string
Opts map[string]interface{}
Tenant *string
Timezone *string
Filters *[]string

View File

@@ -43,7 +43,7 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt
case utils.MetaHTTPPost:
return NewHTTPPostEe(cgrCfg, cfgIdx, filterS, dc)
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, utils.MetaS3jsonMap:
return NewHTTPJsonMapEe(cgrCfg, cfgIdx, filterS, dc)
return NewPosterJSONMapEE(cgrCfg, cfgIdx, filterS, dc)
case utils.MetaVirt:
return NewVirtualExporter(cgrCfg, cfgIdx, filterS, dc)
default:

View File

@@ -29,64 +29,81 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (httpJSON *HTTPJsonMapEe, err error) {
dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
httpJSON = &HTTPJsonMapEe{
func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (pstrJSON *PosterJSONMapEE, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
pstrJSON = &PosterJSONMapEE{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
}
if cgrCfg.EEsCfg().Exporters[cfgIdx].Type == utils.MetaHTTPjsonMap {
httpJSON.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type {
case utils.MetaHTTPjsonMap:
pstrJSON.poster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts)
case utils.MetaAMQPjsonMap:
pstrJSON.poster = engine.NewAMQPPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts)
case utils.MetaAMQPV1jsonMap:
pstrJSON.poster = engine.NewAMQPv1Poster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts)
case utils.MetaSQSjsonMap:
pstrJSON.poster = engine.NewSQSPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts)
case utils.MetaKafkajsonMap:
pstrJSON.poster = engine.NewKafkaPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts)
case utils.MetaS3jsonMap:
pstrJSON.poster = engine.NewS3Poster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts)
}
return
}
// HTTPJsonMapEe implements EventExporter interface for .csv files
type HTTPJsonMapEe struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
httpPoster *engine.HTTPPoster
dc utils.MapStorage
// PosterJSONMapEE implements EventExporter interface for .csv files
type PosterJSONMapEE struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
poster engine.Poster
dc utils.MapStorage
sync.RWMutex
}
// ID returns the identificator of this exporter
func (httpJson *HTTPJsonMapEe) ID() string {
return httpJson.id
func (pstrEE *PosterJSONMapEE) ID() string {
return pstrEE.id
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (httpJson *HTTPJsonMapEe) OnEvicted(string, interface{}) {
func (pstrEE *PosterJSONMapEE) OnEvicted(string, interface{}) {
pstrEE.poster.Close()
return
}
// ExportEvent implements EventExporter
func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpJson.Lock()
func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
pstrEE.Lock()
defer func() {
if err != nil {
httpJson.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
pstrEE.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
httpJson.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
pstrEE.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
httpJson.Unlock()
pstrEE.Unlock()
}()
httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int64) + 1
pstrEE.dc[utils.NumberOfEvents] = pstrEE.dc[utils.NumberOfEvents].(int64) + 1
valMp := make(map[string]string)
eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), httpJson.dc,
cgrEv.Tenant, httpJson.cgrCfg.GeneralCfg().DefaultTimezone, httpJson.filterS)
eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), pstrEE.dc,
cgrEv.Tenant, pstrEE.cgrCfg.GeneralCfg().DefaultTimezone, pstrEE.filterS)
if err = eeReq.SetFields(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ContentFields()); err != nil {
if err = eeReq.SetFields(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ContentFields()); err != nil {
return
}
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
@@ -104,44 +121,21 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data)
}
updateEEMetrics(httpJson.dc, cgrEv.Event, httpJson.cgrCfg.GeneralCfg().DefaultTimezone)
updateEEMetrics(pstrEE.dc, cgrEv.Event, pstrEE.cgrCfg.GeneralCfg().DefaultTimezone)
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
var body []byte
if body, err = json.Marshal(valMp); err != nil {
return
}
err = httpJson.post(body, utils.ConcatenatedKey(cgrID, runID))
return
}
func (httpJson *HTTPJsonMapEe) post(body []byte, key string) (err error) {
switch httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type {
case utils.MetaHTTPjsonMap:
err = httpJson.httpPoster.Post(body, utils.EmptyString)
case utils.MetaAMQPjsonMap:
err = engine.PostersCache.PostAMQP(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body)
case utils.MetaAMQPV1jsonMap:
err = engine.PostersCache.PostAMQPv1(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body)
case utils.MetaSQSjsonMap:
err = engine.PostersCache.PostSQS(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body)
case utils.MetaKafkajsonMap:
err = engine.PostersCache.PostKafka(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body, key)
case utils.MetaS3jsonMap:
err = engine.PostersCache.PostS3(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body, key)
}
if err != nil && httpJson.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {
engine.AddFailedPost(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type, utils.EventExporterS, body)
if err = pstrEE.poster.Post(body, utils.ConcatenatedKey(cgrID, runID)); err != nil &&
pstrEE.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {
engine.AddFailedPost(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ExportPath,
pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Type, utils.EventExporterS, body)
}
return
}
func (httpJson *HTTPJsonMapEe) GetMetrics() utils.MapStorage {
return httpJson.dc.Clone()
func (pstrEE *PosterJSONMapEE) GetMetrics() utils.MapStorage {
return pstrEE.dc.Clone()
}

View File

@@ -74,7 +74,6 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}()
httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int64) + 1
var body interface{}
urlVals := url.Values{}
req := utils.MapStorage{}
for k, v := range cgrEv.Event {
@@ -101,11 +100,10 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
urlVals.Set(strings.Join(itm.Path, utils.NestingSep), utils.IfaceAsString(itm.Data))
}
updateEEMetrics(httpPost.dc, cgrEv.Event, httpPost.cgrCfg.GeneralCfg().DefaultTimezone)
body = urlVals
if err = httpPost.httpPoster.Post(body, utils.EmptyString); err != nil &&
if err = httpPost.httpPoster.PostValues(urlVals); err != nil &&
httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {
engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath,
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, body)
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, urlVals)
}
return
}

View File

@@ -460,7 +460,7 @@ func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error {
if err != nil {
return err
}
err = pstr.Post(body, utils.EmptyString)
err = pstr.PostValues(body)
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
err = nil
@@ -481,7 +481,7 @@ func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) er
return err
}
go func() {
err := pstr.Post(body, utils.EmptyString)
err := pstr.PostValues(body)
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
}
@@ -1040,7 +1040,7 @@ func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error
if err != nil {
return err
}
err = pstr.Post(body, utils.EmptyString)
err = pstr.PostValues(body)
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
err = nil

View File

@@ -155,7 +155,7 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export
return expEv, err
}
for _, ev := range expEv.Events {
err = pstr.Post(ev, utils.EmptyString)
err = pstr.PostValues(ev)
if err != nil {
failedEvents.AddEvent(ev)
}

View File

@@ -19,8 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"net/url"
"strings"
"sync"
)
@@ -63,20 +61,6 @@ type Poster interface {
Close()
}
func parseURL(dialURL string) (URL string, qID string, err error) {
u, err := url.Parse(dialURL)
if err != nil {
return "", "", err
}
qry := u.Query()
URL = strings.Split(dialURL, "?")[0]
qID = DefaultQueueID
if vals, has := qry[QueueID]; has && len(vals) != 0 {
qID = vals[0]
}
return
}
// Close closes all cached posters
func (pc *PosterCache) Close() {
for _, v := range pc.amqpCache {
@@ -95,111 +79,81 @@ func (pc *PosterCache) Close() {
// GetAMQPPoster creates a new poster only if not already cached
// uses dialURL as cache key
func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int) (pstr Poster, err error) {
func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.amqpCache[dialURL]; !hasIt {
if pstr, err = NewAMQPPoster(dialURL, attempts); err != nil {
return nil, err
}
pstr = NewAMQPPoster(dialURL, attempts, nil)
pc.amqpCache[dialURL] = pstr
}
return pc.amqpCache[dialURL], nil
return pc.amqpCache[dialURL]
}
// GetAMQPv1Poster creates a new poster only if not already cached
func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int) (pstr Poster, err error) {
func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.amqpv1Cache[dialURL]; !hasIt {
if pstr, err = NewAMQPv1Poster(dialURL, attempts); err != nil {
return nil, err
}
pstr = NewAMQPv1Poster(dialURL, attempts, nil)
pc.amqpv1Cache[dialURL] = pstr
}
return pc.amqpv1Cache[dialURL], nil
return pc.amqpv1Cache[dialURL]
}
// GetSQSPoster creates a new poster only if not already cached
func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int) (pstr Poster, err error) {
func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.sqsCache[dialURL]; !hasIt {
if pstr, err = NewSQSPoster(dialURL, attempts); err != nil {
return nil, err
}
pstr = NewSQSPoster(dialURL, attempts, nil)
pc.sqsCache[dialURL] = pstr
}
return pc.sqsCache[dialURL], nil
return pc.sqsCache[dialURL]
}
// GetKafkaPoster creates a new poster only if not already cached
func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int) (pstr Poster, err error) {
func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.kafkaCache[dialURL]; !hasIt {
if pstr, err = NewKafkaPoster(dialURL, attempts); err != nil {
return nil, err
}
pstr = NewKafkaPoster(dialURL, attempts, nil)
pc.kafkaCache[dialURL] = pstr
}
return pc.kafkaCache[dialURL], nil
return pc.kafkaCache[dialURL]
}
// GetS3Poster creates a new poster only if not already cached
func (pc *PosterCache) GetS3Poster(dialURL string, attempts int) (pstr Poster, err error) {
func (pc *PosterCache) GetS3Poster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.s3Cache[dialURL]; !hasIt {
if pstr, err = NewS3Poster(dialURL, attempts); err != nil {
return nil, err
}
pstr = NewS3Poster(dialURL, attempts, nil)
pc.s3Cache[dialURL] = pstr
}
return pc.s3Cache[dialURL], nil
return pc.s3Cache[dialURL]
}
func (pc *PosterCache) PostAMQP(dialURL string, attempts int,
content []byte) error {
amqpPoster, err := pc.GetAMQPPoster(dialURL, attempts)
if err != nil {
return err
}
return amqpPoster.Post(content, "")
return pc.GetAMQPPoster(dialURL, attempts).Post(content, "")
}
func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int,
content []byte) error {
AMQPv1Poster, err := pc.GetAMQPv1Poster(dialURL, attempts)
if err != nil {
return err
}
return AMQPv1Poster.Post(content, "")
return pc.GetAMQPv1Poster(dialURL, attempts).Post(content, "")
}
func (pc *PosterCache) PostSQS(dialURL string, attempts int,
content []byte) error {
sqsPoster, err := pc.GetSQSPoster(dialURL, attempts)
if err != nil {
return err
}
return sqsPoster.Post(content, "")
return pc.GetSQSPoster(dialURL, attempts).Post(content, "")
}
func (pc *PosterCache) PostKafka(dialURL string, attempts int,
content []byte, key string) error {
kafkaPoster, err := pc.GetKafkaPoster(dialURL, attempts)
if err != nil {
return err
}
return kafkaPoster.Post(content, key)
return pc.GetKafkaPoster(dialURL, attempts).Post(content, key)
}
func (pc *PosterCache) PostS3(dialURL string, attempts int,
content []byte, key string) error {
sqsPoster, err := pc.GetS3Poster(dialURL, attempts)
if err != nil {
return err
}
return sqsPoster.Post(content, key)
return pc.GetS3Poster(dialURL, attempts).Post(content, key)
}

View File

@@ -35,7 +35,7 @@ func TestAMQPPosterParseURL(t *testing.T) {
routingKey: "CGRCDR",
}
dialURL := "amqp://guest:guest@localhost:5672/?queue_id=q1&exchange=E1&routing_key=CGRCDR&heartbeat=5&exchange_type=fanout"
if err := amqp.parseURL(dialURL); err != nil {
if err := amqp.parseOpts(dialURL); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, amqp) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(expected), utils.ToJSON(amqp))
@@ -49,7 +49,7 @@ func TestKafkaParseURL(t *testing.T) {
topic: "cdr_billing",
attempts: 10,
}
if kfk, err := NewKafkaPoster(u, 10); err != nil {
if kfk, err := NewKafkaPoster(u, 10, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
@@ -60,7 +60,7 @@ func TestKafkaParseURL(t *testing.T) {
topic: "cdr_billing",
attempts: 10,
}
if kfk, err := NewKafkaPoster(u, 10); err != nil {
if kfk, err := NewKafkaPoster(u, 10, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk))

View File

@@ -20,8 +20,6 @@ package engine
import (
"fmt"
"net/url"
"strings"
"sync"
"time"
@@ -34,14 +32,13 @@ var AMQPPosibleQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "
// NewAMQPPoster creates a new amqp poster
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
func NewAMQPPoster(dialURL string, attempts int) (*AMQPPoster, error) {
func NewAMQPPoster(dialURL string, attempts int, opts map[string]interface{}) *AMQPPoster {
amqp := &AMQPPoster{
attempts: attempts,
dialURL: dialURL,
}
if err := amqp.parseURL(dialURL); err != nil {
return nil, err
}
return amqp, nil
amqp.parseOpts(opts)
return amqp
}
// AMQPPoster used to post cdrs to amqp
@@ -56,35 +53,23 @@ type AMQPPoster struct {
conn *amqp.Connection
}
func (pstr *AMQPPoster) parseURL(dialURL string) error {
u, err := url.Parse(dialURL)
if err != nil {
return err
}
qry := u.Query()
q := url.Values{}
for _, key := range AMQPPosibleQuery {
if vals, has := qry[key]; has && len(vals) != 0 {
q.Add(key, vals[0])
}
}
pstr.dialURL = strings.Split(dialURL, "?")[0] + "?" + q.Encode()
func (pstr *AMQPPoster) parseOpts(dialURL map[string]interface{}) {
pstr.queueID = DefaultQueueID
pstr.routingKey = DefaultQueueID
if vals, has := qry[QueueID]; has && len(vals) != 0 {
pstr.queueID = vals[0]
if vals, has := dialURL[QueueID]; has {
pstr.queueID = utils.IfaceAsString(vals)
}
if vals, has := qry[RoutingKey]; has && len(vals) != 0 {
pstr.routingKey = vals[0]
if vals, has := dialURL[RoutingKey]; has {
pstr.routingKey = utils.IfaceAsString(vals)
}
if vals, has := qry[Exchange]; has && len(vals) != 0 {
pstr.exchange = vals[0]
if vals, has := dialURL[Exchange]; has {
pstr.exchange = utils.IfaceAsString(vals)
pstr.exchangeType = DefaultExchangeType
}
if vals, has := qry[ExchangeType]; has && len(vals) != 0 {
pstr.exchangeType = vals[0]
if vals, has := dialURL[ExchangeType]; has {
pstr.exchangeType = utils.IfaceAsString(vals)
}
return nil
return
}
// Post is the method being called when we need to post anything in the queue

View File

@@ -30,16 +30,16 @@ import (
)
// NewAMQPv1Poster creates a poster for amqpv1
func NewAMQPv1Poster(dialURL string, attempts int) (Poster, error) {
URL, qID, err := parseURL(dialURL)
if err != nil {
return nil, err
}
return &AMQPv1Poster{
dialURL: URL,
queueID: "/" + qID,
func NewAMQPv1Poster(dialURL string, attempts int, opts map[string]interface{}) Poster {
pstr := &AMQPv1Poster{
dialURL: dialURL,
queueID: "/" + DefaultQueueID,
attempts: attempts,
}, nil
}
if vals, has := opts[QueueID]; has {
pstr.queueID = "/" + utils.IfaceAsString(vals)
}
return pstr
}
// AMQPv1Poster a poster for amqpv1

View File

@@ -83,19 +83,27 @@ type HTTPPoster struct {
attempts int
}
// Post will post the event
func (pstr *HTTPPoster) Post(content interface{}, key string) (err error) {
// PostValues will post the event
func (pstr *HTTPPoster) PostValues(content interface{}) (err error) {
_, err = pstr.GetResponse(content)
return
}
// Post will post the event
func (pstr *HTTPPoster) Post(content []byte, _ string) (err error) {
_, err = pstr.GetResponse(content)
return
}
// Close only yo implement the Poster interface
func (*HTTPPoster) Close() {}
// GetResponse will post the event and return the response
func (pstr *HTTPPoster) GetResponse(content interface{}) (respBody []byte, err error) {
var body []byte // Used to write in file and send over http
var urlVals url.Values // Used when posting form
if pstr.contentType == utils.CONTENT_FORM {
urlVals = content.(url.Values)
body = []byte(urlVals.Encode())
} else {
body = content.([]byte)
}

View File

@@ -19,8 +19,6 @@ package engine
import (
"context"
"net/url"
"strings"
"sync"
"github.com/cgrates/cgrates/utils"
@@ -28,14 +26,16 @@ import (
)
// NewKafkaPoster creates a kafka poster
func NewKafkaPoster(dialURL string, attempts int) (*KafkaPoster, error) {
func NewKafkaPoster(dialURL string, attempts int, opts map[string]interface{}) *KafkaPoster {
kfkPstr := &KafkaPoster{
dialURL: dialURL,
attempts: attempts,
topic: DefaultQueueID,
}
if err := kfkPstr.parseURL(dialURL); err != nil {
return nil, err
if vals, has := opts[utils.KafkaTopic]; has {
kfkPstr.topic = utils.IfaceAsString(vals)
}
return kfkPstr, nil
return kfkPstr
}
// KafkaPoster is a kafka poster
@@ -47,26 +47,6 @@ type KafkaPoster struct {
writer *kafka.Writer
}
func (pstr *KafkaPoster) parseURL(dialURL string) error {
pstr.topic = DefaultQueueID
i := strings.IndexByte(dialURL, '?')
if i < 0 {
pstr.dialURL = dialURL
return nil
}
pstr.dialURL = dialURL[:i]
rawQuery := dialURL[i+1:]
qry, err := url.ParseQuery(rawQuery)
if err != nil {
return err
}
pstr.dialURL = strings.Split(dialURL, "?")[0]
if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 {
pstr.topic = vals[0]
}
return nil
}
// Post is the method being called when we need to post anything in the queue
// the optional chn will permits channel caching
func (pstr *KafkaPoster) Post(content []byte, key string) (err error) {

View File

@@ -21,7 +21,6 @@ package engine
import (
"bytes"
"fmt"
"strings"
"sync"
"time"
@@ -33,12 +32,13 @@ import (
)
// NewS3Poster creates a s3 poster
func NewS3Poster(dialURL string, attempts int) (Poster, error) {
func NewS3Poster(dialURL string, attempts int, opts map[string]interface{}) Poster {
pstr := &S3Poster{
dialURL: dialURL,
attempts: attempts,
}
pstr.parseURL(dialURL)
return pstr, nil
pstr.parseOpts(opts)
return pstr
}
// S3Poster is a s3 poster
@@ -58,29 +58,25 @@ type S3Poster struct {
// Close for Poster interface
func (pstr *S3Poster) Close() {}
func (pstr *S3Poster) parseURL(dialURL string) {
qry := utils.GetUrlRawArguments(dialURL)
pstr.dialURL = strings.Split(dialURL, "?")[0]
pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint
func (pstr *S3Poster) parseOpts(opts map[string]interface{}) {
pstr.queueID = DefaultQueueID
if val, has := qry[QueueID]; has {
pstr.queueID = val
if val, has := opts[QueueID]; has {
pstr.queueID = utils.IfaceAsString(val)
}
if val, has := qry[folderPath]; has {
pstr.folderPath = val
if val, has := opts[folderPath]; has {
pstr.folderPath = utils.IfaceAsString(val)
}
if val, has := qry[utils.AWSRegion]; has {
pstr.awsRegion = val
if val, has := opts[utils.AWSRegion]; has {
pstr.awsRegion = utils.IfaceAsString(val)
}
if val, has := qry[utils.AWSKey]; has {
pstr.awsID = val
if val, has := opts[utils.AWSKey]; has {
pstr.awsID = utils.IfaceAsString(val)
}
if val, has := qry[utils.AWSSecret]; has {
pstr.awsKey = val
if val, has := opts[utils.AWSSecret]; has {
pstr.awsKey = utils.IfaceAsString(val)
}
if val, has := qry[awsToken]; has {
pstr.awsToken = val
if val, has := opts[awsToken]; has {
pstr.awsToken = utils.IfaceAsString(val)
}
}

View File

@@ -20,7 +20,6 @@ package engine
import (
"fmt"
"strings"
"sync"
"time"
@@ -33,12 +32,12 @@ import (
)
// NewSQSPoster creates a poster for sqs
func NewSQSPoster(dialURL string, attempts int) (Poster, error) {
func NewSQSPoster(dialURL string, attempts int, opts map[string]interface{}) Poster {
pstr := &SQSPoster{
attempts: attempts,
}
pstr.parseURL(dialURL)
return pstr, nil
pstr.parseOpts(opts)
return pstr
}
// SQSPoster is a poster for sqs
@@ -59,26 +58,22 @@ type SQSPoster struct {
// Close for Poster interface
func (pstr *SQSPoster) Close() {}
func (pstr *SQSPoster) parseURL(dialURL string) {
qry := utils.GetUrlRawArguments(dialURL)
pstr.dialURL = strings.Split(dialURL, "?")[0]
pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint
func (pstr *SQSPoster) parseOpts(opts map[string]interface{}) {
pstr.queueID = DefaultQueueID
if val, has := qry[QueueID]; has {
pstr.queueID = val
if val, has := opts[QueueID]; has {
pstr.queueID = utils.IfaceAsString(val)
}
if val, has := qry[utils.AWSRegion]; has {
pstr.awsRegion = val
if val, has := opts[utils.AWSRegion]; has {
pstr.awsRegion = utils.IfaceAsString(val)
}
if val, has := qry[utils.AWSKey]; has {
pstr.awsID = val
if val, has := opts[utils.AWSKey]; has {
pstr.awsID = utils.IfaceAsString(val)
}
if val, has := qry[utils.AWSSecret]; has {
pstr.awsKey = val
if val, has := opts[utils.AWSSecret]; has {
pstr.awsKey = utils.IfaceAsString(val)
}
if val, has := qry[awsToken]; has {
pstr.awsToken = val
if val, has := opts[awsToken]; has {
pstr.awsToken = utils.IfaceAsString(val)
}
pstr.getQueueURL()
}

View File

@@ -72,7 +72,7 @@ func TestHttpJsonPoster(t *testing.T) {
if err != nil {
t.Error(err)
}
if err = pstr.Post(jsn, utils.EmptyString); err == nil {
if err = pstr.PostValues(jsn); err == nil {
t.Error("Expected error")
}
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn)
@@ -105,7 +105,7 @@ func TestHttpBytesPoster(t *testing.T) {
if err != nil {
t.Error(err)
}
if err = pstr.Post(content, utils.EmptyString); err == nil {
if err = pstr.PostValues(content); err == nil {
t.Error("Expected error")
}
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content)