From 40eb832060c5ca24778ddf8b9e367ea8755892fd Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 25 Nov 2021 17:20:33 +0200 Subject: [PATCH] Fix all compilation errors related to ers after making ees opts of type struct --- apier/v1/apier_it_test.go | 4 +- config/config_json_test.go | 2 +- config/config_test.go | 6 +- config/configsanity.go | 3 +- config/configsanity_test.go | 4 +- config/eescfg.go | 247 +++++++++++++++++------------------- config/eescfg_test.go | 18 +-- config/libconfig_json.go | 2 +- ees/amqp.go | 18 +-- ees/amqpv1.go | 4 +- ees/elastic.go | 54 +++----- ees/elastic_test.go | 84 ++++-------- ees/filecsv.go | 4 +- ees/kafka.go | 4 +- ees/libcdre.go | 27 +++- ees/libcdre_test.go | 17 ++- ees/nats.go | 50 ++++---- ees/nats_it_test.go | 5 +- ees/nats_test.go | 61 +++------ ees/poster_it_test.go | 28 ++-- ees/poster_test.go | 14 +- ees/s3.go | 26 ++-- ees/sql.go | 40 ++---- ees/sql_it_test.go | 46 ++----- ees/sql_test.go | 22 ++-- ees/sqs.go | 22 ++-- ers/kafka_test.go | 2 +- 27 files changed, 355 insertions(+), 459 deletions(-) diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index 630954d5b..d3a59d708 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -2172,8 +2172,8 @@ func testApierReplayFldPosts(t *testing.T) { fileInPath := path.Join(*args.FailedRequestsInDir, fileName) ev = &ees.ExportEvents{ Path: "amqp://guest:guest@localhost:5672/", - Opts: map[string]interface{}{ - utils.AMQPQueueID: "cgrates_cdrs", + Opts: &config.EventExporterOpts{ + AMQPQueueID: utils.StringPointer("cgrates_cdrs"), }, Format: utils.MetaAMQPjsonMap, Events: []interface{}{bev}, diff --git a/config/config_json_test.go b/config/config_json_test.go index 69381a95d..89004cc27 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -2035,7 +2035,7 @@ func TestDfEventExporterCfg(t *testing.T) { Synchronous: utils.BoolPointer(false), Attempts: utils.IntPointer(1), Fields: &[]*FcTemplateJsonCfg{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOptsJson{}, Concurrent_requests: utils.IntPointer(0), Failed_posts_dir: utils.StringPointer("/var/spool/cgrates/failed_posts"), }, diff --git a/config/config_test.go b/config/config_test.go index dac50a93a..872a721f4 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -2391,7 +2391,7 @@ func TestEEsNoLksConfig(t *testing.T) { Fields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, }, @@ -5378,7 +5378,7 @@ func TestCgrCdfEventExporter(t *testing.T) { contentFields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, }, @@ -5459,7 +5459,7 @@ func TestCgrCfgEventExporterDefault(t *testing.T) { Fields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", } if !reflect.DeepEqual(cgrCfg.dfltEvExp, eCfg) { diff --git a/config/configsanity.go b/config/configsanity.go index e961fa943..484fd1c38 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -850,8 +850,7 @@ func (cfg *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID) } } - if fldSep, has := exp.Opts[utils.CSVFieldSepOpt]; has && - utils.IfaceAsString(fldSep) == utils.EmptyString { + if exp.Opts.CSVFieldSeparator != nil && *exp.Opts.CSVFieldSeparator == utils.EmptyString { return fmt.Errorf("<%s> empty %s for exporter with ID: %s", utils.EEs, utils.CSVFieldSepOpt, exp.ID) } case utils.MetaFileFWV: diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 4a9ed85ad..2a1d53130 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -1445,7 +1445,9 @@ func TestConfigSanityEventExporter(t *testing.T) { cfg.eesCfg.Exporters[0].Type = utils.MetaFileCSV cfg.eesCfg.Exporters[0].ExportPath = "/" - cfg.eesCfg.Exporters[0].Opts = map[string]interface{}{utils.CSVFieldSepOpt: ""} + cfg.eesCfg.Exporters[0].Opts = &EventExporterOpts{ + CSVFieldSeparator: utils.StringPointer(utils.EmptyString), + } expected = " empty csvFieldSeparator for exporter with ID: " if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) diff --git a/config/eescfg.go b/config/eescfg.go index 35e49afc4..a09d614da 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -90,7 +90,7 @@ func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, msgTem exp = dfltExpCfg.Clone() } else { exp = new(EventExporterCfg) - exp.Opts = make(map[string]interface{}) + exp.Opts = &EventExporterOpts{} } eeS.Exporters = append(eeS.Exporters, exp) } @@ -198,7 +198,7 @@ type EventExporterCfg struct { ID string Type string ExportPath string - Opts map[string]interface{} + Opts *EventExporterOpts Timezone string Filters []string Flags utils.FlagsWithParams @@ -399,14 +399,12 @@ func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTe } eeC.ComputeFields() } - if jsnEec.Opts != nil { - for k, v := range jsnEec.Opts { - eeC.Opts[k] = v - } - } if jsnEec.Failed_posts_dir != nil { eeC.FailedPostsDir = *jsnEec.Failed_posts_dir } + if jsnEec.Opts != nil { + err = eeC.Opts.loadFromJSONCfg(jsnEec.Opts) + } return } @@ -575,7 +573,7 @@ func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) { headerFields: make([]*FCTemplate, len(eeC.headerFields)), contentFields: make([]*FCTemplate, len(eeC.contentFields)), trailerFields: make([]*FCTemplate, len(eeC.trailerFields)), - Opts: make(map[string]interface{}), + Opts: eeC.Opts.Clone(), FailedPostsDir: eeC.FailedPostsDir, } @@ -604,126 +602,123 @@ func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) { for idx, fld := range eeC.trailerFields { cln.trailerFields[idx] = fld.Clone() } - for k, v := range eeC.Opts { - cln.Opts[k] = v - } return } // AsMapInterface returns the config as a map[string]interface{} func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) { - // opts := map[string]interface{}{} - // if eeC.Opts.CSVFieldSeparator != nil { - // opts[utils.CSVFieldSepOpt] = *eeC.Opts.CSVFieldSeparator - // } - // if eeC.Opts.ElsIndex != nil { - // opts[utils.ElsIndex] = *eeC.Opts.ElsIndex - // } - // if eeC.Opts.ElsIfPrimaryTerm != nil { - // opts[utils.ElsIfPrimaryTerm] = *eeC.Opts.ElsIfPrimaryTerm - // } - // if eeC.Opts.ElsIfSeqNo != nil { - // opts[utils.ElsIfSeqNo] = *eeC.Opts.ElsIfSeqNo - // } - // if eeC.Opts.ElsOpType != nil { - // opts[utils.ElsOpType] = *eeC.Opts.ElsOpType - // } - // if eeC.Opts.ElsPipeline != nil { - // opts[utils.ElsPipeline] = *eeC.Opts.ElsPipeline - // } - // if eeC.Opts.ElsRouting != nil { - // opts[utils.ElsRouting] = *eeC.Opts.ElsRouting - // } - // if eeC.Opts.ElsTimeout != nil { - // opts[utils.ElsTimeout] = eeC.Opts.ElsTimeout.String() - // } - // if eeC.Opts.ElsVersion != nil { - // opts[utils.ElsVersionLow] = *eeC.Opts.ElsVersion - // } - // if eeC.Opts.ElsVersionType != nil { - // opts[utils.ElsVersionType] = *eeC.Opts.ElsVersionType - // } - // if eeC.Opts.ElsWaitForActiveShards != nil { - // opts[utils.ElsWaitForActiveShards] = *eeC.Opts.ElsWaitForActiveShards - // } - // if eeC.Opts.SQLMaxIdleConns != nil { - // opts[utils.SQLMaxIdleConnsCfg] = *eeC.Opts.SQLMaxIdleConns - // } - // if eeC.Opts.SQLMaxOpenConns != nil { - // opts[utils.SQLMaxOpenConns] = *eeC.Opts.SQLMaxOpenConns - // } - // if eeC.Opts.SQLConnMaxLifetime != nil { - // opts[utils.SQLConnMaxLifetime] = eeC.Opts.SQLConnMaxLifetime.String() - // } - // if eeC.Opts.SQLTableName != nil { - // opts[utils.SQLTableNameOpt] = *eeC.Opts.SQLTableName - // } - // if eeC.Opts.SQLDBName != nil { - // opts[utils.SQLDBNameOpt] = *eeC.Opts.SQLDBName - // } - // if eeC.Opts.SSLMode != nil { - // opts[utils.SSLModeCfg] = *eeC.Opts.SSLMode - // } - // if eeC.Opts.KafkaTopic != nil { - // opts[utils.KafkaTopic] = *eeC.Opts.KafkaTopic - // } - // if eeC.Opts.AMQPQueueID != nil { - // opts[utils.AMQPQueueID] = *eeC.Opts.AMQPQueueID - // } - // if eeC.Opts.AMQPRoutingKey != nil { - // opts[utils.AMQPRoutingKey] = *eeC.Opts.AMQPRoutingKey - // } - // if eeC.Opts.AMQPExchange != nil { - // opts[utils.AMQPExchange] = *eeC.Opts.AMQPExchange - // } - // if eeC.Opts.AMQPExchangeType != nil { - // opts[utils.AMQPExchangeType] = *eeC.Opts.AMQPExchangeType - // } - // if eeC.Opts.AWSRegion != nil { - // opts[utils.AWSRegion] = *eeC.Opts.AWSRegion - // } - // if eeC.Opts.AWSKey != nil { - // opts[utils.AWSKey] = *eeC.Opts.AWSKey - // } - // if eeC.Opts.AWSSecret != nil { - // opts[utils.AWSSecret] = *eeC.Opts.AWSSecret - // } - // if eeC.Opts.AWSToken != nil { - // opts[utils.AWSToken] = *eeC.Opts.AWSToken - // } - // if eeC.Opts.SQSQueueID != nil { - // opts[utils.SQSQueueID] = *eeC.Opts.SQSQueueID - // } - // if eeC.Opts.S3BucketID != nil { - // opts[utils.S3Bucket] = *eeC.Opts.S3BucketID - // } - // if eeC.Opts.S3FolderPath != nil { - // opts[utils.S3FolderPath] = *eeC.Opts.S3FolderPath - // } - // if eeC.Opts.NATSJetStream != nil { - // opts[utils.NatsJetStream] = *eeC.Opts.NATSJetStream - // } - // if eeC.Opts.NATSSubject != nil { - // opts[utils.NatsSubject] = *eeC.Opts.NATSSubject - // } - // if eeC.Opts.NATSJWTFile != nil { - // opts[utils.NatsJWTFile] = *eeC.Opts.NATSJWTFile - // } - // if eeC.Opts.NATSSeedFile != nil { - // opts[utils.NatsSeedFile] = *eeC.Opts.NATSSeedFile - // } - // if eeC.Opts.NATSCertificateAuthority != nil { - // opts[utils.NatsCertificateAuthority] = *eeC.Opts.NATSCertificateAuthority - // } - // if eeC.Opts.NATSClientCertificate != nil { - // opts[utils.NatsClientCertificate] = *eeC.Opts.NATSClientCertificate - // } - // if eeC.Opts.NATSClientKey != nil { - // opts[utils.NatsClientKey] = *eeC.Opts.NATSClientKey - // } - // if eeC.Opts.NATSJetStreamMaxWait != nil { - // opts[utils.NatsJetStreamMaxWait] = eeC.Opts.NATSJetStreamMaxWait.String() - // } + opts := map[string]interface{}{} + if eeC.Opts.CSVFieldSeparator != nil { + opts[utils.CSVFieldSepOpt] = *eeC.Opts.CSVFieldSeparator + } + if eeC.Opts.ElsIndex != nil { + opts[utils.ElsIndex] = *eeC.Opts.ElsIndex + } + if eeC.Opts.ElsIfPrimaryTerm != nil { + opts[utils.ElsIfPrimaryTerm] = *eeC.Opts.ElsIfPrimaryTerm + } + if eeC.Opts.ElsIfSeqNo != nil { + opts[utils.ElsIfSeqNo] = *eeC.Opts.ElsIfSeqNo + } + if eeC.Opts.ElsOpType != nil { + opts[utils.ElsOpType] = *eeC.Opts.ElsOpType + } + if eeC.Opts.ElsPipeline != nil { + opts[utils.ElsPipeline] = *eeC.Opts.ElsPipeline + } + if eeC.Opts.ElsRouting != nil { + opts[utils.ElsRouting] = *eeC.Opts.ElsRouting + } + if eeC.Opts.ElsTimeout != nil { + opts[utils.ElsTimeout] = eeC.Opts.ElsTimeout.String() + } + if eeC.Opts.ElsVersion != nil { + opts[utils.ElsVersionLow] = *eeC.Opts.ElsVersion + } + if eeC.Opts.ElsVersionType != nil { + opts[utils.ElsVersionType] = *eeC.Opts.ElsVersionType + } + if eeC.Opts.ElsWaitForActiveShards != nil { + opts[utils.ElsWaitForActiveShards] = *eeC.Opts.ElsWaitForActiveShards + } + if eeC.Opts.SQLMaxIdleConns != nil { + opts[utils.SQLMaxIdleConnsCfg] = *eeC.Opts.SQLMaxIdleConns + } + if eeC.Opts.SQLMaxOpenConns != nil { + opts[utils.SQLMaxOpenConns] = *eeC.Opts.SQLMaxOpenConns + } + if eeC.Opts.SQLConnMaxLifetime != nil { + opts[utils.SQLConnMaxLifetime] = eeC.Opts.SQLConnMaxLifetime.String() + } + if eeC.Opts.SQLTableName != nil { + opts[utils.SQLTableNameOpt] = *eeC.Opts.SQLTableName + } + if eeC.Opts.SQLDBName != nil { + opts[utils.SQLDBNameOpt] = *eeC.Opts.SQLDBName + } + if eeC.Opts.SSLMode != nil { + opts[utils.SSLModeCfg] = *eeC.Opts.SSLMode + } + if eeC.Opts.KafkaTopic != nil { + opts[utils.KafkaTopic] = *eeC.Opts.KafkaTopic + } + if eeC.Opts.AMQPQueueID != nil { + opts[utils.AMQPQueueID] = *eeC.Opts.AMQPQueueID + } + if eeC.Opts.AMQPRoutingKey != nil { + opts[utils.AMQPRoutingKey] = *eeC.Opts.AMQPRoutingKey + } + if eeC.Opts.AMQPExchange != nil { + opts[utils.AMQPExchange] = *eeC.Opts.AMQPExchange + } + if eeC.Opts.AMQPExchangeType != nil { + opts[utils.AMQPExchangeType] = *eeC.Opts.AMQPExchangeType + } + if eeC.Opts.AWSRegion != nil { + opts[utils.AWSRegion] = *eeC.Opts.AWSRegion + } + if eeC.Opts.AWSKey != nil { + opts[utils.AWSKey] = *eeC.Opts.AWSKey + } + if eeC.Opts.AWSSecret != nil { + opts[utils.AWSSecret] = *eeC.Opts.AWSSecret + } + if eeC.Opts.AWSToken != nil { + opts[utils.AWSToken] = *eeC.Opts.AWSToken + } + if eeC.Opts.SQSQueueID != nil { + opts[utils.SQSQueueID] = *eeC.Opts.SQSQueueID + } + if eeC.Opts.S3BucketID != nil { + opts[utils.S3Bucket] = *eeC.Opts.S3BucketID + } + if eeC.Opts.S3FolderPath != nil { + opts[utils.S3FolderPath] = *eeC.Opts.S3FolderPath + } + if eeC.Opts.NATSJetStream != nil { + opts[utils.NatsJetStream] = *eeC.Opts.NATSJetStream + } + if eeC.Opts.NATSSubject != nil { + opts[utils.NatsSubject] = *eeC.Opts.NATSSubject + } + if eeC.Opts.NATSJWTFile != nil { + opts[utils.NatsJWTFile] = *eeC.Opts.NATSJWTFile + } + if eeC.Opts.NATSSeedFile != nil { + opts[utils.NatsSeedFile] = *eeC.Opts.NATSSeedFile + } + if eeC.Opts.NATSCertificateAuthority != nil { + opts[utils.NatsCertificateAuthority] = *eeC.Opts.NATSCertificateAuthority + } + if eeC.Opts.NATSClientCertificate != nil { + opts[utils.NatsClientCertificate] = *eeC.Opts.NATSClientCertificate + } + if eeC.Opts.NATSClientKey != nil { + opts[utils.NatsClientKey] = *eeC.Opts.NATSClientKey + } + if eeC.Opts.NATSJetStreamMaxWait != nil { + opts[utils.NatsJetStreamMaxWait] = eeC.Opts.NATSJetStreamMaxWait.String() + } flgs := eeC.Flags.SliceFlags() if flgs == nil { @@ -742,12 +737,8 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str utils.AttemptsCfg: eeC.Attempts, utils.ConcurrentRequestsCfg: eeC.ConcurrentRequests, utils.FailedPostsDirCfg: eeC.FailedPostsDir, + utils.OptsCfg: opts, } - opts := make(map[string]interface{}) - for k, v := range eeC.Opts { - opts[k] = v - } - initialMP[utils.OptsCfg] = opts if eeC.Fields != nil { fields := make([]map[string]interface{}, 0, len(eeC.Fields)) diff --git a/config/eescfg_test.go b/config/eescfg_test.go index 18e08e437..e70b1475b 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -88,7 +88,7 @@ func TestEESClone(t *testing.T) { contentFields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, { @@ -166,8 +166,8 @@ func TestEESClone(t *testing.T) { Layout: time.RFC3339, }, }, - Opts: map[string]interface{}{ - utils.MetaDefault: "randomVal", + Opts: &EventExporterOpts{ + CSVFieldSeparator: utils.StringPointer(utils.InfieldSep), }, }, }, @@ -283,7 +283,7 @@ func TestEventExporterSameID(t *testing.T) { contentFields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, { @@ -305,7 +305,7 @@ func TestEventExporterSameID(t *testing.T) { }, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, }, @@ -405,7 +405,7 @@ func TestEEsCfgloadFromJsonCfgCase1(t *testing.T) { Fields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, { @@ -429,7 +429,7 @@ func TestEEsCfgloadFromJsonCfgCase1(t *testing.T) { Layout: time.RFC3339, }, }, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, Fields: []*FCTemplate{ {Tag: utils.CGRID, Path: "*exp.CGRID", Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.CGRID", utils.InfieldSep), Layout: time.RFC3339}, }, @@ -518,7 +518,7 @@ func TestEEsCfgloadFromJsonCfgCase2(t *testing.T) { Fields: []*FCTemplate{}, headerFields: []*FCTemplate{}, trailerFields: []*FCTemplate{}, - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, { @@ -543,7 +543,7 @@ func TestEEsCfgloadFromJsonCfgCase2(t *testing.T) { }, }, FailedPostsDir: "/var/spool/cgrates/failed_posts", - Opts: make(map[string]interface{}), + Opts: &EventExporterOpts{}, Fields: []*FCTemplate{ { Tag: utils.CGRID, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 648972263..2fa7696b9 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -314,7 +314,7 @@ type EventExporterJsonCfg struct { Id *string Type *string Export_path *string - Opts map[string]interface{} + Opts *EventExporterOptsJson Timezone *string Filters *[]string Flags *[]string diff --git a/ees/amqp.go b/ees/amqp.go index d03e2b071..dece91fec 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -55,21 +55,21 @@ type AMQPee struct { bytePreparing } -func (pstr *AMQPee) parseOpts(dialURL map[string]interface{}) { +func (pstr *AMQPee) parseOpts(dialURL *config.EventExporterOpts) { pstr.queueID = utils.DefaultQueueID pstr.routingKey = utils.DefaultQueueID - if vals, has := dialURL[utils.AMQPQueueID]; has { - pstr.queueID = utils.IfaceAsString(vals) + if dialURL.AMQPQueueID != nil { + pstr.queueID = *dialURL.AMQPQueueID } - if vals, has := dialURL[utils.AMQPRoutingKey]; has { - pstr.routingKey = utils.IfaceAsString(vals) + if dialURL.AMQPRoutingKey != nil { + pstr.routingKey = *dialURL.AMQPRoutingKey } - if vals, has := dialURL[utils.AMQPExchange]; has { - pstr.exchange = utils.IfaceAsString(vals) + if dialURL.AMQPExchange != nil { + pstr.exchange = *dialURL.AMQPExchange pstr.exchangeType = utils.DefaultExchangeType } - if vals, has := dialURL[utils.AMQPExchangeType]; has { - pstr.exchangeType = utils.IfaceAsString(vals) + if dialURL.AMQPExchangeType != nil { + pstr.exchangeType = *dialURL.AMQPExchangeType } } diff --git a/ees/amqpv1.go b/ees/amqpv1.go index bda9aca6e..2256d32f3 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -35,8 +35,8 @@ func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1 queueID: "/" + utils.DefaultQueueID, reqs: newConcReq(cfg.ConcurrentRequests), } - if vals, has := cfg.Opts[utils.AMQPQueueID]; has { - pstr.queueID = "/" + utils.IfaceAsString(vals) + if cfg.Opts.AMQPQueueID != nil { + pstr.queueID = "/" + *cfg.Opts.AMQPQueueID } return pstr } diff --git a/ees/elastic.go b/ees/elastic.go index e889c8a39..f4cb9b3e2 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -58,49 +58,29 @@ type ElasticEE struct { func (eEe *ElasticEE) prepareOpts() (err error) { //parse opts eEe.opts.Index = utils.CDRsTBL - if val, has := eEe.Cfg().Opts[utils.ElsIndex]; has { - eEe.opts.Index = utils.IfaceAsString(val) + if eEe.Cfg().Opts.ElsIndex != nil { + eEe.opts.Index = *eEe.Cfg().Opts.ElsIndex } - if val, has := eEe.Cfg().Opts[utils.ElsIfPrimaryTerm]; has { - var intVal int64 - if intVal, err = utils.IfaceAsTInt64(val); err != nil { - return - } - eEe.opts.IfPrimaryTerm = utils.IntPointer(int(intVal)) + eEe.opts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm + eEe.opts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo + if eEe.Cfg().Opts.ElsOpType != nil { + eEe.opts.OpType = *eEe.Cfg().Opts.ElsOpType } - if val, has := eEe.Cfg().Opts[utils.ElsIfSeqNo]; has { - var intVal int64 - if intVal, err = utils.IfaceAsTInt64(val); err != nil { - return - } - eEe.opts.IfSeqNo = utils.IntPointer(int(intVal)) + if eEe.Cfg().Opts.ElsPipeline != nil { + eEe.opts.Pipeline = *eEe.Cfg().Opts.ElsPipeline } - if val, has := eEe.Cfg().Opts[utils.ElsOpType]; has { - eEe.opts.OpType = utils.IfaceAsString(val) + if eEe.Cfg().Opts.ElsRouting != nil { + eEe.opts.Routing = *eEe.Cfg().Opts.ElsRouting } - if val, has := eEe.Cfg().Opts[utils.ElsPipeline]; has { - eEe.opts.Pipeline = utils.IfaceAsString(val) + if eEe.Cfg().Opts.ElsTimeout != nil { + eEe.opts.Timeout = *eEe.Cfg().Opts.ElsTimeout } - if val, has := eEe.Cfg().Opts[utils.ElsRouting]; has { - eEe.opts.Routing = utils.IfaceAsString(val) + eEe.opts.Version = eEe.Cfg().Opts.ElsVersion + if eEe.Cfg().Opts.ElsVersionType != nil { + eEe.opts.VersionType = *eEe.Cfg().Opts.ElsVersionType } - if val, has := eEe.Cfg().Opts[utils.ElsTimeout]; has { - if eEe.opts.Timeout, err = utils.IfaceAsDuration(val); err != nil { - return - } - } - if val, has := eEe.Cfg().Opts[utils.ElsVersionLow]; has { - var intVal int64 - if intVal, err = utils.IfaceAsTInt64(val); err != nil { - return - } - eEe.opts.Version = utils.IntPointer(int(intVal)) - } - if val, has := eEe.Cfg().Opts[utils.ElsVersionType]; has { - eEe.opts.VersionType = utils.IfaceAsString(val) - } - if val, has := eEe.Cfg().Opts[utils.ElsWaitForActiveShards]; has { - eEe.opts.WaitForActiveShards = utils.IfaceAsString(val) + if eEe.Cfg().Opts.ElsWaitForActiveShards != nil { + eEe.opts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards } return } diff --git a/ees/elastic_test.go b/ees/elastic_test.go index 22e7eb8b9..93da3c437 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -56,7 +56,9 @@ func TestInitClient(t *testing.T) { func TestInitCase1(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIndex: "test"}, + Opts: &config.EventExporterOpts{ + ElsIndex: utils.StringPointer("test"), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -71,7 +73,9 @@ func TestInitCase1(t *testing.T) { func TestInitCase2(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIfPrimaryTerm: 20}, + Opts: &config.EventExporterOpts{ + ElsIfPrimaryTerm: utils.IntPointer(20), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -83,22 +87,12 @@ func TestInitCase2(t *testing.T) { } } -func TestInitCase2Err(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIfPrimaryTerm: "test"}, - }, - } - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err := ee.prepareOpts(); err == nil || err.Error() != errExpect { - t.Errorf("Expected %+v \n but got %+v", errExpect, err) - } -} - func TestInitCase3(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIfSeqNo: 20}, + Opts: &config.EventExporterOpts{ + ElsIfSeqNo: utils.IntPointer(20), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -110,22 +104,12 @@ func TestInitCase3(t *testing.T) { } } -func TestInitCase3Err(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsIfSeqNo: "test"}, - }, - } - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err := ee.prepareOpts(); err == nil || err.Error() != errExpect { - t.Errorf("Expected %+v \n but got %+v", errExpect, err) - } -} - func TestInitCase4(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsOpType: "test"}, + Opts: &config.EventExporterOpts{ + ElsOpType: utils.StringPointer("test"), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -140,7 +124,9 @@ func TestInitCase4(t *testing.T) { func TestInitCase5(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsPipeline: "test"}, + Opts: &config.EventExporterOpts{ + ElsPipeline: utils.StringPointer("test"), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -155,7 +141,9 @@ func TestInitCase5(t *testing.T) { func TestInitCase6(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsRouting: "test"}, + Opts: &config.EventExporterOpts{ + ElsRouting: utils.StringPointer("test"), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -167,22 +155,12 @@ func TestInitCase6(t *testing.T) { } } -func TestInitCase7(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsTimeout: "test"}, - }, - } - errExpect := "time: invalid duration \"test\"" - if err := ee.prepareOpts(); err == nil || err.Error() != errExpect { - t.Errorf("Expected %+v \n but got %+v", errExpect, err) - } -} - func TestInitCase8(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsVersionLow: 20}, + Opts: &config.EventExporterOpts{ + ElsVersion: utils.IntPointer(20), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -194,22 +172,12 @@ func TestInitCase8(t *testing.T) { } } -func TestInitCase8Err(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsVersionLow: "test"}, - }, - } - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err := ee.prepareOpts(); err == nil || err.Error() != errExpect { - t.Errorf("Expected %+v \n but got %+v", errExpect, err) - } -} - func TestInitCase9(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsVersionType: "test"}, + Opts: &config.EventExporterOpts{ + ElsVersionType: utils.StringPointer("test"), + }, }, } if err := ee.prepareOpts(); err != nil { @@ -224,7 +192,9 @@ func TestInitCase9(t *testing.T) { func TestInitCase10(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ - Opts: map[string]interface{}{utils.ElsWaitForActiveShards: "test"}, + Opts: &config.EventExporterOpts{ + ElsWaitForActiveShards: utils.StringPointer("test"), + }, }, } if err := ee.prepareOpts(); err != nil { diff --git a/ees/filecsv.go b/ees/filecsv.go index 94b2d93ef..0ca64773b 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -73,8 +73,8 @@ func (fCsv *FileCSVee) init() (err error) { } fCsv.csvWriter = csv.NewWriter(fCsv.file) fCsv.csvWriter.Comma = utils.CSVSep - if fieldSep, has := fCsv.Cfg().Opts[utils.CSVFieldSepOpt]; has { - fCsv.csvWriter.Comma = rune(utils.IfaceAsString(fieldSep)[0]) + if fCsv.Cfg().Opts.CSVFieldSeparator != nil { + fCsv.csvWriter.Comma = rune((*fCsv.Cfg().Opts.CSVFieldSeparator)[0]) } return fCsv.composeHeader() } diff --git a/ees/kafka.go b/ees/kafka.go index 47dd98f9a..499ade216 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -34,8 +34,8 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE topic: utils.DefaultQueueID, reqs: newConcReq(cfg.ConcurrentRequests), } - if vals, has := cfg.Opts[utils.KafkaTopic]; has { - kfkPstr.topic = utils.IfaceAsString(vals) + if cfg.Opts.KafkaTopic != nil { + kfkPstr.topic = *cfg.Opts.KafkaTopic } return kfkPstr } diff --git a/ees/libcdre.go b/ees/libcdre.go index a1ffc31af..d2d8ab364 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -56,14 +56,27 @@ func writeFailedPosts(_ string, value interface{}) { } } -func AddFailedPost(failedPostsDir, expPath, format string, ev interface{}, opts map[string]interface{}) { +func AddFailedPost(failedPostsDir, expPath, format string, ev interface{}, opts *config.EventExporterOpts) { key := utils.ConcatenatedKey(failedPostsDir, expPath, format) // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id - if qID := utils.FirstNonEmpty( - utils.IfaceAsString(opts[utils.AMQPQueueID]), - utils.IfaceAsString(opts[utils.S3Bucket]), - utils.IfaceAsString(opts[utils.SQSQueueID]), - utils.IfaceAsString(opts[utils.KafkaTopic])); len(qID) != 0 { + var amqpQueueID string + var s3BucketID string + var sqsQueueID string + var kafkaTopic string + if opts.AMQPQueueID != nil { + amqpQueueID = *opts.AMQPQueueID + } + if opts.S3BucketID != nil { + s3BucketID = *opts.S3BucketID + } + if opts.SQSQueueID != nil { + sqsQueueID = *opts.SQSQueueID + } + if opts.KafkaTopic != nil { + kafkaTopic = *opts.KafkaTopic + } + if qID := utils.FirstNonEmpty(amqpQueueID, s3BucketID, sqsQueueID, + kafkaTopic); len(qID) != 0 { key = utils.ConcatenatedKey(key, qID) } var failedPost *ExportEvents @@ -107,7 +120,7 @@ func NewExportEventsFromFile(filePath string) (expEv *ExportEvents, err error) { type ExportEvents struct { lk sync.RWMutex Path string - Opts map[string]interface{} + Opts *config.EventExporterOpts Format string Events []interface{} failedPostsDir string diff --git a/ees/libcdre_test.go b/ees/libcdre_test.go index 786879cfb..b64b2758d 100644 --- a/ees/libcdre_test.go +++ b/ees/libcdre_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -38,7 +39,7 @@ func TestSetFldPostCacheTTL(t *testing.T) { func TestAddFldPost(t *testing.T) { SetFailedPostCacheTTL(5 * time.Second) - AddFailedPost("", "path1", "format1", "1", make(map[string]interface{})) + AddFailedPost("", "path1", "format1", "1", &config.EventExporterOpts{}) x, ok := failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1")) if !ok { t.Error("Error reading from cache") @@ -55,13 +56,15 @@ func TestAddFldPost(t *testing.T) { Path: "path1", Format: "format1", Events: []interface{}{"1"}, - Opts: make(map[string]interface{}), + Opts: &config.EventExporterOpts{}, } if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } - AddFailedPost("", "path1", "format1", "2", make(map[string]interface{})) - AddFailedPost("", "path2", "format2", "3", map[string]interface{}{utils.SQSQueueID: "qID"}) + AddFailedPost("", "path1", "format1", "2", &config.EventExporterOpts{}) + AddFailedPost("", "path2", "format2", "3", &config.EventExporterOpts{ + SQSQueueID: utils.StringPointer("qID"), + }) x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1")) if !ok { t.Error("Error reading from cache") @@ -77,7 +80,7 @@ func TestAddFldPost(t *testing.T) { Path: "path1", Format: "format1", Events: []interface{}{"1", "2"}, - Opts: make(map[string]interface{}), + Opts: &config.EventExporterOpts{}, } if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) @@ -97,7 +100,9 @@ func TestAddFldPost(t *testing.T) { Path: "path2", Format: "format2", Events: []interface{}{"3"}, - Opts: map[string]interface{}{utils.SQSQueueID: "qID"}, + Opts: &config.EventExporterOpts{ + SQSQueueID: utils.StringPointer("qID"), + }, } if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) diff --git a/ees/nats.go b/ees/nats.go index 5d4782fa3..cdc491bbd 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -60,24 +60,18 @@ type NatsEE struct { bytePreparing } -func (pstr *NatsEE) parseOpt(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (err error) { - if useJetStreamVal, has := opts[utils.NatsJetStream]; has { - if pstr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil { - return - } +func (pstr *NatsEE) parseOpt(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (err error) { + if opts.NATSJetStream != nil { + pstr.jetStream = *opts.NATSJetStream } pstr.subject = utils.DefaultQueueID - if vals, has := opts[utils.NatsSubject]; has { - pstr.subject = utils.IfaceAsString(vals) + if opts.NATSSubject != nil { + pstr.subject = *opts.NATSSubject } pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout) if pstr.jetStream { - if maxWaitVal, has := opts[utils.NatsJetStreamMaxWait]; has { - var maxWait time.Duration - if maxWait, err = utils.IfaceAsDuration(maxWaitVal); err != nil { - return - } - pstr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)} + if opts.NATSJetStreamMaxWait != nil { + pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATSJetStreamMaxWait)} } } return @@ -129,51 +123,51 @@ func (pstr *NatsEE) Close() (err error) { func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } -func GetNatsOpts(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { +func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { nop = make([]nats.Option, 0, 7) nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), nats.Timeout(connTimeout), nats.DrainTimeout(time.Second)) - if userFile, has := opts[utils.NatsJWTFile]; has { + if opts.NATSJWTFile != nil { keys := make([]string, 0, 1) - if keyFile, has := opts[utils.NatsSeedFile]; has { - keys = append(keys, utils.IfaceAsString(keyFile)) + if opts.NATSSeedFile != nil { + keys = append(keys, *opts.NATSSeedFile) } - nop = append(nop, nats.UserCredentials(utils.IfaceAsString(userFile), keys...)) + nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...)) } - if nkeyFile, has := opts[utils.NatsSeedFile]; has { - opt, err := nats.NkeyOptionFromSeed(utils.IfaceAsString(nkeyFile)) + if opts.NATSSeedFile != nil { + opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile) if err != nil { return nil, err } nop = append(nop, opt) } - if certFile, has := opts[utils.NatsClientCertificate]; has { - clientFile, has := opts[utils.NatsClientKey] - if !has { + if opts.NATSClientCertificate != nil { + if opts.NATSClientKey == nil { err = fmt.Errorf("has certificate but no key") return } - nop = append(nop, nats.ClientCert(utils.IfaceAsString(certFile), utils.IfaceAsString(clientFile))) - } else if _, has := opts[utils.NatsClientKey]; has { + nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey)) + } else if opts.NATSClientKey != nil { err = fmt.Errorf("has key but no certificate") return } - if caFile, has := opts[utils.NatsCertificateAuthority]; has { + if opts.NATSCertificateAuthority != nil { nop = append(nop, func(o *nats.Options) error { pool, err := x509.SystemCertPool() if err != nil { return err } - rootPEM, err := ioutil.ReadFile(utils.IfaceAsString(caFile)) + rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority) if err != nil || rootPEM == nil { return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err) } ok := pool.AppendCertsFromPEM(rootPEM) if !ok { - return fmt.Errorf("nats: failed to parse root certificate from %q", caFile) + return fmt.Errorf("nats: failed to parse root certificate from %q", + *opts.NATSCertificateAuthority) } if o.TLSConfig == nil { o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index 49f415795..ef406651c 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -196,9 +196,8 @@ func TestGetNatsOptsSeedFile(t *testing.T) { nkey := "SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY" os.WriteFile("/tmp/nkey.txt", []byte(nkey), 0777) - opts := map[string]interface{}{ - utils.NatsSeedFile: "/tmp/nkey.txt", - // utils.NatsSeedFile: "file", + opts := &config.EventExporterOpts{ + NATSSeedFile: utils.StringPointer("/tmp/nkey.txt"), } nodeID := "node_id1" diff --git a/ees/nats_test.go b/ees/nats_test.go index 10acbe4ab..90c45bbfc 100644 --- a/ees/nats_test.go +++ b/ees/nats_test.go @@ -73,7 +73,7 @@ func TestParseOpt(t *testing.T) { Attempts: 2, ConcurrentRequests: 2, } - opts := map[string]interface{}{} + opts := &config.EventExporterOpts{} nodeID := "node_id1" connTimeout := 2 * time.Second dc, err := newEEMetrics("Local") @@ -99,8 +99,8 @@ func TestParseOptJetStream(t *testing.T) { Attempts: 2, ConcurrentRequests: 2, } - opts := map[string]interface{}{ - utils.NatsJetStream: true, + opts := &config.EventExporterOpts{ + NATSJetStream: utils.BoolPointer(true), } nodeID := "node_id1" connTimeout := 2 * time.Second @@ -121,17 +121,6 @@ func TestParseOptJetStream(t *testing.T) { if !pstr.jetStream { t.Error("Expected jetStream to be true") } - - //test error on converson - opts = map[string]interface{}{ - utils.NatsJetStream: uint16(2), - } - - err = pstr.parseOpt(opts, nodeID, connTimeout) - - if err.Error() != "cannot convert field: 2 to bool" { - t.Error("The conversion shouldn't have been possible") - } } func TestParseOptJetStreamMaxWait(t *testing.T) { @@ -141,9 +130,9 @@ func TestParseOptJetStreamMaxWait(t *testing.T) { Attempts: 2, ConcurrentRequests: 2, } - opts := map[string]interface{}{ - utils.NatsJetStream: true, - utils.NatsJetStreamMaxWait: "2ns", + opts := &config.EventExporterOpts{ + NATSJetStream: utils.BoolPointer(true), + NATSJetStreamMaxWait: utils.DurationPointer(2), } nodeID := "node_id1" connTimeout := 2 * time.Second @@ -164,17 +153,6 @@ func TestParseOptJetStreamMaxWait(t *testing.T) { if !reflect.DeepEqual(pstr.jsOpts, exp) { t.Errorf("Expected %v \n but received \n %v", exp, pstr.jsOpts) } - - //test conversion error - opts = map[string]interface{}{ - utils.NatsJetStream: true, - utils.NatsJetStreamMaxWait: true, - } - - err = pstr.parseOpt(opts, nodeID, connTimeout) - if err.Error() != "cannot convert field: true to time.Duration" { - t.Errorf("The conversion shouldn't have been possible: %v", err.Error()) - } } func TestParseOptSubject(t *testing.T) { @@ -184,8 +162,8 @@ func TestParseOptSubject(t *testing.T) { Attempts: 2, ConcurrentRequests: 2, } - opts := map[string]interface{}{ - utils.NatsSubject: "nats_subject", + opts := &config.EventExporterOpts{ + NATSSubject: utils.StringPointer("nats_subject"), } nodeID := "node_id1" connTimeout := 2 * time.Second @@ -203,15 +181,14 @@ func TestParseOptSubject(t *testing.T) { t.Error(err) } - if pstr.subject != opts[utils.NatsSubject] { - t.Errorf("Expected %v \n but received \n %v", opts[utils.NatsSubject], pstr.subject) + if opts.NATSSubject == nil || pstr.subject != *opts.NATSSubject { + t.Errorf("Expected %v \n but received \n %v", *opts.NATSSubject, pstr.subject) } } func TestGetNatsOptsJWT(t *testing.T) { - opts := map[string]interface{}{ - utils.NatsJWTFile: "jwtfile", - // utils.NatsSeedFile: "file", + opts := &config.EventExporterOpts{ + NATSJWTFile: utils.StringPointer("jwtfile"), } nodeID := "node_id1" @@ -224,9 +201,9 @@ func TestGetNatsOptsJWT(t *testing.T) { } func TestGetNatsOptsClientCert(t *testing.T) { - opts := map[string]interface{}{ - utils.NatsClientCertificate: "client_cert", - utils.NatsClientKey: "client_key", + opts := &config.EventExporterOpts{ + NATSClientCertificate: utils.StringPointer("client_cert"), + NATSClientKey: utils.StringPointer("client_key"), } nodeID := "node_id1" connTimeout := 2 * time.Second @@ -245,8 +222,8 @@ func TestGetNatsOptsClientCert(t *testing.T) { // } // no key error - opts = map[string]interface{}{ - utils.NatsClientCertificate: "client_cert", + opts = &config.EventExporterOpts{ + NATSClientCertificate: utils.StringPointer("client_cert"), } _, err = GetNatsOpts(opts, nodeID, connTimeout) if err.Error() != "has certificate but no key" { @@ -254,8 +231,8 @@ func TestGetNatsOptsClientCert(t *testing.T) { } // no certificate error - opts = map[string]interface{}{ - utils.NatsClientKey: "client_key", + opts = &config.EventExporterOpts{ + NATSClientKey: utils.StringPointer("client_key"), } _, err = GetNatsOpts(opts, nodeID, connTimeout) if err.Error() != "has key but no certificate" { diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go index d7d489dc7..0912292b6 100644 --- a/ees/poster_it_test.go +++ b/ees/poster_it_test.go @@ -80,7 +80,7 @@ func TestHttpJsonPoster(t *testing.T) { if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: jsn, Header: make(http.Header)}, ""); err == nil { t.Error("Expected error") } - AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, jsn, make(map[string]interface{})) + AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, jsn, &config.EventExporterOpts{}) time.Sleep(5 * time.Millisecond) fs, err := filepath.Glob("/tmp/EEs*") if err != nil { @@ -117,7 +117,7 @@ func TestHttpBytesPoster(t *testing.T) { if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: content, Header: make(http.Header)}, ""); err == nil { t.Error("Expected error") } - AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, content, make(map[string]interface{})) + AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, content, &config.EventExporterOpts{}) time.Sleep(5 * time.Millisecond) fs, err := filepath.Glob("/tmp/test2*") if err != nil { @@ -154,11 +154,11 @@ func TestSQSPoster(t *testing.T) { awsSecret := "replace-this-with-your-secret" qname := "cgrates-cdrs" - opts := map[string]interface{}{ - utils.AWSRegion: region, - utils.AWSKey: awsKey, - utils.AWSSecret: awsSecret, - utils.SQSQueueID: qname, + opts := &config.EventExporterOpts{ + AWSRegion: utils.StringPointer(region), + AWSKey: utils.StringPointer(awsKey), + AWSSecret: utils.StringPointer(awsSecret), + SQSQueueID: utils.StringPointer(qname), } //##################################### @@ -237,11 +237,11 @@ func TestS3Poster(t *testing.T) { awsSecret := "replace-this-with-your-secret" qname := "cgrates-cdrs" - opts := map[string]interface{}{ - utils.AWSRegion: region, - utils.AWSKey: awsKey, - utils.AWSSecret: awsSecret, - utils.S3Bucket: qname, + opts := &config.EventExporterOpts{ + AWSRegion: utils.StringPointer(region), + AWSKey: utils.StringPointer(awsKey), + AWSSecret: utils.StringPointer(awsSecret), + SQSQueueID: utils.StringPointer(qname), } //##################################### @@ -299,8 +299,8 @@ func TestAMQPv1Poster(t *testing.T) { // update this variables endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net" qname := "cgrates-cdrs" - opts := map[string]interface{}{ - utils.AMQPQueueID: qname, + opts := &config.EventExporterOpts{ + AMQPQueueID: utils.StringPointer(qname), } //##################################### diff --git a/ees/poster_test.go b/ees/poster_test.go index f3c9364b7..75ccd7c50 100644 --- a/ees/poster_test.go +++ b/ees/poster_test.go @@ -37,11 +37,11 @@ func TestAMQPeeParseURL(t *testing.T) { exchangeType: "fanout", routingKey: "CGRCDR", } - opts := map[string]interface{}{ - utils.AMQPQueueID: "q1", - utils.AMQPExchange: "E1", - utils.AMQPRoutingKey: "CGRCDR", - utils.AMQPExchangeType: "fanout", + opts := &config.EventExporterOpts{ + AMQPQueueID: utils.StringPointer("q1"), + AMQPExchange: utils.StringPointer("E1"), + AMQPRoutingKey: utils.StringPointer("CGRCDR"), + AMQPExchangeType: utils.StringPointer("fanout"), } amqp.parseOpts(opts) if !reflect.DeepEqual(expected, amqp) { @@ -53,7 +53,9 @@ func TestKafkaParseURL(t *testing.T) { cfg := &config.EventExporterCfg{ ExportPath: "127.0.0.1:9092", Attempts: 10, - Opts: map[string]interface{}{utils.KafkaTopic: "cdr_billing"}, + Opts: &config.EventExporterOpts{ + KafkaTopic: utils.StringPointer("cdr_billing"), + }, } exp := &KafkaEE{ cfg: cfg, diff --git a/ees/s3.go b/ees/s3.go index 8aedd8c29..38cf9ee70 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -60,25 +60,25 @@ type S3EE struct { bytePreparing } -func (pstr *S3EE) parseOpts(opts map[string]interface{}) { +func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) { pstr.bucket = utils.DefaultQueueID - if val, has := opts[utils.S3Bucket]; has { - pstr.bucket = utils.IfaceAsString(val) + if opts.S3BucketID != nil { + pstr.bucket = *opts.S3BucketID } - if val, has := opts[utils.S3FolderPath]; has { - pstr.folderPath = utils.IfaceAsString(val) + if opts.S3FolderPath != nil { + pstr.folderPath = *opts.S3FolderPath } - if val, has := opts[utils.AWSRegion]; has { - pstr.awsRegion = utils.IfaceAsString(val) + if opts.AWSRegion != nil { + pstr.awsRegion = *opts.AWSRegion } - if val, has := opts[utils.AWSKey]; has { - pstr.awsID = utils.IfaceAsString(val) + if opts.AWSKey != nil { + pstr.awsID = *opts.AWSKey } - if val, has := opts[utils.AWSSecret]; has { - pstr.awsKey = utils.IfaceAsString(val) + if opts.AWSSecret != nil { + pstr.awsKey = *opts.AWSSecret } - if val, has := opts[utils.AWSToken]; has { - pstr.awsToken = utils.IfaceAsString(val) + if opts.AWSToken != nil { + pstr.awsToken = *opts.AWSToken } } diff --git a/ees/sql.go b/ees/sql.go index 253f5b8d3..2f5c3bd36 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -71,18 +71,18 @@ func (sqlEe *SQLEe) initDialector() (err error) { password, _ := u.User.Password() dbname := utils.SQLDefaultDBName - if vals, has := sqlEe.Cfg().Opts[utils.SQLDBNameOpt]; has { - dbname = utils.IfaceAsString(vals) + if sqlEe.Cfg().Opts.SQLDBName != nil { + dbname = *sqlEe.Cfg().Opts.SQLDBName } ssl := utils.SQLDefaultSSLMode - if vals, has := sqlEe.Cfg().Opts[utils.SSLModeCfg]; has { - ssl = utils.IfaceAsString(vals) + if sqlEe.Cfg().Opts.SSLMode != nil { + ssl = *sqlEe.Cfg().Opts.SSLMode } // tableName is mandatory in opts - if iface, has := sqlEe.Cfg().Opts[utils.SQLTableNameOpt]; !has { - return utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt) + if sqlEe.Cfg().Opts.SQLTableName != nil { + sqlEe.tableName = *sqlEe.Cfg().Opts.SQLTableName } else { - sqlEe.tableName = utils.IfaceAsString(iface) + return utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt) } // var dialect gorm.Dialector @@ -98,7 +98,7 @@ func (sqlEe *SQLEe) initDialector() (err error) { return } -func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, sqlDB *sql.DB, err error) { +func openDB(dialect gorm.Dialector, opts *config.EventExporterOpts) (db *gorm.DB, sqlDB *sql.DB, err error) { if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil { return } @@ -106,26 +106,14 @@ func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, s return } - if iface, has := opts[utils.SQLMaxIdleConnsCfg]; has { - val, err := utils.IfaceAsTInt64(iface) - if err != nil { - return nil, nil, err - } - sqlDB.SetMaxIdleConns(int(val)) + if opts.SQLMaxIdleConns != nil { + sqlDB.SetMaxIdleConns(*opts.SQLMaxIdleConns) } - if iface, has := opts[utils.SQLMaxOpenConns]; has { - val, err := utils.IfaceAsTInt64(iface) - if err != nil { - return nil, nil, err - } - sqlDB.SetMaxOpenConns(int(val)) + if opts.SQLMaxOpenConns != nil { + sqlDB.SetMaxOpenConns(*opts.SQLMaxOpenConns) } - if iface, has := opts[utils.SQLConnMaxLifetime]; has { - val, err := utils.IfaceAsDuration(iface) - if err != nil { - return nil, nil, err - } - sqlDB.SetConnMaxLifetime(val) + if opts.SQLConnMaxLifetime != nil { + sqlDB.SetConnMaxLifetime(*opts.SQLConnMaxLifetime) } return diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go index b2e3412de..64e9947e1 100644 --- a/ees/sql_it_test.go +++ b/ees/sql_it_test.go @@ -233,64 +233,40 @@ func testSqlEeVerifyExportedEvent2(t *testing.T) { func TestOpenDB1(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxIdleConnsCfg: 2}) + _, _, err := openDB(dialect, &config.EventExporterOpts{ + SQLMaxIdleConns: utils.IntPointer(2), + }) if err != nil { t.Error(err) } } -func TestOpenDB1Err(t *testing.T) { - dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", - "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxIdleConnsCfg: "test"}) - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err == nil || err.Error() != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) - } -} - func TestOpenDB2(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxOpenConns: 2}) + _, _, err := openDB(dialect, &config.EventExporterOpts{ + SQLMaxOpenConns: utils.IntPointer(2), + }) if err != nil { t.Error(err) } } -func TestOpenDB2Err(t *testing.T) { - dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", - "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxOpenConns: "test"}) - errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" - if err == nil || err.Error() != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) - } -} - func TestOpenDB3(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: 2}) + _, _, err := openDB(dialect, &config.EventExporterOpts{ + SQLConnMaxLifetime: utils.DurationPointer(2), + }) if err != nil { t.Error(err) } } -func TestOpenDB3Err(t *testing.T) { - dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", - "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: "test"}) - errExpect := "time: invalid duration \"test\"" - if err == nil || err.Error() != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) - } -} - func TestSQLExportEvent1(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "cgrates" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("cgrates") cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` sqlEe, err := NewSQLEe(cgrCfg.EEsCfg().Exporters[0], nil) if err != nil { diff --git a/ees/sql_test.go b/ees/sql_test.go index cae7b0ebd..9ed0f15a2 100644 --- a/ees/sql_test.go +++ b/ees/sql_test.go @@ -46,9 +46,9 @@ func TestSqlGetMetrics(t *testing.T) { func TestNewSQLeUrl(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SSLModeCfg] = "test" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("postgres") + cgrCfg.EEsCfg().Exporters[0].Opts.SSLMode = utils.StringPointer("test") sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], reqs: newConcReq(0), @@ -61,8 +61,8 @@ func TestNewSQLeUrl(t *testing.T) { func TestNewSQLeUrlSQL(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "mysql" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("mysql") cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], @@ -79,8 +79,8 @@ func TestNewSQLeUrlSQL(t *testing.T) { func TestNewSQLeUrlPostgres(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("postgres") cgrCfg.EEsCfg().Exporters[0].ExportPath = `postgres://cgrates:CGRateS.org@127.0.0.1:3306` sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], @@ -97,8 +97,8 @@ func TestNewSQLeUrlPostgres(t *testing.T) { func TestNewSQLeExportPathError(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres" + cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("postgres") cgrCfg.EEsCfg().Exporters[0].ExportPath = ":foo" sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], @@ -120,7 +120,7 @@ func TestOpenDBError2(t *testing.T) { tmp := logger.Default logger.Default = logger.Default.LogMode(logger.Silent) mckDialect := new(mockDialect2) - _, _, err := openDB(mckDialect, make(map[string]interface{})) + _, _, err := openDB(mckDialect, &config.EventExporterOpts{}) errExpect := "invalid db" if err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) @@ -140,7 +140,7 @@ func TestOpenDBError3(t *testing.T) { tmp := logger.Default logger.Default = logger.Default.LogMode(logger.Silent) mckDialect := new(mockDialectErr) - _, _, err := openDB(mckDialect, make(map[string]interface{})) + _, _, err := openDB(mckDialect, &config.EventExporterOpts{}) errExpect := "NOT_FOUND" if err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) diff --git a/ees/sqs.go b/ees/sqs.go index 83f97d65f..96f25378f 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -59,22 +59,22 @@ type SQSee struct { bytePreparing } -func (pstr *SQSee) parseOpts(opts map[string]interface{}) { +func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) { pstr.queueID = utils.DefaultQueueID - if val, has := opts[utils.SQSQueueID]; has { - pstr.queueID = utils.IfaceAsString(val) + if opts.SQSQueueID != nil { + pstr.queueID = *opts.SQSQueueID } - if val, has := opts[utils.AWSRegion]; has { - pstr.awsRegion = utils.IfaceAsString(val) + if opts.AWSRegion != nil { + pstr.awsRegion = *opts.AWSRegion } - if val, has := opts[utils.AWSKey]; has { - pstr.awsID = utils.IfaceAsString(val) + if opts.AWSKey != nil { + pstr.awsID = *opts.AWSKey } - if val, has := opts[utils.AWSSecret]; has { - pstr.awsKey = utils.IfaceAsString(val) + if opts.AWSSecret != nil { + pstr.awsKey = *opts.AWSSecret } - if val, has := opts[utils.AWSToken]; has { - pstr.awsToken = utils.IfaceAsString(val) + if opts.AWSToken != nil { + pstr.awsToken = *opts.AWSToken } } diff --git a/ers/kafka_test.go b/ers/kafka_test.go index 48b7126a8..703c0d43e 100644 --- a/ers/kafka_test.go +++ b/ers/kafka_test.go @@ -138,7 +138,7 @@ func TestKafkaERServe2(t *testing.T) { poster: ees.NewKafkaEE(&config.EventExporterCfg{ ExportPath: "url", Attempts: 1, - Opts: make(map[string]interface{}), + Opts: &config.EventExporterOpts{}, }, nil), } rdr.rdrExit <- struct{}{}