diff --git a/config/config_defaults.go b/config/config_defaults.go
index e0185336a..aa8acd273 100644
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -627,10 +627,14 @@ const CGRATES_CFG_JSON = `
//SQS
// "sqsQueueID": "cgrates_cdrs", // the queue id for SQS exporters from were the events are exported
+ // "sqsForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY)
+ // "sqsSkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate
// S3
// "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from where the events that are exported
// "s3FolderPath": "", // S3FolderPath
+ // "s3ForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY)
+ // "s3SkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate
// Nats
// "natsJetStream": false, // controls if the nats poster uses the JetStream
diff --git a/config/eescfg.go b/config/eescfg.go
index 65bbc0e35..bf9f7dc9d 100644
--- a/config/eescfg.go
+++ b/config/eescfg.go
@@ -221,13 +221,17 @@ type AMQPOpts struct {
}
type AWSOpts struct {
- Region *string
- Key *string
- Secret *string
- Token *string
- SQSQueueID *string
- S3BucketID *string
- S3FolderPath *string
+ Region *string
+ Key *string
+ Secret *string
+ Token *string
+ SQSQueueID *string
+ SQSForcePathStyle *bool
+ SQSSkipTlsVerify *bool
+ S3BucketID *string
+ S3FolderPath *string
+ S3ForcePathStyle *bool
+ S3SkipTlsVerify *bool
}
type NATSOpts struct {
@@ -484,12 +488,24 @@ func (awsOpts *AWSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err erro
if jsnCfg.SQSQueueID != nil {
awsOpts.SQSQueueID = jsnCfg.SQSQueueID
}
+ if jsnCfg.SQSForcePathStyle != nil {
+ awsOpts.SQSForcePathStyle = jsnCfg.SQSForcePathStyle
+ }
+ if jsnCfg.SQSSkipTlsVerify != nil {
+ awsOpts.SQSSkipTlsVerify = jsnCfg.SQSSkipTlsVerify
+ }
if jsnCfg.S3BucketID != nil {
awsOpts.S3BucketID = jsnCfg.S3BucketID
}
if jsnCfg.S3FolderPath != nil {
awsOpts.S3FolderPath = jsnCfg.S3FolderPath
}
+ if jsnCfg.S3ForcePathStyle != nil {
+ awsOpts.S3ForcePathStyle = jsnCfg.S3ForcePathStyle
+ }
+ if jsnCfg.S3SkipTlsVerify != nil {
+ awsOpts.S3SkipTlsVerify = jsnCfg.S3SkipTlsVerify
+ }
return
}
func (natsOpts *NATSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) {
@@ -907,6 +923,14 @@ func (awsOpts *AWSOpts) Clone() *AWSOpts {
cln.SQSQueueID = new(string)
*cln.SQSQueueID = *awsOpts.SQSQueueID
}
+ if awsOpts.SQSForcePathStyle != nil {
+ cln.SQSForcePathStyle = new(bool)
+ *cln.SQSForcePathStyle = *awsOpts.SQSForcePathStyle
+ }
+ if awsOpts.SQSSkipTlsVerify != nil {
+ cln.SQSSkipTlsVerify = new(bool)
+ *cln.SQSSkipTlsVerify = *awsOpts.SQSSkipTlsVerify
+ }
if awsOpts.S3BucketID != nil {
cln.S3BucketID = new(string)
*cln.S3BucketID = *awsOpts.S3BucketID
@@ -915,6 +939,14 @@ func (awsOpts *AWSOpts) Clone() *AWSOpts {
cln.S3FolderPath = new(string)
*cln.S3FolderPath = *awsOpts.S3FolderPath
}
+ if awsOpts.S3ForcePathStyle != nil {
+ cln.S3ForcePathStyle = new(bool)
+ *cln.S3ForcePathStyle = *awsOpts.S3ForcePathStyle
+ }
+ if awsOpts.S3SkipTlsVerify != nil {
+ cln.S3SkipTlsVerify = new(bool)
+ *cln.S3SkipTlsVerify = *awsOpts.S3SkipTlsVerify
+ }
return cln
}
@@ -1233,12 +1265,24 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
if awsOpts.SQSQueueID != nil {
opts[utils.SQSQueueID] = *awsOpts.SQSQueueID
}
+ if awsOpts.SQSForcePathStyle != nil {
+ opts[utils.SQSForcePathStyle] = *awsOpts.SQSForcePathStyle
+ }
+ if awsOpts.SQSSkipTlsVerify != nil {
+ opts[utils.SQSSkipTlsVerify] = *awsOpts.SQSSkipTlsVerify
+ }
if awsOpts.S3BucketID != nil {
opts[utils.S3Bucket] = *awsOpts.S3BucketID
}
if awsOpts.S3FolderPath != nil {
opts[utils.S3FolderPath] = *awsOpts.S3FolderPath
}
+ if awsOpts.S3ForcePathStyle != nil {
+ opts[utils.S3ForcePathStyle] = *awsOpts.S3ForcePathStyle
+ }
+ if awsOpts.S3SkipTlsVerify != nil {
+ opts[utils.S3SkipTlsVerify] = *awsOpts.S3SkipTlsVerify
+ }
}
if natOpts := eeC.Opts.NATS; natOpts != nil {
if natOpts.JetStream != nil {
diff --git a/config/eescfg_test.go b/config/eescfg_test.go
index 03b04e086..56e80aa4a 100644
--- a/config/eescfg_test.go
+++ b/config/eescfg_test.go
@@ -78,7 +78,11 @@ func TestEESClone(t *testing.T) {
"awsKey":"key",
"awsSecret":"secretkey",
"sqsQueueID":"sqsid",
+ "sqsForcePathStyle": true,
+ "sqsSkipTlsVerify": true,
"s3BucketID":"s3",
+ "s3ForcePathStyle": true,
+ "s3SkipTlsVerify": true,
"rpcCodec":"rpc",
"serviceMethod":"service",
"keyPath":"path",
@@ -315,13 +319,17 @@ func TestEESClone(t *testing.T) {
Topic: utils.StringPointer("kafka"),
},
AWS: &AWSOpts{
- Token: utils.StringPointer("token"),
- S3FolderPath: utils.StringPointer("s3"),
- Region: utils.StringPointer("eu"),
- Key: utils.StringPointer("key"),
- Secret: utils.StringPointer("secretkey"),
- S3BucketID: utils.StringPointer("s3"),
- SQSQueueID: utils.StringPointer("sqsid"),
+ Token: utils.StringPointer("token"),
+ S3FolderPath: utils.StringPointer("s3"),
+ Region: utils.StringPointer("eu"),
+ Key: utils.StringPointer("key"),
+ Secret: utils.StringPointer("secretkey"),
+ S3BucketID: utils.StringPointer("s3"),
+ S3ForcePathStyle: utils.BoolPointer(true),
+ S3SkipTlsVerify: utils.BoolPointer(true),
+ SQSQueueID: utils.StringPointer("sqsid"),
+ SQSForcePathStyle: utils.BoolPointer(true),
+ SQSSkipTlsVerify: utils.BoolPointer(true),
},
NATS: &NATSOpts{
JetStream: utils.BoolPointer(true),
@@ -1109,7 +1117,11 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
"awsKey": "key",
"awsSecret": "secretkey",
"sqsQueueID": "sqsid",
+ "sqsForcePathStyle": true,
+ "sqsSkipTlsVerify": true,
"s3BucketID": "s3",
+ "s3ForcePathStyle": true,
+ "s3SkipTlsVerify": true,
"rpcCodec": "rpc",
"serviceMethod": "service",
"keyPath": "path",
@@ -1265,7 +1277,11 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
utils.AWSKey: "key",
utils.AWSSecret: "secretkey",
utils.SQSQueueID: "sqsid",
+ utils.SQSForcePathStyle: true,
+ utils.SQSSkipTlsVerify: true,
utils.S3Bucket: "s3",
+ utils.S3ForcePathStyle: true,
+ utils.S3SkipTlsVerify: true,
utils.RpcCodec: "rpc",
utils.ServiceMethod: "service",
utils.KeyPath: "path",
@@ -1419,8 +1435,12 @@ func TestEEsCfgloadFromJSONCfg(t *testing.T) {
AWSSecret: &str,
AWSToken: &str,
SQSQueueID: &str,
+ SQSForcePathStyle: &bl,
+ SQSSkipTlsVerify: &bl,
S3BucketID: &str,
S3FolderPath: &str,
+ S3ForcePathStyle: &bl,
+ S3SkipTlsVerify: &bl,
NATSJetStream: &bl,
NATSSubject: &str,
NATSJWTFile: &str,
diff --git a/config/libconfig_json.go b/config/libconfig_json.go
index 10d2a849f..97cd463b9 100644
--- a/config/libconfig_json.go
+++ b/config/libconfig_json.go
@@ -370,8 +370,12 @@ type EventExporterOptsJson struct {
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"`
+ SQSForcePathStyle *bool `json:"sqsForcePathStyle"`
+ SQSSkipTlsVerify *bool `json:"sqsSkipTlsVerify"`
S3BucketID *string `json:"s3BucketID"`
S3FolderPath *string `json:"s3FolderPath"`
+ S3ForcePathStyle *bool `json:"s3ForcePathStyle"`
+ S3SkipTlsVerify *bool `json:"s3SkipTlsVerify"`
NATSJetStream *bool `json:"natsJetStream"`
NATSSubject *string `json:"natsSubject"`
NATSJWTFile *string `json:"natsJWTFile"`
diff --git a/data/conf/samples/ees_cloud/cgrates.json b/data/conf/samples/ees_cloud/cgrates.json
index 0c498cfd0..fc268a7f8 100644
--- a/data/conf/samples/ees_cloud/cgrates.json
+++ b/data/conf/samples/ees_cloud/cgrates.json
@@ -54,6 +54,25 @@
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
},
+ {
+ "id": "s3_nuantix_test_file",
+ "type": "*s3_json_map",
+ "export_path": "s3.eu-central-1.amazonaws.com",
+ "opts": {
+ "awsRegion": "eu-central-1",
+ "awsKey": "set-using-flags",
+ "awsSecret": "set-using-flags",
+ "s3BucketID": "cgrates-cdrs",
+ "s3ForcePathStyle": true,
+ "s3SkipTlsVerify": true
+ },
+ "attempts": 1,
+ "failed_posts_dir": "/var/spool/cgrates/failed_posts2",
+ "synchronous": true,
+ "fields":[
+ {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
+ ]
+ },
{
"id": "amqpv1_test_file",
"type": "*amqpv1_json_map",
diff --git a/ees/s3.go b/ees/s3.go
index 6e5864678..9cdb38678 100644
--- a/ees/s3.go
+++ b/ees/s3.go
@@ -20,7 +20,9 @@ package ees
import (
"bytes"
+ "crypto/tls"
"fmt"
+ "net/http"
"sync"
"github.com/aws/aws-sdk-go/aws"
@@ -44,14 +46,16 @@ func NewS3EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *S3EE {
// S3EE is a s3 poster
type S3EE struct {
- awsRegion string
- awsID string
- awsKey string
- awsToken string
- bucket string
- folderPath string
- session *session.Session
- up *s3manager.Uploader
+ awsRegion string
+ awsID string
+ awsKey string
+ awsToken string
+ bucket string
+ folderPath string
+ forcePathStyle bool
+ skipTlsVerify bool
+ session *session.Session
+ up *s3manager.Uploader
cfg *config.EventExporterCfg
em *utils.ExporterMetrics
@@ -81,6 +85,12 @@ func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) {
if s3Opts.Token != nil {
pstr.awsToken = *s3Opts.Token
}
+ if s3Opts.S3ForcePathStyle != nil {
+ pstr.forcePathStyle = *s3Opts.S3ForcePathStyle
+ }
+ if s3Opts.S3SkipTlsVerify != nil {
+ pstr.skipTlsVerify = *s3Opts.S3SkipTlsVerify
+ }
}
}
@@ -98,6 +108,18 @@ func (pstr *S3EE) Connect() (err error) {
len(pstr.awsKey) != 0 {
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
}
+ if pstr.forcePathStyle {
+ cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints
+ }
+ if pstr.skipTlsVerify {
+ cfg.HTTPClient = &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates
+ },
+ },
+ }
+ }
pstr.session, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,
diff --git a/ees/s3_it_test.go b/ees/s3_it_test.go
index e9d4c860a..d0415fee9 100644
--- a/ees/s3_it_test.go
+++ b/ees/s3_it_test.go
@@ -22,8 +22,10 @@ along with this program. If not, see
package ees
import (
+ "crypto/tls"
"flag"
"fmt"
+ "net/http"
"path"
"testing"
"time"
@@ -185,3 +187,112 @@ func testS3VerifyExport(t *testing.T) {
t.Errorf("Expected: %q, received: %q", expected, rply)
}
}
+
+var (
+ runS3NuantixTest = flag.Bool("nuantix", false, "Run the integration test for the S3 exporter from Nuantix")
+
+ sTestsS3Nuantix = []func(t *testing.T){
+ testS3LoadConfig,
+ testS3ResetDataDB,
+ testS3ResetStorDb,
+ testS3StartEngine,
+ testS3RPCConn,
+ testS3NuantixExportEvent,
+ testS3NuantixVerifyExport,
+ testStopCgrEngine,
+ }
+)
+
+func TestS3ExportNuantix(t *testing.T) {
+ if !*runS3NuantixTest {
+ t.SkipNow()
+ }
+ s3ConfDir = "ees_cloud"
+ for _, stest := range sTestsS3Nuantix {
+ t.Run(s3ConfDir, stest)
+ }
+}
+
+func testS3NuantixExportEvent(t *testing.T) {
+ cgrID = utils.Sha1("abcdefg", time.Unix(1383813745, 0).UTC().String())
+ t.Log(cgrID)
+ ev := &engine.CGREventWithEeIDs{
+ EeIDs: []string{"s3_nuantix_test_file"},
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "dataEvent",
+ Time: utils.TimePointer(time.Now()),
+ Event: map[string]any{
+ utils.CGRID: cgrID,
+ utils.ToR: utils.MetaData,
+ utils.OriginID: "abcdefg",
+ utils.OriginHost: "192.168.1.1",
+ utils.RequestType: utils.MetaRated,
+ utils.Tenant: "AnotherTenant",
+ utils.Category: "call", //for data CDR use different Tenant
+ utils.AccountField: "1001",
+ utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: 10 * time.Nanosecond,
+ utils.RunID: utils.MetaDefault,
+ utils.Cost: 0.012,
+ },
+ },
+ }
+
+ var reply map[string]utils.MapStorage
+ if err := s3RPC.Call(context.Background(), utils.EeSv1ProcessEvent, ev, &reply); err != nil {
+ t.Error(err)
+ }
+ time.Sleep(2 * time.Second)
+}
+
+func testS3NuantixVerifyExport(t *testing.T) {
+ endpoint := "s3.eu-central-1.amazonaws.com"
+ region := "eu-central-1"
+ qname := "cgrates-cdrs"
+
+ key := fmt.Sprintf("%s/%s:%s.json", "", cgrID, utils.MetaDefault)
+
+ var sess *session.Session
+ cfg := aws.Config{Endpoint: aws.String(endpoint)}
+ cfg.Region = aws.String(region)
+
+ cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "")
+ var err error
+ cfg.S3ForcePathStyle = aws.Bool(true)
+ cfg.HTTPClient = &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true,
+ },
+ },
+ }
+ sess, err = session.NewSessionWithOptions(
+ session.Options{
+ Config: cfg,
+ },
+ )
+ if err != nil {
+ t.Error(err)
+ }
+ s3Clnt := s3.New(sess)
+ s3Clnt.DeleteObject(&s3.DeleteObjectInput{})
+ file := aws.NewWriteAtBuffer([]byte{})
+ svc := s3manager.NewDownloader(sess)
+
+ if _, err = svc.Download(file,
+ &s3.GetObjectInput{
+ Bucket: aws.String(qname),
+ Key: aws.String(key),
+ }); err != nil {
+ t.Fatalf("Unable to download item %v", err)
+ }
+
+ expected := `{"Account":"1001","CGRID":"5a5ff7c40039976e1c2bd303dee45983267f6ed2","Category":"call","Destination":"1002","OriginID":"abcdefg","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"AnotherTenant","ToR":"*data"}`
+ if rply := string(file.Bytes()); rply != expected {
+ t.Errorf("Expected: %q, received: %q", expected, rply)
+ }
+}
diff --git a/ees/s3_test.go b/ees/s3_test.go
index a807eaa0e..2cd955c42 100644
--- a/ees/s3_test.go
+++ b/ees/s3_test.go
@@ -66,25 +66,31 @@ func TestParseOptsAllFieldsSet(t *testing.T) {
key := "id"
secret := "key"
token := "token"
+ fps := true
+ stv := true
opts := &config.EventExporterOpts{
AWS: &config.AWSOpts{
- S3BucketID: &bucketID,
- S3FolderPath: &folderPath,
- Region: ®ion,
- Key: &key,
- Secret: &secret,
- Token: &token,
+ S3BucketID: &bucketID,
+ S3FolderPath: &folderPath,
+ Region: ®ion,
+ Key: &key,
+ Secret: &secret,
+ Token: &token,
+ S3ForcePathStyle: &fps,
+ S3SkipTlsVerify: &stv,
},
}
expected := S3EE{
- bucket: bucketID,
- folderPath: folderPath,
- awsRegion: region,
- awsID: key,
- awsKey: secret,
- awsToken: token,
+ bucket: bucketID,
+ folderPath: folderPath,
+ awsRegion: region,
+ awsID: key,
+ awsKey: secret,
+ awsToken: token,
+ forcePathStyle: true,
+ skipTlsVerify: true,
}
s3ee := &S3EE{}
@@ -107,4 +113,10 @@ func TestParseOptsAllFieldsSet(t *testing.T) {
if s3ee.awsToken != expected.awsToken {
t.Errorf("Expected awsToken %s, got %s", expected.awsToken, s3ee.awsToken)
}
+ if s3ee.forcePathStyle != expected.forcePathStyle {
+ t.Errorf("Expected forcePathStyle %v, got %v", expected.forcePathStyle, s3ee.forcePathStyle)
+ }
+ if s3ee.skipTlsVerify != expected.skipTlsVerify {
+ t.Errorf("Expected skipTLSVerify %v, got %v", expected.skipTlsVerify, s3ee.skipTlsVerify)
+ }
}
diff --git a/ees/sqs.go b/ees/sqs.go
index 116b85eb1..7c619091c 100644
--- a/ees/sqs.go
+++ b/ees/sqs.go
@@ -19,6 +19,8 @@ along with this program. If not, see
package ees
import (
+ "crypto/tls"
+ "net/http"
"sync"
"github.com/aws/aws-sdk-go/aws"
@@ -43,14 +45,16 @@ func NewSQSee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *SQSee {
// SQSee is a poster for sqs
type SQSee struct {
- awsRegion string
- awsID string
- awsKey string
- awsToken string
- queueURL *string
- queueID string
- session *session.Session
- svc *sqs.SQS
+ awsRegion string
+ awsID string
+ awsKey string
+ awsToken string
+ queueURL *string
+ queueID string
+ forcePathStyle bool
+ skipTlsVerify bool
+ session *session.Session
+ svc *sqs.SQS
cfg *config.EventExporterCfg
em *utils.ExporterMetrics
@@ -77,6 +81,12 @@ func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) {
if sqsOpts.Token != nil {
pstr.awsToken = *sqsOpts.Token
}
+ if sqsOpts.SQSForcePathStyle != nil {
+ pstr.forcePathStyle = *sqsOpts.SQSForcePathStyle
+ }
+ if sqsOpts.SQSSkipTlsVerify != nil {
+ pstr.skipTlsVerify = *sqsOpts.SQSSkipTlsVerify
+ }
}
}
@@ -94,6 +104,18 @@ func (pstr *SQSee) Connect() (err error) {
len(pstr.awsKey) != 0 {
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
}
+ if pstr.forcePathStyle {
+ cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints
+ }
+ if pstr.skipTlsVerify {
+ cfg.HTTPClient = &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates
+ },
+ },
+ }
+ }
pstr.session, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,
diff --git a/utils/consts.go b/utils/consts.go
index b02ade0a5..4df816ca4 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -3056,11 +3056,15 @@ const (
AWSToken = "awsToken"
// sqs
- SQSQueueID = "sqsQueueID"
+ SQSQueueID = "sqsQueueID"
+ SQSForcePathStyle = "sqsForcePathStyle"
+ SQSSkipTlsVerify = "sqsSkipTlsVerify"
// s3
- S3Bucket = "s3BucketID"
- S3FolderPath = "s3FolderPath"
+ S3Bucket = "s3BucketID"
+ S3FolderPath = "s3FolderPath"
+ S3ForcePathStyle = "s3ForcePathStyle"
+ S3SkipTlsVerify = "s3SkipTlsVerify"
// sql
SQLDefaultDBName = "cgrates"