Finished changing opts for Event Exporter

This commit is contained in:
andronache
2021-05-10 15:59:12 +03:00
committed by Dan Christian Bogos
parent 99a50502f7
commit a6b8af0e96
32 changed files with 94 additions and 89 deletions

View File

@@ -2149,7 +2149,7 @@ func testApierReplayFldPosts(t *testing.T) {
ev = &engine.ExportEvents{
Path: "amqp://guest:guest@localhost:5672/",
Opts: map[string]interface{}{
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
},
Format: utils.MetaAMQPjsonMap,
Events: []interface{}{bev},

View File

@@ -451,15 +451,11 @@ const CGRATES_CFG_JSON = `
"type": "*none", // exporter type
"export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed
"opts": {
// General
// "queueID": "cgrates_cdrs", // the queue id where events are exported
// CSV
// "csvFieldSeparator": ",", // separator used when reading the fields
// Elasticsearch options
// "elsIndex": "", // ElsIndex
// "elsIfPrimaryTerm": 0, // ElsIfPrimaryTerm
@@ -487,14 +483,17 @@ const CGRATES_CFG_JSON = `
// Kafka
// "kafkaTopic": "cgrates", // the topic from where the events are exported
// AMQP
// "amqpRoutingKey": "", // RoutingKey
// "sqlExchange": "", // Exchange
// "sqlExchangeType": "", // ExchangeType
// "amqpQueueID": "cgrates_cdrs", // the queue id for AMQP exporters from were the events are exported
// "amqpRoutingKey": "", // RoutingKey
// "sqlExchange": "", // Exchange
// "sqlExchangeType": "", // ExchangeType
// SQS and S3
// "sqsQueueID": "cgrates_cdrs", // the queue id for SQS exporters from were the events are exported
// "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from where the events that are exported
// "awsRegion": "", // AWSRegion
// "awsKey": "", // AWSKey
// "awsSecret": "", // AWSSecret

View File

@@ -11,12 +11,12 @@
},
"data_db": {
"db_type": "*internal",
"db_type": "*internal"
},
"stor_db": {
"db_type": "*internal",
"db_type": "*internal"
},
@@ -26,13 +26,13 @@
"attributes": {
"enabled": true,
"enabled": true
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
"attributes_conns": ["*internal"]
},
@@ -56,37 +56,37 @@
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
],
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
},
{
"id": "amqp_localhost",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
"amqpRoutingKey": "cgr_cdrs"
},
"tenant": "cgrates.org",
"attempts": 20,
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
],
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
},
{
"id": "aws_test_file",
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:95672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs"
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
],
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
},
{
"id": "sqs_test_file",
@@ -98,26 +98,26 @@
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
"SQSQueueID": "cgrates-cdrs"
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
],
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
},
{
"id": "kafka_localhost",
"type": "*kafka_json_map",
"export_path": "127.0.0.1:9092",
"opts":{
"kafkaTopic": "cgrates_cdrs",
"kafkaTopic": "cgrates_cdrs"
},
"tenant": "cgrates.org",
"attempts": 10,
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
],
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
},
{
"id": "s3_test_file",
@@ -129,15 +129,15 @@
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
"s3BucketID": "cgrates-cdrs"
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
],
},
],
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
}
]
},
@@ -159,8 +159,8 @@
"apiers": {
"enabled": true,
},
"enabled": true
}
}

View File

@@ -69,7 +69,7 @@
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
@@ -85,7 +85,7 @@
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:95672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
@@ -103,7 +103,7 @@
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
"SQSQueueID": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
@@ -134,7 +134,7 @@
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
"s3BucketID": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,

View File

@@ -66,7 +66,7 @@
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
@@ -82,7 +82,7 @@
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:95672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
@@ -99,7 +99,7 @@
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
"SQSQueueID": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
@@ -129,7 +129,7 @@
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
"s3BucketID": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,

View File

@@ -78,7 +78,7 @@
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
@@ -132,7 +132,7 @@
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@wrongurl:25672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"filters":["*string:~*ec.Cost:100"],

