mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated CDRExporter
This commit is contained in:
committed by
Dan Christian Bogos
parent
2520575c62
commit
94c29c93d2
@@ -46,8 +46,7 @@ type ApierV1 struct {
|
||||
DataManager *engine.DataManager
|
||||
Config *config.CGRConfig
|
||||
Responder *engine.Responder
|
||||
SchedulerService SchedulerGeter // Need to have them capitalize so we can export in V2
|
||||
HTTPPoster *engine.HTTPPoster
|
||||
SchedulerService SchedulerGeter // Need to have them capitalize so we can export in V2
|
||||
FilterS *engine.FilterS //Used for CDR Exporter
|
||||
ConnMgr *engine.ConnManager
|
||||
|
||||
@@ -1222,27 +1221,30 @@ func (apiv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string
|
||||
}
|
||||
switch ffn.Transport {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
|
||||
_, err = engine.NewHTTPPoster(apiv1.Config.GeneralCfg().HttpSkipTlsVerify,
|
||||
apiv1.Config.GeneralCfg().ReplyTimeout).Post(ffn.Address,
|
||||
utils.PosterTransportContentTypes[ffn.Transport], fileContent,
|
||||
apiv1.Config.GeneralCfg().PosterAttempts, failoverPath)
|
||||
var pstr *engine.HTTPPoster
|
||||
pstr, err = engine.NewHTTPPoster(apiv1.Config.GeneralCfg().HttpSkipTlsVerify,
|
||||
apiv1.Config.GeneralCfg().ReplyTimeout, ffn.Address,
|
||||
utils.PosterTransportContentTypes[ffn.Transport],
|
||||
apiv1.Config.GeneralCfg().PosterAttempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = pstr.Post(fileContent, utils.EmptyString) //this may cause panics for contentType == utils.CONTENT_FORM
|
||||
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
|
||||
err = engine.PostersCache.PostAMQP(ffn.Address,
|
||||
apiv1.Config.GeneralCfg().PosterAttempts, fileContent,
|
||||
utils.PosterTransportContentTypes[ffn.Transport],
|
||||
failedReqsOutDir, file.Name())
|
||||
apiv1.Config.GeneralCfg().PosterAttempts, fileContent)
|
||||
case utils.MetaAMQPV1jsonMap:
|
||||
err = engine.PostersCache.PostAMQPv1(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts,
|
||||
fileContent, failedReqsOutDir, file.Name())
|
||||
fileContent)
|
||||
case utils.MetaSQSjsonMap:
|
||||
err = engine.PostersCache.PostSQS(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts,
|
||||
fileContent, failedReqsOutDir, file.Name())
|
||||
fileContent)
|
||||
case utils.MetaKafkajsonMap:
|
||||
err = engine.PostersCache.PostKafka(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts,
|
||||
fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix())
|
||||
fileContent, utils.UUIDSha1Prefix())
|
||||
case utils.MetaS3jsonMap:
|
||||
err = engine.PostersCache.PostS3(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts,
|
||||
fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix())
|
||||
fileContent, utils.UUIDSha1Prefix())
|
||||
default:
|
||||
err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport)
|
||||
}
|
||||
|
||||
@@ -147,7 +147,7 @@ func (api *ApierV1) ExportCdrsToFile(attr utils.AttrExpFileCdrs, reply *utils.Ex
|
||||
cdrexp, err := engine.NewCDRExporter(cdrs, exportTemplate, exportFormat,
|
||||
filePath, utils.META_NONE, exportID, exportTemplate.Synchronous,
|
||||
exportTemplate.Attempts, fieldSep,
|
||||
api.Config.GeneralCfg().HttpSkipTlsVerify, api.HTTPPoster,
|
||||
api.Config.GeneralCfg().HttpSkipTlsVerify,
|
||||
api.Config.ApierCfg().AttributeSConns, api.FilterS)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -292,7 +292,7 @@ func (api *ApierV1) ExportCDRs(arg ArgExportCDRs, reply *RplExportedCDRs) (err e
|
||||
filePath, utils.META_NONE, exportID,
|
||||
synchronous, attempts, fieldSep,
|
||||
api.Config.GeneralCfg().HttpSkipTlsVerify,
|
||||
api.HTTPPoster, api.Config.ApierCfg().AttributeSConns, api.FilterS)
|
||||
api.Config.ApierCfg().AttributeSConns, api.FilterS)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
|
||||
@@ -105,7 +105,7 @@ func (apiv2 *ApierV2) ExportCdrsToFile(attr AttrExportCdrsToFile, reply *utils.E
|
||||
cdrexp, err := engine.NewCDRExporter(cdrs, exportTemplate, exportFormat,
|
||||
filePath, utils.META_NONE, exportID, exportTemplate.Synchronous,
|
||||
exportTemplate.Attempts, fieldSep, apiv2.Config.GeneralCfg().HttpSkipTlsVerify,
|
||||
apiv2.HTTPPoster, apiv2.Config.ApierCfg().AttributeSConns, apiv2.FilterS)
|
||||
apiv2.Config.ApierCfg().AttributeSConns, apiv2.FilterS)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
|
||||
191
engine/action.go
191
engine/action.go
@@ -26,7 +26,6 @@ import (
|
||||
"html/template"
|
||||
"net"
|
||||
"net/smtp"
|
||||
"path"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
@@ -40,9 +39,7 @@ import (
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
/*
|
||||
Structure to be filled for each tariff plan with the bonus value for received calls minutes.
|
||||
*/
|
||||
// Action will be filled for each tariff plan with the bonus value for received calls minutes.
|
||||
type Action struct {
|
||||
Id string
|
||||
ActionType string
|
||||
@@ -54,6 +51,7 @@ type Action struct {
|
||||
balanceValue float64 // balance value after action execution, used with cdrlog
|
||||
}
|
||||
|
||||
// Clone returns a clone of the action
|
||||
func (a *Action) Clone() (cln *Action) {
|
||||
if a == nil {
|
||||
return
|
||||
@@ -88,8 +86,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
|
||||
utils.RESET_COUNTERS: resetCountersAction,
|
||||
utils.ENABLE_ACCOUNT: enableAccountAction,
|
||||
utils.DISABLE_ACCOUNT: disableAccountAction,
|
||||
utils.HttpPost: callUrl,
|
||||
utils.HttpPostAsync: callUrlAsync,
|
||||
utils.HttpPost: callURL,
|
||||
utils.HttpPostAsync: callURLAsync,
|
||||
utils.MAIL_ASYNC: mailAsync,
|
||||
utils.SET_DDESTINATIONS: setddestinations,
|
||||
utils.REMOVE_ACCOUNT: removeAccountAction,
|
||||
@@ -381,17 +379,11 @@ func sendAMQP(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
fallbackFileName := (&utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaAMQPjsonMap,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
}).AsString()
|
||||
|
||||
return PostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, utils.CONTENT_JSON, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
err = PostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body)
|
||||
if err != nil {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaAMQPjsonMap, body)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
@@ -399,17 +391,11 @@ func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
fallbackFileName := (&utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaAMQPV1jsonMap,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
}).AsString()
|
||||
|
||||
return PostersCache.PostAMQPv1(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
err = PostersCache.PostAMQPv1(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body)
|
||||
if err != nil {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaAMQPV1jsonMap, body)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func sendSQS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
@@ -417,17 +403,11 @@ func sendSQS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
fallbackFileName := (&utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaSQSjsonMap,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
}).AsString()
|
||||
|
||||
return PostersCache.PostSQS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
err = PostersCache.PostSQS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body)
|
||||
if err != nil {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaSQSjsonMap, body)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
@@ -435,17 +415,11 @@ func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
fallbackFileName := (&utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaKafkajsonMap,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
}).AsString()
|
||||
|
||||
return PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName, utils.UUIDSha1Prefix())
|
||||
err = PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix())
|
||||
if err != nil {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaKafkajsonMap, body)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
@@ -453,57 +427,49 @@ func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
fallbackFileName := (&utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaS3jsonMap,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
}).AsString()
|
||||
|
||||
return PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName, utils.UUIDSha1Prefix())
|
||||
err = PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix())
|
||||
if err != nil {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaS3jsonMap, body)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func callUrl(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
jsn, err := getOneData(ub, extraData)
|
||||
func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
body, err := getOneData(ub, extraData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
ffn := &utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaHTTPjson,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify,
|
||||
config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters,
|
||||
utils.CONTENT_JSON, config.CgrConfig().GeneralCfg().PosterAttempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = pstr.Post(body, utils.EmptyString)
|
||||
if err != nil {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, body)
|
||||
}
|
||||
_, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify,
|
||||
config.CgrConfig().GeneralCfg().ReplyTimeout).Post(a.ExtraParameters,
|
||||
utils.CONTENT_JSON, jsn, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
path.Join(cfg.GeneralCfg().FailedPostsDir, ffn.AsString()))
|
||||
return err
|
||||
}
|
||||
|
||||
// Does not block for posts, no error reports
|
||||
func callUrlAsync(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
jsn, err := getOneData(ub, extraData)
|
||||
func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
body, err := getOneData(ub, extraData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
ffn := &utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaHTTPjson,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify,
|
||||
config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters,
|
||||
utils.CONTENT_JSON, config.CgrConfig().GeneralCfg().PosterAttempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify,
|
||||
config.CgrConfig().GeneralCfg().ReplyTimeout).Post(a.ExtraParameters,
|
||||
utils.CONTENT_JSON, jsn, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
path.Join(cfg.GeneralCfg().FailedPostsDir, ffn.AsString()))
|
||||
go func() {
|
||||
err := pstr.Post(body, utils.EmptyString)
|
||||
if err != nil {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, body)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -548,24 +514,24 @@ func mailAsync(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
}
|
||||
|
||||
func setddestinations(ub *Account, a *Action, acs Actions, extraData interface{}) (err error) {
|
||||
var ddcDestId string
|
||||
var ddcDestID string
|
||||
for _, bchain := range ub.BalanceMap {
|
||||
for _, b := range bchain {
|
||||
for destId := range b.DestinationIDs {
|
||||
if strings.HasPrefix(destId, "*ddc") {
|
||||
ddcDestId = destId
|
||||
for destID := range b.DestinationIDs {
|
||||
if strings.HasPrefix(destID, "*ddc") {
|
||||
ddcDestID = destID
|
||||
break
|
||||
}
|
||||
}
|
||||
if ddcDestId != "" {
|
||||
if ddcDestID != "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
if ddcDestId != "" {
|
||||
if ddcDestID != "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
if ddcDestId != "" {
|
||||
if ddcDestID != "" {
|
||||
// make slice from prefixes
|
||||
// Review here prefixes
|
||||
// prefixes := make([]string, len(sq.Metrics))
|
||||
@@ -574,8 +540,8 @@ func setddestinations(ub *Account, a *Action, acs Actions, extraData interface{}
|
||||
// prefixes[i] = p
|
||||
// i++
|
||||
// }
|
||||
newDest := &Destination{Id: ddcDestId}
|
||||
oldDest, err := dm.GetDestination(ddcDestId, false, utils.NonTransactional)
|
||||
newDest := &Destination{Id: ddcDestID}
|
||||
oldDest, err := dm.GetDestination(ddcDestID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -583,7 +549,7 @@ func setddestinations(ub *Account, a *Action, acs Actions, extraData interface{}
|
||||
if err = dm.SetDestination(newDest, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = dm.CacheDataFromDB(utils.DESTINATION_PREFIX, []string{ddcDestId}, true); err != nil {
|
||||
if err = dm.CacheDataFromDB(utils.DESTINATION_PREFIX, []string{ddcDestID}, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -673,7 +639,7 @@ func removeBalanceAction(ub *Account, a *Action, acs Actions, extraData interfac
|
||||
// delete without preserving order
|
||||
bChain[i] = bChain[len(bChain)-1]
|
||||
bChain = bChain[:len(bChain)-1]
|
||||
i -= 1
|
||||
i--
|
||||
found = true
|
||||
}
|
||||
}
|
||||
@@ -714,6 +680,7 @@ func transferMonetaryDefaultAction(ub *Account, a *Action, acs Actions, extraDat
|
||||
return nil
|
||||
}
|
||||
|
||||
// RPCRequest used by rpc action
|
||||
type RPCRequest struct {
|
||||
Address string
|
||||
Transport string
|
||||
@@ -859,7 +826,7 @@ func publishBalance(ub *Account, a *Action, acs Actions, extraData interface{})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Structure to store actions according to weight
|
||||
// Actions used to store actions according to weight
|
||||
type Actions []*Action
|
||||
|
||||
func (apl Actions) Len() int {
|
||||
@@ -875,10 +842,12 @@ func (apl Actions) Less(j, i int) bool {
|
||||
return apl[i].Weight < apl[j].Weight
|
||||
}
|
||||
|
||||
// Sort used to implement sort interface
|
||||
func (apl Actions) Sort() {
|
||||
sort.Sort(apl)
|
||||
}
|
||||
|
||||
// Clone returns a clone from object
|
||||
func (apl Actions) Clone() (interface{}, error) {
|
||||
if apl == nil {
|
||||
return nil, nil
|
||||
@@ -1023,7 +992,7 @@ func removeExpired(acc *Account, action *Action, _ Actions, extraData interface{
|
||||
// delete without preserving order
|
||||
bChain[i] = bChain[len(bChain)-1]
|
||||
bChain = bChain[:len(bChain)-1]
|
||||
i -= 1
|
||||
i--
|
||||
found = true
|
||||
}
|
||||
}
|
||||
@@ -1035,22 +1004,20 @@ func removeExpired(acc *Account, action *Action, _ Actions, extraData interface{
|
||||
}
|
||||
|
||||
func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
jsn, err := json.Marshal(extraData)
|
||||
body, err := json.Marshal(extraData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
ffn := &utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaHTTPjson,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify,
|
||||
config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters,
|
||||
utils.CONTENT_JSON, config.CgrConfig().GeneralCfg().PosterAttempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = pstr.Post(body, utils.EmptyString)
|
||||
if err != nil {
|
||||
addFailedPost(a.ExtraParameters, utils.MetaHTTPjson, body)
|
||||
}
|
||||
_, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify,
|
||||
config.CgrConfig().GeneralCfg().ReplyTimeout).Post(a.ExtraParameters,
|
||||
utils.CONTENT_JSON, jsn, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
path.Join(cfg.GeneralCfg().FailedPostsDir, ffn.AsString()))
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
128
engine/cdr.go
128
engine/cdr.go
@@ -141,7 +141,7 @@ func (cdr *CDR) ComputeCGRID() {
|
||||
cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.OriginHost)
|
||||
}
|
||||
|
||||
// Format cost as string on export
|
||||
// FormatCost formats the cost as string on export
|
||||
func (cdr *CDR) FormatCost(shiftDecimals, roundDecimals int) string {
|
||||
cost := cdr.Cost
|
||||
if shiftDecimals != 0 {
|
||||
@@ -150,7 +150,7 @@ func (cdr *CDR) FormatCost(shiftDecimals, roundDecimals int) string {
|
||||
return strconv.FormatFloat(cost, 'f', roundDecimals, 64)
|
||||
}
|
||||
|
||||
// Used to retrieve fields as string, primary fields are const labeled
|
||||
// FieldAsString is used to retrieve fields as string, primary fields are const labeled
|
||||
func (cdr *CDR) FieldAsString(rsrPrs *config.RSRParser) (parsed string, err error) {
|
||||
parsed, err = rsrPrs.ParseDataProviderWithInterfaces(config.NewNavigableMap(cdr.AsMapStringIface()), utils.NestingSep)
|
||||
if err != nil {
|
||||
@@ -159,7 +159,7 @@ func (cdr *CDR) FieldAsString(rsrPrs *config.RSRParser) (parsed string, err erro
|
||||
return
|
||||
}
|
||||
|
||||
// concatenates values of multiple fields defined in template, used eg in CDR templates
|
||||
// FieldsAsString concatenates values of multiple fields defined in template, used eg in CDR templates
|
||||
func (cdr *CDR) FieldsAsString(rsrFlds config.RSRParsers) string {
|
||||
outVal, err := rsrFlds.ParseDataProviderWithInterfaces(config.NewNavigableMap(cdr.AsMapStringIface()), utils.NestingSep)
|
||||
if err != nil {
|
||||
@@ -168,7 +168,7 @@ func (cdr *CDR) FieldsAsString(rsrFlds config.RSRParsers) string {
|
||||
return outVal
|
||||
}
|
||||
|
||||
// Populates the field with id from value; strings are appended to original one
|
||||
// ParseFieldValue populates the field with id from value; strings are appended to original one
|
||||
func (cdr *CDR) ParseFieldValue(fieldId, fieldVal, timezone string) error {
|
||||
var err error
|
||||
switch fieldId {
|
||||
@@ -394,7 +394,7 @@ func (cdr *CDR) exportFieldValue(cfgCdrFld *config.FCTemplate, filterS *FilterS)
|
||||
return
|
||||
}
|
||||
|
||||
func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTlsCheck bool,
|
||||
func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTLSCheck bool,
|
||||
groupedCDRs []*CDR, filterS *FilterS) (outVal string, err error) {
|
||||
switch cfgFld.Type {
|
||||
case utils.META_FILLER:
|
||||
@@ -427,7 +427,7 @@ func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTlsCheck bool,
|
||||
}
|
||||
if len(httpAddr) == 0 {
|
||||
err = fmt.Errorf("Empty http address for field %s type %s", cfgFld.Tag, cfgFld.Type)
|
||||
} else if outValByte, err = HttpJsonPost(httpAddr, httpSkipTlsCheck, jsn); err == nil {
|
||||
} else if outValByte, err = HttpJsonPost(httpAddr, httpSkipTLSCheck, jsn); err == nil {
|
||||
outVal = string(outValByte)
|
||||
if len(outVal) == 0 && cfgFld.Mandatory {
|
||||
err = fmt.Errorf("Empty result for http_post field: %s", cfgFld.Tag)
|
||||
@@ -451,10 +451,10 @@ func (cdr *CDR) formatField(cfgFld *config.FCTemplate, httpSkipTlsCheck bool,
|
||||
return utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory)
|
||||
}
|
||||
|
||||
// Used in place where we need to export the CDR based on an export template
|
||||
// AsExportRecord is used in place where we need to export the CDR based on an export template
|
||||
// ExportRecord is a []string to keep it compatible with encoding/csv Writer
|
||||
func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate,
|
||||
httpSkipTlsCheck bool, groupedCDRs []*CDR, filterS *FilterS) (expRecord []string, err error) {
|
||||
httpSkipTLSCheck bool, groupedCDRs []*CDR, filterS *FilterS) (expRecord []string, err error) {
|
||||
nM := config.NewNavigableMap(nil)
|
||||
nM.Set([]string{utils.MetaReq}, cdr.AsMapStringIface(), false, false)
|
||||
for _, cfgFld := range exportFields {
|
||||
@@ -464,20 +464,20 @@ func (cdr *CDR) AsExportRecord(exportFields []*config.FCTemplate,
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs, filterS); err != nil {
|
||||
var fmtOut string
|
||||
if fmtOut, err = cdr.formatField(cfgFld, httpSkipTLSCheck, groupedCDRs, filterS); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<CDR> error: %s exporting field: %s, CDR: %s\n",
|
||||
err.Error(), utils.ToJSON(cfgFld), utils.ToJSON(cdr)))
|
||||
return nil, err
|
||||
} else {
|
||||
expRecord = append(expRecord, fmtOut)
|
||||
}
|
||||
expRecord = append(expRecord, fmtOut)
|
||||
}
|
||||
return expRecord, nil
|
||||
}
|
||||
|
||||
// AsExportMap converts the CDR into a map[string]string based on export template
|
||||
// Used in real-time replication as well as remote exports
|
||||
func (cdr *CDR) AsExportMap(exportFields []*config.FCTemplate, httpSkipTlsCheck bool,
|
||||
func (cdr *CDR) AsExportMap(exportFields []*config.FCTemplate, httpSkipTLSCheck bool,
|
||||
groupedCDRs []*CDR, filterS *FilterS) (expMap map[string]string, err error) {
|
||||
expMap = make(map[string]string)
|
||||
nM := config.NewNavigableMap(nil)
|
||||
@@ -489,41 +489,41 @@ func (cdr *CDR) AsExportMap(exportFields []*config.FCTemplate, httpSkipTlsCheck
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
if fmtOut, err := cdr.formatField(cfgFld, httpSkipTlsCheck, groupedCDRs, filterS); err != nil {
|
||||
var fmtOut string
|
||||
if fmtOut, err = cdr.formatField(cfgFld, httpSkipTLSCheck, groupedCDRs, filterS); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<CDR> error: %s exporting field: %s, CDR: %s\n",
|
||||
err.Error(), utils.ToJSON(cfgFld), utils.ToJSON(cdr)))
|
||||
return nil, err
|
||||
} else {
|
||||
expMap[cfgFld.FieldId] += fmtOut
|
||||
}
|
||||
expMap[cfgFld.FieldId] += fmtOut
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// AsCDRsTBL converts the CDR into the format used for SQL storage
|
||||
func (cdr *CDR) AsCDRsql() (cdrSql *CDRsql) {
|
||||
cdrSql = new(CDRsql)
|
||||
cdrSql.Cgrid = cdr.CGRID
|
||||
cdrSql.RunID = cdr.RunID
|
||||
cdrSql.OriginHost = cdr.OriginHost
|
||||
cdrSql.Source = cdr.Source
|
||||
cdrSql.OriginID = cdr.OriginID
|
||||
cdrSql.TOR = cdr.ToR
|
||||
cdrSql.RequestType = cdr.RequestType
|
||||
cdrSql.Tenant = cdr.Tenant
|
||||
cdrSql.Category = cdr.Category
|
||||
cdrSql.Account = cdr.Account
|
||||
cdrSql.Subject = cdr.Subject
|
||||
cdrSql.Destination = cdr.Destination
|
||||
cdrSql.SetupTime = cdr.SetupTime
|
||||
cdrSql.AnswerTime = cdr.AnswerTime
|
||||
cdrSql.Usage = cdr.Usage.Nanoseconds()
|
||||
cdrSql.ExtraFields = utils.ToJSON(cdr.ExtraFields)
|
||||
cdrSql.CostSource = cdr.CostSource
|
||||
cdrSql.Cost = cdr.Cost
|
||||
cdrSql.CostDetails = utils.ToJSON(cdr.CostDetails)
|
||||
cdrSql.ExtraInfo = cdr.ExtraInfo
|
||||
cdrSql.CreatedAt = time.Now()
|
||||
// AsCDRsql converts the CDR into the format used for SQL storage
|
||||
func (cdr *CDR) AsCDRsql() (cdrSQL *CDRsql) {
|
||||
cdrSQL = new(CDRsql)
|
||||
cdrSQL.Cgrid = cdr.CGRID
|
||||
cdrSQL.RunID = cdr.RunID
|
||||
cdrSQL.OriginHost = cdr.OriginHost
|
||||
cdrSQL.Source = cdr.Source
|
||||
cdrSQL.OriginID = cdr.OriginID
|
||||
cdrSQL.TOR = cdr.ToR
|
||||
cdrSQL.RequestType = cdr.RequestType
|
||||
cdrSQL.Tenant = cdr.Tenant
|
||||
cdrSQL.Category = cdr.Category
|
||||
cdrSQL.Account = cdr.Account
|
||||
cdrSQL.Subject = cdr.Subject
|
||||
cdrSQL.Destination = cdr.Destination
|
||||
cdrSQL.SetupTime = cdr.SetupTime
|
||||
cdrSQL.AnswerTime = cdr.AnswerTime
|
||||
cdrSQL.Usage = cdr.Usage.Nanoseconds()
|
||||
cdrSQL.ExtraFields = utils.ToJSON(cdr.ExtraFields)
|
||||
cdrSQL.CostSource = cdr.CostSource
|
||||
cdrSQL.Cost = cdr.Cost
|
||||
cdrSQL.CostDetails = utils.ToJSON(cdr.CostDetails)
|
||||
cdrSQL.ExtraInfo = cdr.ExtraInfo
|
||||
cdrSQL.CreatedAt = time.Now()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -605,34 +605,34 @@ func (cdr *CDR) UpdateFromCGREvent(cgrEv *utils.CGREvent, fields []string) (err
|
||||
}
|
||||
|
||||
// NewCDRFromSQL converts the CDRsql into CDR
|
||||
func NewCDRFromSQL(cdrSql *CDRsql) (cdr *CDR, err error) {
|
||||
func NewCDRFromSQL(cdrSQL *CDRsql) (cdr *CDR, err error) {
|
||||
cdr = new(CDR)
|
||||
cdr.CGRID = cdrSql.Cgrid
|
||||
cdr.RunID = cdrSql.RunID
|
||||
cdr.OriginHost = cdrSql.OriginHost
|
||||
cdr.Source = cdrSql.Source
|
||||
cdr.OriginID = cdrSql.OriginID
|
||||
cdr.OrderID = cdrSql.ID
|
||||
cdr.ToR = cdrSql.TOR
|
||||
cdr.RequestType = cdrSql.RequestType
|
||||
cdr.Tenant = cdrSql.Tenant
|
||||
cdr.Category = cdrSql.Category
|
||||
cdr.Account = cdrSql.Account
|
||||
cdr.Subject = cdrSql.Subject
|
||||
cdr.Destination = cdrSql.Destination
|
||||
cdr.SetupTime = cdrSql.SetupTime
|
||||
cdr.AnswerTime = cdrSql.AnswerTime
|
||||
cdr.Usage = time.Duration(cdrSql.Usage)
|
||||
cdr.CostSource = cdrSql.CostSource
|
||||
cdr.Cost = cdrSql.Cost
|
||||
cdr.ExtraInfo = cdrSql.ExtraInfo
|
||||
if cdrSql.ExtraFields != "" {
|
||||
if err = json.Unmarshal([]byte(cdrSql.ExtraFields), &cdr.ExtraFields); err != nil {
|
||||
cdr.CGRID = cdrSQL.Cgrid
|
||||
cdr.RunID = cdrSQL.RunID
|
||||
cdr.OriginHost = cdrSQL.OriginHost
|
||||
cdr.Source = cdrSQL.Source
|
||||
cdr.OriginID = cdrSQL.OriginID
|
||||
cdr.OrderID = cdrSQL.ID
|
||||
cdr.ToR = cdrSQL.TOR
|
||||
cdr.RequestType = cdrSQL.RequestType
|
||||
cdr.Tenant = cdrSQL.Tenant
|
||||
cdr.Category = cdrSQL.Category
|
||||
cdr.Account = cdrSQL.Account
|
||||
cdr.Subject = cdrSQL.Subject
|
||||
cdr.Destination = cdrSQL.Destination
|
||||
cdr.SetupTime = cdrSQL.SetupTime
|
||||
cdr.AnswerTime = cdrSQL.AnswerTime
|
||||
cdr.Usage = time.Duration(cdrSQL.Usage)
|
||||
cdr.CostSource = cdrSQL.CostSource
|
||||
cdr.Cost = cdrSQL.Cost
|
||||
cdr.ExtraInfo = cdrSQL.ExtraInfo
|
||||
if cdrSQL.ExtraFields != "" {
|
||||
if err = json.Unmarshal([]byte(cdrSQL.ExtraFields), &cdr.ExtraFields); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if cdrSql.CostDetails != "" {
|
||||
if err = json.Unmarshal([]byte(cdrSql.CostDetails), &cdr.CostDetails); err != nil {
|
||||
if cdrSQL.CostDetails != "" {
|
||||
if err = json.Unmarshal([]byte(cdrSQL.CostDetails), &cdr.CostDetails); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@@ -664,7 +664,7 @@ type ExternalCDR struct {
|
||||
PreRated bool // Mark the CDR as rated so we do not process it during mediation
|
||||
}
|
||||
|
||||
// Used when authorizing requests from outside, eg ApierV1.GetMaxUsage
|
||||
// UsageRecord is used when authorizing requests from outside, eg ApierV1.GetMaxUsage
|
||||
type UsageRecord struct {
|
||||
ToR string
|
||||
RequestType string
|
||||
|
||||
187
engine/cdre.go
187
engine/cdre.go
@@ -33,7 +33,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/ltcache"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -50,10 +52,45 @@ const (
|
||||
metaCostCDRs = "*cdrs_cost"
|
||||
)
|
||||
|
||||
var failedPostCache *ltcache.Cache
|
||||
|
||||
func init() {
|
||||
failedPostCache = ltcache.NewCache(-1, 5*time.Second, false, writeFailedPosts)
|
||||
}
|
||||
|
||||
func writeFailedPosts(itmID string, value interface{}) {
|
||||
expEv, canConvert := value.(*ExportEvents)
|
||||
if !canConvert {
|
||||
return
|
||||
}
|
||||
filePath := path.Join(config.CgrConfig().GeneralCfg().FailedPostsDir, expEv.FileName(utils.CDRSCtx))
|
||||
if err := expEv.WriteToFile(filePath); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write file <%s> because <%s>",
|
||||
utils.CDRs, filePath, err))
|
||||
}
|
||||
return
|
||||
}
|
||||
func addFailedPost(expPath, format string, ev []byte) {
|
||||
var failedPost *ExportEvents
|
||||
if x, ok := failedPostCache.Get(utils.ConcatenatedKey(expPath, format)); ok {
|
||||
if x != nil {
|
||||
failedPost = x.(*ExportEvents)
|
||||
}
|
||||
}
|
||||
if failedPost == nil {
|
||||
failedPost = &ExportEvents{
|
||||
Path: expPath,
|
||||
Format: format,
|
||||
}
|
||||
}
|
||||
failedPost.AddEvent(ev)
|
||||
failedPostCache.Set(failedPost.ID(), failedPost, nil)
|
||||
}
|
||||
|
||||
// NewCDRExporter returns a new CDRExporter
|
||||
func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, exportPath, fallbackPath, exportID string,
|
||||
synchronous bool, attempts int, fieldSeparator rune,
|
||||
httpSkipTLSCheck bool, httpPoster *HTTPPoster, attrsConns []string, filterS *FilterS) (*CDRExporter, error) {
|
||||
httpSkipTLSCheck bool, attrsConns []string, filterS *FilterS) (*CDRExporter, error) {
|
||||
if len(cdrs) == 0 { // Nothing to export
|
||||
return nil, nil
|
||||
}
|
||||
@@ -68,7 +105,6 @@ func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreCfg, exportFormat, e
|
||||
attempts: attempts,
|
||||
fieldSeparator: fieldSeparator,
|
||||
httpSkipTLSCheck: httpSkipTLSCheck,
|
||||
httpPoster: httpPoster,
|
||||
negativeExports: make(map[string]string),
|
||||
attrsConns: attrsConns,
|
||||
filterS: filterS,
|
||||
@@ -89,7 +125,6 @@ type CDRExporter struct {
|
||||
attempts int
|
||||
fieldSeparator rune
|
||||
httpSkipTLSCheck bool
|
||||
httpPoster *HTTPPoster
|
||||
|
||||
header, trailer []string // Header and Trailer fields
|
||||
content [][]string // Rows of cdr fields
|
||||
@@ -263,27 +298,29 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
|
||||
default:
|
||||
return fmt.Errorf("unsupported exportFormat: <%s>", cdre.exportFormat)
|
||||
}
|
||||
// compute fallbackPath
|
||||
fallbackPath := utils.META_NONE
|
||||
ffn := &utils.FallbackFileName{Module: utils.CDRPoster, Transport: cdre.exportFormat, Address: cdre.exportPath, RequestID: utils.GenUUID()}
|
||||
fallbackFileName := ffn.AsString()
|
||||
if cdre.fallbackPath != utils.META_NONE { // not none, need fallback
|
||||
fallbackPath = path.Join(cdre.fallbackPath, fallbackFileName)
|
||||
}
|
||||
switch cdre.exportFormat {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
|
||||
_, err = cdre.httpPoster.Post(cdre.exportPath, utils.PosterTransportContentTypes[cdre.exportFormat], body, cdre.attempts, fallbackPath)
|
||||
var pstr *HTTPPoster
|
||||
pstr, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify,
|
||||
config.CgrConfig().GeneralCfg().ReplyTimeout, cdre.exportPath,
|
||||
utils.PosterTransportContentTypes[cdre.exportFormat], cdre.attempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = pstr.Post(body, utils.EmptyString)
|
||||
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
|
||||
err = PostersCache.PostAMQP(cdre.exportPath, cdre.attempts, body.([]byte),
|
||||
utils.PosterTransportContentTypes[cdre.exportFormat], cdre.fallbackPath, fallbackFileName)
|
||||
err = PostersCache.PostAMQP(cdre.exportPath, cdre.attempts, body.([]byte))
|
||||
case utils.MetaAMQPV1jsonMap:
|
||||
err = PostersCache.PostAMQPv1(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
err = PostersCache.PostAMQPv1(cdre.exportPath, cdre.attempts, body.([]byte))
|
||||
case utils.MetaSQSjsonMap:
|
||||
err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte))
|
||||
case utils.MetaKafkajsonMap:
|
||||
err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID))
|
||||
err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), utils.ConcatenatedKey(cdr.CGRID, cdr.RunID))
|
||||
case utils.MetaS3jsonMap:
|
||||
err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID))
|
||||
err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), utils.ConcatenatedKey(cdr.CGRID, cdr.RunID))
|
||||
}
|
||||
if err != nil && cdre.fallbackPath != utils.META_NONE {
|
||||
addFailedPost(cdre.exportPath, cdre.exportFormat, body.([]byte))
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -384,8 +421,7 @@ func (cdre *CDRExporter) processCDRs() (err error) {
|
||||
if cdre.exportTemplate.Tenant == "" {
|
||||
cdre.exportTemplate.Tenant = config.CgrConfig().GeneralCfg().DefaultTenant
|
||||
}
|
||||
cgrDp := config.NewNavigableMap(nil)
|
||||
cgrDp.Set([]string{utils.MetaReq}, cdr.AsMapStringIface(), false, false)
|
||||
cgrDp := config.NewNavigableMap(map[string]interface{}{utils.MetaReq: cdr.AsMapStringIface()})
|
||||
if pass, err := cdre.filterS.Pass(cdre.exportTemplate.Tenant,
|
||||
cdre.exportTemplate.Filters, cgrDp); err != nil || !pass {
|
||||
continue // Not passes filters, ignore this CDR
|
||||
@@ -550,3 +586,116 @@ func (cdre *CDRExporter) NegativeExports() map[string]string {
|
||||
defer cdre.RUnlock()
|
||||
return cdre.negativeExports
|
||||
}
|
||||
|
||||
// ExportEvents used to save the failed post to file
|
||||
type ExportEvents struct {
|
||||
lk sync.RWMutex
|
||||
Path string
|
||||
Format string
|
||||
Zip bool
|
||||
Events [][]byte
|
||||
}
|
||||
|
||||
// ID returns the id for cache
|
||||
func (expEv *ExportEvents) ID() string {
|
||||
return utils.ConcatenatedKey(expEv.Path, expEv.Format)
|
||||
}
|
||||
|
||||
// FileName returns the file name it should use for saving the failed events
|
||||
func (expEv *ExportEvents) FileName(module string) string {
|
||||
fileSuffix := utils.CDREFileSuffixes[expEv.Format]
|
||||
// instead of using fmt.Sprintf we use "+" as binary operator (small optimization)
|
||||
return module + utils.HandlerArgSep + expEv.Format + utils.HandlerArgSep + utils.GenUUID() + fileSuffix
|
||||
}
|
||||
|
||||
// WriteToFile writes the events to file
|
||||
func (expEv *ExportEvents) WriteToFile(filePath string) (err error) {
|
||||
var content []byte
|
||||
content, err = json.Marshal(expEv)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
fileOut, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = fileOut.Write(content)
|
||||
fileOut.Close()
|
||||
return nil, err
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
|
||||
return
|
||||
}
|
||||
|
||||
// AddEvent adds one event
|
||||
func (expEv *ExportEvents) AddEvent(ev []byte) {
|
||||
expEv.lk.Lock()
|
||||
expEv.Events = append(expEv.Events, ev)
|
||||
expEv.lk.Unlock()
|
||||
}
|
||||
|
||||
// ReplayFailedPosts tryies to post cdrs again
|
||||
func (expEv *ExportEvents) ReplayFailedPosts(attempts int, key string) (failedEvents *ExportEvents, err error) {
|
||||
failedEvents = &ExportEvents{
|
||||
Path: expEv.Path,
|
||||
Format: expEv.Format,
|
||||
Zip: expEv.Zip,
|
||||
}
|
||||
switch expEv.Format {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
|
||||
var pstr *HTTPPoster
|
||||
pstr, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().HttpSkipTlsVerify,
|
||||
config.CgrConfig().GeneralCfg().ReplyTimeout, expEv.Path,
|
||||
expEv.Format, config.CgrConfig().GeneralCfg().PosterAttempts)
|
||||
if err != nil {
|
||||
return expEv, err
|
||||
}
|
||||
for _, ev := range expEv.Events {
|
||||
err = pstr.Post(ev, utils.EmptyString)
|
||||
if err != nil {
|
||||
failedEvents.AddEvent(ev)
|
||||
}
|
||||
}
|
||||
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
|
||||
for _, ev := range expEv.Events {
|
||||
err = PostersCache.PostAMQP(expEv.Path, attempts, ev)
|
||||
if err != nil {
|
||||
failedEvents.AddEvent(ev)
|
||||
}
|
||||
}
|
||||
case utils.MetaAMQPV1jsonMap:
|
||||
for _, ev := range expEv.Events {
|
||||
err = PostersCache.PostAMQPv1(expEv.Path, attempts, ev)
|
||||
if err != nil {
|
||||
failedEvents.AddEvent(ev)
|
||||
}
|
||||
}
|
||||
case utils.MetaSQSjsonMap:
|
||||
for _, ev := range expEv.Events {
|
||||
err = PostersCache.PostSQS(expEv.Path, attempts, ev)
|
||||
if err != nil {
|
||||
failedEvents.AddEvent(ev)
|
||||
}
|
||||
}
|
||||
case utils.MetaKafkajsonMap:
|
||||
for _, ev := range expEv.Events {
|
||||
err = PostersCache.PostKafka(expEv.Path, attempts, ev, key)
|
||||
if err != nil {
|
||||
failedEvents.AddEvent(ev)
|
||||
}
|
||||
}
|
||||
case utils.MetaS3jsonMap:
|
||||
for _, ev := range expEv.Events {
|
||||
err = PostersCache.PostS3(expEv.Path, attempts, ev, key)
|
||||
if err != nil {
|
||||
failedEvents.AddEvent(ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(failedEvents.Events) > 0 {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
} else {
|
||||
failedEvents = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ func TestCsvCdrWriter(t *testing.T) {
|
||||
}
|
||||
cdre, err := NewCDRExporter([]*CDR{storedCdr1},
|
||||
cfg.CdreProfiles[utils.MetaDefault], utils.MetaFileCSV, "", "", "firstexport",
|
||||
true, 1, utils.CSV_SEP, cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil, nil)
|
||||
true, 1, utils.CSV_SEP, cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil)
|
||||
if err != nil {
|
||||
t.Error("Unexpected error received: ", err)
|
||||
}
|
||||
@@ -83,7 +83,7 @@ func TestAlternativeFieldSeparator(t *testing.T) {
|
||||
}
|
||||
cdre, err := NewCDRExporter([]*CDR{storedCdr1}, cfg.CdreProfiles[utils.MetaDefault],
|
||||
utils.MetaFileCSV, "", "", "firstexport", true, 1, '|',
|
||||
cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil, nil)
|
||||
cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil)
|
||||
if err != nil {
|
||||
t.Error("Unexpected error received: ", err)
|
||||
}
|
||||
@@ -177,7 +177,7 @@ func TestExportVoiceWithConvert(t *testing.T) {
|
||||
}
|
||||
cdre, err := NewCDRExporter([]*CDR{cdrVoice, cdrData, cdrSMS}, cdreCfg,
|
||||
utils.MetaFileCSV, "", "", "firstexport",
|
||||
true, 1, '|', true, nil, nil, &FilterS{cfg: cfg})
|
||||
true, 1, '|', true, nil, &FilterS{cfg: cfg})
|
||||
if err != nil {
|
||||
t.Error("Unexpected error received: ", err)
|
||||
}
|
||||
@@ -274,7 +274,7 @@ func TestExportWithFilter(t *testing.T) {
|
||||
}
|
||||
cdre, err := NewCDRExporter([]*CDR{cdrVoice, cdrData, cdrSMS}, cdreCfg,
|
||||
utils.MetaFileCSV, "", "", "firstexport",
|
||||
true, 1, '|', true, nil, nil, &FilterS{cfg: cfg})
|
||||
true, 1, '|', true, nil, &FilterS{cfg: cfg})
|
||||
if err != nil {
|
||||
t.Error("Unexpected error received: ", err)
|
||||
}
|
||||
@@ -370,7 +370,7 @@ func TestExportWithFilter2(t *testing.T) {
|
||||
}
|
||||
cdre, err := NewCDRExporter([]*CDR{cdrVoice, cdrData, cdrSMS}, cdreCfg,
|
||||
utils.MetaFileCSV, "", "", "firstexport",
|
||||
true, 1, '|', true, nil, nil, &FilterS{cfg: cfg})
|
||||
true, 1, '|', true, nil, &FilterS{cfg: cfg})
|
||||
if err != nil {
|
||||
t.Error("Unexpected error received: ", err)
|
||||
}
|
||||
|
||||
@@ -283,7 +283,7 @@ func TestWriteCdr(t *testing.T) {
|
||||
}
|
||||
|
||||
cdre, err := NewCDRExporter([]*CDR{cdr}, cdreCfg, utils.MetaFileFWV, "", "", "fwv_1",
|
||||
true, 1, '|', cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil, nil)
|
||||
true, 1, '|', cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
@@ -370,7 +370,7 @@ func TestWriteCdrs(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
cdre, err := NewCDRExporter([]*CDR{cdr1, cdr2, cdr3, cdr4}, cdreCfg,
|
||||
utils.MetaFileFWV, "", "", "fwv_1", true, 1, utils.CSV_SEP,
|
||||
cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil, nil)
|
||||
cfg.GeneralCfg().HttpSkipTlsVerify, nil, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -72,12 +72,10 @@ func NewCDRServer(cgrCfg *config.CGRConfig, storDBChan chan StorDB, dm *DataMana
|
||||
connMgr *ConnManager) *CDRServer {
|
||||
cdrDb := <-storDBChan
|
||||
return &CDRServer{
|
||||
cgrCfg: cgrCfg,
|
||||
cdrDb: cdrDb,
|
||||
dm: dm,
|
||||
guard: guardian.Guardian,
|
||||
httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
|
||||
cgrCfg.GeneralCfg().ReplyTimeout),
|
||||
cgrCfg: cgrCfg,
|
||||
cdrDb: cdrDb,
|
||||
dm: dm,
|
||||
guard: guardian.Guardian,
|
||||
filterS: filterS,
|
||||
connMgr: connMgr,
|
||||
storDBChan: storDBChan,
|
||||
@@ -90,7 +88,6 @@ type CDRServer struct {
|
||||
cdrDb CdrStorage
|
||||
dm *DataManager
|
||||
guard *guardian.GuardianLocker
|
||||
httpPoster *HTTPPoster // used for replication
|
||||
filterS *FilterS
|
||||
connMgr *ConnManager
|
||||
storDBChan chan StorDB
|
||||
@@ -391,7 +388,7 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) {
|
||||
if cdre, err = NewCDRExporter(cdrs, expTpl, expTpl.ExportFormat,
|
||||
expTpl.ExportPath, cdrS.cgrCfg.GeneralCfg().FailedPostsDir,
|
||||
"CDRSReplication", expTpl.Synchronous, expTpl.Attempts,
|
||||
expTpl.FieldSeparator, cdrS.cgrCfg.GeneralCfg().HttpSkipTlsVerify, cdrS.httpPoster,
|
||||
expTpl.FieldSeparator, cdrS.cgrCfg.GeneralCfg().HttpSkipTlsVerify,
|
||||
cdrS.cgrCfg.CdrsCfg().AttributeSConns, cdrS.filterS); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Building CDRExporter for online exports got error: <%s>", err.Error()))
|
||||
continue
|
||||
|
||||
@@ -64,7 +64,7 @@ type PosterCache struct {
|
||||
}
|
||||
|
||||
type Poster interface {
|
||||
Post(body []byte, fallbackName, key string) error
|
||||
Post(body []byte, key string) error
|
||||
Close()
|
||||
}
|
||||
|
||||
@@ -96,6 +96,7 @@ func parseURL(dialURL string) (URL string, qID string, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Close closes all cached posters
|
||||
func (pc *PosterCache) Close() {
|
||||
for _, v := range pc.amqpCache {
|
||||
v.Close()
|
||||
@@ -113,112 +114,111 @@ func (pc *PosterCache) Close() {
|
||||
|
||||
// GetAMQPPoster creates a new poster only if not already cached
|
||||
// uses dialURL as cache key
|
||||
func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int) (pstr Poster, err error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.amqpCache[dialURL]; !hasIt {
|
||||
if pstr, err := NewAMQPPoster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
if pstr, err = NewAMQPPoster(dialURL, attempts); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
pc.amqpCache[dialURL] = pstr
|
||||
}
|
||||
pc.amqpCache[dialURL] = pstr
|
||||
}
|
||||
return pc.amqpCache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
// GetAMQPv1Poster creates a new poster only if not already cached
|
||||
func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int) (pstr Poster, err error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.amqpv1Cache[dialURL]; !hasIt {
|
||||
if pstr, err := NewAMQPv1Poster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
if pstr, err = NewAMQPv1Poster(dialURL, attempts); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
pc.amqpv1Cache[dialURL] = pstr
|
||||
}
|
||||
pc.amqpv1Cache[dialURL] = pstr
|
||||
}
|
||||
return pc.amqpv1Cache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
// GetSQSPoster creates a new poster only if not already cached
|
||||
func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int) (pstr Poster, err error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.sqsCache[dialURL]; !hasIt {
|
||||
if pstr, err := NewSQSPoster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
if pstr, err = NewSQSPoster(dialURL, attempts); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
pc.sqsCache[dialURL] = pstr
|
||||
}
|
||||
pc.sqsCache[dialURL] = pstr
|
||||
}
|
||||
return pc.sqsCache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
// GetKafkaPoster creates a new poster only if not already cached
|
||||
func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int) (pstr Poster, err error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.kafkaCache[dialURL]; !hasIt {
|
||||
if pstr, err := NewKafkaPoster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
if pstr, err = NewKafkaPoster(dialURL, attempts); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
pc.kafkaCache[dialURL] = pstr
|
||||
}
|
||||
pc.kafkaCache[dialURL] = pstr
|
||||
}
|
||||
return pc.kafkaCache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *PosterCache) GetS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
// GetS3Poster creates a new poster only if not already cached
|
||||
func (pc *PosterCache) GetS3Poster(dialURL string, attempts int) (pstr Poster, err error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.s3Cache[dialURL]; !hasIt {
|
||||
if pstr, err := NewS3Poster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
if pstr, err = NewS3Poster(dialURL, attempts); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
pc.s3Cache[dialURL] = pstr
|
||||
}
|
||||
pc.s3Cache[dialURL] = pstr
|
||||
}
|
||||
return pc.s3Cache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostAMQP(dialURL string, attempts int,
|
||||
content []byte, contentType, fallbackFileDir, fallbackFileName string) error {
|
||||
amqpPoster, err := pc.GetAMQPPoster(dialURL, attempts, fallbackFileDir)
|
||||
content []byte) error {
|
||||
amqpPoster, err := pc.GetAMQPPoster(dialURL, attempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return amqpPoster.Post(content, fallbackFileName, "")
|
||||
return amqpPoster.Post(content, "")
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int,
|
||||
content []byte, fallbackFileDir, fallbackFileName string) error {
|
||||
AMQPv1Poster, err := pc.GetAMQPv1Poster(dialURL, attempts, fallbackFileDir)
|
||||
content []byte) error {
|
||||
AMQPv1Poster, err := pc.GetAMQPv1Poster(dialURL, attempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return AMQPv1Poster.Post(content, fallbackFileName, "")
|
||||
return AMQPv1Poster.Post(content, "")
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostSQS(dialURL string, attempts int,
|
||||
content []byte, fallbackFileDir, fallbackFileName string) error {
|
||||
sqsPoster, err := pc.GetSQSPoster(dialURL, attempts, fallbackFileDir)
|
||||
content []byte) error {
|
||||
sqsPoster, err := pc.GetSQSPoster(dialURL, attempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sqsPoster.Post(content, fallbackFileName, "")
|
||||
return sqsPoster.Post(content, "")
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostKafka(dialURL string, attempts int,
|
||||
content []byte, fallbackFileDir, fallbackFileName, key string) error {
|
||||
kafkaPoster, err := pc.GetKafkaPoster(dialURL, attempts, fallbackFileDir)
|
||||
content []byte, key string) error {
|
||||
kafkaPoster, err := pc.GetKafkaPoster(dialURL, attempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return kafkaPoster.Post(content, fallbackFileName, key)
|
||||
return kafkaPoster.Post(content, key)
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostS3(dialURL string, attempts int,
|
||||
content []byte, fallbackFileDir, fallbackFileName, key string) error {
|
||||
sqsPoster, err := pc.GetS3Poster(dialURL, attempts, fallbackFileDir)
|
||||
content []byte, key string) error {
|
||||
sqsPoster, err := pc.GetS3Poster(dialURL, attempts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sqsPoster.Post(content, fallbackFileName, key)
|
||||
return sqsPoster.Post(content, key)
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ GNU General Public License for more details.
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
@@ -28,13 +29,13 @@ import (
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
var AMQPQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"}
|
||||
var amqpQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"}
|
||||
|
||||
// NewAMQPPoster creates a new amqp poster
|
||||
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
|
||||
func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) {
|
||||
func NewAMQPPoster(dialURL string, attempts int) (*AMQPPoster, error) {
|
||||
amqp := &AMQPPoster{
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
attempts: attempts,
|
||||
}
|
||||
if err := amqp.parseURL(dialURL); err != nil {
|
||||
return nil, err
|
||||
@@ -42,16 +43,16 @@ func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPP
|
||||
return amqp, nil
|
||||
}
|
||||
|
||||
// AMQPPoster used to post cdrs to amqp
|
||||
type AMQPPoster struct {
|
||||
dialURL string
|
||||
queueID string // identifier of the CDR queue where we publish
|
||||
exchange string
|
||||
exchangeType string
|
||||
routingKey string
|
||||
attempts int
|
||||
fallbackFileDir string
|
||||
sync.Mutex // protect connection
|
||||
conn *amqp.Connection
|
||||
dialURL string
|
||||
queueID string // identifier of the CDR queue where we publish
|
||||
exchange string
|
||||
exchangeType string
|
||||
routingKey string
|
||||
attempts int
|
||||
sync.Mutex // protect connection
|
||||
conn *amqp.Connection
|
||||
}
|
||||
|
||||
func (pstr *AMQPPoster) parseURL(dialURL string) error {
|
||||
@@ -61,7 +62,7 @@ func (pstr *AMQPPoster) parseURL(dialURL string) error {
|
||||
}
|
||||
qry := u.Query()
|
||||
q := url.Values{}
|
||||
for _, key := range AMQPQuery {
|
||||
for _, key := range amqpQuery {
|
||||
if vals, has := qry[key]; has && len(vals) != 0 {
|
||||
q.Add(key, vals[0])
|
||||
}
|
||||
@@ -87,7 +88,7 @@ func (pstr *AMQPPoster) parseURL(dialURL string) error {
|
||||
|
||||
// Post is the method being called when we need to post anything in the queue
|
||||
// the optional chn will permits channel caching
|
||||
func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err error) {
|
||||
func (pstr *AMQPPoster) Post(content []byte, _ string) (err error) {
|
||||
var chn *amqp.Channel
|
||||
fib := utils.Fib()
|
||||
|
||||
@@ -100,11 +101,8 @@ func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err er
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<AMQPPoster> creating new post channel, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
|
||||
}
|
||||
return err
|
||||
utils.Logger.Warning(fmt.Sprintf("<AMQPPoster> creating new post channel, err: %s", err.Error()))
|
||||
return
|
||||
}
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if err = chn.Publish(
|
||||
@@ -123,9 +121,8 @@ func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err er
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
}
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
|
||||
return err
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if chn != nil {
|
||||
chn.Close()
|
||||
@@ -133,6 +130,7 @@ func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err er
|
||||
return
|
||||
}
|
||||
|
||||
// Close closes the connections
|
||||
func (pstr *AMQPPoster) Close() {
|
||||
pstr.Lock()
|
||||
if pstr.conn != nil {
|
||||
|
||||
@@ -29,28 +29,29 @@ import (
|
||||
amqpv1 "pack.ag/amqp"
|
||||
)
|
||||
|
||||
func NewAMQPv1Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
// NewAMQPv1Poster creates a poster for amqpv1
|
||||
func NewAMQPv1Poster(dialURL string, attempts int) (Poster, error) {
|
||||
URL, qID, err := parseURL(dialURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &AMQPv1Poster{
|
||||
dialURL: URL,
|
||||
queueID: "/" + qID,
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
dialURL: URL,
|
||||
queueID: "/" + qID,
|
||||
attempts: attempts,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AMQPv1Poster a poster for amqpv1
|
||||
type AMQPv1Poster struct {
|
||||
sync.Mutex
|
||||
dialURL string
|
||||
queueID string // identifier of the CDR queue where we publish
|
||||
attempts int
|
||||
fallbackFileDir string
|
||||
client *amqpv1.Client
|
||||
dialURL string
|
||||
queueID string // identifier of the CDR queue where we publish
|
||||
attempts int
|
||||
client *amqpv1.Client
|
||||
}
|
||||
|
||||
// Close closes the connections
|
||||
func (pstr *AMQPv1Poster) Close() {
|
||||
pstr.Lock()
|
||||
if pstr.client != nil {
|
||||
@@ -60,7 +61,8 @@ func (pstr *AMQPv1Poster) Close() {
|
||||
pstr.Unlock()
|
||||
}
|
||||
|
||||
func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err error) {
|
||||
// Post is the method being called when we need to post anything in the queue
|
||||
func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) {
|
||||
var s *amqpv1.Session
|
||||
fib := utils.Fib()
|
||||
|
||||
@@ -79,10 +81,7 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<AMQPv1Poster> creating new post channel, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
|
||||
}
|
||||
utils.Logger.Warning(fmt.Sprintf("<AMQPv1Poster> creating new post channel, err: %s", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -125,9 +124,8 @@ func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err
|
||||
// }
|
||||
// }
|
||||
}
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
|
||||
return err
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s != nil {
|
||||
s.Close(ctx)
|
||||
|
||||
@@ -25,109 +25,116 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// Post without automatic failover
|
||||
func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error) {
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerify},
|
||||
DisableKeepAlives: true,
|
||||
// keep it global in order to reuse it
|
||||
var httpPosterTransport *http.Transport
|
||||
|
||||
// HttpJsonPost posts without automatic failover
|
||||
func HttpJsonPost(url string, skipTLSVerify bool, content []byte) (respBody []byte, err error) {
|
||||
if httpPosterTransport == nil {
|
||||
httpPosterTransport = &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify},
|
||||
}
|
||||
}
|
||||
client := &http.Client{Transport: tr}
|
||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(content))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
client := &http.Client{Transport: httpPosterTransport}
|
||||
var resp *http.Response
|
||||
if resp, err = client.Post(url, "application/json", bytes.NewBuffer(content)); err != nil {
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, err := ioutil.ReadAll(resp.Body)
|
||||
respBody, err = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return
|
||||
}
|
||||
if resp.StatusCode > 299 {
|
||||
return respBody, fmt.Errorf("Unexpected status code received: %d", resp.StatusCode)
|
||||
err = fmt.Errorf("Unexpected status code received: %d", resp.StatusCode)
|
||||
}
|
||||
return respBody, nil
|
||||
return
|
||||
}
|
||||
|
||||
func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration) *HTTPPoster {
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify},
|
||||
}
|
||||
return &HTTPPoster{httpClient: &http.Client{Transport: tr, Timeout: replyTimeout}}
|
||||
}
|
||||
|
||||
type HTTPPoster struct {
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// Post with built-in failover
|
||||
// Returns also reference towards client so we can close it's connections when done
|
||||
func (poster *HTTPPoster) Post(addr string, contentType string, content interface{}, attempts int, fallbackFilePath string) (respBody []byte, err error) {
|
||||
// NewHTTPPoster return a new HTTP poster
|
||||
func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration,
|
||||
addr, contentType string, attempts int) (httposter *HTTPPoster, err error) {
|
||||
if !utils.SliceHasMember([]string{utils.CONTENT_FORM, utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) {
|
||||
return nil, fmt.Errorf("unsupported ContentType: %s", contentType)
|
||||
}
|
||||
if httpPosterTransport == nil {
|
||||
httpPosterTransport = &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify},
|
||||
}
|
||||
}
|
||||
return &HTTPPoster{
|
||||
httpClient: &http.Client{Transport: httpPosterTransport, Timeout: replyTimeout},
|
||||
addr: addr,
|
||||
contentType: contentType,
|
||||
attempts: attempts,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// HTTPPoster used to post cdrs
|
||||
type HTTPPoster struct {
|
||||
httpClient *http.Client
|
||||
addr string
|
||||
contentType string
|
||||
attempts int
|
||||
}
|
||||
|
||||
// Post will post the event
|
||||
func (pstr *HTTPPoster) Post(content interface{}, key string) (err error) {
|
||||
_, err = pstr.GetResponse(content)
|
||||
return
|
||||
}
|
||||
|
||||
// GetResponse will post the event and return the response
|
||||
func (pstr *HTTPPoster) GetResponse(content interface{}) (respBody []byte, err error) {
|
||||
var body []byte // Used to write in file and send over http
|
||||
var urlVals url.Values // Used when posting form
|
||||
if utils.SliceHasMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) {
|
||||
body = content.([]byte)
|
||||
} else if contentType == utils.CONTENT_FORM {
|
||||
if pstr.contentType == utils.CONTENT_FORM {
|
||||
urlVals = content.(url.Values)
|
||||
body = []byte(urlVals.Encode())
|
||||
} else {
|
||||
body = content.([]byte)
|
||||
}
|
||||
fib := utils.Fib()
|
||||
bodyType := "application/x-www-form-urlencoded"
|
||||
if contentType == utils.CONTENT_JSON {
|
||||
if pstr.contentType == utils.CONTENT_JSON {
|
||||
bodyType = "application/json"
|
||||
}
|
||||
for i := 0; i < attempts; i++ {
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
var resp *http.Response
|
||||
if utils.SliceHasMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) {
|
||||
resp, err = poster.httpClient.Post(addr, bodyType, bytes.NewBuffer(body))
|
||||
} else if contentType == utils.CONTENT_FORM {
|
||||
resp, err = poster.httpClient.PostForm(addr, urlVals)
|
||||
if pstr.contentType == utils.CONTENT_FORM {
|
||||
resp, err = pstr.httpClient.PostForm(pstr.addr, urlVals)
|
||||
} else {
|
||||
resp, err = pstr.httpClient.Post(pstr.addr, bodyType, bytes.NewBuffer(body))
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
|
||||
if i+1 < attempts {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", pstr.addr, err.Error()))
|
||||
if i+1 < pstr.attempts {
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, err = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
|
||||
if i+1 < attempts {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", pstr.addr, err.Error()))
|
||||
if i+1 < pstr.attempts {
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode > 299 {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
|
||||
if i+1 < attempts {
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", pstr.addr, resp.StatusCode))
|
||||
if i+1 < pstr.attempts {
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return respBody, nil
|
||||
}
|
||||
if fallbackFilePath != utils.META_NONE {
|
||||
// If we got that far, post was not possible, write it on disk
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
fileOut, err := os.Create(fallbackFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = fileOut.Write(body)
|
||||
fileOut.Close()
|
||||
return nil, err
|
||||
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+fallbackFilePath)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -27,11 +27,10 @@ import (
|
||||
kafka "github.com/segmentio/kafka-go"
|
||||
)
|
||||
|
||||
// "amqp://guest:guest@localhost:5672/?topic=cgrates_cdrs"
|
||||
func NewKafkaPoster(dialURL string, attempts int, fallbackFileDir string) (*KafkaPoster, error) {
|
||||
// NewKafkaPoster creates a kafka poster
|
||||
func NewKafkaPoster(dialURL string, attempts int) (*KafkaPoster, error) {
|
||||
amqp := &KafkaPoster{
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
attempts: attempts,
|
||||
}
|
||||
if err := amqp.parseURL(dialURL); err != nil {
|
||||
return nil, err
|
||||
@@ -39,13 +38,13 @@ func NewKafkaPoster(dialURL string, attempts int, fallbackFileDir string) (*Kafk
|
||||
return amqp, nil
|
||||
}
|
||||
|
||||
// KafkaPoster is a kafka poster
|
||||
type KafkaPoster struct {
|
||||
dialURL string
|
||||
topic string // identifier of the CDR queue where we publish
|
||||
attempts int
|
||||
fallbackFileDir string
|
||||
sync.Mutex // protect writer
|
||||
writer *kafka.Writer
|
||||
dialURL string
|
||||
topic string // identifier of the CDR queue where we publish
|
||||
attempts int
|
||||
sync.Mutex // protect writer
|
||||
writer *kafka.Writer
|
||||
}
|
||||
|
||||
func (pstr *KafkaPoster) parseURL(dialURL string) error {
|
||||
@@ -65,7 +64,7 @@ func (pstr *KafkaPoster) parseURL(dialURL string) error {
|
||||
|
||||
// Post is the method being called when we need to post anything in the queue
|
||||
// the optional chn will permits channel caching
|
||||
func (pstr *KafkaPoster) Post(content []byte, fallbackFileName, key string) (err error) {
|
||||
func (pstr *KafkaPoster) Post(content []byte, key string) (err error) {
|
||||
pstr.newPostWriter()
|
||||
pstr.Lock()
|
||||
if err = pstr.writer.WriteMessages(context.Background(), kafka.Message{
|
||||
@@ -76,13 +75,10 @@ func (pstr *KafkaPoster) Post(content []byte, fallbackFileName, key string) (err
|
||||
return
|
||||
}
|
||||
pstr.Unlock()
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Close closes the kafka writer
|
||||
func (pstr *KafkaPoster) Close() {
|
||||
pstr.Lock()
|
||||
if pstr.writer != nil {
|
||||
|
||||
@@ -32,29 +32,30 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
// NewS3Poster creates a s3 poster
|
||||
func NewS3Poster(dialURL string, attempts int) (Poster, error) {
|
||||
pstr := &S3Poster{
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
attempts: attempts,
|
||||
}
|
||||
pstr.parseURL(dialURL)
|
||||
return pstr, nil
|
||||
}
|
||||
|
||||
// S3Poster is a s3 poster
|
||||
type S3Poster struct {
|
||||
sync.Mutex
|
||||
dialURL string
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
attempts int
|
||||
fallbackFileDir string
|
||||
queueID string
|
||||
folderPath string
|
||||
session *session.Session
|
||||
dialURL string
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
attempts int
|
||||
queueID string
|
||||
folderPath string
|
||||
session *session.Session
|
||||
}
|
||||
|
||||
// Close for Poster interface
|
||||
func (pstr *S3Poster) Close() {}
|
||||
|
||||
func (pstr *S3Poster) parseURL(dialURL string) {
|
||||
@@ -83,7 +84,8 @@ func (pstr *S3Poster) parseURL(dialURL string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err error) {
|
||||
// Post is the method being called when we need to post anything in the queue
|
||||
func (pstr *S3Poster) Post(message []byte, key string) (err error) {
|
||||
var svc *s3manager.Uploader
|
||||
fib := utils.Fib()
|
||||
|
||||
@@ -96,11 +98,8 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<S3Poster> creating new session, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message)
|
||||
}
|
||||
return err
|
||||
utils.Logger.Warning(fmt.Sprintf("<S3Poster> creating new session, err: %s", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
@@ -124,11 +123,10 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err er
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
}
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<S3Poster> posting new message, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message)
|
||||
}
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *S3Poster) newPosterSession() (s *s3manager.Uploader, err error) {
|
||||
|
||||
@@ -32,30 +32,31 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
// NewSQSPoster creates a poster for sqs
|
||||
func NewSQSPoster(dialURL string, attempts int) (Poster, error) {
|
||||
pstr := &SQSPoster{
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
attempts: attempts,
|
||||
}
|
||||
pstr.parseURL(dialURL)
|
||||
return pstr, nil
|
||||
}
|
||||
|
||||
// SQSPoster is a poster for sqs
|
||||
type SQSPoster struct {
|
||||
sync.Mutex
|
||||
dialURL string
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
attempts int
|
||||
fallbackFileDir string
|
||||
queueURL *string
|
||||
queueID string
|
||||
dialURL string
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
attempts int
|
||||
queueURL *string
|
||||
queueID string
|
||||
// getQueueOnce sync.Once
|
||||
session *session.Session
|
||||
}
|
||||
|
||||
// Close for Poster interface
|
||||
func (pstr *SQSPoster) Close() {}
|
||||
|
||||
func (pstr *SQSPoster) parseURL(dialURL string) {
|
||||
@@ -115,7 +116,8 @@ func (pstr *SQSPoster) getQueueURL() (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err error) {
|
||||
// Post is the method being called when we need to post anything in the queue
|
||||
func (pstr *SQSPoster) Post(message []byte, _ string) (err error) {
|
||||
var svc *sqs.SQS
|
||||
fib := utils.Fib()
|
||||
|
||||
@@ -128,11 +130,8 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> creating new session, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message)
|
||||
}
|
||||
return err
|
||||
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> creating new session, err: %s", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
@@ -148,11 +147,10 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err err
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
}
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> posting new message, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message)
|
||||
}
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *SQSPoster) newPosterSession() (s *sqs.SQS, err error) {
|
||||
|
||||
@@ -139,8 +139,7 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) {
|
||||
}
|
||||
if rdr.Config().ProcessedPath != utils.EmptyString { // post it
|
||||
if err := engine.PostersCache.PostKafka(rdr.Config().ProcessedPath,
|
||||
rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, "",
|
||||
utils.META_NONE, string(msg.Key)); err != nil {
|
||||
rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, string(msg.Key)); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message %s error: %s",
|
||||
utils.ERs, string(msg.Key), err.Error()))
|
||||
|
||||
@@ -97,11 +97,9 @@ func (apiService *ApierV1Service) Start() (err error) {
|
||||
Config: apiService.cfg,
|
||||
Responder: apiService.responderService.GetResponder(),
|
||||
SchedulerService: apiService.schedService,
|
||||
HTTPPoster: engine.NewHTTPPoster(apiService.cfg.GeneralCfg().HttpSkipTlsVerify,
|
||||
apiService.cfg.GeneralCfg().ReplyTimeout),
|
||||
FilterS: filterS,
|
||||
ConnMgr: apiService.connMgr,
|
||||
StorDBChan: storDBChan,
|
||||
FilterS: filterS,
|
||||
ConnMgr: apiService.connMgr,
|
||||
StorDBChan: storDBChan,
|
||||
}
|
||||
|
||||
go func(api *v1.ApierV1, stopChan chan struct{}) {
|
||||
|
||||
Reference in New Issue
Block a user