From 6c10cf0960c48a439408fd9fc2432a51d0dc63fd Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 2 Sep 2020 17:07:42 +0300 Subject: [PATCH] Added Opts to ERs --- apier/v1/apier_it_test.go | 5 +- config/config_defaults.go | 1 + config/erscfg.go | 47 ++++--- config/libconfig_json.go | 1 + .../samples/cdrsexport_internal/cgrates.json | 40 +++++- .../samples/cdrsexport_mongo/cgrates.json | 40 +++++- .../samples/cdrsexport_mysql/cgrates.json | 38 +++++- .../cdrsreplicationmaster.json | 48 +++++-- .../cdrsreplicationmaster.json | 48 +++++-- .../cdrsv_failover_internal/cgrates.json | 12 +- .../samples/cdrsv_failover_mongo/cgrates.json | 12 +- .../samples/cdrsv_failover_mysql/cgrates.json | 12 +- data/conf/samples/tutmongo2/cgrates.json | 5 +- data/conf/samples/tutmongo2_gob/cgrates.json | 5 +- ees/httpjsonmap.go | 3 +- ees/httppost.go | 3 +- engine/action.go | 76 +---------- engine/libcdre.go | 59 +++++---- engine/libcdre_test.go | 11 +- engine/poster.go | 122 ------------------ engine/poster_test.go | 28 ++-- engine/z_poster_it_test.go | 46 ++++--- ers/amqp.go | 61 +++------ ers/amqp_test.go | 23 +--- ers/kafka.go | 44 +++---- general_tests/poster_it_test.go | 16 +-- guardian/guardian_test.go | 6 +- migrator/storage_mongo_stordb.go | 4 +- migrator/thresholds_test.go | 2 +- 29 files changed, 375 insertions(+), 443 deletions(-) diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index 0d2e37e0e..caf5be9ae 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -1946,7 +1946,10 @@ func testApierReplayFldPosts(t *testing.T) { bev = []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`) fileInPath := path.Join(*args.FailedRequestsInDir, fileName) ev = &engine.ExportEvents{ - Path: "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs", + Path: "amqp://guest:guest@localhost:5672/", + Opts: map[string]interface{}{ + "queue_id": "cgrates_cdrs", + }, Format: utils.MetaAMQPjsonMap, Events: []interface{}{bev}, } diff --git a/config/config_defaults.go b/config/config_defaults.go index d65cc613b..d2d0aad74 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -338,6 +338,7 @@ const CGRATES_CFG_JSON = ` "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited "source_path": "/var/spool/cgrates/ers/in", // read data from this path "processed_path": "/var/spool/cgrates/ers/out", // move processed data here + "opts": {}, "xml_root_path": "", // path towards one event in case of XML CDRs "tenant": "", // tenant used by import "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> diff --git a/config/erscfg.go b/config/erscfg.go index 75f8700e4..6c5afe452 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -58,27 +58,27 @@ func (ers *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, msgTempla return } for _, jsnReader := range *jsnReaders { - rdr := new(EventReaderCfg) - if dfltRdrCfg != nil { - rdr = dfltRdrCfg.Clone() - } - var haveID bool + var rdr *EventReaderCfg if jsnReader.Id != nil { for _, reader := range ers.Readers { if reader.ID == *jsnReader.Id { rdr = reader - haveID = true break } } } - + if rdr == nil { + if dfltRdrCfg != nil { + rdr = dfltRdrCfg.Clone() + } else { + rdr = new(EventReaderCfg) + rdr.Opts = make(map[string]interface{}) + } + ers.Readers = append(ers.Readers, rdr) + } if err := rdr.loadFromJsonCfg(jsnReader, msgTemplates, sep); err != nil { return err } - if !haveID { - ers.Readers = append(ers.Readers, rdr) - } } return nil @@ -121,6 +121,7 @@ type EventReaderCfg struct { ConcurrentReqs int SourcePath string ProcessedPath string + Opts map[string]interface{} XmlRootPath utils.HierarchyPath Tenant RSRParsers Timezone string @@ -222,16 +223,18 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, msgTemplat //Clone itself into a new EventReaderCfg func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) { - cln = new(EventReaderCfg) - cln.ID = er.ID - cln.Type = er.Type - cln.FieldSep = er.FieldSep - cln.HeaderDefineChar = er.HeaderDefineChar - cln.RunDelay = er.RunDelay - cln.ConcurrentReqs = er.ConcurrentReqs - cln.SourcePath = er.SourcePath - cln.ProcessedPath = er.ProcessedPath - cln.XmlRootPath = er.XmlRootPath + cln = &EventReaderCfg{ + ID: er.ID, + Type: er.Type, + FieldSep: er.FieldSep, + HeaderDefineChar: er.HeaderDefineChar, + RunDelay: er.RunDelay, + ConcurrentReqs: er.ConcurrentReqs, + SourcePath: er.SourcePath, + ProcessedPath: er.ProcessedPath, + XmlRootPath: er.XmlRootPath, + Opts: make(map[string]interface{}), + } if len(er.Tenant) != 0 { cln.Tenant = make(RSRParsers, len(er.Tenant)) for idx, val := range er.Tenant { @@ -256,6 +259,9 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) { for idx, fld := range er.CacheDumpFields { cln.CacheDumpFields[idx] = fld.Clone() } + for k, v := range er.Opts { + cln.Opts[k] = v + } return } @@ -315,5 +321,6 @@ func (er *EventReaderCfg) AsMapInterface(separator string) map[string]interface{ utils.PartialCacheExpiryActionCfg: er.PartialCacheExpiryAction, utils.FieldsCfg: fields, utils.CacheDumpFieldsCfg: cacheDumpFields, + utils.OptsCfg: er.Opts, } } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index fed0cb812..44ca7196a 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -170,6 +170,7 @@ type EventReaderJsonCfg struct { Concurrent_requests *int Source_path *string Processed_path *string + Opts map[string]interface{} Xml_root_path *string Tenant *string Timezone *string diff --git a/data/conf/samples/cdrsexport_internal/cgrates.json b/data/conf/samples/cdrsexport_internal/cgrates.json index f5f410dbd..6448b8962 100644 --- a/data/conf/samples/cdrsexport_internal/cgrates.json +++ b/data/conf/samples/cdrsexport_internal/cgrates.json @@ -62,7 +62,13 @@ { "id": "amqp_localhost", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs", + "export_path": "amqp://guest:guest@localhost:5672/", + "opts": { + "queue_id": "cgrates_cdrs", + "exchange": "exchangename", + "exchange_type": "fanout", + "routing_key": "cgr_cdrs", + }, "tenant": "cgrates.org", "attempts": 3, "fields":[ @@ -72,7 +78,10 @@ { "id": "aws_test_file", "type": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "export_path": "amqps://guest:guest@localhost:25672/", + "opts": { + "queue_id": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -82,8 +91,15 @@ { "id": "sqs_test_file", "type": "*sqs_json_map", - // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for sqs: "endpoint" + "export_path": "http://sqs.eu-west-2.amazonaws.com/", + "opts": { + // posible options for sqs: + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -93,7 +109,10 @@ { "id": "kafka_localhost", "type": "*kafka_json_map", - "export_path": "127.0.0.1:9092?topic=cgrates_cdrs", + "export_path": "127.0.0.1:9092", + "opts":{ + "topic": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -103,8 +122,15 @@ { "id": "s3_test_file", "type": "*s3_json_map", - // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for s3: "endpoint" + "export_path": "http://s3.us-east-2.amazonaws.com/", + "opts": { + // posible options for s3: + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ diff --git a/data/conf/samples/cdrsexport_mongo/cgrates.json b/data/conf/samples/cdrsexport_mongo/cgrates.json index 56ff2a55d..bace92af8 100644 --- a/data/conf/samples/cdrsexport_mongo/cgrates.json +++ b/data/conf/samples/cdrsexport_mongo/cgrates.json @@ -67,7 +67,13 @@ { "id": "amqp_localhost", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs", + "export_path": "amqp://guest:guest@localhost:5672/", + "opts": { + "queue_id": "cgrates_cdrs", + "exchange": "exchangename", + "exchange_type": "fanout", + "routing_key": "cgr_cdrs", + }, "tenant": "cgrates.org", "attempts": 3, "fields":[ @@ -77,7 +83,10 @@ { "id": "aws_test_file", "type": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "export_path": "amqps://guest:guest@localhost:25672/", + "opts": { + "queue_id": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -87,8 +96,15 @@ { "id": "sqs_test_file", "type": "*sqs_json_map", - // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for sqs: "endpoint" + "export_path": "http://sqs.eu-west-2.amazonaws.com/", + "opts": { + // posible options for sqs: + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -98,7 +114,10 @@ { "id": "kafka_localhost", "type": "*kafka_json_map", - "export_path": "127.0.0.1:9092?topic=cgrates_cdrs", + "export_path": "127.0.0.1:9092", + "opts":{ + "topic": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -108,8 +127,15 @@ { "id": "s3_test_file", "type": "*s3_json_map", - // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for s3: "endpoint" + "export_path": "http://s3.us-east-2.amazonaws.com/", + "opts": { + // posible options for s3: + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ diff --git a/data/conf/samples/cdrsexport_mysql/cgrates.json b/data/conf/samples/cdrsexport_mysql/cgrates.json index 1746e82ff..e6083493e 100644 --- a/data/conf/samples/cdrsexport_mysql/cgrates.json +++ b/data/conf/samples/cdrsexport_mysql/cgrates.json @@ -64,7 +64,13 @@ { "id": "amqp_localhost", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs", + "export_path": "amqp://guest:guest@localhost:5672/", + "opts": { + "queue_id": "cgrates_cdrs", + "exchange": "exchangename", + "exchange_type": "fanout", + "routing_key": "cgr_cdrs", + }, "tenant": "cgrates.org", "attempts": 3, "fields":[ @@ -74,7 +80,10 @@ { "id": "aws_test_file", "type": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "export_path": "amqps://guest:guest@localhost:25672/", + "opts": { + "queue_id": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -84,8 +93,14 @@ { "id": "sqs_test_file", "type": "*sqs_json_map", - // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for sqs: "endpoint" + "export_path": "http://sqs.eu-west-2.amazonaws.com/", + "opts": { + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -95,7 +110,10 @@ { "id": "kafka_localhost", "type": "*kafka_json_map", - "export_path": "127.0.0.1:9092?topic=cgrates_cdrs", + "export_path": "127.0.0.1:9092", + "opts": { + "topic": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -105,8 +123,14 @@ { "id": "s3_test_file", "type": "*s3_json_map", - // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for s3: "endpoint" + "export_path": "http://s3.us-east-2.amazonaws.com/", + "opts": { + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ diff --git a/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json b/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json index 0f96b0aed..fa34ca752 100644 --- a/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json @@ -73,7 +73,13 @@ { "id": "amqp_localhost", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs", + "export_path": "amqp://guest:guest@localhost:5672/", + "opts": { + "queue_id": "cgrates_cdrs", + "exchange": "exchangename", + "exchange_type": "fanout", + "routing_key": "cgr_cdrs", + }, "tenant": "cgrates.org", "attempts": 3, "fields":[ @@ -106,7 +112,10 @@ { "id": "aws_test_file", "type": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "export_path": "amqps://guest:guest@localhost:25672/", + "opts": { + "queue_id": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -116,8 +125,14 @@ { "id": "sqs_test_file", "type": "*sqs_json_map", - // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for sqs: "endpoint" + "export_path": "http://sqs.eu-west-2.amazonaws.com/", + "opts": { + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -127,7 +142,10 @@ { "id": "amqp_test_file", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "export_path": "amqp://guest:guest@localhost:25672/", + "opts": { + "queue_id": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -137,7 +155,10 @@ { "id": "kafka_localhost", "type": "*kafka_json_map", - "export_path": "localhost:9092?topic=cgrates_cdrs", + "export_path": "localhost:9092", + "opts": { + "topic": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -147,8 +168,14 @@ { "id": "s3_test_file", "type": "*s3_json_map", - // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for s3: "endpoint" + "export_path": "http://s3.us-east-2.amazonaws.com/", + "opts": { + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -158,7 +185,10 @@ { "id": "eventcost_filter", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@wrongurl:25672/?queue_id=cgrates_cdrs", + "export_path": "amqp://guest:guest@wrongurl:25672/", + "opts": { + "queue_id": "cgrates_cdrs", + }, "tenant": "cgrates.org", "filters":["*string:~*ec.Cost:100"], "attempts": 1, diff --git a/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json b/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json index 3854801c3..dffd6e744 100644 --- a/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json @@ -72,7 +72,13 @@ { "id": "amqp_localhost", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs", + "export_path": "amqp://guest:guest@localhost:5672/", + "opts": { + "queue_id": "cgrates_cdrs", + "exchange": "exchangename", + "exchange_type": "fanout", + "routing_key": "cgr_cdrs", + }, "tenant": "cgrates.org", "attempts": 3, "fields":[ @@ -105,7 +111,10 @@ { "id": "aws_test_file", "type": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "export_path": "amqps://guest:guest@localhost:25672/", + "opts": { + "queue_id": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -115,8 +124,14 @@ { "id": "sqs_test_file", "type": "*sqs_json_map", - // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for sqs: "endpoint" + "export_path": "http://sqs.eu-west-2.amazonaws.com/", + "opts": { + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -126,7 +141,10 @@ { "id": "amqp_test_file", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "export_path": "amqp://guest:guest@localhost:25672/", + "opts": { + "queue_id": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -136,7 +154,10 @@ { "id": "kafka_localhost", "type": "*kafka_json_map", - "export_path": "localhost:9092?topic=cgrates_cdrs", + "export_path": "localhost:9092", + "opts": { + "topic": "cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -146,8 +167,14 @@ { "id": "s3_test_file", "type": "*s3_json_map", - // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for s3: "endpoint" + "export_path": "http://s3.us-east-2.amazonaws.com/", + "opts": { + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ @@ -157,7 +184,10 @@ { "id": "eventcost_filter", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@wrongurl:25672/?queue_id=cgrates_cdrs", + "export_path": "amqp://guest:guest@wrongurl:25672/", + "opts": { + "queue_id": "cgrates_cdrs", + }, "tenant": "cgrates.org", "filters":["*string:~*ec.Cost:100"], "attempts": 1, diff --git a/data/conf/samples/cdrsv_failover_internal/cgrates.json b/data/conf/samples/cdrsv_failover_internal/cgrates.json index 8f1758b70..b6d0bde59 100644 --- a/data/conf/samples/cdrsv_failover_internal/cgrates.json +++ b/data/conf/samples/cdrsv_failover_internal/cgrates.json @@ -68,8 +68,16 @@ { "id": "s3_test_file", "type": "*s3_json_map", - // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for s3: "endpoint" + "export_path": "http://s3.us-east-2.amazonaws.com/", + "opts": { + // posible options for s3: + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + // "aws_token": "sessionToken", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ diff --git a/data/conf/samples/cdrsv_failover_mongo/cgrates.json b/data/conf/samples/cdrsv_failover_mongo/cgrates.json index 49c4e09af..a823bf61d 100644 --- a/data/conf/samples/cdrsv_failover_mongo/cgrates.json +++ b/data/conf/samples/cdrsv_failover_mongo/cgrates.json @@ -77,8 +77,16 @@ { "id": "s3_test_file", "type": "*s3_json_map", - // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for s3: "endpoint" + "export_path": "http://s3.us-east-2.amazonaws.com/", + "opts": { + // posible options for s3: + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + // "aws_token": "sessionToken", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ diff --git a/data/conf/samples/cdrsv_failover_mysql/cgrates.json b/data/conf/samples/cdrsv_failover_mysql/cgrates.json index b765bb847..d252c0992 100644 --- a/data/conf/samples/cdrsv_failover_mysql/cgrates.json +++ b/data/conf/samples/cdrsv_failover_mysql/cgrates.json @@ -74,8 +74,16 @@ { "id": "s3_test_file", "type": "*s3_json_map", - // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + // export_path for s3: "endpoint" + "export_path": "http://s3.us-east-2.amazonaws.com/", + "opts": { + // posible options for s3: + "aws_region": "eu-west-2", + "aws_key": "testkey", + "aws_secret": "testsecret", + "queue_id": "cgrates-cdrs", + // "aws_token": "sessionToken", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ diff --git a/data/conf/samples/tutmongo2/cgrates.json b/data/conf/samples/tutmongo2/cgrates.json index 66d1bb417..e4277c733 100644 --- a/data/conf/samples/tutmongo2/cgrates.json +++ b/data/conf/samples/tutmongo2/cgrates.json @@ -59,7 +59,10 @@ { "id": "amqp_localhost", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs", + "export_path": "amqp://guest:guest@localhost:5672/", + "opts": { + "queue_id":"cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ diff --git a/data/conf/samples/tutmongo2_gob/cgrates.json b/data/conf/samples/tutmongo2_gob/cgrates.json index 13af37029..0d752ccdd 100644 --- a/data/conf/samples/tutmongo2_gob/cgrates.json +++ b/data/conf/samples/tutmongo2_gob/cgrates.json @@ -66,7 +66,10 @@ { "id": "amqp_localhost", "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs", + "export_path": "amqp://guest:guest@localhost:5672/", + "opts": { + "queue_id":"cgrates_cdrs", + }, "tenant": "cgrates.org", "attempts": 1, "fields":[ diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index abd417e62..b06454e05 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -131,7 +131,8 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) { if err = pstrEE.poster.Post(body, utils.ConcatenatedKey(cgrID, runID)); err != nil && pstrEE.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { engine.AddFailedPost(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ExportPath, - pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Type, utils.EventExporterS, body) + pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Type, utils.EventExporterS, body, + pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Opts) } return } diff --git a/ees/httppost.go b/ees/httppost.go index b3b12fa3b..754b8df90 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -103,7 +103,8 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { if err = httpPost.httpPoster.PostValues(urlVals); err != nil && httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath, - httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, urlVals) + httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, urlVals, + httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Opts) } return } diff --git a/engine/action.go b/engine/action.go index b20adacca..3a4e61296 100644 --- a/engine/action.go +++ b/engine/action.go @@ -99,11 +99,6 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { utils.SetExpiry: setExpiryAction, utils.MetaPublishAccount: publishAccount, utils.MetaPublishBalance: publishBalance, - utils.MetaAMQPjsonMap: sendAMQP, - utils.MetaAMQPV1jsonMap: sendAWS, - utils.MetaSQSjsonMap: sendSQS, - utils.MetaKafkajsonMap: sendKafka, - utils.MetaS3jsonMap: sendS3, utils.MetaRemoveSessionCosts: removeSessionCosts, utils.MetaRemoveExpired: removeExpired, utils.MetaPostEvent: postEvent, @@ -384,71 +379,6 @@ func getOneData(ub *Account, extraData interface{}) ([]byte, error) { return nil, nil } -func sendAMQP(ub *Account, a *Action, acs Actions, extraData interface{}) error { - body, err := getOneData(ub, extraData) - if err != nil { - return err - } - err = PostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body) - if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - AddFailedPost(a.ExtraParameters, utils.MetaAMQPjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) - err = nil - } - return err -} - -func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error { - body, err := getOneData(ub, extraData) - if err != nil { - return err - } - err = PostersCache.PostAMQPv1(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body) - if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - AddFailedPost(a.ExtraParameters, utils.MetaAMQPV1jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) - err = nil - } - return err -} - -func sendSQS(ub *Account, a *Action, acs Actions, extraData interface{}) error { - body, err := getOneData(ub, extraData) - if err != nil { - return err - } - err = PostersCache.PostSQS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body) - if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - AddFailedPost(a.ExtraParameters, utils.MetaSQSjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) - err = nil - } - return err -} - -func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error { - body, err := getOneData(ub, extraData) - if err != nil { - return err - } - err = PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix()) - if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - AddFailedPost(a.ExtraParameters, utils.MetaKafkajsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) - err = nil - } - return err -} - -func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error { - body, err := getOneData(ub, extraData) - if err != nil { - return err - } - err = PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix()) - if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - AddFailedPost(a.ExtraParameters, utils.MetaS3jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) - err = nil - } - return err -} - func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error { body, err := getOneData(ub, extraData) if err != nil { @@ -462,7 +392,7 @@ func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error { } err = pstr.PostValues(body) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{})) err = nil } return err @@ -483,7 +413,7 @@ func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) er go func() { err := pstr.PostValues(body) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{})) } }() return nil @@ -1042,7 +972,7 @@ func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error } err = pstr.PostValues(body) if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE { - AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body) + AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{})) err = nil } return err diff --git a/engine/libcdre.go b/engine/libcdre.go index a3a7fed35..98a929c9d 100644 --- a/engine/libcdre.go +++ b/engine/libcdre.go @@ -58,8 +58,13 @@ func writeFailedPosts(itmID string, value interface{}) { return } -func AddFailedPost(expPath, format, module string, ev interface{}) { +func AddFailedPost(expPath, format, module string, ev interface{}, opts map[string]interface{}) { key := utils.ConcatenatedKey(expPath, format, module) + // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id + if qID := utils.FirstNonEmpty(utils.IfaceAsString(opts[QueueID]), + utils.IfaceAsString(opts[utils.KafkaTopic])); len(qID) != 0 { + key = utils.ConcatenatedKey(key, qID) + } var failedPost *ExportEvents if x, ok := failedPostCache.Get(key); ok { if x != nil { @@ -70,6 +75,7 @@ func AddFailedPost(expPath, format, module string, ev interface{}) { failedPost = &ExportEvents{ Path: expPath, Format: format, + Opts: opts, module: module, } } @@ -101,6 +107,7 @@ func NewExportEventsFromFile(filePath string) (expEv *ExportEvents, err error) { type ExportEvents struct { lk sync.RWMutex Path string + Opts map[string]interface{} Format string Events []interface{} module string @@ -142,8 +149,11 @@ func (expEv *ExportEvents) AddEvent(ev interface{}) { func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *ExportEvents, err error) { failedEvents = &ExportEvents{ Path: expEv.Path, + Opts: expEv.Opts, Format: expEv.Format, } + var pstr Poster + keyFunc := func() string { return utils.EmptyString } switch expEv.Format { case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.MetaHTTPPost: var pstr *HTTPPoster @@ -160,42 +170,31 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export failedEvents.AddEvent(ev) } } + if len(failedEvents.Events) > 0 { + err = utils.ErrPartiallyExecuted + } else { + failedEvents = nil + } + return case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: - for _, ev := range expEv.Events { - err = PostersCache.PostAMQP(expEv.Path, attempts, ev.([]byte)) - if err != nil { - failedEvents.AddEvent(ev) - } - } + pstr = NewAMQPPoster(expEv.Path, attempts, expEv.Opts) case utils.MetaAMQPV1jsonMap: - for _, ev := range expEv.Events { - err = PostersCache.PostAMQPv1(expEv.Path, attempts, ev.([]byte)) - if err != nil { - failedEvents.AddEvent(ev) - } - } + pstr = NewAMQPv1Poster(expEv.Path, attempts, expEv.Opts) case utils.MetaSQSjsonMap: - for _, ev := range expEv.Events { - err = PostersCache.PostSQS(expEv.Path, attempts, ev.([]byte)) - if err != nil { - failedEvents.AddEvent(ev) - } - } + pstr = NewSQSPoster(expEv.Path, attempts, expEv.Opts) case utils.MetaKafkajsonMap: - for _, ev := range expEv.Events { - err = PostersCache.PostKafka(expEv.Path, attempts, ev.([]byte), utils.UUIDSha1Prefix()) - if err != nil { - failedEvents.AddEvent(ev) - } - } + pstr = NewKafkaPoster(expEv.Path, attempts, expEv.Opts) + keyFunc = utils.UUIDSha1Prefix case utils.MetaS3jsonMap: - for _, ev := range expEv.Events { - err = PostersCache.PostS3(expEv.Path, attempts, ev.([]byte), utils.UUIDSha1Prefix()) - if err != nil { - failedEvents.AddEvent(ev) - } + pstr = NewS3Poster(expEv.Path, attempts, expEv.Opts) + keyFunc = utils.UUIDSha1Prefix + } + for _, ev := range expEv.Events { + if err = pstr.Post(ev.([]byte), keyFunc()); err != nil { + failedEvents.AddEvent(ev) } } + pstr.Close() if len(failedEvents.Events) > 0 { err = utils.ErrPartiallyExecuted } else { diff --git a/engine/libcdre_test.go b/engine/libcdre_test.go index c454a87a3..f83482f85 100644 --- a/engine/libcdre_test.go +++ b/engine/libcdre_test.go @@ -37,7 +37,7 @@ func TestSetFldPostCacheTTL(t *testing.T) { func TestAddFldPost(t *testing.T) { SetFailedPostCacheTTL(time.Duration(5 * time.Second)) - AddFailedPost("path1", "format1", "module1", "1") + AddFailedPost("path1", "format1", "module1", "1", make(map[string]interface{})) x, ok := failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1")) if !ok { t.Error("Error reading from cache") @@ -55,12 +55,13 @@ func TestAddFldPost(t *testing.T) { Format: "format1", module: "module1", Events: []interface{}{"1"}, + Opts: make(map[string]interface{}), } if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } - AddFailedPost("path1", "format1", "module1", "2") - AddFailedPost("path2", "format2", "module2", "3") + AddFailedPost("path1", "format1", "module1", "2", make(map[string]interface{})) + AddFailedPost("path2", "format2", "module2", "3", map[string]interface{}{QueueID: "qID"}) x, ok = failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1")) if !ok { t.Error("Error reading from cache") @@ -77,11 +78,12 @@ func TestAddFldPost(t *testing.T) { Format: "format1", module: "module1", Events: []interface{}{"1", "2"}, + Opts: make(map[string]interface{}), } if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } - x, ok = failedPostCache.Get(utils.ConcatenatedKey("path2", "format2", "module2")) + x, ok = failedPostCache.Get(utils.ConcatenatedKey("path2", "format2", "module2", "qID")) if !ok { t.Error("Error reading from cache") } @@ -97,6 +99,7 @@ func TestAddFldPost(t *testing.T) { Format: "format2", module: "module2", Events: []interface{}{"3"}, + Opts: map[string]interface{}{QueueID: "qID"}, } if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) diff --git a/engine/poster.go b/engine/poster.go index 687d32fe8..af21f6673 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -18,10 +18,6 @@ along with this program. If not, see package engine -import ( - "sync" -) - // General constants for posters const ( DefaultQueueID = "cgrates_cdrs" @@ -35,125 +31,7 @@ const ( folderPath = "folder_path" ) -func init() { - PostersCache = &PosterCache{ - amqpCache: make(map[string]Poster), - amqpv1Cache: make(map[string]Poster), - sqsCache: make(map[string]Poster), - kafkaCache: make(map[string]Poster), - s3Cache: make(map[string]Poster), - } // Initialize the cache for amqpPosters -} - -var PostersCache *PosterCache - -type PosterCache struct { - sync.Mutex - amqpCache map[string]Poster - amqpv1Cache map[string]Poster - sqsCache map[string]Poster - kafkaCache map[string]Poster - s3Cache map[string]Poster -} - type Poster interface { Post(body []byte, key string) error Close() } - -// Close closes all cached posters -func (pc *PosterCache) Close() { - for _, v := range pc.amqpCache { - v.Close() - } - for _, v := range pc.amqpv1Cache { - v.Close() - } - for _, v := range pc.sqsCache { - v.Close() - } - for _, v := range pc.kafkaCache { - v.Close() - } -} - -// GetAMQPPoster creates a new poster only if not already cached -// uses dialURL as cache key -func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int) (pstr Poster) { - pc.Lock() - defer pc.Unlock() - if _, hasIt := pc.amqpCache[dialURL]; !hasIt { - pstr = NewAMQPPoster(dialURL, attempts, nil) - pc.amqpCache[dialURL] = pstr - } - return pc.amqpCache[dialURL] -} - -// GetAMQPv1Poster creates a new poster only if not already cached -func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int) (pstr Poster) { - pc.Lock() - defer pc.Unlock() - if _, hasIt := pc.amqpv1Cache[dialURL]; !hasIt { - pstr = NewAMQPv1Poster(dialURL, attempts, nil) - pc.amqpv1Cache[dialURL] = pstr - } - return pc.amqpv1Cache[dialURL] -} - -// GetSQSPoster creates a new poster only if not already cached -func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int) (pstr Poster) { - pc.Lock() - defer pc.Unlock() - if _, hasIt := pc.sqsCache[dialURL]; !hasIt { - pstr = NewSQSPoster(dialURL, attempts, nil) - pc.sqsCache[dialURL] = pstr - } - return pc.sqsCache[dialURL] -} - -// GetKafkaPoster creates a new poster only if not already cached -func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int) (pstr Poster) { - pc.Lock() - defer pc.Unlock() - if _, hasIt := pc.kafkaCache[dialURL]; !hasIt { - pstr = NewKafkaPoster(dialURL, attempts, nil) - pc.kafkaCache[dialURL] = pstr - } - return pc.kafkaCache[dialURL] -} - -// GetS3Poster creates a new poster only if not already cached -func (pc *PosterCache) GetS3Poster(dialURL string, attempts int) (pstr Poster) { - pc.Lock() - defer pc.Unlock() - if _, hasIt := pc.s3Cache[dialURL]; !hasIt { - pstr = NewS3Poster(dialURL, attempts, nil) - pc.s3Cache[dialURL] = pstr - } - return pc.s3Cache[dialURL] -} - -func (pc *PosterCache) PostAMQP(dialURL string, attempts int, - content []byte) error { - return pc.GetAMQPPoster(dialURL, attempts).Post(content, "") -} - -func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int, - content []byte) error { - return pc.GetAMQPv1Poster(dialURL, attempts).Post(content, "") -} - -func (pc *PosterCache) PostSQS(dialURL string, attempts int, - content []byte) error { - return pc.GetSQSPoster(dialURL, attempts).Post(content, "") -} - -func (pc *PosterCache) PostKafka(dialURL string, attempts int, - content []byte, key string) error { - return pc.GetKafkaPoster(dialURL, attempts).Post(content, key) -} - -func (pc *PosterCache) PostS3(dialURL string, attempts int, - content []byte, key string) error { - return pc.GetS3Poster(dialURL, attempts).Post(content, key) -} diff --git a/engine/poster_test.go b/engine/poster_test.go index 551081038..1576c476b 100644 --- a/engine/poster_test.go +++ b/engine/poster_test.go @@ -26,7 +26,9 @@ import ( ) func TestAMQPPosterParseURL(t *testing.T) { - amqp := &AMQPPoster{} + amqp := &AMQPPoster{ + dialURL: "amqp://guest:guest@localhost:5672/?heartbeat=5", + } expected := &AMQPPoster{ dialURL: "amqp://guest:guest@localhost:5672/?heartbeat=5", queueID: "q1", @@ -34,35 +36,35 @@ func TestAMQPPosterParseURL(t *testing.T) { exchangeType: "fanout", routingKey: "CGRCDR", } - dialURL := "amqp://guest:guest@localhost:5672/?queue_id=q1&exchange=E1&routing_key=CGRCDR&heartbeat=5&exchange_type=fanout" - if err := amqp.parseOpts(dialURL); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(expected, amqp) { + opts := map[string]interface{}{ + "queue_id": "q1", + "exchange": "E1", + "routing_key": "CGRCDR", + "exchange_type": "fanout", + } + amqp.parseOpts(opts) + if !reflect.DeepEqual(expected, amqp) { t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(expected), utils.ToJSON(amqp)) } } func TestKafkaParseURL(t *testing.T) { - u := "127.0.0.1:9092?topic=cdr_billing" + u := "127.0.0.1:9092" exp := &KafkaPoster{ dialURL: "127.0.0.1:9092", topic: "cdr_billing", attempts: 10, } - if kfk, err := NewKafkaPoster(u, 10, nil); err != nil { - t.Fatal(err) - } else if !reflect.DeepEqual(exp, kfk) { + if kfk := NewKafkaPoster(u, 10, map[string]interface{}{"topic": "cdr_billing"}); !reflect.DeepEqual(exp, kfk) { t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) } - u = "localhost:9092?topic=cdr_billing" + u = "localhost:9092" exp = &KafkaPoster{ dialURL: "localhost:9092", topic: "cdr_billing", attempts: 10, } - if kfk, err := NewKafkaPoster(u, 10, nil); err != nil { - t.Fatal(err) - } else if !reflect.DeepEqual(exp, kfk) { + if kfk := NewKafkaPoster(u, 10, map[string]interface{}{"topic": "cdr_billing"}); !reflect.DeepEqual(exp, kfk) { t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) } } diff --git a/engine/z_poster_it_test.go b/engine/z_poster_it_test.go index d1bce5301..a75a20ea4 100644 --- a/engine/z_poster_it_test.go +++ b/engine/z_poster_it_test.go @@ -23,7 +23,6 @@ import ( "context" "encoding/json" "flag" - "fmt" "path/filepath" "reflect" "testing" @@ -75,7 +74,7 @@ func TestHttpJsonPoster(t *testing.T) { if err = pstr.PostValues(jsn); err == nil { t.Error("Expected error") } - AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn) + AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn, make(map[string]interface{})) time.Sleep(2) fs, err := filepath.Glob("/tmp/test1*") if err != nil { @@ -108,7 +107,7 @@ func TestHttpBytesPoster(t *testing.T) { if err = pstr.PostValues(content); err == nil { t.Error("Expected error") } - AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content) + AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content, make(map[string]interface{})) time.Sleep(2) fs, err := filepath.Glob("/tmp/test2*") if err != nil { @@ -145,17 +144,18 @@ func TestSQSPoster(t *testing.T) { awsKey := "replace-this-with-your-secret-key" awsSecret := "replace-this-with-your-secret" qname := "cgrates-cdrs" - //##################################### - // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - dialURL := fmt.Sprintf("%s?aws_region=%s&aws_key=%s&aws_secret=%s&queue_id=%s", endpoint, region, awsKey, awsSecret, qname) + opts := map[string]interface{}{ + "aws_region": region, + "aws_key": awsKey, + "aws_secret": awsSecret, + "queue_id": qname, + } + //##################################### body := "testString" - pstr, err := PostersCache.GetSQSPoster(dialURL, 5) - if err != nil { - t.Fatal(err) - } + pstr := NewSQSPoster(endpoint, 5, opts) if err := pstr.Post([]byte(body), ""); err != nil { t.Fatal(err) } @@ -225,17 +225,18 @@ func TestS3Poster(t *testing.T) { awsKey := "replace-this-with-your-secret-key" awsSecret := "replace-this-with-your-secret" qname := "cgrates-cdrs" - //##################################### - // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" - dialURL := fmt.Sprintf("%s?aws_region=%s&aws_key=%s&aws_secret=%s&queue_id=%s", endpoint, region, awsKey, awsSecret, qname) + opts := map[string]interface{}{ + "aws_region": region, + "aws_key": awsKey, + "aws_secret": awsSecret, + "queue_id": qname, + } + //##################################### body := "testString" key := "key1234" - pstr, err := PostersCache.GetS3Poster(dialURL, 5) - if err != nil { - t.Fatal(err) - } + pstr := NewS3Poster(endpoint, 5, opts) if err := pstr.Post([]byte(body), key); err != nil { t.Fatal(err) } @@ -284,17 +285,14 @@ func TestAMQPv1Poster(t *testing.T) { // update this variables endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net" qname := "cgrates-cdrs" + opts := map[string]interface{}{ + "queue_id": qname, + } //##################################### - // export_path for amqpv1: "amqps://admin:admin@endpoint?queue_id=cgrates_cdrs", - dialURL := fmt.Sprintf("%s?queue_id=%s", endpoint, qname) - body := "testString" - pstr, err := PostersCache.GetAMQPv1Poster(dialURL, 5) - if err != nil { - t.Fatal(err) - } + pstr := NewAMQPv1Poster(endpoint, 5, opts) if err := pstr.Post([]byte(body), ""); err != nil { t.Fatal(err) } diff --git a/ers/amqp.go b/ers/amqp.go index 3c74f4936..64c0216ed 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -21,8 +21,6 @@ package ers import ( "encoding/json" "fmt" - "net/url" - "strings" "time" "github.com/cgrates/cgrates/agents" @@ -41,7 +39,6 @@ const ( func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, rdrEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { - rdr := &AMQPER{ cgrCfg: cfg, cfgIdx: cfgIdx, @@ -56,9 +53,9 @@ func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, rdr.cap <- struct{}{} } } - er = rdr - err = rdr.setURL(rdr.Config().SourcePath) - return + rdr.dialURL = rdr.Config().SourcePath + rdr.setOpts(rdr.Config().Opts) + return rdr, nil } // AMQPER implements EventReader interface for kafka message @@ -171,12 +168,12 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { utils.ERs, msg.MessageId, err.Error())) } if rdr.Config().ProcessedPath != utils.EmptyString { // post it - if err := engine.PostersCache.PostAMQP(rdr.Config().ProcessedPath, - rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Body); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> writing message %s error: %s", - utils.ERs, msg.MessageId, err.Error())) - } + // if err := engine.PostersCache.PostAMQP(rdr.Config().ProcessedPath, + // rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Body); err != nil { + // utils.Logger.Warning( + // fmt.Sprintf("<%s> writing message %s error: %s", + // utils.ERs, msg.MessageId, err.Error())) + // } } if rdr.Config().ConcurrentReqs != -1 { rdr.cap <- struct{}{} @@ -214,44 +211,26 @@ func (rdr *AMQPER) processMessage(msg []byte) (err error) { return } -func (rdr *AMQPER) setURL(dialURL string) (err error) { - var u *url.URL - if u, err = url.Parse(dialURL); err != nil { - return - } - qry := u.Query() - q := url.Values{} - for _, key := range engine.AMQPPosibleQuery { - if vals, has := qry[key]; has && len(vals) != 0 { - q.Add(key, vals[0]) - } - } - rdr.dialURL = strings.Split(dialURL, "?")[0] - if params := q.Encode(); params != utils.EmptyString { - rdr.dialURL += "?" + params - - } +func (rdr *AMQPER) setOpts(opts map[string]interface{}) { rdr.queueID = engine.DefaultQueueID - if vals, has := qry[engine.QueueID]; has && len(vals) != 0 { - rdr.queueID = vals[0] + if vals, has := opts[engine.QueueID]; has { + rdr.queueID = utils.IfaceAsString(vals) } rdr.tag = defaultConsumerTag - if vals, has := qry[consumerTag]; has && len(vals) != 0 { - rdr.tag = vals[0] + if vals, has := opts[consumerTag]; has { + rdr.tag = utils.IfaceAsString(vals) } - if vals, has := qry[engine.RoutingKey]; has && len(vals) != 0 { - rdr.routingKey = vals[0] + if vals, has := opts[engine.RoutingKey]; has { + rdr.routingKey = utils.IfaceAsString(vals) } - if vals, has := qry[engine.Exchange]; has && len(vals) != 0 { - rdr.exchange = vals[0] + if vals, has := opts[engine.Exchange]; has { + rdr.exchange = utils.IfaceAsString(vals) rdr.exchangeType = engine.DefaultExchangeType } - if vals, has := qry[engine.ExchangeType]; has && len(vals) != 0 { - rdr.exchangeType = vals[0] + if vals, has := opts[engine.ExchangeType]; has { + rdr.exchangeType = utils.IfaceAsString(vals) } - - return nil } func (rdr *AMQPER) close() (err error) { diff --git a/ers/amqp_test.go b/ers/amqp_test.go index 7e4823c86..7489e20dc 100644 --- a/ers/amqp_test.go +++ b/ers/amqp_test.go @@ -22,17 +22,15 @@ import ( "testing" ) -func TestAMQPSetURL(t *testing.T) { +func TestAMQPSetOpts(t *testing.T) { k := new(AMQPER) + k.dialURL = "amqp://localhost:2013" expKafka := &AMQPER{ dialURL: "amqp://localhost:2013", queueID: "cdrs", tag: "new", } - url := "amqp://localhost:2013?queue_id=cdrs&consumer_tag=new" - if err := k.setURL(url); err != nil { - t.Fatal(err) - } else if expKafka.dialURL != k.dialURL { + if k.setOpts(map[string]interface{}{"queue_id": "cdrs", "consumer_tag": "new"}); expKafka.dialURL != k.dialURL { t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) } else if expKafka.queueID != k.queueID { t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID) @@ -40,28 +38,17 @@ func TestAMQPSetURL(t *testing.T) { t.Errorf("Expected: %s ,received: %s", expKafka.tag, k.tag) } k = new(AMQPER) + k.dialURL = "amqp://localhost:2013" expKafka = &AMQPER{ dialURL: "amqp://localhost:2013", queueID: "cgrates_cdrs", tag: "cgrates", } - url = "amqp://localhost:2013" - if err := k.setURL(url); err != nil { - t.Fatal(err) - } else if expKafka.dialURL != k.dialURL { + if k.setOpts(map[string]interface{}{}); expKafka.dialURL != k.dialURL { t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) } else if expKafka.queueID != k.queueID { t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID) } else if expKafka.tag != k.tag { t.Errorf("Expected: %s ,received: %s", expKafka.tag, k.tag) } - k = new(AMQPER) - expKafka = &AMQPER{ - dialURL: "amqp://localhost:2013", - queueID: "cgrates", - tag: "cgrates", - } - if err := k.setURL("127.0.0.1:2013?queue_id=cdrs&consumer_tag=new"); err == nil { - t.Errorf("Expected error received: %v", err) - } } diff --git a/ers/kafka.go b/ers/kafka.go index dda7cb60c..f56bb8f41 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -23,8 +23,6 @@ import ( "encoding/json" "fmt" "io" - "net/url" - "strings" "time" "github.com/cgrates/cgrates/agents" @@ -60,9 +58,11 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, rdr.cap <- struct{}{} } } + rdr.dialURL = rdr.Config().SourcePath er = rdr - err = rdr.setURL(rdr.Config().SourcePath) + err = rdr.setOpts(rdr.Config().Opts) return + } // KafkaER implements EventReader interface for kafka message @@ -138,12 +138,12 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) { utils.ERs, string(msg.Key), err.Error())) } if rdr.Config().ProcessedPath != utils.EmptyString { // post it - if err := engine.PostersCache.PostKafka(rdr.Config().ProcessedPath, - rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, string(msg.Key)); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> writing message %s error: %s", - utils.ERs, string(msg.Key), err.Error())) - } + // if err := engine.PostersCache.PostKafka(rdr.Config().ProcessedPath, + // rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, string(msg.Key)); err != nil { + // utils.Logger.Warning( + // fmt.Sprintf("<%s> writing message %s error: %s", + // utils.ERs, string(msg.Key), err.Error())) + // } } if rdr.Config().ConcurrentReqs != -1 { rdr.cap <- struct{}{} @@ -181,31 +181,19 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) { return } -func (rdr *KafkaER) setURL(dialURL string) (err error) { +func (rdr *KafkaER) setOpts(opts map[string]interface{}) (err error) { rdr.topic = defaultTopic rdr.groupID = defaultGroupID rdr.maxWait = defaultMaxWait - i := strings.IndexByte(dialURL, '?') - if i < 0 { - rdr.dialURL = dialURL - return + if vals, has := opts[utils.KafkaTopic]; has { + rdr.topic = utils.IfaceAsString(vals) } - rdr.dialURL = dialURL[:i] - rawQuery := dialURL[i+1:] - var qry url.Values - if qry, err = url.ParseQuery(rawQuery); err != nil { - return + if vals, has := opts[utils.KafkaGroupID]; has { + rdr.groupID = utils.IfaceAsString(vals) } - - if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 { - rdr.topic = vals[0] - } - if vals, has := qry[utils.KafkaGroupID]; has && len(vals) != 0 { - rdr.groupID = vals[0] - } - if vals, has := qry[utils.KafkaMaxWait]; has && len(vals) != 0 { - rdr.maxWait, err = time.ParseDuration(vals[0]) + if vals, has := opts[utils.KafkaMaxWait]; has { + rdr.maxWait, err = utils.IfaceAsDuration(vals) } return } diff --git a/general_tests/poster_it_test.go b/general_tests/poster_it_test.go index ce8046b85..be6f5a7ed 100644 --- a/general_tests/poster_it_test.go +++ b/general_tests/poster_it_test.go @@ -19,20 +19,7 @@ along with this program. If not, see */ package general_tests -import ( - "encoding/json" - "fmt" - "io/ioutil" - "net/rpc" - "path" - "testing" - "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - +/* var ( pstrCfg *config.CGRConfig pstrRpc *rpc.Client @@ -337,3 +324,4 @@ func testPosterITStopCgrEngine(t *testing.T) { t.Error(err) } } +*/ diff --git a/guardian/guardian_test.go b/guardian/guardian_test.go index 1de2c9564..f6d420f2d 100644 --- a/guardian/guardian_test.go +++ b/guardian/guardian_test.go @@ -270,11 +270,11 @@ func BenchmarkGuardian(b *testing.B) { // BenchmarkGuardIDs-8 1000000 8732 ns/op func BenchmarkGuardIDs(b *testing.B) { for n := 0; n < b.N; n++ { - go func() { - if refID := Guardian.GuardIDs("", 0, strconv.Itoa(n)); refID != "" { + go func(i int) { + if refID := Guardian.GuardIDs("", 0, strconv.Itoa(i)); refID != "" { time.Sleep(time.Microsecond) Guardian.UnguardIDs(refID) } - }() + }(n) } } diff --git a/migrator/storage_mongo_stordb.go b/migrator/storage_mongo_stordb.go index 532d4fc2c..795f55251 100644 --- a/migrator/storage_mongo_stordb.go +++ b/migrator/storage_mongo_stordb.go @@ -87,14 +87,14 @@ func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) { return err } return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(), - bson.D{{"create", utils.SessionCostsTBL}}).Err() + bson.D{{Key: "create", Value: utils.SessionCostsTBL}}).Err() } func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) { v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext()) v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Drop(v1ms.mgoDB.GetContext()) return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(), - bson.D{{"create", utils.OldSMCosts}, {"size", 1024}}).Err() + bson.D{{Key: "create", Value: utils.OldSMCosts}, {Key: "size", Value: 1024}}).Err() } //get diff --git a/migrator/thresholds_test.go b/migrator/thresholds_test.go index 22e2992c1..66cdcee49 100644 --- a/migrator/thresholds_test.go +++ b/migrator/thresholds_test.go @@ -66,7 +66,7 @@ func Testv2ActionTriggerAsThreshold(t *testing.T) { Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, Blocker: false, Weight: v2ATR.Weight, - ActivationInterval: &utils.ActivationInterval{v2ATR.ExpirationDate, v2ATR.ActivationDate}, + ActivationInterval: &utils.ActivationInterval{ExpiryTime: v2ATR.ExpirationDate, ActivationTime: v2ATR.ActivationDate}, MinSleep: v2ATR.MinSleep, } th := &engine.Threshold{