From 1302ffb8feae6ebd00774adb1c8b111c97a03e8e Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 4 Jun 2020 16:16:08 +0300 Subject: [PATCH] Add Post support for HTTP and other types (amqp;sqs;etc...) --- config/config.go | 18 ++++- ees/ee.go | 4 + ees/httpjsonmap.go | 164 +++++++++++++++++++++++++++++++++++++++ ees/httppost.go | 133 +++++++++++++++++++++++++++++++ engine/action.go | 16 ++-- engine/cdre.go | 2 +- engine/libcdre.go | 2 +- engine/libcdre_test.go | 6 +- engine/poster_it_test.go | 4 +- 9 files changed, 333 insertions(+), 16 deletions(-) create mode 100644 ees/httpjsonmap.go create mode 100644 ees/httppost.go diff --git a/config/config.go b/config/config.go index f34189bad..f94a2c728 100755 --- a/config/config.go +++ b/config/config.go @@ -318,7 +318,9 @@ var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV, utils.MetaPartialCSV, utils.MetaFlatstore, utils.MetaJSON, utils.META_NONE}) -var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE, utils.MetaFileFWV}) +var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE, utils.MetaFileFWV, + utils.MetaHTTPPost, utils.MetaHTTPjson, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, + utils.MetaKafkajsonMap, utils.MetaS3jsonMap}) func (cfg *CGRConfig) LazySanityCheck() { for _, cdrePrfl := range cfg.cdrsCfg.OnlineCDRExports { @@ -335,6 +337,20 @@ func (cfg *CGRConfig) LazySanityCheck() { } } } + for _, exporter := range cfg.eesCfg.Exporters { + if exporter.Type == utils.MetaS3jsonMap || exporter.Type == utils.MetaSQSjsonMap { + poster := utils.SQSPoster + if exporter.Type == utils.MetaS3jsonMap { + poster = utils.S3Poster + } + argsMap := utils.GetUrlRawArguments(exporter.ExportPath) + for _, arg := range []string{utils.AWSRegion, utils.AWSKey, utils.AWSSecret} { + if _, has := argsMap[arg]; !has { + utils.Logger.Warning(fmt.Sprintf("<%s> No %s present for AWS for exporter with ID: <%s>.", poster, arg, exporter.ID)) + } + } + } + } } // Loads from json configuration object, will be used for defaults, config from file and reload, might need lock diff --git a/ees/ee.go b/ees/ee.go index 7889df5aa..aa3599ec0 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -39,6 +39,10 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt return NewFileCSVee(cgrCfg, cfgIdx, filterS, dc) case utils.MetaFileFWV: return NewFileFWVee(cgrCfg, cfgIdx, filterS, dc) + case utils.MetaHTTPPost: + return NewHTTPPostEe(cgrCfg, cfgIdx, filterS, dc) + case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, utils.MetaS3jsonMap: + return NewHTTPJsonMapEe(cgrCfg, cfgIdx, filterS, dc) default: return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type) } diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go new file mode 100644 index 000000000..f2b6bd491 --- /dev/null +++ b/ees/httpjsonmap.go @@ -0,0 +1,164 @@ +/* +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "encoding/json" + "sync" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, + dc utils.MapStorage) (httpJson *HTTPJsonMapEe, err error) { + dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID + httpJson = &HTTPJsonMapEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} + if cgrCfg.EEsCfg().Exporters[cfgIdx].Type == utils.MetaHTTPjsonMap { + httpJson.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, + cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, + utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) + } + + return +} + +// FileCSVee implements EventExporter interface for .csv files +type HTTPJsonMapEe struct { + id string + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + filterS *engine.FilterS + httpPoster *engine.HTTPPoster + sync.RWMutex + dc utils.MapStorage +} + +// ID returns the identificator of this exporter +func (httpJson *HTTPJsonMapEe) ID() string { + return httpJson.id +} + +// OnEvicted implements EventExporter, doing the cleanup before exit +func (httpJson *HTTPJsonMapEe) OnEvicted(_ string, _ interface{}) { + return +} + +// ExportEvent implements EventExporter +func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { + httpJson.Lock() + defer httpJson.Unlock() + + httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int) + 1 + + var body interface{} + valMp := make(map[string]string) + req := utils.MapStorage{} + for k, v := range cgrEv.Event { + req[k] = v + } + eeReq := NewEventExporterRequest(req, httpJson.dc, cgrEv.Tenant, httpJson.cgrCfg.GeneralCfg().DefaultTimezone, + httpJson.filterS) + + if err = eeReq.SetFields(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ContentFields()); err != nil { + httpJson.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + return + } + for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { + var strVal string + if strVal, err = eeReq.cnt.FieldAsString(el.Value.Slice()); err != nil { + return + } + valMp[el.Value.Slice()[1]] = strVal + } + if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, httpJson.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { + if httpJson.dc[utils.FirstEventATime].(time.Time).IsZero() || httpJson.dc[utils.FirstEventATime].(time.Time).Before(aTime) { + httpJson.dc[utils.FirstEventATime] = aTime + } + if aTime.After(httpJson.dc[utils.LastEventATime].(time.Time)) { + httpJson.dc[utils.LastEventATime] = aTime + } + } + if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { + if httpJson.dc[utils.FirstExpOrderID].(int64) > oID || httpJson.dc[utils.FirstExpOrderID].(int64) == 0 { + httpJson.dc[utils.FirstExpOrderID] = oID + } + if httpJson.dc[utils.LastExpOrderID].(int64) < oID { + httpJson.dc[utils.LastExpOrderID] = oID + } + } + if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { + httpJson.dc[utils.TotalCost] = httpJson.dc[utils.TotalCost].(float64) + cost + } + if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { + if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { + switch tor { + case utils.VOICE: + httpJson.dc[utils.TotalDuration] = httpJson.dc[utils.TotalDuration].(time.Duration) + usage + case utils.SMS: + httpJson.dc[utils.TotalSMSUsage] = httpJson.dc[utils.TotalSMSUsage].(time.Duration) + usage + case utils.MMS: + httpJson.dc[utils.TotalMMSUsage] = httpJson.dc[utils.TotalMMSUsage].(time.Duration) + usage + case utils.GENERIC: + httpJson.dc[utils.TotalGenericUsage] = httpJson.dc[utils.TotalGenericUsage].(time.Duration) + usage + case utils.DATA: + httpJson.dc[utils.TotalDataUsage] = httpJson.dc[utils.TotalDataUsage].(time.Duration) + usage + } + } + } + cgrID := utils.GenUUID() + cgrID, err = cgrEv.FieldAsString(utils.CGRID) + var runID string + runID, err = cgrEv.FieldAsString(utils.RunID) + httpJson.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + if body, err = json.Marshal(valMp); err != nil { + return + } + return httpJson.post(body, utils.ConcatenatedKey(cgrID, runID)) +} + +func (httpJson *HTTPJsonMapEe) post(body interface{}, key string) (err error) { + switch httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type { + case utils.MetaHTTPjsonMap: + err = httpJson.httpPoster.Post(body, utils.EmptyString) + case utils.MetaAMQPjsonMap: + err = engine.PostersCache.PostAMQP(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte)) + case utils.MetaAMQPV1jsonMap: + err = engine.PostersCache.PostAMQPv1(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte)) + case utils.MetaSQSjsonMap: + err = engine.PostersCache.PostSQS(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte)) + case utils.MetaKafkajsonMap: + err = engine.PostersCache.PostKafka(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte), key) + case utils.MetaS3jsonMap: + err = engine.PostersCache.PostS3(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte), key) + } + if err != nil && httpJson.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { + engine.AddFailedPost(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type, utils.EventExporterS, body) + } + return +} diff --git a/ees/httppost.go b/ees/httppost.go new file mode 100644 index 000000000..c945dbc4d --- /dev/null +++ b/ees/httppost.go @@ -0,0 +1,133 @@ +/* +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "net/url" + "sync" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, + dc utils.MapStorage) (httpPost *HTTPPost, err error) { + dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID + httpPost = &HTTPPost{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} + httpPost.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, + cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, + utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) + return +} + +// FileCSVee implements EventExporter interface for .csv files +type HTTPPost struct { + id string + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + filterS *engine.FilterS + httpPoster *engine.HTTPPoster + sync.RWMutex + dc utils.MapStorage +} + +// ID returns the identificator of this exporter +func (httpPost *HTTPPost) ID() string { + return httpPost.id +} + +// OnEvicted implements EventExporter, doing the cleanup before exit +func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) { + return +} + +// ExportEvent implements EventExporter +func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { + httpPost.Lock() + defer httpPost.Unlock() + + httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int) + 1 + + var body interface{} + urlVals := url.Values{} + req := utils.MapStorage{} + for k, v := range cgrEv.Event { + req[k] = v + } + eeReq := NewEventExporterRequest(req, httpPost.dc, cgrEv.Tenant, httpPost.cgrCfg.GeneralCfg().DefaultTimezone, + httpPost.filterS) + + if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()); err != nil { + httpPost.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + return + } + for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { + var strVal string + if strVal, err = eeReq.cnt.FieldAsString(el.Value.Slice()); err != nil { + return + } + urlVals.Set(el.Value.Slice()[1], strVal) + } + if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, httpPost.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { + if httpPost.dc[utils.FirstEventATime].(time.Time).IsZero() || httpPost.dc[utils.FirstEventATime].(time.Time).Before(aTime) { + httpPost.dc[utils.FirstEventATime] = aTime + } + if aTime.After(httpPost.dc[utils.LastEventATime].(time.Time)) { + httpPost.dc[utils.LastEventATime] = aTime + } + } + if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { + if httpPost.dc[utils.FirstExpOrderID].(int64) > oID || httpPost.dc[utils.FirstExpOrderID].(int64) == 0 { + httpPost.dc[utils.FirstExpOrderID] = oID + } + if httpPost.dc[utils.LastExpOrderID].(int64) < oID { + httpPost.dc[utils.LastExpOrderID] = oID + } + } + if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { + httpPost.dc[utils.TotalCost] = httpPost.dc[utils.TotalCost].(float64) + cost + } + if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { + if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { + switch tor { + case utils.VOICE: + httpPost.dc[utils.TotalDuration] = httpPost.dc[utils.TotalDuration].(time.Duration) + usage + case utils.SMS: + httpPost.dc[utils.TotalSMSUsage] = httpPost.dc[utils.TotalSMSUsage].(time.Duration) + usage + case utils.MMS: + httpPost.dc[utils.TotalMMSUsage] = httpPost.dc[utils.TotalMMSUsage].(time.Duration) + usage + case utils.GENERIC: + httpPost.dc[utils.TotalGenericUsage] = httpPost.dc[utils.TotalGenericUsage].(time.Duration) + usage + case utils.DATA: + httpPost.dc[utils.TotalDataUsage] = httpPost.dc[utils.TotalDataUsage].(time.Duration) + usage + } + } + } + httpPost.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + body = urlVals + if err = httpPost.httpPoster.Post(body, utils.EmptyString); err != nil && + httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { + engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath, + httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, body) + } + return +} diff --git a/engine/action.go b/engine/action.go index 3b8cc95e2..98e4a5ace 100644 --- a/engine/action.go +++ b/engine/action.go @@ -388,7 +388,7 @@ func sendAMQP(ub *Account, a *Action, acs Actions, extraData interface{}) error } err = PostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - addFailedPost(a.ExtraParameters, utils.MetaAMQPjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaAMQPjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) err = nil } return err @@ -401,7 +401,7 @@ func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error { } err = PostersCache.PostAMQPv1(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - addFailedPost(a.ExtraParameters, utils.MetaAMQPV1jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaAMQPV1jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) err = nil } return err @@ -414,7 +414,7 @@ func sendSQS(ub *Account, a *Action, acs Actions, extraData interface{}) error { } err = PostersCache.PostSQS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - addFailedPost(a.ExtraParameters, utils.MetaSQSjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaSQSjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) err = nil } return err @@ -427,7 +427,7 @@ func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error } err = PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix()) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - addFailedPost(a.ExtraParameters, utils.MetaKafkajsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaKafkajsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) err = nil } return err @@ -440,7 +440,7 @@ func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error { } err = PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix()) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - addFailedPost(a.ExtraParameters, utils.MetaS3jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaS3jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) err = nil } return err @@ -459,7 +459,7 @@ func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error { } err = pstr.Post(body, utils.EmptyString) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) err = nil } return err @@ -480,7 +480,7 @@ func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) er go func() { err := pstr.Post(body, utils.EmptyString) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) } }() return nil @@ -1045,7 +1045,7 @@ func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error } err = pstr.Post(body, utils.EmptyString) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) err = nil } return err diff --git a/engine/cdre.go b/engine/cdre.go index 3fc823857..dc3035a0b 100644 --- a/engine/cdre.go +++ b/engine/cdre.go @@ -290,7 +290,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), utils.ConcatenatedKey(cdr.CGRID, cdr.RunID)) } if err != nil && cdre.fallbackPath != utils.META_NONE { - addFailedPost(cdre.exportPath, cdre.exportFormat, utils.CDRPoster, body) + AddFailedPost(cdre.exportPath, cdre.exportFormat, utils.CDRPoster, body) } return } diff --git a/engine/libcdre.go b/engine/libcdre.go index b5baa37aa..644d78b03 100644 --- a/engine/libcdre.go +++ b/engine/libcdre.go @@ -58,7 +58,7 @@ func writeFailedPosts(itmID string, value interface{}) { return } -func addFailedPost(expPath, format, module string, ev interface{}) { +func AddFailedPost(expPath, format, module string, ev interface{}) { key := utils.ConcatenatedKey(expPath, format, module) var failedPost *ExportEvents if x, ok := failedPostCache.Get(key); ok { diff --git a/engine/libcdre_test.go b/engine/libcdre_test.go index 64233359d..86d30d978 100644 --- a/engine/libcdre_test.go +++ b/engine/libcdre_test.go @@ -38,7 +38,7 @@ func TestSetFldPostCacheTTL(t *testing.T) { func TestAddFldPost(t *testing.T) { SetFailedPostCacheTTL(time.Duration(5 * time.Second)) - addFailedPost("path1", "format1", "module1", "1") + AddFailedPost("path1", "format1", "module1", "1") x, ok := failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1")) if !ok { t.Error("Error reading from cache") @@ -60,8 +60,8 @@ func TestAddFldPost(t *testing.T) { if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } - addFailedPost("path1", "format1", "module1", "2") - addFailedPost("path2", "format2", "module2", "3") + AddFailedPost("path1", "format1", "module1", "2") + AddFailedPost("path2", "format2", "module2", "3") x, ok = failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1")) if !ok { t.Error("Error reading from cache") diff --git a/engine/poster_it_test.go b/engine/poster_it_test.go index 1a5cf9036..d61027b00 100644 --- a/engine/poster_it_test.go +++ b/engine/poster_it_test.go @@ -75,7 +75,7 @@ func TestHttpJsonPoster(t *testing.T) { if err = pstr.Post(jsn, utils.EmptyString); err == nil { t.Error("Expected error") } - addFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn) + AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn) time.Sleep(2) fs, err := filepath.Glob("/tmp/test1*") if err != nil { @@ -108,7 +108,7 @@ func TestHttpBytesPoster(t *testing.T) { if err = pstr.Post(content, utils.EmptyString); err == nil { t.Error("Expected error") } - addFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content) + AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content) time.Sleep(2) fs, err := filepath.Glob("/tmp/test2*") if err != nil {