mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-23 08:08:45 +05:00
Updated S3Poster
This commit is contained in:
committed by
Dan Christian Bogos
parent
0aa1ffefaf
commit
c7e4c33391
@@ -1068,10 +1068,10 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (
|
||||
fileContent, failedReqsOutDir, file.Name())
|
||||
case utils.MetaKafkajsonMap:
|
||||
err = engine.PostersCache.PostKafka(ffn.Address, v1.Config.GeneralCfg().PosterAttempts,
|
||||
fileContent, failedReqsOutDir, file.Name())
|
||||
fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix())
|
||||
case utils.MetaS3jsonMap:
|
||||
err = engine.PostersCache.PostS3(ffn.Address, v1.Config.GeneralCfg().PosterAttempts,
|
||||
fileContent, failedReqsOutDir, file.Name())
|
||||
fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix())
|
||||
default:
|
||||
err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport)
|
||||
}
|
||||
|
||||
@@ -395,8 +395,20 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
for _, cdrePrfl := range self.cdrsCfg.CDRSOnlineCDRExports {
|
||||
if _, hasIt := self.CdreProfiles[cdrePrfl]; !hasIt {
|
||||
if cdreProfile, hasIt := self.CdreProfiles[cdrePrfl]; !hasIt {
|
||||
return fmt.Errorf("<CDRS> Cannot find CDR export template with ID: <%s>", cdrePrfl)
|
||||
} else if cdreProfile.ExportFormat == utils.MetaS3jsonMap || cdreProfile.ExportFormat == utils.MetaSQSjsonMap {
|
||||
poster := "SQSPoster"
|
||||
if cdreProfile.ExportFormat == utils.MetaS3jsonMap {
|
||||
poster = "S3Poster"
|
||||
}
|
||||
neededArgs := []string{"aws_region", "aws_key", "aws_secret"}
|
||||
args := utils.GetUrlRawArguments(cdreProfile.ExportPath)
|
||||
for _, arg := range neededArgs {
|
||||
if _, has := args[arg]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> No %s present for AWS for cdre: <%s>.", poster, arg, cdrePrfl))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if !self.thresholdSCfg.Enabled {
|
||||
|
||||
@@ -469,7 +469,7 @@ func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
}).AsString()
|
||||
|
||||
return PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName, utils.UUIDSha1Prefix())
|
||||
}
|
||||
|
||||
func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
@@ -487,7 +487,7 @@ func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
}).AsString()
|
||||
|
||||
return PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName, utils.UUIDSha1Prefix())
|
||||
}
|
||||
|
||||
func callUrl(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
|
||||
@@ -288,9 +288,9 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
|
||||
case utils.MetaSQSjsonMap:
|
||||
err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
case utils.MetaKafkajsonMap:
|
||||
err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID))
|
||||
case utils.MetaS3jsonMap:
|
||||
err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -39,11 +39,12 @@ const (
|
||||
exchangeType = "exchange_type"
|
||||
routingKey = "routing_key"
|
||||
|
||||
awsRegion = "aws_region"
|
||||
awsID = "aws_key"
|
||||
awsSecret = "aws_secret"
|
||||
awsToken = "aws_token"
|
||||
awsAccountID = "aws_account_id"
|
||||
awsRegion = "aws_region"
|
||||
awsID = "aws_key"
|
||||
awsSecret = "aws_secret"
|
||||
awsToken = "aws_token"
|
||||
// awsAccountID = "aws_account_id"
|
||||
folderPath = "folder_path"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -68,7 +69,7 @@ type PosterCache struct {
|
||||
}
|
||||
|
||||
type Poster interface {
|
||||
Post([]byte, string) error
|
||||
Post(body []byte, fallbackName, key string) error
|
||||
Close()
|
||||
}
|
||||
|
||||
@@ -188,7 +189,7 @@ func (pc *PosterCache) PostAMQP(dialURL string, attempts int,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return amqpPoster.Post(content, fallbackFileName)
|
||||
return amqpPoster.Post(content, fallbackFileName, "")
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int,
|
||||
@@ -197,7 +198,7 @@ func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return AMQPv1Poster.Post(content, fallbackFileName)
|
||||
return AMQPv1Poster.Post(content, fallbackFileName, "")
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostSQS(dialURL string, attempts int,
|
||||
@@ -206,23 +207,23 @@ func (pc *PosterCache) PostSQS(dialURL string, attempts int,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sqsPoster.Post(content, fallbackFileName)
|
||||
return sqsPoster.Post(content, fallbackFileName, "")
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostKafka(dialURL string, attempts int,
|
||||
content []byte, fallbackFileDir, fallbackFileName string) error {
|
||||
content []byte, fallbackFileDir, fallbackFileName, key string) error {
|
||||
kafkaPoster, err := pc.GetKafkaPoster(dialURL, attempts, fallbackFileDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return kafkaPoster.Post(content, fallbackFileName)
|
||||
return kafkaPoster.Post(content, fallbackFileName, key)
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostS3(dialURL string, attempts int,
|
||||
content []byte, fallbackFileDir, fallbackFileName string) error {
|
||||
content []byte, fallbackFileDir, fallbackFileName, key string) error {
|
||||
sqsPoster, err := pc.GetS3Poster(dialURL, attempts, fallbackFileDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sqsPoster.Post(content, fallbackFileName)
|
||||
return sqsPoster.Post(content, fallbackFileName, key)
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ func (pstr *AMQPPoster) parseURL(dialURL string) error {
|
||||
|
||||
// Post is the method being called when we need to post anything in the queue
|
||||
// the optional chn will permits channel caching
|
||||
func (pstr *AMQPPoster) Post(content []byte, fallbackFileName string) (err error) {
|
||||
func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err error) {
|
||||
var chn *amqp.Channel
|
||||
fib := utils.Fib()
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ func (pstr *AMQPv1Poster) Close() {
|
||||
pstr.Unlock()
|
||||
}
|
||||
|
||||
func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName string) (err error) {
|
||||
func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err error) {
|
||||
var s *amqpv1.Session
|
||||
fib := utils.Fib()
|
||||
|
||||
|
||||
@@ -65,11 +65,11 @@ func (pstr *KafkaPoster) parseURL(dialURL string) error {
|
||||
|
||||
// Post is the method being called when we need to post anything in the queue
|
||||
// the optional chn will permits channel caching
|
||||
func (pstr *KafkaPoster) Post(content []byte, fallbackFileName string) (err error) {
|
||||
func (pstr *KafkaPoster) Post(content []byte, fallbackFileName, key string) (err error) {
|
||||
pstr.newPostWriter()
|
||||
pstr.Lock()
|
||||
if err = pstr.writer.WriteMessages(context.Background(), kafka.Message{
|
||||
Key: []byte(utils.UUIDSha1Prefix()),
|
||||
Key: []byte(key),
|
||||
Value: content,
|
||||
}); err == nil {
|
||||
pstr.Unlock()
|
||||
|
||||
@@ -21,7 +21,6 @@ package engine
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -38,10 +37,7 @@ func NewS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster,
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
}
|
||||
err := pstr.parseURL(dialURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pstr.parseURL(dialURL)
|
||||
return pstr, nil
|
||||
}
|
||||
|
||||
@@ -54,51 +50,40 @@ type S3Poster struct {
|
||||
awsToken string
|
||||
attempts int
|
||||
fallbackFileDir string
|
||||
queueURL *string
|
||||
queueID string
|
||||
// getQueueOnce sync.Once
|
||||
session *session.Session
|
||||
folderPath string
|
||||
session *session.Session
|
||||
}
|
||||
|
||||
func (pstr *S3Poster) Close() {}
|
||||
|
||||
func (pstr *S3Poster) parseURL(dialURL string) (err error) {
|
||||
u, err := url.Parse(dialURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
qry := u.Query()
|
||||
func (pstr *S3Poster) parseURL(dialURL string) {
|
||||
qry := utils.GetUrlRawArguments(dialURL)
|
||||
|
||||
pstr.dialURL = strings.Split(dialURL, "?")[0]
|
||||
pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint
|
||||
pstr.queueID = defaultQueueID
|
||||
if vals, has := qry[queueID]; has && len(vals) != 0 {
|
||||
pstr.queueID = vals[0]
|
||||
if val, has := qry[queueID]; has {
|
||||
pstr.queueID = val
|
||||
}
|
||||
if vals, has := qry[awsRegion]; has && len(vals) != 0 {
|
||||
pstr.awsRegion = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<S3Poster> No region present for AWS.")
|
||||
if val, has := qry[folderPath]; has {
|
||||
pstr.folderPath = val
|
||||
}
|
||||
if vals, has := qry[awsID]; has && len(vals) != 0 {
|
||||
pstr.awsID = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<S3Poster> No access key ID present for AWS.")
|
||||
if val, has := qry[awsRegion]; has {
|
||||
pstr.awsRegion = val
|
||||
}
|
||||
if vals, has := qry[awsSecret]; has && len(vals) != 0 {
|
||||
pstr.awsKey = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<S3Poster> No secret access key present for AWS.")
|
||||
if val, has := qry[awsID]; has {
|
||||
pstr.awsID = val
|
||||
}
|
||||
if vals, has := qry[awsToken]; has && len(vals) != 0 {
|
||||
pstr.awsToken = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<S3Poster> No session token present for AWS.")
|
||||
if val, has := qry[awsSecret]; has {
|
||||
pstr.awsKey = val
|
||||
}
|
||||
if val, has := qry[awsToken]; has {
|
||||
pstr.awsToken = val
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pstr *S3Poster) Post(message []byte, fallbackFileName string) (err error) {
|
||||
func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err error) {
|
||||
var svc *s3manager.Uploader
|
||||
fib := utils.Fib()
|
||||
|
||||
@@ -123,7 +108,7 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName string) (err error)
|
||||
// Can also use the `filepath` standard library package to modify the
|
||||
// filename as need for an S3 object key. Such as turning absolute path
|
||||
// to a relative path.
|
||||
Key: aws.String(fallbackFileName),
|
||||
Key: aws.String(fmt.Sprintf("%s/%s.json", pstr.folderPath, key)),
|
||||
|
||||
// The file to be uploaded. io.ReadSeeker is preferred as the Uploader
|
||||
// will be able to optimize memory when uploading large content. io.Reader
|
||||
|
||||
@@ -20,7 +20,6 @@ package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -38,10 +37,7 @@ func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster,
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
}
|
||||
err := pstr.parseURL(dialURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pstr.parseURL(dialURL)
|
||||
return pstr, nil
|
||||
}
|
||||
|
||||
@@ -62,41 +58,28 @@ type SQSPoster struct {
|
||||
|
||||
func (pstr *SQSPoster) Close() {}
|
||||
|
||||
func (pstr *SQSPoster) parseURL(dialURL string) (err error) {
|
||||
u, err := url.Parse(dialURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
qry := u.Query()
|
||||
func (pstr *SQSPoster) parseURL(dialURL string) {
|
||||
qry := utils.GetUrlRawArguments(dialURL)
|
||||
|
||||
pstr.dialURL = strings.Split(dialURL, "?")[0]
|
||||
pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint
|
||||
pstr.queueID = defaultQueueID
|
||||
if vals, has := qry[queueID]; has && len(vals) != 0 {
|
||||
pstr.queueID = vals[0]
|
||||
if val, has := qry[queueID]; has {
|
||||
pstr.queueID = val
|
||||
}
|
||||
if vals, has := qry[awsRegion]; has && len(vals) != 0 {
|
||||
pstr.awsRegion = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No region present for AWS.")
|
||||
if val, has := qry[awsRegion]; has {
|
||||
pstr.awsRegion = val
|
||||
}
|
||||
if vals, has := qry[awsID]; has && len(vals) != 0 {
|
||||
pstr.awsID = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No access key ID present for AWS.")
|
||||
if val, has := qry[awsID]; has {
|
||||
pstr.awsID = val
|
||||
}
|
||||
if vals, has := qry[awsSecret]; has && len(vals) != 0 {
|
||||
pstr.awsKey = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No secret access key present for AWS.")
|
||||
if val, has := qry[awsSecret]; has {
|
||||
pstr.awsKey = val
|
||||
}
|
||||
if vals, has := qry[awsToken]; has && len(vals) != 0 {
|
||||
pstr.awsToken = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No session token present for AWS.")
|
||||
if val, has := qry[awsToken]; has {
|
||||
pstr.awsToken = val
|
||||
}
|
||||
pstr.getQueueURL()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pstr *SQSPoster) getQueueURL() (err error) {
|
||||
@@ -132,7 +115,7 @@ func (pstr *SQSPoster) getQueueURL() (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func (pstr *SQSPoster) Post(message []byte, fallbackFileName string) (err error) {
|
||||
func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err error) {
|
||||
var svc *sqs.SQS
|
||||
fib := utils.Fib()
|
||||
|
||||
|
||||
@@ -963,3 +963,24 @@ func ReverseString(s string) string {
|
||||
}
|
||||
return string(r)
|
||||
}
|
||||
|
||||
func GetUrlRawArguments(dialURL string) (out map[string]string) {
|
||||
out = make(map[string]string)
|
||||
idx := strings.IndexRune(dialURL, '?')
|
||||
if idx == -1 {
|
||||
return
|
||||
}
|
||||
strParams := dialURL[idx+1:]
|
||||
if len(strParams) == 0 {
|
||||
return
|
||||
}
|
||||
vecParams := strings.Split(strParams, "&")
|
||||
for _, paramPair := range vecParams {
|
||||
idx := strings.IndexRune(paramPair, '=')
|
||||
if idx == -1 {
|
||||
continue
|
||||
}
|
||||
out[paramPair[:idx]] = paramPair[idx+1:]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user