Files
cgrates/ees/sqs.go
2025-12-05 13:16:47 +01:00

173 lines
4.4 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package ees
import (
"crypto/tls"
"net/http"
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
// NewSQSee builds an SQS exporter from cfg and em. It allocates the request
// limiter from cfg.ConcurrentRequests and then applies cfg.Opts through
// parseOpts. No connection is attempted here.
func NewSQSee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *SQSee {
	exp := new(SQSee)
	exp.cfg = cfg
	exp.em = em
	exp.reqs = newConcReq(cfg.ConcurrentRequests)
	exp.parseOpts(cfg.Opts)
	return exp
}
// SQSee is the event exporter that posts messages to an SQS queue.
type SQSee struct {
	// Region and static credentials, filled in by parseOpts.
	awsRegion, awsID, awsKey, awsToken string

	queueURL *string // resolved by Connect; nil until then
	queueID  string  // queue name used to look up or create the queue

	// Connection tweaks, filled in by parseOpts.
	forcePathStyle, skipTlsVerify bool

	// AWS handles created by Connect.
	session *session.Session
	svc     *sqs.SQS

	cfg  *config.EventExporterCfg
	em   *utils.ExporterMetrics
	reqs *concReq

	sync.RWMutex // protect connection
	bytePreparing
}
// parseOpts copies the AWS/SQS settings found in opts onto the exporter.
//
// The queue ID always starts at utils.DefaultQueueID; every other field keeps
// its zero value unless the matching option pointer is set. A nil opts or a
// nil opts.AWS leaves only the default queue ID in place.
func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) {
	// Apply the default before looking at the options so that an exporter
	// configured without an AWS section does not end up with an empty
	// queue name.
	pstr.queueID = utils.DefaultQueueID
	if opts == nil || opts.AWS == nil {
		return
	}
	sqsOpts := opts.AWS
	if sqsOpts.SQSQueueID != nil {
		pstr.queueID = *sqsOpts.SQSQueueID
	}
	if sqsOpts.Region != nil {
		pstr.awsRegion = *sqsOpts.Region
	}
	if sqsOpts.Key != nil {
		pstr.awsID = *sqsOpts.Key
	}
	if sqsOpts.Secret != nil {
		pstr.awsKey = *sqsOpts.Secret
	}
	if sqsOpts.Token != nil {
		pstr.awsToken = *sqsOpts.Token
	}
	if sqsOpts.SQSForcePathStyle != nil {
		pstr.forcePathStyle = *sqsOpts.SQSForcePathStyle
	}
	if sqsOpts.SQSSkipTlsVerify != nil {
		pstr.skipTlsVerify = *sqsOpts.SQSSkipTlsVerify
	}
}
// Cfg returns the exporter configuration this SQSee was created with.
func (pstr *SQSee) Cfg() *config.EventExporterCfg {
	return pstr.cfg
}
// Connect lazily creates the AWS session and the SQS client, then resolves
// the URL of the configured queue, creating the queue when the service
// reports that it does not exist.
//
// Steps that already completed on an earlier call are skipped, so Connect can
// be called again after a failure. The write lock is held for the whole call.
// It returns the first error from session creation, GetQueueUrl or
// CreateQueue.
func (pstr *SQSee) Connect() (err error) {
	pstr.Lock()
	defer pstr.Unlock()
	if pstr.session == nil {
		cfg := aws.Config{Endpoint: aws.String(pstr.Cfg().ExportPath)}
		if len(pstr.awsRegion) != 0 {
			cfg.Region = aws.String(pstr.awsRegion)
		}
		// Static credentials are used only when both the ID and the key are set.
		if len(pstr.awsID) != 0 && len(pstr.awsKey) != 0 {
			cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
		}
		if pstr.forcePathStyle {
			// NOTE(review): S3ForcePathStyle is documented as an S3 addressing
			// option; confirm it has any effect on an SQS client.
			cfg.S3ForcePathStyle = aws.Bool(true)
		}
		if pstr.skipTlsVerify {
			// Start from a copy of the default transport so that proxy
			// settings, dial/handshake timeouts and connection limits stay
			// the same as with verification enabled; only certificate
			// verification is switched off (for self-signed certificates).
			tr := &http.Transport{}
			if dflt, ok := http.DefaultTransport.(*http.Transport); ok {
				tr = dflt.Clone()
			}
			if tr.TLSClientConfig == nil {
				tr.TLSClientConfig = new(tls.Config)
			}
			tr.TLSClientConfig.InsecureSkipVerify = true
			cfg.HTTPClient = &http.Client{Transport: tr}
		}
		pstr.session, err = session.NewSessionWithOptions(
			session.Options{
				Config: cfg,
			},
		)
		if err != nil {
			return
		}
	}
	if pstr.svc == nil {
		pstr.svc = sqs.New(pstr.session)
	}
	if pstr.queueURL != nil {
		return // queue already resolved
	}
	var result *sqs.GetQueueUrlOutput
	if result, err = pstr.svc.GetQueueUrl(&sqs.GetQueueUrlInput{
		QueueName: aws.String(pstr.queueID),
	}); err == nil {
		// Keep a private copy of the URL rather than a pointer into the response.
		pstr.queueURL = aws.String(*result.QueueUrl)
		return
	}
	// Only a "queue does not exist" error triggers queue creation; any other
	// error is returned unchanged.
	if aerr, ok := err.(awserr.Error); !ok || aerr.Code() != sqs.ErrCodeQueueDoesNotExist {
		return
	}
	var createResult *sqs.CreateQueueOutput
	if createResult, err = pstr.svc.CreateQueue(&sqs.CreateQueueInput{
		QueueName: aws.String(pstr.queueID),
	}); err == nil {
		pstr.queueURL = aws.String(*createResult.QueueUrl)
	}
	return
}
// ExportEvent sends message as the body of an SQS message to the queue URL
// stored on the exporter. message must hold a []byte; any other dynamic type
// makes the type assertion panic. The second argument is ignored.
//
// The call is bracketed by reqs.get()/reqs.done() and runs under the read
// lock. Both are released through defer, in the same order as before
// (RUnlock, then done), so that a panic raised while sending does not leave
// them held and block a later Connect.
func (pstr *SQSee) ExportEvent(message any, _ string) (err error) {
	pstr.reqs.get()
	defer pstr.reqs.done()
	pstr.RLock()
	defer pstr.RUnlock()
	_, err = pstr.svc.SendMessage(
		&sqs.SendMessageInput{
			MessageBody: aws.String(string(message.([]byte))),
			QueueUrl:    pstr.queueURL,
		},
	)
	return
}
// Close does nothing and always returns nil.
func (pstr *SQSee) Close() error {
	return nil
}
// GetMetrics returns the exporter metrics held in the em field.
func (pstr *SQSee) GetMetrics() *utils.ExporterMetrics {
	return pstr.em
}