Add options to support non-Amazon S3 and SQS

arberkatellari
2025-12-04 18:17:22 +02:00
committed by Dan Christian Bogos
parent eb372148d1
commit a8895a6a5a
10 changed files with 307 additions and 45 deletions

View File

@@ -627,10 +627,14 @@ const CGRATES_CFG_JSON = `
//SQS
// "sqsQueueID": "cgrates_cdrs", // the queue id for SQS exporters from were the events are exported
// "sqsForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY)
// "sqsSkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate
// S3
// "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from where the events that are exported
// "s3FolderPath": "", // S3FolderPath
// "s3ForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY)
// "s3SkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate
// Nats
// "natsJetStream": false, // controls if the nats poster uses the JetStream

View File

@@ -226,8 +226,12 @@ type AWSOpts struct {
Secret *string
Token *string
SQSQueueID *string
SQSForcePathStyle *bool
SQSSkipTlsVerify *bool
S3BucketID *string
S3FolderPath *string
S3ForcePathStyle *bool
S3SkipTlsVerify *bool
}
type NATSOpts struct {
@@ -484,12 +488,24 @@ func (awsOpts *AWSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err erro
if jsnCfg.SQSQueueID != nil {
awsOpts.SQSQueueID = jsnCfg.SQSQueueID
}
if jsnCfg.SQSForcePathStyle != nil {
awsOpts.SQSForcePathStyle = jsnCfg.SQSForcePathStyle
}
if jsnCfg.SQSSkipTlsVerify != nil {
awsOpts.SQSSkipTlsVerify = jsnCfg.SQSSkipTlsVerify
}
if jsnCfg.S3BucketID != nil {
awsOpts.S3BucketID = jsnCfg.S3BucketID
}
if jsnCfg.S3FolderPath != nil {
awsOpts.S3FolderPath = jsnCfg.S3FolderPath
}
if jsnCfg.S3ForcePathStyle != nil {
awsOpts.S3ForcePathStyle = jsnCfg.S3ForcePathStyle
}
if jsnCfg.S3SkipTlsVerify != nil {
awsOpts.S3SkipTlsVerify = jsnCfg.S3SkipTlsVerify
}
return
}
func (natsOpts *NATSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) {
@@ -907,6 +923,14 @@ func (awsOpts *AWSOpts) Clone() *AWSOpts {
cln.SQSQueueID = new(string)
*cln.SQSQueueID = *awsOpts.SQSQueueID
}
if awsOpts.SQSForcePathStyle != nil {
cln.SQSForcePathStyle = new(bool)
*cln.SQSForcePathStyle = *awsOpts.SQSForcePathStyle
}
if awsOpts.SQSSkipTlsVerify != nil {
cln.SQSSkipTlsVerify = new(bool)
*cln.SQSSkipTlsVerify = *awsOpts.SQSSkipTlsVerify
}
if awsOpts.S3BucketID != nil {
cln.S3BucketID = new(string)
*cln.S3BucketID = *awsOpts.S3BucketID
@@ -915,6 +939,14 @@ func (awsOpts *AWSOpts) Clone() *AWSOpts {
cln.S3FolderPath = new(string)
*cln.S3FolderPath = *awsOpts.S3FolderPath
}
if awsOpts.S3ForcePathStyle != nil {
cln.S3ForcePathStyle = new(bool)
*cln.S3ForcePathStyle = *awsOpts.S3ForcePathStyle
}
if awsOpts.S3SkipTlsVerify != nil {
cln.S3SkipTlsVerify = new(bool)
*cln.S3SkipTlsVerify = *awsOpts.S3SkipTlsVerify
}
return cln
}
@@ -1233,12 +1265,24 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
if awsOpts.SQSQueueID != nil {
opts[utils.SQSQueueID] = *awsOpts.SQSQueueID
}
if awsOpts.SQSForcePathStyle != nil {
opts[utils.SQSForcePathStyle] = *awsOpts.SQSForcePathStyle
}
if awsOpts.SQSSkipTlsVerify != nil {
opts[utils.SQSSkipTlsVerify] = *awsOpts.SQSSkipTlsVerify
}
if awsOpts.S3BucketID != nil {
opts[utils.S3Bucket] = *awsOpts.S3BucketID
}
if awsOpts.S3FolderPath != nil {
opts[utils.S3FolderPath] = *awsOpts.S3FolderPath
}
if awsOpts.S3ForcePathStyle != nil {
opts[utils.S3ForcePathStyle] = *awsOpts.S3ForcePathStyle
}
if awsOpts.S3SkipTlsVerify != nil {
opts[utils.S3SkipTlsVerify] = *awsOpts.S3SkipTlsVerify
}
}
if natOpts := eeC.Opts.NATS; natOpts != nil {
if natOpts.JetStream != nil {

View File

@@ -78,7 +78,11 @@ func TestEESClone(t *testing.T) {
"awsKey":"key",
"awsSecret":"secretkey",
"sqsQueueID":"sqsid",
"sqsForcePathStyle": true,
"sqsSkipTlsVerify": true,
"s3BucketID":"s3",
"s3ForcePathStyle": true,
"s3SkipTlsVerify": true,
"rpcCodec":"rpc",
"serviceMethod":"service",
"keyPath":"path",
@@ -321,7 +325,11 @@ func TestEESClone(t *testing.T) {
Key: utils.StringPointer("key"),
Secret: utils.StringPointer("secretkey"),
S3BucketID: utils.StringPointer("s3"),
S3ForcePathStyle: utils.BoolPointer(true),
S3SkipTlsVerify: utils.BoolPointer(true),
SQSQueueID: utils.StringPointer("sqsid"),
SQSForcePathStyle: utils.BoolPointer(true),
SQSSkipTlsVerify: utils.BoolPointer(true),
},
NATS: &NATSOpts{
JetStream: utils.BoolPointer(true),
@@ -1109,7 +1117,11 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
"awsKey": "key",
"awsSecret": "secretkey",
"sqsQueueID": "sqsid",
"sqsForcePathStyle": true,
"sqsSkipTlsVerify": true,
"s3BucketID": "s3",
"s3ForcePathStyle": true,
"s3SkipTlsVerify": true,
"rpcCodec": "rpc",
"serviceMethod": "service",
"keyPath": "path",
@@ -1265,7 +1277,11 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
utils.AWSKey: "key",
utils.AWSSecret: "secretkey",
utils.SQSQueueID: "sqsid",
utils.SQSForcePathStyle: true,
utils.SQSSkipTlsVerify: true,
utils.S3Bucket: "s3",
utils.S3ForcePathStyle: true,
utils.S3SkipTlsVerify: true,
utils.RpcCodec: "rpc",
utils.ServiceMethod: "service",
utils.KeyPath: "path",
@@ -1419,8 +1435,12 @@ func TestEEsCfgloadFromJSONCfg(t *testing.T) {
AWSSecret: &str,
AWSToken: &str,
SQSQueueID: &str,
SQSForcePathStyle: &bl,
SQSSkipTlsVerify: &bl,
S3BucketID: &str,
S3FolderPath: &str,
S3ForcePathStyle: &bl,
S3SkipTlsVerify: &bl,
NATSJetStream: &bl,
NATSSubject: &str,
NATSJWTFile: &str,

View File

@@ -370,8 +370,12 @@ type EventExporterOptsJson struct {
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"`
SQSForcePathStyle *bool `json:"sqsForcePathStyle"`
SQSSkipTlsVerify *bool `json:"sqsSkipTlsVerify"`
S3BucketID *string `json:"s3BucketID"`
S3FolderPath *string `json:"s3FolderPath"`
S3ForcePathStyle *bool `json:"s3ForcePathStyle"`
S3SkipTlsVerify *bool `json:"s3SkipTlsVerify"`
NATSJetStream *bool `json:"natsJetStream"`
NATSSubject *string `json:"natsSubject"`
NATSJWTFile *string `json:"natsJWTFile"`

View File

@@ -54,6 +54,25 @@
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
},
{
"id": "s3_nuantix_test_file",
"type": "*s3_json_map",
"export_path": "s3.eu-central-1.amazonaws.com",
"opts": {
"awsRegion": "eu-central-1",
"awsKey": "set-using-flags",
"awsSecret": "set-using-flags",
"s3BucketID": "cgrates-cdrs",
"s3ForcePathStyle": true,
"s3SkipTlsVerify": true
},
"attempts": 1,
"failed_posts_dir": "/var/spool/cgrates/failed_posts2",
"synchronous": true,
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
},
{
"id": "amqpv1_test_file",
"type": "*amqpv1_json_map",

View File

@@ -20,7 +20,9 @@ package ees
import (
"bytes"
"crypto/tls"
"fmt"
"net/http"
"sync"
"github.com/aws/aws-sdk-go/aws"
@@ -50,6 +52,8 @@ type S3EE struct {
awsToken string
bucket string
folderPath string
forcePathStyle bool
skipTlsVerify bool
session *session.Session
up *s3manager.Uploader
@@ -81,6 +85,12 @@ func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) {
if s3Opts.Token != nil {
pstr.awsToken = *s3Opts.Token
}
if s3Opts.S3ForcePathStyle != nil {
pstr.forcePathStyle = *s3Opts.S3ForcePathStyle
}
if s3Opts.S3SkipTlsVerify != nil {
pstr.skipTlsVerify = *s3Opts.S3SkipTlsVerify
}
}
}
@@ -98,6 +108,18 @@ func (pstr *S3EE) Connect() (err error) {
len(pstr.awsKey) != 0 {
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
}
if pstr.forcePathStyle {
cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints
}
if pstr.skipTlsVerify {
cfg.HTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates
},
},
}
}
pstr.session, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,
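For context, the settings applied above can be assembled into a self-contained aws-sdk-go (v1) session. This is only a sketch mirroring the commit's Connect() logic; the function name, endpoint, region and credentials are placeholders, not part of this commit:

package main

import (
	"crypto/tls"
	"net/http"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
)

// newS3CompatibleSession builds an AWS session for an S3-compatible service
// (e.g. MinIO) using the same knobs this commit exposes on the exporter.
func newS3CompatibleSession(endpoint, region, key, secret string,
	forcePathStyle, skipTlsVerify bool) (*session.Session, error) {
	cfg := aws.Config{
		Endpoint:    aws.String(endpoint),
		Region:      aws.String(region),
		Credentials: credentials.NewStaticCredentials(key, secret, ""),
	}
	if forcePathStyle {
		// Path-style addressing is usually required by custom S3-compatible endpoints.
		cfg.S3ForcePathStyle = aws.Bool(true)
	}
	if skipTlsVerify {
		// Accept any TLS certificate, e.g. self-signed ones on private deployments.
		cfg.HTTPClient = &http.Client{
			Transport: &http.Transport{
				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
			},
		}
	}
	return session.NewSessionWithOptions(session.Options{Config: cfg})
}

func main() {
	// Building the session does not contact the endpoint, so placeholders are safe here.
	if _, err := newS3CompatibleSession(
		"https://minio.example.internal:9000", "us-east-1",
		"placeholder-key", "placeholder-secret", true, true); err != nil {
		panic(err)
	}
}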

View File

@@ -22,8 +22,10 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
package ees
import (
"crypto/tls"
"flag"
"fmt"
"net/http"
"path"
"testing"
"time"
@@ -185,3 +187,112 @@ func testS3VerifyExport(t *testing.T) {
t.Errorf("Expected: %q, received: %q", expected, rply)
}
}
var (
runS3NuantixTest = flag.Bool("nuantix", false, "Run the integration test for the S3 exporter from Nuantix")
sTestsS3Nuantix = []func(t *testing.T){
testS3LoadConfig,
testS3ResetDataDB,
testS3ResetStorDb,
testS3StartEngine,
testS3RPCConn,
testS3NuantixExportEvent,
testS3NuantixVerifyExport,
testStopCgrEngine,
}
)
func TestS3ExportNuantix(t *testing.T) {
if !*runS3NuantixTest {
t.SkipNow()
}
s3ConfDir = "ees_cloud"
for _, stest := range sTestsS3Nuantix {
t.Run(s3ConfDir, stest)
}
}
func testS3NuantixExportEvent(t *testing.T) {
cgrID = utils.Sha1("abcdefg", time.Unix(1383813745, 0).UTC().String())
t.Log(cgrID)
ev := &engine.CGREventWithEeIDs{
EeIDs: []string{"s3_nuantix_test_file"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "dataEvent",
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.CGRID: cgrID,
utils.ToR: utils.MetaData,
utils.OriginID: "abcdefg",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.MetaRated,
utils.Tenant: "AnotherTenant",
utils.Category: "call", //for data CDR use different Tenant
utils.AccountField: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
utils.Usage: 10 * time.Nanosecond,
utils.RunID: utils.MetaDefault,
utils.Cost: 0.012,
},
},
}
var reply map[string]utils.MapStorage
if err := s3RPC.Call(context.Background(), utils.EeSv1ProcessEvent, ev, &reply); err != nil {
t.Error(err)
}
time.Sleep(2 * time.Second)
}
func testS3NuantixVerifyExport(t *testing.T) {
endpoint := "s3.eu-central-1.amazonaws.com"
region := "eu-central-1"
qname := "cgrates-cdrs"
key := fmt.Sprintf("%s/%s:%s.json", "", cgrID, utils.MetaDefault)
var sess *session.Session
cfg := aws.Config{Endpoint: aws.String(endpoint)}
cfg.Region = aws.String(region)
cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "")
var err error
cfg.S3ForcePathStyle = aws.Bool(true)
cfg.HTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
sess, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,
},
)
if err != nil {
t.Error(err)
}
s3Clnt := s3.New(sess)
s3Clnt.DeleteObject(&s3.DeleteObjectInput{})
file := aws.NewWriteAtBuffer([]byte{})
svc := s3manager.NewDownloader(sess)
if _, err = svc.Download(file,
&s3.GetObjectInput{
Bucket: aws.String(qname),
Key: aws.String(key),
}); err != nil {
t.Fatalf("Unable to download item %v", err)
}
expected := `{"Account":"1001","CGRID":"5a5ff7c40039976e1c2bd303dee45983267f6ed2","Category":"call","Destination":"1002","OriginID":"abcdefg","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"AnotherTenant","ToR":"*data"}`
if rply := string(file.Bytes()); rply != expected {
t.Errorf("Expected: %q, received: %q", expected, rply)
}
}

View File

@@ -66,6 +66,8 @@ func TestParseOptsAllFieldsSet(t *testing.T) {
key := "id"
secret := "key"
token := "token"
fps := true
stv := true
opts := &config.EventExporterOpts{
AWS: &config.AWSOpts{
@@ -75,6 +77,8 @@ func TestParseOptsAllFieldsSet(t *testing.T) {
Key: &key,
Secret: &secret,
Token: &token,
S3ForcePathStyle: &fps,
S3SkipTlsVerify: &stv,
},
}
@@ -85,6 +89,8 @@ func TestParseOptsAllFieldsSet(t *testing.T) {
awsID: key,
awsKey: secret,
awsToken: token,
forcePathStyle: true,
skipTlsVerify: true,
}
s3ee := &S3EE{}
@@ -107,4 +113,10 @@ func TestParseOptsAllFieldsSet(t *testing.T) {
if s3ee.awsToken != expected.awsToken {
t.Errorf("Expected awsToken %s, got %s", expected.awsToken, s3ee.awsToken)
}
if s3ee.forcePathStyle != expected.forcePathStyle {
t.Errorf("Expected forcePathStyle %v, got %v", expected.forcePathStyle, s3ee.forcePathStyle)
}
if s3ee.skipTlsVerify != expected.skipTlsVerify {
t.Errorf("Expected skipTLSVerify %v, got %v", expected.skipTlsVerify, s3ee.skipTlsVerify)
}
}

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
package ees
import (
"crypto/tls"
"net/http"
"sync"
"github.com/aws/aws-sdk-go/aws"
@@ -49,6 +51,8 @@ type SQSee struct {
awsToken string
queueURL *string
queueID string
forcePathStyle bool
skipTlsVerify bool
session *session.Session
svc *sqs.SQS
@@ -77,6 +81,12 @@ func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) {
if sqsOpts.Token != nil {
pstr.awsToken = *sqsOpts.Token
}
if sqsOpts.SQSForcePathStyle != nil {
pstr.forcePathStyle = *sqsOpts.SQSForcePathStyle
}
if sqsOpts.SQSSkipTlsVerify != nil {
pstr.skipTlsVerify = *sqsOpts.SQSSkipTlsVerify
}
}
}
@@ -94,6 +104,18 @@ func (pstr *SQSee) Connect() (err error) {
len(pstr.awsKey) != 0 {
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
}
if pstr.forcePathStyle {
cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints
}
if pstr.skipTlsVerify {
cfg.HTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates
},
},
}
}
pstr.session, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,

View File

@@ -3057,10 +3057,14 @@ const (
// sqs
SQSQueueID = "sqsQueueID"
SQSForcePathStyle = "sqsForcePathStyle"
SQSSkipTlsVerify = "sqsSkipTlsVerify"
// s3
S3Bucket = "s3BucketID"
S3FolderPath = "s3FolderPath"
S3ForcePathStyle = "s3ForcePathStyle"
S3SkipTlsVerify = "s3SkipTlsVerify"
// sql
SQLDefaultDBName = "cgrates"