mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-25 09:08:45 +05:00
Added support for AWS SQS. closes #1396
This commit is contained in:
committed by
Dan Christian Bogos
parent
af059c1aa6
commit
9d92f4cb92
@@ -126,6 +126,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
|
||||
MetaPublishBalance: publishBalance,
|
||||
utils.MetaAMQPjsonMap: sendAMQP,
|
||||
utils.MetaAWSjsonMap: sendAWS,
|
||||
utils.MetaSQSjsonMap: sendSQS,
|
||||
}
|
||||
f, exists := actionFuncMap[typ]
|
||||
return f, exists
|
||||
@@ -391,7 +392,7 @@ func sendAMQP(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
}).AsString()
|
||||
|
||||
return AMQPPostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
return PostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, utils.CONTENT_JSON, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
}
|
||||
|
||||
@@ -409,7 +410,25 @@ func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
}).AsString()
|
||||
|
||||
return AMQPPostersCache.PostAWS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
return PostersCache.PostAWS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
}
|
||||
|
||||
func sendSQS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
body, err := getOneData(ub, extraData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
fallbackFileName := (&utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaSQSjsonMap,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
}).AsString()
|
||||
|
||||
return PostersCache.PostSQS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
}
|
||||
|
||||
|
||||
@@ -246,7 +246,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
|
||||
return err
|
||||
}
|
||||
body = jsn
|
||||
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAWSjsonMap:
|
||||
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAWSjsonMap, utils.MetaSQSjsonMap:
|
||||
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.roundingDecimals, cdre.filterS)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -283,10 +283,12 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
|
||||
_, err = cdre.httpPoster.Post(cdre.exportPath, utils.PosterTransportContentTypes[cdre.exportFormat], body, cdre.attempts, fallbackPath)
|
||||
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
|
||||
err = AMQPPostersCache.PostAMQP(cdre.exportPath, cdre.attempts, body.([]byte),
|
||||
err = PostersCache.PostAMQP(cdre.exportPath, cdre.attempts, body.([]byte),
|
||||
utils.PosterTransportContentTypes[cdre.exportFormat], cdre.fallbackPath, fallbackFileName)
|
||||
case utils.MetaAWSjsonMap:
|
||||
err = AMQPPostersCache.PostAWS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
err = PostersCache.PostAWS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
case utils.MetaSQSjsonMap:
|
||||
err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
365
engine/poster.go
365
engine/poster.go
@@ -32,6 +32,10 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/sqs"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/streadway/amqp"
|
||||
@@ -47,16 +51,22 @@ const (
|
||||
exchange = "exchange"
|
||||
exchangeType = "exchange_type"
|
||||
routingKey = "routing_key"
|
||||
|
||||
awsRegion = "aws_region"
|
||||
awsID = "aws_key"
|
||||
awsSecret = "aws_secret"
|
||||
awsToken = "aws_token"
|
||||
)
|
||||
|
||||
func init() {
|
||||
AMQPPostersCache = &AMQPCachedPosters{
|
||||
cache: make(map[string]*AMQPPoster),
|
||||
cache2: make(map[string]*AWSPoster),
|
||||
PostersCache = &PosterCache{
|
||||
amqpCache: make(map[string]Poster),
|
||||
awsCache: make(map[string]Poster),
|
||||
sqsCache: make(map[string]Poster),
|
||||
} // Initialize the cache for amqpPosters
|
||||
}
|
||||
|
||||
var AMQPPostersCache *AMQPCachedPosters
|
||||
var PostersCache *PosterCache
|
||||
|
||||
// Post without automatic failover
|
||||
func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error) {
|
||||
@@ -151,69 +161,124 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
|
||||
return
|
||||
}
|
||||
|
||||
// AMQPPosterCache is used to cache mutliple AMQPPoster connections based on the address
|
||||
type AMQPCachedPosters struct {
|
||||
type Poster interface {
|
||||
Post([]byte, string) error
|
||||
Close()
|
||||
}
|
||||
|
||||
func writeToFile(fileDir, fileName string, content []byte) (err error) {
|
||||
fallbackFilePath := path.Join(fileDir, fileName)
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
fileOut, err := os.Create(fallbackFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = fileOut.Write(content)
|
||||
fileOut.Close()
|
||||
return nil, err
|
||||
}, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
|
||||
return
|
||||
}
|
||||
|
||||
func parseURL(dialURL string) (URL string, qID string, err error) {
|
||||
u, err := url.Parse(dialURL)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
qry := u.Query()
|
||||
URL = strings.Split(dialURL, "?")[0]
|
||||
qID = defaultQueueID
|
||||
if vals, has := qry[queueID]; has && len(vals) != 0 {
|
||||
qID = vals[0]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type PosterCache struct {
|
||||
sync.Mutex
|
||||
cache map[string]*AMQPPoster
|
||||
cache2 map[string]*AWSPoster
|
||||
amqpCache map[string]Poster
|
||||
awsCache map[string]Poster
|
||||
sqsCache map[string]Poster
|
||||
}
|
||||
|
||||
func (pc *PosterCache) Close() {
|
||||
for _, v := range pc.amqpCache {
|
||||
v.Close()
|
||||
}
|
||||
for _, v := range pc.awsCache {
|
||||
v.Close()
|
||||
}
|
||||
for _, v := range pc.sqsCache {
|
||||
v.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// GetAMQPPoster creates a new poster only if not already cached
|
||||
// uses dialURL as cache key
|
||||
func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int,
|
||||
fallbackFileDir string) (amqpPoster *AMQPPoster, err error) {
|
||||
func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.cache[dialURL]; !hasIt {
|
||||
if _, hasIt := pc.amqpCache[dialURL]; !hasIt {
|
||||
if pstr, err := NewAMQPPoster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
pc.cache[dialURL] = pstr
|
||||
pc.amqpCache[dialURL] = pstr
|
||||
}
|
||||
}
|
||||
return pc.cache[dialURL], nil
|
||||
return pc.amqpCache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *AMQPCachedPosters) GetAWSPoster(dialURL string, attempts int,
|
||||
fallbackFileDir string) (amqpPoster *AWSPoster, err error) {
|
||||
func (pc *PosterCache) GetAWSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.cache2[dialURL]; !hasIt {
|
||||
if _, hasIt := pc.awsCache[dialURL]; !hasIt {
|
||||
if pstr, err := NewAWSPoster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
pc.cache2[dialURL] = pstr
|
||||
pc.awsCache[dialURL] = pstr
|
||||
}
|
||||
}
|
||||
return pc.cache2[dialURL], nil
|
||||
return pc.awsCache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *AMQPCachedPosters) PostAMQP(dialURL string, attempts int,
|
||||
func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.sqsCache[dialURL]; !hasIt {
|
||||
if pstr, err := NewSQSPoster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
pc.sqsCache[dialURL] = pstr
|
||||
}
|
||||
}
|
||||
return pc.sqsCache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostAMQP(dialURL string, attempts int,
|
||||
content []byte, contentType, fallbackFileDir, fallbackFileName string) error {
|
||||
amqpPoster, err := pc.GetAMQPPoster(dialURL, attempts, fallbackFileDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var chn *amqp.Channel
|
||||
chn, err = amqpPoster.Post(nil, contentType, content, fallbackFileName)
|
||||
if chn != nil {
|
||||
chn.Close()
|
||||
}
|
||||
return nil
|
||||
return amqpPoster.Post(content, fallbackFileName)
|
||||
}
|
||||
|
||||
func (pc *AMQPCachedPosters) PostAWS(dialURL string, attempts int,
|
||||
func (pc *PosterCache) PostAWS(dialURL string, attempts int,
|
||||
content []byte, fallbackFileDir, fallbackFileName string) error {
|
||||
awsPoster, err := pc.GetAWSPoster(dialURL, attempts, fallbackFileDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var ses *amqpv1.Session
|
||||
ses, err = awsPoster.Post(content, fallbackFileName)
|
||||
if ses != nil {
|
||||
ses.Close(context.Background())
|
||||
return awsPoster.Post(content, fallbackFileName)
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostSQS(dialURL string, attempts int,
|
||||
content []byte, fallbackFileDir, fallbackFileName string) error {
|
||||
sqsPoster, err := pc.GetSQSPoster(dialURL, attempts, fallbackFileDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return sqsPoster.Post(content, fallbackFileName)
|
||||
}
|
||||
|
||||
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
|
||||
@@ -273,25 +338,22 @@ func (pstr *AMQPPoster) parseURL(dialURL string) error {
|
||||
|
||||
// Post is the method being called when we need to post anything in the queue
|
||||
// the optional chn will permits channel caching
|
||||
func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []byte,
|
||||
fallbackFileName string) (rChn *amqp.Channel, err error) {
|
||||
func (pstr *AMQPPoster) Post(content []byte, fallbackFileName string) (err error) {
|
||||
var chn *amqp.Channel
|
||||
fib := utils.Fib()
|
||||
if chn == nil {
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if chn, err = pstr.NewPostChannel(); err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil &&
|
||||
fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<AMQPPoster> creating new post channel, err: %s", err.Error()))
|
||||
err = pstr.writeToFile(fallbackFileName, content)
|
||||
return nil, err
|
||||
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if chn, err = pstr.newPostChannel(); err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
if fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<AMQPPoster> creating new post channel, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
|
||||
}
|
||||
return err
|
||||
}
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if err = chn.Publish(
|
||||
@@ -301,7 +363,7 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
DeliveryMode: amqp.Persistent,
|
||||
ContentType: contentType,
|
||||
ContentType: utils.CONTENT_JSON,
|
||||
Body: content,
|
||||
}); err == nil {
|
||||
break
|
||||
@@ -309,10 +371,13 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
err = pstr.writeToFile(fallbackFileName, content)
|
||||
return nil, err
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
|
||||
return err
|
||||
}
|
||||
return chn, nil
|
||||
if chn != nil {
|
||||
chn.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *AMQPPoster) Close() {
|
||||
@@ -324,7 +389,7 @@ func (pstr *AMQPPoster) Close() {
|
||||
pstr.Unlock()
|
||||
}
|
||||
|
||||
func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) {
|
||||
func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) {
|
||||
pstr.Lock()
|
||||
if pstr.conn == nil {
|
||||
var conn *amqp.Connection
|
||||
@@ -386,27 +451,23 @@ func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// writeToFile writes the content in the file with fileName on amqp.fallbackFileDir
|
||||
func (pstr *AMQPPoster) writeToFile(fileName string, content []byte) (err error) {
|
||||
return writeToFile(pstr.fallbackFileDir, fileName, content)
|
||||
}
|
||||
|
||||
func NewAWSPoster(dialURL string, attempts int, fallbackFileDir string) (*AWSPoster, error) {
|
||||
amqp := &AWSPoster{
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
}
|
||||
if err := amqp.parseURL(dialURL); err != nil {
|
||||
func NewAWSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
URL, qID, err := parseURL(dialURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return amqp, nil
|
||||
return &AWSPoster{
|
||||
dialURL: URL,
|
||||
queueID: "/" + qID,
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type AWSPoster struct {
|
||||
sync.Mutex
|
||||
dialURL string
|
||||
queueID string // identifier of the CDR queue where we publish
|
||||
exchange string
|
||||
attempts int
|
||||
fallbackFileDir string
|
||||
client *amqpv1.Client
|
||||
@@ -421,22 +482,8 @@ func (pstr *AWSPoster) Close() {
|
||||
pstr.Unlock()
|
||||
}
|
||||
|
||||
func (pstr *AWSPoster) parseURL(dialURL string) error {
|
||||
// "amqp+ssl://cgrates:password@host:5671/?queue_id=cgrates_cdrs",
|
||||
u, err := url.Parse(dialURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
qry := u.Query()
|
||||
pstr.dialURL = strings.Split(dialURL, "?")[0]
|
||||
pstr.queueID = "/" + defaultQueueID
|
||||
if vals, has := qry[queueID]; has && len(vals) != 0 {
|
||||
pstr.queueID = "/" + vals[0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (s *amqpv1.Session, err error) {
|
||||
func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (err error) {
|
||||
var s *amqpv1.Session
|
||||
fib := utils.Fib()
|
||||
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
@@ -448,9 +495,9 @@ func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (s *amqpv1.
|
||||
if err != nil {
|
||||
if fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<AWSPoster> creating new post channel, err: %s", err.Error()))
|
||||
err = pstr.writeToFile(fallbackFileName, content)
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
|
||||
}
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
@@ -472,10 +519,13 @@ func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (s *amqpv1.
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
err = pstr.writeToFile(fallbackFileName, content)
|
||||
return nil, err
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content)
|
||||
return err
|
||||
}
|
||||
return s, nil
|
||||
if s != nil {
|
||||
s.Close(context.Background())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *AWSPoster) newPosterSession() (s *amqpv1.Session, err error) {
|
||||
@@ -492,21 +542,142 @@ func (pstr *AWSPoster) newPosterSession() (s *amqpv1.Session, err error) {
|
||||
return pstr.client.NewSession()
|
||||
}
|
||||
|
||||
// writeToFile writes the content in the file with fileName on amqp.fallbackFileDir
|
||||
func (pstr *AWSPoster) writeToFile(fileName string, content []byte) (err error) {
|
||||
return writeToFile(pstr.fallbackFileDir, fileName, content)
|
||||
func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
pstr := &SQSPoster{
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
}
|
||||
err := pstr.parseURL(dialURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pstr, nil
|
||||
}
|
||||
|
||||
func writeToFile(fileDir, fileName string, content []byte) (err error) {
|
||||
fallbackFilePath := path.Join(fileDir, fileName)
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
fileOut, err := os.Create(fallbackFilePath)
|
||||
type SQSPoster struct {
|
||||
sync.Mutex
|
||||
dialURL string
|
||||
queueID string // identifier of the CDR queue where we publish
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
attempts int
|
||||
fallbackFileDir string
|
||||
session *session.Session
|
||||
}
|
||||
|
||||
func (pstr *SQSPoster) Close() {}
|
||||
|
||||
func (pstr *SQSPoster) parseURL(dialURL string) (err error) {
|
||||
u, err := url.Parse(dialURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
qry := u.Query()
|
||||
|
||||
pstr.dialURL = strings.Split(dialURL, "?")[0]
|
||||
pstr.queueID = defaultQueueID
|
||||
if vals, has := qry[queueID]; has && len(vals) != 0 {
|
||||
pstr.queueID = vals[0]
|
||||
}
|
||||
if vals, has := qry[awsRegion]; has && len(vals) != 0 {
|
||||
pstr.awsRegion = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No region present for AWS.")
|
||||
}
|
||||
if vals, has := qry[awsID]; has && len(vals) != 0 {
|
||||
pstr.awsID = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No access key ID present for AWS.")
|
||||
}
|
||||
if vals, has := qry[awsSecret]; has && len(vals) != 0 {
|
||||
pstr.awsKey = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No secret access key present for AWS.")
|
||||
}
|
||||
if vals, has := qry[awsToken]; has && len(vals) != 0 {
|
||||
pstr.awsToken = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No session token present for AWS.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (pstr *SQSPoster) Post(message []byte, fallbackFileName string) (err error) {
|
||||
var svc *sqs.SQS
|
||||
fib := utils.Fib()
|
||||
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if svc, err = pstr.newPosterSession(); err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil {
|
||||
if fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> creating new session, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// For CreateQueue
|
||||
if _, err = svc.CreateQueue(&sqs.CreateQueueInput{
|
||||
QueueName: aws.String(pstr.queueID),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
result, err := svc.GetQueueUrl(
|
||||
&sqs.GetQueueUrlInput{
|
||||
QueueName: aws.String(pstr.queueID),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err = svc.SendMessage(
|
||||
&sqs.SendMessageInput{
|
||||
MessageBody: aws.String(string(message)),
|
||||
QueueUrl: aws.String(*result.QueueUrl),
|
||||
},
|
||||
); err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (pstr *SQSPoster) newPosterSession() (s *sqs.SQS, err error) {
|
||||
pstr.Lock()
|
||||
defer pstr.Unlock()
|
||||
if pstr.session == nil {
|
||||
var ses *session.Session
|
||||
cfg := aws.Config{Endpoint: aws.String(pstr.dialURL)}
|
||||
if len(pstr.awsRegion) != 0 {
|
||||
cfg.Region = aws.String(pstr.awsRegion)
|
||||
}
|
||||
if len(pstr.awsID) != 0 &&
|
||||
len(pstr.awsKey) != 0 &&
|
||||
len(pstr.awsToken) != 0 {
|
||||
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
|
||||
}
|
||||
ses, err = session.NewSessionWithOptions(
|
||||
session.Options{
|
||||
Config: cfg,
|
||||
// SharedConfigState: session.SharedConfigEnable,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = fileOut.Write(content)
|
||||
fileOut.Close()
|
||||
return nil, err
|
||||
}, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
|
||||
return
|
||||
pstr.session = ses
|
||||
}
|
||||
return sqs.New(pstr.session), nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user