From a8895a6a5a0566afc632e955c6e8570094999eb7 Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Thu, 4 Dec 2025 18:17:22 +0200 Subject: [PATCH] Add options to support non-Amazon S3 and SQS --- config/config_defaults.go | 4 + config/eescfg.go | 58 ++++++++++-- config/eescfg_test.go | 34 +++++-- config/libconfig_json.go | 4 + data/conf/samples/ees_cloud/cgrates.json | 19 ++++ ees/s3.go | 38 ++++++-- ees/s3_it_test.go | 111 +++++++++++++++++++++++ ees/s3_test.go | 36 +++++--- ees/sqs.go | 38 ++++++-- utils/consts.go | 10 +- 10 files changed, 307 insertions(+), 45 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index e0185336a..aa8acd273 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -627,10 +627,14 @@ const CGRATES_CFG_JSON = ` //SQS // "sqsQueueID": "cgrates_cdrs", // the queue id for SQS exporters from were the events are exported + // "sqsForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY) + // "sqsSkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate // S3 // "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from where the events that are exported // "s3FolderPath": "", // S3FolderPath + // "s3ForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY) + // "s3SkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate // Nats // "natsJetStream": false, // controls if the nats poster uses the JetStream diff --git a/config/eescfg.go b/config/eescfg.go index 65bbc0e35..bf9f7dc9d 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -221,13 +221,17 @@ type AMQPOpts struct { } type AWSOpts struct { - Region *string - Key *string - Secret *string - Token *string - SQSQueueID *string - S3BucketID *string - S3FolderPath *string + Region *string + Key *string + Secret *string + Token *string + SQSQueueID *string + SQSForcePathStyle *bool + SQSSkipTlsVerify *bool + S3BucketID *string + S3FolderPath *string + S3ForcePathStyle *bool + S3SkipTlsVerify *bool } type NATSOpts struct { @@ -484,12 +488,24 @@ func (awsOpts *AWSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err erro if jsnCfg.SQSQueueID != nil { awsOpts.SQSQueueID = jsnCfg.SQSQueueID } + if jsnCfg.SQSForcePathStyle != nil { + awsOpts.SQSForcePathStyle = jsnCfg.SQSForcePathStyle + } + if jsnCfg.SQSSkipTlsVerify != nil { + awsOpts.SQSSkipTlsVerify = jsnCfg.SQSSkipTlsVerify + } if jsnCfg.S3BucketID != nil { awsOpts.S3BucketID = jsnCfg.S3BucketID } if jsnCfg.S3FolderPath != nil { awsOpts.S3FolderPath = jsnCfg.S3FolderPath } + if jsnCfg.S3ForcePathStyle != nil { + awsOpts.S3ForcePathStyle = jsnCfg.S3ForcePathStyle + } + if jsnCfg.S3SkipTlsVerify != nil { + awsOpts.S3SkipTlsVerify = jsnCfg.S3SkipTlsVerify + } return } func (natsOpts *NATSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { @@ -907,6 +923,14 @@ func (awsOpts *AWSOpts) Clone() *AWSOpts { cln.SQSQueueID = new(string) *cln.SQSQueueID = *awsOpts.SQSQueueID } + if awsOpts.SQSForcePathStyle != nil { + cln.SQSForcePathStyle = new(bool) + *cln.SQSForcePathStyle = *awsOpts.SQSForcePathStyle + } + if awsOpts.SQSSkipTlsVerify != nil { + cln.SQSSkipTlsVerify = new(bool) + *cln.SQSSkipTlsVerify = *awsOpts.SQSSkipTlsVerify + } if awsOpts.S3BucketID != nil { cln.S3BucketID = new(string) *cln.S3BucketID = *awsOpts.S3BucketID @@ -915,6 +939,14 @@ func (awsOpts *AWSOpts) Clone() *AWSOpts { cln.S3FolderPath = new(string) *cln.S3FolderPath = *awsOpts.S3FolderPath } + if awsOpts.S3ForcePathStyle != nil { + cln.S3ForcePathStyle = new(bool) + *cln.S3ForcePathStyle = *awsOpts.S3ForcePathStyle + } + if awsOpts.S3SkipTlsVerify != nil { + cln.S3SkipTlsVerify = new(bool) + *cln.S3SkipTlsVerify = *awsOpts.S3SkipTlsVerify + } return cln } @@ -1233,12 +1265,24 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str if awsOpts.SQSQueueID != nil { opts[utils.SQSQueueID] = *awsOpts.SQSQueueID } + if awsOpts.SQSForcePathStyle != nil { + opts[utils.SQSForcePathStyle] = *awsOpts.SQSForcePathStyle + } + if awsOpts.SQSSkipTlsVerify != nil { + opts[utils.SQSSkipTlsVerify] = *awsOpts.SQSSkipTlsVerify + } if awsOpts.S3BucketID != nil { opts[utils.S3Bucket] = *awsOpts.S3BucketID } if awsOpts.S3FolderPath != nil { opts[utils.S3FolderPath] = *awsOpts.S3FolderPath } + if awsOpts.S3ForcePathStyle != nil { + opts[utils.S3ForcePathStyle] = *awsOpts.S3ForcePathStyle + } + if awsOpts.S3SkipTlsVerify != nil { + opts[utils.S3SkipTlsVerify] = *awsOpts.S3SkipTlsVerify + } } if natOpts := eeC.Opts.NATS; natOpts != nil { if natOpts.JetStream != nil { diff --git a/config/eescfg_test.go b/config/eescfg_test.go index 03b04e086..56e80aa4a 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -78,7 +78,11 @@ func TestEESClone(t *testing.T) { "awsKey":"key", "awsSecret":"secretkey", "sqsQueueID":"sqsid", + "sqsForcePathStyle": true, + "sqsSkipTlsVerify": true, "s3BucketID":"s3", + "s3ForcePathStyle": true, + "s3SkipTlsVerify": true, "rpcCodec":"rpc", "serviceMethod":"service", "keyPath":"path", @@ -315,13 +319,17 @@ func TestEESClone(t *testing.T) { Topic: utils.StringPointer("kafka"), }, AWS: &AWSOpts{ - Token: utils.StringPointer("token"), - S3FolderPath: utils.StringPointer("s3"), - Region: utils.StringPointer("eu"), - Key: utils.StringPointer("key"), - Secret: utils.StringPointer("secretkey"), - S3BucketID: utils.StringPointer("s3"), - SQSQueueID: utils.StringPointer("sqsid"), + Token: utils.StringPointer("token"), + S3FolderPath: utils.StringPointer("s3"), + Region: utils.StringPointer("eu"), + Key: utils.StringPointer("key"), + Secret: utils.StringPointer("secretkey"), + S3BucketID: utils.StringPointer("s3"), + S3ForcePathStyle: utils.BoolPointer(true), + S3SkipTlsVerify: utils.BoolPointer(true), + SQSQueueID: utils.StringPointer("sqsid"), + SQSForcePathStyle: utils.BoolPointer(true), + SQSSkipTlsVerify: utils.BoolPointer(true), }, NATS: &NATSOpts{ JetStream: utils.BoolPointer(true), @@ -1109,7 +1117,11 @@ func TestEEsCfgAsMapInterface(t *testing.T) { "awsKey": "key", "awsSecret": "secretkey", "sqsQueueID": "sqsid", + "sqsForcePathStyle": true, + "sqsSkipTlsVerify": true, "s3BucketID": "s3", + "s3ForcePathStyle": true, + "s3SkipTlsVerify": true, "rpcCodec": "rpc", "serviceMethod": "service", "keyPath": "path", @@ -1265,7 +1277,11 @@ func TestEEsCfgAsMapInterface(t *testing.T) { utils.AWSKey: "key", utils.AWSSecret: "secretkey", utils.SQSQueueID: "sqsid", + utils.SQSForcePathStyle: true, + utils.SQSSkipTlsVerify: true, utils.S3Bucket: "s3", + utils.S3ForcePathStyle: true, + utils.S3SkipTlsVerify: true, utils.RpcCodec: "rpc", utils.ServiceMethod: "service", utils.KeyPath: "path", @@ -1419,8 +1435,12 @@ func TestEEsCfgloadFromJSONCfg(t *testing.T) { AWSSecret: &str, AWSToken: &str, SQSQueueID: &str, + SQSForcePathStyle: &bl, + SQSSkipTlsVerify: &bl, S3BucketID: &str, S3FolderPath: &str, + S3ForcePathStyle: &bl, + S3SkipTlsVerify: &bl, NATSJetStream: &bl, NATSSubject: &str, NATSJWTFile: &str, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 10d2a849f..97cd463b9 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -370,8 +370,12 @@ type EventExporterOptsJson struct { AWSSecret *string `json:"awsSecret"` AWSToken *string `json:"awsToken"` SQSQueueID *string `json:"sqsQueueID"` + SQSForcePathStyle *bool `json:"sqsForcePathStyle"` + SQSSkipTlsVerify *bool `json:"sqsSkipTlsVerify"` S3BucketID *string `json:"s3BucketID"` S3FolderPath *string `json:"s3FolderPath"` + S3ForcePathStyle *bool `json:"s3ForcePathStyle"` + S3SkipTlsVerify *bool `json:"s3SkipTlsVerify"` NATSJetStream *bool `json:"natsJetStream"` NATSSubject *string `json:"natsSubject"` NATSJWTFile *string `json:"natsJWTFile"` diff --git a/data/conf/samples/ees_cloud/cgrates.json b/data/conf/samples/ees_cloud/cgrates.json index 0c498cfd0..fc268a7f8 100644 --- a/data/conf/samples/ees_cloud/cgrates.json +++ b/data/conf/samples/ees_cloud/cgrates.json @@ -54,6 +54,25 @@ {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} ] }, + { + "id": "s3_nuantix_test_file", + "type": "*s3_json_map", + "export_path": "s3.eu-central-1.amazonaws.com", + "opts": { + "awsRegion": "eu-central-1", + "awsKey": "set-using-flags", + "awsSecret": "set-using-flags", + "s3BucketID": "cgrates-cdrs", + "s3ForcePathStyle": true, + "s3SkipTlsVerify": true + }, + "attempts": 1, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "synchronous": true, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] + }, { "id": "amqpv1_test_file", "type": "*amqpv1_json_map", diff --git a/ees/s3.go b/ees/s3.go index 6e5864678..9cdb38678 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -20,7 +20,9 @@ package ees import ( "bytes" + "crypto/tls" "fmt" + "net/http" "sync" "github.com/aws/aws-sdk-go/aws" @@ -44,14 +46,16 @@ func NewS3EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *S3EE { // S3EE is a s3 poster type S3EE struct { - awsRegion string - awsID string - awsKey string - awsToken string - bucket string - folderPath string - session *session.Session - up *s3manager.Uploader + awsRegion string + awsID string + awsKey string + awsToken string + bucket string + folderPath string + forcePathStyle bool + skipTlsVerify bool + session *session.Session + up *s3manager.Uploader cfg *config.EventExporterCfg em *utils.ExporterMetrics @@ -81,6 +85,12 @@ func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) { if s3Opts.Token != nil { pstr.awsToken = *s3Opts.Token } + if s3Opts.S3ForcePathStyle != nil { + pstr.forcePathStyle = *s3Opts.S3ForcePathStyle + } + if s3Opts.S3SkipTlsVerify != nil { + pstr.skipTlsVerify = *s3Opts.S3SkipTlsVerify + } } } @@ -98,6 +108,18 @@ func (pstr *S3EE) Connect() (err error) { len(pstr.awsKey) != 0 { cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken) } + if pstr.forcePathStyle { + cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints + } + if pstr.skipTlsVerify { + cfg.HTTPClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates + }, + }, + } + } pstr.session, err = session.NewSessionWithOptions( session.Options{ Config: cfg, diff --git a/ees/s3_it_test.go b/ees/s3_it_test.go index e9d4c860a..d0415fee9 100644 --- a/ees/s3_it_test.go +++ b/ees/s3_it_test.go @@ -22,8 +22,10 @@ along with this program. If not, see package ees import ( + "crypto/tls" "flag" "fmt" + "net/http" "path" "testing" "time" @@ -185,3 +187,112 @@ func testS3VerifyExport(t *testing.T) { t.Errorf("Expected: %q, received: %q", expected, rply) } } + +var ( + runS3NuantixTest = flag.Bool("nuantix", false, "Run the integration test for the S3 exporter from Nuantix") + + sTestsS3Nuantix = []func(t *testing.T){ + testS3LoadConfig, + testS3ResetDataDB, + testS3ResetStorDb, + testS3StartEngine, + testS3RPCConn, + testS3NuantixExportEvent, + testS3NuantixVerifyExport, + testStopCgrEngine, + } +) + +func TestS3ExportNuantix(t *testing.T) { + if !*runS3NuantixTest { + t.SkipNow() + } + s3ConfDir = "ees_cloud" + for _, stest := range sTestsS3Nuantix { + t.Run(s3ConfDir, stest) + } +} + +func testS3NuantixExportEvent(t *testing.T) { + cgrID = utils.Sha1("abcdefg", time.Unix(1383813745, 0).UTC().String()) + t.Log(cgrID) + ev := &engine.CGREventWithEeIDs{ + EeIDs: []string{"s3_nuantix_test_file"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "dataEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]any{ + utils.CGRID: cgrID, + utils.ToR: utils.MetaData, + utils.OriginID: "abcdefg", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "AnotherTenant", + utils.Category: "call", //for data CDR use different Tenant + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Nanosecond, + utils.RunID: utils.MetaDefault, + utils.Cost: 0.012, + }, + }, + } + + var reply map[string]utils.MapStorage + if err := s3RPC.Call(context.Background(), utils.EeSv1ProcessEvent, ev, &reply); err != nil { + t.Error(err) + } + time.Sleep(2 * time.Second) +} + +func testS3NuantixVerifyExport(t *testing.T) { + endpoint := "s3.eu-central-1.amazonaws.com" + region := "eu-central-1" + qname := "cgrates-cdrs" + + key := fmt.Sprintf("%s/%s:%s.json", "", cgrID, utils.MetaDefault) + + var sess *session.Session + cfg := aws.Config{Endpoint: aws.String(endpoint)} + cfg.Region = aws.String(region) + + cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "") + var err error + cfg.S3ForcePathStyle = aws.Bool(true) + cfg.HTTPClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + } + sess, err = session.NewSessionWithOptions( + session.Options{ + Config: cfg, + }, + ) + if err != nil { + t.Error(err) + } + s3Clnt := s3.New(sess) + s3Clnt.DeleteObject(&s3.DeleteObjectInput{}) + file := aws.NewWriteAtBuffer([]byte{}) + svc := s3manager.NewDownloader(sess) + + if _, err = svc.Download(file, + &s3.GetObjectInput{ + Bucket: aws.String(qname), + Key: aws.String(key), + }); err != nil { + t.Fatalf("Unable to download item %v", err) + } + + expected := `{"Account":"1001","CGRID":"5a5ff7c40039976e1c2bd303dee45983267f6ed2","Category":"call","Destination":"1002","OriginID":"abcdefg","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"AnotherTenant","ToR":"*data"}` + if rply := string(file.Bytes()); rply != expected { + t.Errorf("Expected: %q, received: %q", expected, rply) + } +} diff --git a/ees/s3_test.go b/ees/s3_test.go index a807eaa0e..2cd955c42 100644 --- a/ees/s3_test.go +++ b/ees/s3_test.go @@ -66,25 +66,31 @@ func TestParseOptsAllFieldsSet(t *testing.T) { key := "id" secret := "key" token := "token" + fps := true + stv := true opts := &config.EventExporterOpts{ AWS: &config.AWSOpts{ - S3BucketID: &bucketID, - S3FolderPath: &folderPath, - Region: ®ion, - Key: &key, - Secret: &secret, - Token: &token, + S3BucketID: &bucketID, + S3FolderPath: &folderPath, + Region: ®ion, + Key: &key, + Secret: &secret, + Token: &token, + S3ForcePathStyle: &fps, + S3SkipTlsVerify: &stv, }, } expected := S3EE{ - bucket: bucketID, - folderPath: folderPath, - awsRegion: region, - awsID: key, - awsKey: secret, - awsToken: token, + bucket: bucketID, + folderPath: folderPath, + awsRegion: region, + awsID: key, + awsKey: secret, + awsToken: token, + forcePathStyle: true, + skipTlsVerify: true, } s3ee := &S3EE{} @@ -107,4 +113,10 @@ func TestParseOptsAllFieldsSet(t *testing.T) { if s3ee.awsToken != expected.awsToken { t.Errorf("Expected awsToken %s, got %s", expected.awsToken, s3ee.awsToken) } + if s3ee.forcePathStyle != expected.forcePathStyle { + t.Errorf("Expected forcePathStyle %v, got %v", expected.forcePathStyle, s3ee.forcePathStyle) + } + if s3ee.skipTlsVerify != expected.skipTlsVerify { + t.Errorf("Expected skipTLSVerify %v, got %v", expected.skipTlsVerify, s3ee.skipTlsVerify) + } } diff --git a/ees/sqs.go b/ees/sqs.go index 116b85eb1..7c619091c 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -19,6 +19,8 @@ along with this program. If not, see package ees import ( + "crypto/tls" + "net/http" "sync" "github.com/aws/aws-sdk-go/aws" @@ -43,14 +45,16 @@ func NewSQSee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *SQSee { // SQSee is a poster for sqs type SQSee struct { - awsRegion string - awsID string - awsKey string - awsToken string - queueURL *string - queueID string - session *session.Session - svc *sqs.SQS + awsRegion string + awsID string + awsKey string + awsToken string + queueURL *string + queueID string + forcePathStyle bool + skipTlsVerify bool + session *session.Session + svc *sqs.SQS cfg *config.EventExporterCfg em *utils.ExporterMetrics @@ -77,6 +81,12 @@ func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) { if sqsOpts.Token != nil { pstr.awsToken = *sqsOpts.Token } + if sqsOpts.SQSForcePathStyle != nil { + pstr.forcePathStyle = *sqsOpts.SQSForcePathStyle + } + if sqsOpts.SQSSkipTlsVerify != nil { + pstr.skipTlsVerify = *sqsOpts.SQSSkipTlsVerify + } } } @@ -94,6 +104,18 @@ func (pstr *SQSee) Connect() (err error) { len(pstr.awsKey) != 0 { cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken) } + if pstr.forcePathStyle { + cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints + } + if pstr.skipTlsVerify { + cfg.HTTPClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates + }, + }, + } + } pstr.session, err = session.NewSessionWithOptions( session.Options{ Config: cfg, diff --git a/utils/consts.go b/utils/consts.go index b02ade0a5..4df816ca4 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -3056,11 +3056,15 @@ const ( AWSToken = "awsToken" // sqs - SQSQueueID = "sqsQueueID" + SQSQueueID = "sqsQueueID" + SQSForcePathStyle = "sqsForcePathStyle" + SQSSkipTlsVerify = "sqsSkipTlsVerify" // s3 - S3Bucket = "s3BucketID" - S3FolderPath = "s3FolderPath" + S3Bucket = "s3BucketID" + S3FolderPath = "s3FolderPath" + S3ForcePathStyle = "s3ForcePathStyle" + S3SkipTlsVerify = "s3SkipTlsVerify" // sql SQLDefaultDBName = "cgrates"