mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add Post support for HTTP and other types (amqp;sqs;etc...)
This commit is contained in:
committed by
Dan Christian Bogos
parent
d20aecc7d9
commit
1302ffb8fe
@@ -318,7 +318,9 @@ var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV,
|
||||
utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV,
|
||||
utils.MetaPartialCSV, utils.MetaFlatstore, utils.MetaJSON, utils.META_NONE})
|
||||
|
||||
var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE, utils.MetaFileFWV})
|
||||
var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE, utils.MetaFileFWV,
|
||||
utils.MetaHTTPPost, utils.MetaHTTPjson, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap,
|
||||
utils.MetaKafkajsonMap, utils.MetaS3jsonMap})
|
||||
|
||||
func (cfg *CGRConfig) LazySanityCheck() {
|
||||
for _, cdrePrfl := range cfg.cdrsCfg.OnlineCDRExports {
|
||||
@@ -335,6 +337,20 @@ func (cfg *CGRConfig) LazySanityCheck() {
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, exporter := range cfg.eesCfg.Exporters {
|
||||
if exporter.Type == utils.MetaS3jsonMap || exporter.Type == utils.MetaSQSjsonMap {
|
||||
poster := utils.SQSPoster
|
||||
if exporter.Type == utils.MetaS3jsonMap {
|
||||
poster = utils.S3Poster
|
||||
}
|
||||
argsMap := utils.GetUrlRawArguments(exporter.ExportPath)
|
||||
for _, arg := range []string{utils.AWSRegion, utils.AWSKey, utils.AWSSecret} {
|
||||
if _, has := argsMap[arg]; !has {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> No %s present for AWS for exporter with ID: <%s>.", poster, arg, exporter.ID))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Loads from json configuration object, will be used for defaults, config from file and reload, might need lock
|
||||
|
||||
@@ -39,6 +39,10 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt
|
||||
return NewFileCSVee(cgrCfg, cfgIdx, filterS, dc)
|
||||
case utils.MetaFileFWV:
|
||||
return NewFileFWVee(cgrCfg, cfgIdx, filterS, dc)
|
||||
case utils.MetaHTTPPost:
|
||||
return NewHTTPPostEe(cgrCfg, cfgIdx, filterS, dc)
|
||||
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, utils.MetaS3jsonMap:
|
||||
return NewHTTPJsonMapEe(cgrCfg, cfgIdx, filterS, dc)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type)
|
||||
}
|
||||
|
||||
164
ees/httpjsonmap.go
Normal file
164
ees/httpjsonmap.go
Normal file
@@ -0,0 +1,164 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ees
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
|
||||
dc utils.MapStorage) (httpJson *HTTPJsonMapEe, err error) {
|
||||
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
|
||||
httpJson = &HTTPJsonMapEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
|
||||
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
|
||||
if cgrCfg.EEsCfg().Exporters[cfgIdx].Type == utils.MetaHTTPjsonMap {
|
||||
httpJson.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
|
||||
cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
|
||||
utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// FileCSVee implements EventExporter interface for .csv files
|
||||
type HTTPJsonMapEe struct {
|
||||
id string
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
filterS *engine.FilterS
|
||||
httpPoster *engine.HTTPPoster
|
||||
sync.RWMutex
|
||||
dc utils.MapStorage
|
||||
}
|
||||
|
||||
// ID returns the identificator of this exporter
|
||||
func (httpJson *HTTPJsonMapEe) ID() string {
|
||||
return httpJson.id
|
||||
}
|
||||
|
||||
// OnEvicted implements EventExporter, doing the cleanup before exit
|
||||
func (httpJson *HTTPJsonMapEe) OnEvicted(_ string, _ interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
// ExportEvent implements EventExporter
|
||||
func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
httpJson.Lock()
|
||||
defer httpJson.Unlock()
|
||||
|
||||
httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int) + 1
|
||||
|
||||
var body interface{}
|
||||
valMp := make(map[string]string)
|
||||
req := utils.MapStorage{}
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
eeReq := NewEventExporterRequest(req, httpJson.dc, cgrEv.Tenant, httpJson.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
httpJson.filterS)
|
||||
|
||||
if err = eeReq.SetFields(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ContentFields()); err != nil {
|
||||
httpJson.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
|
||||
return
|
||||
}
|
||||
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
|
||||
var strVal string
|
||||
if strVal, err = eeReq.cnt.FieldAsString(el.Value.Slice()); err != nil {
|
||||
return
|
||||
}
|
||||
valMp[el.Value.Slice()[1]] = strVal
|
||||
}
|
||||
if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, httpJson.cgrCfg.GeneralCfg().DefaultTimezone); err == nil {
|
||||
if httpJson.dc[utils.FirstEventATime].(time.Time).IsZero() || httpJson.dc[utils.FirstEventATime].(time.Time).Before(aTime) {
|
||||
httpJson.dc[utils.FirstEventATime] = aTime
|
||||
}
|
||||
if aTime.After(httpJson.dc[utils.LastEventATime].(time.Time)) {
|
||||
httpJson.dc[utils.LastEventATime] = aTime
|
||||
}
|
||||
}
|
||||
if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil {
|
||||
if httpJson.dc[utils.FirstExpOrderID].(int64) > oID || httpJson.dc[utils.FirstExpOrderID].(int64) == 0 {
|
||||
httpJson.dc[utils.FirstExpOrderID] = oID
|
||||
}
|
||||
if httpJson.dc[utils.LastExpOrderID].(int64) < oID {
|
||||
httpJson.dc[utils.LastExpOrderID] = oID
|
||||
}
|
||||
}
|
||||
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
|
||||
httpJson.dc[utils.TotalCost] = httpJson.dc[utils.TotalCost].(float64) + cost
|
||||
}
|
||||
if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil {
|
||||
if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil {
|
||||
switch tor {
|
||||
case utils.VOICE:
|
||||
httpJson.dc[utils.TotalDuration] = httpJson.dc[utils.TotalDuration].(time.Duration) + usage
|
||||
case utils.SMS:
|
||||
httpJson.dc[utils.TotalSMSUsage] = httpJson.dc[utils.TotalSMSUsage].(time.Duration) + usage
|
||||
case utils.MMS:
|
||||
httpJson.dc[utils.TotalMMSUsage] = httpJson.dc[utils.TotalMMSUsage].(time.Duration) + usage
|
||||
case utils.GENERIC:
|
||||
httpJson.dc[utils.TotalGenericUsage] = httpJson.dc[utils.TotalGenericUsage].(time.Duration) + usage
|
||||
case utils.DATA:
|
||||
httpJson.dc[utils.TotalDataUsage] = httpJson.dc[utils.TotalDataUsage].(time.Duration) + usage
|
||||
}
|
||||
}
|
||||
}
|
||||
cgrID := utils.GenUUID()
|
||||
cgrID, err = cgrEv.FieldAsString(utils.CGRID)
|
||||
var runID string
|
||||
runID, err = cgrEv.FieldAsString(utils.RunID)
|
||||
httpJson.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
|
||||
if body, err = json.Marshal(valMp); err != nil {
|
||||
return
|
||||
}
|
||||
return httpJson.post(body, utils.ConcatenatedKey(cgrID, runID))
|
||||
}
|
||||
|
||||
func (httpJson *HTTPJsonMapEe) post(body interface{}, key string) (err error) {
|
||||
switch httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type {
|
||||
case utils.MetaHTTPjsonMap:
|
||||
err = httpJson.httpPoster.Post(body, utils.EmptyString)
|
||||
case utils.MetaAMQPjsonMap:
|
||||
err = engine.PostersCache.PostAMQP(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
|
||||
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte))
|
||||
case utils.MetaAMQPV1jsonMap:
|
||||
err = engine.PostersCache.PostAMQPv1(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
|
||||
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte))
|
||||
case utils.MetaSQSjsonMap:
|
||||
err = engine.PostersCache.PostSQS(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
|
||||
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte))
|
||||
case utils.MetaKafkajsonMap:
|
||||
err = engine.PostersCache.PostKafka(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
|
||||
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte), key)
|
||||
case utils.MetaS3jsonMap:
|
||||
err = engine.PostersCache.PostS3(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
|
||||
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte), key)
|
||||
}
|
||||
if err != nil && httpJson.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
engine.AddFailedPost(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
|
||||
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type, utils.EventExporterS, body)
|
||||
}
|
||||
return
|
||||
}
|
||||
133
ees/httppost.go
Normal file
133
ees/httppost.go
Normal file
@@ -0,0 +1,133 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ees
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
|
||||
dc utils.MapStorage) (httpPost *HTTPPost, err error) {
|
||||
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
|
||||
httpPost = &HTTPPost{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
|
||||
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
|
||||
httpPost.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
|
||||
cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
|
||||
utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts)
|
||||
return
|
||||
}
|
||||
|
||||
// FileCSVee implements EventExporter interface for .csv files
|
||||
type HTTPPost struct {
|
||||
id string
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
filterS *engine.FilterS
|
||||
httpPoster *engine.HTTPPoster
|
||||
sync.RWMutex
|
||||
dc utils.MapStorage
|
||||
}
|
||||
|
||||
// ID returns the identificator of this exporter
|
||||
func (httpPost *HTTPPost) ID() string {
|
||||
return httpPost.id
|
||||
}
|
||||
|
||||
// OnEvicted implements EventExporter, doing the cleanup before exit
|
||||
func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
// ExportEvent implements EventExporter
|
||||
func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
httpPost.Lock()
|
||||
defer httpPost.Unlock()
|
||||
|
||||
httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int) + 1
|
||||
|
||||
var body interface{}
|
||||
urlVals := url.Values{}
|
||||
req := utils.MapStorage{}
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
eeReq := NewEventExporterRequest(req, httpPost.dc, cgrEv.Tenant, httpPost.cgrCfg.GeneralCfg().DefaultTimezone,
|
||||
httpPost.filterS)
|
||||
|
||||
if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()); err != nil {
|
||||
httpPost.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
|
||||
return
|
||||
}
|
||||
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
|
||||
var strVal string
|
||||
if strVal, err = eeReq.cnt.FieldAsString(el.Value.Slice()); err != nil {
|
||||
return
|
||||
}
|
||||
urlVals.Set(el.Value.Slice()[1], strVal)
|
||||
}
|
||||
if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, httpPost.cgrCfg.GeneralCfg().DefaultTimezone); err == nil {
|
||||
if httpPost.dc[utils.FirstEventATime].(time.Time).IsZero() || httpPost.dc[utils.FirstEventATime].(time.Time).Before(aTime) {
|
||||
httpPost.dc[utils.FirstEventATime] = aTime
|
||||
}
|
||||
if aTime.After(httpPost.dc[utils.LastEventATime].(time.Time)) {
|
||||
httpPost.dc[utils.LastEventATime] = aTime
|
||||
}
|
||||
}
|
||||
if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil {
|
||||
if httpPost.dc[utils.FirstExpOrderID].(int64) > oID || httpPost.dc[utils.FirstExpOrderID].(int64) == 0 {
|
||||
httpPost.dc[utils.FirstExpOrderID] = oID
|
||||
}
|
||||
if httpPost.dc[utils.LastExpOrderID].(int64) < oID {
|
||||
httpPost.dc[utils.LastExpOrderID] = oID
|
||||
}
|
||||
}
|
||||
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
|
||||
httpPost.dc[utils.TotalCost] = httpPost.dc[utils.TotalCost].(float64) + cost
|
||||
}
|
||||
if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil {
|
||||
if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil {
|
||||
switch tor {
|
||||
case utils.VOICE:
|
||||
httpPost.dc[utils.TotalDuration] = httpPost.dc[utils.TotalDuration].(time.Duration) + usage
|
||||
case utils.SMS:
|
||||
httpPost.dc[utils.TotalSMSUsage] = httpPost.dc[utils.TotalSMSUsage].(time.Duration) + usage
|
||||
case utils.MMS:
|
||||
httpPost.dc[utils.TotalMMSUsage] = httpPost.dc[utils.TotalMMSUsage].(time.Duration) + usage
|
||||
case utils.GENERIC:
|
||||
httpPost.dc[utils.TotalGenericUsage] = httpPost.dc[utils.TotalGenericUsage].(time.Duration) + usage
|
||||
case utils.DATA:
|
||||
httpPost.dc[utils.TotalDataUsage] = httpPost.dc[utils.TotalDataUsage].(time.Duration) + usage
|
||||
}
|
||||
}
|
||||
}
|
||||
httpPost.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
|
||||
body = urlVals
|
||||
if err = httpPost.httpPoster.Post(body, utils.EmptyString); err != nil &&
|
||||
httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath,
|
||||
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, body)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -388,7 +388,7 @@ func sendAMQP(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
}
|
||||
err = PostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body)
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaAMQPjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaAMQPjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
@@ -401,7 +401,7 @@ func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
}
|
||||
err = PostersCache.PostAMQPv1(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body)
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaAMQPV1jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaAMQPV1jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
@@ -414,7 +414,7 @@ func sendSQS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
}
|
||||
err = PostersCache.PostSQS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body)
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaSQSjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaSQSjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
@@ -427,7 +427,7 @@ func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
}
|
||||
err = PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix())
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaKafkajsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaKafkajsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
@@ -440,7 +440,7 @@ func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
}
|
||||
err = PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix())
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaS3jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaS3jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
@@ -459,7 +459,7 @@ func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
}
|
||||
err = pstr.Post(body, utils.EmptyString)
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
@@ -480,7 +480,7 @@ func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) er
|
||||
go func() {
|
||||
err := pstr.Post(body, utils.EmptyString)
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
@@ -1045,7 +1045,7 @@ func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
}
|
||||
err = pstr.Post(body, utils.EmptyString)
|
||||
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
|
||||
@@ -290,7 +290,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
|
||||
err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), utils.ConcatenatedKey(cdr.CGRID, cdr.RunID))
|
||||
}
|
||||
if err != nil && cdre.fallbackPath != utils.META_NONE {
|
||||
addFailedPost(cdre.exportPath, cdre.exportFormat, utils.CDRPoster, body)
|
||||
AddFailedPost(cdre.exportPath, cdre.exportFormat, utils.CDRPoster, body)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ func writeFailedPosts(itmID string, value interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
func addFailedPost(expPath, format, module string, ev interface{}) {
|
||||
func AddFailedPost(expPath, format, module string, ev interface{}) {
|
||||
key := utils.ConcatenatedKey(expPath, format, module)
|
||||
var failedPost *ExportEvents
|
||||
if x, ok := failedPostCache.Get(key); ok {
|
||||
|
||||
@@ -38,7 +38,7 @@ func TestSetFldPostCacheTTL(t *testing.T) {
|
||||
|
||||
func TestAddFldPost(t *testing.T) {
|
||||
SetFailedPostCacheTTL(time.Duration(5 * time.Second))
|
||||
addFailedPost("path1", "format1", "module1", "1")
|
||||
AddFailedPost("path1", "format1", "module1", "1")
|
||||
x, ok := failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1"))
|
||||
if !ok {
|
||||
t.Error("Error reading from cache")
|
||||
@@ -60,8 +60,8 @@ func TestAddFldPost(t *testing.T) {
|
||||
if !reflect.DeepEqual(eOut, failedPost) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
|
||||
}
|
||||
addFailedPost("path1", "format1", "module1", "2")
|
||||
addFailedPost("path2", "format2", "module2", "3")
|
||||
AddFailedPost("path1", "format1", "module1", "2")
|
||||
AddFailedPost("path2", "format2", "module2", "3")
|
||||
x, ok = failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1"))
|
||||
if !ok {
|
||||
t.Error("Error reading from cache")
|
||||
|
||||
@@ -75,7 +75,7 @@ func TestHttpJsonPoster(t *testing.T) {
|
||||
if err = pstr.Post(jsn, utils.EmptyString); err == nil {
|
||||
t.Error("Expected error")
|
||||
}
|
||||
addFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn)
|
||||
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn)
|
||||
time.Sleep(2)
|
||||
fs, err := filepath.Glob("/tmp/test1*")
|
||||
if err != nil {
|
||||
@@ -108,7 +108,7 @@ func TestHttpBytesPoster(t *testing.T) {
|
||||
if err = pstr.Post(content, utils.EmptyString); err == nil {
|
||||
t.Error("Expected error")
|
||||
}
|
||||
addFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content)
|
||||
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content)
|
||||
time.Sleep(2)
|
||||
fs, err := filepath.Glob("/tmp/test2*")
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user