Updated SQSPoster to create queue only if the queue doesn't exist already

This commit is contained in:
Trial97
2019-03-04 16:43:36 +02:00
committed by Dan Christian Bogos
parent fbf9add765
commit 48cf929b52
2 changed files with 53 additions and 31 deletions

View File

@@ -92,8 +92,8 @@
},
"sqs_test_file": {
"export_format": "*sqs_json_map",
// export_path for sqs: "endpoint/?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken"
"export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret",
// export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
"content_fields": [
{"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"},
],

View File

@@ -33,6 +33,7 @@ import (
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
@@ -52,10 +53,11 @@ const (
exchangeType = "exchange_type"
routingKey = "routing_key"
awsRegion = "aws_region"
awsID = "aws_key"
awsSecret = "aws_secret"
awsToken = "aws_token"
awsRegion = "aws_region"
awsID = "aws_key"
awsSecret = "aws_secret"
awsToken = "aws_token"
awsAccountID = "aws_account_id"
)
func init() {
@@ -557,14 +559,16 @@ func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster,
type SQSPoster struct {
sync.Mutex
dialURL string
queueID string // identifier of the CDR queue where we publish
awsRegion string
awsID string
awsKey string
awsToken string
attempts int
fallbackFileDir string
session *session.Session
queueURL *string
queueID string
// getQueueOnce sync.Once
session *session.Session
}
func (pstr *SQSPoster) Close() {}
@@ -582,27 +586,61 @@ func (pstr *SQSPoster) parseURL(dialURL string) (err error) {
pstr.queueID = vals[0]
}
if vals, has := qry[awsRegion]; has && len(vals) != 0 {
pstr.awsRegion = vals[0]
pstr.awsRegion = url.QueryEscape(vals[0])
} else {
utils.Logger.Warning("<SQSPoster> No region present for AWS.")
}
if vals, has := qry[awsID]; has && len(vals) != 0 {
pstr.awsID = vals[0]
pstr.awsID = url.QueryEscape(vals[0])
} else {
utils.Logger.Warning("<SQSPoster> No access key ID present for AWS.")
}
if vals, has := qry[awsSecret]; has && len(vals) != 0 {
pstr.awsKey = vals[0]
pstr.awsKey = url.QueryEscape(vals[0])
} else {
utils.Logger.Warning("<SQSPoster> No secret access key present for AWS.")
}
if vals, has := qry[awsToken]; has && len(vals) != 0 {
pstr.awsToken = vals[0]
pstr.awsToken = url.QueryEscape(vals[0])
} else {
utils.Logger.Warning("<SQSPoster> No session token present for AWS.")
}
return nil
return pstr.getQueueURL()
}
func (pstr *SQSPoster) getQueueURL() (err error) {
if pstr.queueURL != nil {
return nil
}
// pstr.getQueueOnce.Do(func() {
var svc *sqs.SQS
if svc, err = pstr.newPosterSession(); err != nil {
return
}
var result *sqs.GetQueueUrlOutput
if result, err = svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(pstr.queueID),
}); err == nil {
pstr.queueURL = new(string)
*(pstr.queueURL) = *result.QueueUrl
return
}
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
// For CreateQueue
var createResult *sqs.CreateQueueOutput
if createResult, err = svc.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(pstr.queueID),
}); err == nil {
pstr.queueURL = new(string)
*(pstr.queueURL) = *createResult.QueueUrl
return
}
}
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> can not get url for queue with ID=%s because err: %v", pstr.queueID, err))
// })
return err
}
func (pstr *SQSPoster) Post(message []byte, fallbackFileName string) (err error) {
var svc *sqs.SQS
fib := utils.Fib()
@@ -622,32 +660,17 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName string) (err error)
}
for i := 0; i < pstr.attempts; i++ {
// For CreateQueue
if _, err = svc.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(pstr.queueID),
}); err != nil {
continue
}
result, err := svc.GetQueueUrl(
&sqs.GetQueueUrlInput{
QueueName: aws.String(pstr.queueID),
},
)
if err != nil {
time.Sleep(time.Duration(fib()) * time.Second)
continue
}
if _, err = svc.SendMessage(
&sqs.SendMessageInput{
MessageBody: aws.String(string(message)),
QueueUrl: aws.String(*result.QueueUrl),
QueueUrl: pstr.queueURL,
},
); err == nil {
break
}
}
if err != nil && fallbackFileName != utils.META_NONE {
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> posting new message, err: %s", err.Error()))
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message)
}
return err
@@ -669,7 +692,6 @@ func (pstr *SQSPoster) newPosterSession() (s *sqs.SQS, err error) {
ses, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,
// SharedConfigState: session.SharedConfigEnable,
},
)
if err != nil {