From 94c29c93d258eab13b75d4fcd28a5ac3415c869a Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 22 Jan 2020 18:04:58 +0200 Subject: [PATCH] Updated CDRExporter --- apier/v1/apier.go | 28 +++--- apier/v1/cdre.go | 4 +- apier/v2/cdre.go | 2 +- engine/action.go | 191 +++++++++++++++++------------------------ engine/cdr.go | 128 +++++++++++++-------------- engine/cdre.go | 187 ++++++++++++++++++++++++++++++++++++---- engine/cdrecsv_test.go | 10 +-- engine/cdrefwv_test.go | 4 +- engine/cdrs.go | 13 ++- engine/poster.go | 72 ++++++++-------- engine/pstr_amqp.go | 44 +++++----- engine/pstr_amqpv1.go | 34 ++++---- engine/pstr_http.go | 125 ++++++++++++++------------- engine/pstr_kafka.go | 26 +++--- engine/pstr_s3.go | 42 +++++---- engine/pstr_sqs.go | 40 ++++----- ers/kafka.go | 3 +- services/apierv1.go | 8 +- 18 files changed, 534 insertions(+), 427 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 640e86f3e..d5d2f8191 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -46,8 +46,7 @@ type ApierV1 struct { DataManager *engine.DataManager Config *config.CGRConfig Responder *engine.Responder - SchedulerService SchedulerGeter // Need to have them capitalize so we can export in V2 - HTTPPoster *engine.HTTPPoster + SchedulerService SchedulerGeter // Need to have them capitalize so we can export in V2 FilterS *engine.FilterS //Used for CDR Exporter ConnMgr *engine.ConnManager @@ -1222,27 +1221,30 @@ func (apiv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string } switch ffn.Transport { case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: - _, err = engine.NewHTTPPoster(apiv1.Config.GeneralCfg().HttpSkipTlsVerify, - apiv1.Config.GeneralCfg().ReplyTimeout).Post(ffn.Address, - utils.PosterTransportContentTypes[ffn.Transport], fileContent, - apiv1.Config.GeneralCfg().PosterAttempts, failoverPath) + var pstr *engine.HTTPPoster + pstr, err = engine.NewHTTPPoster(apiv1.Config.GeneralCfg().HttpSkipTlsVerify, + apiv1.Config.GeneralCfg().ReplyTimeout, ffn.Address, + utils.PosterTransportContentTypes[ffn.Transport], + apiv1.Config.GeneralCfg().PosterAttempts) + if err != nil { + return err + } + err = pstr.Post(fileContent, utils.EmptyString) //this may cause panics for contentType == utils.CONTENT_FORM case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: err = engine.PostersCache.PostAMQP(ffn.Address, - apiv1.Config.GeneralCfg().PosterAttempts, fileContent, - utils.PosterTransportContentTypes[ffn.Transport], - failedReqsOutDir, file.Name()) + apiv1.Config.GeneralCfg().PosterAttempts, fileContent) case utils.MetaAMQPV1jsonMap: err = engine.PostersCache.PostAMQPv1(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts, - fileContent, failedReqsOutDir, file.Name()) + fileContent) case utils.MetaSQSjsonMap: err = engine.PostersCache.PostSQS(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts, - fileContent, failedReqsOutDir, file.Name()) + fileContent) case utils.MetaKafkajsonMap: err = engine.PostersCache.PostKafka(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts, - fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix()) + fileContent, utils.UUIDSha1Prefix()) case utils.MetaS3jsonMap: err = engine.PostersCache.PostS3(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts, - fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix()) + fileContent, utils.UUIDSha1Prefix()) default: err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport) } diff --git a/apier/v1/cdre.go b/apier/v1/cdre.go index aaff0701c..d76edc5cc 100644 --- a/apier/v1/cdre.go +++ b/apier/v1/cdre.go @@ -147,7 +147,7 @@ func (api *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.Ex cdrexp, err := engine.NewCDRExporter(cdrs, exportTemplate, exportFormat, filePath, utils.META_NONE, exportID, exportTemplate.Synchronous, exportTemplate.Attempts, fieldSep, - api.Config.GeneralCfg().HttpSkipTlsVerify, api.HTTPPoster, + api.Config.GeneralCfg().HttpSkipTlsVerify, api.Config.ApierCfg().AttributeSConns, api.FilterS) if err != nil { return utils.NewErrServerError(err) @@ -292,7 +292,7 @@ func (api *ApierV1) ExportCDRs(arg ArgExportCDRs, reply *RplExportedCDRs) (err e filePath, utils.META_NONE, exportID, synchronous, attempts, fieldSep, api.Config.GeneralCfg().HttpSkipTlsVerify, - api.HTTPPoster, api.Config.ApierCfg().AttributeSConns, api.FilterS) + api.Config.ApierCfg().AttributeSConns, api.FilterS) if err != nil { return utils.NewErrServerError(err) } diff --git a/apier/v2/cdre.go b/apier/v2/cdre.go index 1c5ff48c7..e1bbb11cd 100644 --- a/apier/v2/cdre.go +++ b/apier/v2/cdre.go @@ -105,7 +105,7 @@ func (apiv2 *ApierV2) ExportCdrsToFile(attr AttrExportCdrsToFile, reply *utils.E cdrexp, err := engine.NewCDRExporter(cdrs, exportTemplate, exportFormat, filePath, utils.META_NONE, exportID, exportTemplate.Synchronous, exportTemplate.Attempts, fieldSep, apiv2.Config.GeneralCfg().HttpSkipTlsVerify, - apiv2.HTTPPoster, apiv2.Config.ApierCfg().AttributeSConns, apiv2.FilterS) + apiv2.Config.ApierCfg().AttributeSConns, apiv2.FilterS) if err != nil { return utils.NewErrServerError(err) } diff --git a/engine/action.go b/engine/action.go index b509b4501..222f3a207 100644 --- a/engine/action.go +++ b/engine/action.go @@ -26,7 +26,6 @@ import ( "html/template" "net" "net/smtp" - "path" "reflect" "sort" "strconv" @@ -40,9 +39,7 @@ import ( "github.com/mitchellh/mapstructure" ) -/* -Structure to be filled for each tariff plan with the bonus value for received calls minutes. -*/ +// Action will be filled for each tariff plan with the bonus value for received calls minutes. type Action struct { Id string ActionType string @@ -54,6 +51,7 @@ type Action struct { balanceValue float64 // balance value after action execution, used with cdrlog } +// Clone returns a clone of the action func (a *Action) Clone() (cln *Action) { if a == nil { return @@ -88,8 +86,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { utils.RESET_COUNTERS: resetCountersAction, utils.ENABLE_ACCOUNT: enableAccountAction, utils.DISABLE_ACCOUNT: disableAccountAction, - utils.HttpPost: callUrl, - utils.HttpPostAsync: callUrlAsync, + utils.HttpPost: callURL, + utils.HttpPostAsync: callURLAsync, utils.MAIL_ASYNC: mailAsync, utils.SET_DDESTINATIONS: setddestinations, utils.REMOVE_ACCOUNT: removeAccountAction, @@ -381,17 +379,11 @@ func sendAMQP(ub *Account, a *Action, acs Actions, extraData interface{}) error if err != nil { return err } - cfg := config.CgrConfig() - fallbackFileName := (&utils.FallbackFileName{ - Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), - Transport: utils.MetaAMQPjsonMap, - Address: a.ExtraParameters, - RequestID: utils.GenUUID(), - FileSuffix: utils.JSNSuffix, - }).AsString() - - return PostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, - body, utils.CONTENT_JSON, cfg.GeneralCfg().FailedPostsDir, fallbackFileName) + err = PostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body) + if err != nil { + addFailedPost(a.ExtraParameters, utils.MetaAMQPjsonMap, body) + } + return err } func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error { @@ -399,17 +391,11 @@ func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error { if err != nil { return err } - cfg := config.CgrConfig() - fallbackFileName := (&utils.FallbackFileName{ - Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), - Transport: utils.MetaAMQPV1jsonMap, - Address: a.ExtraParameters, - RequestID: utils.GenUUID(), - FileSuffix: utils.JSNSuffix, - }).AsString() - - return PostersCache.PostAMQPv1(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, - body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName) + err = PostersCache.PostAMQPv1(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body) + if err != nil { + addFailedPost(a.ExtraParameters, utils.MetaAMQPV1jsonMap, body) + } + return err } func sendSQS(ub *Account, a *Action, acs Actions, extraData interface{}) error { @@ -417,17 +403,11 @@ func sendSQS(ub *Account, a *Action, acs Actions, extraData interface{}) error { if err != nil { return err } - cfg := config.CgrConfig() - fallbackFileName := (&utils.FallbackFileName{ - Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), - Transport: utils.MetaSQSjsonMap, - Address: a.ExtraParameters, - RequestID: utils.GenUUID(), - FileSuffix: utils.JSNSuffix, - }).AsString() - - return PostersCache.PostSQS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, - body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName) + err = PostersCache.PostSQS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body) + if err != nil { + addFailedPost(a.ExtraParameters, utils.MetaSQSjsonMap, body) + } + return err } func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error { @@ -435,17 +415,11 @@ func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error if err != nil { return err } - cfg := config.CgrConfig() - fallbackFileName := (&utils.FallbackFileName{ - Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), - Transport: utils.MetaKafkajsonMap, - Address: a.ExtraParameters, - RequestID: utils.GenUUID(), - FileSuffix: utils.JSNSuffix, - }).AsString() - - return PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, - body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName, utils.UUIDSha1Prefix()) + err = PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix()) + if err != nil { + addFailedPost(a.ExtraParameters, utils.MetaKafkajsonMap, body) + } + return err } func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error { @@ -453,57 +427,49 @@ func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error { if err != nil { return err } - cfg := config.CgrConfig() - fallbackFileName := (&utils.FallbackFileName{ - Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), - Transport: utils.MetaS3jsonMap, - Address: a.ExtraParameters, - RequestID: utils.GenUUID(), - FileSuffix: utils.JSNSuffix, - }).AsString() - - return PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, - body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName, utils.UUIDSha1Prefix()) + err = PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix()) + if err != nil { + addFailedPost(a.ExtraParameters, utils.MetaS3jsonMap, body) + } + return err } -func callUrl(ub *Account, a *Action, acs Actions, extraData interface{}) error { - jsn, err := getOneData(ub, extraData) +func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error { + body, err := getOneData(ub, extraData) if err != nil { return err } - cfg := config.CgrConfig() - ffn := &utils.FallbackFileName{ - Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), - Transport: utils.MetaHTTPjson, - Address: a.ExtraParameters, - RequestID: utils.GenUUID(), - FileSuffix: utils.JSNSuffix, + pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify, + config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters, + utils.CONTENT_JSON, config.CgrConfig().GeneralCfg().PosterAttempts) + if err != nil { + return err + } + err = pstr.Post(body, utils.EmptyString) + if err != nil { + addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, body) } - _, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify, - config.CgrConfig().GeneralCfg().ReplyTimeout).Post(a.ExtraParameters, - utils.CONTENT_JSON, jsn, config.CgrConfig().GeneralCfg().PosterAttempts, - path.Join(cfg.GeneralCfg().FailedPostsDir, ffn.AsString())) return err } // Does not block for posts, no error reports -func callUrlAsync(ub *Account, a *Action, acs Actions, extraData interface{}) error { - jsn, err := getOneData(ub, extraData) +func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) error { + body, err := getOneData(ub, extraData) if err != nil { return err } - cfg := config.CgrConfig() - ffn := &utils.FallbackFileName{ - Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), - Transport: utils.MetaHTTPjson, - Address: a.ExtraParameters, - RequestID: utils.GenUUID(), - FileSuffix: utils.JSNSuffix, + pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify, + config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters, + utils.CONTENT_JSON, config.CgrConfig().GeneralCfg().PosterAttempts) + if err != nil { + return err } - go NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify, - config.CgrConfig().GeneralCfg().ReplyTimeout).Post(a.ExtraParameters, - utils.CONTENT_JSON, jsn, config.CgrConfig().GeneralCfg().PosterAttempts, - path.Join(cfg.GeneralCfg().FailedPostsDir, ffn.AsString())) + go func() { + err := pstr.Post(body, utils.EmptyString) + if err != nil { + addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, body) + } + }() return nil } @@ -548,24 +514,24 @@ func mailAsync(ub *Account, a *Action, acs Actions, extraData interface{}) error } func setddestinations(ub *Account, a *Action, acs Actions, extraData interface{}) (err error) { - var ddcDestId string + var ddcDestID string for _, bchain := range ub.BalanceMap { for _, b := range bchain { - for destId := range b.DestinationIDs { - if strings.HasPrefix(destId, "*ddc") { - ddcDestId = destId + for destID := range b.DestinationIDs { + if strings.HasPrefix(destID, "*ddc") { + ddcDestID = destID break } } - if ddcDestId != "" { + if ddcDestID != "" { break } } - if ddcDestId != "" { + if ddcDestID != "" { break } } - if ddcDestId != "" { + if ddcDestID != "" { // make slice from prefixes // Review here prefixes // prefixes := make([]string, len(sq.Metrics)) @@ -574,8 +540,8 @@ func setddestinations(ub *Account, a *Action, acs Actions, extraData interface{} // prefixes[i] = p // i++ // } - newDest := &Destination{Id: ddcDestId} - oldDest, err := dm.GetDestination(ddcDestId, false, utils.NonTransactional) + newDest := &Destination{Id: ddcDestID} + oldDest, err := dm.GetDestination(ddcDestID, false, utils.NonTransactional) if err != nil { return err } @@ -583,7 +549,7 @@ func setddestinations(ub *Account, a *Action, acs Actions, extraData interface{} if err = dm.SetDestination(newDest, utils.NonTransactional); err != nil { return err } - if err = dm.CacheDataFromDB(utils.DESTINATION_PREFIX, []string{ddcDestId}, true); err != nil { + if err = dm.CacheDataFromDB(utils.DESTINATION_PREFIX, []string{ddcDestID}, true); err != nil { return err } @@ -673,7 +639,7 @@ func removeBalanceAction(ub *Account, a *Action, acs Actions, extraData interfac // delete without preserving order bChain[i] = bChain[len(bChain)-1] bChain = bChain[:len(bChain)-1] - i -= 1 + i-- found = true } } @@ -714,6 +680,7 @@ func transferMonetaryDefaultAction(ub *Account, a *Action, acs Actions, extraDat return nil } +// RPCRequest used by rpc action type RPCRequest struct { Address string Transport string @@ -859,7 +826,7 @@ func publishBalance(ub *Account, a *Action, acs Actions, extraData interface{}) return nil } -// Structure to store actions according to weight +// Actions used to store actions according to weight type Actions []*Action func (apl Actions) Len() int { @@ -875,10 +842,12 @@ func (apl Actions) Less(j, i int) bool { return apl[i].Weight < apl[j].Weight } +// Sort used to implement sort interface func (apl Actions) Sort() { sort.Sort(apl) } +// Clone returns a clone from object func (apl Actions) Clone() (interface{}, error) { if apl == nil { return nil, nil @@ -1023,7 +992,7 @@ func removeExpired(acc *Account, action *Action, _ Actions, extraData interface{ // delete without preserving order bChain[i] = bChain[len(bChain)-1] bChain = bChain[:len(bChain)-1] - i -= 1 + i-- found = true } } @@ -1035,22 +1004,20 @@ func removeExpired(acc *Account, action *Action, _ Actions, extraData interface{ } func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error { - jsn, err := json.Marshal(extraData) + body, err := json.Marshal(extraData) if err != nil { return err } - cfg := config.CgrConfig() - ffn := &utils.FallbackFileName{ - Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), - Transport: utils.MetaHTTPjson, - Address: a.ExtraParameters, - RequestID: utils.GenUUID(), - FileSuffix: utils.JSNSuffix, + pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify, + config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters, + utils.CONTENT_JSON, config.CgrConfig().GeneralCfg().PosterAttempts) + if err != nil { + return err + } + err = pstr.Post(body, utils.EmptyString) + if err != nil { + addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, body) } - _, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify, - config.CgrConfig().GeneralCfg().ReplyTimeout).Post(a.ExtraParameters, - utils.CONTENT_JSON, jsn, config.CgrConfig().GeneralCfg().PosterAttempts, - path.Join(cfg.GeneralCfg().FailedPostsDir, ffn.AsString())) return err } diff --git a/engine/cdr.go b/engine/cdr.go index 25d965f2b..aa4a5e0b7 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -141,7 +141,7 @@ func (cdr *CDR) ComputeCGRID() { cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.OriginHost) } -// Format cost as string on export +// FormatCost formats the cost as string on export func (cdr *CDR) FormatCost(shiftDecimals, roundDecimals int) string { cost := cdr.Cost if shiftDecimals != 0 { @@ -150,7 +150,7 @@ func (cdr *CDR) FormatCost(shiftDecimals, roundDecimals int) string { return strconv.FormatFloat(cost, 'f', roundDecimals, 64) } -// Used to retrieve fields as string, primary fields are const labeled +// FieldAsString is used to retrieve fields as string, primary fields are const labeled func (cdr *CDR) FieldAsString(rsrPrs *config.RSRParser) (parsed string, err error) { parsed, err = rsrPrs.ParseDataProviderWithInterfaces(config.NewNavigableMap(cdr.AsMapStringIface()), utils.NestingSep) if err != nil { @@ -159,7 +159,7 @@ func (cdr *CDR) FieldAsString(rsrPrs *config.RSRParser) (parsed string, err erro return } -// concatenates values of multiple fields defined in template, used eg in CDR templates +// FieldsAsString concatenates values of multiple fields defined in template, used eg in CDR templates func (cdr *CDR) FieldsAsString(rsrFlds config.RSRParsers) string { outVal, err := rsrFlds.ParseDataProviderWithInterfaces(config.NewNavigableMap(cdr.AsMapStringIface()), utils.NestingSep) if err != nil { @@ -168,7 +168,7 @@ func (cdr *CDR) FieldsAsString(rsrFlds config.RSRParsers) string { return outVal } -// Populates the field with id from value; strings are appended to original one +// ParseFieldValue populates the field with id from value; strings are appended to original one func (cdr *CDR) ParseFieldValue(fieldId, fieldVal, timezone string) error { var err error switch fieldId { @@ -394,7 +394,7 @@ func (cdr *CDR) exportFieldValue(cfgCdrFld *config.FCTemplate, filterS *FilterS) return } -func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTlsCheck bool, +func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTLSCheck bool, groupedCDRs []*CDR, filterS *FilterS) (outVal string, err error) { switch cfgFld.Type { case utils.META_FILLER: @@ -427,7 +427,7 @@ func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTlsCheck bool, } if len(httpAddr) == 0 { err = fmt.Errorf("Empty http address for field %s type %s", cfgFld.Tag, cfgFld.Type) - } else if outValByte, err = HttpJsonPost(httpAddr, httpSkipTlsCheck, jsn); err == nil { + } else if outValByte, err = HttpJsonPost(httpAddr, httpSkipTLSCheck, jsn); err == nil { outVal = string(outValByte) if len(outVal) == 0 && cfgFld.Mandatory { err = fmt.Errorf("Empty result for http_post field: %s", cfgFld.Tag) @@ -451,10 +451,10 @@ func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTlsCheck bool, return utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory) } -// Used in place where we need to export the CDR based on an export template +// AsExportRecord is used in place where we need to export the CDR based on an export template // ExportRecord is a []string to keep it compatible with encoding/csv Writer func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate, - httpSkipTlsCheck bool, groupedCDRs []*CDR, filterS *FilterS) (expRecord []string, err error) { + httpSkipTLSCheck bool, groupedCDRs []*CDR, filterS *FilterS) (expRecord []string, err error) { nM := config.NewNavigableMap(nil) nM.Set([]string{utils.MetaReq}, cdr.AsMapStringIface(), false, false) for _, cfgFld := range exportFields { @@ -464,20 +464,20 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate, } else if !pass { continue } - if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs, filterS); err != nil { + var fmtOut string + if fmtOut, err = cdr.formatField(cfgFld, httpSkipTLSCheck, groupedCDRs, filterS); err != nil { utils.Logger.Warning(fmt.Sprintf(" error: %s exporting field: %s, CDR: %s\n", err.Error(), utils.ToJSON(cfgFld), utils.ToJSON(cdr))) return nil, err - } else { - expRecord = append(expRecord, fmtOut) } + expRecord = append(expRecord, fmtOut) } return expRecord, nil } // AsExportMap converts the CDR into a map[string]string based on export template // Used in real-time replication as well as remote exports -func (cdr *CDR) AsExportMap(exportFields []*config.FCTemplate, httpSkipTlsCheck bool, +func (cdr *CDR) AsExportMap(exportFields []*config.FCTemplate, httpSkipTLSCheck bool, groupedCDRs []*CDR, filterS *FilterS) (expMap map[string]string, err error) { expMap = make(map[string]string) nM := config.NewNavigableMap(nil) @@ -489,41 +489,41 @@ func (cdr *CDR) AsExportMap(exportFields []*config.FCTemplate, httpSkipTlsCheck } else if !pass { continue } - if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs, filterS); err != nil { + var fmtOut string + if fmtOut, err = cdr.formatField(cfgFld, httpSkipTLSCheck, groupedCDRs, filterS); err != nil { utils.Logger.Warning(fmt.Sprintf(" error: %s exporting field: %s, CDR: %s\n", err.Error(), utils.ToJSON(cfgFld), utils.ToJSON(cdr))) return nil, err - } else { - expMap[cfgFld.FieldId] += fmtOut } + expMap[cfgFld.FieldId] += fmtOut } return } -// AsCDRsTBL converts the CDR into the format used for SQL storage -func (cdr *CDR) AsCDRsql() (cdrSql *CDRsql) { - cdrSql = new(CDRsql) - cdrSql.Cgrid = cdr.CGRID - cdrSql.RunID = cdr.RunID - cdrSql.OriginHost = cdr.OriginHost - cdrSql.Source = cdr.Source - cdrSql.OriginID = cdr.OriginID - cdrSql.TOR = cdr.ToR - cdrSql.RequestType = cdr.RequestType - cdrSql.Tenant = cdr.Tenant - cdrSql.Category = cdr.Category - cdrSql.Account = cdr.Account - cdrSql.Subject = cdr.Subject - cdrSql.Destination = cdr.Destination - cdrSql.SetupTime = cdr.SetupTime - cdrSql.AnswerTime = cdr.AnswerTime - cdrSql.Usage = cdr.Usage.Nanoseconds() - cdrSql.ExtraFields = utils.ToJSON(cdr.ExtraFields) - cdrSql.CostSource = cdr.CostSource - cdrSql.Cost = cdr.Cost - cdrSql.CostDetails = utils.ToJSON(cdr.CostDetails) - cdrSql.ExtraInfo = cdr.ExtraInfo - cdrSql.CreatedAt = time.Now() +// AsCDRsql converts the CDR into the format used for SQL storage +func (cdr *CDR) AsCDRsql() (cdrSQL *CDRsql) { + cdrSQL = new(CDRsql) + cdrSQL.Cgrid = cdr.CGRID + cdrSQL.RunID = cdr.RunID + cdrSQL.OriginHost = cdr.OriginHost + cdrSQL.Source = cdr.Source + cdrSQL.OriginID = cdr.OriginID + cdrSQL.TOR = cdr.ToR + cdrSQL.RequestType = cdr.RequestType + cdrSQL.Tenant = cdr.Tenant + cdrSQL.Category = cdr.Category + cdrSQL.Account = cdr.Account + cdrSQL.Subject = cdr.Subject + cdrSQL.Destination = cdr.Destination + cdrSQL.SetupTime = cdr.SetupTime + cdrSQL.AnswerTime = cdr.AnswerTime + cdrSQL.Usage = cdr.Usage.Nanoseconds() + cdrSQL.ExtraFields = utils.ToJSON(cdr.ExtraFields) + cdrSQL.CostSource = cdr.CostSource + cdrSQL.Cost = cdr.Cost + cdrSQL.CostDetails = utils.ToJSON(cdr.CostDetails) + cdrSQL.ExtraInfo = cdr.ExtraInfo + cdrSQL.CreatedAt = time.Now() return } @@ -605,34 +605,34 @@ func (cdr *CDR) UpdateFromCGREvent(cgrEv *utils.CGREvent, fields []string) (err } // NewCDRFromSQL converts the CDRsql into CDR -func NewCDRFromSQL(cdrSql *CDRsql) (cdr *CDR, err error) { +func NewCDRFromSQL(cdrSQL *CDRsql) (cdr *CDR, err error) { cdr = new(CDR) - cdr.CGRID = cdrSql.Cgrid - cdr.RunID = cdrSql.RunID - cdr.OriginHost = cdrSql.OriginHost - cdr.Source = cdrSql.Source - cdr.OriginID = cdrSql.OriginID - cdr.OrderID = cdrSql.ID - cdr.ToR = cdrSql.TOR - cdr.RequestType = cdrSql.RequestType - cdr.Tenant = cdrSql.Tenant - cdr.Category = cdrSql.Category - cdr.Account = cdrSql.Account - cdr.Subject = cdrSql.Subject - cdr.Destination = cdrSql.Destination - cdr.SetupTime = cdrSql.SetupTime - cdr.AnswerTime = cdrSql.AnswerTime - cdr.Usage = time.Duration(cdrSql.Usage) - cdr.CostSource = cdrSql.CostSource - cdr.Cost = cdrSql.Cost - cdr.ExtraInfo = cdrSql.ExtraInfo - if cdrSql.ExtraFields != "" { - if err = json.Unmarshal([]byte(cdrSql.ExtraFields), &cdr.ExtraFields); err != nil { + cdr.CGRID = cdrSQL.Cgrid + cdr.RunID = cdrSQL.RunID + cdr.OriginHost = cdrSQL.OriginHost + cdr.Source = cdrSQL.Source + cdr.OriginID = cdrSQL.OriginID + cdr.OrderID = cdrSQL.ID + cdr.ToR = cdrSQL.TOR + cdr.RequestType = cdrSQL.RequestType + cdr.Tenant = cdrSQL.Tenant + cdr.Category = cdrSQL.Category + cdr.Account = cdrSQL.Account + cdr.Subject = cdrSQL.Subject + cdr.Destination = cdrSQL.Destination + cdr.SetupTime = cdrSQL.SetupTime + cdr.AnswerTime = cdrSQL.AnswerTime + cdr.Usage = time.Duration(cdrSQL.Usage) + cdr.CostSource = cdrSQL.CostSource + cdr.Cost = cdrSQL.Cost + cdr.ExtraInfo = cdrSQL.ExtraInfo + if cdrSQL.ExtraFields != "" { + if err = json.Unmarshal([]byte(cdrSQL.ExtraFields), &cdr.ExtraFields); err != nil { return nil, err } } - if cdrSql.CostDetails != "" { - if err = json.Unmarshal([]byte(cdrSql.CostDetails), &cdr.CostDetails); err != nil { + if cdrSQL.CostDetails != "" { + if err = json.Unmarshal([]byte(cdrSQL.CostDetails), &cdr.CostDetails); err != nil { return nil, err } } @@ -664,7 +664,7 @@ type ExternalCDR struct { PreRated bool // Mark the CDR as rated so we do not process it during mediation } -// Used when authorizing requests from outside, eg ApierV1.GetMaxUsage +// UsageRecord is used when authorizing requests from outside, eg ApierV1.GetMaxUsage type UsageRecord struct { ToR string RequestType string diff --git a/engine/cdre.go b/engine/cdre.go index 98132f522..65640ae67 100644 --- a/engine/cdre.go +++ b/engine/cdre.go @@ -33,7 +33,9 @@ import ( "time" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" ) const ( @@ -50,10 +52,45 @@ const ( metaCostCDRs = "*cdrs_cost" ) +var failedPostCache *ltcache.Cache + +func init() { + failedPostCache = ltcache.NewCache(-1, 5*time.Second, false, writeFailedPosts) +} + +func writeFailedPosts(itmID string, value interface{}) { + expEv, canConvert := value.(*ExportEvents) + if !canConvert { + return + } + filePath := path.Join(config.CgrConfig().GeneralCfg().FailedPostsDir, expEv.FileName(utils.CDRSCtx)) + if err := expEv.WriteToFile(filePath); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write file <%s> because <%s>", + utils.CDRs, filePath, err)) + } + return +} +func addFailedPost(expPath, format string, ev []byte) { + var failedPost *ExportEvents + if x, ok := failedPostCache.Get(utils.ConcatenatedKey(expPath, format)); ok { + if x != nil { + failedPost = x.(*ExportEvents) + } + } + if failedPost == nil { + failedPost = &ExportEvents{ + Path: expPath, + Format: format, + } + } + failedPost.AddEvent(ev) + failedPostCache.Set(failedPost.ID(), failedPost, nil) +} + // NewCDRExporter returns a new CDRExporter func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, exportPath, fallbackPath, exportID string, synchronous bool, attempts int, fieldSeparator rune, - httpSkipTLSCheck bool, httpPoster *HTTPPoster, attrsConns []string, filterS *FilterS) (*CDRExporter, error) { + httpSkipTLSCheck bool, attrsConns []string, filterS *FilterS) (*CDRExporter, error) { if len(cdrs) == 0 { // Nothing to export return nil, nil } @@ -68,7 +105,6 @@ func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, e attempts: attempts, fieldSeparator: fieldSeparator, httpSkipTLSCheck: httpSkipTLSCheck, - httpPoster: httpPoster, negativeExports: make(map[string]string), attrsConns: attrsConns, filterS: filterS, @@ -89,7 +125,6 @@ type CDRExporter struct { attempts int fieldSeparator rune httpSkipTLSCheck bool - httpPoster *HTTPPoster header, trailer []string // Header and Trailer fields content [][]string // Rows of cdr fields @@ -263,27 +298,29 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { default: return fmt.Errorf("unsupported exportFormat: <%s>", cdre.exportFormat) } - // compute fallbackPath - fallbackPath := utils.META_NONE - ffn := &utils.FallbackFileName{Module: utils.CDRPoster, Transport: cdre.exportFormat, Address: cdre.exportPath, RequestID: utils.GenUUID()} - fallbackFileName := ffn.AsString() - if cdre.fallbackPath != utils.META_NONE { // not none, need fallback - fallbackPath = path.Join(cdre.fallbackPath, fallbackFileName) - } switch cdre.exportFormat { case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: - _, err = cdre.httpPoster.Post(cdre.exportPath, utils.PosterTransportContentTypes[cdre.exportFormat], body, cdre.attempts, fallbackPath) + var pstr *HTTPPoster + pstr, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify, + config.CgrConfig().GeneralCfg().ReplyTimeout, cdre.exportPath, + utils.PosterTransportContentTypes[cdre.exportFormat], cdre.attempts) + if err != nil { + return err + } + err = pstr.Post(body, utils.EmptyString) case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: - err = PostersCache.PostAMQP(cdre.exportPath, cdre.attempts, body.([]byte), - utils.PosterTransportContentTypes[cdre.exportFormat], cdre.fallbackPath, fallbackFileName) + err = PostersCache.PostAMQP(cdre.exportPath, cdre.attempts, body.([]byte)) case utils.MetaAMQPV1jsonMap: - err = PostersCache.PostAMQPv1(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) + err = PostersCache.PostAMQPv1(cdre.exportPath, cdre.attempts, body.([]byte)) case utils.MetaSQSjsonMap: - err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) + err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte)) case utils.MetaKafkajsonMap: - err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID)) + err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), utils.ConcatenatedKey(cdr.CGRID, cdr.RunID)) case utils.MetaS3jsonMap: - err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID)) + err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), utils.ConcatenatedKey(cdr.CGRID, cdr.RunID)) + } + if err != nil && cdre.fallbackPath != utils.META_NONE { + addFailedPost(cdre.exportPath, cdre.exportFormat, body.([]byte)) } return } @@ -384,8 +421,7 @@ func (cdre *CDRExporter) processCDRs() (err error) { if cdre.exportTemplate.Tenant == "" { cdre.exportTemplate.Tenant = config.CgrConfig().GeneralCfg().DefaultTenant } - cgrDp := config.NewNavigableMap(nil) - cgrDp.Set([]string{utils.MetaReq}, cdr.AsMapStringIface(), false, false) + cgrDp := config.NewNavigableMap(map[string]interface{}{utils.MetaReq: cdr.AsMapStringIface()}) if pass, err := cdre.filterS.Pass(cdre.exportTemplate.Tenant, cdre.exportTemplate.Filters, cgrDp); err != nil || !pass { continue // Not passes filters, ignore this CDR @@ -550,3 +586,116 @@ func (cdre *CDRExporter) NegativeExports() map[string]string { defer cdre.RUnlock() return cdre.negativeExports } + +// ExportEvents used to save the failed post to file +type ExportEvents struct { + lk sync.RWMutex + Path string + Format string + Zip bool + Events [][]byte +} + +// ID returns the id for cache +func (expEv *ExportEvents) ID() string { + return utils.ConcatenatedKey(expEv.Path, expEv.Format) +} + +// FileName returns the file name it should use for saving the failed events +func (expEv *ExportEvents) FileName(module string) string { + fileSuffix := utils.CDREFileSuffixes[expEv.Format] + // instead of using fmt.Sprintf we use "+" as binary operator (small optimization) + return module + utils.HandlerArgSep + expEv.Format + utils.HandlerArgSep + utils.GenUUID() + fileSuffix +} + +// WriteToFile writes the events to file +func (expEv *ExportEvents) WriteToFile(filePath string) (err error) { + var content []byte + content, err = json.Marshal(expEv) + if err != nil { + return + } + _, err = guardian.Guardian.Guard(func() (interface{}, error) { + fileOut, err := os.Create(filePath) + if err != nil { + return nil, err + } + _, err = fileOut.Write(content) + fileOut.Close() + return nil, err + }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath) + return +} + +// AddEvent adds one event +func (expEv *ExportEvents) AddEvent(ev []byte) { + expEv.lk.Lock() + expEv.Events = append(expEv.Events, ev) + expEv.lk.Unlock() +} + +// ReplayFailedPosts tryies to post cdrs again +func (expEv *ExportEvents) ReplayFailedPosts(attempts int, key string) (failedEvents *ExportEvents, err error) { + failedEvents = &ExportEvents{ + Path: expEv.Path, + Format: expEv.Format, + Zip: expEv.Zip, + } + switch expEv.Format { + case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: + var pstr *HTTPPoster + pstr, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify, + config.CgrConfig().GeneralCfg().ReplyTimeout, expEv.Path, + expEv.Format, config.CgrConfig().GeneralCfg().PosterAttempts) + if err != nil { + return expEv, err + } + for _, ev := range expEv.Events { + err = pstr.Post(ev, utils.EmptyString) + if err != nil { + failedEvents.AddEvent(ev) + } + } + case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: + for _, ev := range expEv.Events { + err = PostersCache.PostAMQP(expEv.Path, attempts, ev) + if err != nil { + failedEvents.AddEvent(ev) + } + } + case utils.MetaAMQPV1jsonMap: + for _, ev := range expEv.Events { + err = PostersCache.PostAMQPv1(expEv.Path, attempts, ev) + if err != nil { + failedEvents.AddEvent(ev) + } + } + case utils.MetaSQSjsonMap: + for _, ev := range expEv.Events { + err = PostersCache.PostSQS(expEv.Path, attempts, ev) + if err != nil { + failedEvents.AddEvent(ev) + } + } + case utils.MetaKafkajsonMap: + for _, ev := range expEv.Events { + err = PostersCache.PostKafka(expEv.Path, attempts, ev, key) + if err != nil { + failedEvents.AddEvent(ev) + } + } + case utils.MetaS3jsonMap: + for _, ev := range expEv.Events { + err = PostersCache.PostS3(expEv.Path, attempts, ev, key) + if err != nil { + failedEvents.AddEvent(ev) + } + } + } + if len(failedEvents.Events) > 0 { + err = utils.ErrPartiallyExecuted + } else { + failedEvents = nil + } + return +} diff --git a/engine/cdrecsv_test.go b/engine/cdrecsv_test.go index 874b73645..b16be3574 100644 --- a/engine/cdrecsv_test.go +++ b/engine/cdrecsv_test.go @@ -45,7 +45,7 @@ func TestCsvCdrWriter(t *testing.T) { } cdre, err := NewCDRExporter([]*CDR{storedCdr1}, cfg.CdreProfiles[utils.MetaDefault], utils.MetaFileCSV, "", "", "firstexport", - true, 1, utils.CSV_SEP, cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil, nil) + true, 1, utils.CSV_SEP, cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil) if err != nil { t.Error("Unexpected error received: ", err) } @@ -83,7 +83,7 @@ func TestAlternativeFieldSeparator(t *testing.T) { } cdre, err := NewCDRExporter([]*CDR{storedCdr1}, cfg.CdreProfiles[utils.MetaDefault], utils.MetaFileCSV, "", "", "firstexport", true, 1, '|', - cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil, nil) + cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil) if err != nil { t.Error("Unexpected error received: ", err) } @@ -177,7 +177,7 @@ func TestExportVoiceWithConvert(t *testing.T) { } cdre, err := NewCDRExporter([]*CDR{cdrVoice, cdrData, cdrSMS}, cdreCfg, utils.MetaFileCSV, "", "", "firstexport", - true, 1, '|', true, nil, nil, &FilterS{cfg: cfg}) + true, 1, '|', true, nil, &FilterS{cfg: cfg}) if err != nil { t.Error("Unexpected error received: ", err) } @@ -274,7 +274,7 @@ func TestExportWithFilter(t *testing.T) { } cdre, err := NewCDRExporter([]*CDR{cdrVoice, cdrData, cdrSMS}, cdreCfg, utils.MetaFileCSV, "", "", "firstexport", - true, 1, '|', true, nil, nil, &FilterS{cfg: cfg}) + true, 1, '|', true, nil, &FilterS{cfg: cfg}) if err != nil { t.Error("Unexpected error received: ", err) } @@ -370,7 +370,7 @@ func TestExportWithFilter2(t *testing.T) { } cdre, err := NewCDRExporter([]*CDR{cdrVoice, cdrData, cdrSMS}, cdreCfg, utils.MetaFileCSV, "", "", "firstexport", - true, 1, '|', true, nil, nil, &FilterS{cfg: cfg}) + true, 1, '|', true, nil, &FilterS{cfg: cfg}) if err != nil { t.Error("Unexpected error received: ", err) } diff --git a/engine/cdrefwv_test.go b/engine/cdrefwv_test.go index 0d06e98ef..884edd5cf 100644 --- a/engine/cdrefwv_test.go +++ b/engine/cdrefwv_test.go @@ -283,7 +283,7 @@ func TestWriteCdr(t *testing.T) { } cdre, err := NewCDRExporter([]*CDR{cdr}, cdreCfg, utils.MetaFileFWV, "", "", "fwv_1", - true, 1, '|', cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil, nil) + true, 1, '|', cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil) if err != nil { t.Error(err) } @@ -370,7 +370,7 @@ func TestWriteCdrs(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() cdre, err := NewCDRExporter([]*CDR{cdr1, cdr2, cdr3, cdr4}, cdreCfg, utils.MetaFileFWV, "", "", "fwv_1", true, 1, utils.CSV_SEP, - cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil, nil) + cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil) if err != nil { t.Error(err) } diff --git a/engine/cdrs.go b/engine/cdrs.go index c64f10e56..b3baf60de 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -72,12 +72,10 @@ func NewCDRServer(cgrCfg *config.CGRConfig, storDBChan chan StorDB, dm *DataMana connMgr *ConnManager) *CDRServer { cdrDb := <-storDBChan return &CDRServer{ - cgrCfg: cgrCfg, - cdrDb: cdrDb, - dm: dm, - guard: guardian.Guardian, - httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, - cgrCfg.GeneralCfg().ReplyTimeout), + cgrCfg: cgrCfg, + cdrDb: cdrDb, + dm: dm, + guard: guardian.Guardian, filterS: filterS, connMgr: connMgr, storDBChan: storDBChan, @@ -90,7 +88,6 @@ type CDRServer struct { cdrDb CdrStorage dm *DataManager guard *guardian.GuardianLocker - httpPoster *HTTPPoster // used for replication filterS *FilterS connMgr *ConnManager storDBChan chan StorDB @@ -391,7 +388,7 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) { if cdre, err = NewCDRExporter(cdrs, expTpl, expTpl.ExportFormat, expTpl.ExportPath, cdrS.cgrCfg.GeneralCfg().FailedPostsDir, "CDRSReplication", expTpl.Synchronous, expTpl.Attempts, - expTpl.FieldSeparator, cdrS.cgrCfg.GeneralCfg().HttpSkipTlsVerify, cdrS.httpPoster, + expTpl.FieldSeparator, cdrS.cgrCfg.GeneralCfg().HttpSkipTlsVerify, cdrS.cgrCfg.CdrsCfg().AttributeSConns, cdrS.filterS); err != nil { utils.Logger.Err(fmt.Sprintf(" Building CDRExporter for online exports got error: <%s>", err.Error())) continue diff --git a/engine/poster.go b/engine/poster.go index c5d570b02..d7a5d1f3a 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -64,7 +64,7 @@ type PosterCache struct { } type Poster interface { - Post(body []byte, fallbackName, key string) error + Post(body []byte, key string) error Close() } @@ -96,6 +96,7 @@ func parseURL(dialURL string) (URL string, qID string, err error) { return } +// Close closes all cached posters func (pc *PosterCache) Close() { for _, v := range pc.amqpCache { v.Close() @@ -113,112 +114,111 @@ func (pc *PosterCache) Close() { // GetAMQPPoster creates a new poster only if not already cached // uses dialURL as cache key -func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { +func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int) (pstr Poster, err error) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.amqpCache[dialURL]; !hasIt { - if pstr, err := NewAMQPPoster(dialURL, attempts, fallbackFileDir); err != nil { + if pstr, err = NewAMQPPoster(dialURL, attempts); err != nil { return nil, err - } else { - pc.amqpCache[dialURL] = pstr } + pc.amqpCache[dialURL] = pstr } return pc.amqpCache[dialURL], nil } -func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { +// GetAMQPv1Poster creates a new poster only if not already cached +func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int) (pstr Poster, err error) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.amqpv1Cache[dialURL]; !hasIt { - if pstr, err := NewAMQPv1Poster(dialURL, attempts, fallbackFileDir); err != nil { + if pstr, err = NewAMQPv1Poster(dialURL, attempts); err != nil { return nil, err - } else { - pc.amqpv1Cache[dialURL] = pstr } + pc.amqpv1Cache[dialURL] = pstr } return pc.amqpv1Cache[dialURL], nil } -func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { +// GetSQSPoster creates a new poster only if not already cached +func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int) (pstr Poster, err error) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.sqsCache[dialURL]; !hasIt { - if pstr, err := NewSQSPoster(dialURL, attempts, fallbackFileDir); err != nil { + if pstr, err = NewSQSPoster(dialURL, attempts); err != nil { return nil, err - } else { - pc.sqsCache[dialURL] = pstr } + pc.sqsCache[dialURL] = pstr } return pc.sqsCache[dialURL], nil } -func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { +// GetKafkaPoster creates a new poster only if not already cached +func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int) (pstr Poster, err error) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.kafkaCache[dialURL]; !hasIt { - if pstr, err := NewKafkaPoster(dialURL, attempts, fallbackFileDir); err != nil { + if pstr, err = NewKafkaPoster(dialURL, attempts); err != nil { return nil, err - } else { - pc.kafkaCache[dialURL] = pstr } + pc.kafkaCache[dialURL] = pstr } return pc.kafkaCache[dialURL], nil } -func (pc *PosterCache) GetS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { +// GetS3Poster creates a new poster only if not already cached +func (pc *PosterCache) GetS3Poster(dialURL string, attempts int) (pstr Poster, err error) { pc.Lock() defer pc.Unlock() if _, hasIt := pc.s3Cache[dialURL]; !hasIt { - if pstr, err := NewS3Poster(dialURL, attempts, fallbackFileDir); err != nil { + if pstr, err = NewS3Poster(dialURL, attempts); err != nil { return nil, err - } else { - pc.s3Cache[dialURL] = pstr } + pc.s3Cache[dialURL] = pstr } return pc.s3Cache[dialURL], nil } func (pc *PosterCache) PostAMQP(dialURL string, attempts int, - content []byte, contentType, fallbackFileDir, fallbackFileName string) error { - amqpPoster, err := pc.GetAMQPPoster(dialURL, attempts, fallbackFileDir) + content []byte) error { + amqpPoster, err := pc.GetAMQPPoster(dialURL, attempts) if err != nil { return err } - return amqpPoster.Post(content, fallbackFileName, "") + return amqpPoster.Post(content, "") } func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int, - content []byte, fallbackFileDir, fallbackFileName string) error { - AMQPv1Poster, err := pc.GetAMQPv1Poster(dialURL, attempts, fallbackFileDir) + content []byte) error { + AMQPv1Poster, err := pc.GetAMQPv1Poster(dialURL, attempts) if err != nil { return err } - return AMQPv1Poster.Post(content, fallbackFileName, "") + return AMQPv1Poster.Post(content, "") } func (pc *PosterCache) PostSQS(dialURL string, attempts int, - content []byte, fallbackFileDir, fallbackFileName string) error { - sqsPoster, err := pc.GetSQSPoster(dialURL, attempts, fallbackFileDir) + content []byte) error { + sqsPoster, err := pc.GetSQSPoster(dialURL, attempts) if err != nil { return err } - return sqsPoster.Post(content, fallbackFileName, "") + return sqsPoster.Post(content, "") } func (pc *PosterCache) PostKafka(dialURL string, attempts int, - content []byte, fallbackFileDir, fallbackFileName, key string) error { - kafkaPoster, err := pc.GetKafkaPoster(dialURL, attempts, fallbackFileDir) + content []byte, key string) error { + kafkaPoster, err := pc.GetKafkaPoster(dialURL, attempts) if err != nil { return err } - return kafkaPoster.Post(content, fallbackFileName, key) + return kafkaPoster.Post(content, key) } func (pc *PosterCache) PostS3(dialURL string, attempts int, - content []byte, fallbackFileDir, fallbackFileName, key string) error { - sqsPoster, err := pc.GetS3Poster(dialURL, attempts, fallbackFileDir) + content []byte, key string) error { + sqsPoster, err := pc.GetS3Poster(dialURL, attempts) if err != nil { return err } - return sqsPoster.Post(content, fallbackFileName, key) + return sqsPoster.Post(content, key) } diff --git a/engine/pstr_amqp.go b/engine/pstr_amqp.go index 430893ed1..b95778da1 100644 --- a/engine/pstr_amqp.go +++ b/engine/pstr_amqp.go @@ -15,6 +15,7 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ + package engine import ( @@ -28,13 +29,13 @@ import ( "github.com/streadway/amqp" ) -var AMQPQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"} +var amqpQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"} +// NewAMQPPoster creates a new amqp poster // "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" -func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) { +func NewAMQPPoster(dialURL string, attempts int) (*AMQPPoster, error) { amqp := &AMQPPoster{ - attempts: attempts, - fallbackFileDir: fallbackFileDir, + attempts: attempts, } if err := amqp.parseURL(dialURL); err != nil { return nil, err @@ -42,16 +43,16 @@ func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPP return amqp, nil } +// AMQPPoster used to post cdrs to amqp type AMQPPoster struct { - dialURL string - queueID string // identifier of the CDR queue where we publish - exchange string - exchangeType string - routingKey string - attempts int - fallbackFileDir string - sync.Mutex // protect connection - conn *amqp.Connection + dialURL string + queueID string // identifier of the CDR queue where we publish + exchange string + exchangeType string + routingKey string + attempts int + sync.Mutex // protect connection + conn *amqp.Connection } func (pstr *AMQPPoster) parseURL(dialURL string) error { @@ -61,7 +62,7 @@ func (pstr *AMQPPoster) parseURL(dialURL string) error { } qry := u.Query() q := url.Values{} - for _, key := range AMQPQuery { + for _, key := range amqpQuery { if vals, has := qry[key]; has && len(vals) != 0 { q.Add(key, vals[0]) } @@ -87,7 +88,7 @@ func (pstr *AMQPPoster) parseURL(dialURL string) error { // Post is the method being called when we need to post anything in the queue // the optional chn will permits channel caching -func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err error) { +func (pstr *AMQPPoster) Post(content []byte, _ string) (err error) { var chn *amqp.Channel fib := utils.Fib() @@ -100,11 +101,8 @@ func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err er } } if err != nil { - if fallbackFileName != utils.META_NONE { - utils.Logger.Warning(fmt.Sprintf(" creating new post channel, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) - } - return err + utils.Logger.Warning(fmt.Sprintf(" creating new post channel, err: %s", err.Error())) + return } for i := 0; i < pstr.attempts; i++ { if err = chn.Publish( @@ -123,9 +121,8 @@ func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err er time.Sleep(time.Duration(fib()) * time.Second) } } - if err != nil && fallbackFileName != utils.META_NONE { - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) - return err + if err != nil { + return } if chn != nil { chn.Close() @@ -133,6 +130,7 @@ func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err er return } +// Close closes the connections func (pstr *AMQPPoster) Close() { pstr.Lock() if pstr.conn != nil { diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go index a121831fc..723b2ca43 100644 --- a/engine/pstr_amqpv1.go +++ b/engine/pstr_amqpv1.go @@ -29,28 +29,29 @@ import ( amqpv1 "pack.ag/amqp" ) -func NewAMQPv1Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { +// NewAMQPv1Poster creates a poster for amqpv1 +func NewAMQPv1Poster(dialURL string, attempts int) (Poster, error) { URL, qID, err := parseURL(dialURL) if err != nil { return nil, err } return &AMQPv1Poster{ - dialURL: URL, - queueID: "/" + qID, - attempts: attempts, - fallbackFileDir: fallbackFileDir, + dialURL: URL, + queueID: "/" + qID, + attempts: attempts, }, nil } +// AMQPv1Poster a poster for amqpv1 type AMQPv1Poster struct { sync.Mutex - dialURL string - queueID string // identifier of the CDR queue where we publish - attempts int - fallbackFileDir string - client *amqpv1.Client + dialURL string + queueID string // identifier of the CDR queue where we publish + attempts int + client *amqpv1.Client } +// Close closes the connections func (pstr *AMQPv1Poster) Close() { pstr.Lock() if pstr.client != nil { @@ -60,7 +61,8 @@ func (pstr *AMQPv1Poster) Close() { pstr.Unlock() } -func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err error) { +// Post is the method being called when we need to post anything in the queue +func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) { var s *amqpv1.Session fib := utils.Fib() @@ -79,10 +81,7 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err } } if err != nil { - if fallbackFileName != utils.META_NONE { - utils.Logger.Warning(fmt.Sprintf(" creating new post channel, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) - } + utils.Logger.Warning(fmt.Sprintf(" creating new post channel, err: %s", err.Error())) return err } @@ -125,9 +124,8 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err // } // } } - if err != nil && fallbackFileName != utils.META_NONE { - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) - return err + if err != nil { + return } if s != nil { s.Close(ctx) diff --git a/engine/pstr_http.go b/engine/pstr_http.go index 43b14b01e..dd8e23424 100644 --- a/engine/pstr_http.go +++ b/engine/pstr_http.go @@ -25,109 +25,116 @@ import ( "io/ioutil" "net/http" "net/url" - "os" "time" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" ) -// Post without automatic failover -func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error) { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerify}, - DisableKeepAlives: true, +// keep it global in order to reuse it +var httpPosterTransport *http.Transport + +// HttpJsonPost posts without automatic failover +func HttpJsonPost(url string, skipTLSVerify bool, content []byte) (respBody []byte, err error) { + if httpPosterTransport == nil { + httpPosterTransport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify}, + } } - client := &http.Client{Transport: tr} - resp, err := client.Post(url, "application/json", bytes.NewBuffer(content)) - if err != nil { - return nil, err + client := &http.Client{Transport: httpPosterTransport} + var resp *http.Response + if resp, err = client.Post(url, "application/json", bytes.NewBuffer(content)); err != nil { + return } - defer resp.Body.Close() - respBody, err := ioutil.ReadAll(resp.Body) + respBody, err = ioutil.ReadAll(resp.Body) + resp.Body.Close() if err != nil { - return nil, err + return } if resp.StatusCode > 299 { - return respBody, fmt.Errorf("Unexpected status code received: %d", resp.StatusCode) + err = fmt.Errorf("Unexpected status code received: %d", resp.StatusCode) } - return respBody, nil + return } -func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration) *HTTPPoster { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify}, - } - return &HTTPPoster{httpClient: &http.Client{Transport: tr, Timeout: replyTimeout}} -} - -type HTTPPoster struct { - httpClient *http.Client -} - -// Post with built-in failover -// Returns also reference towards client so we can close it's connections when done -func (poster *HTTPPoster) Post(addr string, contentType string, content interface{}, attempts int, fallbackFilePath string) (respBody []byte, err error) { +// NewHTTPPoster return a new HTTP poster +func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration, + addr, contentType string, attempts int) (httposter *HTTPPoster, err error) { if !utils.SliceHasMember([]string{utils.CONTENT_FORM, utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { return nil, fmt.Errorf("unsupported ContentType: %s", contentType) } + if httpPosterTransport == nil { + httpPosterTransport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify}, + } + } + return &HTTPPoster{ + httpClient: &http.Client{Transport: httpPosterTransport, Timeout: replyTimeout}, + addr: addr, + contentType: contentType, + attempts: attempts, + }, nil +} + +// HTTPPoster used to post cdrs +type HTTPPoster struct { + httpClient *http.Client + addr string + contentType string + attempts int +} + +// Post will post the event +func (pstr *HTTPPoster) Post(content interface{}, key string) (err error) { + _, err = pstr.GetResponse(content) + return +} + +// GetResponse will post the event and return the response +func (pstr *HTTPPoster) GetResponse(content interface{}) (respBody []byte, err error) { var body []byte // Used to write in file and send over http var urlVals url.Values // Used when posting form - if utils.SliceHasMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { - body = content.([]byte) - } else if contentType == utils.CONTENT_FORM { + if pstr.contentType == utils.CONTENT_FORM { urlVals = content.(url.Values) body = []byte(urlVals.Encode()) + } else { + body = content.([]byte) } fib := utils.Fib() bodyType := "application/x-www-form-urlencoded" - if contentType == utils.CONTENT_JSON { + if pstr.contentType == utils.CONTENT_JSON { bodyType = "application/json" } - for i := 0; i < attempts; i++ { + for i := 0; i < pstr.attempts; i++ { var resp *http.Response - if utils.SliceHasMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { - resp, err = poster.httpClient.Post(addr, bodyType, bytes.NewBuffer(body)) - } else if contentType == utils.CONTENT_FORM { - resp, err = poster.httpClient.PostForm(addr, urlVals) + if pstr.contentType == utils.CONTENT_FORM { + resp, err = pstr.httpClient.PostForm(pstr.addr, urlVals) + } else { + resp, err = pstr.httpClient.Post(pstr.addr, bodyType, bytes.NewBuffer(body)) } if err != nil { - utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - if i+1 < attempts { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", pstr.addr, err.Error())) + if i+1 < pstr.attempts { time.Sleep(time.Duration(fib()) * time.Second) } continue } - defer resp.Body.Close() respBody, err = ioutil.ReadAll(resp.Body) + resp.Body.Close() if err != nil { - utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - if i+1 < attempts { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", pstr.addr, err.Error())) + if i+1 < pstr.attempts { time.Sleep(time.Duration(fib()) * time.Second) } continue } if resp.StatusCode > 299 { - utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) - if i+1 < attempts { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", pstr.addr, resp.StatusCode)) + if i+1 < pstr.attempts { time.Sleep(time.Duration(fib()) * time.Second) } continue } return respBody, nil } - if fallbackFilePath != utils.META_NONE { - // If we got that far, post was not possible, write it on disk - _, err = guardian.Guardian.Guard(func() (interface{}, error) { - fileOut, err := os.Create(fallbackFilePath) - if err != nil { - return nil, err - } - _, err = fileOut.Write(body) - fileOut.Close() - return nil, err - }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+fallbackFilePath) - } return } diff --git a/engine/pstr_kafka.go b/engine/pstr_kafka.go index 5d6f21e85..c88996a0a 100644 --- a/engine/pstr_kafka.go +++ b/engine/pstr_kafka.go @@ -27,11 +27,10 @@ import ( kafka "github.com/segmentio/kafka-go" ) -// "amqp://guest:guest@localhost:5672/?topic=cgrates_cdrs" -func NewKafkaPoster(dialURL string, attempts int, fallbackFileDir string) (*KafkaPoster, error) { +// NewKafkaPoster creates a kafka poster +func NewKafkaPoster(dialURL string, attempts int) (*KafkaPoster, error) { amqp := &KafkaPoster{ - attempts: attempts, - fallbackFileDir: fallbackFileDir, + attempts: attempts, } if err := amqp.parseURL(dialURL); err != nil { return nil, err @@ -39,13 +38,13 @@ func NewKafkaPoster(dialURL string, attempts int, fallbackFileDir string) (*Kafk return amqp, nil } +// KafkaPoster is a kafka poster type KafkaPoster struct { - dialURL string - topic string // identifier of the CDR queue where we publish - attempts int - fallbackFileDir string - sync.Mutex // protect writer - writer *kafka.Writer + dialURL string + topic string // identifier of the CDR queue where we publish + attempts int + sync.Mutex // protect writer + writer *kafka.Writer } func (pstr *KafkaPoster) parseURL(dialURL string) error { @@ -65,7 +64,7 @@ func (pstr *KafkaPoster) parseURL(dialURL string) error { // Post is the method being called when we need to post anything in the queue // the optional chn will permits channel caching -func (pstr *KafkaPoster) Post(content []byte, fallbackFileName, key string) (err error) { +func (pstr *KafkaPoster) Post(content []byte, key string) (err error) { pstr.newPostWriter() pstr.Lock() if err = pstr.writer.WriteMessages(context.Background(), kafka.Message{ @@ -76,13 +75,10 @@ func (pstr *KafkaPoster) Post(content []byte, fallbackFileName, key string) (err return } pstr.Unlock() - if err != nil && fallbackFileName != utils.META_NONE { - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) - return err - } return } +// Close closes the kafka writer func (pstr *KafkaPoster) Close() { pstr.Lock() if pstr.writer != nil { diff --git a/engine/pstr_s3.go b/engine/pstr_s3.go index 4604deb3d..80694545c 100644 --- a/engine/pstr_s3.go +++ b/engine/pstr_s3.go @@ -32,29 +32,30 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { +// NewS3Poster creates a s3 poster +func NewS3Poster(dialURL string, attempts int) (Poster, error) { pstr := &S3Poster{ - attempts: attempts, - fallbackFileDir: fallbackFileDir, + attempts: attempts, } pstr.parseURL(dialURL) return pstr, nil } +// S3Poster is a s3 poster type S3Poster struct { sync.Mutex - dialURL string - awsRegion string - awsID string - awsKey string - awsToken string - attempts int - fallbackFileDir string - queueID string - folderPath string - session *session.Session + dialURL string + awsRegion string + awsID string + awsKey string + awsToken string + attempts int + queueID string + folderPath string + session *session.Session } +// Close for Poster interface func (pstr *S3Poster) Close() {} func (pstr *S3Poster) parseURL(dialURL string) { @@ -83,7 +84,8 @@ func (pstr *S3Poster) parseURL(dialURL string) { } } -func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err error) { +// Post is the method being called when we need to post anything in the queue +func (pstr *S3Poster) Post(message []byte, key string) (err error) { var svc *s3manager.Uploader fib := utils.Fib() @@ -96,11 +98,8 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er } } if err != nil { - if fallbackFileName != utils.META_NONE { - utils.Logger.Warning(fmt.Sprintf(" creating new session, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) - } - return err + utils.Logger.Warning(fmt.Sprintf(" creating new session, err: %s", err.Error())) + return } for i := 0; i < pstr.attempts; i++ { @@ -124,11 +123,10 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er time.Sleep(time.Duration(fib()) * time.Second) } } - if err != nil && fallbackFileName != utils.META_NONE { + if err != nil { utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) } - return err + return } func (pstr *S3Poster) newPosterSession() (s *s3manager.Uploader, err error) { diff --git a/engine/pstr_sqs.go b/engine/pstr_sqs.go index 95976ae02..b6f95cd4e 100644 --- a/engine/pstr_sqs.go +++ b/engine/pstr_sqs.go @@ -32,30 +32,31 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { +// NewSQSPoster creates a poster for sqs +func NewSQSPoster(dialURL string, attempts int) (Poster, error) { pstr := &SQSPoster{ - attempts: attempts, - fallbackFileDir: fallbackFileDir, + attempts: attempts, } pstr.parseURL(dialURL) return pstr, nil } +// SQSPoster is a poster for sqs type SQSPoster struct { sync.Mutex - dialURL string - awsRegion string - awsID string - awsKey string - awsToken string - attempts int - fallbackFileDir string - queueURL *string - queueID string + dialURL string + awsRegion string + awsID string + awsKey string + awsToken string + attempts int + queueURL *string + queueID string // getQueueOnce sync.Once session *session.Session } +// Close for Poster interface func (pstr *SQSPoster) Close() {} func (pstr *SQSPoster) parseURL(dialURL string) { @@ -115,7 +116,8 @@ func (pstr *SQSPoster) getQueueURL() (err error) { return err } -func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err error) { +// Post is the method being called when we need to post anything in the queue +func (pstr *SQSPoster) Post(message []byte, _ string) (err error) { var svc *sqs.SQS fib := utils.Fib() @@ -128,11 +130,8 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err } } if err != nil { - if fallbackFileName != utils.META_NONE { - utils.Logger.Warning(fmt.Sprintf(" creating new session, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) - } - return err + utils.Logger.Warning(fmt.Sprintf(" creating new session, err: %s", err.Error())) + return } for i := 0; i < pstr.attempts; i++ { @@ -148,11 +147,10 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err time.Sleep(time.Duration(fib()) * time.Second) } } - if err != nil && fallbackFileName != utils.META_NONE { + if err != nil { utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) } - return err + return } func (pstr *SQSPoster) newPosterSession() (s *sqs.SQS, err error) { diff --git a/ers/kafka.go b/ers/kafka.go index 0f4a2bbf1..04d00aff1 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -139,8 +139,7 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) { } if rdr.Config().ProcessedPath != utils.EmptyString { // post it if err := engine.PostersCache.PostKafka(rdr.Config().ProcessedPath, - rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, "", - utils.META_NONE, string(msg.Key)); err != nil { + rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, string(msg.Key)); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, string(msg.Key), err.Error())) diff --git a/services/apierv1.go b/services/apierv1.go index afdf32a9c..14ef20f1b 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -97,11 +97,9 @@ func (apiService *ApierV1Service) Start() (err error) { Config: apiService.cfg, Responder: apiService.responderService.GetResponder(), SchedulerService: apiService.schedService, - HTTPPoster: engine.NewHTTPPoster(apiService.cfg.GeneralCfg().HttpSkipTlsVerify, - apiService.cfg.GeneralCfg().ReplyTimeout), - FilterS: filterS, - ConnMgr: apiService.connMgr, - StorDBChan: storDBChan, + FilterS: filterS, + ConnMgr: apiService.connMgr, + StorDBChan: storDBChan, } go func(api *v1.ApierV1, stopChan chan struct{}) {