From 38447e9229c09a1fb8bd82bedfed0f6c4f3dc55e Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 26 Nov 2021 17:09:59 +0200 Subject: [PATCH] Make ers opts as struct and fix compilation errors --- config/config.go | 2 + config/config_it_test.go | 32 +-- config/config_json_test.go | 16 +- config/config_test.go | 50 ++--- config/configsanity.go | 32 ++- config/configsanity_test.go | 82 +++----- config/eescfg.go | 2 +- config/eescfg_test.go | 10 +- config/erscfg.go | 374 ++++++++++++++++++------------------ config/erscfg_test.go | 205 ++++++++++---------- config/libconfig_json.go | 2 +- ees/nats_test.go | 5 + ers/amqp.go | 30 +-- ers/amqp_test.go | 8 +- ers/amqpv1.go | 12 +- ers/amqpv1_it_test.go | 4 +- ers/ers.go | 16 +- ers/ers_it_test.go | 74 +++---- ers/ers_test.go | 26 +-- ers/filecsv.go | 24 +-- ers/filefwv_it_test.go | 4 +- ers/filexml.go | 5 +- ers/filexml_test.go | 4 +- ers/kafka.go | 23 +-- ers/kafka_test.go | 21 +- ers/libers.go | 148 +++++++++++++- ers/libers_test.go | 11 +- ers/nats.go | 102 ++++++++-- ers/nats_it_test.go | 5 +- ers/readers_test.go | 16 +- ers/s3.go | 30 +-- ers/s3_it_test.go | 6 +- ers/s3_test.go | 26 +-- ers/sql.go | 34 ++-- ers/sql_it_test.go | 4 +- ers/sql_test.go | 46 ++--- ers/sqs.go | 30 +-- ers/sqs_test.go | 29 ++- utils/consts.go | 1 + 39 files changed, 869 insertions(+), 682 deletions(-) diff --git a/config/config.go b/config/config.go index fa8f15bd3..dc5d5bf00 100644 --- a/config/config.go +++ b/config/config.go @@ -177,6 +177,8 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) { cfg.configSCfg = new(ConfigSCfg) cfg.apiBanCfg = new(APIBanCfg) cfg.coreSCfg = new(CoreSCfg) + cfg.dfltEvExp = &EventExporterCfg{Opts: &EventExporterOpts{}} + cfg.dfltEvRdr = &EventReaderCfg{Opts: &EventReaderOpts{}} cfg.cacheDP = make(map[string]utils.MapStorage) diff --git a/config/config_it_test.go b/config/config_it_test.go index df13d5092..5c53464c5 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -591,14 +591,14 @@ func testCGRConfigReloadERs(t *testing.T) { Fields: content, CacheDumpFields: []*FCTemplate{}, PartialCommitFields: []*FCTemplate{}, - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "xmlRootPath": "", - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + XMLRootPath: utils.StringPointer(utils.EmptyString), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, { @@ -613,14 +613,14 @@ func testCGRConfigReloadERs(t *testing.T) { Fields: content, CacheDumpFields: []*FCTemplate{}, PartialCommitFields: []*FCTemplate{}, - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "xmlRootPath": "", - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + XMLRootPath: utils.StringPointer(utils.EmptyString), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 89004cc27..1b1677ab8 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1987,14 +1987,14 @@ func TestDfEventReaderCfg(t *testing.T) { Fields: &cdrFields, Cache_dump_fields: &[]*FcTemplateJsonCfg{}, Partial_commit_fields: &[]*FcTemplateJsonCfg{}, - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOptsJson{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + XMLRootPath: utils.StringPointer(utils.EmptyString), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, diff --git a/config/config_test.go b/config/config_test.go index 872a721f4..57d8786b8 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -2342,14 +2342,14 @@ func TestERSConfig(t *testing.T) { Fields: nil, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + XMLRootPath: utils.StringPointer(utils.EmptyString), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -4411,7 +4411,7 @@ func TestV1GetConfigSectionERS(t *testing.T) { utils.OptsCfg: map[string]interface{}{ "csvFieldSeparator": ",", "csvHeaderDefineChar": ":", - "csvRowLength": 0., + "csvRowLength": 0, "xmlRootPath": "", "partialOrderField": "~*req.AnswerTime", "partialCacheAction": utils.MetaNone, @@ -5330,14 +5330,14 @@ func TestCgrCdfEventReader(t *testing.T) { }, CacheDumpFields: []*FCTemplate{}, PartialCommitFields: []*FCTemplate{}, - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + XMLRootPath: utils.StringPointer(utils.EmptyString), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -5426,14 +5426,14 @@ func TestCgrCfgEventReaderDefault(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + XMLRootPath: utils.StringPointer(utils.EmptyString), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, } for _, v := range eCfg.Fields { diff --git a/config/configsanity.go b/config/configsanity.go index 484fd1c38..5365ded19 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -700,7 +700,10 @@ func (cfg *CGRConfig) checkConfigSanity() error { if !possibleReaderTypes.Has(rdr.Type) { return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID) } - pAct := utils.IfaceAsString(rdr.Opts[utils.PartialCacheActionOpt]) + var pAct string + if rdr.Opts.PartialCacheAction != nil { + pAct = *rdr.Opts.PartialCacheAction + } if pAct != utils.MetaDumpToFile && pAct != utils.MetaNone && pAct != utils.MetaPostCDR && @@ -708,23 +711,22 @@ func (cfg *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> wrong partial expiry action for reader with ID: %s", utils.ERs, rdr.ID) } if pAct != utils.MetaNone { // if is *none we do not process the evicted events - if fldSep, has := rdr.Opts[utils.PartialOrderFieldOpt]; has && // the field we order after must not be empty - utils.IfaceAsString(fldSep) == utils.EmptyString { + if rdr.Opts.PartialOrderField != nil && *rdr.Opts.PartialOrderField == utils.EmptyString { // the field we order after must not be empty return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.PartialOrderFieldOpt, rdr.ID) } } if pAct == utils.MetaDumpToFile || pAct == utils.MetaDumpToJSON { // only if the action is *dump_to_file path := rdr.ProcessedPath - if pathVal, has := rdr.Opts[utils.PartialPathOpt]; has { // the path from options needs to exists if overwriten by reader - path = utils.IfaceAsString(pathVal) + if rdr.Opts.PartialPath != nil { + path = *rdr.Opts.PartialPath } - if _, err := os.Stat(utils.IfaceAsString(path)); err != nil && os.IsNotExist(err) { + if _, err := os.Stat(path); err != nil && os.IsNotExist(err) { return fmt.Errorf("<%s> nonexistent partial folder: %s for reader with ID: %s", utils.ERs, path, rdr.ID) } if pAct == utils.MetaDumpToFile { - if fldSep, has := rdr.Opts[utils.PartialCSVFieldSepartorOpt]; has && // the separtor must not be empty - utils.IfaceAsString(fldSep) == utils.EmptyString { + if rdr.Opts.PartialCSVFieldSeparator != nil && // the separtor must not be empty + *rdr.Opts.PartialCSVFieldSeparator == utils.EmptyString { return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.PartialCSVFieldSepartorOpt, rdr.ID) } } @@ -740,20 +742,10 @@ func (cfg *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID) } } - if fldSep, has := rdr.Opts[utils.CSVFieldSepOpt]; has && - utils.IfaceAsString(fldSep) == utils.EmptyString { + if rdr.Opts.CSVFieldSeparator != nil && + *rdr.Opts.CSVFieldSeparator == utils.EmptyString { return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.CSVFieldSepOpt, rdr.ID) } - if rowl, has := rdr.Opts[utils.CSVRowLengthOpt]; has { - if _, err := utils.IfaceAsTInt64(rowl); err != nil { - return fmt.Errorf("<%s> error when converting %s: <%s> for reader with ID: %s", utils.ERs, utils.CSVRowLengthOpt, err.Error(), rdr.ID) - } - } - if lq, has := rdr.Opts[utils.CSVLazyQuotes]; has { - if _, err := utils.IfaceAsBool(lq); err != nil { - return fmt.Errorf("<%s> error when converting %s: <%s> for reader with ID: %s", utils.ERs, utils.CSVLazyQuotes, err.Error(), rdr.ID) - } - } case utils.MetaKafkajsonMap: if rdr.RunDelay > 0 { return fmt.Errorf("<%s> the RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID) diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 2a1d53130..90620a4ba 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -1234,8 +1234,8 @@ func TestConfigSanityEventReader(t *testing.T) { ID: "test2", Type: utils.MetaFileCSV, ProcessedPath: "not/a/path", - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaNone, + Opts: &EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaNone), }, }} expected = " nonexistent folder: not/a/path for reader with ID: test2" @@ -1248,9 +1248,9 @@ func TestConfigSanityEventReader(t *testing.T) { Type: utils.MetaFileCSV, ProcessedPath: "/", SourcePath: "/", - Opts: map[string]interface{}{ - "csvFieldSeparator": "", - utils.PartialCacheActionOpt: utils.MetaNone, + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.EmptyString), + PartialCacheAction: utils.StringPointer(utils.MetaNone), }, }} expected = " empty csvFieldSeparator for reader with ID: test3" @@ -1261,9 +1261,9 @@ func TestConfigSanityEventReader(t *testing.T) { ID: "test4", Type: utils.MetaKafkajsonMap, RunDelay: 1, - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - utils.PartialCacheActionOpt: utils.MetaNone, + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + PartialCacheAction: utils.StringPointer(utils.MetaNone), }, } expected = " the RunDelay field can not be bigger than zero for reader with ID: test4" @@ -1276,8 +1276,8 @@ func TestConfigSanityEventReader(t *testing.T) { RunDelay: 0, ProcessedPath: "not/a/path", SourcePath: "not/a/path", - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaNone, + Opts: &EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaNone), }, } expected = " nonexistent folder: not/a/path for reader with ID: test5" @@ -1291,8 +1291,8 @@ func TestConfigSanityEventReader(t *testing.T) { RunDelay: 0, ProcessedPath: "not/a/path", SourcePath: "not/a/path", - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaNone, + Opts: &EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaNone), }, } expected = " nonexistent folder: not/a/path for reader with ID: test5" @@ -1313,8 +1313,8 @@ func TestConfigSanityEventReader(t *testing.T) { {Tag: "SessionId", Path: utils.EmptyString, Type: "*variable", Value: NewRSRParsersMustCompile("~*req.Session-Id", utils.InfieldSep), Mandatory: true}, }, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaNone, + Opts: &EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaNone), }, }, }, @@ -1988,33 +1988,33 @@ func TestConfigSanityErs(t *testing.T) { { Type: utils.MetaNone, ID: "rdrID", - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaPostCDR, + Opts: &EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaPostCDR), }, }, } - cfg.ersCfg.Readers[0].Opts = map[string]interface{}{ - utils.PartialOrderFieldOpt: utils.EmptyString, - utils.PartialCacheActionOpt: "invalid", + cfg.ersCfg.Readers[0].Opts = &EventReaderOpts{ + PartialOrderField: utils.StringPointer(utils.EmptyString), + PartialCacheAction: utils.StringPointer("invalid"), } expected := " wrong partial expiry action for reader with ID: rdrID" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("expected: <%v>,\n received: <%v>", expected, err) } - cfg.ersCfg.Readers[0].Opts = map[string]interface{}{ - utils.PartialOrderFieldOpt: utils.EmptyString, - utils.PartialCacheActionOpt: utils.MetaPostCDR, + cfg.ersCfg.Readers[0].Opts = &EventReaderOpts{ + PartialOrderField: utils.StringPointer(utils.EmptyString), + PartialCacheAction: utils.StringPointer(utils.MetaPostCDR), } expected = " empty partialOrderField for reader with ID: rdrID" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("expected: <%v>,\n received: <%v>", expected, err) } - cfg.ersCfg.Readers[0].Opts = map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToFile, - utils.PartialCSVFieldSepartorOpt: utils.EmptyString, + cfg.ersCfg.Readers[0].Opts = &EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToFile), + PartialCSVFieldSeparator: utils.StringPointer(utils.EmptyString), } cfg.ersCfg.Readers[0].ProcessedPath = "/tmp" expected = " empty partialcsvFieldSeparator for reader with ID: rdrID" @@ -2022,37 +2022,13 @@ func TestConfigSanityErs(t *testing.T) { t.Errorf("expected: <%v>,\n received: <%v>", expected, err) } - cfg.ersCfg.Readers[0].Opts = map[string]interface{}{ - utils.PartialOrderFieldOpt: "non_empty", - utils.PartialCacheActionOpt: utils.MetaDumpToFile, - utils.PartialPathOpt: "path", + cfg.ersCfg.Readers[0].Opts = &EventReaderOpts{ + PartialOrderField: utils.StringPointer("non_empty"), + PartialCacheAction: utils.StringPointer(utils.MetaDumpToFile), + PartialPath: utils.StringPointer("path"), } expected = " nonexistent partial folder: path for reader with ID: rdrID" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("expected: <%v>,\n received: <%v>", expected, err) } - - cfg.ersCfg.Readers[0].Opts = map[string]interface{}{ - utils.PartialCSVFieldSepartorOpt: utils.EmptyString, - } - - cfg.ersCfg.Readers[0].Type = utils.MetaFileCSV - cfg.ersCfg.Readers[0].ProcessedPath = utils.EmptyString - cfg.ersCfg.Readers[0].SourcePath = "/tmp" - cfg.ersCfg.Readers[0].ID = utils.MetaDefault - cfg.ersCfg.Readers[0].Opts = map[string]interface{}{ - utils.CSVRowLengthOpt: "can't convert", - utils.CSVLazyQuotes: "can't convert", - utils.PartialCacheActionOpt: utils.MetaNone, - } - expected = ` error when converting csvRowLength: for reader with ID: *default` - if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { - t.Errorf("expected: <%v>,\n received: <%v>", expected, err) - } - - cfg.ersCfg.Readers[0].Opts[utils.CSVRowLengthOpt] = "2" - expected = ` error when converting csvLazyQuotes: for reader with ID: *default` - if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { - t.Errorf("expected: <%v>,\n received: <%v>", expected, err) - } } diff --git a/config/eescfg.go b/config/eescfg.go index a09d614da..781085090 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -657,7 +657,7 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str opts[utils.SQLDBNameOpt] = *eeC.Opts.SQLDBName } if eeC.Opts.SSLMode != nil { - opts[utils.SSLModeCfg] = *eeC.Opts.SSLMode + opts[utils.SSLMode] = *eeC.Opts.SSLMode } if eeC.Opts.KafkaTopic != nil { opts[utils.KafkaTopic] = *eeC.Opts.KafkaTopic diff --git a/config/eescfg_test.go b/config/eescfg_test.go index e70b1475b..f6e5f75f8 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -40,8 +40,8 @@ func TestEESClone(t *testing.T) { "type": "*none", "export_path": "/var/spool/cgrates/ees", "opts": { - "*default": "randomVal" - }, + "csvFieldSeparator": ";", // separator used when reading the fields + }, "timezone": "local", "filters": ["randomFiletrs"], "flags": [], @@ -607,7 +607,7 @@ func TestEEsCfgAsMapInterface(t *testing.T) { "type": "*file_csv", "export_path": "/tmp/testCSV", "opts": { - "kafkaGroupID": "test", + "kafkaTopic": "test", }, "timezone": "UTC", "filters": [], @@ -641,7 +641,7 @@ func TestEEsCfgAsMapInterface(t *testing.T) { utils.TypeCfg: "*file_csv", utils.ExportPathCfg: "/tmp/testCSV", utils.OptsCfg: map[string]interface{}{ - utils.KafkaGroupID: "test", + utils.KafkaTopic: "test", }, utils.TimezoneCfg: "UTC", utils.FiltersCfg: []string{}, @@ -679,7 +679,7 @@ func TestEEsCfgAsMapInterface(t *testing.T) { if !reflect.DeepEqual(rcv[utils.ExportersCfg].([]map[string]interface{})[1], eMap[utils.ExportersCfg].([]map[string]interface{})[0]) { t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(eMap[utils.ExportersCfg].([]map[string]interface{})[0]), - utils.ToJSON(rcv[utils.ExportersCfg].([]map[string]interface{})[0])) + utils.ToJSON(rcv[utils.ExportersCfg].([]map[string]interface{})[1])) } rcv[utils.ExportersCfg] = nil eMap[utils.ExportersCfg] = nil diff --git a/config/erscfg.go b/config/erscfg.go index 9af36a456..e47baeecf 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -77,7 +77,7 @@ func (erS *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, msgTempla rdr = dfltRdrCfg.Clone() } else { rdr = new(EventReaderCfg) - rdr.Opts = make(map[string]interface{}) + rdr.Opts = &EventReaderOpts{} } erS.Readers = append(erS.Readers, rdr) } @@ -205,7 +205,7 @@ type EventReaderCfg struct { ConcurrentReqs int SourcePath string ProcessedPath string - Opts map[string]interface{} + Opts *EventReaderOpts Tenant RSRParsers Timezone string Filters []string @@ -483,9 +483,7 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat } } if jsnCfg.Opts != nil { - for k, v := range jsnCfg.Opts { - er.Opts[k] = v - } + err = er.Opts.loadFromJSONCfg(jsnCfg.Opts) } return } @@ -684,7 +682,7 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { Tenant: er.Tenant.Clone(), Timezone: er.Timezone, Flags: er.Flags.Clone(), - Opts: make(map[string]interface{}), + Opts: er.Opts.Clone(), } if er.Filters != nil { cln.Filters = make([]string, len(er.Filters)) @@ -710,193 +708,190 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { cln.PartialCommitFields[idx] = fld.Clone() } } - for k, v := range er.Opts { - cln.Opts[k] = v - } return } // AsMapInterface returns the config as a map[string]interface{} func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) { - // opts := map[string]interface{}{} + opts := map[string]interface{}{} - // if er.Opts.PartialPath != nil { - // opts[utils.PartialPathOpt] = *er.Opts.PartialPath - // } - // if er.Opts.PartialCacheAction != nil { - // opts[utils.PartialCacheActionOpt] = *er.Opts.PartialCacheAction - // } - // if er.Opts.PartialOrderField != nil { - // opts[utils.PartialOrderFieldOpt] = *er.Opts.PartialOrderField - // } - // if er.Opts.PartialCSVFieldSeparator != nil { - // opts[utils.PartialCSVFieldSepartorOpt] = *er.Opts.PartialCSVFieldSeparator - // } - // if er.Opts.CSVRowLength != nil { - // opts[utils.CSVRowLengthOpt] = *er.Opts.CSVRowLength - // } - // if er.Opts.CSVFieldSeparator != nil { - // opts[utils.CSVFieldSepOpt] = *er.Opts.CSVFieldSeparator - // } - // if er.Opts.CSVHeaderDefineChar != nil { - // opts[utils.HeaderDefineCharOpt] = *er.Opts.CSVHeaderDefineChar - // } - // if er.Opts.CSVLazyQuotes != nil { - // opts[utils.CSVLazyQuotes] = *er.Opts.CSVLazyQuotes - // } - // if er.Opts.XMLRootPath != nil { - // opts[utils.XMLRootPathOpt] = *er.Opts.XMLRootPath - // } - // if er.Opts.AMQPQueueID != nil { - // opts[utils.AMQPQueueID] = *er.Opts.AMQPQueueID - // } - // if er.Opts.AMQPQueueIDProcessed != nil { - // opts[utils.AMQPQueueIDProcessedCfg] = *er.Opts.AMQPQueueIDProcessed - // } - // if er.Opts.AMQPConsumerTag != nil { - // opts[utils.AMQPConsumerTag] = *er.Opts.AMQPConsumerTag - // } - // if er.Opts.AMQPExchange != nil { - // opts[utils.AMQPExchange] = *er.Opts.AMQPExchange - // } - // if er.Opts.AMQPExchangeType != nil { - // opts[utils.AMQPExchangeType] = *er.Opts.AMQPExchangeType - // } - // if er.Opts.AMQPRoutingKey != nil { - // opts[utils.AMQPRoutingKey] = *er.Opts.AMQPRoutingKey - // } - // if er.Opts.AMQPExchangeProcessed != nil { - // opts[utils.AMQPExchangeProcessedCfg] = *er.Opts.AMQPExchangeProcessed - // } - // if er.Opts.AMQPExchangeTypeProcessed != nil { - // opts[utils.AMQPExchangeTypeProcessedCfg] = *er.Opts.AMQPExchangeTypeProcessed - // } - // if er.Opts.AMQPRoutingKeyProcessed != nil { - // opts[utils.AMQPRoutingKeyProcessedCfg] = *er.Opts.AMQPRoutingKeyProcessed - // } - // if er.Opts.KafkaTopic != nil { - // opts[utils.KafkaTopic] = *er.Opts.KafkaTopic - // } - // if er.Opts.KafkaGroupID != nil { - // opts[utils.KafkaGroupID] = *er.Opts.KafkaGroupID - // } - // if er.Opts.KafkaMaxWait != nil { - // opts[utils.KafkaMaxWait] = er.Opts.KafkaMaxWait.String() - // } - // if er.Opts.KafkaTopicProcessed != nil { - // opts[utils.KafkaTopicProcessedCfg] = *er.Opts.KafkaTopicProcessed - // } - // if er.Opts.SQLDBName != nil { - // opts[utils.SQLDBNameOpt] = *er.Opts.SQLDBName - // } - // if er.Opts.SQLTableName != nil { - // opts[utils.SQLTableNameOpt] = *er.Opts.SQLTableName - // } - // if er.Opts.SSLMode != nil { - // opts[utils.SSLModeCfg] = *er.Opts.SSLMode - // } - // if er.Opts.SQLDBNameProcessed != nil { - // opts[utils.SQLDBNameProcessedCfg] = *er.Opts.SQLDBNameProcessed - // } - // if er.Opts.SQLTableNameProcessed != nil { - // opts[utils.SQLTableNameProcessedCfg] = *er.Opts.SQLTableNameProcessed - // } - // if er.Opts.SSLModeProcessed != nil { - // opts[utils.SSLModeProcessedCfg] = *er.Opts.SSLModeProcessed - // } - // if er.Opts.AWSRegion != nil { - // opts[utils.AWSRegion] = *er.Opts.AWSRegion - // } - // if er.Opts.AWSKey != nil { - // opts[utils.AWSKey] = *er.Opts.AWSKey - // } - // if er.Opts.AWSSecret != nil { - // opts[utils.AWSSecret] = *er.Opts.AWSSecret - // } - // if er.Opts.AWSToken != nil { - // opts[utils.AWSToken] = *er.Opts.AWSToken - // } - // if er.Opts.AWSRegionProcessed != nil { - // opts[utils.AWSRegionProcessedCfg] = *er.Opts.AWSRegionProcessed - // } - // if er.Opts.AWSKeyProcessed != nil { - // opts[utils.AWSKeyProcessedCfg] = *er.Opts.AWSKeyProcessed - // } - // if er.Opts.AWSSecretProcessed != nil { - // opts[utils.AWSSecretProcessedCfg] = *er.Opts.AWSSecretProcessed - // } - // if er.Opts.AWSTokenProcessed != nil { - // opts[utils.AWSTokenProcessedCfg] = *er.Opts.AWSTokenProcessed - // } - // if er.Opts.SQSQueueID != nil { - // opts[utils.SQSQueueID] = *er.Opts.SQSQueueID - // } - // if er.Opts.SQSQueueIDProcessed != nil { - // opts[utils.SQSQueueIDProcessedCfg] = *er.Opts.SQSQueueIDProcessed - // } - // if er.Opts.S3BucketID != nil { - // opts[utils.S3Bucket] = *er.Opts.S3BucketID - // } - // if er.Opts.S3FolderPathProcessed != nil { - // opts[utils.S3FolderPathProcessedCfg] = *er.Opts.S3FolderPathProcessed - // } - // if er.Opts.S3BucketIDProcessed != nil { - // opts[utils.S3BucketIDProcessedCfg] = *er.Opts.S3BucketIDProcessed - // } - // if er.Opts.NATSJetStream != nil { - // opts[utils.NatsJetStream] = *er.Opts.NATSJetStream - // } - // if er.Opts.NATSConsumerName != nil { - // opts[utils.NatsConsumerName] = *er.Opts.NATSConsumerName - // } - // if er.Opts.NATSSubject != nil { - // opts[utils.NatsSubject] = *er.Opts.NATSSubject - // } - // if er.Opts.NATSQueueID != nil { - // opts[utils.NatsQueueID] = *er.Opts.NATSQueueID - // } - // if er.Opts.NATSJWTFile != nil { - // opts[utils.NatsJWTFile] = *er.Opts.NATSJWTFile - // } - // if er.Opts.NATSSeedFile != nil { - // opts[utils.NatsSeedFile] = *er.Opts.NATSSeedFile - // } - // if er.Opts.NATSCertificateAuthority != nil { - // opts[utils.NatsCertificateAuthority] = *er.Opts.NATSCertificateAuthority - // } - // if er.Opts.NATSClientCertificate != nil { - // opts[utils.NatsClientCertificate] = *er.Opts.NATSClientCertificate - // } - // if er.Opts.NATSClientKey != nil { - // opts[utils.NatsClientKey] = *er.Opts.NATSClientKey - // } - // if er.Opts.NATSJetStreamMaxWait != nil { - // opts[utils.NatsJetStreamMaxWait] = er.Opts.NATSJetStreamMaxWait.String() - // } - // if er.Opts.NATSJetStreamProcessed != nil { - // opts[utils.NATSJetStreamProcessedCfg] = *er.Opts.NATSJetStreamProcessed - // } - // if er.Opts.NATSSubjectProcessed != nil { - // opts[utils.NATSSubjectProcessedCfg] = *er.Opts.NATSSubjectProcessed - // } - // if er.Opts.NATSJWTFileProcessed != nil { - // opts[utils.NATSJWTFileProcessedCfg] = *er.Opts.NATSJWTFileProcessed - // } - // if er.Opts.NATSSeedFileProcessed != nil { - // opts[utils.NATSSeedFileProcessedCfg] = *er.Opts.NATSSeedFileProcessed - // } - // if er.Opts.NATSCertificateAuthorityProcessed != nil { - // opts[utils.NATSCertificateAuthorityProcessedCfg] = *er.Opts.NATSCertificateAuthorityProcessed - // } - // if er.Opts.NATSClientCertificateProcessed != nil { - // opts[utils.NATSClientCertificateProcessed] = *er.Opts.NATSClientCertificateProcessed - // } - // if er.Opts.NATSClientKeyProcessed != nil { - // opts[utils.NATSClientKeyProcessedCfg] = *er.Opts.NATSClientKeyProcessed - // } - // if er.Opts.NATSJetStreamMaxWaitProcessed != nil { - // opts[utils.NATSJetStreamMaxWaitProcessedCfg] = er.Opts.NATSJetStreamMaxWaitProcessed.String() - // } + if er.Opts.PartialPath != nil { + opts[utils.PartialPathOpt] = *er.Opts.PartialPath + } + if er.Opts.PartialCacheAction != nil { + opts[utils.PartialCacheActionOpt] = *er.Opts.PartialCacheAction + } + if er.Opts.PartialOrderField != nil { + opts[utils.PartialOrderFieldOpt] = *er.Opts.PartialOrderField + } + if er.Opts.PartialCSVFieldSeparator != nil { + opts[utils.PartialCSVFieldSepartorOpt] = *er.Opts.PartialCSVFieldSeparator + } + if er.Opts.CSVRowLength != nil { + opts[utils.CSVRowLengthOpt] = *er.Opts.CSVRowLength + } + if er.Opts.CSVFieldSeparator != nil { + opts[utils.CSVFieldSepOpt] = *er.Opts.CSVFieldSeparator + } + if er.Opts.CSVHeaderDefineChar != nil { + opts[utils.HeaderDefineCharOpt] = *er.Opts.CSVHeaderDefineChar + } + if er.Opts.CSVLazyQuotes != nil { + opts[utils.CSVLazyQuotes] = *er.Opts.CSVLazyQuotes + } + if er.Opts.XMLRootPath != nil { + opts[utils.XMLRootPathOpt] = *er.Opts.XMLRootPath + } + if er.Opts.AMQPQueueID != nil { + opts[utils.AMQPQueueID] = *er.Opts.AMQPQueueID + } + if er.Opts.AMQPQueueIDProcessed != nil { + opts[utils.AMQPQueueIDProcessedCfg] = *er.Opts.AMQPQueueIDProcessed + } + if er.Opts.AMQPConsumerTag != nil { + opts[utils.AMQPConsumerTag] = *er.Opts.AMQPConsumerTag + } + if er.Opts.AMQPExchange != nil { + opts[utils.AMQPExchange] = *er.Opts.AMQPExchange + } + if er.Opts.AMQPExchangeType != nil { + opts[utils.AMQPExchangeType] = *er.Opts.AMQPExchangeType + } + if er.Opts.AMQPRoutingKey != nil { + opts[utils.AMQPRoutingKey] = *er.Opts.AMQPRoutingKey + } + if er.Opts.AMQPExchangeProcessed != nil { + opts[utils.AMQPExchangeProcessedCfg] = *er.Opts.AMQPExchangeProcessed + } + if er.Opts.AMQPExchangeTypeProcessed != nil { + opts[utils.AMQPExchangeTypeProcessedCfg] = *er.Opts.AMQPExchangeTypeProcessed + } + if er.Opts.AMQPRoutingKeyProcessed != nil { + opts[utils.AMQPRoutingKeyProcessedCfg] = *er.Opts.AMQPRoutingKeyProcessed + } + if er.Opts.KafkaTopic != nil { + opts[utils.KafkaTopic] = *er.Opts.KafkaTopic + } + if er.Opts.KafkaGroupID != nil { + opts[utils.KafkaGroupID] = *er.Opts.KafkaGroupID + } + if er.Opts.KafkaMaxWait != nil { + opts[utils.KafkaMaxWait] = er.Opts.KafkaMaxWait.String() + } + if er.Opts.KafkaTopicProcessed != nil { + opts[utils.KafkaTopicProcessedCfg] = *er.Opts.KafkaTopicProcessed + } + if er.Opts.SQLDBName != nil { + opts[utils.SQLDBNameOpt] = *er.Opts.SQLDBName + } + if er.Opts.SQLTableName != nil { + opts[utils.SQLTableNameOpt] = *er.Opts.SQLTableName + } + if er.Opts.SSLMode != nil { + opts[utils.SSLModeCfg] = *er.Opts.SSLMode + } + if er.Opts.SQLDBNameProcessed != nil { + opts[utils.SQLDBNameProcessedCfg] = *er.Opts.SQLDBNameProcessed + } + if er.Opts.SQLTableNameProcessed != nil { + opts[utils.SQLTableNameProcessedCfg] = *er.Opts.SQLTableNameProcessed + } + if er.Opts.SSLModeProcessed != nil { + opts[utils.SSLModeProcessedCfg] = *er.Opts.SSLModeProcessed + } + if er.Opts.AWSRegion != nil { + opts[utils.AWSRegion] = *er.Opts.AWSRegion + } + if er.Opts.AWSKey != nil { + opts[utils.AWSKey] = *er.Opts.AWSKey + } + if er.Opts.AWSSecret != nil { + opts[utils.AWSSecret] = *er.Opts.AWSSecret + } + if er.Opts.AWSToken != nil { + opts[utils.AWSToken] = *er.Opts.AWSToken + } + if er.Opts.AWSRegionProcessed != nil { + opts[utils.AWSRegionProcessedCfg] = *er.Opts.AWSRegionProcessed + } + if er.Opts.AWSKeyProcessed != nil { + opts[utils.AWSKeyProcessedCfg] = *er.Opts.AWSKeyProcessed + } + if er.Opts.AWSSecretProcessed != nil { + opts[utils.AWSSecretProcessedCfg] = *er.Opts.AWSSecretProcessed + } + if er.Opts.AWSTokenProcessed != nil { + opts[utils.AWSTokenProcessedCfg] = *er.Opts.AWSTokenProcessed + } + if er.Opts.SQSQueueID != nil { + opts[utils.SQSQueueID] = *er.Opts.SQSQueueID + } + if er.Opts.SQSQueueIDProcessed != nil { + opts[utils.SQSQueueIDProcessedCfg] = *er.Opts.SQSQueueIDProcessed + } + if er.Opts.S3BucketID != nil { + opts[utils.S3Bucket] = *er.Opts.S3BucketID + } + if er.Opts.S3FolderPathProcessed != nil { + opts[utils.S3FolderPathProcessedCfg] = *er.Opts.S3FolderPathProcessed + } + if er.Opts.S3BucketIDProcessed != nil { + opts[utils.S3BucketIDProcessedCfg] = *er.Opts.S3BucketIDProcessed + } + if er.Opts.NATSJetStream != nil { + opts[utils.NatsJetStream] = *er.Opts.NATSJetStream + } + if er.Opts.NATSConsumerName != nil { + opts[utils.NatsConsumerName] = *er.Opts.NATSConsumerName + } + if er.Opts.NATSSubject != nil { + opts[utils.NatsSubject] = *er.Opts.NATSSubject + } + if er.Opts.NATSQueueID != nil { + opts[utils.NatsQueueID] = *er.Opts.NATSQueueID + } + if er.Opts.NATSJWTFile != nil { + opts[utils.NatsJWTFile] = *er.Opts.NATSJWTFile + } + if er.Opts.NATSSeedFile != nil { + opts[utils.NatsSeedFile] = *er.Opts.NATSSeedFile + } + if er.Opts.NATSCertificateAuthority != nil { + opts[utils.NatsCertificateAuthority] = *er.Opts.NATSCertificateAuthority + } + if er.Opts.NATSClientCertificate != nil { + opts[utils.NatsClientCertificate] = *er.Opts.NATSClientCertificate + } + if er.Opts.NATSClientKey != nil { + opts[utils.NatsClientKey] = *er.Opts.NATSClientKey + } + if er.Opts.NATSJetStreamMaxWait != nil { + opts[utils.NatsJetStreamMaxWait] = er.Opts.NATSJetStreamMaxWait.String() + } + if er.Opts.NATSJetStreamProcessed != nil { + opts[utils.NATSJetStreamProcessedCfg] = *er.Opts.NATSJetStreamProcessed + } + if er.Opts.NATSSubjectProcessed != nil { + opts[utils.NATSSubjectProcessedCfg] = *er.Opts.NATSSubjectProcessed + } + if er.Opts.NATSJWTFileProcessed != nil { + opts[utils.NATSJWTFileProcessedCfg] = *er.Opts.NATSJWTFileProcessed + } + if er.Opts.NATSSeedFileProcessed != nil { + opts[utils.NATSSeedFileProcessedCfg] = *er.Opts.NATSSeedFileProcessed + } + if er.Opts.NATSCertificateAuthorityProcessed != nil { + opts[utils.NATSCertificateAuthorityProcessedCfg] = *er.Opts.NATSCertificateAuthorityProcessed + } + if er.Opts.NATSClientCertificateProcessed != nil { + opts[utils.NATSClientCertificateProcessed] = *er.Opts.NATSClientCertificateProcessed + } + if er.Opts.NATSClientKeyProcessed != nil { + opts[utils.NATSClientKeyProcessedCfg] = *er.Opts.NATSClientKeyProcessed + } + if er.Opts.NATSJetStreamMaxWaitProcessed != nil { + opts[utils.NATSJetStreamMaxWaitProcessedCfg] = er.Opts.NATSJetStreamMaxWaitProcessed.String() + } initialMP = map[string]interface{}{ utils.IDCfg: er.ID, @@ -909,12 +904,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string utils.FiltersCfg: er.Filters, utils.FlagsCfg: []string{}, utils.RunDelayCfg: "0", + utils.OptsCfg: opts, } - opts := make(map[string]interface{}) - for k, v := range er.Opts { - opts[k] = v - } initialMP[utils.OptsCfg] = opts if flags := er.Flags.SliceFlags(); flags != nil { diff --git a/config/erscfg_test.go b/config/erscfg_test.go index 1667c1726..4012099e8 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -38,9 +38,7 @@ func TestERSClone(t *testing.T) { "flags": ["*dryrun"], "source_path": "/tmp/ers/in", "processed_path": "/tmp/ers/out", - "opts": { - "*default": "randomVal" - }, + "opts": {}, "xml_root_path": "", "tenant": "~*req.Destination1", "timezone": "", @@ -101,14 +99,14 @@ func TestERSClone(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, { @@ -133,15 +131,14 @@ func TestERSClone(t *testing.T) { Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, }, PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - utils.MetaDefault: "randomVal", - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -246,14 +243,14 @@ func TestERSLoadFromjsonCfg(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, { @@ -293,14 +290,14 @@ func TestERSLoadFromjsonCfg(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -473,14 +470,14 @@ func TestERSloadFromJsonCase3(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, { @@ -504,14 +501,14 @@ func TestERSloadFromJsonCase3(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -608,14 +605,14 @@ func TestERSloadFromJsonCase4(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, { @@ -639,14 +636,14 @@ func TestERSloadFromJsonCase4(t *testing.T) { }, }, PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -738,14 +735,14 @@ func TestEventReaderSameID(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, { @@ -765,14 +762,14 @@ func TestEventReaderSameID(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -875,7 +872,7 @@ func TestERsCfgAsMapInterfaceCase1(t *testing.T) { utils.OptsCfg: map[string]interface{}{ "csvFieldSeparator": ",", "csvHeaderDefineChar": ":", - "csvRowLength": 0., + "csvRowLength": 0, "xmlRootPath": "", "partialOrderField": "~*req.AnswerTime", "partialCacheAction": utils.MetaNone, @@ -911,7 +908,7 @@ func TestERsCfgAsMapInterfaceCase1(t *testing.T) { utils.OptsCfg: map[string]interface{}{ "csvFieldSeparator": ",", "csvHeaderDefineChar": ":", - "csvRowLength": 0., + "csvRowLength": 0, "xmlRootPath": "", "partialOrderField": "~*req.AnswerTime", "partialCacheAction": utils.MetaNone, @@ -987,7 +984,7 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) { utils.OptsCfg: map[string]interface{}{ "csvFieldSeparator": ",", "csvHeaderDefineChar": ":", - "csvRowLength": 0., + "csvRowLength": 0, "xmlRootPath": "", "partialOrderField": "~*req.AnswerTime", "partialCacheAction": utils.MetaNone, @@ -1026,7 +1023,7 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) { utils.KafkaGroupID: "test", "csvFieldSeparator": ",", "csvHeaderDefineChar": ":", - "csvRowLength": 0., + "csvRowLength": 0, "xmlRootPath": "", "partialOrderField": "~*req.AnswerTime", "partialCacheAction": utils.MetaNone, @@ -1113,14 +1110,14 @@ func TestERsloadFromJsonCfg(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, { @@ -1146,14 +1143,14 @@ func TestERsloadFromJsonCfg(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), - Opts: map[string]interface{}{ - "csvFieldSeparator": ",", - "csvHeaderDefineChar": ":", - "csvRowLength": 0., - "xmlRootPath": "", - "partialOrderField": "~*req.AnswerTime", - "partialCacheAction": utils.MetaNone, - "natsSubject": "cgrates_cdrs", + Opts: &EventReaderOpts{ + CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), + CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), + CSVRowLength: utils.IntPointer(0), + XMLRootPath: utils.StringPointer(utils.EmptyString), + PartialOrderField: utils.StringPointer("~*req.AnswerTime"), + PartialCacheAction: utils.StringPointer(utils.MetaNone), + NATSSubject: utils.StringPointer("cgrates_cdrs"), }, }, }, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 2fa7696b9..60a31d5bd 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -251,7 +251,7 @@ type EventReaderJsonCfg struct { Concurrent_requests *int Source_path *string Processed_path *string - Opts map[string]interface{} + Opts *EventReaderOptsJson Tenant *string Timezone *string Filters *[]string diff --git a/ees/nats_test.go b/ees/nats_test.go index 90c45bbfc..8bc377f44 100644 --- a/ees/nats_test.go +++ b/ees/nats_test.go @@ -34,6 +34,7 @@ func TestNewNatsEE(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, + Opts: &config.EventExporterOpts{}, } nodeID := "node_id1" connTimeout := 2 * time.Second @@ -72,6 +73,7 @@ func TestParseOpt(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, + Opts: &config.EventExporterOpts{}, } opts := &config.EventExporterOpts{} nodeID := "node_id1" @@ -98,6 +100,7 @@ func TestParseOptJetStream(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, + Opts: &config.EventExporterOpts{}, } opts := &config.EventExporterOpts{ NATSJetStream: utils.BoolPointer(true), @@ -129,6 +132,7 @@ func TestParseOptJetStreamMaxWait(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, + Opts: &config.EventExporterOpts{}, } opts := &config.EventExporterOpts{ NATSJetStream: utils.BoolPointer(true), @@ -161,6 +165,7 @@ func TestParseOptSubject(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, + Opts: &config.EventExporterOpts{}, } opts := &config.EventExporterOpts{ NATSSubject: utils.StringPointer("nats_subject"), diff --git a/ers/amqp.go b/ers/amqp.go index 1d24f735b..740ef6439 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -212,24 +212,24 @@ func (rdr *AMQPER) processMessage(msg []byte) (err error) { return } -func (rdr *AMQPER) setOpts(opts map[string]interface{}) { +func (rdr *AMQPER) setOpts(opts *config.EventReaderOpts) { rdr.queueID = utils.DefaultQueueID - if vals, has := opts[utils.AMQPQueueID]; has { - rdr.queueID = utils.IfaceAsString(vals) + if opts.AMQPQueueID != nil { + rdr.queueID = *opts.AMQPQueueID } rdr.tag = utils.AMQPDefaultConsumerTag - if vals, has := opts[utils.AMQPConsumerTag]; has { - rdr.tag = utils.IfaceAsString(vals) + if opts.AMQPConsumerTag != nil { + rdr.tag = *opts.AMQPConsumerTag } - if vals, has := opts[utils.AMQPRoutingKey]; has { - rdr.routingKey = utils.IfaceAsString(vals) + if opts.AMQPRoutingKey != nil { + rdr.routingKey = *opts.AMQPRoutingKey } - if vals, has := opts[utils.AMQPExchange]; has { - rdr.exchange = utils.IfaceAsString(vals) + if opts.AMQPExchange != nil { + rdr.exchange = *opts.AMQPExchange rdr.exchangeType = utils.DefaultExchangeType } - if vals, has := opts[utils.AMQPExchangeType]; has { - rdr.exchangeType = utils.IfaceAsString(vals) + if opts.AMQPExchangeType != nil { + rdr.exchangeType = *opts.AMQPExchangeType } } @@ -250,9 +250,11 @@ func (rdr *AMQPER) close() (err error) { func (rdr *AMQPER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if len(processedOpt) == 0 && - len(rdr.Config().ProcessedPath) == 0 { - return + if processedOpt == nil { + if len(rdr.Config().ProcessedPath) == 0 { + return + } + processedOpt = new(config.EventExporterOpts) } rdr.poster = ees.NewAMQPee(&config.EventExporterCfg{ ID: rdr.Config().ID, diff --git a/ers/amqp_test.go b/ers/amqp_test.go index 9b72c5b4f..39bef3b06 100644 --- a/ers/amqp_test.go +++ b/ers/amqp_test.go @@ -21,6 +21,7 @@ package ers import ( "testing" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -32,7 +33,10 @@ func TestAMQPSetOpts(t *testing.T) { queueID: "cdrs", tag: "new", } - if k.setOpts(map[string]interface{}{utils.AMQPQueueID: "cdrs", utils.AMQPConsumerTag: "new"}); expKafka.dialURL != k.dialURL { + if k.setOpts(&config.EventReaderOpts{ + AMQPQueueID: utils.StringPointer("cdrs"), + AMQPConsumerTag: utils.StringPointer("new"), + }); expKafka.dialURL != k.dialURL { t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) } else if expKafka.queueID != k.queueID { t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID) @@ -46,7 +50,7 @@ func TestAMQPSetOpts(t *testing.T) { queueID: "cgrates_cdrs", tag: "cgrates", } - if k.setOpts(map[string]interface{}{}); expKafka.dialURL != k.dialURL { + if k.setOpts(&config.EventReaderOpts{}); expKafka.dialURL != k.dialURL { t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) } else if expKafka.queueID != k.queueID { t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID) diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 1cfefff09..6f42bbb39 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -51,8 +51,8 @@ func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int, rdr.cap <- struct{}{} } } - if vals, has := rdr.Config().Opts[utils.AMQPQueueID]; has { - rdr.queueID = "/" + utils.IfaceAsString(vals) + if rdr.Config().Opts.AMQPQueueID != nil { + rdr.queueID = "/" + *rdr.Config().Opts.AMQPQueueID } rdr.createPoster() return rdr, nil @@ -202,9 +202,11 @@ func (rdr *AMQPv1ER) close() (err error) { func (rdr *AMQPv1ER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if len(processedOpt) == 0 && - len(rdr.Config().ProcessedPath) == 0 { - return + if processedOpt == nil { + if len(rdr.Config().ProcessedPath) == 0 { + return + } + processedOpt = new(config.EventExporterOpts) } rdr.poster = ees.NewAMQPv1EE(&config.EventExporterCfg{ ID: rdr.Config().ID, diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go index 5b82b5bf1..ec0f47d4a 100644 --- a/ers/amqpv1_it_test.go +++ b/ers/amqpv1_it_test.go @@ -152,7 +152,7 @@ func TestAmqpv1NewAMQPv1ER(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, } @@ -183,7 +183,7 @@ func TestAmqpv1NewAMQPv1ER2(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, } diff --git a/ers/ers.go b/ers/ers.go index 44438e448..65872ea3b 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -381,8 +381,8 @@ func (erS *ERService) onEvicted(id string, value interface{}) { } eEvs := value.(*erEvents) var action string - if cAct, has := eEvs.rdrCfg.Opts[utils.PartialCacheActionOpt]; has { // if the option is present overwrite the global cache action - action = utils.IfaceAsString(cAct) + if eEvs.rdrCfg.Opts.PartialCacheAction != nil { + action = *eEvs.rdrCfg.Opts.PartialCacheAction } switch action { case utils.MetaNone: // do nothing with the events @@ -400,8 +400,8 @@ func (erS *ERService) onEvicted(id string, value interface{}) { erS.rdrEvents <- &erEvent{cgrEvent: cgrEv, rdrCfg: eEvs.rdrCfg} case utils.MetaDumpToFile: // apply the cacheDumpFields to the united events and write the record to file expPath := eEvs.rdrCfg.ProcessedPath - if pathVal, has := eEvs.rdrCfg.Opts[utils.PartialPathOpt]; has { - expPath = utils.IfaceAsString(pathVal) + if eEvs.rdrCfg.Opts.PartialPath != nil { + expPath = *eEvs.rdrCfg.Opts.PartialPath } if expPath == utils.EmptyString { // do not write the partial event to file return @@ -457,8 +457,8 @@ func (erS *ERService) onEvicted(id string, value interface{}) { return } csvWriter := csv.NewWriter(fileOut) - if fldSep, has := eEvs.rdrCfg.Opts[utils.PartialCSVFieldSepartorOpt]; has { - csvWriter.Comma = rune(utils.IfaceAsString(fldSep)[0]) + if eEvs.rdrCfg.Opts.PartialCSVFieldSeparator != nil { + csvWriter.Comma = rune((*eEvs.rdrCfg.Opts.PartialCSVFieldSeparator)[0]) } if err = csvWriter.Write(record); err != nil { @@ -469,8 +469,8 @@ func (erS *ERService) onEvicted(id string, value interface{}) { fileOut.Close() case utils.MetaDumpToJSON: // apply the cacheDumpFields to the united events and write the record to file expPath := eEvs.rdrCfg.ProcessedPath - if pathVal, has := eEvs.rdrCfg.Opts[utils.PartialPathOpt]; has { - expPath = utils.IfaceAsString(pathVal) + if eEvs.rdrCfg.Opts.PartialPath != nil { + expPath = *eEvs.rdrCfg.Opts.PartialPath } if expPath == utils.EmptyString { // do not write the partial event to file return diff --git a/ers/ers_it_test.go b/ers/ers_it_test.go index 3c319b4e8..dd6bd367c 100644 --- a/ers/ers_it_test.go +++ b/ers/ers_it_test.go @@ -791,9 +791,9 @@ func TestErsOnEvictedMetaDumpToFileOK(t *testing.T) { rdrCfg: &config.EventReaderCfg{ ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToFile, - utils.PartialPathOpt: dirPath, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToFile), + PartialPath: utils.StringPointer(dirPath), }, CacheDumpFields: []*config.FCTemplate{ { @@ -874,10 +874,10 @@ func TestErsOnEvictedMetaDumpToFileCSVWriteErr(t *testing.T) { rdrCfg: &config.EventReaderCfg{ ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToFile, - utils.PartialPathOpt: dirPath, - utils.PartialCSVFieldSepartorOpt: "\"", + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToFile), + PartialPath: utils.StringPointer(dirPath), + PartialCSVFieldSeparator: utils.StringPointer("\""), }, }, } @@ -931,9 +931,9 @@ func TestErsOnEvictedMetaDumpToFileCreateErr(t *testing.T) { rdrCfg: &config.EventReaderCfg{ ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToFile, - utils.PartialPathOpt: dirPath + "/non-existent", + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToFile), + PartialPath: utils.StringPointer(dirPath + "/non-existent"), }, }, } @@ -990,10 +990,10 @@ func TestERsOnEvictedDumpToJSON(t *testing.T) { rdrCfg: &config.EventReaderCfg{ // CacheDumpFields will be empty ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToJSON, - utils.PartialPathOpt: dirPath, - utils.PartialOrderFieldOpt: 2, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToJSON), + PartialPath: utils.StringPointer(dirPath), + PartialOrderField: utils.StringPointer("2"), }, }, } @@ -1074,10 +1074,10 @@ func TestErsOnEvictedDumpToJSONNoPath(t *testing.T) { rdrCfg: &config.EventReaderCfg{ // CacheDumpFields will be empty ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToJSON, - utils.PartialPathOpt: dirPath, - utils.PartialOrderFieldOpt: 2, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToJSON), + PartialPath: utils.StringPointer(dirPath), + PartialOrderField: utils.StringPointer("2"), }, }, } @@ -1152,9 +1152,9 @@ func TestErsOnEvictedDumpToJSONMergeError(t *testing.T) { rdrCfg: &config.EventReaderCfg{ // CacheDumpFields will be empty ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToJSON, - utils.PartialPathOpt: dirPath, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToJSON), + PartialPath: utils.StringPointer(dirPath), }, }, } @@ -1219,10 +1219,10 @@ func TestERsOnEvictedDumpToJSONWithCacheDumpFieldsErrPrefix(t *testing.T) { rdrCfg: &config.EventReaderCfg{ // CacheDumpFields will be empty ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToJSON, - utils.PartialPathOpt: dirPath, - utils.PartialOrderFieldOpt: 2, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToJSON), + PartialPath: utils.StringPointer(dirPath), + PartialOrderField: utils.StringPointer("2"), }, CacheDumpFields: []*config.FCTemplate{ { @@ -1291,10 +1291,10 @@ func TestERsOnEvictedDumpToJSONWithCacheDumpFields(t *testing.T) { rdrCfg: &config.EventReaderCfg{ // CacheDumpFields will be empty ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToJSON, - utils.PartialPathOpt: dirPath, - utils.PartialOrderFieldOpt: 2, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToJSON), + PartialPath: utils.StringPointer(dirPath), + PartialOrderField: utils.StringPointer("2"), }, Fields: []*config.FCTemplate{ {Tag: "SessionId", Path: utils.EmptyString, Type: "*variable", @@ -1391,10 +1391,10 @@ func TestErsOnEvictedDumpToJSONInvalidPath(t *testing.T) { rdrCfg: &config.EventReaderCfg{ // CacheDumpFields will be empty ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToJSON, - utils.PartialPathOpt: "invalid_path", - utils.PartialOrderFieldOpt: 2, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToJSON), + PartialPath: utils.StringPointer(dirPath), + PartialOrderField: utils.StringPointer("2"), }, }, } @@ -1462,10 +1462,10 @@ func TestErsOnEvictedDumpToJSONEncodeErr(t *testing.T) { rdrCfg: &config.EventReaderCfg{ // CacheDumpFields will be empty ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToJSON, - utils.PartialPathOpt: dirPath, - utils.PartialOrderFieldOpt: 2, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToJSON), + PartialPath: utils.StringPointer(dirPath), + PartialOrderField: utils.StringPointer("2"), }, }, } diff --git a/ers/ers_test.go b/ers/ers_test.go index 4ec176d2f..c947627ab 100644 --- a/ers/ers_test.go +++ b/ers/ers_test.go @@ -50,7 +50,7 @@ func TestERsProcessPartialEvent(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, } args := &erEvent{ @@ -93,8 +93,8 @@ func TestErsOnEvictedMetaPostCDROK(t *testing.T) { ID: "ER1", Type: utils.MetaNone, ProcessedPath: "/tmp", - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaPostCDR, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaPostCDR), }, }, } @@ -158,8 +158,8 @@ func TestErsOnEvictedMetaPostCDRMergeErr(t *testing.T) { ID: "ER1", Type: utils.MetaNone, ProcessedPath: "/tmp", - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaPostCDR, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaPostCDR), }, }, } @@ -206,9 +206,9 @@ func TestErsOnEvictedMetaDumpToFileSetFieldsErr(t *testing.T) { rdrCfg: &config.EventReaderCfg{ ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToFile, - utils.PartialPathOpt: dirPath, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToFile), + PartialPath: utils.StringPointer(dirPath), }, CacheDumpFields: []*config.FCTemplate{ { @@ -273,9 +273,9 @@ func TestErsOnEvictedMetaDumpToFileMergeErr(t *testing.T) { rdrCfg: &config.EventReaderCfg{ ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToFile, - utils.PartialPathOpt: dirPath, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToFile), + PartialPath: utils.StringPointer(dirPath), }, }, } @@ -315,8 +315,8 @@ func TestErsOnEvictedMetaDumpToFileEmptyPath(t *testing.T) { rdrCfg: &config.EventReaderCfg{ ID: "ER1", Type: utils.MetaNone, - Opts: map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToFile, + Opts: &config.EventReaderOpts{ + PartialCacheAction: utils.StringPointer(utils.MetaDumpToFile), }, }, } diff --git a/ers/filecsv.go b/ers/filecsv.go index d86a0d45f..5c3e756ec 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -133,28 +133,28 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { } defer file.Close() csvReader := csv.NewReader(file) - var rowLength int64 - if rowLength, err = utils.IfaceAsTInt64(rdr.Config().Opts[utils.CSVRowLengthOpt]); err != nil { - utils.Logger.Err( - fmt.Sprintf("<%s> failed creating CSV reader for <%s>, due to option parsing error: <%s>", - utils.ERs, rdr.Config().ID, err.Error())) - return + var rowLength int + if rdr.Config().Opts.CSVRowLength != nil { + rowLength = *rdr.Config().Opts.CSVRowLength } - csvReader.FieldsPerRecord = int(rowLength) + csvReader.FieldsPerRecord = rowLength csvReader.Comment = utils.CommentChar csvReader.Comma = utils.CSVSep - if fieldSep, has := rdr.Config().Opts[utils.CSVFieldSepOpt]; has { - csvReader.Comma = rune(utils.IfaceAsString(fieldSep)[0]) + if rdr.Config().Opts.CSVFieldSeparator != nil { + csvReader.Comma = rune((*rdr.Config().Opts.CSVFieldSeparator)[0]) } - if val, has := rdr.Config().Opts[utils.CSVLazyQuotes]; has { - csvReader.LazyQuotes, err = utils.IfaceAsBool(val) + if rdr.Config().Opts.CSVLazyQuotes != nil { + csvReader.LazyQuotes = *rdr.Config().Opts.CSVLazyQuotes } var indxAls map[string]int rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.FileName: utils.NewLeafNode(fName)}} - hdrDefChar := utils.IfaceAsString(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts[utils.HeaderDefineCharOpt]) + var hdrDefChar string + if rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSVHeaderDefineChar != nil { + hdrDefChar = *rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts.CSVHeaderDefineChar + } for { var record []string if record, err = csvReader.Read(); err != nil { diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index d5893fc1e..8eb48c1a4 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -236,7 +236,7 @@ func TestFWVFileConfig(t *testing.T) { Timezone: utils.EmptyString, Filters: []string{}, Flags: utils.FlagsWithParams{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, { ID: "file_reader2", @@ -249,7 +249,7 @@ func TestFWVFileConfig(t *testing.T) { Timezone: utils.EmptyString, Filters: []string{}, Flags: utils.FlagsWithParams{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, } expected := cfg.ERsCfg().Readers[0] diff --git a/ers/filexml.go b/ers/filexml.go index c1455fa3a..b1f817af6 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -136,7 +136,10 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) { if err != nil { return err } - xmlRootPath := utils.ParseHierarchyPath(utils.IfaceAsString(rdr.Config().Opts[utils.XMLRootPathOpt]), utils.EmptyString) + var xmlRootPath utils.HierarchyPath + if rdr.Config().Opts.XMLRootPath != nil { + xmlRootPath = utils.ParseHierarchyPath(*rdr.Config().Opts.XMLRootPath, utils.EmptyString) + } xmlElmts := xmlquery.Find(doc, xmlRootPath.AsString("/", true)) rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 diff --git a/ers/filexml_test.go b/ers/filexml_test.go index aebd97452..3368604e0 100644 --- a/ers/filexml_test.go +++ b/ers/filexml_test.go @@ -58,7 +58,7 @@ func TestERSXMLFileERConfig(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, } result1, err := NewXMLFileER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { @@ -80,7 +80,7 @@ func TestERSXMLFileERServeNil(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, } result1, err := NewXMLFileER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { diff --git a/ers/kafka.go b/ers/kafka.go index 51448a49c..f551065fd 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -184,28 +184,29 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) { return } -func (rdr *KafkaER) setOpts(opts map[string]interface{}) (err error) { +func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) { rdr.topic = utils.KafkaDefaultTopic rdr.groupID = utils.KafkaDefaultGroupID rdr.maxWait = utils.KafkaDefaultMaxWait - - if vals, has := opts[utils.KafkaTopic]; has { - rdr.topic = utils.IfaceAsString(vals) + if opts.KafkaTopic != nil { + rdr.topic = *opts.KafkaTopic } - if vals, has := opts[utils.KafkaGroupID]; has { - rdr.groupID = utils.IfaceAsString(vals) + if opts.KafkaGroupID != nil { + rdr.groupID = *opts.KafkaGroupID } - if vals, has := opts[utils.KafkaMaxWait]; has { - rdr.maxWait, err = utils.IfaceAsDuration(vals) + if opts.KafkaMaxWait != nil { + rdr.maxWait = *opts.KafkaMaxWait } return } func (rdr *KafkaER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if len(processedOpt) == 0 && - len(rdr.Config().ProcessedPath) == 0 { - return + if processedOpt == nil { + if len(rdr.Config().ProcessedPath) == 0 { + return + } + processedOpt = new(config.EventExporterOpts) } rdr.poster = ees.NewKafkaEE(&config.EventExporterCfg{ ID: rdr.Config().ID, diff --git a/ers/kafka_test.go b/ers/kafka_test.go index 703c0d43e..111e70a47 100644 --- a/ers/kafka_test.go +++ b/ers/kafka_test.go @@ -38,10 +38,11 @@ func TestKafkasetOpts(t *testing.T) { groupID: "new", maxWait: time.Second, } - if err := k.setOpts(map[string]interface{}{ - utils.KafkaTopic: "cdrs", - utils.KafkaGroupID: "new", - utils.KafkaMaxWait: "1s", + + if err := k.setOpts(&config.EventReaderOpts{ + KafkaTopic: utils.StringPointer("cdrs"), + KafkaGroupID: utils.StringPointer("new"), + KafkaMaxWait: utils.DurationPointer(time.Second), }); err != nil { t.Fatal(err) } else if expKafka.dialURL != k.dialURL { @@ -61,7 +62,7 @@ func TestKafkasetOpts(t *testing.T) { groupID: "cgrates", maxWait: time.Millisecond, } - if err := k.setOpts(map[string]interface{}{}); err != nil { + if err := k.setOpts(&config.EventReaderOpts{}); err != nil { t.Fatal(err) } else if expKafka.dialURL != k.dialURL { t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) @@ -81,10 +82,10 @@ func TestKafkasetOpts(t *testing.T) { groupID: "new", maxWait: time.Second, } - if err := k.setOpts(map[string]interface{}{ - utils.KafkaTopic: "cdrs", - utils.KafkaGroupID: "new", - utils.KafkaMaxWait: "1s", + if err := k.setOpts(&config.EventReaderOpts{ + KafkaTopic: utils.StringPointer("cdrs"), + KafkaGroupID: utils.StringPointer("new"), + KafkaMaxWait: utils.DurationPointer(time.Second), }); err != nil { t.Fatal(err) } else if expKafka.dialURL != k.dialURL { @@ -115,7 +116,7 @@ func TestKafkaERServe(t *testing.T) { if err := rdr.Serve(); err != nil { t.Error(err) } - rdr.Config().Opts = map[string]interface{}{} + rdr.Config().Opts = &config.EventReaderOpts{} rdr.Config().ProcessedPath = "" rdr.(*KafkaER).createPoster() close(rdrExit) diff --git a/ers/libers.go b/ers/libers.go index b227ecfcd..6abf971b7 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -21,7 +21,6 @@ package ers import ( "fmt" "sort" - "strings" "time" "github.com/cgrates/cgrates/agents" @@ -30,12 +29,144 @@ import ( "github.com/cgrates/cgrates/utils" ) -func getProcessOptions(opts map[string]interface{}) (proc map[string]interface{}) { - proc = make(map[string]interface{}) - for k, v := range opts { - if strings.HasSuffix(k, utils.ProcessedOpt) { - proc[k[:len(k)-9]] = v +func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExporterOpts) { + if erOpts.AMQPExchangeProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) } + eeOpts.AMQPExchange = erOpts.AMQPExchangeProcessed + } + if erOpts.AMQPExchangeTypeProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.AMQPExchangeType = erOpts.AMQPExchangeTypeProcessed + } + if erOpts.AMQPQueueIDProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.AMQPQueueID = erOpts.AMQPQueueIDProcessed + } + if erOpts.AMQPRoutingKeyProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.AMQPRoutingKey = erOpts.AMQPRoutingKeyProcessed + } + if erOpts.AWSKeyProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.AWSKey = erOpts.AWSKeyProcessed + } + if erOpts.AWSRegionProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.AWSRegion = erOpts.AWSRegionProcessed + } + if erOpts.AWSSecretProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.AWSSecret = erOpts.AWSSecretProcessed + } + if erOpts.AWSTokenProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.AWSToken = erOpts.AWSTokenProcessed + } + if erOpts.KafkaTopicProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.KafkaTopic = erOpts.KafkaTopicProcessed + } + if erOpts.NATSCertificateAuthorityProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.NATSCertificateAuthority = erOpts.NATSCertificateAuthorityProcessed + } + if erOpts.NATSClientCertificateProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.NATSClientCertificate = erOpts.NATSClientCertificateProcessed + } + if erOpts.NATSClientKeyProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.NATSClientKey = erOpts.NATSClientKeyProcessed + } + if erOpts.NATSJWTFileProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.NATSJWTFile = erOpts.NATSJWTFileProcessed + } + if erOpts.NATSJetStreamMaxWaitProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.NATSJetStreamMaxWait = erOpts.NATSJetStreamMaxWaitProcessed + } + if erOpts.NATSJetStreamProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.NATSJetStream = erOpts.NATSJetStreamProcessed + } + if erOpts.NATSSeedFileProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.NATSSeedFile = erOpts.NATSSeedFileProcessed + } + if erOpts.NATSSubjectProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.NATSSubject = erOpts.NATSSubjectProcessed + } + if erOpts.S3BucketIDProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.S3BucketID = erOpts.S3BucketIDProcessed + } + if erOpts.S3FolderPathProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.S3FolderPath = erOpts.S3FolderPathProcessed + } + if erOpts.SQLDBNameProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.SQLDBName = erOpts.SQLDBNameProcessed + } + if erOpts.SQLTableNameProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.SQLTableName = erOpts.SQLTableNameProcessed + } + if erOpts.SQSQueueIDProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.SQSQueueID = erOpts.SQSQueueIDProcessed + } + if erOpts.SSLModeProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.SSLMode = erOpts.SSLModeProcessed } return } @@ -45,7 +176,10 @@ func mergePartialEvents(cgrEvs []*utils.CGREvent, cfg *config.EventReaderCfg, fl cgrEv = cgrEvs[0] // by default there is at least one event if len(cgrEvs) != 1 { // need to merge the incoming events // prepare the field after which the events are ordered - ordFld := utils.IfaceAsString(cfg.Opts[utils.PartialOrderFieldOpt]) // safe as the checkConfigSanity forces this option to be populated + var ordFld string + if cfg.Opts.PartialOrderField != nil { + ordFld = *cfg.Opts.PartialOrderField + } var ordPath config.RSRParsers if ordPath, err = config.NewRSRParsers(ordFld, rsrSep); err != nil { // convert the option to rsrParsers return nil, err diff --git a/ers/libers_test.go b/ers/libers_test.go index 671b48aa8..b2bf78dad 100644 --- a/ers/libers_test.go +++ b/ers/libers_test.go @@ -21,15 +21,18 @@ package ers import ( "reflect" "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" ) func TestGetProcessOptions(t *testing.T) { - opts := map[string]interface{}{ - "testKeyProcessed": "testValue", + opts := &config.EventReaderOpts{ + AMQPQueueIDProcessed: utils.StringPointer("processed"), } result := getProcessOptions(opts) - expected := map[string]interface{}{ - "testKey": "testValue", + expected := &config.EventExporterOpts{ + AMQPQueueID: utils.StringPointer("processed"), } if !reflect.DeepEqual(result, expected) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) diff --git a/ers/nats.go b/ers/nats.go index 46812cefc..7f839270e 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -19,8 +19,11 @@ along with this program. If not, see package ers import ( + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" + "io/ioutil" "time" "github.com/cgrates/cgrates/agents" @@ -185,9 +188,11 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) { func (rdr *NatsER) createPoster() (err error) { processedOpt := getProcessOptions(rdr.Config().Opts) - if len(processedOpt) == 0 && - len(rdr.Config().ProcessedPath) == 0 { - return + if processedOpt == nil { + if len(rdr.Config().ProcessedPath) == 0 { + return + } + processedOpt = new(config.EventExporterOpts) } rdr.poster, err = ees.NewNatsEE(&config.EventExporterCfg{ ID: rdr.Config().ID, @@ -202,27 +207,86 @@ func (rdr *NatsER) createPoster() (err error) { } func (rdr *NatsER) processOpts() (err error) { - rdr.subject = utils.IfaceAsString(rdr.Config().Opts[utils.NatsSubject]) - rdr.queueID = utils.FirstNonEmpty(utils.IfaceAsString(rdr.Config().Opts[utils.NatsQueueID]), - rdr.cgrCfg.GeneralCfg().NodeID) - rdr.consumerName = utils.FirstNonEmpty(utils.IfaceAsString(rdr.Config().Opts[utils.NatsConsumerName]), - utils.CGRateSLwr) - if useJetStreamVal, has := rdr.Config().Opts[utils.NatsJetStream]; has { - if rdr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil { - return - } + if rdr.Config().Opts.NATSSubject != nil { + rdr.subject = *rdr.Config().Opts.NATSSubject + } + var queueID string + if rdr.Config().Opts.NATSQueueID != nil { + queueID = *rdr.Config().Opts.NATSQueueID + } + rdr.queueID = utils.FirstNonEmpty(queueID, rdr.cgrCfg.GeneralCfg().NodeID) + var consumerName string + if rdr.Config().Opts.NATSConsumerName != nil { + consumerName = *rdr.Config().Opts.NATSConsumerName + } + rdr.consumerName = utils.FirstNonEmpty(consumerName, utils.CGRateSLwr) + if rdr.Config().Opts.NATSJetStream != nil { + rdr.jetStream = *rdr.Config().Opts.NATSJetStream } if rdr.jetStream { - if maxWaitVal, has := rdr.Config().Opts[utils.NatsJetStreamMaxWait]; has { - var maxWait time.Duration - if maxWait, err = utils.IfaceAsDuration(maxWaitVal); err != nil { - return - } - rdr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)} + if rdr.Config().Opts.NATSJetStreamMaxWait != nil { + rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSJetStreamMaxWait)} } } - rdr.opts, err = ees.GetNatsOpts(rdr.Config().Opts, + rdr.opts, err = GetNatsOpts(rdr.Config().Opts, rdr.cgrCfg.GeneralCfg().NodeID, rdr.cgrCfg.GeneralCfg().ConnectTimeout) return } + +func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { + nop = make([]nats.Option, 0, 7) + nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), + nats.Timeout(connTimeout), + nats.DrainTimeout(time.Second)) + if opts.NATSJWTFile != nil { + keys := make([]string, 0, 1) + if opts.NATSSeedFile != nil { + keys = append(keys, *opts.NATSSeedFile) + } + nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...)) + } + if opts.NATSSeedFile != nil { + opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile) + if err != nil { + return nil, err + } + nop = append(nop, opt) + } + if opts.NATSClientCertificate != nil { + if opts.NATSClientKey == nil { + err = fmt.Errorf("has certificate but no key") + return + } + nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey)) + } else if opts.NATSClientKey != nil { + err = fmt.Errorf("has key but no certificate") + return + } + + if opts.NATSCertificateAuthority != nil { + nop = append(nop, + func(o *nats.Options) error { + pool, err := x509.SystemCertPool() + if err != nil { + return err + } + rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority) + if err != nil || rootPEM == nil { + return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err) + } + ok := pool.AppendCertsFromPEM(rootPEM) + if !ok { + return fmt.Errorf("nats: failed to parse root certificate from %q", + *opts.NATSCertificateAuthority) + } + if o.TLSConfig == nil { + o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} + } + o.TLSConfig.RootCAs = pool + o.Secure = true + return nil + }) + } + return +} diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index 6eb096bb3..ec250dfd4 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -32,7 +32,6 @@ import ( "time" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/ees" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" @@ -81,7 +80,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { t.Fatal(err) } - nop, err := ees.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) + nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) if err != nil { t.Fatal(err) } @@ -168,7 +167,7 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { t.Fatal(err) } - nop, err := ees.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) + nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) if err != nil { t.Fatal(err) } diff --git a/ers/readers_test.go b/ers/readers_test.go index 329157224..f1a7bab9e 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -102,7 +102,9 @@ func TestNewSQLReader(t *testing.T) { reader.Type = utils.MetaSQL reader.ID = "file_reader" reader.ConcurrentReqs = -1 - reader.Opts = map[string]interface{}{"db_name": "cgrates2"} + reader.Opts = &config.EventReaderOpts{ + SQLDBName: utils.StringPointer("cgrates2"), + } reader.SourcePath = "*mysql://cgrates:CGRateS.org@127.0.0.1:3306" reader.ProcessedPath = "" cfg.ERsCfg().Readers = append(cfg.ERsCfg().Readers, reader) @@ -127,7 +129,9 @@ func TestNewSQLReaderError(t *testing.T) { reader.Type = utils.MetaSQL reader.ID = "file_reader" reader.ConcurrentReqs = -1 - reader.Opts = map[string]interface{}{"db_name": "cgrates2"} + reader.Opts = &config.EventReaderOpts{ + SQLDBName: utils.StringPointer("cgrates2"), + } reader.SourcePath = "#" reader.ProcessedPath = "" expected := "unknown db_type " @@ -212,7 +216,7 @@ func TestNewAMQPReader(t *testing.T) { } exp.dialURL = exp.Config().SourcePath exp.Config().ProcessedPath = "" - exp.setOpts(map[string]interface{}{}) + exp.setOpts(&config.EventReaderOpts{}) exp.createPoster() var expected EventReader = exp rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) @@ -237,7 +241,7 @@ func TestNewAMQPv1Reader(t *testing.T) { rdrErr: nil, } exp.Config().ProcessedPath = "" - exp.Config().Opts = map[string]interface{}{} + exp.Config().Opts = &config.EventReaderOpts{} exp.createPoster() var expected EventReader = exp rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) @@ -263,7 +267,7 @@ func TestNewS3Reader(t *testing.T) { bucket: "cgrates_cdrs", } exp.Config().ProcessedPath = "" - exp.Config().Opts = map[string]interface{}{} + exp.Config().Opts = &config.EventReaderOpts{} exp.createPoster() var expected EventReader = exp rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) @@ -300,7 +304,7 @@ func TestNewSQSReader(t *testing.T) { // t.Error(err) // } exp.Config().ProcessedPath = "" - exp.Config().Opts = map[string]interface{}{} + exp.Config().Opts = &config.EventReaderOpts{} exp.createPoster() var expected EventReader = exp rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) diff --git a/ers/s3.go b/ers/s3.go index 648b2f09c..a32c8463c 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -150,22 +150,22 @@ func (rdr *S3ER) processMessage(body []byte) (err error) { return } -func (rdr *S3ER) parseOpts(opts map[string]interface{}) { +func (rdr *S3ER) parseOpts(opts *config.EventReaderOpts) { rdr.bucket = utils.DefaultQueueID - if val, has := opts[utils.S3Bucket]; has { - rdr.bucket = utils.IfaceAsString(val) + if opts.S3BucketID != nil { + rdr.bucket = *opts.S3BucketID } - if val, has := opts[utils.AWSRegion]; has { - rdr.awsRegion = utils.IfaceAsString(val) + if opts.AWSRegion != nil { + rdr.awsRegion = *opts.AWSRegion } - if val, has := opts[utils.AWSKey]; has { - rdr.awsID = utils.IfaceAsString(val) + if opts.AWSKey != nil { + rdr.awsID = *opts.AWSKey } - if val, has := opts[utils.AWSSecret]; has { - rdr.awsKey = utils.IfaceAsString(val) + if opts.AWSSecret != nil { + rdr.awsKey = *opts.AWSSecret } - if val, has := opts[utils.AWSToken]; has { - rdr.awsToken = utils.IfaceAsString(val) + if opts.AWSToken != nil { + rdr.awsToken = *opts.AWSToken } } @@ -194,9 +194,11 @@ func (rdr *S3ER) readLoop(scv s3Client) (err error) { func (rdr *S3ER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if len(processedOpt) == 0 && - len(rdr.Config().ProcessedPath) == 0 { - return + if processedOpt == nil { + if len(rdr.Config().ProcessedPath) == 0 { + return + } + processedOpt = new(config.EventExporterOpts) } rdr.poster = ees.NewS3EE(&config.EventExporterCfg{ ID: rdr.Config().ID, diff --git a/ers/s3_it_test.go b/ers/s3_it_test.go index 011f8046e..f3086eade 100644 --- a/ers/s3_it_test.go +++ b/ers/s3_it_test.go @@ -162,7 +162,7 @@ func TestNewS3ER(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, { ID: utils.MetaDefault, @@ -172,7 +172,7 @@ func TestNewS3ER(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, } @@ -214,7 +214,7 @@ func TestNewS3ERCase2(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, } diff --git a/ers/s3_test.go b/ers/s3_test.go index 832837220..aa7681285 100644 --- a/ers/s3_test.go +++ b/ers/s3_test.go @@ -226,22 +226,22 @@ func TestS3ERParseOpts(t *testing.T) { poster: nil, } - opts := map[string]interface{}{ - utils.S3Bucket: "QueueID", - utils.AWSRegion: "AWSRegion", - utils.AWSKey: "AWSKey", - utils.AWSSecret: "AWSSecret", - utils.AWSToken: "AWSToken", + opts := &config.EventReaderOpts{ + S3BucketID: utils.StringPointer("QueueID"), + AWSRegion: utils.StringPointer("AWSRegion"), + AWSKey: utils.StringPointer("AWSKey"), + AWSSecret: utils.StringPointer("AWSSecret"), + AWSToken: utils.StringPointer("AWSToken"), } rdr.parseOpts(opts) - if rdr.bucket != opts[utils.S3Bucket] || - rdr.awsRegion != opts[utils.AWSRegion] || - rdr.awsID != opts[utils.AWSKey] || - rdr.awsKey != opts[utils.AWSSecret] || - rdr.awsToken != opts[utils.AWSToken] { + if rdr.bucket != *opts.S3BucketID || + rdr.awsRegion != *opts.AWSRegion || + rdr.awsID != *opts.AWSKey || + rdr.awsKey != *opts.AWSSecret || + rdr.awsToken != *opts.AWSToken { t.Error("Fields do not corespond") } - rdr.Config().Opts = map[string]interface{}{} + rdr.Config().Opts = &config.EventReaderOpts{} rdr.Config().ProcessedPath = utils.EmptyString rdr.createPoster() } @@ -579,7 +579,7 @@ func TestS3ERReadMsgError5(t *testing.T) { poster: ees.NewS3EE(&config.EventExporterCfg{ ExportPath: "url", Attempts: 1, - Opts: map[string]interface{}{}, + Opts: &config.EventExporterOpts{}, }, nil), } rdr.Config().SourcePath = rdr.awsRegion diff --git a/ers/sql.go b/ers/sql.go index 9aa1c0916..f0047b609 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -265,7 +265,7 @@ func (rdr *SQLEventReader) processMessage(msg map[string]interface{}) (err error return } -func (rdr *SQLEventReader) setURL(inURL, outURL string, opts map[string]interface{}) (err error) { +func (rdr *SQLEventReader) setURL(inURL, outURL string, opts *config.EventReaderOpts) (err error) { inURL = strings.TrimPrefix(inURL, utils.Meta) var u *url.URL if u, err = url.Parse(inURL); err != nil { @@ -275,17 +275,17 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts map[string]interfac rdr.connType = u.Scheme dbname := utils.SQLDefaultDBName - if vals, has := opts[utils.SQLDBNameOpt]; has { - dbname = utils.IfaceAsString(vals) + if opts.SQLDBName != nil { + dbname = *opts.SQLDBName } ssl := utils.SQLDefaultSSLMode - if vals, has := opts[utils.SSLModeCfg]; has { - ssl = utils.IfaceAsString(vals) + if opts.SSLMode != nil { + ssl = *opts.SSLMode } rdr.tableName = utils.CDRsTBL - if vals, has := opts[utils.SQLTableNameOpt]; has { - rdr.tableName = utils.IfaceAsString(vals) + if opts.SQLTableName != nil { + rdr.tableName = *opts.SQLTableName } switch rdr.connType { case utils.MySQL: @@ -299,9 +299,11 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts map[string]interfac // outURL processedOpt := getProcessOptions(opts) - if len(processedOpt) == 0 && - len(outURL) == 0 { - return + if processedOpt == nil { + if len(outURL) == 0 { + return + } + processedOpt = new(config.EventExporterOpts) } var outUser, outPassword, outDBname, outSSL, outHost, outPort string if len(outURL) == 0 { @@ -324,16 +326,16 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts map[string]interfac } outDBname = utils.SQLDefaultDBName - if vals, has := processedOpt[utils.SQLDBNameOpt]; has { - outDBname = utils.IfaceAsString(vals) + if processedOpt.SQLDBName != nil { + outDBname = *processedOpt.SQLDBName } outSSL = utils.SQLDefaultSSLMode - if vals, has := processedOpt[utils.SSLModeCfg]; has { - outSSL = utils.IfaceAsString(vals) + if processedOpt.SSLMode != nil { + outSSL = *processedOpt.SSLMode } rdr.expTableName = utils.CDRsTBL - if vals, has := processedOpt[utils.SQLTableNameOpt]; has { - rdr.expTableName = utils.IfaceAsString(vals) + if processedOpt.SQLTableName != nil { + rdr.expTableName = *processedOpt.SQLTableName } switch rdr.expConnType { diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index f811538de..c88f59c2c 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -676,7 +676,9 @@ func TestErsSqlPostCDRS(t *testing.T) { reader.Type = utils.MetaSQL reader.ID = "file_reader" reader.ConcurrentReqs = -1 - reader.Opts = map[string]interface{}{"db_name": "cgrates2"} + reader.Opts = &config.EventReaderOpts{ + SQLDBName: utils.StringPointer("cgrates2"), + } reader.SourcePath = "*mysql://cgrates:CGRateS.org@127.0.0.1:3306" reader.ProcessedPath = "" cfg.ERsCfg().Readers = append(cfg.ERsCfg().Readers, reader) diff --git a/ers/sql_test.go b/ers/sql_test.go index 7db51a633..722b9ea96 100644 --- a/ers/sql_test.go +++ b/ers/sql_test.go @@ -40,14 +40,14 @@ func TestSQLSetURL(t *testing.T) { } inURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306" outURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306" - if err := sql.setURL(inURL, outURL, map[string]interface{}{ - utils.SQLDBNameOpt: "cgrates2", - utils.SQLTableNameOpt: "cdrs2", - utils.SSLModeCfg: "enabled", + if err := sql.setURL(inURL, outURL, &config.EventReaderOpts{ + SQLDBName: utils.StringPointer("cgrates2"), + SQLTableName: utils.StringPointer("cdrs2"), + SSLMode: utils.StringPointer("enabled"), - utils.SQLDBNameOpt + utils.ProcessedOpt: "cgrates3", - utils.SQLTableNameOpt + utils.ProcessedOpt: "cdrs3", - utils.SSLModeCfg + utils.ProcessedOpt: "enabled", + SQLDBNameProcessed: utils.StringPointer("cgrates3"), + SQLTableNameProcessed: utils.StringPointer("cdrs3"), + SSLModeProcessed: utils.StringPointer("enabled"), }); err != nil { t.Fatal(err) } else if expsql.connString != sql.connString { @@ -75,14 +75,14 @@ func TestSQLSetURL(t *testing.T) { } inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306" outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306" - if err := sql.setURL(inURL, outURL, map[string]interface{}{ - utils.SQLDBNameOpt: "cgrates2", - utils.SQLTableNameOpt: "cdrs2", - utils.SSLModeCfg: "enabled", + if err := sql.setURL(inURL, outURL, &config.EventReaderOpts{ + SQLDBName: utils.StringPointer("cgrates2"), + SQLTableName: utils.StringPointer("cdrs2"), + SSLMode: utils.StringPointer("enabled"), - utils.SQLDBNameOpt + utils.ProcessedOpt: "cgrates3", - utils.SQLTableNameOpt + utils.ProcessedOpt: "cdrs3", - utils.SSLModeCfg + utils.ProcessedOpt: "enabled", + SQLDBNameProcessed: utils.StringPointer("cgrates3"), + SQLTableNameProcessed: utils.StringPointer("cdrs3"), + SSLModeProcessed: utils.StringPointer("enabled"), }); err != nil { t.Fatal(err) } else if expsql.connString != sql.connString { @@ -110,14 +110,14 @@ func TestSQLSetURL(t *testing.T) { } inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306" outURL = "" - if err := sql.setURL(inURL, outURL, map[string]interface{}{ - utils.SQLDBNameOpt: "cgrates2", - utils.SQLTableNameOpt: "cdrs2", - utils.SSLModeCfg: "enabled", + if err := sql.setURL(inURL, outURL, &config.EventReaderOpts{ + SQLDBName: utils.StringPointer("cgrates2"), + SQLTableName: utils.StringPointer("cdrs2"), + SSLMode: utils.StringPointer("enabled"), - utils.SQLDBNameOpt + utils.ProcessedOpt: "cgrates2", - utils.SQLTableNameOpt + utils.ProcessedOpt: "cdrs2", - utils.SSLModeCfg + utils.ProcessedOpt: "enabled", + SQLDBNameProcessed: utils.StringPointer("cgrates2"), + SQLTableNameProcessed: utils.StringPointer("cdrs2"), + SSLModeProcessed: utils.StringPointer("enabled"), }); err != nil { t.Fatal(err) } else if expsql.connString != sql.connString { @@ -136,12 +136,12 @@ func TestSQLSetURL(t *testing.T) { inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?dbName=cgrates2&tableName=cdrs2&sslmode=enabled" outURL = "*postgres2://cgrates:CGRateS.org@127.0.0.1:3306?dbName=cgrates2&tableName=cdrs2&sslmode=enabled" - if err := sql.setURL(inURL, outURL, make(map[string]interface{})); err == nil || err.Error() != "unknown db_type postgres2" { + if err := sql.setURL(inURL, outURL, &config.EventReaderOpts{}); err == nil || err.Error() != "unknown db_type postgres2" { t.Errorf("Expected error: 'unknown db_type postgres2' ,received: %v", err) } inURL = "*postgres2://cgrates:CGRateS.org@127.0.0.1:3306?dbName=cgrates2&tableName=cdrs2&sslmode=enabled" outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?dbName=cgrates2&tableName=cdrs2&sslmode=enabled" - if err := sql.setURL(inURL, outURL, make(map[string]interface{})); err == nil || err.Error() != "unknown db_type postgres2" { + if err := sql.setURL(inURL, outURL, &config.EventReaderOpts{}); err == nil || err.Error() != "unknown db_type postgres2" { t.Errorf("Expected error: 'unknown db_type postgres2' ,received: %v", err) } } diff --git a/ers/sqs.go b/ers/sqs.go index 704f82ea6..f5f1555a7 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -138,22 +138,22 @@ func (rdr *SQSER) processMessage(body []byte) (err error) { return } -func (rdr *SQSER) parseOpts(opts map[string]interface{}) { +func (rdr *SQSER) parseOpts(opts *config.EventReaderOpts) { rdr.queueID = utils.DefaultQueueID - if val, has := opts[utils.SQSQueueID]; has { - rdr.queueID = utils.IfaceAsString(val) + if opts.SQSQueueID != nil { + rdr.queueID = *opts.SQSQueueID } - if val, has := opts[utils.AWSRegion]; has { - rdr.awsRegion = utils.IfaceAsString(val) + if opts.AWSRegion != nil { + rdr.awsRegion = *opts.AWSRegion } - if val, has := opts[utils.AWSKey]; has { - rdr.awsID = utils.IfaceAsString(val) + if opts.AWSKey != nil { + rdr.awsID = *opts.AWSKey } - if val, has := opts[utils.AWSSecret]; has { - rdr.awsKey = utils.IfaceAsString(val) + if opts.AWSSecret != nil { + rdr.awsKey = *opts.AWSSecret } - if val, has := opts[utils.AWSToken]; has { - rdr.awsToken = utils.IfaceAsString(val) + if opts.AWSToken != nil { + rdr.awsToken = *opts.AWSToken } rdr.getQueueURL() } @@ -217,9 +217,11 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) { func (rdr *SQSER) createPoster() { processedOpt := getProcessOptions(rdr.Config().Opts) - if len(processedOpt) == 0 && - len(rdr.Config().ProcessedPath) == 0 { - return + if processedOpt == nil { + if len(rdr.Config().ProcessedPath) == 0 { + return + } + processedOpt = new(config.EventExporterOpts) } rdr.poster = ees.NewSQSee(&config.EventExporterCfg{ ID: rdr.Config().ID, diff --git a/ers/sqs_test.go b/ers/sqs_test.go index 60bf3fe15..3eb5115c8 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -49,7 +49,7 @@ func TestNewSQSER(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, } rdr, err := NewSQSER(cfg, 0, nil, nil, @@ -77,7 +77,7 @@ func TestSQSERServeRunDelay0(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, } rdr, err := NewSQSER(cfg, 0, nil, nil, @@ -103,7 +103,7 @@ func TestSQSERServe(t *testing.T) { SourcePath: "/var/spool/cgrates/ers/in", ProcessedPath: "/var/spool/cgrates/ers/out", Filters: []string{}, - Opts: make(map[string]interface{}), + Opts: &config.EventReaderOpts{}, }, } rdr, err := NewSQSER(cfg, 0, nil, nil, @@ -276,22 +276,19 @@ func TestSQSERParseOpts(t *testing.T) { poster: nil, } - opts := map[string]interface{}{ - utils.SQSQueueID: "QueueID", - utils.AWSRegion: "AWSRegion", - utils.AWSKey: "AWSKey", - utils.AWSSecret: "AWSSecret", - utils.AWSToken: "AWSToken", + opts := &config.EventReaderOpts{ + SQSQueueID: utils.StringPointer("QueueID"), + AWSRegion: utils.StringPointer("AWSRegion"), + AWSKey: utils.StringPointer("AWSKey"), + AWSSecret: utils.StringPointer("AWSSecret"), + AWSToken: utils.StringPointer("AWSToken"), } rdr.parseOpts(opts) - if rdr.queueID != opts[utils.SQSQueueID] || - rdr.awsRegion != opts[utils.AWSRegion] || - rdr.awsID != opts[utils.AWSKey] || - rdr.awsKey != opts[utils.AWSSecret] || - rdr.awsToken != opts[utils.AWSToken] { + if rdr.queueID != *opts.SQSQueueID || rdr.awsRegion != *opts.AWSRegion || rdr.awsID != *opts.AWSKey || rdr.awsKey != *opts.AWSSecret || + rdr.awsToken != *opts.AWSToken { t.Error("Fields do not corespond") } - rdr.Config().Opts = map[string]interface{}{} + rdr.Config().Opts = &config.EventReaderOpts{} rdr.Config().ProcessedPath = utils.EmptyString rdr.createPoster() } @@ -532,7 +529,7 @@ func TestSQSERReadMsgError3(t *testing.T) { poster: ees.NewSQSee(&config.EventExporterCfg{ ExportPath: "url", Attempts: 1, - Opts: make(map[string]interface{}), + Opts: &config.EventExporterOpts{}, }, nil), } awsCfg := aws.Config{Endpoint: aws.String(rdr.Config().SourcePath)} diff --git a/utils/consts.go b/utils/consts.go index 96ca365d9..fee02a136 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1968,6 +1968,7 @@ const ( OptsCfg = "opts" Tenants = "tenants" MysqlLocation = "mysqlLocation" + SSLMode = "sslMode" ) // DataDbCfg