View File

@@ -76,7 +76,7 @@
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
"sqlExchange": "exchangename",
"sqlExchangeType": "fanout",
"amqpRoutingKey": "cgr_cdrs",
@@ -130,7 +130,7 @@
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@wrongurl:25672/",
"opts": {
"queueID": "cgrates_cdrs",
"amqpQueueID": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"filters":["*string:~*ec.Cost:100"],

View File

@@ -76,7 +76,7 @@
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
"s3BucketID": "cgrates-cdrs",
// "awsToken": "sessionToken",
},
"tenant": "cgrates.org",

View File

@@ -85,7 +85,7 @@
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
"s3BucketID": "cgrates-cdrs",
// "awsToken": "sessionToken",
},
"tenant": "cgrates.org",

View File

@@ -82,7 +82,7 @@
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
"s3BucketID": "cgrates-cdrs",
// "awsToken": "sessionToken",
},
"tenant": "cgrates.org",

View File

@@ -61,7 +61,7 @@
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID":"cgrates_cdrs",
"amqpQueueID":"cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,

View File

@@ -68,7 +68,7 @@
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queueID":"cgrates_cdrs",
"amqpQueueID":"cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,

View File

@@ -59,7 +59,10 @@ func writeFailedPosts(itmID string, value interface{}) {
func AddFailedPost(expPath, format, module string, ev interface{}, opts map[string]interface{}) {
key := utils.ConcatenatedKey(expPath, format, module)
// also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id
if qID := utils.FirstNonEmpty(utils.IfaceAsString(opts[utils.QueueID]),
if qID := utils.FirstNonEmpty(
utils.IfaceAsString(opts[utils.AMQPQueueID]),
utils.IfaceAsString(opts[utils.S3Bucket]),
utils.IfaceAsString(opts[utils.SQSQueueID]),
utils.IfaceAsString(opts[utils.KafkaTopic])); len(qID) != 0 {
key = utils.ConcatenatedKey(key, qID)
}

View File

@@ -61,7 +61,7 @@ func TestAddFldPost(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
AddFailedPost("path1", "format1", "module1", "2", make(map[string]interface{}))
AddFailedPost("path2", "format2", "module2", "3", map[string]interface{}{utils.QueueID: "qID"})
AddFailedPost("path2", "format2", "module2", "3", map[string]interface{}{utils.SQSQueueID: "qID"})
x, ok = failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1"))
if !ok {
t.Error("Error reading from cache")
@@ -99,7 +99,7 @@ func TestAddFldPost(t *testing.T) {
Format: "format2",
module: "module2",
Events: []interface{}{"3"},
Opts: map[string]interface{}{utils.QueueID: "qID"},
Opts: map[string]interface{}{utils.SQSQueueID: "qID"},
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))

View File

@@ -37,7 +37,7 @@ func TestAMQPPosterParseURL(t *testing.T) {
routingKey: "CGRCDR",
}
opts := map[string]interface{}{
"queueID": "q1",
"amqpQueueID": "q1",
"sqlExchange": "E1",
"amqpRoutingKey": "CGRCDR",
"sqlExchangeType": "fanout",

View File

@@ -53,7 +53,7 @@ type AMQPPoster struct {
func (pstr *AMQPPoster) parseOpts(dialURL map[string]interface{}) {
pstr.queueID = utils.DefaultQueueID
pstr.routingKey = utils.DefaultQueueID
if vals, has := dialURL[utils.QueueID]; has {
if vals, has := dialURL[utils.AMQPQueueID]; has {
pstr.queueID = utils.IfaceAsString(vals)
}
if vals, has := dialURL[utils.RoutingKey]; has {

View File

@@ -35,7 +35,7 @@ func NewAMQPv1Poster(dialURL string, attempts int, opts map[string]interface{})
queueID: "/" + utils.DefaultQueueID,
attempts: attempts,
}
if vals, has := opts[utils.QueueID]; has {
if vals, has := opts[utils.AMQPQueueID]; has {
pstr.queueID = "/" + utils.IfaceAsString(vals)
}
return pstr

View File

@@ -60,7 +60,7 @@ func (pstr *S3Poster) Close() {}
func (pstr *S3Poster) parseOpts(opts map[string]interface{}) {
pstr.queueID = utils.DefaultQueueID
if val, has := opts[utils.QueueID]; has {
if val, has := opts[utils.S3Bucket]; has {
pstr.queueID = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSFolderPath]; has {

View File

@@ -60,7 +60,7 @@ func (pstr *SQSPoster) Close() {}
func (pstr *SQSPoster) parseOpts(opts map[string]interface{}) {
pstr.queueID = utils.DefaultQueueID
if val, has := opts[utils.QueueID]; has {
if val, has := opts[utils.SQSQueueID]; has {
pstr.queueID = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSRegion]; has {

View File

@@ -145,10 +145,10 @@ func TestSQSPoster(t *testing.T) {
qname := "cgrates-cdrs"
opts := map[string]interface{}{
"awsRegion": region,
"awsKey": awsKey,
"awsSecret": awsSecret,
"queueID": qname,
"awsRegion": region,
"awsKey": awsKey,
"awsSecret": awsSecret,
"s3BucketID": qname,
}
//#####################################
@@ -224,10 +224,10 @@ func TestS3Poster(t *testing.T) {
qname := "cgrates-cdrs"
opts := map[string]interface{}{
"awsRegion": region,
"awsKey": awsKey,
"awsSecret": awsSecret,
"queueID": qname,
"awsRegion": region,
"awsKey": awsKey,
"awsSecret": awsSecret,
"s3BucketID": qname,
}
//#####################################
@@ -281,7 +281,7 @@ func TestAMQPv1Poster(t *testing.T) {
endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net"
qname := "cgrates-cdrs"
opts := map[string]interface{}{
"queueID": qname,
"amqpQueueID": qname,
}
//#####################################

View File

@@ -210,7 +210,7 @@ func (rdr *AMQPER) processMessage(msg []byte) (err error) {
func (rdr *AMQPER) setOpts(opts map[string]interface{}) {
rdr.queueID = utils.DefaultQueueID
if vals, has := opts[utils.QueueID]; has {
if vals, has := opts[utils.AMQPQueueID]; has {
rdr.queueID = utils.IfaceAsString(vals)
}
rdr.tag = utils.AMQPDefaultConsumerTag

View File

@@ -44,7 +44,7 @@ func TestAMQPER(t *testing.T) {
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "amqp://guest:guest@localhost:5672/",// read data from this path
"opts": {
"queueID": "cdrs3",
"amqpQueueID": "cdrs3",
"consumerTag": "test-key",
"sqlExchange": "test-exchange",
"sqlExchangeType": "direct",

View File

@@ -30,7 +30,7 @@ func TestAMQPSetOpts(t *testing.T) {
queueID: "cdrs",
tag: "new",
}
if k.setOpts(map[string]interface{}{"queueID": "cdrs", "consumerTag": "new"}); expKafka.dialURL != k.dialURL {
if k.setOpts(map[string]interface{}{"amqpQueueID": "cdrs", "consumerTag": "new"}); expKafka.dialURL != k.dialURL {
t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL)
} else if expKafka.queueID != k.queueID {
t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID)

View File

@@ -49,7 +49,7 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int,
rdr.cap <- struct{}{}
}
}
if vals, has := rdr.Config().Opts[utils.QueueID]; has {
if vals, has := rdr.Config().Opts[utils.AMQPQueueID]; has {
rdr.queueID = "/" + utils.IfaceAsString(vals)
}
rdr.createPoster()

View File

@@ -53,7 +53,7 @@ func TestAMQPERv1(t *testing.T) {
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "amqps://RootManageSharedAccessKey:Je8l%2Bt9tyOgZbdA%2B5SmGIJEsEzhZ9VdIO7yRke5EYtM%3D@test0123456y.servicebus.windows.net",// read data from this path
"opts": {
"queueID": "cdrs3",
"amqpQueueID": "cdrs3",
},
"processed_path": "", // move processed data here
"tenant": "cgrates.org", // tenant used by import

View File

@@ -145,7 +145,7 @@ func (rdr *S3ER) processMessage(body []byte) (err error) {
func (rdr *S3ER) parseOpts(opts map[string]interface{}) {
rdr.queueID = utils.DefaultQueueID
if val, has := opts[utils.QueueID]; has {
if val, has := opts[utils.S3Bucket]; has {
rdr.queueID = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSRegion]; has {

View File

@@ -60,7 +60,7 @@ func TestS3ER(t *testing.T) {
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
"opts": {
"queueID": "cgrates-cdrs",
"s3BucketID": "cgrates-cdrs",
"awsRegion": "us-east-2",
"awsKey": "AWSAccessKeyId",
"awsSecret": "AWSSecretKey",

View File

@@ -226,14 +226,14 @@ func TestS3ERParseOpts(t *testing.T) {
}
opts := map[string]interface{}{
utils.QueueID: "QueueID",
utils.S3Bucket: "s3BucketID",
utils.AWSRegion: "AWSRegion",
utils.AWSKey: "AWSKey",
utils.AWSSecret: "AWSSecret",
utils.AWSToken: "AWSToken",
}
rdr.parseOpts(opts)
if rdr.queueID != opts[utils.QueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] {
if rdr.queueID != opts[utils.S3Bucket] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] {
t.Error("Fields do not corespond")
}
rdr.Config().Opts = map[string]interface{}{}

View File

@@ -133,7 +133,7 @@ func (rdr *SQSER) processMessage(body []byte) (err error) {
func (rdr *SQSER) parseOpts(opts map[string]interface{}) {
rdr.queueID = utils.DefaultQueueID
if val, has := opts[utils.QueueID]; has {
if val, has := opts[utils.SQSQueueID]; has {
rdr.queueID = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSRegion]; has {

View File

@@ -59,7 +59,7 @@ func TestSQSER(t *testing.T) {
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
"opts": {
"queueID": "cgrates-cdrs",
"sqsQueueID": "cgrates-cdrs",
"awsRegion": "us-east-2",
"awsKey": "AWSAccessKeyId",
"awsSecret": "AWSSecretKey",

View File

@@ -285,14 +285,14 @@ func TestSQSERParseOpts(t *testing.T) {
}
opts := map[string]interface{}{
utils.QueueID: "QueueID",
utils.AWSRegion: "AWSRegion",
utils.AWSKey: "AWSKey",
utils.AWSSecret: "AWSSecret",
utils.AWSToken: "AWSToken",
utils.SQSQueueID: "SQSQueueID",
utils.AWSRegion: "AWSRegion",
utils.AWSKey: "AWSKey",
utils.AWSSecret: "AWSSecret",
utils.AWSToken: "AWSToken",
}
rdr.parseOpts(opts)
if rdr.queueID != opts[utils.QueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] {
if rdr.queueID != opts[utils.SQSQueueID] || rdr.awsRegion != opts[utils.AWSRegion] || rdr.awsID != opts[utils.AWSKey] || rdr.awsKey != opts[utils.AWSSecret] || rdr.awsToken != opts[utils.AWSToken] {
t.Error("Fields do not corespond")
}
rdr.Config().Opts = map[string]interface{}{}

View File

@@ -2542,13 +2542,13 @@ const (
// General constants for posters
DefaultQueueID = "cgrates_cdrs"
QueueID = "queueID"
DefaultExchangeType = "direct"
Exchange = "sqlExchange"
ExchangeType = "sqlExchangeType"
RoutingKey = "amqpRoutingKey"
// for ers:
AMQPQueueID = "amqpQueueID"
AMQPDefaultConsumerTag = "cgrates"
AMQPConsumerTag = "consumerTag"
@@ -2564,6 +2564,9 @@ const (
SQLDefaultDBName = "cgrates"
ProcessedOpt = "Processed"
S3Bucket = "s3BucketID"
SQSQueueID = "SQSQueueID"
)
// Analyzers constants