diff --git a/config/config_defaults.go b/config/config_defaults.go index 3ca23082c..d65cc613b 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -373,6 +373,7 @@ const CGRATES_CFG_JSON = ` "id": "*default", // identifier of the EventReader profile "type": "*none", // exporter type "export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed + "opts": {}, // extra options for exporter "tenant": "", // tenant used in filterS.Pass "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> "filters": [], // limit parsing based on the filters diff --git a/config/config_json_test.go b/config/config_json_test.go index 878382c2e..3c10fdef0 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1848,6 +1848,7 @@ func TestDfEventExporterCfg(t *testing.T) { Synchronous: utils.BoolPointer(false), Attempts: utils.IntPointer(1), Fields: &eContentFlds, + Opts: make(map[string]interface{}), }, }, } diff --git a/config/config_test.go b/config/config_test.go index 9803e32ab..b5ae0d9b2 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -2174,6 +2174,7 @@ func TestCgrCdfEventExporter(t *testing.T) { }, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, + Opts: make(map[string]interface{}), }, }, } @@ -2453,6 +2454,7 @@ func TestCgrCfgEventExporterDefault(t *testing.T) { }, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, + Opts: make(map[string]interface{}), } for _, v := range eCfg.Fields { v.ComputePath() diff --git a/config/eescfg.go b/config/eescfg.go index 7da0893b0..702f4c429 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -66,28 +66,27 @@ func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, msgTem return } for _, jsnExp := range *exporters { - exp := new(EventExporterCfg) - if dfltExpCfg != nil { - exp = dfltExpCfg.Clone() - } - var haveID bool + var exp *EventExporterCfg if jsnExp.Id != nil { for _, exporter := range eeS.Exporters { if exporter.ID == *jsnExp.Id { exp = exporter - haveID = true break } } } - - if err := exp.loadFromJsonCfg(jsnExp, msgTemplates, separator); err != nil { - return err - } - if !haveID { + if exp == nil { + if dfltExpCfg != nil { + exp = dfltExpCfg.Clone() + } else { + exp = new(EventExporterCfg) + exp.Opts = make(map[string]interface{}) + } eeS.Exporters = append(eeS.Exporters, exp) } - + if err = exp.loadFromJsonCfg(jsnExp, msgTemplates, separator); err != nil { + return + } } return } @@ -123,6 +122,7 @@ type EventExporterCfg struct { ID string Type string ExportPath string + Opts map[string]interface{} Tenant RSRParsers Timezone string Filters []string @@ -213,6 +213,11 @@ func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, msgTe } } } + if jsnEec.Opts != nil { + for k, v := range jsnEec.Opts { + eeC.Opts[k] = v + } + } return } @@ -275,6 +280,10 @@ func (eeC *EventExporterCfg) Clone() (cln *EventExporterCfg) { for idx, fld := range eeC.trailerFields { cln.trailerFields[idx] = fld.Clone() } + cln.Opts = make(map[string]interface{}) + for k, v := range eeC.Opts { + cln.Opts[k] = v + } return } @@ -308,5 +317,6 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) map[string]interfa utils.AttemptsCfg: eeC.Attempts, utils.FieldSeparatorCfg: eeC.FieldSep, utils.FieldsCfg: fields, + utils.OptsCfg: eeC.Opts, } } diff --git a/config/eescfg_test.go b/config/eescfg_test.go index 5424f7173..17df19253 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -67,6 +67,7 @@ func TestEventExporterClone(t *testing.T) { }, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, + Opts: make(map[string]interface{}), } for _, v := range orig.Fields { v.ComputePath() @@ -118,6 +119,7 @@ func TestEventExporterClone(t *testing.T) { }, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, + Opts: make(map[string]interface{}), } for _, v := range initialOrig.Fields { v.ComputePath() @@ -365,6 +367,7 @@ func TestEventExporterSameID(t *testing.T) { }, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, + Opts: make(map[string]interface{}), }, { ID: "file_exporter1", @@ -386,6 +389,7 @@ func TestEventExporterSameID(t *testing.T) { }, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, + Opts: make(map[string]interface{}), }, }, } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 8c5e12de6..fed0cb812 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -195,6 +195,7 @@ type EventExporterJsonCfg struct { Id *string Type *string Export_path *string + Opts map[string]interface{} Tenant *string Timezone *string Filters *[]string diff --git a/ees/ee.go b/ees/ee.go index b63911ba0..191b3d459 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -43,7 +43,7 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt case utils.MetaHTTPPost: return NewHTTPPostEe(cgrCfg, cfgIdx, filterS, dc) case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, utils.MetaS3jsonMap: - return NewHTTPJsonMapEe(cgrCfg, cfgIdx, filterS, dc) + return NewPosterJSONMapEE(cgrCfg, cfgIdx, filterS, dc) case utils.MetaVirt: return NewVirtualExporter(cgrCfg, cfgIdx, filterS, dc) default: diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index ba7d6038b..abd417e62 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -29,64 +29,81 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, - dc utils.MapStorage) (httpJSON *HTTPJsonMapEe, err error) { - dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID - httpJSON = &HTTPJsonMapEe{ +func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, + dc utils.MapStorage) (pstrJSON *PosterJSONMapEE, err error) { + dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID + pstrJSON = &PosterJSONMapEE{ id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc, } - if cgrCfg.EEsCfg().Exporters[cfgIdx].Type == utils.MetaHTTPjsonMap { - httpJSON.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, + switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type { + case utils.MetaHTTPjsonMap: + pstrJSON.poster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) + case utils.MetaAMQPjsonMap: + pstrJSON.poster = engine.NewAMQPPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, + cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts) + case utils.MetaAMQPV1jsonMap: + pstrJSON.poster = engine.NewAMQPv1Poster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, + cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts) + case utils.MetaSQSjsonMap: + pstrJSON.poster = engine.NewSQSPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, + cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts) + case utils.MetaKafkajsonMap: + pstrJSON.poster = engine.NewKafkaPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, + cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts) + case utils.MetaS3jsonMap: + pstrJSON.poster = engine.NewS3Poster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, + cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts) } return } -// HTTPJsonMapEe implements EventExporter interface for .csv files -type HTTPJsonMapEe struct { - id string - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - filterS *engine.FilterS - httpPoster *engine.HTTPPoster - dc utils.MapStorage +// PosterJSONMapEE implements EventExporter interface for .csv files +type PosterJSONMapEE struct { + id string + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + filterS *engine.FilterS + poster engine.Poster + dc utils.MapStorage sync.RWMutex } // ID returns the identificator of this exporter -func (httpJson *HTTPJsonMapEe) ID() string { - return httpJson.id +func (pstrEE *PosterJSONMapEE) ID() string { + return pstrEE.id } // OnEvicted implements EventExporter, doing the cleanup before exit -func (httpJson *HTTPJsonMapEe) OnEvicted(string, interface{}) { +func (pstrEE *PosterJSONMapEE) OnEvicted(string, interface{}) { + pstrEE.poster.Close() return } // ExportEvent implements EventExporter -func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { - httpJson.Lock() +func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) { + pstrEE.Lock() defer func() { if err != nil { - httpJson.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + pstrEE.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) } else { - httpJson.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + pstrEE.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) } - httpJson.Unlock() + pstrEE.Unlock() }() - httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int64) + 1 + pstrEE.dc[utils.NumberOfEvents] = pstrEE.dc[utils.NumberOfEvents].(int64) + 1 valMp := make(map[string]string) - eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), httpJson.dc, - cgrEv.Tenant, httpJson.cgrCfg.GeneralCfg().DefaultTimezone, httpJson.filterS) + eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), pstrEE.dc, + cgrEv.Tenant, pstrEE.cgrCfg.GeneralCfg().DefaultTimezone, pstrEE.filterS) - if err = eeReq.SetFields(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ContentFields()); err != nil { + if err = eeReq.SetFields(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ContentFields()); err != nil { return } for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { @@ -104,44 +121,21 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { } valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data) } - updateEEMetrics(httpJson.dc, cgrEv.Event, httpJson.cgrCfg.GeneralCfg().DefaultTimezone) + updateEEMetrics(pstrEE.dc, cgrEv.Event, pstrEE.cgrCfg.GeneralCfg().DefaultTimezone) cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()) runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault) var body []byte if body, err = json.Marshal(valMp); err != nil { return } - err = httpJson.post(body, utils.ConcatenatedKey(cgrID, runID)) - return -} - -func (httpJson *HTTPJsonMapEe) post(body []byte, key string) (err error) { - switch httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type { - case utils.MetaHTTPjsonMap: - err = httpJson.httpPoster.Post(body, utils.EmptyString) - case utils.MetaAMQPjsonMap: - err = engine.PostersCache.PostAMQP(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body) - case utils.MetaAMQPV1jsonMap: - err = engine.PostersCache.PostAMQPv1(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body) - case utils.MetaSQSjsonMap: - err = engine.PostersCache.PostSQS(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body) - case utils.MetaKafkajsonMap: - err = engine.PostersCache.PostKafka(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body, key) - case utils.MetaS3jsonMap: - err = engine.PostersCache.PostS3(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body, key) - } - if err != nil && httpJson.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { - engine.AddFailedPost(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type, utils.EventExporterS, body) + if err = pstrEE.poster.Post(body, utils.ConcatenatedKey(cgrID, runID)); err != nil && + pstrEE.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { + engine.AddFailedPost(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ExportPath, + pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Type, utils.EventExporterS, body) } return } -func (httpJson *HTTPJsonMapEe) GetMetrics() utils.MapStorage { - return httpJson.dc.Clone() +func (pstrEE *PosterJSONMapEE) GetMetrics() utils.MapStorage { + return pstrEE.dc.Clone() } diff --git a/ees/httppost.go b/ees/httppost.go index ab5ade3e9..b3b12fa3b 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -74,7 +74,6 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { }() httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int64) + 1 - var body interface{} urlVals := url.Values{} req := utils.MapStorage{} for k, v := range cgrEv.Event { @@ -101,11 +100,10 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { urlVals.Set(strings.Join(itm.Path, utils.NestingSep), utils.IfaceAsString(itm.Data)) } updateEEMetrics(httpPost.dc, cgrEv.Event, httpPost.cgrCfg.GeneralCfg().DefaultTimezone) - body = urlVals - if err = httpPost.httpPoster.Post(body, utils.EmptyString); err != nil && + if err = httpPost.httpPoster.PostValues(urlVals); err != nil && httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath, - httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, body) + httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, urlVals) } return } diff --git a/engine/action.go b/engine/action.go index d81d598f7..b20adacca 100644 --- a/engine/action.go +++ b/engine/action.go @@ -460,7 +460,7 @@ func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error { if err != nil { return err } - err = pstr.Post(body, utils.EmptyString) + err = pstr.PostValues(body) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) err = nil @@ -481,7 +481,7 @@ func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) er return err } go func() { - err := pstr.Post(body, utils.EmptyString) + err := pstr.PostValues(body) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) } @@ -1040,7 +1040,7 @@ func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error if err != nil { return err } - err = pstr.Post(body, utils.EmptyString) + err = pstr.PostValues(body) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) err = nil diff --git a/engine/libcdre.go b/engine/libcdre.go index 644d78b03..a3a7fed35 100644 --- a/engine/libcdre.go +++ b/engine/libcdre.go @@ -155,7 +155,7 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export return expEv, err } for _, ev := range expEv.Events { - err = pstr.Post(ev, utils.EmptyString) + err = pstr.PostValues(ev) if err != nil { failedEvents.AddEvent(ev) } diff --git a/engine/poster.go b/engine/poster.go index 7ba9bdf94..687d32fe8 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -19,8 +19,6 @@ along with this program. If not, see package engine import ( - "net/url" - "strings" "sync" ) @@ -63,20 +61,6 @@ type Poster interface { Close() } -func parseURL(dialURL string) (URL string, qID string, err error) { - u, err := url.Parse(dialURL) - if err != nil { - return "", "", err - } - qry := u.Query() - URL = strings.Split(dialURL, "?")[0] - qID = DefaultQueueID - if vals, has := qry[QueueID]; has && len(vals) != 0 { - qID = vals[0] - } - return -} - // Close closes all cached posters func (pc *PosterCache) Close() { for _, v := range pc.amqpCache { @@ -95,111 +79,81 @@ func (pc *PosterCache) Close() { // GetAMQPPoster creates a new poster only if not already cached // uses dialURL as cache key -func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int) (pstr Poster, err error) { +func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int) (pstr Poster) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.amqpCache[dialURL]; !hasIt { - if pstr, err = NewAMQPPoster(dialURL, attempts); err != nil { - return nil, err - } + pstr = NewAMQPPoster(dialURL, attempts, nil) pc.amqpCache[dialURL] = pstr } - return pc.amqpCache[dialURL], nil + return pc.amqpCache[dialURL] } // GetAMQPv1Poster creates a new poster only if not already cached -func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int) (pstr Poster, err error) { +func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int) (pstr Poster) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.amqpv1Cache[dialURL]; !hasIt { - if pstr, err = NewAMQPv1Poster(dialURL, attempts); err != nil { - return nil, err - } + pstr = NewAMQPv1Poster(dialURL, attempts, nil) pc.amqpv1Cache[dialURL] = pstr } - return pc.amqpv1Cache[dialURL], nil + return pc.amqpv1Cache[dialURL] } // GetSQSPoster creates a new poster only if not already cached -func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int) (pstr Poster, err error) { +func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int) (pstr Poster) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.sqsCache[dialURL]; !hasIt { - if pstr, err = NewSQSPoster(dialURL, attempts); err != nil { - return nil, err - } + pstr = NewSQSPoster(dialURL, attempts, nil) pc.sqsCache[dialURL] = pstr } - return pc.sqsCache[dialURL], nil + return pc.sqsCache[dialURL] } // GetKafkaPoster creates a new poster only if not already cached -func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int) (pstr Poster, err error) { +func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int) (pstr Poster) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.kafkaCache[dialURL]; !hasIt { - if pstr, err = NewKafkaPoster(dialURL, attempts); err != nil { - return nil, err - } + pstr = NewKafkaPoster(dialURL, attempts, nil) pc.kafkaCache[dialURL] = pstr } - return pc.kafkaCache[dialURL], nil + return pc.kafkaCache[dialURL] } // GetS3Poster creates a new poster only if not already cached -func (pc *PosterCache) GetS3Poster(dialURL string, attempts int) (pstr Poster, err error) { +func (pc *PosterCache) GetS3Poster(dialURL string, attempts int) (pstr Poster) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.s3Cache[dialURL]; !hasIt { - if pstr, err = NewS3Poster(dialURL, attempts); err != nil { - return nil, err - } + pstr = NewS3Poster(dialURL, attempts, nil) pc.s3Cache[dialURL] = pstr } - return pc.s3Cache[dialURL], nil + return pc.s3Cache[dialURL] } func (pc *PosterCache) PostAMQP(dialURL string, attempts int, content []byte) error { - amqpPoster, err := pc.GetAMQPPoster(dialURL, attempts) - if err != nil { - return err - } - return amqpPoster.Post(content, "") + return pc.GetAMQPPoster(dialURL, attempts).Post(content, "") } func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int, content []byte) error { - AMQPv1Poster, err := pc.GetAMQPv1Poster(dialURL, attempts) - if err != nil { - return err - } - return AMQPv1Poster.Post(content, "") + return pc.GetAMQPv1Poster(dialURL, attempts).Post(content, "") } func (pc *PosterCache) PostSQS(dialURL string, attempts int, content []byte) error { - sqsPoster, err := pc.GetSQSPoster(dialURL, attempts) - if err != nil { - return err - } - return sqsPoster.Post(content, "") + return pc.GetSQSPoster(dialURL, attempts).Post(content, "") } func (pc *PosterCache) PostKafka(dialURL string, attempts int, content []byte, key string) error { - kafkaPoster, err := pc.GetKafkaPoster(dialURL, attempts) - if err != nil { - return err - } - return kafkaPoster.Post(content, key) + return pc.GetKafkaPoster(dialURL, attempts).Post(content, key) } func (pc *PosterCache) PostS3(dialURL string, attempts int, content []byte, key string) error { - sqsPoster, err := pc.GetS3Poster(dialURL, attempts) - if err != nil { - return err - } - return sqsPoster.Post(content, key) + return pc.GetS3Poster(dialURL, attempts).Post(content, key) } diff --git a/engine/poster_test.go b/engine/poster_test.go index b0573f35f..551081038 100644 --- a/engine/poster_test.go +++ b/engine/poster_test.go @@ -35,7 +35,7 @@ func TestAMQPPosterParseURL(t *testing.T) { routingKey: "CGRCDR", } dialURL := "amqp://guest:guest@localhost:5672/?queue_id=q1&exchange=E1&routing_key=CGRCDR&heartbeat=5&exchange_type=fanout" - if err := amqp.parseURL(dialURL); err != nil { + if err := amqp.parseOpts(dialURL); err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, amqp) { t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(expected), utils.ToJSON(amqp)) @@ -49,7 +49,7 @@ func TestKafkaParseURL(t *testing.T) { topic: "cdr_billing", attempts: 10, } - if kfk, err := NewKafkaPoster(u, 10); err != nil { + if kfk, err := NewKafkaPoster(u, 10, nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, kfk) { t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) @@ -60,7 +60,7 @@ func TestKafkaParseURL(t *testing.T) { topic: "cdr_billing", attempts: 10, } - if kfk, err := NewKafkaPoster(u, 10); err != nil { + if kfk, err := NewKafkaPoster(u, 10, nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, kfk) { t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) diff --git a/engine/pstr_amqp.go b/engine/pstr_amqp.go index 5eab8167c..e6c9b8579 100644 --- a/engine/pstr_amqp.go +++ b/engine/pstr_amqp.go @@ -20,8 +20,6 @@ package engine import ( "fmt" - "net/url" - "strings" "sync" "time" @@ -34,14 +32,13 @@ var AMQPPosibleQuery = []string{"cacertfile", "certfile", "keyfile", "verify", " // NewAMQPPoster creates a new amqp poster // "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" -func NewAMQPPoster(dialURL string, attempts int) (*AMQPPoster, error) { +func NewAMQPPoster(dialURL string, attempts int, opts map[string]interface{}) *AMQPPoster { amqp := &AMQPPoster{ attempts: attempts, + dialURL: dialURL, } - if err := amqp.parseURL(dialURL); err != nil { - return nil, err - } - return amqp, nil + amqp.parseOpts(opts) + return amqp } // AMQPPoster used to post cdrs to amqp @@ -56,35 +53,23 @@ type AMQPPoster struct { conn *amqp.Connection } -func (pstr *AMQPPoster) parseURL(dialURL string) error { - u, err := url.Parse(dialURL) - if err != nil { - return err - } - qry := u.Query() - q := url.Values{} - for _, key := range AMQPPosibleQuery { - if vals, has := qry[key]; has && len(vals) != 0 { - q.Add(key, vals[0]) - } - } - pstr.dialURL = strings.Split(dialURL, "?")[0] + "?" + q.Encode() +func (pstr *AMQPPoster) parseOpts(dialURL map[string]interface{}) { pstr.queueID = DefaultQueueID pstr.routingKey = DefaultQueueID - if vals, has := qry[QueueID]; has && len(vals) != 0 { - pstr.queueID = vals[0] + if vals, has := dialURL[QueueID]; has { + pstr.queueID = utils.IfaceAsString(vals) } - if vals, has := qry[RoutingKey]; has && len(vals) != 0 { - pstr.routingKey = vals[0] + if vals, has := dialURL[RoutingKey]; has { + pstr.routingKey = utils.IfaceAsString(vals) } - if vals, has := qry[Exchange]; has && len(vals) != 0 { - pstr.exchange = vals[0] + if vals, has := dialURL[Exchange]; has { + pstr.exchange = utils.IfaceAsString(vals) pstr.exchangeType = DefaultExchangeType } - if vals, has := qry[ExchangeType]; has && len(vals) != 0 { - pstr.exchangeType = vals[0] + if vals, has := dialURL[ExchangeType]; has { + pstr.exchangeType = utils.IfaceAsString(vals) } - return nil + return } // Post is the method being called when we need to post anything in the queue diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go index 723b2ca43..5cf0d9d42 100644 --- a/engine/pstr_amqpv1.go +++ b/engine/pstr_amqpv1.go @@ -30,16 +30,16 @@ import ( ) // NewAMQPv1Poster creates a poster for amqpv1 -func NewAMQPv1Poster(dialURL string, attempts int) (Poster, error) { - URL, qID, err := parseURL(dialURL) - if err != nil { - return nil, err - } - return &AMQPv1Poster{ - dialURL: URL, - queueID: "/" + qID, +func NewAMQPv1Poster(dialURL string, attempts int, opts map[string]interface{}) Poster { + pstr := &AMQPv1Poster{ + dialURL: dialURL, + queueID: "/" + DefaultQueueID, attempts: attempts, - }, nil + } + if vals, has := opts[QueueID]; has { + pstr.queueID = "/" + utils.IfaceAsString(vals) + } + return pstr } // AMQPv1Poster a poster for amqpv1 diff --git a/engine/pstr_http.go b/engine/pstr_http.go index 5ba21485f..06d34a478 100644 --- a/engine/pstr_http.go +++ b/engine/pstr_http.go @@ -83,19 +83,27 @@ type HTTPPoster struct { attempts int } -// Post will post the event -func (pstr *HTTPPoster) Post(content interface{}, key string) (err error) { +// PostValues will post the event +func (pstr *HTTPPoster) PostValues(content interface{}) (err error) { _, err = pstr.GetResponse(content) return } +// Post will post the event +func (pstr *HTTPPoster) Post(content []byte, _ string) (err error) { + _, err = pstr.GetResponse(content) + return +} + +// Close only yo implement the Poster interface +func (*HTTPPoster) Close() {} + // GetResponse will post the event and return the response func (pstr *HTTPPoster) GetResponse(content interface{}) (respBody []byte, err error) { var body []byte // Used to write in file and send over http var urlVals url.Values // Used when posting form if pstr.contentType == utils.CONTENT_FORM { urlVals = content.(url.Values) - body = []byte(urlVals.Encode()) } else { body = content.([]byte) } diff --git a/engine/pstr_kafka.go b/engine/pstr_kafka.go index 233994311..59b2fec93 100644 --- a/engine/pstr_kafka.go +++ b/engine/pstr_kafka.go @@ -19,8 +19,6 @@ package engine import ( "context" - "net/url" - "strings" "sync" "github.com/cgrates/cgrates/utils" @@ -28,14 +26,16 @@ import ( ) // NewKafkaPoster creates a kafka poster -func NewKafkaPoster(dialURL string, attempts int) (*KafkaPoster, error) { +func NewKafkaPoster(dialURL string, attempts int, opts map[string]interface{}) *KafkaPoster { kfkPstr := &KafkaPoster{ + dialURL: dialURL, attempts: attempts, + topic: DefaultQueueID, } - if err := kfkPstr.parseURL(dialURL); err != nil { - return nil, err + if vals, has := opts[utils.KafkaTopic]; has { + kfkPstr.topic = utils.IfaceAsString(vals) } - return kfkPstr, nil + return kfkPstr } // KafkaPoster is a kafka poster @@ -47,26 +47,6 @@ type KafkaPoster struct { writer *kafka.Writer } -func (pstr *KafkaPoster) parseURL(dialURL string) error { - pstr.topic = DefaultQueueID - i := strings.IndexByte(dialURL, '?') - if i < 0 { - pstr.dialURL = dialURL - return nil - } - pstr.dialURL = dialURL[:i] - rawQuery := dialURL[i+1:] - qry, err := url.ParseQuery(rawQuery) - if err != nil { - return err - } - pstr.dialURL = strings.Split(dialURL, "?")[0] - if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 { - pstr.topic = vals[0] - } - return nil -} - // Post is the method being called when we need to post anything in the queue // the optional chn will permits channel caching func (pstr *KafkaPoster) Post(content []byte, key string) (err error) { diff --git a/engine/pstr_s3.go b/engine/pstr_s3.go index 9703a5f43..c09850a62 100644 --- a/engine/pstr_s3.go +++ b/engine/pstr_s3.go @@ -21,7 +21,6 @@ package engine import ( "bytes" "fmt" - "strings" "sync" "time" @@ -33,12 +32,13 @@ import ( ) // NewS3Poster creates a s3 poster -func NewS3Poster(dialURL string, attempts int) (Poster, error) { +func NewS3Poster(dialURL string, attempts int, opts map[string]interface{}) Poster { pstr := &S3Poster{ + dialURL: dialURL, attempts: attempts, } - pstr.parseURL(dialURL) - return pstr, nil + pstr.parseOpts(opts) + return pstr } // S3Poster is a s3 poster @@ -58,29 +58,25 @@ type S3Poster struct { // Close for Poster interface func (pstr *S3Poster) Close() {} -func (pstr *S3Poster) parseURL(dialURL string) { - qry := utils.GetUrlRawArguments(dialURL) - - pstr.dialURL = strings.Split(dialURL, "?")[0] - pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint +func (pstr *S3Poster) parseOpts(opts map[string]interface{}) { pstr.queueID = DefaultQueueID - if val, has := qry[QueueID]; has { - pstr.queueID = val + if val, has := opts[QueueID]; has { + pstr.queueID = utils.IfaceAsString(val) } - if val, has := qry[folderPath]; has { - pstr.folderPath = val + if val, has := opts[folderPath]; has { + pstr.folderPath = utils.IfaceAsString(val) } - if val, has := qry[utils.AWSRegion]; has { - pstr.awsRegion = val + if val, has := opts[utils.AWSRegion]; has { + pstr.awsRegion = utils.IfaceAsString(val) } - if val, has := qry[utils.AWSKey]; has { - pstr.awsID = val + if val, has := opts[utils.AWSKey]; has { + pstr.awsID = utils.IfaceAsString(val) } - if val, has := qry[utils.AWSSecret]; has { - pstr.awsKey = val + if val, has := opts[utils.AWSSecret]; has { + pstr.awsKey = utils.IfaceAsString(val) } - if val, has := qry[awsToken]; has { - pstr.awsToken = val + if val, has := opts[awsToken]; has { + pstr.awsToken = utils.IfaceAsString(val) } } diff --git a/engine/pstr_sqs.go b/engine/pstr_sqs.go index bf4b175e5..09a130c98 100644 --- a/engine/pstr_sqs.go +++ b/engine/pstr_sqs.go @@ -20,7 +20,6 @@ package engine import ( "fmt" - "strings" "sync" "time" @@ -33,12 +32,12 @@ import ( ) // NewSQSPoster creates a poster for sqs -func NewSQSPoster(dialURL string, attempts int) (Poster, error) { +func NewSQSPoster(dialURL string, attempts int, opts map[string]interface{}) Poster { pstr := &SQSPoster{ attempts: attempts, } - pstr.parseURL(dialURL) - return pstr, nil + pstr.parseOpts(opts) + return pstr } // SQSPoster is a poster for sqs @@ -59,26 +58,22 @@ type SQSPoster struct { // Close for Poster interface func (pstr *SQSPoster) Close() {} -func (pstr *SQSPoster) parseURL(dialURL string) { - qry := utils.GetUrlRawArguments(dialURL) - - pstr.dialURL = strings.Split(dialURL, "?")[0] - pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint +func (pstr *SQSPoster) parseOpts(opts map[string]interface{}) { pstr.queueID = DefaultQueueID - if val, has := qry[QueueID]; has { - pstr.queueID = val + if val, has := opts[QueueID]; has { + pstr.queueID = utils.IfaceAsString(val) } - if val, has := qry[utils.AWSRegion]; has { - pstr.awsRegion = val + if val, has := opts[utils.AWSRegion]; has { + pstr.awsRegion = utils.IfaceAsString(val) } - if val, has := qry[utils.AWSKey]; has { - pstr.awsID = val + if val, has := opts[utils.AWSKey]; has { + pstr.awsID = utils.IfaceAsString(val) } - if val, has := qry[utils.AWSSecret]; has { - pstr.awsKey = val + if val, has := opts[utils.AWSSecret]; has { + pstr.awsKey = utils.IfaceAsString(val) } - if val, has := qry[awsToken]; has { - pstr.awsToken = val + if val, has := opts[awsToken]; has { + pstr.awsToken = utils.IfaceAsString(val) } pstr.getQueueURL() } diff --git a/engine/z_poster_it_test.go b/engine/z_poster_it_test.go index a3b2a81e8..d1bce5301 100644 --- a/engine/z_poster_it_test.go +++ b/engine/z_poster_it_test.go @@ -72,7 +72,7 @@ func TestHttpJsonPoster(t *testing.T) { if err != nil { t.Error(err) } - if err = pstr.Post(jsn, utils.EmptyString); err == nil { + if err = pstr.PostValues(jsn); err == nil { t.Error("Expected error") } AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn) @@ -105,7 +105,7 @@ func TestHttpBytesPoster(t *testing.T) { if err != nil { t.Error(err) } - if err = pstr.Post(content, utils.EmptyString); err == nil { + if err = pstr.PostValues(content); err == nil { t.Error("Expected error") } AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content)