From c7e4c333914c838702694175fc245941f26e7802 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 9 Jul 2019 17:09:35 +0300 Subject: [PATCH] Updated S3Poster --- apier/v1/apier.go | 4 ++-- config/config.go | 14 ++++++++++- engine/action.go | 4 ++-- engine/cdre.go | 4 ++-- engine/poster.go | 27 +++++++++++---------- engine/pstr_amqp.go | 2 +- engine/pstr_amqpv1.go | 2 +- engine/pstr_kafka.go | 4 ++-- engine/pstr_s3.go | 55 ++++++++++++++++--------------------------- engine/pstr_sqs.go | 45 +++++++++++------------------------ utils/coreutils.go | 21 +++++++++++++++++ 11 files changed, 92 insertions(+), 90 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 6fc0eb9a3..0088e0c45 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1068,10 +1068,10 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) ( fileContent, failedReqsOutDir, file.Name()) case utils.MetaKafkajsonMap: err = engine.PostersCache.PostKafka(ffn.Address, v1.Config.GeneralCfg().PosterAttempts, - fileContent, failedReqsOutDir, file.Name()) + fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix()) case utils.MetaS3jsonMap: err = engine.PostersCache.PostS3(ffn.Address, v1.Config.GeneralCfg().PosterAttempts, - fileContent, failedReqsOutDir, file.Name()) + fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix()) default: err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport) } diff --git a/config/config.go b/config/config.go index 9d3d8f55c..e24e5999c 100755 --- a/config/config.go +++ b/config/config.go @@ -395,8 +395,20 @@ func (self *CGRConfig) checkConfigSanity() error { } } for _, cdrePrfl := range self.cdrsCfg.CDRSOnlineCDRExports { - if _, hasIt := self.CdreProfiles[cdrePrfl]; !hasIt { + if cdreProfile, hasIt := self.CdreProfiles[cdrePrfl]; !hasIt { return fmt.Errorf(" Cannot find CDR export template with ID: <%s>", cdrePrfl) + } else if cdreProfile.ExportFormat == utils.MetaS3jsonMap || cdreProfile.ExportFormat == utils.MetaSQSjsonMap { + poster := "SQSPoster" + if cdreProfile.ExportFormat == utils.MetaS3jsonMap { + poster = "S3Poster" + } + neededArgs := []string{"aws_region", "aws_key", "aws_secret"} + args := utils.GetUrlRawArguments(cdreProfile.ExportPath) + for _, arg := range neededArgs { + if _, has := args[arg]; !has { + utils.Logger.Err(fmt.Sprintf("<%s> No %s present for AWS for cdre: <%s>.", poster, arg, cdrePrfl)) + } + } } } if !self.thresholdSCfg.Enabled { diff --git a/engine/action.go b/engine/action.go index 27bece556..8585ae5a4 100644 --- a/engine/action.go +++ b/engine/action.go @@ -469,7 +469,7 @@ func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error }).AsString() return PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, - body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName) + body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName, utils.UUIDSha1Prefix()) } func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error { @@ -487,7 +487,7 @@ func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error { }).AsString() return PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, - body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName) + body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName, utils.UUIDSha1Prefix()) } func callUrl(ub *Account, a *Action, acs Actions, extraData interface{}) error { diff --git a/engine/cdre.go b/engine/cdre.go index d36e6dbc7..11c782fa7 100644 --- a/engine/cdre.go +++ b/engine/cdre.go @@ -288,9 +288,9 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { case utils.MetaSQSjsonMap: err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) case utils.MetaKafkajsonMap: - err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) + err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID)) case utils.MetaS3jsonMap: - err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) + err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID)) } return } diff --git a/engine/poster.go b/engine/poster.go index 78ef599bf..3c8ce03a6 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -39,11 +39,12 @@ const ( exchangeType = "exchange_type" routingKey = "routing_key" - awsRegion = "aws_region" - awsID = "aws_key" - awsSecret = "aws_secret" - awsToken = "aws_token" - awsAccountID = "aws_account_id" + awsRegion = "aws_region" + awsID = "aws_key" + awsSecret = "aws_secret" + awsToken = "aws_token" + // awsAccountID = "aws_account_id" + folderPath = "folder_path" ) func init() { @@ -68,7 +69,7 @@ type PosterCache struct { } type Poster interface { - Post([]byte, string) error + Post(body []byte, fallbackName, key string) error Close() } @@ -188,7 +189,7 @@ func (pc *PosterCache) PostAMQP(dialURL string, attempts int, if err != nil { return err } - return amqpPoster.Post(content, fallbackFileName) + return amqpPoster.Post(content, fallbackFileName, "") } func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int, @@ -197,7 +198,7 @@ func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int, if err != nil { return err } - return AMQPv1Poster.Post(content, fallbackFileName) + return AMQPv1Poster.Post(content, fallbackFileName, "") } func (pc *PosterCache) PostSQS(dialURL string, attempts int, @@ -206,23 +207,23 @@ func (pc *PosterCache) PostSQS(dialURL string, attempts int, if err != nil { return err } - return sqsPoster.Post(content, fallbackFileName) + return sqsPoster.Post(content, fallbackFileName, "") } func (pc *PosterCache) PostKafka(dialURL string, attempts int, - content []byte, fallbackFileDir, fallbackFileName string) error { + content []byte, fallbackFileDir, fallbackFileName, key string) error { kafkaPoster, err := pc.GetKafkaPoster(dialURL, attempts, fallbackFileDir) if err != nil { return err } - return kafkaPoster.Post(content, fallbackFileName) + return kafkaPoster.Post(content, fallbackFileName, key) } func (pc *PosterCache) PostS3(dialURL string, attempts int, - content []byte, fallbackFileDir, fallbackFileName string) error { + content []byte, fallbackFileDir, fallbackFileName, key string) error { sqsPoster, err := pc.GetS3Poster(dialURL, attempts, fallbackFileDir) if err != nil { return err } - return sqsPoster.Post(content, fallbackFileName) + return sqsPoster.Post(content, fallbackFileName, key) } diff --git a/engine/pstr_amqp.go b/engine/pstr_amqp.go index 41c2f2db0..38acc8d47 100644 --- a/engine/pstr_amqp.go +++ b/engine/pstr_amqp.go @@ -87,7 +87,7 @@ func (pstr *AMQPPoster) parseURL(dialURL string) error { // Post is the method being called when we need to post anything in the queue // the optional chn will permits channel caching -func (pstr *AMQPPoster) Post(content []byte, fallbackFileName string) (err error) { +func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err error) { var chn *amqp.Channel fib := utils.Fib() diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go index fc1f25aef..36ac7f7b8 100644 --- a/engine/pstr_amqpv1.go +++ b/engine/pstr_amqpv1.go @@ -60,7 +60,7 @@ func (pstr *AMQPv1Poster) Close() { pstr.Unlock() } -func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName string) (err error) { +func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName, _ string) (err error) { var s *amqpv1.Session fib := utils.Fib() diff --git a/engine/pstr_kafka.go b/engine/pstr_kafka.go index 1c71edb10..d7df61d26 100644 --- a/engine/pstr_kafka.go +++ b/engine/pstr_kafka.go @@ -65,11 +65,11 @@ func (pstr *KafkaPoster) parseURL(dialURL string) error { // Post is the method being called when we need to post anything in the queue // the optional chn will permits channel caching -func (pstr *KafkaPoster) Post(content []byte, fallbackFileName string) (err error) { +func (pstr *KafkaPoster) Post(content []byte, fallbackFileName, key string) (err error) { pstr.newPostWriter() pstr.Lock() if err = pstr.writer.WriteMessages(context.Background(), kafka.Message{ - Key: []byte(utils.UUIDSha1Prefix()), + Key: []byte(key), Value: content, }); err == nil { pstr.Unlock() diff --git a/engine/pstr_s3.go b/engine/pstr_s3.go index bbdec5f1b..d2fc2a835 100644 --- a/engine/pstr_s3.go +++ b/engine/pstr_s3.go @@ -21,7 +21,6 @@ package engine import ( "bytes" "fmt" - "net/url" "strings" "sync" "time" @@ -38,10 +37,7 @@ func NewS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, attempts: attempts, fallbackFileDir: fallbackFileDir, } - err := pstr.parseURL(dialURL) - if err != nil { - return nil, err - } + pstr.parseURL(dialURL) return pstr, nil } @@ -54,51 +50,40 @@ type S3Poster struct { awsToken string attempts int fallbackFileDir string - queueURL *string queueID string - // getQueueOnce sync.Once - session *session.Session + folderPath string + session *session.Session } func (pstr *S3Poster) Close() {} -func (pstr *S3Poster) parseURL(dialURL string) (err error) { - u, err := url.Parse(dialURL) - if err != nil { - return err - } - qry := u.Query() +func (pstr *S3Poster) parseURL(dialURL string) { + qry := utils.GetUrlRawArguments(dialURL) pstr.dialURL = strings.Split(dialURL, "?")[0] pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint pstr.queueID = defaultQueueID - if vals, has := qry[queueID]; has && len(vals) != 0 { - pstr.queueID = vals[0] + if val, has := qry[queueID]; has { + pstr.queueID = val } - if vals, has := qry[awsRegion]; has && len(vals) != 0 { - pstr.awsRegion = vals[0] - } else { - utils.Logger.Warning(" No region present for AWS.") + if val, has := qry[folderPath]; has { + pstr.folderPath = val } - if vals, has := qry[awsID]; has && len(vals) != 0 { - pstr.awsID = vals[0] - } else { - utils.Logger.Warning(" No access key ID present for AWS.") + if val, has := qry[awsRegion]; has { + pstr.awsRegion = val } - if vals, has := qry[awsSecret]; has && len(vals) != 0 { - pstr.awsKey = vals[0] - } else { - utils.Logger.Warning(" No secret access key present for AWS.") + if val, has := qry[awsID]; has { + pstr.awsID = val } - if vals, has := qry[awsToken]; has && len(vals) != 0 { - pstr.awsToken = vals[0] - } else { - utils.Logger.Warning(" No session token present for AWS.") + if val, has := qry[awsSecret]; has { + pstr.awsKey = val + } + if val, has := qry[awsToken]; has { + pstr.awsToken = val } - return nil } -func (pstr *S3Poster) Post(message []byte, fallbackFileName string) (err error) { +func (pstr *S3Poster) Post(message []byte, fallbackFileName, key string) (err error) { var svc *s3manager.Uploader fib := utils.Fib() @@ -123,7 +108,7 @@ func (pstr *S3Poster) Post(message []byte, fallbackFileName string) (err error) // Can also use the `filepath` standard library package to modify the // filename as need for an S3 object key. Such as turning absolute path // to a relative path. - Key: aws.String(fallbackFileName), + Key: aws.String(fmt.Sprintf("%s/%s.json", pstr.folderPath, key)), // The file to be uploaded. io.ReadSeeker is preferred as the Uploader // will be able to optimize memory when uploading large content. io.Reader diff --git a/engine/pstr_sqs.go b/engine/pstr_sqs.go index c081df477..6e7a13c74 100644 --- a/engine/pstr_sqs.go +++ b/engine/pstr_sqs.go @@ -20,7 +20,6 @@ package engine import ( "fmt" - "net/url" "strings" "sync" "time" @@ -38,10 +37,7 @@ func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, attempts: attempts, fallbackFileDir: fallbackFileDir, } - err := pstr.parseURL(dialURL) - if err != nil { - return nil, err - } + pstr.parseURL(dialURL) return pstr, nil } @@ -62,41 +58,28 @@ type SQSPoster struct { func (pstr *SQSPoster) Close() {} -func (pstr *SQSPoster) parseURL(dialURL string) (err error) { - u, err := url.Parse(dialURL) - if err != nil { - return err - } - qry := u.Query() +func (pstr *SQSPoster) parseURL(dialURL string) { + qry := utils.GetUrlRawArguments(dialURL) pstr.dialURL = strings.Split(dialURL, "?")[0] pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint pstr.queueID = defaultQueueID - if vals, has := qry[queueID]; has && len(vals) != 0 { - pstr.queueID = vals[0] + if val, has := qry[queueID]; has { + pstr.queueID = val } - if vals, has := qry[awsRegion]; has && len(vals) != 0 { - pstr.awsRegion = vals[0] - } else { - utils.Logger.Warning(" No region present for AWS.") + if val, has := qry[awsRegion]; has { + pstr.awsRegion = val } - if vals, has := qry[awsID]; has && len(vals) != 0 { - pstr.awsID = vals[0] - } else { - utils.Logger.Warning(" No access key ID present for AWS.") + if val, has := qry[awsID]; has { + pstr.awsID = val } - if vals, has := qry[awsSecret]; has && len(vals) != 0 { - pstr.awsKey = vals[0] - } else { - utils.Logger.Warning(" No secret access key present for AWS.") + if val, has := qry[awsSecret]; has { + pstr.awsKey = val } - if vals, has := qry[awsToken]; has && len(vals) != 0 { - pstr.awsToken = vals[0] - } else { - utils.Logger.Warning(" No session token present for AWS.") + if val, has := qry[awsToken]; has { + pstr.awsToken = val } pstr.getQueueURL() - return nil } func (pstr *SQSPoster) getQueueURL() (err error) { @@ -132,7 +115,7 @@ func (pstr *SQSPoster) getQueueURL() (err error) { return err } -func (pstr *SQSPoster) Post(message []byte, fallbackFileName string) (err error) { +func (pstr *SQSPoster) Post(message []byte, fallbackFileName, _ string) (err error) { var svc *sqs.SQS fib := utils.Fib() diff --git a/utils/coreutils.go b/utils/coreutils.go index 3db3493d2..38e91e158 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -963,3 +963,24 @@ func ReverseString(s string) string { } return string(r) } + +func GetUrlRawArguments(dialURL string) (out map[string]string) { + out = make(map[string]string) + idx := strings.IndexRune(dialURL, '?') + if idx == -1 { + return + } + strParams := dialURL[idx+1:] + if len(strParams) == 0 { + return + } + vecParams := strings.Split(strParams, "&") + for _, paramPair := range vecParams { + idx := strings.IndexRune(paramPair, '=') + if idx == -1 { + continue + } + out[paramPair[:idx]] = paramPair[idx+1:] + } + return +}