diff --git a/config/config_defaults.go b/config/config_defaults.go index 05bcafb45..1eed48b2c 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -477,9 +477,13 @@ const CGRATES_CFG_JSON = ` // SQS // "sqsQueueID": "cgrates_cdrs", // the queue id for SQS readers from were the events are read + // "sqsForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY) + // "sqsSkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate // S3 // "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from were the events are read + // "s3ForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY) + // "s3SkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate // nats // "natsJetStream": false, // controls if the nats reader uses the JetStream diff --git a/config/eescfg.go b/config/eescfg.go index aee414927..3e9e8923b 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -210,8 +210,12 @@ type EventExporterOpts struct { AWSSecret *string AWSToken *string SQSQueueID *string + SQSForcePathStyle *bool + SQSSkipTlsVerify *bool S3BucketID *string S3FolderPath *string + S3ForcePathStyle *bool + S3SkipTlsVerify *bool NATSJetStream *bool NATSSubject *string NATSJWTFile *string @@ -435,12 +439,24 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.SQSQueueID != nil { eeOpts.SQSQueueID = jsnCfg.SQSQueueID } + if jsnCfg.SQSForcePathStyle != nil { + eeOpts.SQSForcePathStyle = jsnCfg.SQSForcePathStyle + } + if jsnCfg.SQSSkipTlsVerify != nil { + eeOpts.SQSSkipTlsVerify = jsnCfg.SQSSkipTlsVerify + } if jsnCfg.S3BucketID != nil { eeOpts.S3BucketID = jsnCfg.S3BucketID } if jsnCfg.S3FolderPath != nil { eeOpts.S3FolderPath = jsnCfg.S3FolderPath } + if jsnCfg.S3ForcePathStyle != nil { + eeOpts.S3ForcePathStyle = jsnCfg.S3ForcePathStyle + } + if jsnCfg.S3SkipTlsVerify != nil { + eeOpts.S3SkipTlsVerify = jsnCfg.S3SkipTlsVerify + } if jsnCfg.NATSJetStream != nil { eeOpts.NATSJetStream = jsnCfg.NATSJetStream } @@ -805,6 +821,14 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { cln.SQSQueueID = new(string) *cln.SQSQueueID = *eeOpts.SQSQueueID } + if eeOpts.SQSForcePathStyle != nil { + cln.SQSForcePathStyle = new(bool) + *cln.SQSForcePathStyle = *eeOpts.SQSForcePathStyle + } + if eeOpts.SQSSkipTlsVerify != nil { + cln.SQSSkipTlsVerify = new(bool) + *cln.SQSSkipTlsVerify = *eeOpts.SQSSkipTlsVerify + } if eeOpts.S3BucketID != nil { cln.S3BucketID = new(string) *cln.S3BucketID = *eeOpts.S3BucketID @@ -813,6 +837,14 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { cln.S3FolderPath = new(string) *cln.S3FolderPath = *eeOpts.S3FolderPath } + if eeOpts.S3ForcePathStyle != nil { + cln.S3ForcePathStyle = new(bool) + *cln.S3ForcePathStyle = *eeOpts.S3ForcePathStyle + } + if eeOpts.S3SkipTlsVerify != nil { + cln.S3SkipTlsVerify = new(bool) + *cln.S3SkipTlsVerify = *eeOpts.S3SkipTlsVerify + } if eeOpts.NATSJetStream != nil { cln.NATSJetStream = new(bool) *cln.NATSJetStream = *eeOpts.NATSJetStream @@ -1119,12 +1151,24 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any { if optsEes.SQSQueueID != nil { opts[utils.SQSQueueID] = *optsEes.SQSQueueID } + if optsEes.SQSForcePathStyle != nil { + opts[utils.SQSForcePathStyle] = *optsEes.SQSForcePathStyle + } + if optsEes.SQSSkipTlsVerify != nil { + opts[utils.SQSSkipTlsVerify] = *optsEes.SQSSkipTlsVerify + } if optsEes.S3BucketID != nil { opts[utils.S3Bucket] = *optsEes.S3BucketID } if optsEes.S3FolderPath != nil { opts[utils.S3FolderPath] = *optsEes.S3FolderPath } + if optsEes.S3ForcePathStyle != nil { + opts[utils.S3ForcePathStyle] = *optsEes.S3ForcePathStyle + } + if optsEes.S3SkipTlsVerify != nil { + opts[utils.S3SkipTlsVerify] = *optsEes.S3SkipTlsVerify + } if optsEes.NATSJetStream != nil { opts[utils.NatsJetStream] = *optsEes.NATSJetStream } @@ -1231,8 +1275,12 @@ type EventExporterOptsJson struct { AWSSecret *string `json:"awsSecret"` AWSToken *string `json:"awsToken"` SQSQueueID *string `json:"sqsQueueID"` + SQSForcePathStyle *bool `json:"sqsForcePathStyle"` + SQSSkipTlsVerify *bool `json:"sqsSkipTlsVerify"` S3BucketID *string `json:"s3BucketID"` S3FolderPath *string `json:"s3FolderPath"` + S3ForcePathStyle *bool `json:"s3ForcePathStyle"` + S3SkipTlsVerify *bool `json:"s3SkipTlsVerify"` NATSJetStream *bool `json:"natsJetStream"` NATSSubject *string `json:"natsSubject"` NATSJWTFile *string `json:"natsJWTFile"` @@ -1539,6 +1587,22 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte } else { d.SQSQueueID = nil } + if v2.SQSForcePathStyle != nil { + if v1.SQSForcePathStyle == nil || + *v1.SQSForcePathStyle != *v2.SQSForcePathStyle { + d.SQSForcePathStyle = v2.SQSForcePathStyle + } + } else { + d.SQSForcePathStyle = nil + } + if v2.SQSSkipTlsVerify != nil { + if v1.SQSSkipTlsVerify == nil || + *v1.SQSSkipTlsVerify != *v2.SQSSkipTlsVerify { + d.SQSSkipTlsVerify = v2.SQSSkipTlsVerify + } + } else { + d.SQSSkipTlsVerify = nil + } if v2.S3BucketID != nil { if v1.S3BucketID == nil || *v1.S3BucketID != *v2.S3BucketID { @@ -1555,6 +1619,22 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte } else { d.S3FolderPath = nil } + if v2.S3ForcePathStyle != nil { + if v1.S3ForcePathStyle == nil || + *v1.S3ForcePathStyle != *v2.S3ForcePathStyle { + d.S3ForcePathStyle = v2.S3ForcePathStyle + } + } else { + d.S3ForcePathStyle = nil + } + if v2.S3SkipTlsVerify != nil { + if v1.S3SkipTlsVerify == nil || + *v1.S3SkipTlsVerify != *v2.S3SkipTlsVerify { + d.S3SkipTlsVerify = v2.S3SkipTlsVerify + } + } else { + d.S3SkipTlsVerify = nil + } if v2.NATSJetStream != nil { if v1.NATSJetStream == nil || *v1.NATSJetStream != *v2.NATSJetStream { diff --git a/config/eescfg_test.go b/config/eescfg_test.go index 89dc6851c..c9009c1f4 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -1218,8 +1218,12 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) { AWSSecret: utils.StringPointer("aws_secret"), AWSToken: utils.StringPointer("aws_token"), SQSQueueID: utils.StringPointer("sqs_queue_id"), + SQSForcePathStyle: utils.BoolPointer(true), + SQSSkipTlsVerify: utils.BoolPointer(true), S3BucketID: utils.StringPointer("s3_bucket_id"), S3FolderPath: utils.StringPointer("s3_folder_path"), + S3ForcePathStyle: utils.BoolPointer(true), + S3SkipTlsVerify: utils.BoolPointer(true), NATSJetStream: utils.BoolPointer(false), NATSSubject: utils.StringPointer("ees_nats"), NATSJWTFile: utils.StringPointer("/path/to/jwt"), @@ -1268,8 +1272,12 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) { AWSSecret: utils.StringPointer("aws_secret"), AWSToken: utils.StringPointer("aws_token"), SQSQueueID: utils.StringPointer("sqs_queue_id"), + SQSForcePathStyle: utils.BoolPointer(true), + SQSSkipTlsVerify: utils.BoolPointer(true), S3BucketID: utils.StringPointer("s3_bucket_id"), S3FolderPath: utils.StringPointer("s3_folder_path"), + S3ForcePathStyle: utils.BoolPointer(true), + S3SkipTlsVerify: utils.BoolPointer(true), NATSJetStream: utils.BoolPointer(false), NATSSubject: utils.StringPointer("ees_nats"), NATSJWTFile: utils.StringPointer("/path/to/jwt"), @@ -1345,8 +1353,12 @@ func TestEventExporterOptsClone(t *testing.T) { AWSSecret: utils.StringPointer("aws_secret"), AWSToken: utils.StringPointer("aws_token"), SQSQueueID: utils.StringPointer("sqs_queue_id"), + SQSForcePathStyle: utils.BoolPointer(true), + SQSSkipTlsVerify: utils.BoolPointer(true), S3BucketID: utils.StringPointer("s3_bucket_id"), S3FolderPath: utils.StringPointer("s3_folder_path"), + S3ForcePathStyle: utils.BoolPointer(true), + S3SkipTlsVerify: utils.BoolPointer(true), NATSJetStream: utils.BoolPointer(false), NATSSubject: utils.StringPointer("ees_nats"), NATSJWTFile: utils.StringPointer("/path/to/jwt"), @@ -1397,8 +1409,12 @@ func TestEventExporterOptsClone(t *testing.T) { AWSSecret: utils.StringPointer("aws_secret"), AWSToken: utils.StringPointer("aws_token"), SQSQueueID: utils.StringPointer("sqs_queue_id"), + SQSForcePathStyle: utils.BoolPointer(true), + SQSSkipTlsVerify: utils.BoolPointer(true), S3BucketID: utils.StringPointer("s3_bucket_id"), S3FolderPath: utils.StringPointer("s3_folder_path"), + S3ForcePathStyle: utils.BoolPointer(true), + S3SkipTlsVerify: utils.BoolPointer(true), NATSJetStream: utils.BoolPointer(false), NATSSubject: utils.StringPointer("ees_nats"), NATSJWTFile: utils.StringPointer("/path/to/jwt"), @@ -1457,8 +1473,12 @@ func TestLoadFromJSONCfg(t *testing.T) { AWSSecret: utils.StringPointer("aws_secret"), AWSToken: utils.StringPointer("aws_token"), SQSQueueID: utils.StringPointer("sqs_queue_id"), + SQSForcePathStyle: utils.BoolPointer(true), + SQSSkipTlsVerify: utils.BoolPointer(true), S3BucketID: utils.StringPointer("s3_bucket_id"), S3FolderPath: utils.StringPointer("s3_folder_path"), + S3ForcePathStyle: utils.BoolPointer(true), + S3SkipTlsVerify: utils.BoolPointer(true), NATSJetStream: utils.BoolPointer(false), NATSSubject: utils.StringPointer("ees_nats"), NATSJWTFile: utils.StringPointer("/path/to/jwt"), @@ -1508,8 +1528,12 @@ func TestLoadFromJSONCfg(t *testing.T) { AWSSecret: utils.StringPointer("aws_secret"), AWSToken: utils.StringPointer("aws_token"), SQSQueueID: utils.StringPointer("sqs_queue_id"), + SQSForcePathStyle: utils.BoolPointer(true), + SQSSkipTlsVerify: utils.BoolPointer(true), S3BucketID: utils.StringPointer("s3_bucket_id"), S3FolderPath: utils.StringPointer("s3_folder_path"), + S3ForcePathStyle: utils.BoolPointer(true), + S3SkipTlsVerify: utils.BoolPointer(true), NATSJetStream: utils.BoolPointer(false), NATSSubject: utils.StringPointer("ees_nats"), NATSJWTFile: utils.StringPointer("/path/to/jwt"), @@ -1662,8 +1686,12 @@ func TestEEsAsMapInterface(t *testing.T) { AWSSecret: utils.StringPointer("aws_secret"), AWSToken: utils.StringPointer("aws_token"), SQSQueueID: utils.StringPointer("sqs_queue_id"), + SQSForcePathStyle: utils.BoolPointer(true), + SQSSkipTlsVerify: utils.BoolPointer(true), S3BucketID: utils.StringPointer("s3_bucket_id"), S3FolderPath: utils.StringPointer("s3_folder_path"), + S3ForcePathStyle: utils.BoolPointer(true), + S3SkipTlsVerify: utils.BoolPointer(true), NATSJetStream: utils.BoolPointer(false), NATSSubject: utils.StringPointer("ees_nats"), NATSJWTFile: utils.StringPointer("/path/to/jwt"), @@ -1725,6 +1753,8 @@ func TestEEsAsMapInterface(t *testing.T) { "rpcReplyTimeout": "2s", "s3BucketID": "s3_bucket_id", "s3FolderPath": "s3_folder_path", + "s3ForcePathStyle": true, + "s3SkipTlsVerify": true, "serviceMethod": "service_method", "sqlConnMaxLifetime": "2s", "sqlDBName": "cgrates", @@ -1732,6 +1762,8 @@ func TestEEsAsMapInterface(t *testing.T) { "sqlMaxOpenConns": 10, "sqlTableName": "cdrs", "sqsQueueID": "sqs_queue_id", + "sqsForcePathStyle": true, + "sqsSkipTlsVerify": true, "pgSSLMode": "sslm", "kafkaTLS": false, "connIDs": []string{"testID"}, @@ -1798,8 +1830,12 @@ func TestEescfgNewEventExporterCfg(t *testing.T) { AWSSecret: &str, AWSToken: &str, SQSQueueID: &str, + SQSForcePathStyle: &bl, + SQSSkipTlsVerify: &bl, S3BucketID: &str, S3FolderPath: &str, + S3ForcePathStyle: &bl, + S3SkipTlsVerify: &bl, NATSJetStream: &bl, NATSSubject: &str, NATSJWTFile: &str, @@ -1902,8 +1938,12 @@ func TestEescfgloadFromJSONCfg(t *testing.T) { AWSSecret: &str, AWSToken: &str, SQSQueueID: &str, + SQSForcePathStyle: &bl, + SQSSkipTlsVerify: &bl, S3BucketID: &str, S3FolderPath: &str, + S3ForcePathStyle: &bl, + S3SkipTlsVerify: &bl, NATSJetStream: &bl, NATSSubject: &str, NATSJWTFile: &str, @@ -1976,8 +2016,12 @@ func TestEescfgloadFromJSONCfg(t *testing.T) { AWSSecret: &str, AWSToken: &str, SQSQueueID: &str, + SQSForcePathStyle: &bl, + SQSSkipTlsVerify: &bl, S3BucketID: &str, S3FolderPath: &str, + S3ForcePathStyle: &bl, + S3SkipTlsVerify: &bl, NATSJetStream: &bl, NATSSubject: &str, NATSJWTFile: &str, diff --git a/data/conf/samples/ees_cloud/cgrates.json b/data/conf/samples/ees_cloud/cgrates.json index 62d635e7f..68c44b61f 100644 --- a/data/conf/samples/ees_cloud/cgrates.json +++ b/data/conf/samples/ees_cloud/cgrates.json @@ -58,6 +58,25 @@ {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} ] }, + { + "id": "s3_nuantix_test_file", + "type": "*s3JSONMap", + "export_path": "s3.eu-central-1.amazonaws.com", + "opts": { + "awsRegion": "eu-central-1", + "awsKey": "access key ID", + "awsSecret": "secret access key", + "s3BucketID": "cgrates-cdrs", + "s3ForcePathStyle": true, + "s3SkipTlsVerify": true + }, + "attempts": 1, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "synchronous": true, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] + }, { "id": "amqpv1_test_file", "type": "*amqpv1JSONMap", diff --git a/ees/s3.go b/ees/s3.go index dc2b63ab9..6dce43352 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -20,7 +20,9 @@ package ees import ( "bytes" + "crypto/tls" "fmt" + "net/http" "sync" "github.com/aws/aws-sdk-go/aws" @@ -46,14 +48,16 @@ func NewS3EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *S3EE { // S3EE is a s3 poster type S3EE struct { - awsRegion string - awsID string - awsKey string - awsToken string - bucket string - folderPath string - session *session.Session - up *s3manager.Uploader + awsRegion string + awsID string + awsKey string + awsToken string + bucket string + folderPath string + forcePathStyle bool + skipTlsVerify bool + session *session.Session + up *s3manager.Uploader cfg *config.EventExporterCfg em *utils.ExporterMetrics @@ -82,6 +86,12 @@ func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) { if opts.AWSToken != nil { pstr.awsToken = *opts.AWSToken } + if opts.S3ForcePathStyle != nil { + pstr.forcePathStyle = *opts.S3ForcePathStyle + } + if opts.S3SkipTlsVerify != nil { + pstr.skipTlsVerify = *opts.S3SkipTlsVerify + } } func (pstr *S3EE) Cfg() *config.EventExporterCfg { return pstr.cfg } @@ -98,6 +108,18 @@ func (pstr *S3EE) Connect() (err error) { len(pstr.awsKey) != 0 { cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken) } + if pstr.forcePathStyle { + cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints + } + if pstr.skipTlsVerify { + cfg.HTTPClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates + }, + }, + } + } pstr.session, err = session.NewSessionWithOptions( session.Options{ Config: cfg, diff --git a/ees/s3_it_test.go b/ees/s3_it_test.go index c34cf2e5b..0a5652d4f 100644 --- a/ees/s3_it_test.go +++ b/ees/s3_it_test.go @@ -175,3 +175,112 @@ package ees // t.Errorf("Expected: %q, received: %q", expected, rply) // } // } + +// var ( +// runS3NuantixTest = flag.Bool("nuantix", false, "Run the integration test for the S3 exporter from Nuantix") + +// sTestsS3Nuantix = []func(t *testing.T){ +// testS3LoadConfig, +// testS3ResetDataDB, +// testS3ResetStorDb, +// testS3StartEngine, +// testS3RPCConn, +// testS3NuantixExportEvent, +// testS3NuantixVerifyExport, +// testStopCgrEngine, +// } +// ) + +// func TestS3ExportNuantix(t *testing.T) { +// if !*runS3NuantixTest { +// t.SkipNow() +// } +// s3ConfDir = "ees_cloud" +// for _, stest := range sTestsS3Nuantix { +// t.Run(s3ConfDir, stest) +// } +// } + +// func testS3NuantixExportEvent(t *testing.T) { +// cgrID = utils.Sha1("abcdefg", time.Unix(1383813745, 0).UTC().String()) +// t.Log(cgrID) +// ev := &engine.CGREventWithEeIDs{ +// EeIDs: []string{"s3_nuantix_test_file"}, +// CGREvent: &utils.CGREvent{ +// Tenant: "cgrates.org", +// ID: "dataEvent", +// Time: utils.TimePointer(time.Now()), +// Event: map[string]any{ +// utils.CGRID: cgrID, +// utils.ToR: utils.MetaData, +// utils.OriginID: "abcdefg", +// utils.OriginHost: "192.168.1.1", +// utils.RequestType: utils.MetaRated, +// utils.Tenant: "AnotherTenant", +// utils.Category: "call", //for data CDR use different Tenant +// utils.AccountField: "1001", +// utils.Subject: "1001", +// utils.Destination: "1002", +// utils.SetupTime: time.Unix(1383813745, 0).UTC(), +// utils.AnswerTime: time.Unix(1383813746, 0).UTC(), +// utils.Usage: 10 * time.Nanosecond, +// utils.RunID: utils.MetaDefault, +// utils.Cost: 0.012, +// }, +// }, +// } + +// var reply map[string]utils.MapStorage +// if err := s3RPC.Call(context.Background(), utils.EeSv1ProcessEvent, ev, &reply); err != nil { +// t.Error(err) +// } +// time.Sleep(2 * time.Second) +// } + +// func testS3NuantixVerifyExport(t *testing.T) { +// endpoint := "s3.eu-central-1.amazonaws.com" +// region := "eu-central-1" +// qname := "cgrates-cdrs" + +// key := fmt.Sprintf("%s/%s:%s.json", "", cgrID, utils.MetaDefault) + +// var sess *session.Session +// cfg := aws.Config{Endpoint: aws.String(endpoint)} +// cfg.Region = aws.String(region) + +// cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "") +// var err error +// cfg.S3ForcePathStyle = aws.Bool(true) +// cfg.HTTPClient = &http.Client{ +// Transport: &http.Transport{ +// TLSClientConfig: &tls.Config{ +// InsecureSkipVerify: true, +// }, +// }, +// } +// sess, err = session.NewSessionWithOptions( +// session.Options{ +// Config: cfg, +// }, +// ) +// if err != nil { +// t.Error(err) +// } +// s3Clnt := s3.New(sess) +// s3Clnt.DeleteObject(&s3.DeleteObjectInput{}) +// file := aws.NewWriteAtBuffer([]byte{}) +// svc := s3manager.NewDownloader(sess) + +// if _, err = svc.Download(file, +// &s3.GetObjectInput{ +// Bucket: aws.String(qname), +// Key: aws.String(key), +// }); err != nil { +// t.Fatalf("Unable to download item %v", err) +// } + +// expected := `{"Account":"1001","CGRID":"5a5ff7c40039976e1c2bd303dee45983267f6ed2","Category":"call","Destination":"1002","OriginID":"abcdefg","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"AnotherTenant","ToR":"*data"}` +// if rply := string(file.Bytes()); rply != expected { +// t.Errorf("Expected: %q, received: %q", expected, rply) +// } +// } diff --git a/ees/sqs.go b/ees/sqs.go index 2ffcf8706..ce10537a7 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -19,6 +19,8 @@ along with this program. If not, see package ees import ( + "crypto/tls" + "net/http" "sync" "github.com/aws/aws-sdk-go/aws" @@ -44,14 +46,16 @@ func NewSQSee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *SQSee { // SQSee is a poster for sqs type SQSee struct { - awsRegion string - awsID string - awsKey string - awsToken string - queueURL *string - queueID string - session *session.Session - svc *sqs.SQS + awsRegion string + awsID string + awsKey string + awsToken string + queueURL *string + queueID string + forcePathStyle bool + skipTlsVerify bool + session *session.Session + svc *sqs.SQS cfg *config.EventExporterCfg em *utils.ExporterMetrics @@ -77,6 +81,12 @@ func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) { if opts.AWSToken != nil { pstr.awsToken = *opts.AWSToken } + if opts.SQSForcePathStyle != nil { + pstr.forcePathStyle = *opts.SQSForcePathStyle + } + if opts.SQSSkipTlsVerify != nil { + pstr.skipTlsVerify = *opts.SQSSkipTlsVerify + } } func (pstr *SQSee) Cfg() *config.EventExporterCfg { return pstr.cfg } @@ -93,6 +103,18 @@ func (pstr *SQSee) Connect() (err error) { len(pstr.awsKey) != 0 { cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken) } + if pstr.forcePathStyle { + cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints + } + if pstr.skipTlsVerify { + cfg.HTTPClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates + }, + }, + } + } pstr.session, err = session.NewSessionWithOptions( session.Options{ Config: cfg, diff --git a/utils/consts.go b/utils/consts.go index d38106bf2..f752ccd10 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2863,11 +2863,15 @@ const ( AWSToken = "awsToken" // sqs - SQSQueueID = "sqsQueueID" + SQSQueueID = "sqsQueueID" + SQSForcePathStyle = "sqsForcePathStyle" + SQSSkipTlsVerify = "sqsSkipTlsVerify" // s3 - S3Bucket = "s3BucketID" - S3FolderPath = "s3FolderPath" + S3Bucket = "s3BucketID" + S3FolderPath = "s3FolderPath" + S3ForcePathStyle = "s3ForcePathStyle" + S3SkipTlsVerify = "s3SkipTlsVerify" // sql SQLDefaultDBName = "cgrates"