From cd034b6f65888690fd4f84fd9e6ebb0758eddeb4 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 26 Oct 2021 18:44:42 +0300 Subject: [PATCH] Fix compilation errors after making ees opts into structs (incomplete) --- config/config_json_test.go | 2 +- config/config_test.go | 4 +- config/configsanity.go | 3 +- config/configsanity_test.go | 4 +- config/eescfg_test.go | 52 +++++++++++------------ ees/amqp.go | 25 +++++------ ees/amqpv1.go | 13 +++--- ees/elastic.go | 57 ++++++------------------- ees/elastic_test.go | 84 ++++++++++++------------------------- ees/filecsv.go | 4 +- ees/kafka.go | 13 +++--- ees/libcdre.go | 9 ++-- ees/libcdre_test.go | 9 ++-- ees/nats.go | 54 ++++++++++-------------- ees/poster_test.go | 14 ++++--- ees/rpc.go | 22 ++++------ ees/s3.go | 28 +++++-------- ees/sql.go | 46 +++++++++----------- ees/sql_it_test.go | 46 +++++--------------- ees/sql_test.go | 22 +++++----- ees/sqs.go | 25 ++++------- ers/kafka_test.go | 2 +- ers/sqs_test.go | 2 +- 23 files changed, 212 insertions(+), 328 deletions(-) diff --git a/config/config_json_test.go b/config/config_json_test.go index 1fbe34f6b..247a47227 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -2127,7 +2127,7 @@ func TestDfEventExporterCfg(t *testing.T) { Synchronous: utils.BoolPointer(false), Attempts: utils.IntPointer(1), Fields: &[]*FcTemplateJsonCfg{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOptsJson{}, Concurrent_requests: utils.IntPointer(0), Failed_posts_dir: utils.StringPointer("/var/spool/cgrates/failed_posts"), }, diff --git a/config/config_test.go b/config/config_test.go index a938d2245..d707878c2 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -2287,7 +2287,7 @@ func TestEEsNoLksConfig(t *testing.T) { Fields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, }, @@ -5573,7 +5573,7 @@ func TestCgrCdfEventExporter(t *testing.T) { contentFields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, }, diff --git a/config/configsanity.go b/config/configsanity.go index 3c068635d..772b95ec1 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -781,8 +781,7 @@ func (cfg *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID) } } - if fldSep, has := exp.Opts[utils.CSVFieldSepOpt]; has && - utils.IfaceAsString(fldSep) == utils.EmptyString { + if exp.Opts.CSVFieldSeparator == utils.EmptyString { return fmt.Errorf("<%s> empty %s for exporter with ID: %s", utils.EEs, utils.CSVFieldSepOpt, exp.ID) } case utils.MetaFileFWV: diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 90fd03d77..077fc179a 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -1342,7 +1342,9 @@ func TestConfigSanityEventExporter(t *testing.T) { cfg.eesCfg.Exporters[0].Type = utils.MetaFileCSV cfg.eesCfg.Exporters[0].ExportPath = "/" - cfg.eesCfg.Exporters[0].Opts = map[string]interface{}{utils.CSVFieldSepOpt: ""} + cfg.eesCfg.Exporters[0].Opts = &EventExporterOpts{ + CSVFieldSeparator: utils.EmptyString, + } expected = " empty csvFieldSeparator for exporter with ID: " if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) diff --git a/config/eescfg_test.go b/config/eescfg_test.go index e1f0b286d..7dff2a666 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -40,7 +40,7 @@ func TestEESClone(t *testing.T) { "type": "*none", "export_path": "/var/spool/cgrates/ees", "opts": { - "*default": "randomVal" + "csvFieldSeparator": ";" }, "timezone": "local", "filters": ["randomFiletrs"], @@ -88,7 +88,7 @@ func TestEESClone(t *testing.T) { contentFields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, { @@ -166,8 +166,8 @@ func TestEESClone(t *testing.T) { Layout: time.RFC3339, }, }, - Opts: map[string]interface{}{ - utils.MetaDefault: "randomVal", + Opts: &EventExporterOpts{ + CSVFieldSeparator: utils.InfieldSep, }, }, }, @@ -283,7 +283,7 @@ func TestEventExporterSameID(t *testing.T) { contentFields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, { @@ -305,7 +305,7 @@ func TestEventExporterSameID(t *testing.T) { }, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, }, @@ -405,7 +405,7 @@ func TestEEsCfgloadFromJsonCfgCase1(t *testing.T) { Fields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, { @@ -429,7 +429,7 @@ func TestEEsCfgloadFromJsonCfgCase1(t *testing.T) { Layout: time.RFC3339, }, }, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, Fields: []*FCTemplate{ {Tag: utils.CGRID, Path: "*exp.CGRID", Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.CGRID", utils.InfieldSep), Layout: time.RFC3339}, }, @@ -518,7 +518,7 @@ func TestEEsCfgloadFromJsonCfgCase2(t *testing.T) { Fields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, { @@ -543,7 +543,7 @@ func TestEEsCfgloadFromJsonCfgCase2(t *testing.T) { }, }, FailedPostsDir: "/var/spool/cgrates/failed_posts", - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, Fields: []*FCTemplate{ { Tag: utils.CGRID, @@ -696,7 +696,7 @@ func TestDiffEventExporterJsonCfg(t *testing.T) { ID: "EES_ID", Type: "xml", ExportPath: "/tmp/ees", - Opts: map[string]interface{}{}, + Opts: &EventExporterOpts{}, Timezone: "UTC", Filters: []string{"Filter1"}, Flags: utils.FlagsWithParams{ @@ -736,8 +736,8 @@ func TestDiffEventExporterJsonCfg(t *testing.T) { ID: "EES_ID2", Type: "http", ExportPath: "/var/tmp/ees", - Opts: map[string]interface{}{ - "OPT": "opt", + Opts: &EventExporterOpts{ + CSVFieldSeparator: utils.InfieldSep, }, Timezone: "EEST", @@ -779,8 +779,8 @@ func TestDiffEventExporterJsonCfg(t *testing.T) { Id: utils.StringPointer("EES_ID2"), Type: utils.StringPointer("http"), Export_path: utils.StringPointer("/var/tmp/ees"), - Opts: map[string]interface{}{ - "OPT": "opt", + Opts: &EventExporterOptsJson{ + CSVFieldSeparator: utils.StringPointer(utils.InfieldSep), }, Timezone: utils.StringPointer("EEST"), Filters: &[]string{"Filter2"}, @@ -806,7 +806,7 @@ func TestDiffEventExporterJsonCfg(t *testing.T) { v1 = v2 expected = &EventExporterJsonCfg{ - Opts: map[string]interface{}{}, + Opts: &EventExporterOptsJson{}, } rcv = diffEventExporterJsonCfg(d, v1, v2, ";") if !reflect.DeepEqual(rcv, expected) { @@ -822,7 +822,7 @@ func TestDiffEventExporterJsonCfg(t *testing.T) { } expected = &EventExporterJsonCfg{ - Opts: map[string]interface{}{}, + Opts: &EventExporterOptsJson{}, Fields: &[]*FcTemplateJsonCfg{ { Type: utils.StringPointer("*prefix"), @@ -903,7 +903,7 @@ func TestDiffEventExportersJsonCfg(t *testing.T) { ID: "EES_ID", Type: "xml", ExportPath: "/tmp/ees", - Opts: map[string]interface{}{}, + Opts: &EventExporterOpts{}, Timezone: "UTC", Filters: []string{"Filter1"}, @@ -944,8 +944,8 @@ func TestDiffEventExportersJsonCfg(t *testing.T) { ID: "EES_ID2", Type: "http", ExportPath: "/var/tmp/ees", - Opts: map[string]interface{}{ - "OPT": "opt", + Opts: &EventExporterOpts{ + CSVFieldSeparator: utils.InfieldSep, }, Timezone: "EEST", @@ -987,8 +987,8 @@ func TestDiffEventExportersJsonCfg(t *testing.T) { Id: utils.StringPointer("EES_ID2"), Type: utils.StringPointer("http"), Export_path: utils.StringPointer("/var/tmp/ees"), - Opts: map[string]interface{}{ - "OPT": "opt", + Opts: &EventExporterOptsJson{ + CSVFieldSeparator: utils.StringPointer(utils.InfieldSep), }, Timezone: utils.StringPointer("EEST"), Filters: &[]string{"Filter2"}, @@ -1014,7 +1014,7 @@ func TestDiffEventExportersJsonCfg(t *testing.T) { v1 = v2 expected = &[]*EventExporterJsonCfg{ { - Opts: map[string]interface{}{}, + Opts: &EventExporterOptsJson{}, }, } rcv = diffEventExportersJsonCfg(d, v1, v2, ";") @@ -1030,7 +1030,7 @@ func TestDiffEventExportersJsonCfg(t *testing.T) { expected = &[]*EventExporterJsonCfg{ { - Opts: map[string]interface{}{}, + Opts: &EventExporterOptsJson{}, Id: utils.StringPointer("EES_ID2"), }, } @@ -1081,7 +1081,7 @@ func TestDiffEEsJsonCfg(t *testing.T) { Exporters: &[]*EventExporterJsonCfg{ { Id: utils.StringPointer("EES_ID"), - Opts: map[string]interface{}{}, + Opts: &EventExporterOptsJson{}, }, }, } @@ -1104,7 +1104,7 @@ func TestDiffEEsJsonCfg(t *testing.T) { }, Exporters: &[]*EventExporterJsonCfg{ { - Opts: map[string]interface{}{}, + Opts: &EventExporterOptsJson{}, }, }, } diff --git a/ees/amqp.go b/ees/amqp.go index b5d627346..0da737ec5 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -56,21 +56,22 @@ type AMQPee struct { bytePreparing } -func (pstr *AMQPee) parseOpts(dialURL map[string]interface{}) { - pstr.queueID = utils.DefaultQueueID - pstr.routingKey = utils.DefaultQueueID - if vals, has := dialURL[utils.AMQPQueueID]; has { - pstr.queueID = utils.IfaceAsString(vals) +func (pstr *AMQPee) parseOpts(dialURL *config.EventExporterOpts) { + if dialURL.AMQPQueueID == utils.EmptyString { + pstr.queueID = utils.DefaultQueueID + } else { + pstr.queueID = dialURL.AMQPQueueID } - if vals, has := dialURL[utils.AMQPRoutingKey]; has { - pstr.routingKey = utils.IfaceAsString(vals) + if dialURL.AMQPRoutingKey == utils.EmptyString { + pstr.routingKey = utils.DefaultQueueID + } else { + pstr.routingKey = dialURL.AMQPRoutingKey } - if vals, has := dialURL[utils.AMQPExchange]; has { - pstr.exchange = utils.IfaceAsString(vals) + pstr.exchange = dialURL.AMQPExchange + if dialURL.AMQPExchangeType == utils.EmptyString { pstr.exchangeType = utils.DefaultExchangeType - } - if vals, has := dialURL[utils.AMQPExchangeType]; has { - pstr.exchangeType = utils.IfaceAsString(vals) + } else { + pstr.exchangeType = dialURL.AMQPExchangeType } } diff --git a/ees/amqpv1.go b/ees/amqpv1.go index 163e1e1f7..32478f75f 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -30,13 +30,14 @@ import ( // NewAMQPv1EE creates a poster for amqpv1 func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1EE { pstr := &AMQPv1EE{ - cfg: cfg, - dc: dc, - queueID: "/" + utils.DefaultQueueID, - reqs: newConcReq(cfg.ConcurrentRequests), + cfg: cfg, + dc: dc, + reqs: newConcReq(cfg.ConcurrentRequests), } - if vals, has := cfg.Opts[utils.AMQPQueueID]; has { - pstr.queueID = "/" + utils.IfaceAsString(vals) + if cfg.Opts.AMQPQueueID == utils.EmptyString { + pstr.queueID = "/" + utils.DefaultQueueID + } else { + pstr.queueID = "/" + cfg.Opts.AMQPQueueID } return pstr } diff --git a/ees/elastic.go b/ees/elastic.go index 55d3ad57b..81c85e61b 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -57,51 +57,20 @@ type ElasticEE struct { // init will create all the necessary dependencies, including opening the file func (eEe *ElasticEE) prepareOpts() (err error) { //parse opts - eEe.opts.Index = utils.CDRsTBL - if val, has := eEe.Cfg().Opts[utils.ElsIndex]; has { - eEe.opts.Index = utils.IfaceAsString(val) - } - if val, has := eEe.Cfg().Opts[utils.ElsIfPrimaryTerm]; has { - var intVal int64 - if intVal, err = utils.IfaceAsTInt64(val); err != nil { - return - } - eEe.opts.IfPrimaryTerm = utils.IntPointer(int(intVal)) - } - if val, has := eEe.Cfg().Opts[utils.ElsIfSeqNo]; has { - var intVal int64 - if intVal, err = utils.IfaceAsTInt64(val); err != nil { - return - } - eEe.opts.IfSeqNo = utils.IntPointer(int(intVal)) - } - if val, has := eEe.Cfg().Opts[utils.ElsOpType]; has { - eEe.opts.OpType = utils.IfaceAsString(val) - } - if val, has := eEe.Cfg().Opts[utils.ElsPipeline]; has { - eEe.opts.Pipeline = utils.IfaceAsString(val) - } - if val, has := eEe.Cfg().Opts[utils.ElsRouting]; has { - eEe.opts.Routing = utils.IfaceAsString(val) - } - if val, has := eEe.Cfg().Opts[utils.ElsTimeout]; has { - if eEe.opts.Timeout, err = utils.IfaceAsDuration(val); err != nil { - return - } - } - if val, has := eEe.Cfg().Opts[utils.ElsVersionLow]; has { - var intVal int64 - if intVal, err = utils.IfaceAsTInt64(val); err != nil { - return - } - eEe.opts.Version = utils.IntPointer(int(intVal)) - } - if val, has := eEe.Cfg().Opts[utils.ElsVersionType]; has { - eEe.opts.VersionType = utils.IfaceAsString(val) - } - if val, has := eEe.Cfg().Opts[utils.ElsWaitForActiveShards]; has { - eEe.opts.WaitForActiveShards = utils.IfaceAsString(val) + if eEe.Cfg().Opts.ElsIndex == utils.EmptyString { + eEe.opts.Index = utils.CDRsTBL + } else { + eEe.opts.Index = eEe.Cfg().Opts.ElsIndex } + eEe.opts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm + eEe.opts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo + eEe.opts.OpType = eEe.Cfg().Opts.ElsOpType + eEe.opts.Pipeline = eEe.Cfg().Opts.ElsPipeline + eEe.opts.Routing = eEe.Cfg().Opts.ElsRouting + eEe.opts.Timeout = eEe.Cfg().Opts.ElsTimeout + eEe.opts.Version = eEe.Cfg().Opts.ElsVersion + eEe.opts.VersionType = eEe.Cfg().Opts.ElsVersionType + eEe.opts.WaitForActiveShards = eEe.Cfg().Opts.ElsWaitForActiveShards return } diff --git a/ees/elastic_test.go b/ees/elastic_test.go index d1f6e55f8..9513cd8fd 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -57,7 +57,9 @@ func TestInitClient(t *testing.T) { func TestInitCase1(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIndex: "test"}, + Opts: &config.EventExporterOpts{ + ElsIndex: "test", + }, }, } if err := ee.prepareOpts(); err != nil { @@ -72,7 +74,9 @@ func TestInitCase1(t *testing.T) { func TestInitCase2(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIfPrimaryTerm: 20}, + Opts: &config.EventExporterOpts{ + ElsIfPrimaryTerm: utils.IntPointer(20), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -84,22 +88,12 @@ func TestInitCase2(t *testing.T) { } } -func TestInitCase2Err(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIfPrimaryTerm: "test"}, - }, - } - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err := ee.prepareOpts(); err == nil || err.Error() != errExpect { - t.Errorf("Expected %+v \n but got %+v", errExpect, err) - } -} - func TestInitCase3(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIfSeqNo: 20}, + Opts: &config.EventExporterOpts{ + ElsIfSeqNo: utils.IntPointer(20), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -111,22 +105,12 @@ func TestInitCase3(t *testing.T) { } } -func TestInitCase3Err(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIfSeqNo: "test"}, - }, - } - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err := ee.prepareOpts(); err == nil || err.Error() != errExpect { - t.Errorf("Expected %+v \n but got %+v", errExpect, err) - } -} - func TestInitCase4(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsOpType: "test"}, + Opts: &config.EventExporterOpts{ + ElsOpType: "test", + }, }, } if err := ee.prepareOpts(); err != nil { @@ -141,7 +125,9 @@ func TestInitCase4(t *testing.T) { func TestInitCase5(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsPipeline: "test"}, + Opts: &config.EventExporterOpts{ + ElsPipeline: "test", + }, }, } if err := ee.prepareOpts(); err != nil { @@ -156,7 +142,9 @@ func TestInitCase5(t *testing.T) { func TestInitCase6(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsRouting: "test"}, + Opts: &config.EventExporterOpts{ + ElsRouting: "test", + }, }, } if err := ee.prepareOpts(); err != nil { @@ -168,22 +156,12 @@ func TestInitCase6(t *testing.T) { } } -func TestInitCase7(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsTimeout: "test"}, - }, - } - errExpect := "time: invalid duration \"test\"" - if err := ee.prepareOpts(); err == nil || err.Error() != errExpect { - t.Errorf("Expected %+v \n but got %+v", errExpect, err) - } -} - func TestInitCase8(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsVersionLow: 20}, + Opts: &config.EventExporterOpts{ + ElsVersion: utils.IntPointer(20), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -195,22 +173,12 @@ func TestInitCase8(t *testing.T) { } } -func TestInitCase8Err(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsVersionLow: "test"}, - }, - } - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err := ee.prepareOpts(); err == nil || err.Error() != errExpect { - t.Errorf("Expected %+v \n but got %+v", errExpect, err) - } -} - func TestInitCase9(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsVersionType: "test"}, + Opts: &config.EventExporterOpts{ + ElsVersionType: "test", + }, }, } if err := ee.prepareOpts(); err != nil { @@ -225,7 +193,9 @@ func TestInitCase9(t *testing.T) { func TestInitCase10(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsWaitForActiveShards: "test"}, + Opts: &config.EventExporterOpts{ + ElsWaitForActiveShards: "test", + }, }, } if err := ee.prepareOpts(); err != nil { diff --git a/ees/filecsv.go b/ees/filecsv.go index c0868de95..8cdcaa2d0 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -74,8 +74,8 @@ func (fCsv *FileCSVee) init() (err error) { } fCsv.csvWriter = csv.NewWriter(fCsv.file) fCsv.csvWriter.Comma = utils.CSVSep - if fieldSep, has := fCsv.Cfg().Opts[utils.CSVFieldSepOpt]; has { - fCsv.csvWriter.Comma = rune(utils.IfaceAsString(fieldSep)[0]) + if fCsv.Cfg().Opts.CSVFieldSeparator != utils.EmptyString { + fCsv.csvWriter.Comma = rune(fCsv.Cfg().Opts.CSVFieldSeparator[0]) } return fCsv.composeHeader() } diff --git a/ees/kafka.go b/ees/kafka.go index 755646601..57bd49136 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -29,13 +29,14 @@ import ( // NewKafkaEE creates a kafka poster func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE { kfkPstr := &KafkaEE{ - cfg: cfg, - dc: dc, - topic: utils.DefaultQueueID, - reqs: newConcReq(cfg.ConcurrentRequests), + cfg: cfg, + dc: dc, + reqs: newConcReq(cfg.ConcurrentRequests), } - if vals, has := cfg.Opts[utils.KafkaTopic]; has { - kfkPstr.topic = utils.IfaceAsString(vals) + if cfg.Opts.KafkaTopic == utils.EmptyString { + kfkPstr.topic = utils.DefaultQueueID + } else { + kfkPstr.topic = cfg.Opts.KafkaTopic } return kfkPstr } diff --git a/ees/libcdre.go b/ees/libcdre.go index ee8fe8a01..19f256c54 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -57,14 +57,11 @@ func writeFailedPosts(itmID string, value interface{}) { } } -func AddFailedPost(failedPostsDir, expPath, format, module string, ev interface{}, opts map[string]interface{}) { +func AddFailedPost(failedPostsDir, expPath, format, module string, ev interface{}, opts *config.EventExporterOpts) { key := utils.ConcatenatedKey(failedPostsDir, expPath, format, module) // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id - if qID := utils.FirstNonEmpty( - utils.IfaceAsString(opts[utils.AMQPQueueID]), - utils.IfaceAsString(opts[utils.S3Bucket]), - utils.IfaceAsString(opts[utils.SQSQueueID]), - utils.IfaceAsString(opts[utils.KafkaTopic])); len(qID) != 0 { + if qID := utils.FirstNonEmpty(opts.AMQPQueueID, opts.S3BucketID, + opts.SQSQueueID, opts.KafkaTopic); len(qID) != 0 { key = utils.ConcatenatedKey(key, qID) } var failedPost *ExportEvents diff --git a/ees/libcdre_test.go b/ees/libcdre_test.go index 2e97dbef3..20b3f577f 100644 --- a/ees/libcdre_test.go +++ b/ees/libcdre_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -37,7 +38,7 @@ func TestSetFldPostCacheTTL(t *testing.T) { func TestAddFldPost(t *testing.T) { SetFailedPostCacheTTL(5 * time.Second) - AddFailedPost("", "path1", "format1", "module1", "1", make(map[string]interface{})) + AddFailedPost("", "path1", "format1", "module1", "1", &config.EventExporterOpts{}) x, ok := failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1", "module1")) if !ok { t.Error("Error reading from cache") @@ -60,8 +61,10 @@ func TestAddFldPost(t *testing.T) { if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } - AddFailedPost("", "path1", "format1", "module1", "2", make(map[string]interface{})) - AddFailedPost("", "path2", "format2", "module2", "3", map[string]interface{}{utils.SQSQueueID: "qID"}) + AddFailedPost("", "path1", "format1", "module1", "2", &config.EventExporterOpts{}) + AddFailedPost("", "path2", "format2", "module2", "3", &config.EventExporterOpts{ + SQSQueueID: "qID", + }) x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1", "module1")) if !ok { t.Error("Error reading from cache") diff --git a/ees/nats.go b/ees/nats.go index 393f14127..879f17a24 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -61,24 +61,17 @@ type NatsEE struct { bytePreparing } -func (pstr *NatsEE) parseOpt(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (err error) { - if useJetStreamVal, has := opts[utils.NatsJetStream]; has { - if pstr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil { - return - } - } - pstr.subject = utils.DefaultQueueID - if vals, has := opts[utils.NatsSubject]; has { - pstr.subject = utils.IfaceAsString(vals) +func (pstr *NatsEE) parseOpt(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (err error) { + pstr.jetStream = opts.NATSJetStream + if opts.NATSSubject == utils.EmptyString { + pstr.subject = utils.DefaultQueueID + } else { + pstr.subject = opts.NATSSubject } pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout) if pstr.jetStream { - if maxWaitVal, has := opts[utils.NatsJetStreamMaxWait]; has { - var maxWait time.Duration - if maxWait, err = utils.IfaceAsDuration(maxWaitVal); err != nil { - return - } - pstr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)} + if opts.NATSJetStreamMaxWait != 0 { + pstr.jsOpts = []nats.JSOpt{nats.MaxWait(opts.NATSJetStreamMaxWait)} } } return @@ -130,51 +123,50 @@ func (pstr *NatsEE) Close() (err error) { func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } -func GetNatsOpts(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { +func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { nop = make([]nats.Option, 0, 7) nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), nats.Timeout(connTimeout), nats.DrainTimeout(time.Second)) - if userFile, has := opts[utils.NatsJWTFile]; has { + if opts.NATSJWTFile != utils.EmptyString { keys := make([]string, 0, 1) - if keyFile, has := opts[utils.NatsSeedFile]; has { - keys = append(keys, utils.IfaceAsString(keyFile)) + if opts.NATSSeedFile != utils.EmptyString { + keys = append(keys, opts.NATSSeedFile) } - nop = append(nop, nats.UserCredentials(utils.IfaceAsString(userFile), keys...)) + nop = append(nop, nats.UserCredentials(opts.NATSJWTFile, keys...)) } - if nkeyFile, has := opts[utils.NatsSeedFile]; has { - opt, err := nats.NkeyOptionFromSeed(utils.IfaceAsString(nkeyFile)) + if opts.NATSSeedFile != utils.EmptyString { + opt, err := nats.NkeyOptionFromSeed(opts.NATSSeedFile) if err != nil { return nil, err } nop = append(nop, opt) } - if certFile, has := opts[utils.NatsClientCertificate]; has { - clientFile, has := opts[utils.NatsClientKey] - if !has { + if opts.NATSClientCertificate != utils.EmptyString { + if opts.NATSClientKey == utils.EmptyString { err = fmt.Errorf("has certificate but no key") return } - nop = append(nop, nats.ClientCert(utils.IfaceAsString(certFile), utils.IfaceAsString(clientFile))) - } else if _, has := opts[utils.NatsClientKey]; has { + nop = append(nop, nats.ClientCert(opts.NATSClientCertificate, opts.NATSClientKey)) + } else if opts.NATSClientKey != utils.EmptyString { err = fmt.Errorf("has key but no certificate") return } - - if caFile, has := opts[utils.NatsCertificateAuthority]; has { + if opts.NATSCertificateAuthority != utils.EmptyString { nop = append(nop, func(o *nats.Options) error { pool, err := x509.SystemCertPool() if err != nil { return err } - rootPEM, err := ioutil.ReadFile(utils.IfaceAsString(caFile)) + rootPEM, err := ioutil.ReadFile(opts.NATSCertificateAuthority) if err != nil || rootPEM == nil { return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err) } ok := pool.AppendCertsFromPEM(rootPEM) if !ok { - return fmt.Errorf("nats: failed to parse root certificate from %q", caFile) + return fmt.Errorf("nats: failed to parse root certificate from %q", + opts.NATSCertificateAuthority) } if o.TLSConfig == nil { o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} diff --git a/ees/poster_test.go b/ees/poster_test.go index f3c9364b7..1c37788e7 100644 --- a/ees/poster_test.go +++ b/ees/poster_test.go @@ -37,11 +37,11 @@ func TestAMQPeeParseURL(t *testing.T) { exchangeType: "fanout", routingKey: "CGRCDR", } - opts := map[string]interface{}{ - utils.AMQPQueueID: "q1", - utils.AMQPExchange: "E1", - utils.AMQPRoutingKey: "CGRCDR", - utils.AMQPExchangeType: "fanout", + opts := &config.EventExporterOpts{ + AMQPQueueID: "q1", + AMQPExchange: "E1", + AMQPRoutingKey: "CGRCDR", + AMQPExchangeType: "fanout", } amqp.parseOpts(opts) if !reflect.DeepEqual(expected, amqp) { @@ -53,7 +53,9 @@ func TestKafkaParseURL(t *testing.T) { cfg := &config.EventExporterCfg{ ExportPath: "127.0.0.1:9092", Attempts: 10, - Opts: map[string]interface{}{utils.KafkaTopic: "cdr_billing"}, + Opts: &config.EventExporterOpts{ + KafkaTopic: "cdr_billing", + }, } exp := &KafkaEE{ cfg: cfg, diff --git a/ees/rpc.go b/ees/rpc.go index 48b1fbe64..b3013762c 100644 --- a/ees/rpc.go +++ b/ees/rpc.go @@ -110,19 +110,13 @@ func (e *RPCee) PrepareOrderMap(oMp *utils.OrderedNavigableMap) (interface{}, er } func (e *RPCee) parseOpts() (err error) { - e.codec = utils.IfaceAsString(e.cfg.Opts[utils.RpcCodec]) - e.serviceMethod = utils.IfaceAsString(e.cfg.Opts[utils.ServiceMethod]) - e.keyPath = utils.IfaceAsString(e.cfg.Opts[utils.KeyPath]) - e.certPath = utils.IfaceAsString(e.cfg.Opts[utils.CertPath]) - e.caPath = utils.IfaceAsString(e.cfg.Opts[utils.CaPath]) - if e.tls, err = utils.IfaceAsBool(e.cfg.Opts[utils.Tls]); err != nil { - return - } - if e.connTimeout, err = utils.IfaceAsDuration(e.cfg.Opts[utils.RpcConnTimeout]); err != nil { - return - } - if e.replyTimeout, err = utils.IfaceAsDuration(e.cfg.Opts[utils.RpcReplyTimeout]); err != nil { - return - } + e.codec = e.cfg.Opts.RPCCodec + e.serviceMethod = e.cfg.Opts.ServiceMethod + e.keyPath = e.cfg.Opts.KeyPath + e.certPath = e.cfg.Opts.CertPath + e.caPath = e.cfg.Opts.CAPath + e.tls = e.cfg.Opts.TLS + e.connTimeout = e.cfg.Opts.RPCConnTimeout + e.replyTimeout = e.cfg.Opts.RPCReplyTimeout return } diff --git a/ees/s3.go b/ees/s3.go index 6aad90547..e8c106a3c 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -61,26 +61,18 @@ type S3EE struct { bytePreparing } -func (pstr *S3EE) parseOpts(opts map[string]interface{}) { +func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) { pstr.bucket = utils.DefaultQueueID - if val, has := opts[utils.S3Bucket]; has { - pstr.bucket = utils.IfaceAsString(val) - } - if val, has := opts[utils.S3FolderPath]; has { - pstr.folderPath = utils.IfaceAsString(val) - } - if val, has := opts[utils.AWSRegion]; has { - pstr.awsRegion = utils.IfaceAsString(val) - } - if val, has := opts[utils.AWSKey]; has { - pstr.awsID = utils.IfaceAsString(val) - } - if val, has := opts[utils.AWSSecret]; has { - pstr.awsKey = utils.IfaceAsString(val) - } - if val, has := opts[utils.AWSToken]; has { - pstr.awsToken = utils.IfaceAsString(val) + if opts.S3BucketID == utils.EmptyString { + pstr.bucket = utils.DefaultQueueID + } else { + pstr.bucket = opts.S3BucketID } + pstr.folderPath = opts.S3FolderPath + pstr.awsRegion = opts.AWSRegion + pstr.awsID = opts.AWSKey + pstr.awsKey = opts.AWSSecret + pstr.awsToken = opts.AWSToken } func (pstr *S3EE) Cfg() *config.EventExporterCfg { return pstr.cfg } diff --git a/ees/sql.go b/ees/sql.go index 69c4306fb..f83da278a 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -71,19 +71,23 @@ func (sqlEe *SQLEe) initDialector() (err error) { } password, _ := u.User.Password() - dbname := utils.SQLDefaultDBName - if vals, has := sqlEe.Cfg().Opts[utils.SQLDBNameOpt]; has { - dbname = utils.IfaceAsString(vals) + var dbname string + if sqlEe.Cfg().Opts.SQLDBName == utils.EmptyString { + dbname = utils.SQLDefaultDBName + } else { + dbname = sqlEe.Cfg().Opts.SQLDBName } - ssl := utils.SQLDefaultSSLMode - if vals, has := sqlEe.Cfg().Opts[utils.SSLModeCfg]; has { - ssl = utils.IfaceAsString(vals) + var ssl string + if sqlEe.Cfg().Opts.SSLMode == utils.EmptyString { + ssl = utils.SQLDefaultSSLMode + } else { + ssl = sqlEe.Cfg().Opts.SSLMode } // tableName is mandatory in opts - if iface, has := sqlEe.Cfg().Opts[utils.SQLTableNameOpt]; !has { + if sqlEe.Cfg().Opts.SQLTableName == utils.EmptyString { return utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt) } else { - sqlEe.tableName = utils.IfaceAsString(iface) + sqlEe.tableName = sqlEe.Cfg().Opts.SQLTableName } // var dialect gorm.Dialector @@ -99,7 +103,7 @@ func (sqlEe *SQLEe) initDialector() (err error) { return } -func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, sqlDB *sql.DB, err error) { +func openDB(dialect gorm.Dialector, opts *config.EventExporterOpts) (db *gorm.DB, sqlDB *sql.DB, err error) { if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil { return } @@ -107,26 +111,14 @@ func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, s return } - if iface, has := opts[utils.SQLMaxIdleConnsCfg]; has { - val, err := utils.IfaceAsTInt64(iface) - if err != nil { - return nil, nil, err - } - sqlDB.SetMaxIdleConns(int(val)) + if opts.SQLMaxIdleConns != 0 { + sqlDB.SetMaxIdleConns(opts.SQLMaxIdleConns) } - if iface, has := opts[utils.SQLMaxOpenConns]; has { - val, err := utils.IfaceAsTInt64(iface) - if err != nil { - return nil, nil, err - } - sqlDB.SetMaxOpenConns(int(val)) + if opts.SQLMaxOpenConns != 0 { + sqlDB.SetMaxOpenConns(opts.SQLMaxOpenConns) } - if iface, has := opts[utils.SQLConnMaxLifetime]; has { - val, err := utils.IfaceAsDuration(iface) - if err != nil { - return nil, nil, err - } - sqlDB.SetConnMaxLifetime(val) + if opts.SQLConnMaxLifetime != 0 { + sqlDB.SetConnMaxLifetime(opts.SQLConnMaxLifetime) } return diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go index c0cdec68b..59a2ad1a4 100644 --- a/ees/sql_it_test.go +++ b/ees/sql_it_test.go @@ -236,64 +236,40 @@ func testSqlEeVerifyExportedEvent2(t *testing.T) { func TestOpenDB1(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxIdleConnsCfg: 2}) + _, _, err := openDB(dialect, &config.EventExporterOpts{ + SQLMaxIdleConns: 2, + }) if err != nil { t.Error(err) } } -func TestOpenDB1Err(t *testing.T) { - dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", - "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxIdleConnsCfg: "test"}) - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err == nil || err.Error() != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) - } -} - func TestOpenDB2(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxOpenConns: 2}) + _, _, err := openDB(dialect, &config.EventExporterOpts{ + SQLMaxOpenConns: 2, + }) if err != nil { t.Error(err) } } -func TestOpenDB2Err(t *testing.T) { - dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", - "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxOpenConns: "test"}) - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err == nil || err.Error() != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) - } -} - func TestOpenDB3(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: 2}) + _, _, err := openDB(dialect, &config.EventExporterOpts{ + SQLConnMaxLifetime: 2, + }) if err != nil { t.Error(err) } } -func TestOpenDB3Err(t *testing.T) { - dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", - "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: "test"}) - errExpect := "time: invalid duration \"test\"" - if err == nil || err.Error() != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) - } -} - func TestSQLExportEvent1(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "cgrates" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = "cgrates" cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` sqlEe, err := NewSQLEe(cgrCfg.EEsCfg().Exporters[0], nil) if err != nil { diff --git a/ees/sql_test.go b/ees/sql_test.go index cae7b0ebd..1fae3c862 100644 --- a/ees/sql_test.go +++ b/ees/sql_test.go @@ -46,9 +46,9 @@ func TestSqlGetMetrics(t *testing.T) { func TestNewSQLeUrl(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SSLModeCfg] = "test" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = "postgres" + cgrCfg.EEsCfg().Exporters[0].Opts.SSLMode = "test" sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], reqs: newConcReq(0), @@ -61,8 +61,8 @@ func TestNewSQLeUrl(t *testing.T) { func TestNewSQLeUrlSQL(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "mysql" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = "mysql" cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], @@ -79,8 +79,8 @@ func TestNewSQLeUrlSQL(t *testing.T) { func TestNewSQLeUrlPostgres(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = "postgres" cgrCfg.EEsCfg().Exporters[0].ExportPath = `postgres://cgrates:CGRateS.org@127.0.0.1:3306` sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], @@ -97,8 +97,8 @@ func TestNewSQLeUrlPostgres(t *testing.T) { func TestNewSQLeExportPathError(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = "postgres" cgrCfg.EEsCfg().Exporters[0].ExportPath = ":foo" sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], @@ -120,7 +120,7 @@ func TestOpenDBError2(t *testing.T) { tmp := logger.Default logger.Default = logger.Default.LogMode(logger.Silent) mckDialect := new(mockDialect2) - _, _, err := openDB(mckDialect, make(map[string]interface{})) + _, _, err := openDB(mckDialect, &config.EventExporterOpts{}) errExpect := "invalid db" if err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) @@ -140,7 +140,7 @@ func TestOpenDBError3(t *testing.T) { tmp := logger.Default logger.Default = logger.Default.LogMode(logger.Silent) mckDialect := new(mockDialectErr) - _, _, err := openDB(mckDialect, make(map[string]interface{})) + _, _, err := openDB(mckDialect, &config.EventExporterOpts{}) errExpect := "NOT_FOUND" if err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) diff --git a/ees/sqs.go b/ees/sqs.go index 3b9bd5b34..f1d0c97ad 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -60,23 +60,16 @@ type SQSee struct { bytePreparing } -func (pstr *SQSee) parseOpts(opts map[string]interface{}) { - pstr.queueID = utils.DefaultQueueID - if val, has := opts[utils.SQSQueueID]; has { - pstr.queueID = utils.IfaceAsString(val) - } - if val, has := opts[utils.AWSRegion]; has { - pstr.awsRegion = utils.IfaceAsString(val) - } - if val, has := opts[utils.AWSKey]; has { - pstr.awsID = utils.IfaceAsString(val) - } - if val, has := opts[utils.AWSSecret]; has { - pstr.awsKey = utils.IfaceAsString(val) - } - if val, has := opts[utils.AWSToken]; has { - pstr.awsToken = utils.IfaceAsString(val) +func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) { + if opts.SQSQueueID == utils.EmptyString { + pstr.queueID = utils.DefaultQueueID + } else { + pstr.queueID = opts.SQSQueueID } + pstr.awsRegion = opts.AWSRegion + pstr.awsID = opts.AWSKey + pstr.awsKey = opts.AWSSecret + pstr.awsToken = opts.AWSToken } func (pstr *SQSee) Cfg() *config.EventExporterCfg { return pstr.cfg } diff --git a/ers/kafka_test.go b/ers/kafka_test.go index 68d7b299f..6c99db0ec 100644 --- a/ers/kafka_test.go +++ b/ers/kafka_test.go @@ -138,7 +138,7 @@ func TestKafkaERServe2(t *testing.T) { poster: ees.NewKafkaEE(&config.EventExporterCfg{ ExportPath: "url", Attempts: 1, - Opts: make(map[string]interface{}), + Opts: &config.EventExporterOpts{}, }, nil), } rdr.rdrExit <- struct{}{} diff --git a/ers/sqs_test.go b/ers/sqs_test.go index 01be4cd91..fdf5289d3 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -527,7 +527,7 @@ func TestSQSERReadMsgError3(t *testing.T) { poster: ees.NewSQSee(&config.EventExporterCfg{ ExportPath: "url", Attempts: 1, - Opts: make(map[string]interface{}), + Opts: &config.EventExporterOpts{}, }, nil), } awsCfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)}