From a6b8af0e96c4cf58fee039b94f1b6acbf2bd738c Mon Sep 17 00:00:00 2001 From: andronache Date: Mon, 10 May 2021 15:59:12 +0300 Subject: [PATCH] Finished changing opts for Event Exporter --- apier/v1/apier_it_test.go | 2 +- config/config_defaults.go | 19 ++++--- .../samples/cdrsexport_internal/cgrates.json | 52 +++++++++---------- .../samples/cdrsexport_mongo/cgrates.json | 8 +-- .../samples/cdrsexport_mysql/cgrates.json | 8 +-- .../cdrsreplicationmaster.json | 4 +- .../cdrsreplicationmaster.json | 4 +- .../cdrsv_failover_internal/cgrates.json | 2 +- .../samples/cdrsv_failover_mongo/cgrates.json | 2 +- .../samples/cdrsv_failover_mysql/cgrates.json | 2 +- data/conf/samples/tutmongo2/cgrates.json | 2 +- data/conf/samples/tutmongo2_gob/cgrates.json | 2 +- engine/libcdre.go | 5 +- engine/libcdre_test.go | 4 +- engine/poster_test.go | 2 +- engine/pstr_amqp.go | 2 +- engine/pstr_amqpv1.go | 2 +- engine/pstr_s3.go | 2 +- engine/pstr_sqs.go | 2 +- engine/z_poster_it_test.go | 18 +++---- ers/amqp.go | 2 +- ers/amqp_it_test.go | 2 +- ers/amqp_test.go | 2 +- ers/amqpv1.go | 2 +- ers/amqpv1_it_test.go | 2 +- ers/s3.go | 2 +- ers/s3_it_test.go | 2 +- ers/s3_test.go | 4 +- ers/sqs.go | 2 +- ers/sqs_it_test.go | 2 +- ers/sqs_test.go | 12 ++--- utils/consts.go | 5 +- 32 files changed, 94 insertions(+), 89 deletions(-) diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index 1d5c31971..f1676d619 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -2149,7 +2149,7 @@ func testApierReplayFldPosts(t *testing.T) { ev = &engine.ExportEvents{ Path: "amqp://guest:guest@localhost:5672/", Opts: map[string]interface{}{ - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", }, Format: utils.MetaAMQPjsonMap, Events: []interface{}{bev}, diff --git a/config/config_defaults.go b/config/config_defaults.go index 3630d298d..17d0c365c 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -451,15 +451,11 @@ const CGRATES_CFG_JSON = ` "type": "*none", // exporter type "export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed "opts": { - - // General - // "queueID": "cgrates_cdrs", // the queue id where events are exported - // CSV // "csvFieldSeparator": ",", // separator used when reading the fields - + // Elasticsearch options // "elsIndex": "", // ElsIndex // "elsIfPrimaryTerm": 0, // ElsIfPrimaryTerm @@ -487,14 +483,17 @@ const CGRATES_CFG_JSON = ` // Kafka // "kafkaTopic": "cgrates", // the topic from where the events are exported - + // AMQP - // "amqpRoutingKey": "", // RoutingKey - // "sqlExchange": "", // Exchange - // "sqlExchangeType": "", // ExchangeType - + // "amqpQueueID": "cgrates_cdrs", // the queue id for AMQP exporters from were the events are exported + // "amqpRoutingKey": "", // RoutingKey + // "sqlExchange": "", // Exchange + // "sqlExchangeType": "", // ExchangeType + // SQS and S3 + // "sqsQueueID": "cgrates_cdrs", // the queue id for SQS exporters from were the events are exported + // "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from where the events that are exported // "awsRegion": "", // AWSRegion // "awsKey": "", // AWSKey // "awsSecret": "", // AWSSecret diff --git a/data/conf/samples/cdrsexport_internal/cgrates.json b/data/conf/samples/cdrsexport_internal/cgrates.json index fa10a2e00..cd56ee3da 100644 --- a/data/conf/samples/cdrsexport_internal/cgrates.json +++ b/data/conf/samples/cdrsexport_internal/cgrates.json @@ -11,12 +11,12 @@ }, "data_db": { - "db_type": "*internal", + "db_type": "*internal" }, "stor_db": { - "db_type": "*internal", + "db_type": "*internal" }, @@ -26,13 +26,13 @@ "attributes": { - "enabled": true, + "enabled": true }, "chargers": { "enabled": true, - "attributes_conns": ["*internal"], + "attributes_conns": ["*internal"] }, @@ -56,37 +56,37 @@ "tenant": "cgrates.org", "attempts": 1, "fields":[ - {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, - ], + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] }, { "id": "amqp_localhost", "type": "*amqp_json_map", "export_path": "amqp://guest:guest@localhost:5672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", "sqlExchange": "exchangename", "sqlExchangeType": "fanout", - "amqpRoutingKey": "cgr_cdrs", + "amqpRoutingKey": "cgr_cdrs" }, "tenant": "cgrates.org", "attempts": 20, "fields":[ - {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, - ], + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] }, { "id": "aws_test_file", "type": "*amqpv1_json_map", "export_path": "amqps://guest:guest@localhost:95672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs" }, "tenant": "cgrates.org", "attempts": 1, "fields":[ - {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, - ], + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] }, { "id": "sqs_test_file", @@ -98,26 +98,26 @@ "awsRegion": "eu-west-2", "awsKey": "testkey", "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", + "SQSQueueID": "cgrates-cdrs" }, "tenant": "cgrates.org", "attempts": 1, "fields":[ - {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, - ], + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] }, { "id": "kafka_localhost", "type": "*kafka_json_map", "export_path": "127.0.0.1:9092", "opts":{ - "kafkaTopic": "cgrates_cdrs", + "kafkaTopic": "cgrates_cdrs" }, "tenant": "cgrates.org", "attempts": 10, "fields":[ - {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, - ], + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] }, { "id": "s3_test_file", @@ -129,15 +129,15 @@ "awsRegion": "eu-west-2", "awsKey": "testkey", "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", + "s3BucketID": "cgrates-cdrs" }, "tenant": "cgrates.org", "attempts": 1, "fields":[ - {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, - ], - }, - ], + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] + } + ] }, @@ -159,8 +159,8 @@ "apiers": { - "enabled": true, -}, + "enabled": true +} } diff --git a/data/conf/samples/cdrsexport_mongo/cgrates.json b/data/conf/samples/cdrsexport_mongo/cgrates.json index a858058ee..dd48d11ab 100644 --- a/data/conf/samples/cdrsexport_mongo/cgrates.json +++ b/data/conf/samples/cdrsexport_mongo/cgrates.json @@ -69,7 +69,7 @@ "type": "*amqp_json_map", "export_path": "amqp://guest:guest@localhost:5672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", "sqlExchange": "exchangename", "sqlExchangeType": "fanout", "amqpRoutingKey": "cgr_cdrs", @@ -85,7 +85,7 @@ "type": "*amqpv1_json_map", "export_path": "amqps://guest:guest@localhost:95672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", }, "tenant": "cgrates.org", "attempts": 1, @@ -103,7 +103,7 @@ "awsRegion": "eu-west-2", "awsKey": "testkey", "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", + "SQSQueueID": "cgrates-cdrs", }, "tenant": "cgrates.org", "attempts": 1, @@ -134,7 +134,7 @@ "awsRegion": "eu-west-2", "awsKey": "testkey", "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", + "s3BucketID": "cgrates-cdrs", }, "tenant": "cgrates.org", "attempts": 1, diff --git a/data/conf/samples/cdrsexport_mysql/cgrates.json b/data/conf/samples/cdrsexport_mysql/cgrates.json index 675b990c0..2ca4d0156 100644 --- a/data/conf/samples/cdrsexport_mysql/cgrates.json +++ b/data/conf/samples/cdrsexport_mysql/cgrates.json @@ -66,7 +66,7 @@ "type": "*amqp_json_map", "export_path": "amqp://guest:guest@localhost:5672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", "sqlExchange": "exchangename", "sqlExchangeType": "fanout", "amqpRoutingKey": "cgr_cdrs", @@ -82,7 +82,7 @@ "type": "*amqpv1_json_map", "export_path": "amqps://guest:guest@localhost:95672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", }, "tenant": "cgrates.org", "attempts": 1, @@ -99,7 +99,7 @@ "awsRegion": "eu-west-2", "awsKey": "testkey", "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", + "SQSQueueID": "cgrates-cdrs", }, "tenant": "cgrates.org", "attempts": 1, @@ -129,7 +129,7 @@ "awsRegion": "eu-west-2", "awsKey": "testkey", "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", + "s3BucketID": "cgrates-cdrs", }, "tenant": "cgrates.org", "attempts": 1, diff --git a/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json b/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json index 6f8d424bc..856cffdc1 100644 --- a/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json @@ -78,7 +78,7 @@ "type": "*amqp_json_map", "export_path": "amqp://guest:guest@localhost:5672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", "sqlExchange": "exchangename", "sqlExchangeType": "fanout", "amqpRoutingKey": "cgr_cdrs", @@ -132,7 +132,7 @@ "type": "*amqp_json_map", "export_path": "amqp://guest:guest@wrongurl:25672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", }, "tenant": "cgrates.org", "filters":["*string:~*ec.Cost:100"], diff --git a/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json b/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json index 2dbfeb95a..3083cfc1f 100644 --- a/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json @@ -76,7 +76,7 @@ "type": "*amqp_json_map", "export_path": "amqp://guest:guest@localhost:5672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", "sqlExchange": "exchangename", "sqlExchangeType": "fanout", "amqpRoutingKey": "cgr_cdrs", @@ -130,7 +130,7 @@ "type": "*amqp_json_map", "export_path": "amqp://guest:guest@wrongurl:25672/", "opts": { - "queueID": "cgrates_cdrs", + "amqpQueueID": "cgrates_cdrs", }, "tenant": "cgrates.org", "filters":["*string:~*ec.Cost:100"], diff --git a/data/conf/samples/cdrsv_failover_internal/cgrates.json b/data/conf/samples/cdrsv_failover_internal/cgrates.json index 1c7ef10d0..5a7b7eae7 100644 --- a/data/conf/samples/cdrsv_failover_internal/cgrates.json +++ b/data/conf/samples/cdrsv_failover_internal/cgrates.json @@ -76,7 +76,7 @@ "awsRegion": "eu-west-2", "awsKey": "testkey", "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", + "s3BucketID": "cgrates-cdrs", // "awsToken": "sessionToken", }, "tenant": "cgrates.org", diff --git a/data/conf/samples/cdrsv_failover_mongo/cgrates.json b/data/conf/samples/cdrsv_failover_mongo/cgrates.json index 78b24b130..1cd5d3acd 100644 --- a/data/conf/samples/cdrsv_failover_mongo/cgrates.json +++ b/data/conf/samples/cdrsv_failover_mongo/cgrates.json @@ -85,7 +85,7 @@ "awsRegion": "eu-west-2", "awsKey": "testkey", "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", + "s3BucketID": "cgrates-cdrs", // "awsToken": "sessionToken", }, "tenant": "cgrates.org", diff --git a/data/conf/samples/cdrsv_failover_mysql/cgrates.json b/data/conf/samples/cdrsv_failover_mysql/cgrates.json index 176360583..ba50d399e 100644 --- a/data/conf/samples/cdrsv_failover_mysql/cgrates.json +++ b/data/conf/samples/cdrsv_failover_mysql/cgrates.json @@ -82,7 +82,7 @@ "awsRegion": "eu-west-2", "awsKey": "testkey", "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", + "s3BucketID": "cgrates-cdrs", // "awsToken": "sessionToken", }, "tenant": "cgrates.org", diff --git a/data/conf/samples/tutmongo2/cgrates.json b/data/conf/samples/tutmongo2/cgrates.json index de47c2d11..85ffe6905 100644 --- a/data/conf/samples/tutmongo2/cgrates.json +++ b/data/conf/samples/tutmongo2/cgrates.json @@ -61,7 +61,7 @@ "type": "*amqp_json_map", "export_path": "amqp://guest:guest@localhost:5672/", "opts": { - "queueID":"cgrates_cdrs", + "amqpQueueID":"cgrates_cdrs", }, "tenant": "cgrates.org", "attempts": 1, diff --git a/data/conf/samples/tutmongo2_gob/cgrates.json b/data/conf/samples/tutmongo2_gob/cgrates.json index 93a6f2275..11ee54b4f 100644 --- a/data/conf/samples/tutmongo2_gob/cgrates.json +++ b/data/conf/samples/tutmongo2_gob/cgrates.json @@ -68,7 +68,7 @@ "type": "*amqp_json_map", "export_path": "amqp://guest:guest@localhost:5672/", "opts": { - "queueID":"cgrates_cdrs", + "amqpQueueID":"cgrates_cdrs", }, "tenant": "cgrates.org", "attempts": 1, diff --git a/engine/libcdre.go b/engine/libcdre.go index fea7034b3..acb4a07c8 100644 --- a/engine/libcdre.go +++ b/engine/libcdre.go @@ -59,7 +59,10 @@ func writeFailedPosts(itmID string, value interface{}) { func AddFailedPost(expPath, format, module string, ev interface{}, opts map[string]interface{}) { key := utils.ConcatenatedKey(expPath, format, module) // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id - if qID := utils.FirstNonEmpty(utils.IfaceAsString(opts[utils.QueueID]), + if qID := utils.FirstNonEmpty( + utils.IfaceAsString(opts[utils.AMQPQueueID]), + utils.IfaceAsString(opts[utils.S3Bucket]), + utils.IfaceAsString(opts[utils.SQSQueueID]), utils.IfaceAsString(opts[utils.KafkaTopic])); len(qID) != 0 { key = utils.ConcatenatedKey(key, qID) } diff --git a/engine/libcdre_test.go b/engine/libcdre_test.go index fb93f3df8..c027d4eee 100644 --- a/engine/libcdre_test.go +++ b/engine/libcdre_test.go @@ -61,7 +61,7 @@ func TestAddFldPost(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } AddFailedPost("path1", "format1", "module1", "2", make(map[string]interface{})) - AddFailedPost("path2", "format2", "module2", "3", map[string]interface{}{utils.QueueID: "qID"}) + AddFailedPost("path2", "format2", "module2", "3", map[string]interface{}{utils.SQSQueueID: "qID"}) x, ok = failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1")) if !ok { t.Error("Error reading from cache") @@ -99,7 +99,7 @@ func TestAddFldPost(t *testing.T) { Format: "format2", module: "module2", Events: []interface{}{"3"}, - Opts: map[string]interface{}{utils.QueueID: "qID"}, + Opts: map[string]interface{}{utils.SQSQueueID: "qID"}, } if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) diff --git a/engine/poster_test.go b/engine/poster_test.go index fc89aa3ee..687e669b0 100644 --- a/engine/poster_test.go +++ b/engine/poster_test.go @@ -37,7 +37,7 @@ func TestAMQPPosterParseURL(t *testing.T) { routingKey: "CGRCDR", } opts := map[string]interface{}{ - "queueID": "q1", + "amqpQueueID": "q1", "sqlExchange": "E1", "amqpRoutingKey": "CGRCDR", "sqlExchangeType": "fanout", diff --git a/engine/pstr_amqp.go b/engine/pstr_amqp.go index 172f6240e..0d107649a 100644 --- a/engine/pstr_amqp.go +++ b/engine/pstr_amqp.go @@ -53,7 +53,7 @@ type AMQPPoster struct { func (pstr *AMQPPoster) parseOpts(dialURL map[string]interface{}) { pstr.queueID = utils.DefaultQueueID pstr.routingKey = utils.DefaultQueueID - if vals, has := dialURL[utils.QueueID]; has { + if vals, has := dialURL[utils.AMQPQueueID]; has { pstr.queueID = utils.IfaceAsString(vals) } if vals, has := dialURL[utils.RoutingKey]; has { diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go index 7fc2efbc0..e6e22d511 100644 --- a/engine/pstr_amqpv1.go +++ b/engine/pstr_amqpv1.go @@ -35,7 +35,7 @@ func NewAMQPv1Poster(dialURL string, attempts int, opts map[string]interface{}) queueID: "/" + utils.DefaultQueueID, attempts: attempts, } - if vals, has := opts[utils.QueueID]; has { + if vals, has := opts[utils.AMQPQueueID]; has { pstr.queueID = "/" + utils.IfaceAsString(vals) } return pstr diff --git a/engine/pstr_s3.go b/engine/pstr_s3.go index 4589a1121..0c4c328dd 100644 --- a/engine/pstr_s3.go +++ b/engine/pstr_s3.go @@ -60,7 +60,7 @@ func (pstr *S3Poster) Close() {} func (pstr *S3Poster) parseOpts(opts map[string]interface{}) { pstr.queueID = utils.DefaultQueueID - if val, has := opts[utils.QueueID]; has { + if val, has := opts[utils.S3Bucket]; has { pstr.queueID = utils.IfaceAsString(val) } if val, has := opts[utils.AWSFolderPath]; has { diff --git a/engine/pstr_sqs.go b/engine/pstr_sqs.go index e840c1975..05965db50 100644 --- a/engine/pstr_sqs.go +++ b/engine/pstr_sqs.go @@ -60,7 +60,7 @@ func (pstr *SQSPoster) Close() {} func (pstr *SQSPoster) parseOpts(opts map[string]interface{}) { pstr.queueID = utils.DefaultQueueID - if val, has := opts[utils.QueueID]; has { + if val, has := opts[utils.SQSQueueID]; has { pstr.queueID = utils.IfaceAsString(val) } if val, has := opts[utils.AWSRegion]; has { diff --git a/engine/z_poster_it_test.go b/engine/z_poster_it_test.go index 2c4382cc8..b0d87be68 100644 --- a/engine/z_poster_it_test.go +++ b/engine/z_poster_it_test.go @@ -145,10 +145,10 @@ func TestSQSPoster(t *testing.T) { qname := "cgrates-cdrs" opts := map[string]interface{}{ - "awsRegion": region, - "awsKey": awsKey, - "awsSecret": awsSecret, - "queueID": qname, + "awsRegion": region, + "awsKey": awsKey, + "awsSecret": awsSecret, + "s3BucketID": qname, } //##################################### @@ -224,10 +224,10 @@ func TestS3Poster(t *testing.T) { qname := "cgrates-cdrs" opts := map[string]interface{}{ - "awsRegion": region, - "awsKey": awsKey, - "awsSecret": awsSecret, - "queueID": qname, + "awsRegion": region, + "awsKey": awsKey, + "awsSecret": awsSecret, + "s3BucketID": qname, } //##################################### @@ -281,7 +281,7 @@ func TestAMQPv1Poster(t *testing.T) { endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net" qname := "cgrates-cdrs" opts := map[string]interface{}{ - "queueID": qname, + "amqpQueueID": qname, } //##################################### diff --git a/ers/amqp.go b/ers/amqp.go index 4c5c99dc1..401df31d6 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -210,7 +210,7 @@ func (rdr *AMQPER) processMessage(msg []byte) (err error) { func (rdr *AMQPER) setOpts(opts map[string]interface{}) { rdr.queueID = utils.DefaultQueueID - if vals, has := opts[utils.QueueID]; has { + if vals, has := opts[utils.AMQPQueueID]; has { rdr.queueID = utils.IfaceAsString(vals) } rdr.tag = utils.AMQPDefaultConsumerTag diff --git a/ers/amqp_it_test.go b/ers/amqp_it_test.go index 17e8a6448..43c09db47 100644 --- a/ers/amqp_it_test.go +++ b/ers/amqp_it_test.go @@ -44,7 +44,7 @@ func TestAMQPER(t *testing.T) { "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited "source_path": "amqp://guest:guest@localhost:5672/",// read data from this path "opts": { - "queueID": "cdrs3", + "amqpQueueID": "cdrs3", "consumerTag": "test-key", "sqlExchange": "test-exchange", "sqlExchangeType": "direct", diff --git a/ers/amqp_test.go b/ers/amqp_test.go index ebe4d585a..6e4ae8203 100644 --- a/ers/amqp_test.go +++ b/ers/amqp_test.go @@ -30,7 +30,7 @@ func TestAMQPSetOpts(t *testing.T) { queueID: "cdrs", tag: "new", } - if k.setOpts(map[string]interface{}{"queueID": "cdrs", "consumerTag": "new"}); expKafka.dialURL != k.dialURL { + if k.setOpts(map[string]interface{}{"amqpQueueID": "cdrs", "consumerTag": "new"}); expKafka.dialURL != k.dialURL { t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) } else if expKafka.queueID != k.queueID { t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID) diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 0577bc159..1c1297796 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -49,7 +49,7 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int, rdr.cap <- struct{}{} } } - if vals, has := rdr.Config().Opts[utils.QueueID]; has { + if vals, has := rdr.Config().Opts[utils.AMQPQueueID]; has { rdr.queueID = "/" + utils.IfaceAsString(vals) } rdr.createPoster() diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go index cd9763064..b359e39e2 100644 --- a/ers/amqpv1_it_test.go +++ b/ers/amqpv1_it_test.go @@ -53,7 +53,7 @@ func TestAMQPERv1(t *testing.T) { "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited "source_path": "amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net",// read data from this path "opts": { - "queueID": "cdrs3", + "amqpQueueID": "cdrs3", }, "processed_path": "", // move processed data here "tenant": "cgrates.org", // tenant used by import diff --git a/ers/s3.go b/ers/s3.go index feffe7927..e44270753 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -145,7 +145,7 @@ func (rdr *S3ER) processMessage(body []byte) (err error) { func (rdr *S3ER) parseOpts(opts map[string]interface{}) { rdr.queueID = utils.DefaultQueueID - if val, has := opts[utils.QueueID]; has { + if val, has := opts[utils.S3Bucket]; has { rdr.queueID = utils.IfaceAsString(val) } if val, has := opts[utils.AWSRegion]; has { diff --git a/ers/s3_it_test.go b/ers/s3_it_test.go index c4290d7cb..aff9d1a4d 100644 --- a/ers/s3_it_test.go +++ b/ers/s3_it_test.go @@ -60,7 +60,7 @@ func TestS3ER(t *testing.T) { "filters": [], // limit parsing based on the filters "flags": [], // flags to influence the event processing "opts": { - "queueID": "cgrates-cdrs", + "s3BucketID": "cgrates-cdrs", "awsRegion": "us-east-2", "awsKey": "AWSAccessKeyId", "awsSecret": "AWSSecretKey", diff --git a/ers/s3_test.go b/ers/s3_test.go index 2fcffde35..1751f9563 100644 --- a/ers/s3_test.go +++ b/ers/s3_test.go @@ -226,14 +226,14 @@ func TestS3ERParseOpts(t *testing.T) { } opts := map[string]interface{}{ - utils.QueueID: "QueueID", + utils.S3Bucket: "s3BucketID", utils.AWSRegion: "AWSRegion", utils.AWSKey: "AWSKey", utils.AWSSecret: "AWSSecret", utils.AWSToken: "AWSToken", } rdr.parseOpts(opts) - if rdr.queueID != opts[utils.QueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] { + if rdr.queueID != opts[utils.S3Bucket] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] { t.Error("Fields do not corespond") } rdr.Config().Opts = map[string]interface{}{} diff --git a/ers/sqs.go b/ers/sqs.go index 2285a10ac..1daf31b2e 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -133,7 +133,7 @@ func (rdr *SQSER) processMessage(body []byte) (err error) { func (rdr *SQSER) parseOpts(opts map[string]interface{}) { rdr.queueID = utils.DefaultQueueID - if val, has := opts[utils.QueueID]; has { + if val, has := opts[utils.SQSQueueID]; has { rdr.queueID = utils.IfaceAsString(val) } if val, has := opts[utils.AWSRegion]; has { diff --git a/ers/sqs_it_test.go b/ers/sqs_it_test.go index aaf80f194..d24a63cd1 100644 --- a/ers/sqs_it_test.go +++ b/ers/sqs_it_test.go @@ -59,7 +59,7 @@ func TestSQSER(t *testing.T) { "filters": [], // limit parsing based on the filters "flags": [], // flags to influence the event processing "opts": { - "queueID": "cgrates-cdrs", + "sqsQueueID": "cgrates-cdrs", "awsRegion": "us-east-2", "awsKey": "AWSAccessKeyId", "awsSecret": "AWSSecretKey", diff --git a/ers/sqs_test.go b/ers/sqs_test.go index 60d4af929..3515be490 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -285,14 +285,14 @@ func TestSQSERParseOpts(t *testing.T) { } opts := map[string]interface{}{ - utils.QueueID: "QueueID", - utils.AWSRegion: "AWSRegion", - utils.AWSKey: "AWSKey", - utils.AWSSecret: "AWSSecret", - utils.AWSToken: "AWSToken", + utils.SQSQueueID: "SQSQueueID", + utils.AWSRegion: "AWSRegion", + utils.AWSKey: "AWSKey", + utils.AWSSecret: "AWSSecret", + utils.AWSToken: "AWSToken", } rdr.parseOpts(opts) - if rdr.queueID != opts[utils.QueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] { + if rdr.queueID != opts[utils.SQSQueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] { t.Error("Fields do not corespond") } rdr.Config().Opts = map[string]interface{}{} diff --git a/utils/consts.go b/utils/consts.go index a1d8edcea..e081f6829 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2542,13 +2542,13 @@ const ( // General constants for posters DefaultQueueID = "cgrates_cdrs" - QueueID = "queueID" DefaultExchangeType = "direct" Exchange = "sqlExchange" ExchangeType = "sqlExchangeType" RoutingKey = "amqpRoutingKey" // for ers: + AMQPQueueID = "amqpQueueID" AMQPDefaultConsumerTag = "cgrates" AMQPConsumerTag = "consumerTag" @@ -2564,6 +2564,9 @@ const ( SQLDefaultDBName = "cgrates" ProcessedOpt = "Processed" + + S3Bucket = "s3BucketID" + SQSQueueID = "SQSQueueID" ) // Analyzers constants