Finished renaming opts for Event Exporter

This commit is contained in:
andronache
2021-05-10 12:34:03 +03:00
committed by Dan Christian Bogos
parent d20f8dac7b
commit 99a50502f7
15 changed files with 72 additions and 73 deletions

View File

@@ -377,16 +377,16 @@ const CGRATES_CFG_JSON = `
// AMQP
// "consumerTag": "cgrates", // the ID of the consumer
// "exchange": "",
// "exchangeType": "",
// "routingKey": "",
// "sqlExchange": "",
// "sqlExchangeType": "",
// "amqpRoutingKey": "",
// "exchangeProcessed": "",
// "exchangeTypeProcessed": "",
// "routingKeyProcessed": "",
// Kafka
// "topic": "cgrates", // the topic from were the events are read
// "kafkaTopic": "cgrates", // the topic from were the events are read
// "groupID": "cgrates", // the group that reads the events
// "maxWait": "1ms", // the maximum amount of time to wait for new data to come
@@ -394,11 +394,11 @@ const CGRATES_CFG_JSON = `
// SQL
// "dbName": "cgrates", // the name of the database from were the events are read
// "sqlDBName": "cgrates", // the name of the database from were the events are read
// "sqlTableName": "cdrs", // the name of the table from were the events are read
// "sslmode": "disable", // the postgresSSLMode for postgres db
// "dbNameProcessed": "", // the name of the database were the events are sent after they are processed
// "sqlDBNameProcessed": "", // the name of the database were the events are sent after they are processed: "", // the name of the database were the events are sent after they are processed
// "sqlTableNameProcessed": "", // the name of the table were the events are sent after they are processed
// "sslmodeProcessed": "", // the postgresSSLMode for postgres db
@@ -479,19 +479,19 @@ const CGRATES_CFG_JSON = `
// "sqlMaxConnLifetime": 0, // SQLMaxConnLifetime
// "sqlTableName":"cdrs", // the name of the table from where the events are exported
// "dbName": "cgrates", // the name of the database from where the events are exported
// "sqlTableName":"cdrs", // the name of the table from where the events are exported
// "sqlDBName": "cgrates", // the name of the database from where the events are exported
// "sslmode": "disable", // the postgresSSLMode for postgres
// Kafka
// "topic": "cgrates", // the topic from where the events are exported
// "kafkaTopic": "cgrates", // the topic from where the events are exported
// AMQP
// "routingKey": "", // RoutingKey
// "exchange": "", // Exchange
// "exchangeType": "", // ExchangeType
// "amqpRoutingKey": "", // RoutingKey
// "sqlExchange": "", // Exchange
// "sqlExchangeType": "", // ExchangeType
// SQS and S3

View File

@@ -356,16 +356,16 @@
// // AMQP
// // "consumerTag": "cgrates", // the ID of the consumer
// // "exchange": "",
// // "exchangeType": "",
// // "routingKey": "",
// // "sqlExchange": "",
// // "sqlExchangeType": "",
// // "amqpRoutingKey": "",
// // "exchangeProcessed": "",
// // "exchangeTypeProcessed": "",
// // "routingKeyProcessed": "",
// // Kafka
// // "topic": "cgrates", // the topic from were the events are read
// // "kafkaTopic": "cgrates", // the topic from were the events are read
// // "groupID": "cgrates", // the group that reads the events
// // "maxWait": "1ms", // the maximum amount of time to wait for new data to come
@@ -373,11 +373,11 @@
// // SQL
// // "dbName": "cgrates", // the name of the database from were the events are read
// // "sqlDBName": "cgrates", // the name of the database from were the events are read
// // "sqlTableName": "cdrs", // the name of the table from were the events are read
// // "sslmode": "disable", // the ssl mode for postgres db
// // "dbNameProcessed": "", // the name of the database were the events are sent after they are processed
// // "sqlDBNameProcessed": "", // the name of the database were the events are sent after they are processed
// // "sqlTableNameProcessed": "", // the name of the table were the events are sent after they are processed
// // "sslmodeProcessed": "", // the ssl mode for postgres db

View File

@@ -65,9 +65,9 @@
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"exchange": "exchangename",
"exchangeType": "fanout",
"routingKey": "cgr_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 20,
@@ -111,7 +111,7 @@
"type": "*kafka_json_map",
"export_path": "127.0.0.1:9092",
"opts":{
"topic": "cgrates_cdrs",
"kafkaTopic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 10,

View File

@@ -70,9 +70,9 @@
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"exchange": "exchangename",
"exchangeType": "fanout",
"routingKey": "cgr_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 20,
@@ -116,7 +116,7 @@
"type": "*kafka_json_map",
"export_path": "127.0.0.1:9092",
"opts":{
"topic": "cgrates_cdrs",
"kafkaTopic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 10,

View File

@@ -67,9 +67,9 @@
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"exchange": "exchangename",
"exchangeType": "fanout",
"routingKey": "cgr_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 20,
@@ -112,7 +112,7 @@
"type": "*kafka_json_map",
"export_path": "127.0.0.1:9092",
"opts": {
"topic": "cgrates_cdrs",
"kafkaTopic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 10,

View File

@@ -79,9 +79,9 @@
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"exchange": "exchangename",
"exchangeType": "fanout",
"routingKey": "cgr_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 3,
@@ -118,7 +118,7 @@
"type": "*kafka_json_map",
"export_path": "localhost:9092",
"opts": {
"topic": "cgrates_cdrs",
"kafkaTopic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"synchronous": true,

View File

@@ -77,9 +77,9 @@
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"exchange": "exchangename",
"exchangeType": "fanout",
"routingKey": "cgr_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 3,
@@ -116,7 +116,7 @@
"type": "*kafka_json_map",
"export_path": "localhost:9092",
"opts": {
"topic": "cgrates_cdrs",
"kafkaTopic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"synchronous": true,

View File

@@ -394,7 +394,7 @@
"export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"attempts": 1,
"opts": {
"dbName": "exportedDatabase", // if dbName is not present "cgrates" will be used as default
"sqlDBName": "exportedDatabase", // if dbName is not present "cgrates" will be used as default
"sqlTableName": "expTable", // tableName is mandatory in opts for sql exporter
"sslmode": "disable",
"sqlMaxIdleConns": "10",
@@ -415,7 +415,7 @@
"export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"attempts": 1,
"opts": {
"dbName": "exportedDatabase",
"sqlDBName": "exportedDatabase",
"sqlTableName": "expTable",
"sslmode": "disable",
"sqlMaxIdleConns": "10",

View File

@@ -37,10 +37,10 @@ func TestAMQPPosterParseURL(t *testing.T) {
routingKey: "CGRCDR",
}
opts := map[string]interface{}{
"queueID": "q1",
"exchange": "E1",
"routingKey": "CGRCDR",
"exchangeType": "fanout",
"queueID": "q1",
"sqlExchange": "E1",
"amqpRoutingKey": "CGRCDR",
"sqlExchangeType": "fanout",
}
amqp.parseOpts(opts)
if !reflect.DeepEqual(expected, amqp) {
@@ -55,7 +55,7 @@ func TestKafkaParseURL(t *testing.T) {
topic: "cdr_billing",
attempts: 10,
}
if kfk := NewKafkaPoster(u, 10, map[string]interface{}{"topic": "cdr_billing"}); !reflect.DeepEqual(exp, kfk) {
if kfk := NewKafkaPoster(u, 10, map[string]interface{}{"kafkaTopic": "cdr_billing"}); !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
}
u = "localhost:9092"
@@ -64,7 +64,7 @@ func TestKafkaParseURL(t *testing.T) {
topic: "cdr_billing",
attempts: 10,
}
if kfk := NewKafkaPoster(u, 10, map[string]interface{}{"topic": "cdr_billing"}); !reflect.DeepEqual(exp, kfk) {
if kfk := NewKafkaPoster(u, 10, map[string]interface{}{"kafkaTopic": "cdr_billing"}); !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
}
}

View File

@@ -46,9 +46,9 @@ func TestAMQPER(t *testing.T) {
"opts": {
"queueID": "cdrs3",
"consumerTag": "test-key",
"exchange": "test-exchange",
"exchangeType": "direct",
"routingKey": "test-key",
"sqlExchange": "test-exchange",
"sqlExchangeType": "direct",
"amqpRoutingKey": "test-key",
},
"processed_path": "", // move processed data here
"tenant": "cgrates.org", // tenant used by import

View File

@@ -38,9 +38,9 @@ func TestKafkasetOpts(t *testing.T) {
maxWait: time.Second,
}
if err := k.setOpts(map[string]interface{}{
"topic": "cdrs",
"groupID": "new",
"maxWait": "1s",
"kafkaTopic": "cdrs",
"groupID": "new",
"maxWait": "1s",
}); err != nil {
t.Fatal(err)
} else if expKafka.dialURL != k.dialURL {
@@ -81,9 +81,9 @@ func TestKafkasetOpts(t *testing.T) {
maxWait: time.Second,
}
if err := k.setOpts(map[string]interface{}{
"topic": "cdrs",
"groupID": "new",
"maxWait": "1s",
"kafkaTopic": "cdrs",
"groupID": "new",
"maxWait": "1s",
}); err != nil {
t.Fatal(err)
} else if expKafka.dialURL != k.dialURL {

View File

@@ -328,7 +328,6 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts map[string]interfac
rdr.expTableName = utils.CDRsTBL
if vals, has := processedOpt[utils.SQLTableName]; has {
rdr.expTableName = utils.IfaceAsString(vals)
}
switch rdr.expConnType {

View File

@@ -98,8 +98,8 @@ func testSQLInitConfig(t *testing.T) {
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path
"opts": {
"dbName":"cgrates2",
"dbNameProcessed":"cgrates2",
"sqlDBName":"cgrates2",
"sqlDBNameProcessed":"cgrates2",
"sqlTableNameProcessed":"cdrs2",
},
"processed_path": "", // move processed data here
@@ -375,8 +375,8 @@ func testSQLInitConfig2(t *testing.T) {
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path
"opts": {
"dbName":"cgrates2",
"dbNameProcessed":"cgrates2",
"sqlDBName":"cgrates2",
"sqlDBNameProcessed":"cgrates2",
"sqlTableNameProcessed":"cdrs2",
},
"processed_path": "", // move processed data here

View File

@@ -41,11 +41,11 @@ func TestSQLSetURL(t *testing.T) {
inURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306"
outURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306"
if err := sql.setURL(inURL, outURL, map[string]interface{}{
"dbName": "cgrates2",
utils.SQLTableName: "cdrs2",
"sslmode": "enabled",
"sqlDBName": "cgrates2",
"sqlTableName": "cdrs2",
"sslmode": "enabled",
"dbNameProcessed": "cgrates3",
"sqlDBNameProcessed": "cgrates3",
"sqlTableNameProcessed": "cdrs3",
"sslmodeProcessed": "enabled",
}); err != nil {
@@ -76,11 +76,11 @@ func TestSQLSetURL(t *testing.T) {
inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306"
outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306"
if err := sql.setURL(inURL, outURL, map[string]interface{}{
"dbName": "cgrates2",
"sqlDBName": "cgrates2",
"sqlTableName": "cdrs2",
"sslmode": "enabled",
"dbNameProcessed": "cgrates3",
"sqlDBNameProcessed": "cgrates3",
"sqlTableNameProcessed": "cdrs3",
"sslmodeProcessed": "enabled",
}); err != nil {
@@ -111,11 +111,11 @@ func TestSQLSetURL(t *testing.T) {
inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306"
outURL = ""
if err := sql.setURL(inURL, outURL, map[string]interface{}{
"dbName": "cgrates2",
"sqlDBName": "cgrates2",
"sqlTableName": "cdrs2",
"sslmode": "enabled",
"dbNameProcessed": "cgrates2",
"sqlDBNameProcessed": "cgrates2",
"sqlTableNameProcessed": "cdrs2",
"sslmodeProcessed": "enabled",
}); err != nil {

View File

@@ -2536,7 +2536,7 @@ const (
AWSSecret = "awsSecret"
AWSToken = "awsToken"
AWSFolderPath = "s3FolderPath"
KafkaTopic = "topic"
KafkaTopic = "kafkaTopic"
KafkaGroupID = "groupID"
KafkaMaxWait = "maxWait"
@@ -2544,9 +2544,9 @@ const (
DefaultQueueID = "cgrates_cdrs"
QueueID = "queueID"
DefaultExchangeType = "direct"
Exchange = "exchange"
ExchangeType = "exchangeType"
RoutingKey = "routingKey"
Exchange = "sqlExchange"
ExchangeType = "sqlExchangeType"
RoutingKey = "amqpRoutingKey"
// for ers:
AMQPDefaultConsumerTag = "cgrates"
@@ -2558,7 +2558,7 @@ const (
KafkaDefaultGroupID = "cgrates"
KafkaDefaultMaxWait = time.Millisecond
SQLDBName = "dbName"
SQLDBName = "sqlDBName"
SQLTableName = "sqlTableName"
SQLDefaultSSLMode = "disable"
SQLDefaultDBName = "cgrates"