diff --git a/data/conf/samples/cdrsonexpmaster/cdrsreplicationmaster.json b/data/conf/samples/cdrsonexpmaster/cdrsreplicationmaster.json index f3cdba56b..93d7d18f9 100644 --- a/data/conf/samples/cdrsonexpmaster/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsonexpmaster/cdrsreplicationmaster.json @@ -92,8 +92,8 @@ }, "sqs_test_file": { "export_format": "*sqs_json_map", - // export_path for sqs: "endpoint/?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken" - "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret", + // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" + "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", "content_fields": [ {"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"}, ], diff --git a/engine/poster.go b/engine/poster.go index ad35868a1..d7b5ed360 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -33,6 +33,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" @@ -52,10 +53,11 @@ const ( exchangeType = "exchange_type" routingKey = "routing_key" - awsRegion = "aws_region" - awsID = "aws_key" - awsSecret = "aws_secret" - awsToken = "aws_token" + awsRegion = "aws_region" + awsID = "aws_key" + awsSecret = "aws_secret" + awsToken = "aws_token" + awsAccountID = "aws_account_id" ) func init() { @@ -557,14 +559,16 @@ func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, type SQSPoster struct { sync.Mutex dialURL string - queueID string // identifier of the CDR queue where we publish awsRegion string awsID string awsKey string awsToken string attempts int fallbackFileDir string - session *session.Session + queueURL *string + queueID string + // getQueueOnce sync.Once + session *session.Session } func (pstr *SQSPoster) Close() {} @@ -582,27 +586,61 @@ func (pstr *SQSPoster) parseURL(dialURL string) (err error) { pstr.queueID = vals[0] } if vals, has := qry[awsRegion]; has && len(vals) != 0 { - pstr.awsRegion = vals[0] + pstr.awsRegion = url.QueryEscape(vals[0]) } else { utils.Logger.Warning(" No region present for AWS.") } if vals, has := qry[awsID]; has && len(vals) != 0 { - pstr.awsID = vals[0] + pstr.awsID = url.QueryEscape(vals[0]) } else { utils.Logger.Warning(" No access key ID present for AWS.") } if vals, has := qry[awsSecret]; has && len(vals) != 0 { - pstr.awsKey = vals[0] + pstr.awsKey = url.QueryEscape(vals[0]) } else { utils.Logger.Warning(" No secret access key present for AWS.") } if vals, has := qry[awsToken]; has && len(vals) != 0 { - pstr.awsToken = vals[0] + pstr.awsToken = url.QueryEscape(vals[0]) } else { utils.Logger.Warning(" No session token present for AWS.") } - return nil + return pstr.getQueueURL() } + +func (pstr *SQSPoster) getQueueURL() (err error) { + if pstr.queueURL != nil { + return nil + } + // pstr.getQueueOnce.Do(func() { + var svc *sqs.SQS + if svc, err = pstr.newPosterSession(); err != nil { + return + } + var result *sqs.GetQueueUrlOutput + if result, err = svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(pstr.queueID), + }); err == nil { + pstr.queueURL = new(string) + *(pstr.queueURL) = *result.QueueUrl + return + } + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + // For CreateQueue + var createResult *sqs.CreateQueueOutput + if createResult, err = svc.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(pstr.queueID), + }); err == nil { + pstr.queueURL = new(string) + *(pstr.queueURL) = *createResult.QueueUrl + return + } + } + utils.Logger.Warning(fmt.Sprintf(" can not get url for queue with ID=%s because err: %v", pstr.queueID, err)) + // }) + return err +} + func (pstr *SQSPoster) Post(message []byte, fallbackFileName string) (err error) { var svc *sqs.SQS fib := utils.Fib() @@ -622,32 +660,17 @@ func (pstr *SQSPoster) Post(message []byte, fallbackFileName string) (err error) } for i := 0; i < pstr.attempts; i++ { - // For CreateQueue - if _, err = svc.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(pstr.queueID), - }); err != nil { - continue - } - result, err := svc.GetQueueUrl( - &sqs.GetQueueUrlInput{ - QueueName: aws.String(pstr.queueID), - }, - ) - if err != nil { - time.Sleep(time.Duration(fib()) * time.Second) - continue - } - if _, err = svc.SendMessage( &sqs.SendMessageInput{ MessageBody: aws.String(string(message)), - QueueUrl: aws.String(*result.QueueUrl), + QueueUrl: pstr.queueURL, }, ); err == nil { break } } if err != nil && fallbackFileName != utils.META_NONE { + utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error())) err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) } return err @@ -669,7 +692,6 @@ func (pstr *SQSPoster) newPosterSession() (s *sqs.SQS, err error) { ses, err = session.NewSessionWithOptions( session.Options{ Config: cfg, - // SharedConfigState: session.SharedConfigEnable, }, ) if err != nil {