diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index 31e9fd79d..3d829a5d4 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -2173,7 +2173,8 @@ func testApierReplayFldPosts(t *testing.T) { ev = &ees.ExportEvents{ Path: "amqp://guest:guest@localhost:5672/", Opts: &config.EventExporterOpts{ - AMQPQueueID: utils.StringPointer("cgrates_cdrs"), + AMQP: &config.AMQPOpts{ + QueueID: utils.StringPointer("cgrates_cdrs")}, }, Format: utils.MetaAMQPjsonMap, Events: []any{bev}, diff --git a/config/config.go b/config/config.go index 355110a0b..651fdda8d 100644 --- a/config/config.go +++ b/config/config.go @@ -176,7 +176,15 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) { cfg.configSCfg = new(ConfigSCfg) cfg.apiBanCfg = new(APIBanCfg) cfg.coreSCfg = new(CoreSCfg) - cfg.dfltEvExp = &EventExporterCfg{Opts: &EventExporterOpts{}} + cfg.dfltEvExp = &EventExporterCfg{Opts: &EventExporterOpts{ + Els: new(ElsOpts), + SQL: new(SQLOpts), + AMQP: new(AMQPOpts), + AWS: new(AWSOpts), + NATS: new(NATSOpts), + RPC: new(RPCOpts), + Kafka: new(KafkaOpts), + }} cfg.dfltEvRdr = &EventReaderCfg{Opts: &EventReaderOpts{}} cfg.cacheDP = make(map[string]utils.MapStorage) diff --git a/config/config_test.go b/config/config_test.go index 9b240a564..b664d9dba 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -5475,19 +5475,27 @@ func TestCgrCfgEventReaderDefault(t *testing.T) { func TestCgrCfgEventExporterDefault(t *testing.T) { eCfg := &EventExporterCfg{ - ID: utils.MetaDefault, - Type: utils.MetaNone, - ExportPath: "/var/spool/cgrates/ees", - Attempts: 1, - Timezone: utils.EmptyString, - Filters: []string{}, - AttributeSIDs: []string{}, - Flags: utils.FlagsWithParams{}, - contentFields: []*FCTemplate{}, - Fields: []*FCTemplate{}, - headerFields: []*FCTemplate{}, - trailerFields: []*FCTemplate{}, - Opts: &EventExporterOpts{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + ExportPath: "/var/spool/cgrates/ees", + Attempts: 1, + Timezone: utils.EmptyString, + Filters: []string{}, + AttributeSIDs: []string{}, + Flags: utils.FlagsWithParams{}, + contentFields: []*FCTemplate{}, + Fields: []*FCTemplate{}, + headerFields: []*FCTemplate{}, + trailerFields: []*FCTemplate{}, + Opts: &EventExporterOpts{ + Els: &ElsOpts{}, + AMQP: &AMQPOpts{}, + AWS: &AWSOpts{}, + SQL: &SQLOpts{}, + NATS: &NATSOpts{}, + RPC: &RPCOpts{}, + Kafka: &KafkaOpts{}, + }, FailedPostsDir: "/var/spool/cgrates/failed_posts", } if !reflect.DeepEqual(cgrCfg.dfltEvExp, eCfg) { diff --git a/config/eescfg.go b/config/eescfg.go index fe614a00e..96d29b2c5 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -153,72 +153,96 @@ func (eeS *EEsCfg) AsMapInterface(separator string) (initialMP map[string]any) { return } +type ElsOpts struct { + Index *string + IfPrimaryTerm *int + DiscoverNodesOnStart *bool + DiscoverNodeInterval *time.Duration + Cloud *bool + APIKey *string + CertificateFingerprint *string + ServiceToken *string + Username *string // Username for HTTP Basic Authentication. + Password *string + EnableDebugLogger *bool + Logger *string + CompressRequestBody *bool + CompressRequestBodyLevel *int + RetryOnStatus *[]int + MaxRetries *int + DisableRetry *bool + IfSeqNo *int + OpType *string + Pipeline *string + Routing *string + Timeout *time.Duration + Version *int + VersionType *string + WaitForActiveShards *string +} + +type SQLOpts struct { + MaxIdleConns *int + MaxOpenConns *int + ConnMaxLifetime *time.Duration + MYSQLDSNParams map[string]string + TableName *string + DBName *string + PgSSLMode *string +} + +type AMQPOpts struct { + RoutingKey *string + QueueID *string + Exchange *string + ExchangeType *string + Username *string + Password *string +} +type AWSOpts struct { + Region *string + Key *string + Secret *string + Token *string + SQSQueueID *string + S3BucketID *string + S3FolderPath *string +} +type NATSOpts struct { + JetStream *bool + Subject *string + JWTFile *string + SeedFile *string + CertificateAuthority *string + ClientCertificate *string + ClientKey *string + JetStreamMaxWait *time.Duration +} + +type RPCOpts struct { + RPCCodec *string + ServiceMethod *string + KeyPath *string + CertPath *string + CAPath *string + TLS *bool + ConnIDs *[]string + RPCConnTimeout *time.Duration + RPCReplyTimeout *time.Duration + RPCAPIOpts map[string]any +} +type KafkaOpts struct { + KafkaTopic *string +} type EventExporterOpts struct { - CSVFieldSeparator *string - ElsIndex *string - ElsIfPrimaryTerm *int - ElsDiscoverNodesOnStart *bool - ElsDiscoverNodeInterval *time.Duration - ElsCloud *bool - ElsAPIKey *string - ElsCertificateFingerprint *string - ElsServiceToken *string - ElsUsername *string // Username for HTTP Basic Authentication. - ElsPassword *string - ElsEnableDebugLogger *bool - ElsLogger *string - ElsCompressRequestBody *bool - ElsCompressRequestBodyLevel *int - ElsRetryOnStatus *[]int - ElsMaxRetries *int - ElsDisableRetry *bool - ElsIfSeqNo *int - ElsOpType *string - ElsPipeline *string - ElsRouting *string - ElsTimeout *time.Duration - ElsVersion *int - ElsVersionType *string - ElsWaitForActiveShards *string - SQLMaxIdleConns *int - SQLMaxOpenConns *int - SQLConnMaxLifetime *time.Duration - MYSQLDSNParams map[string]string - SQLTableName *string - SQLDBName *string - PgSSLMode *string - KafkaTopic *string - AMQPRoutingKey *string - AMQPQueueID *string - AMQPExchange *string - AMQPExchangeType *string - AMQPUsername *string - AMQPPassword *string - AWSRegion *string - AWSKey *string - AWSSecret *string - AWSToken *string - SQSQueueID *string - S3BucketID *string - S3FolderPath *string - NATSJetStream *bool - NATSSubject *string - NATSJWTFile *string - NATSSeedFile *string - NATSCertificateAuthority *string - NATSClientCertificate *string - NATSClientKey *string - NATSJetStreamMaxWait *time.Duration - RPCCodec *string - ServiceMethod *string - KeyPath *string - CertPath *string - CAPath *string - TLS *bool - ConnIDs *[]string - RPCConnTimeout *time.Duration - RPCReplyTimeout *time.Duration - RPCAPIOpts map[string]any + CSVFieldSeparator *string + Els *ElsOpts + SQL *SQLOpts + AMQP *AMQPOpts + AWS *AWSOpts + NATS *NATSOpts + RPC *RPCOpts + Kafka *KafkaOpts } // EventExporterCfg the config for a Event Exporter @@ -257,6 +281,251 @@ func NewEventExporterCfg(ID, exportType, exportPath, failedPostsDir string, atte Opts: opts, } } +func (elsOpts *ElsOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { + if jsnCfg.ElsCloud != nil { + elsOpts.Cloud = jsnCfg.ElsCloud + } + if jsnCfg.ElsAPIKey != nil { + elsOpts.APIKey = jsnCfg.ElsAPIKey + } + if jsnCfg.ElsServiceToken != nil { + elsOpts.ServiceToken = jsnCfg.ElsServiceToken + } + if jsnCfg.ElsCertificateFingerprint != nil { + elsOpts.CertificateFingerprint = jsnCfg.ElsCertificateFingerprint + } + if jsnCfg.ElsEnableDebugLogger != nil { + elsOpts.EnableDebugLogger = jsnCfg.ElsEnableDebugLogger + } + if jsnCfg.ElsLogger != nil { + elsOpts.Logger = jsnCfg.ElsLogger + } + if jsnCfg.ElsCompressRequestBody != nil { + elsOpts.CompressRequestBody = jsnCfg.ElsCompressRequestBody + } + if jsnCfg.ElsCompressRequestBodyLevel != nil { + elsOpts.CompressRequestBodyLevel = jsnCfg.ElsCompressRequestBodyLevel + } + if jsnCfg.ElsUsername != nil { + elsOpts.Username = jsnCfg.ElsUsername + } + if jsnCfg.ElsPassword != nil { + elsOpts.Password = jsnCfg.ElsPassword + } + if jsnCfg.ElsDiscoverNodesOnStart != nil { + elsOpts.DiscoverNodesOnStart = jsnCfg.ElsDiscoverNodesOnStart + } + if jsnCfg.ElsDiscoverNodesInterval != nil { + var nodesInterval time.Duration + if nodesInterval, err = utils.ParseDurationWithSecs(*jsnCfg.ElsDiscoverNodesInterval); err != nil { + return + } + elsOpts.DiscoverNodeInterval = utils.DurationPointer(nodesInterval) + } + if jsnCfg.ElsRetryOnStatus != nil { + elsOpts.RetryOnStatus = jsnCfg.ElsRetryOnStatus + } + if jsnCfg.ElsMaxRetries != nil { + elsOpts.MaxRetries = jsnCfg.ElsMaxRetries + } + if jsnCfg.ElsDisableRetry != nil { + elsOpts.DisableRetry = jsnCfg.ElsDisableRetry + } + if jsnCfg.ElsIndex != nil { + elsOpts.Index = jsnCfg.ElsIndex + } + if jsnCfg.ElsIfPrimaryTerm != nil { + elsOpts.IfPrimaryTerm = jsnCfg.ElsIfPrimaryTerm + } + if jsnCfg.ElsIfSeqNo != nil { + elsOpts.IfSeqNo = jsnCfg.ElsIfSeqNo + } + if jsnCfg.ElsOpType != nil { + elsOpts.OpType = jsnCfg.ElsOpType + } + if jsnCfg.ElsPipeline != nil { + elsOpts.Pipeline = jsnCfg.ElsPipeline + } + if jsnCfg.ElsRouting != nil { + elsOpts.Routing = jsnCfg.ElsRouting + } + if jsnCfg.ElsTimeout != nil { + var elsTimeout time.Duration + if elsTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.ElsTimeout); err != nil { + return + } + elsOpts.Timeout = utils.DurationPointer(elsTimeout) + } + if jsnCfg.ElsVersion != nil { + elsOpts.Version = jsnCfg.ElsVersion + } + if jsnCfg.ElsVersionType != nil { + elsOpts.VersionType = jsnCfg.ElsVersionType + } + if jsnCfg.ElsWaitForActiveShards != nil { + elsOpts.WaitForActiveShards = jsnCfg.ElsWaitForActiveShards + } + return +} + +func (kafkaOpts *KafkaOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { + if jsnCfg.KafkaTopic != nil { + kafkaOpts.KafkaTopic = jsnCfg.KafkaTopic + } + return +} + +func (sqlOpts *SQLOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { + if jsnCfg.SQLMaxIdleConns != nil { + sqlOpts.MaxIdleConns = jsnCfg.SQLMaxIdleConns + } + if jsnCfg.SQLMaxOpenConns != nil { + sqlOpts.MaxOpenConns = jsnCfg.SQLMaxOpenConns + } + if jsnCfg.SQLConnMaxLifetime != nil { + var sqlConnMaxLifetime time.Duration + if sqlConnMaxLifetime, err = utils.ParseDurationWithNanosecs(*jsnCfg.SQLConnMaxLifetime); err != nil { + return + } + sqlOpts.ConnMaxLifetime = utils.DurationPointer(sqlConnMaxLifetime) + } + if jsnCfg.MYSQLDSNParams != nil { + sqlOpts.MYSQLDSNParams = make(map[string]string) + sqlOpts.MYSQLDSNParams = jsnCfg.MYSQLDSNParams + } + if jsnCfg.SQLTableName != nil { + sqlOpts.TableName = jsnCfg.SQLTableName + } + if jsnCfg.SQLDBName != nil { + sqlOpts.DBName = jsnCfg.SQLDBName + } + if jsnCfg.PgSSLMode != nil { + sqlOpts.PgSSLMode = jsnCfg.PgSSLMode + } + return +} + +func (amqpOpts *AMQPOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { + + if jsnCfg.AMQPQueueID != nil { + amqpOpts.QueueID = jsnCfg.AMQPQueueID + } + if jsnCfg.AMQPRoutingKey != nil { + amqpOpts.RoutingKey = jsnCfg.AMQPRoutingKey + } + if jsnCfg.AMQPExchange != nil { + amqpOpts.Exchange = jsnCfg.AMQPExchange + } + if jsnCfg.AMQPExchangeType != nil { + amqpOpts.ExchangeType = jsnCfg.AMQPExchangeType + } + if jsnCfg.AMQPUsername != nil { + amqpOpts.Username = jsnCfg.AMQPUsername + } + if jsnCfg.AMQPPassword != nil { + amqpOpts.Password = jsnCfg.AMQPPassword + } + return +} + +func (awsOpts *AWSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { + if jsnCfg.AWSRegion != nil { + awsOpts.Region = jsnCfg.AWSRegion + } + if jsnCfg.AWSKey != nil { + awsOpts.Key = jsnCfg.AWSKey + } + if jsnCfg.AWSSecret != nil { + awsOpts.Secret = jsnCfg.AWSSecret + } + if jsnCfg.AWSToken != nil { + awsOpts.Token = jsnCfg.AWSToken + } + if jsnCfg.SQSQueueID != nil { + awsOpts.SQSQueueID = jsnCfg.SQSQueueID + } + if jsnCfg.S3BucketID != nil { + awsOpts.S3BucketID = jsnCfg.S3BucketID + } + if jsnCfg.S3FolderPath != nil { + awsOpts.S3FolderPath = jsnCfg.S3FolderPath + } + return +} +func (natsOpts *NATSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { + if jsnCfg.NATSJetStream != nil { + natsOpts.JetStream = jsnCfg.NATSJetStream + } + if jsnCfg.NATSSubject != nil { + natsOpts.Subject = jsnCfg.NATSSubject + } + if jsnCfg.NATSJWTFile != nil { + natsOpts.JWTFile = jsnCfg.NATSJWTFile + } + if jsnCfg.NATSSeedFile != nil { + natsOpts.SeedFile = jsnCfg.NATSSeedFile + } + if jsnCfg.NATSCertificateAuthority != nil { + natsOpts.CertificateAuthority = jsnCfg.NATSCertificateAuthority + } + if jsnCfg.NATSClientCertificate != nil { + natsOpts.ClientCertificate = jsnCfg.NATSClientCertificate + } + if jsnCfg.NATSClientKey != nil { + natsOpts.ClientKey = jsnCfg.NATSClientKey + } + if jsnCfg.NATSJetStreamMaxWait != nil { + var natsJetStreamMaxWait time.Duration + if natsJetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil { + return + } + natsOpts.JetStreamMaxWait = utils.DurationPointer(natsJetStreamMaxWait) + } + return +} +func (rpcOpts *RPCOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { + if jsnCfg.RPCCodec != nil { + rpcOpts.RPCCodec = jsnCfg.RPCCodec + } + if jsnCfg.ServiceMethod != nil { + rpcOpts.ServiceMethod = jsnCfg.ServiceMethod + } + if jsnCfg.KeyPath != nil { + rpcOpts.KeyPath = jsnCfg.KeyPath + } + if jsnCfg.CertPath != nil { + rpcOpts.CertPath = jsnCfg.CertPath + } + if jsnCfg.CAPath != nil { + rpcOpts.CAPath = jsnCfg.CAPath + } + if jsnCfg.TLS != nil { + rpcOpts.TLS = jsnCfg.TLS + } + if jsnCfg.ConnIDs != nil { + rpcOpts.ConnIDs = jsnCfg.ConnIDs + } + if jsnCfg.RPCConnTimeout != nil { + var rpcConnTimeout time.Duration + if rpcConnTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.RPCConnTimeout); err != nil { + return + } + rpcOpts.RPCConnTimeout = utils.DurationPointer(rpcConnTimeout) + } + if jsnCfg.RPCReplyTimeout != nil { + var rpcReplyTimeout time.Duration + if rpcReplyTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.RPCReplyTimeout); err != nil { + return + } + rpcOpts.RPCReplyTimeout = utils.DurationPointer(rpcReplyTimeout) + } + if jsnCfg.RPCAPIOpts != nil { + rpcOpts.RPCAPIOpts = make(map[string]any) + rpcOpts.RPCAPIOpts = jsnCfg.RPCAPIOpts + } + + return +} func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg == nil { @@ -265,224 +534,28 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.CSVFieldSeparator != nil { eeOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator } - if jsnCfg.ElsCloud != nil { - eeOpts.ElsCloud = jsnCfg.ElsCloud + if err = eeOpts.Els.loadFromJSONCfg(jsnCfg); err != nil { + return } - if jsnCfg.ElsAPIKey != nil { - eeOpts.ElsAPIKey = jsnCfg.ElsAPIKey + if err = eeOpts.Kafka.loadFromJSONCfg(jsnCfg); err != nil { + return } - if jsnCfg.ElsServiceToken != nil { - eeOpts.ElsServiceToken = jsnCfg.ElsServiceToken + if err = eeOpts.SQL.loadFromJSONCfg(jsnCfg); err != nil { + return } - if jsnCfg.ElsCertificateFingerprint != nil { - eeOpts.ElsCertificateFingerprint = jsnCfg.ElsCertificateFingerprint + if err = eeOpts.AMQP.loadFromJSONCfg(jsnCfg); err != nil { + return } - if jsnCfg.ElsEnableDebugLogger != nil { - eeOpts.ElsEnableDebugLogger = jsnCfg.ElsEnableDebugLogger + if err = eeOpts.AWS.loadFromJSONCfg(jsnCfg); err != nil { + return } - if jsnCfg.ElsLogger != nil { - eeOpts.ElsLogger = jsnCfg.ElsLogger + if err = eeOpts.NATS.loadFromJSONCfg(jsnCfg); err != nil { + return } - if jsnCfg.ElsCompressRequestBody != nil { - eeOpts.ElsCompressRequestBody = jsnCfg.ElsCompressRequestBody - } - if jsnCfg.ElsCompressRequestBodyLevel != nil { - eeOpts.ElsCompressRequestBodyLevel = jsnCfg.ElsCompressRequestBodyLevel - } - if jsnCfg.ElsUsername != nil { - eeOpts.ElsUsername = jsnCfg.ElsUsername - } - if jsnCfg.ElsPassword != nil { - eeOpts.ElsPassword = jsnCfg.ElsPassword - } - if jsnCfg.ElsDiscoverNodesOnStart != nil { - eeOpts.ElsDiscoverNodesOnStart = jsnCfg.ElsDiscoverNodesOnStart - } - if jsnCfg.ElsDiscoverNodesInterval != nil { - var nodesInterval time.Duration - if nodesInterval, err = utils.ParseDurationWithSecs(*jsnCfg.ElsDiscoverNodesInterval); err != nil { - return - } - eeOpts.ElsDiscoverNodeInterval = utils.DurationPointer(nodesInterval) - } - if jsnCfg.ElsRetryOnStatus != nil { - eeOpts.ElsRetryOnStatus = jsnCfg.ElsRetryOnStatus - } - if jsnCfg.ElsMaxRetries != nil { - eeOpts.ElsMaxRetries = jsnCfg.ElsMaxRetries - } - if jsnCfg.ElsDisableRetry != nil { - eeOpts.ElsDisableRetry = jsnCfg.ElsDisableRetry - } - if jsnCfg.ElsIndex != nil { - eeOpts.ElsIndex = jsnCfg.ElsIndex - } - if jsnCfg.ElsIfPrimaryTerm != nil { - eeOpts.ElsIfPrimaryTerm = jsnCfg.ElsIfPrimaryTerm - } - if jsnCfg.ElsIfSeqNo != nil { - eeOpts.ElsIfSeqNo = jsnCfg.ElsIfSeqNo - } - if jsnCfg.ElsOpType != nil { - eeOpts.ElsOpType = jsnCfg.ElsOpType - } - if jsnCfg.ElsPipeline != nil { - eeOpts.ElsPipeline = jsnCfg.ElsPipeline - } - if jsnCfg.ElsRouting != nil { - eeOpts.ElsRouting = jsnCfg.ElsRouting - } - if jsnCfg.ElsTimeout != nil { - var elsTimeout time.Duration - if elsTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.ElsTimeout); err != nil { - return - } - eeOpts.ElsTimeout = utils.DurationPointer(elsTimeout) - } - if jsnCfg.ElsVersion != nil { - eeOpts.ElsVersion = jsnCfg.ElsVersion - } - if jsnCfg.ElsVersionType != nil { - eeOpts.ElsVersionType = jsnCfg.ElsVersionType - } - if jsnCfg.ElsWaitForActiveShards != nil { - eeOpts.ElsWaitForActiveShards = jsnCfg.ElsWaitForActiveShards - } - if jsnCfg.SQLMaxIdleConns != nil { - eeOpts.SQLMaxIdleConns = jsnCfg.SQLMaxIdleConns - } - if jsnCfg.SQLMaxOpenConns != nil { - eeOpts.SQLMaxOpenConns = jsnCfg.SQLMaxOpenConns - } - if jsnCfg.SQLConnMaxLifetime != nil { - var sqlConnMaxLifetime time.Duration - if sqlConnMaxLifetime, err = utils.ParseDurationWithNanosecs(*jsnCfg.SQLConnMaxLifetime); err != nil { - return - } - eeOpts.SQLConnMaxLifetime = utils.DurationPointer(sqlConnMaxLifetime) - } - if jsnCfg.MYSQLDSNParams != nil { - eeOpts.MYSQLDSNParams = make(map[string]string) - eeOpts.MYSQLDSNParams = jsnCfg.MYSQLDSNParams - } - if jsnCfg.SQLTableName != nil { - eeOpts.SQLTableName = jsnCfg.SQLTableName - } - if jsnCfg.SQLDBName != nil { - eeOpts.SQLDBName = jsnCfg.SQLDBName - } - if jsnCfg.PgSSLMode != nil { - eeOpts.PgSSLMode = jsnCfg.PgSSLMode - } - if jsnCfg.KafkaTopic != nil { - eeOpts.KafkaTopic = jsnCfg.KafkaTopic - } - if jsnCfg.AMQPQueueID != nil { - eeOpts.AMQPQueueID = jsnCfg.AMQPQueueID - } - if jsnCfg.AMQPRoutingKey != nil { - eeOpts.AMQPRoutingKey = jsnCfg.AMQPRoutingKey - } - if jsnCfg.AMQPExchange != nil { - eeOpts.AMQPExchange = jsnCfg.AMQPExchange - } - if jsnCfg.AMQPExchangeType != nil { - eeOpts.AMQPExchangeType = jsnCfg.AMQPExchangeType - } - if jsnCfg.AMQPUsername != nil { - eeOpts.AMQPUsername = jsnCfg.AMQPUsername - } - if jsnCfg.AMQPPassword != nil { - eeOpts.AMQPPassword = jsnCfg.AMQPPassword - } - if jsnCfg.AWSRegion != nil { - eeOpts.AWSRegion = jsnCfg.AWSRegion - } - if jsnCfg.AWSKey != nil { - eeOpts.AWSKey = jsnCfg.AWSKey - } - if jsnCfg.AWSSecret != nil { - eeOpts.AWSSecret = jsnCfg.AWSSecret - } - if jsnCfg.AWSToken != nil { - eeOpts.AWSToken = jsnCfg.AWSToken - } - if jsnCfg.SQSQueueID != nil { - eeOpts.SQSQueueID = jsnCfg.SQSQueueID - } - if jsnCfg.S3BucketID != nil { - eeOpts.S3BucketID = jsnCfg.S3BucketID - } - if jsnCfg.S3FolderPath != nil { - eeOpts.S3FolderPath = jsnCfg.S3FolderPath - } - if jsnCfg.NATSJetStream != nil { - eeOpts.NATSJetStream = jsnCfg.NATSJetStream - } - if jsnCfg.NATSSubject != nil { - eeOpts.NATSSubject = jsnCfg.NATSSubject - } - if jsnCfg.NATSJWTFile != nil { - eeOpts.NATSJWTFile = jsnCfg.NATSJWTFile - } - if jsnCfg.NATSSeedFile != nil { - eeOpts.NATSSeedFile = jsnCfg.NATSSeedFile - } - if jsnCfg.NATSCertificateAuthority != nil { - eeOpts.NATSCertificateAuthority = jsnCfg.NATSCertificateAuthority - } - if jsnCfg.NATSClientCertificate != nil { - eeOpts.NATSClientCertificate = jsnCfg.NATSClientCertificate - } - if jsnCfg.NATSClientKey != nil { - eeOpts.NATSClientKey = jsnCfg.NATSClientKey - } - if jsnCfg.NATSJetStreamMaxWait != nil { - var natsJetStreamMaxWait time.Duration - if natsJetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil { - return - } - eeOpts.NATSJetStreamMaxWait = utils.DurationPointer(natsJetStreamMaxWait) - } - if jsnCfg.RPCCodec != nil { - eeOpts.RPCCodec = jsnCfg.RPCCodec - } - if jsnCfg.ServiceMethod != nil { - eeOpts.ServiceMethod = jsnCfg.ServiceMethod - } - if jsnCfg.KeyPath != nil { - eeOpts.KeyPath = jsnCfg.KeyPath - } - if jsnCfg.CertPath != nil { - eeOpts.CertPath = jsnCfg.CertPath - } - if jsnCfg.CAPath != nil { - eeOpts.CAPath = jsnCfg.CAPath - } - if jsnCfg.TLS != nil { - eeOpts.TLS = jsnCfg.TLS - } - if jsnCfg.ConnIDs != nil { - eeOpts.ConnIDs = jsnCfg.ConnIDs - } - if jsnCfg.RPCConnTimeout != nil { - var rpcConnTimeout time.Duration - if rpcConnTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.RPCConnTimeout); err != nil { - return - } - eeOpts.RPCConnTimeout = utils.DurationPointer(rpcConnTimeout) - } - if jsnCfg.RPCReplyTimeout != nil { - var rpcReplyTimeout time.Duration - if rpcReplyTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.RPCReplyTimeout); err != nil { - return - } - eeOpts.RPCReplyTimeout = utils.DurationPointer(rpcReplyTimeout) - } - if jsnCfg.RPCAPIOpts != nil { - eeOpts.RPCAPIOpts = make(map[string]any) - eeOpts.RPCAPIOpts = jsnCfg.RPCAPIOpts + if err = eeOpts.RPC.loadFromJSONCfg(jsnCfg); err != nil { + return } + return } @@ -583,207 +656,264 @@ func (eeC *EventExporterCfg) TrailerFields() []*FCTemplate { return eeC.trailerFields } +func (elsOpts *ElsOpts) Clone() *ElsOpts { + cln := &ElsOpts{} + if elsOpts.Index != nil { + cln.Index = new(string) + *cln.Index = *elsOpts.Index + } + if elsOpts.IfPrimaryTerm != nil { + cln.IfPrimaryTerm = new(int) + *cln.IfPrimaryTerm = *elsOpts.IfPrimaryTerm + } + if elsOpts.IfSeqNo != nil { + cln.IfSeqNo = new(int) + *cln.IfSeqNo = *elsOpts.IfSeqNo + } + if elsOpts.OpType != nil { + cln.OpType = new(string) + *cln.OpType = *elsOpts.OpType + } + if elsOpts.Pipeline != nil { + cln.Pipeline = new(string) + *cln.Pipeline = *elsOpts.Pipeline + } + if elsOpts.Routing != nil { + cln.Routing = new(string) + *cln.Routing = *elsOpts.Routing + } + if elsOpts.Timeout != nil { + cln.Timeout = new(time.Duration) + *cln.Timeout = *elsOpts.Timeout + } + if elsOpts.Version != nil { + cln.Version = new(int) + *cln.Version = *elsOpts.Version + } + if elsOpts.VersionType != nil { + cln.VersionType = new(string) + *cln.VersionType = *elsOpts.VersionType + } + if elsOpts.WaitForActiveShards != nil { + cln.WaitForActiveShards = new(string) + *cln.WaitForActiveShards = *elsOpts.WaitForActiveShards + } + return cln +} + +func (kafkaOpts *KafkaOpts) Clone() *KafkaOpts { + cln := &KafkaOpts{} + + if kafkaOpts.KafkaTopic != nil { + cln.KafkaTopic = new(string) + *cln.KafkaTopic = *kafkaOpts.KafkaTopic + } + return cln +} + +func (sqlOpts *SQLOpts) Clone() *SQLOpts { + cln := &SQLOpts{} + if sqlOpts.MaxIdleConns != nil { + cln.MaxIdleConns = new(int) + *cln.MaxIdleConns = *sqlOpts.MaxIdleConns + } + if sqlOpts.MaxOpenConns != nil { + cln.MaxOpenConns = new(int) + *cln.MaxOpenConns = *sqlOpts.MaxOpenConns + } + if sqlOpts.ConnMaxLifetime != nil { + cln.ConnMaxLifetime = new(time.Duration) + *cln.ConnMaxLifetime = *sqlOpts.ConnMaxLifetime + } + if sqlOpts.MYSQLDSNParams != nil { + cln.MYSQLDSNParams = make(map[string]string) + cln.MYSQLDSNParams = sqlOpts.MYSQLDSNParams + } + if sqlOpts.TableName != nil { + cln.TableName = new(string) + *cln.TableName = *sqlOpts.TableName + } + if sqlOpts.DBName != nil { + cln.DBName = new(string) + *cln.DBName = *sqlOpts.DBName + } + if sqlOpts.PgSSLMode != nil { + cln.PgSSLMode = new(string) + *cln.PgSSLMode = *sqlOpts.PgSSLMode + } + return cln +} + +func (amqpOpts *AMQPOpts) Clone() *AMQPOpts { + cln := &AMQPOpts{} + if amqpOpts.QueueID != nil { + cln.QueueID = new(string) + *cln.QueueID = *amqpOpts.QueueID + } + if amqpOpts.RoutingKey != nil { + cln.RoutingKey = new(string) + *cln.RoutingKey = *amqpOpts.RoutingKey + } + if amqpOpts.Exchange != nil { + cln.Exchange = new(string) + *cln.Exchange = *amqpOpts.Exchange + } + if amqpOpts.ExchangeType != nil { + cln.ExchangeType = new(string) + *cln.ExchangeType = *amqpOpts.ExchangeType + } + if amqpOpts.Username != nil { + cln.Username = new(string) + *cln.Username = *amqpOpts.Username + } + if amqpOpts.Password != nil { + cln.Password = new(string) + *cln.Password = *amqpOpts.Password + } + return cln +} + +func (awsOpts *AWSOpts) Clone() *AWSOpts { + cln := &AWSOpts{} + if awsOpts.Region != nil { + cln.Region = new(string) + *cln.Region = *awsOpts.Region + } + if awsOpts.Key != nil { + cln.Key = new(string) + *cln.Key = *awsOpts.Key + } + if awsOpts.Secret != nil { + cln.Secret = new(string) + *cln.Secret = *awsOpts.Secret + } + if awsOpts.Token != nil { + cln.Token = new(string) + *cln.Token = *awsOpts.Token + } + if awsOpts.SQSQueueID != nil { + cln.SQSQueueID = new(string) + *cln.SQSQueueID = *awsOpts.SQSQueueID + } + if awsOpts.S3BucketID != nil { + cln.S3BucketID = new(string) + *cln.S3BucketID = *awsOpts.S3BucketID + } + if awsOpts.S3FolderPath != nil { + cln.S3FolderPath = new(string) + *cln.S3FolderPath = *awsOpts.S3FolderPath + } + return cln +} + +func (natsOpts *NATSOpts) Clone() *NATSOpts { + cln := &NATSOpts{} + if natsOpts.JetStream != nil { + cln.JetStream = new(bool) + *cln.JetStream = *natsOpts.JetStream + } + if natsOpts.Subject != nil { + cln.Subject = new(string) + *cln.Subject = *natsOpts.Subject + } + if natsOpts.JWTFile != nil { + cln.JWTFile = new(string) + *cln.JWTFile = *natsOpts.JWTFile + } + if natsOpts.SeedFile != nil { + cln.SeedFile = new(string) + *cln.SeedFile = *natsOpts.SeedFile + } + if natsOpts.CertificateAuthority != nil { + cln.CertificateAuthority = new(string) + *cln.CertificateAuthority = *natsOpts.CertificateAuthority + } + if natsOpts.ClientCertificate != nil { + cln.ClientCertificate = new(string) + *cln.ClientCertificate = *natsOpts.ClientCertificate + } + if natsOpts.ClientKey != nil { + cln.ClientKey = new(string) + *cln.ClientKey = *natsOpts.ClientKey + } + if natsOpts.JetStreamMaxWait != nil { + cln.JetStreamMaxWait = new(time.Duration) + *cln.JetStreamMaxWait = *natsOpts.JetStreamMaxWait + } + return cln +} + +func (rpcOpts *RPCOpts) Clone() *RPCOpts { + cln := &RPCOpts{} + if rpcOpts.RPCCodec != nil { + cln.RPCCodec = new(string) + *cln.RPCCodec = *rpcOpts.RPCCodec + } + if rpcOpts.ServiceMethod != nil { + cln.ServiceMethod = new(string) + *cln.ServiceMethod = *rpcOpts.ServiceMethod + } + if rpcOpts.KeyPath != nil { + cln.KeyPath = new(string) + *cln.KeyPath = *rpcOpts.KeyPath + } + if rpcOpts.CertPath != nil { + cln.CertPath = new(string) + *cln.CertPath = *rpcOpts.CertPath + } + if rpcOpts.CAPath != nil { + cln.CAPath = new(string) + *cln.CAPath = *rpcOpts.CAPath + } + if rpcOpts.TLS != nil { + cln.TLS = new(bool) + *cln.TLS = *rpcOpts.TLS + } + if rpcOpts.ConnIDs != nil { + cln.ConnIDs = new([]string) + *cln.ConnIDs = *rpcOpts.ConnIDs + } + if rpcOpts.RPCConnTimeout != nil { + cln.RPCConnTimeout = new(time.Duration) + *cln.RPCConnTimeout = *rpcOpts.RPCConnTimeout + } + if rpcOpts.RPCReplyTimeout != nil { + cln.RPCReplyTimeout = new(time.Duration) + *cln.RPCReplyTimeout = *rpcOpts.RPCReplyTimeout + } + if rpcOpts.RPCAPIOpts != nil { + cln.RPCAPIOpts = make(map[string]any) + cln.RPCAPIOpts = rpcOpts.RPCAPIOpts + } + + return cln +} func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { cln := &EventExporterOpts{} if eeOpts.CSVFieldSeparator != nil { cln.CSVFieldSeparator = new(string) *cln.CSVFieldSeparator = *eeOpts.CSVFieldSeparator } - if eeOpts.ElsIndex != nil { - cln.ElsIndex = new(string) - *cln.ElsIndex = *eeOpts.ElsIndex + if eeOpts.Els != nil { + cln.Els = eeOpts.Els.Clone() } - if eeOpts.ElsIfPrimaryTerm != nil { - cln.ElsIfPrimaryTerm = new(int) - *cln.ElsIfPrimaryTerm = *eeOpts.ElsIfPrimaryTerm + if eeOpts.SQL != nil { + cln.SQL = eeOpts.SQL.Clone() } - if eeOpts.ElsIfSeqNo != nil { - cln.ElsIfSeqNo = new(int) - *cln.ElsIfSeqNo = *eeOpts.ElsIfSeqNo + if eeOpts.Kafka != nil { + cln.Kafka = eeOpts.Kafka.Clone() } - if eeOpts.ElsOpType != nil { - cln.ElsOpType = new(string) - *cln.ElsOpType = *eeOpts.ElsOpType + if eeOpts.AMQP != nil { + cln.AMQP = eeOpts.AMQP.Clone() } - if eeOpts.ElsPipeline != nil { - cln.ElsPipeline = new(string) - *cln.ElsPipeline = *eeOpts.ElsPipeline + if eeOpts.AWS != nil { + cln.AWS = eeOpts.AWS.Clone() } - if eeOpts.ElsRouting != nil { - cln.ElsRouting = new(string) - *cln.ElsRouting = *eeOpts.ElsRouting + if eeOpts.NATS != nil { + cln.NATS = eeOpts.NATS.Clone() } - if eeOpts.ElsTimeout != nil { - cln.ElsTimeout = new(time.Duration) - *cln.ElsTimeout = *eeOpts.ElsTimeout - } - if eeOpts.ElsVersion != nil { - cln.ElsVersion = new(int) - *cln.ElsVersion = *eeOpts.ElsVersion - } - if eeOpts.ElsVersionType != nil { - cln.ElsVersionType = new(string) - *cln.ElsVersionType = *eeOpts.ElsVersionType - } - if eeOpts.ElsWaitForActiveShards != nil { - cln.ElsWaitForActiveShards = new(string) - *cln.ElsWaitForActiveShards = *eeOpts.ElsWaitForActiveShards - } - if eeOpts.SQLMaxIdleConns != nil { - cln.SQLMaxIdleConns = new(int) - *cln.SQLMaxIdleConns = *eeOpts.SQLMaxIdleConns - } - if eeOpts.SQLMaxOpenConns != nil { - cln.SQLMaxOpenConns = new(int) - *cln.SQLMaxOpenConns = *eeOpts.SQLMaxOpenConns - } - if eeOpts.SQLConnMaxLifetime != nil { - cln.SQLConnMaxLifetime = new(time.Duration) - *cln.SQLConnMaxLifetime = *eeOpts.SQLConnMaxLifetime - } - if eeOpts.MYSQLDSNParams != nil { - cln.MYSQLDSNParams = make(map[string]string) - cln.MYSQLDSNParams = eeOpts.MYSQLDSNParams - } - if eeOpts.SQLTableName != nil { - cln.SQLTableName = new(string) - *cln.SQLTableName = *eeOpts.SQLTableName - } - if eeOpts.SQLDBName != nil { - cln.SQLDBName = new(string) - *cln.SQLDBName = *eeOpts.SQLDBName - } - if eeOpts.PgSSLMode != nil { - cln.PgSSLMode = new(string) - *cln.PgSSLMode = *eeOpts.PgSSLMode - } - if eeOpts.KafkaTopic != nil { - cln.KafkaTopic = new(string) - *cln.KafkaTopic = *eeOpts.KafkaTopic - } - if eeOpts.AMQPQueueID != nil { - cln.AMQPQueueID = new(string) - *cln.AMQPQueueID = *eeOpts.AMQPQueueID - } - if eeOpts.AMQPRoutingKey != nil { - cln.AMQPRoutingKey = new(string) - *cln.AMQPRoutingKey = *eeOpts.AMQPRoutingKey - } - if eeOpts.AMQPExchange != nil { - cln.AMQPExchange = new(string) - *cln.AMQPExchange = *eeOpts.AMQPExchange - } - if eeOpts.AMQPExchangeType != nil { - cln.AMQPExchangeType = new(string) - *cln.AMQPExchangeType = *eeOpts.AMQPExchangeType - } - if eeOpts.AMQPUsername != nil { - cln.AMQPUsername = new(string) - *cln.AMQPUsername = *eeOpts.AMQPUsername - } - if eeOpts.AMQPPassword != nil { - cln.AMQPPassword = new(string) - *cln.AMQPPassword = *eeOpts.AMQPPassword - } - if eeOpts.AWSRegion != nil { - cln.AWSRegion = new(string) - *cln.AWSRegion = *eeOpts.AWSRegion - } - if eeOpts.AWSKey != nil { - cln.AWSKey = new(string) - *cln.AWSKey = *eeOpts.AWSKey - } - if eeOpts.AWSSecret != nil { - cln.AWSSecret = new(string) - *cln.AWSSecret = *eeOpts.AWSSecret - } - if eeOpts.AWSToken != nil { - cln.AWSToken = new(string) - *cln.AWSToken = *eeOpts.AWSToken - } - if eeOpts.SQSQueueID != nil { - cln.SQSQueueID = new(string) - *cln.SQSQueueID = *eeOpts.SQSQueueID - } - if eeOpts.S3BucketID != nil { - cln.S3BucketID = new(string) - *cln.S3BucketID = *eeOpts.S3BucketID - } - if eeOpts.S3FolderPath != nil { - cln.S3FolderPath = new(string) - *cln.S3FolderPath = *eeOpts.S3FolderPath - } - if eeOpts.NATSJetStream != nil { - cln.NATSJetStream = new(bool) - *cln.NATSJetStream = *eeOpts.NATSJetStream - } - if eeOpts.NATSSubject != nil { - cln.NATSSubject = new(string) - *cln.NATSSubject = *eeOpts.NATSSubject - } - if eeOpts.NATSJWTFile != nil { - cln.NATSJWTFile = new(string) - *cln.NATSJWTFile = *eeOpts.NATSJWTFile - } - if eeOpts.NATSSeedFile != nil { - cln.NATSSeedFile = new(string) - *cln.NATSSeedFile = *eeOpts.NATSSeedFile - } - if eeOpts.NATSCertificateAuthority != nil { - cln.NATSCertificateAuthority = new(string) - *cln.NATSCertificateAuthority = *eeOpts.NATSCertificateAuthority - } - if eeOpts.NATSClientCertificate != nil { - cln.NATSClientCertificate = new(string) - *cln.NATSClientCertificate = *eeOpts.NATSClientCertificate - } - if eeOpts.NATSClientKey != nil { - cln.NATSClientKey = new(string) - *cln.NATSClientKey = *eeOpts.NATSClientKey - } - if eeOpts.NATSJetStreamMaxWait != nil { - cln.NATSJetStreamMaxWait = new(time.Duration) - *cln.NATSJetStreamMaxWait = *eeOpts.NATSJetStreamMaxWait - } - if eeOpts.RPCCodec != nil { - cln.RPCCodec = new(string) - *cln.RPCCodec = *eeOpts.RPCCodec - } - if eeOpts.ServiceMethod != nil { - cln.ServiceMethod = new(string) - *cln.ServiceMethod = *eeOpts.ServiceMethod - } - if eeOpts.KeyPath != nil { - cln.KeyPath = new(string) - *cln.KeyPath = *eeOpts.KeyPath - } - if eeOpts.CertPath != nil { - cln.CertPath = new(string) - *cln.CertPath = *eeOpts.CertPath - } - if eeOpts.CAPath != nil { - cln.CAPath = new(string) - *cln.CAPath = *eeOpts.CAPath - } - if eeOpts.TLS != nil { - cln.TLS = new(bool) - *cln.TLS = *eeOpts.TLS - } - if eeOpts.ConnIDs != nil { - cln.ConnIDs = new([]string) - *cln.ConnIDs = *eeOpts.ConnIDs - } - if eeOpts.RPCConnTimeout != nil { - cln.RPCConnTimeout = new(time.Duration) - *cln.RPCConnTimeout = *eeOpts.RPCConnTimeout - } - if eeOpts.RPCReplyTimeout != nil { - cln.RPCReplyTimeout = new(time.Duration) - *cln.RPCReplyTimeout = *eeOpts.RPCReplyTimeout - } - if eeOpts.RPCAPIOpts != nil { - cln.RPCAPIOpts = make(map[string]any) - cln.RPCAPIOpts = eeOpts.RPCAPIOpts + if eeOpts.RPC != nil { + cln.RPC = eeOpts.RPC.Clone() } return cln } @@ -842,152 +972,166 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str if eeC.Opts.CSVFieldSeparator != nil { opts[utils.CSVFieldSepOpt] = *eeC.Opts.CSVFieldSeparator } - if eeC.Opts.ElsIndex != nil { - opts[utils.ElsIndex] = *eeC.Opts.ElsIndex + if elsOpts := eeC.Opts.Els; elsOpts != nil { + if elsOpts.Index != nil { + opts[utils.ElsIndex] = *elsOpts.Index + } + if elsOpts.IfPrimaryTerm != nil { + opts[utils.ElsIfPrimaryTerm] = *elsOpts.IfPrimaryTerm + } + if elsOpts.IfSeqNo != nil { + opts[utils.ElsIfSeqNo] = *elsOpts.IfSeqNo + } + if elsOpts.OpType != nil { + opts[utils.ElsOpType] = *elsOpts.OpType + } + if elsOpts.Pipeline != nil { + opts[utils.ElsPipeline] = *elsOpts.OpType + } + if elsOpts.Routing != nil { + opts[utils.ElsRouting] = *elsOpts.Routing + } + if elsOpts.Timeout != nil { + opts[utils.ElsTimeout] = *elsOpts.Timeout + } + if elsOpts.Version != nil { + opts[utils.ElsVersionLow] = *elsOpts.VersionType + } + if elsOpts.VersionType != nil { + opts[utils.ElsVersionType] = *elsOpts.VersionType + } + if elsOpts.WaitForActiveShards != nil { + opts[utils.ElsWaitForActiveShards] = *elsOpts.WaitForActiveShards + } } - if eeC.Opts.ElsIfPrimaryTerm != nil { - opts[utils.ElsIfPrimaryTerm] = *eeC.Opts.ElsIfPrimaryTerm + if sqlOpts := eeC.Opts.SQL; sqlOpts != nil { + if sqlOpts.MaxIdleConns != nil { + opts[utils.SQLMaxIdleConnsCfg] = *sqlOpts.MaxIdleConns + } + if sqlOpts.MaxOpenConns != nil { + opts[utils.SQLMaxOpenConns] = *sqlOpts.MaxOpenConns + } + if sqlOpts.ConnMaxLifetime != nil { + opts[utils.SQLConnMaxLifetime] = *sqlOpts.ConnMaxLifetime + } + if sqlOpts.MYSQLDSNParams != nil { + opts[utils.MYSQLDSNParams] = sqlOpts.MYSQLDSNParams + } + if sqlOpts.TableName != nil { + opts[utils.SQLTableNameOpt] = *sqlOpts.TableName + } + if sqlOpts.DBName != nil { + opts[utils.SQLDBNameOpt] = *sqlOpts.DBName + } + if sqlOpts.PgSSLMode != nil { + opts[utils.PgSSLModeCfg] = *sqlOpts.PgSSLMode + } } - if eeC.Opts.ElsIfSeqNo != nil { - opts[utils.ElsIfSeqNo] = *eeC.Opts.ElsIfSeqNo + if kafkaOpts := eeC.Opts.Kafka; kafkaOpts != nil { + if kafkaOpts.KafkaTopic != nil { + opts[utils.KafkaTopic] = *kafkaOpts.KafkaTopic + } } - if eeC.Opts.ElsOpType != nil { - opts[utils.ElsOpType] = *eeC.Opts.ElsOpType + if amOpts := eeC.Opts.AMQP; amOpts != nil { + if amOpts.QueueID != nil { + opts[utils.AMQPQueueID] = *amOpts.QueueID + } + if amOpts.RoutingKey != nil { + opts[utils.AMQPRoutingKey] = *amOpts.RoutingKey + } + if amOpts.Exchange != nil { + opts[utils.AMQPExchange] = *amOpts.Exchange + } + if amOpts.ExchangeType != nil { + opts[utils.AMQPExchangeType] = *amOpts.ExchangeType + } + if amOpts.Username != nil { + opts[utils.AMQPUsername] = *amOpts.Username + } + if amOpts.Password != nil { + opts[utils.AMQPPassword] = *amOpts.Password + } } - if eeC.Opts.ElsPipeline != nil { - opts[utils.ElsPipeline] = *eeC.Opts.ElsPipeline + if awsOpts := eeC.Opts.AWS; awsOpts != nil { + if awsOpts.Region != nil { + opts[utils.AWSRegion] = *awsOpts.Region + } + if awsOpts.Key != nil { + opts[utils.AWSKey] = *awsOpts.Key + } + if awsOpts.Secret != nil { + opts[utils.AWSSecret] = *awsOpts.Secret + } + if awsOpts.Token != nil { + opts[utils.AWSToken] = *awsOpts.Token + } + if awsOpts.SQSQueueID != nil { + opts[utils.SQSQueueID] = *awsOpts.SQSQueueID + } + if awsOpts.S3BucketID != nil { + opts[utils.S3Bucket] = *awsOpts.S3BucketID + } + if awsOpts.S3FolderPath != nil { + opts[utils.S3FolderPath] = *awsOpts.S3FolderPath + } } - if eeC.Opts.ElsRouting != nil { - opts[utils.ElsRouting] = *eeC.Opts.ElsRouting + if natOpts := eeC.Opts.NATS; natOpts != nil { + if natOpts.JetStream != nil { + opts[utils.NatsJetStream] = *natOpts.JetStream + } + if natOpts.Subject != nil { + opts[utils.NatsSubject] = *natOpts.Subject + } + if natOpts.JWTFile != nil { + opts[utils.NatsJWTFile] = *natOpts.JWTFile + } + if natOpts.SeedFile != nil { + opts[utils.NatsSeedFile] = *natOpts.SeedFile + } + if natOpts.CertificateAuthority != nil { + opts[utils.NatsCertificateAuthority] = *natOpts.CertificateAuthority + } + if natOpts.ClientCertificate != nil { + opts[utils.NatsClientCertificate] = *natOpts.ClientCertificate + } + if natOpts.ClientKey != nil { + opts[utils.NatsClientKey] = *natOpts.ClientKey + } + if natOpts.JetStreamMaxWait != nil { + opts[utils.NatsJetStreamMaxWait] = natOpts.JetStreamMaxWait.String() + } } - if eeC.Opts.ElsTimeout != nil { - opts[utils.ElsTimeout] = eeC.Opts.ElsTimeout.String() - } - if eeC.Opts.ElsVersion != nil { - opts[utils.ElsVersionLow] = *eeC.Opts.ElsVersion - } - if eeC.Opts.ElsVersionType != nil { - opts[utils.ElsVersionType] = *eeC.Opts.ElsVersionType - } - if eeC.Opts.ElsWaitForActiveShards != nil { - opts[utils.ElsWaitForActiveShards] = *eeC.Opts.ElsWaitForActiveShards - } - if eeC.Opts.SQLMaxIdleConns != nil { - opts[utils.SQLMaxIdleConnsCfg] = *eeC.Opts.SQLMaxIdleConns - } - if eeC.Opts.SQLMaxOpenConns != nil { - opts[utils.SQLMaxOpenConns] = *eeC.Opts.SQLMaxOpenConns - } - if eeC.Opts.SQLConnMaxLifetime != nil { - opts[utils.SQLConnMaxLifetime] = eeC.Opts.SQLConnMaxLifetime.String() - } - if eeC.Opts.MYSQLDSNParams != nil { - opts[utils.MYSQLDSNParams] = eeC.Opts.MYSQLDSNParams - } - if eeC.Opts.SQLTableName != nil { - opts[utils.SQLTableNameOpt] = *eeC.Opts.SQLTableName - } - if eeC.Opts.SQLDBName != nil { - opts[utils.SQLDBNameOpt] = *eeC.Opts.SQLDBName - } - if eeC.Opts.PgSSLMode != nil { - opts[utils.PgSSLModeCfg] = *eeC.Opts.PgSSLMode - } - if eeC.Opts.KafkaTopic != nil { - opts[utils.KafkaTopic] = *eeC.Opts.KafkaTopic - } - if eeC.Opts.AMQPQueueID != nil { - opts[utils.AMQPQueueID] = *eeC.Opts.AMQPQueueID - } - if eeC.Opts.AMQPRoutingKey != nil { - opts[utils.AMQPRoutingKey] = *eeC.Opts.AMQPRoutingKey - } - if eeC.Opts.AMQPExchange != nil { - opts[utils.AMQPExchange] = *eeC.Opts.AMQPExchange - } - if eeC.Opts.AMQPExchangeType != nil { - opts[utils.AMQPExchangeType] = *eeC.Opts.AMQPExchangeType - } - if eeC.Opts.AMQPUsername != nil { - opts[utils.AMQPUsername] = *eeC.Opts.AMQPUsername - } - if eeC.Opts.AMQPPassword != nil { - opts[utils.AMQPPassword] = *eeC.Opts.AMQPPassword - } - if eeC.Opts.AWSRegion != nil { - opts[utils.AWSRegion] = *eeC.Opts.AWSRegion - } - if eeC.Opts.AWSKey != nil { - opts[utils.AWSKey] = *eeC.Opts.AWSKey - } - if eeC.Opts.AWSSecret != nil { - opts[utils.AWSSecret] = *eeC.Opts.AWSSecret - } - if eeC.Opts.AWSToken != nil { - opts[utils.AWSToken] = *eeC.Opts.AWSToken - } - if eeC.Opts.SQSQueueID != nil { - opts[utils.SQSQueueID] = *eeC.Opts.SQSQueueID - } - if eeC.Opts.S3BucketID != nil { - opts[utils.S3Bucket] = *eeC.Opts.S3BucketID - } - if eeC.Opts.S3FolderPath != nil { - opts[utils.S3FolderPath] = *eeC.Opts.S3FolderPath - } - if eeC.Opts.NATSJetStream != nil { - opts[utils.NatsJetStream] = *eeC.Opts.NATSJetStream - } - if eeC.Opts.NATSSubject != nil { - opts[utils.NatsSubject] = *eeC.Opts.NATSSubject - } - if eeC.Opts.NATSJWTFile != nil { - opts[utils.NatsJWTFile] = *eeC.Opts.NATSJWTFile - } - if eeC.Opts.NATSSeedFile != nil { - opts[utils.NatsSeedFile] = *eeC.Opts.NATSSeedFile - } - if eeC.Opts.NATSCertificateAuthority != nil { - opts[utils.NatsCertificateAuthority] = *eeC.Opts.NATSCertificateAuthority - } - if eeC.Opts.NATSClientCertificate != nil { - opts[utils.NatsClientCertificate] = *eeC.Opts.NATSClientCertificate - } - if eeC.Opts.NATSClientKey != nil { - opts[utils.NatsClientKey] = *eeC.Opts.NATSClientKey - } - if eeC.Opts.NATSJetStreamMaxWait != nil { - opts[utils.NatsJetStreamMaxWait] = eeC.Opts.NATSJetStreamMaxWait.String() - } - if eeC.Opts.RPCCodec != nil { - opts[utils.RpcCodec] = *eeC.Opts.RPCCodec - } - if eeC.Opts.ServiceMethod != nil { - opts[utils.ServiceMethod] = *eeC.Opts.ServiceMethod - } - if eeC.Opts.KeyPath != nil { - opts[utils.KeyPath] = *eeC.Opts.KeyPath - } - if eeC.Opts.CertPath != nil { - opts[utils.CertPath] = *eeC.Opts.CertPath - } - if eeC.Opts.CAPath != nil { - opts[utils.CaPath] = *eeC.Opts.CAPath - } - if eeC.Opts.TLS != nil { - opts[utils.Tls] = *eeC.Opts.TLS - } - if eeC.Opts.ConnIDs != nil { - opts[utils.ConnIDs] = *eeC.Opts.ConnIDs - } - if eeC.Opts.RPCConnTimeout != nil { - opts[utils.RpcConnTimeout] = eeC.Opts.RPCConnTimeout.String() - } - if eeC.Opts.RPCReplyTimeout != nil { - opts[utils.RpcReplyTimeout] = eeC.Opts.RPCReplyTimeout.String() - } - if eeC.Opts.RPCAPIOpts != nil { - opts[utils.RPCAPIOpts] = eeC.Opts.RPCAPIOpts + if rpcOpts := eeC.Opts.RPC; rpcOpts != nil { + if rpcOpts.RPCCodec != nil { + opts[utils.RpcCodec] = *rpcOpts.RPCCodec + } + if rpcOpts.ServiceMethod != nil { + opts[utils.ServiceMethod] = *rpcOpts.ServiceMethod + } + if rpcOpts.KeyPath != nil { + opts[utils.KeyPath] = *rpcOpts.KeyPath + } + if rpcOpts.CertPath != nil { + opts[utils.CertPath] = *rpcOpts.CertPath + } + if rpcOpts.CAPath != nil { + opts[utils.CaPath] = *rpcOpts.CAPath + } + if rpcOpts.TLS != nil { + opts[utils.Tls] = *rpcOpts.TLS + } + if rpcOpts.ConnIDs != nil { + opts[utils.ConnIDs] = *rpcOpts.ConnIDs + } + if rpcOpts.RPCConnTimeout != nil { + opts[utils.RpcConnTimeout] = rpcOpts.RPCConnTimeout.String() + } + if rpcOpts.RPCReplyTimeout != nil { + opts[utils.RpcReplyTimeout] = rpcOpts.RPCReplyTimeout.String() + } + if rpcOpts.RPCAPIOpts != nil { + opts[utils.RPCAPIOpts] = rpcOpts.RPCAPIOpts + } } flgs := eeC.Flags.SliceFlags() diff --git a/config/eescfg_test.go b/config/eescfg_test.go index fd0b2c227..8133ee729 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -222,57 +222,71 @@ func TestEESClone(t *testing.T) { Opts: &EventExporterOpts{ CSVFieldSeparator: utils.StringPointer(utils.InfieldSep), - MYSQLDSNParams: map[string]string{ - "allowOldPasswords": "true", - "allowNativePasswords": "true", + SQL: &SQLOpts{ + MYSQLDSNParams: map[string]string{ + "allowOldPasswords": "true", + "allowNativePasswords": "true", + }, + MaxIdleConns: utils.IntPointer(4), + ConnMaxLifetime: utils.DurationPointer(1 * time.Minute), + TableName: utils.StringPointer("table"), + DBName: utils.StringPointer("db"), + PgSSLMode: utils.StringPointer("pg"), + MaxOpenConns: utils.IntPointer(6), }, - ElsIndex: utils.StringPointer("test"), - ElsIfPrimaryTerm: utils.IntPointer(0), - ElsIfSeqNo: utils.IntPointer(0), - ElsOpType: utils.StringPointer("test2"), - ElsPipeline: utils.StringPointer("test3"), - ElsRouting: utils.StringPointer("test4"), - ElsTimeout: utils.DurationPointer(1 * time.Minute), - ElsVersion: utils.IntPointer(2), - ElsVersionType: utils.StringPointer("test5"), - ElsWaitForActiveShards: utils.StringPointer("test6"), - SQLMaxIdleConns: utils.IntPointer(4), - SQLConnMaxLifetime: utils.DurationPointer(1 * time.Minute), - SQLTableName: utils.StringPointer("table"), - SQLDBName: utils.StringPointer("db"), - PgSSLMode: utils.StringPointer("pg"), - KafkaTopic: utils.StringPointer("kafka"), - SQLMaxOpenConns: utils.IntPointer(6), - AWSToken: utils.StringPointer("token"), - S3FolderPath: utils.StringPointer("s3"), - NATSJetStream: utils.BoolPointer(true), - NATSSubject: utils.StringPointer("nat"), - NATSJWTFile: utils.StringPointer("jwt"), - NATSSeedFile: utils.StringPointer("seed"), - NATSCertificateAuthority: utils.StringPointer("NATS"), - NATSClientCertificate: utils.StringPointer("NATSClient"), - NATSClientKey: utils.StringPointer("key"), - NATSJetStreamMaxWait: utils.DurationPointer(1 * time.Minute), - AMQPRoutingKey: utils.StringPointer("key"), - AMQPQueueID: utils.StringPointer("id"), - AMQPExchangeType: utils.StringPointer("type"), - AMQPExchange: utils.StringPointer("exchange"), - AWSRegion: utils.StringPointer("eu"), - AWSKey: utils.StringPointer("key"), - AWSSecret: utils.StringPointer("secretkey"), - S3BucketID: utils.StringPointer("s3"), - SQSQueueID: utils.StringPointer("sqsid"), - RPCCodec: utils.StringPointer("rpc"), - ServiceMethod: utils.StringPointer("service"), - KeyPath: utils.StringPointer("path"), - CertPath: utils.StringPointer("certpath"), - CAPath: utils.StringPointer("capath"), - TLS: utils.BoolPointer(true), - ConnIDs: utils.SliceStringPointer([]string{"id1", "id2"}), - RPCConnTimeout: utils.DurationPointer(1 * time.Minute), - RPCReplyTimeout: utils.DurationPointer(1 * time.Minute), - RPCAPIOpts: map[string]any{ - "key": "val", + Els: &ElsOpts{ + Index: utils.StringPointer("test"), + IfPrimaryTerm: utils.IntPointer(0), + IfSeqNo: utils.IntPointer(0), + OpType: utils.StringPointer("test2"), + Pipeline: utils.StringPointer("test3"), + Routing: utils.StringPointer("test4"), + Timeout: utils.DurationPointer(1 * time.Minute), + Version: utils.IntPointer(2), + VersionType: utils.StringPointer("test5"), + WaitForActiveShards: utils.StringPointer("test6"), + }, + Kafka: &KafkaOpts{ + KafkaTopic: utils.StringPointer("kafka"), + }, + AWS: &AWSOpts{ + Token: utils.StringPointer("token"), + S3FolderPath: utils.StringPointer("s3"), + Region: utils.StringPointer("eu"), + Key: utils.StringPointer("key"), + Secret: utils.StringPointer("secretkey"), + S3BucketID: utils.StringPointer("s3"), + SQSQueueID: utils.StringPointer("sqsid"), + }, + NATS: &NATSOpts{ + JetStream: utils.BoolPointer(true), + Subject: utils.StringPointer("nat"), + JWTFile: utils.StringPointer("jwt"), + SeedFile: utils.StringPointer("seed"), + CertificateAuthority: utils.StringPointer("NATS"), + ClientCertificate: utils.StringPointer("NATSClient"), + ClientKey: utils.StringPointer("key"), + JetStreamMaxWait: utils.DurationPointer(1 * time.Minute), + }, + AMQP: &AMQPOpts{ + RoutingKey: utils.StringPointer("key"), + QueueID: utils.StringPointer("id"), + ExchangeType: utils.StringPointer("type"), + Exchange: utils.StringPointer("exchange"), + }, + RPC: &RPCOpts{ + RPCCodec: utils.StringPointer("rpc"), + ServiceMethod: utils.StringPointer("service"), + KeyPath: utils.StringPointer("path"), + CertPath: utils.StringPointer("certpath"), + CAPath: utils.StringPointer("capath"), + TLS: utils.BoolPointer(true), + ConnIDs: utils.SliceStringPointer([]string{"id1", "id2"}), + RPCConnTimeout: utils.DurationPointer(1 * time.Minute), + RPCReplyTimeout: utils.DurationPointer(1 * time.Minute), + RPCAPIOpts: map[string]any{ + "key": "val", + }, }, }, }, @@ -382,36 +396,51 @@ func TestEventExporterOptsloadFromJsonCfg(t *testing.T) { } expected := &EventExporterOpts{ - - ElsIndex: utils.StringPointer("test"), - ElsIfPrimaryTerm: utils.IntPointer(0), - ElsIfSeqNo: utils.IntPointer(0), - ElsOpType: utils.StringPointer("test2"), - ElsPipeline: utils.StringPointer("test3"), - ElsRouting: utils.StringPointer("test4"), - ElsTimeout: utils.DurationPointer(1 * time.Minute), - ElsVersion: utils.IntPointer(2), - ElsVersionType: utils.StringPointer("test5"), - ElsWaitForActiveShards: utils.StringPointer("test6"), - SQLMaxIdleConns: utils.IntPointer(4), - SQLMaxOpenConns: utils.IntPointer(6), - SQLConnMaxLifetime: utils.DurationPointer(1 * time.Minute), - SQLTableName: utils.StringPointer("table"), - SQLDBName: utils.StringPointer("db"), - PgSSLMode: utils.StringPointer("pg"), - AWSToken: utils.StringPointer("token"), - S3FolderPath: utils.StringPointer("s3"), - NATSJetStream: utils.BoolPointer(true), - NATSSubject: utils.StringPointer("nat"), - NATSJWTFile: utils.StringPointer("jwt"), - NATSSeedFile: utils.StringPointer("seed"), - NATSCertificateAuthority: utils.StringPointer("NATS"), - NATSClientCertificate: utils.StringPointer("NATSClient"), - NATSClientKey: utils.StringPointer("key"), - NATSJetStreamMaxWait: utils.DurationPointer(1 * time.Minute), + Els: &ElsOpts{ + Index: utils.StringPointer("test"), + IfPrimaryTerm: utils.IntPointer(0), + IfSeqNo: utils.IntPointer(0), + OpType: utils.StringPointer("test2"), + Pipeline: utils.StringPointer("test3"), + Routing: utils.StringPointer("test4"), + Timeout: utils.DurationPointer(1 * time.Minute), + Version: utils.IntPointer(2), + VersionType: utils.StringPointer("test5"), + WaitForActiveShards: utils.StringPointer("test6"), + }, + SQL: &SQLOpts{ + MaxIdleConns: utils.IntPointer(4), + MaxOpenConns: utils.IntPointer(6), + ConnMaxLifetime: utils.DurationPointer(1 * time.Minute), + TableName: utils.StringPointer("table"), + DBName: utils.StringPointer("db"), + PgSSLMode: utils.StringPointer("pg"), + }, + AWS: &AWSOpts{ + Token: utils.StringPointer("token"), + S3FolderPath: utils.StringPointer("s3"), + }, + NATS: &NATSOpts{ + JetStream: utils.BoolPointer(true), + Subject: utils.StringPointer("nat"), + JWTFile: utils.StringPointer("jwt"), + SeedFile: utils.StringPointer("seed"), + CertificateAuthority: utils.StringPointer("NATS"), + ClientCertificate: utils.StringPointer("NATSClient"), + ClientKey: utils.StringPointer("key"), + JetStreamMaxWait: utils.DurationPointer(1 * time.Minute), + }, } eventExporter := &EventExporterCfg{ - Opts: &EventExporterOpts{}, + Opts: &EventExporterOpts{ + Els: &ElsOpts{}, + Kafka: &KafkaOpts{}, + AMQP: &AMQPOpts{}, + NATS: &NATSOpts{}, + SQL: &SQLOpts{}, + RPC: &RPCOpts{}, + AWS: &AWSOpts{}, + }, } if err := eventExporter.Opts.loadFromJSONCfg(eventExporterOptsJSON); err != nil { t.Error(expected) @@ -708,19 +737,27 @@ func TestEEsCfgloadFromJsonCfgCase2(t *testing.T) { }, Exporters: []*EventExporterCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - ExportPath: "/var/spool/cgrates/ees", - Attempts: 1, - Timezone: utils.EmptyString, - Filters: []string{}, - AttributeSIDs: []string{}, - Flags: utils.FlagsWithParams{}, - contentFields: []*FCTemplate{}, - Fields: []*FCTemplate{}, - headerFields: []*FCTemplate{}, - trailerFields: []*FCTemplate{}, - Opts: &EventExporterOpts{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + ExportPath: "/var/spool/cgrates/ees", + Attempts: 1, + Timezone: utils.EmptyString, + Filters: []string{}, + AttributeSIDs: []string{}, + Flags: utils.FlagsWithParams{}, + contentFields: []*FCTemplate{}, + Fields: []*FCTemplate{}, + headerFields: []*FCTemplate{}, + trailerFields: []*FCTemplate{}, + Opts: &EventExporterOpts{ + Els: &ElsOpts{}, + Kafka: &KafkaOpts{}, + AMQP: &AMQPOpts{}, + SQL: &SQLOpts{}, + AWS: &AWSOpts{}, + NATS: &NATSOpts{}, + RPC: &RPCOpts{}, + }, FailedPostsDir: "/var/spool/cgrates/failed_posts", }, { diff --git a/ees/amqp.go b/ees/amqp.go index 63149437e..bdc872d94 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -56,20 +56,22 @@ type AMQPee struct { } func (pstr *AMQPee) parseOpts(dialURL *config.EventExporterOpts) { - pstr.queueID = utils.DefaultQueueID - pstr.routingKey = utils.DefaultQueueID - if dialURL.AMQPQueueID != nil { - pstr.queueID = *dialURL.AMQPQueueID - } - if dialURL.AMQPRoutingKey != nil { - pstr.routingKey = *dialURL.AMQPRoutingKey - } - if dialURL.AMQPExchange != nil { - pstr.exchange = *dialURL.AMQPExchange - pstr.exchangeType = utils.DefaultExchangeType - } - if dialURL.AMQPExchangeType != nil { - pstr.exchangeType = *dialURL.AMQPExchangeType + if amqpOpts := dialURL.AMQP; amqpOpts != nil { + pstr.queueID = utils.DefaultQueueID + pstr.routingKey = utils.DefaultQueueID + if amqpOpts.QueueID != nil { + pstr.queueID = *amqpOpts.QueueID + } + if amqpOpts.RoutingKey != nil { + pstr.routingKey = *amqpOpts.RoutingKey + } + if amqpOpts.Exchange != nil { + pstr.exchange = *amqpOpts.Exchange + pstr.exchangeType = utils.DefaultExchangeType + } + if amqpOpts.ExchangeType != nil { + pstr.exchangeType = *amqpOpts.ExchangeType + } } } diff --git a/ees/amqpv1.go b/ees/amqpv1.go index 74d842a40..a4a05eb45 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -35,12 +35,14 @@ func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1 queueID: "/" + utils.DefaultQueueID, reqs: newConcReq(cfg.ConcurrentRequests), } - if cfg.Opts.AMQPQueueID != nil { - pstr.queueID = "/" + *cfg.Opts.AMQPQueueID - } - if cfg.Opts.AMQPUsername != nil && cfg.Opts.AMQPPassword != nil { - pstr.connOpts = &amqpv1.ConnOptions{ - SASLType: amqpv1.SASLTypePlain(*cfg.Opts.AMQPUsername, *cfg.Opts.AMQPPassword), + if amqp := cfg.Opts.AMQP; amqp != nil { + if amqp.QueueID != nil { + pstr.queueID = "/" + *amqp.QueueID + } + if amqp.Username != nil && amqp.Password != nil { + pstr.connOpts = &amqpv1.ConnOptions{ + SASLType: amqpv1.SASLTypePlain(*amqp.Username, *amqp.Password), + } } } return pstr diff --git a/ees/amqpv1_it_test.go b/ees/amqpv1_it_test.go index 1b6f1778b..2dfd196f2 100644 --- a/ees/amqpv1_it_test.go +++ b/ees/amqpv1_it_test.go @@ -74,9 +74,9 @@ func testAMQPv1LoadConfig(t *testing.T) { for _, value := range amqpv1Cfg.EEsCfg().Exporters { if value.ID == "amqpv1_test_file" { amqpv1DialURL = value.ExportPath - if value.Opts.AMQPUsername != nil && value.Opts.AMQPPassword != nil { + if value.Opts.AMQP.Username != nil && value.Opts.AMQP.Password != nil { amqpv1ConnOpts = &amqpv1.ConnOptions{ - SASLType: amqpv1.SASLTypePlain(*value.Opts.AMQPUsername, *value.Opts.AMQPPassword), + SASLType: amqpv1.SASLTypePlain(*value.Opts.AMQP.Username, *value.Opts.AMQP.Password), } } } diff --git a/ees/elastic.go b/ees/elastic.go index 78601cdc6..bcdb3aff1 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -61,70 +61,70 @@ type ElasticEE struct { func (eEe *ElasticEE) prepareOpts() (err error) { //parse opts eEe.indxOpts.Index = utils.CDRsTBL - if eEe.Cfg().Opts.ElsIndex != nil { - eEe.indxOpts.Index = *eEe.Cfg().Opts.ElsIndex + if eEe.Cfg().Opts.Els.Index != nil { + eEe.indxOpts.Index = *eEe.Cfg().Opts.Els.Index } - eEe.indxOpts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm - eEe.indxOpts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo - if eEe.Cfg().Opts.ElsOpType != nil { - eEe.indxOpts.OpType = *eEe.Cfg().Opts.ElsOpType + eEe.indxOpts.IfPrimaryTerm = eEe.Cfg().Opts.Els.IfPrimaryTerm + eEe.indxOpts.IfSeqNo = eEe.Cfg().Opts.Els.IfSeqNo + if eEe.Cfg().Opts.Els.OpType != nil { + eEe.indxOpts.OpType = *eEe.Cfg().Opts.Els.OpType } - if eEe.Cfg().Opts.ElsPipeline != nil { - eEe.indxOpts.Pipeline = *eEe.Cfg().Opts.ElsPipeline + if eEe.Cfg().Opts.Els.Pipeline != nil { + eEe.indxOpts.Pipeline = *eEe.Cfg().Opts.Els.Pipeline } - if eEe.Cfg().Opts.ElsRouting != nil { - eEe.indxOpts.Routing = *eEe.Cfg().Opts.ElsRouting + if eEe.Cfg().Opts.Els.Routing != nil { + eEe.indxOpts.Routing = *eEe.Cfg().Opts.Els.Routing } - if eEe.Cfg().Opts.ElsTimeout != nil { - eEe.indxOpts.Timeout = *eEe.Cfg().Opts.ElsTimeout + if eEe.Cfg().Opts.Els.Timeout != nil { + eEe.indxOpts.Timeout = *eEe.Cfg().Opts.Els.Timeout } - eEe.indxOpts.Version = eEe.Cfg().Opts.ElsVersion - if eEe.Cfg().Opts.ElsVersionType != nil { - eEe.indxOpts.VersionType = *eEe.Cfg().Opts.ElsVersionType + eEe.indxOpts.Version = eEe.Cfg().Opts.Els.Version + if eEe.Cfg().Opts.Els.VersionType != nil { + eEe.indxOpts.VersionType = *eEe.Cfg().Opts.Els.VersionType } - if eEe.Cfg().Opts.ElsWaitForActiveShards != nil { - eEe.indxOpts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards + if eEe.Cfg().Opts.Els.WaitForActiveShards != nil { + eEe.indxOpts.WaitForActiveShards = *eEe.Cfg().Opts.Els.WaitForActiveShards } //client config - if eEe.Cfg().Opts.ElsCloud != nil && *eEe.Cfg().Opts.ElsCloud { + if eEe.Cfg().Opts.Els.Cloud != nil && *eEe.Cfg().Opts.Els.Cloud { eEe.clnOpts.CloudID = eEe.Cfg().ExportPath } else { eEe.clnOpts.Addresses = strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep) } - if eEe.Cfg().Opts.ElsUsername != nil { - eEe.clnOpts.Username = *eEe.Cfg().Opts.ElsUsername + if eEe.Cfg().Opts.Els.Username != nil { + eEe.clnOpts.Username = *eEe.Cfg().Opts.Els.Username } - if eEe.Cfg().Opts.ElsPassword != nil { - eEe.clnOpts.Password = *eEe.Cfg().Opts.ElsPassword + if eEe.Cfg().Opts.Els.Password != nil { + eEe.clnOpts.Password = *eEe.Cfg().Opts.Els.Password } - if eEe.Cfg().Opts.ElsAPIKey != nil { - eEe.clnOpts.APIKey = *eEe.Cfg().Opts.ElsAPIKey + if eEe.Cfg().Opts.Els.APIKey != nil { + eEe.clnOpts.APIKey = *eEe.Cfg().Opts.Els.APIKey } - if eEe.Cfg().Opts.CAPath != nil { + if eEe.Cfg().Opts.RPC.CAPath != nil { var cacert []byte - cacert, err = os.ReadFile(*eEe.Cfg().Opts.CAPath) + cacert, err = os.ReadFile(*eEe.Cfg().Opts.RPC.CAPath) if err != nil { return } eEe.clnOpts.CACert = cacert } - if eEe.Cfg().Opts.ElsCertificateFingerprint != nil { - eEe.clnOpts.CertificateFingerprint = *eEe.Cfg().Opts.ElsCertificateFingerprint + if eEe.Cfg().Opts.Els.CertificateFingerprint != nil { + eEe.clnOpts.CertificateFingerprint = *eEe.Cfg().Opts.Els.CertificateFingerprint } - if eEe.Cfg().Opts.ElsServiceToken != nil { - eEe.clnOpts.ServiceToken = *eEe.Cfg().Opts.ElsServiceToken + if eEe.Cfg().Opts.Els.ServiceToken != nil { + eEe.clnOpts.ServiceToken = *eEe.Cfg().Opts.Els.ServiceToken } - if eEe.Cfg().Opts.ElsDiscoverNodesOnStart != nil { - eEe.clnOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.ElsDiscoverNodesOnStart + if eEe.Cfg().Opts.Els.DiscoverNodesOnStart != nil { + eEe.clnOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.Els.DiscoverNodesOnStart } - if eEe.Cfg().Opts.ElsDiscoverNodeInterval != nil { - eEe.clnOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.ElsDiscoverNodeInterval + if eEe.Cfg().Opts.Els.DiscoverNodeInterval != nil { + eEe.clnOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.Els.DiscoverNodeInterval } - if eEe.Cfg().Opts.ElsEnableDebugLogger != nil { - eEe.clnOpts.EnableDebugLogger = *eEe.Cfg().Opts.ElsEnableDebugLogger + if eEe.Cfg().Opts.Els.EnableDebugLogger != nil { + eEe.clnOpts.EnableDebugLogger = *eEe.Cfg().Opts.Els.EnableDebugLogger } - if loggerType := eEe.Cfg().Opts.ElsLogger; loggerType != nil { + if loggerType := eEe.Cfg().Opts.Els.Logger; loggerType != nil { var logger elastictransport.Logger switch *loggerType { case utils.ElsJson: @@ -138,20 +138,20 @@ func (eEe *ElasticEE) prepareOpts() (err error) { } eEe.clnOpts.Logger = logger } - if eEe.Cfg().Opts.ElsCompressRequestBody != nil { - eEe.clnOpts.CompressRequestBody = *eEe.Cfg().Opts.ElsCompressRequestBody + if eEe.Cfg().Opts.Els.CompressRequestBody != nil { + eEe.clnOpts.CompressRequestBody = *eEe.Cfg().Opts.Els.CompressRequestBody } - if eEe.Cfg().Opts.ElsRetryOnStatus != nil { - eEe.clnOpts.RetryOnStatus = *eEe.Cfg().Opts.ElsRetryOnStatus + if eEe.Cfg().Opts.Els.RetryOnStatus != nil { + eEe.clnOpts.RetryOnStatus = *eEe.Cfg().Opts.Els.RetryOnStatus } - if eEe.Cfg().Opts.ElsMaxRetries != nil { - eEe.clnOpts.MaxRetries = *eEe.Cfg().Opts.ElsMaxRetries + if eEe.Cfg().Opts.Els.MaxRetries != nil { + eEe.clnOpts.MaxRetries = *eEe.Cfg().Opts.Els.MaxRetries } - if eEe.Cfg().Opts.ElsDisableRetry != nil { - eEe.clnOpts.DisableRetry = *eEe.Cfg().Opts.ElsDisableRetry + if eEe.Cfg().Opts.Els.DisableRetry != nil { + eEe.clnOpts.DisableRetry = *eEe.Cfg().Opts.Els.DisableRetry } - if eEe.Cfg().Opts.ElsCompressRequestBodyLevel != nil { - eEe.clnOpts.CompressRequestBodyLevel = *eEe.Cfg().Opts.ElsCompressRequestBodyLevel + if eEe.Cfg().Opts.Els.CompressRequestBodyLevel != nil { + eEe.clnOpts.CompressRequestBodyLevel = *eEe.Cfg().Opts.Els.CompressRequestBodyLevel } return } diff --git a/ees/elastic_test.go b/ees/elastic_test.go index 47e898f4e..16b9c30d3 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -57,7 +57,8 @@ func TestInitCase1(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - ElsIndex: utils.StringPointer("test"), + Els: &config.ElsOpts{Index: utils.StringPointer("test")}, + RPC: &config.RPCOpts{}, }, }, } @@ -74,7 +75,9 @@ func TestInitCase2(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - ElsIfPrimaryTerm: utils.IntPointer(20), + Els: &config.ElsOpts{ + IfPrimaryTerm: utils.IntPointer(20)}, + RPC: &config.RPCOpts{}, }, }, } @@ -91,7 +94,9 @@ func TestInitCase3(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - ElsIfSeqNo: utils.IntPointer(20), + Els: &config.ElsOpts{ + IfSeqNo: utils.IntPointer(20)}, + RPC: &config.RPCOpts{}, }, }, } @@ -108,7 +113,9 @@ func TestInitCase4(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - ElsOpType: utils.StringPointer("test"), + Els: &config.ElsOpts{ + OpType: utils.StringPointer("test")}, + RPC: &config.RPCOpts{}, }, }, } @@ -125,7 +132,9 @@ func TestInitCase5(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - ElsPipeline: utils.StringPointer("test"), + Els: &config.ElsOpts{ + Pipeline: utils.StringPointer("test")}, + RPC: &config.RPCOpts{}, }, }, } @@ -142,7 +151,9 @@ func TestInitCase6(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - ElsRouting: utils.StringPointer("test"), + Els: &config.ElsOpts{ + Routing: utils.StringPointer("test")}, + RPC: &config.RPCOpts{}, }, }, } @@ -159,7 +170,10 @@ func TestInitCase8(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - ElsVersion: utils.IntPointer(20), + Els: &config.ElsOpts{ + Version: utils.IntPointer(20), + }, + RPC: &config.RPCOpts{}, }, }, } @@ -176,7 +190,10 @@ func TestInitCase9(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - ElsVersionType: utils.StringPointer("test"), + Els: &config.ElsOpts{ + VersionType: utils.StringPointer("test"), + }, + RPC: &config.RPCOpts{}, }, }, } @@ -193,7 +210,9 @@ func TestInitCase10(t *testing.T) { ee := &ElasticEE{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - ElsWaitForActiveShards: utils.StringPointer("test"), + Els: &config.ElsOpts{ + WaitForActiveShards: utils.StringPointer("test")}, + RPC: &config.RPCOpts{}, }, }, } diff --git a/ees/kafka.go b/ees/kafka.go index feb6dbe6c..dd681fdf9 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -34,8 +34,10 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE topic: utils.DefaultQueueID, reqs: newConcReq(cfg.ConcurrentRequests), } - if cfg.Opts.KafkaTopic != nil { - kfkPstr.topic = *cfg.Opts.KafkaTopic + if kafkaOpts := cfg.Opts.Kafka; kafkaOpts != nil { + if kafkaOpts.KafkaTopic != nil { + kfkPstr.topic = *cfg.Opts.Kafka.KafkaTopic + } } return kfkPstr } @@ -62,11 +64,6 @@ func (pstr *KafkaEE) Connect() (_ error) { Topic: pstr.topic, MaxAttempts: pstr.Cfg().Attempts, } - // pstr.writer = kafka.NewWriter(kafka.WriterConfig{ - // Brokers: []string{pstr.Cfg().ExportPath}, - // MaxAttempts: pstr.Cfg().Attempts, - // Topic: pstr.topic, - // }) } pstr.Unlock() return diff --git a/ees/libcdre.go b/ees/libcdre.go index 42489bc9f..06dfcde7b 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -63,17 +63,25 @@ func AddFailedPost(failedPostsDir, expPath, format string, ev any, opts *config. var s3BucketID string var sqsQueueID string var kafkaTopic string - if opts.AMQPQueueID != nil { - amqpQueueID = *opts.AMQPQueueID + + if amqpOpts := opts.AMQP; amqpOpts != nil { + if opts.AMQP.QueueID != nil { + amqpQueueID = *opts.AMQP.QueueID + } } - if opts.S3BucketID != nil { - s3BucketID = *opts.S3BucketID + + if awsOpts := opts.AWS; awsOpts != nil { + if opts.AWS.S3BucketID != nil { + s3BucketID = *opts.AWS.S3BucketID + } + if opts.AWS.SQSQueueID != nil { + sqsQueueID = *opts.AWS.SQSQueueID + } } - if opts.SQSQueueID != nil { - sqsQueueID = *opts.SQSQueueID - } - if opts.KafkaTopic != nil { - kafkaTopic = *opts.KafkaTopic + if kfkOpts := opts.Kafka; kfkOpts != nil { + if opts.Kafka.KafkaTopic != nil { + kafkaTopic = *opts.Kafka.KafkaTopic + } } if qID := utils.FirstNonEmpty(amqpQueueID, s3BucketID, sqsQueueID, kafkaTopic); len(qID) != 0 { diff --git a/ees/libcdre_test.go b/ees/libcdre_test.go index 78e4d6d13..dd7b148df 100644 --- a/ees/libcdre_test.go +++ b/ees/libcdre_test.go @@ -39,7 +39,14 @@ func TestSetFldPostCacheTTL(t *testing.T) { func TestAddFldPost(t *testing.T) { SetFailedPostCacheTTL(5 * time.Second) - AddFailedPost("", "path1", "format1", "1", &config.EventExporterOpts{}) + AddFailedPost("", "path1", "format1", "1", &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }) x, ok := failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1")) if !ok { t.Error("Error reading from cache") @@ -56,14 +63,35 @@ func TestAddFldPost(t *testing.T) { Path: "path1", Format: "format1", Events: []any{"1"}, - Opts: &config.EventExporterOpts{}, + Opts: &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }, } if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } - AddFailedPost("", "path1", "format1", "2", &config.EventExporterOpts{}) + AddFailedPost("", "path1", "format1", "2", &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }) AddFailedPost("", "path2", "format2", "3", &config.EventExporterOpts{ - SQSQueueID: utils.StringPointer("qID"), + AWS: &config.AWSOpts{ + SQSQueueID: utils.StringPointer("qID"), + }, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, }) x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1")) if !ok { @@ -80,7 +108,14 @@ func TestAddFldPost(t *testing.T) { Path: "path1", Format: "format1", Events: []any{"1", "2"}, - Opts: &config.EventExporterOpts{}, + Opts: &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }, } if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) @@ -101,7 +136,8 @@ func TestAddFldPost(t *testing.T) { Format: "format2", Events: []any{"3"}, Opts: &config.EventExporterOpts{ - SQSQueueID: utils.StringPointer("qID"), + AWS: &config.AWSOpts{ + SQSQueueID: utils.StringPointer("qID")}, }, } if !reflect.DeepEqual(eOut, failedPost) { diff --git a/ees/nats.go b/ees/nats.go index 19606239b..c8b5f7c3c 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -61,17 +61,21 @@ type NatsEE struct { } func (pstr *NatsEE) parseOpt(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (err error) { - if opts.NATSJetStream != nil { - pstr.jetStream = *opts.NATSJetStream - } - pstr.subject = utils.DefaultQueueID - if opts.NATSSubject != nil { - pstr.subject = *opts.NATSSubject - } - pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout) - if pstr.jetStream { - if opts.NATSJetStreamMaxWait != nil { - pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATSJetStreamMaxWait)} + + if natsOpts := opts.NATS; natsOpts != nil { + if natsOpts.JetStream != nil { + pstr.jetStream = *natsOpts.JetStream + } + pstr.subject = utils.DefaultQueueID + if natsOpts.Subject != nil { + pstr.subject = *natsOpts.Subject + } + + pstr.opts, err = GetNatsOpts(natsOpts, nodeID, connTimeout) + if pstr.jetStream { + if natsOpts.JetStreamMaxWait != nil { + pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*natsOpts.JetStreamMaxWait)} + } } } return @@ -123,51 +127,51 @@ func (pstr *NatsEE) Close() (err error) { func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } -func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { +func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { nop = make([]nats.Option, 0, 7) nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), nats.Timeout(connTimeout), nats.DrainTimeout(time.Second)) - if opts.NATSJWTFile != nil { + if opts.JWTFile != nil { keys := make([]string, 0, 1) - if opts.NATSSeedFile != nil { - keys = append(keys, *opts.NATSSeedFile) + if opts.SeedFile != nil { + keys = append(keys, *opts.SeedFile) } - nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...)) + nop = append(nop, nats.UserCredentials(*opts.JWTFile, keys...)) } - if opts.NATSSeedFile != nil { - opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile) + if opts.SeedFile != nil { + opt, err := nats.NkeyOptionFromSeed(*opts.SeedFile) if err != nil { return nil, err } nop = append(nop, opt) } - if opts.NATSClientCertificate != nil { - if opts.NATSClientKey == nil { + if opts.ClientCertificate != nil { + if opts.ClientKey == nil { err = fmt.Errorf("has certificate but no key") return } - nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey)) - } else if opts.NATSClientKey != nil { + nop = append(nop, nats.ClientCert(*opts.ClientCertificate, *opts.ClientKey)) + } else if opts.ClientKey != nil { err = fmt.Errorf("has key but no certificate") return } - if opts.NATSCertificateAuthority != nil { + if opts.CertificateAuthority != nil { nop = append(nop, func(o *nats.Options) error { pool, err := x509.SystemCertPool() if err != nil { return err } - rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority) + rootPEM, err := ioutil.ReadFile(*opts.CertificateAuthority) if err != nil || rootPEM == nil { return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err) } ok := pool.AppendCertsFromPEM(rootPEM) if !ok { return fmt.Errorf("nats: failed to parse root certificate from %q", - *opts.NATSCertificateAuthority) + *opts.CertificateAuthority) } if o.TLSConfig == nil { o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index fe9355241..cd2cf8e7e 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -58,7 +58,7 @@ func TestNatsEEJetStream(t *testing.T) { t.Fatal(err) } - nop, err := GetNatsOpts(cfg.Opts, "natsTest", time.Second) + nop, err := GetNatsOpts(cfg.Opts.NATS, "natsTest", time.Second) if err != nil { t.Fatal(err) } @@ -148,7 +148,7 @@ func TestNatsEE(t *testing.T) { t.Fatal(err) } - nop, err := GetNatsOpts(cfg.Opts, "natsTest", time.Second) + nop, err := GetNatsOpts(cfg.Opts.NATS, "natsTest", time.Second) if err != nil { t.Fatal(err) } @@ -196,8 +196,8 @@ func TestGetNatsOptsSeedFile(t *testing.T) { nkey := "SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY" os.WriteFile("/tmp/nkey.txt", []byte(nkey), 0777) - opts := &config.EventExporterOpts{ - NATSSeedFile: utils.StringPointer("/tmp/nkey.txt"), + opts := &config.NATSOpts{ + SeedFile: utils.StringPointer("/tmp/nkey.txt"), } nodeID := "node_id1" diff --git a/ees/nats_test.go b/ees/nats_test.go index 8bc377f44..b2fd7daec 100644 --- a/ees/nats_test.go +++ b/ees/nats_test.go @@ -34,7 +34,14 @@ func TestNewNatsEE(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, - Opts: &config.EventExporterOpts{}, + Opts: &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }, } nodeID := "node_id1" connTimeout := 2 * time.Second @@ -73,7 +80,14 @@ func TestParseOpt(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, - Opts: &config.EventExporterOpts{}, + Opts: &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }, } opts := &config.EventExporterOpts{} nodeID := "node_id1" @@ -100,10 +114,18 @@ func TestParseOptJetStream(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, - Opts: &config.EventExporterOpts{}, + Opts: &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }, } opts := &config.EventExporterOpts{ - NATSJetStream: utils.BoolPointer(true), + NATS: &config.NATSOpts{ + JetStream: utils.BoolPointer(true)}, } nodeID := "node_id1" connTimeout := 2 * time.Second @@ -132,12 +154,20 @@ func TestParseOptJetStreamMaxWait(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, - Opts: &config.EventExporterOpts{}, + Opts: &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }, } opts := &config.EventExporterOpts{ - NATSJetStream: utils.BoolPointer(true), - NATSJetStreamMaxWait: utils.DurationPointer(2), - } + NATS: &config.NATSOpts{ + JetStream: utils.BoolPointer(true), + JetStreamMaxWait: utils.DurationPointer(2), + }} nodeID := "node_id1" connTimeout := 2 * time.Second dc, err := newEEMetrics("Local") @@ -165,11 +195,19 @@ func TestParseOptSubject(t *testing.T) { Type: "nats", Attempts: 2, ConcurrentRequests: 2, - Opts: &config.EventExporterOpts{}, + Opts: &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }, } opts := &config.EventExporterOpts{ - NATSSubject: utils.StringPointer("nats_subject"), - } + NATS: &config.NATSOpts{ + Subject: utils.StringPointer("nats_subject"), + }} nodeID := "node_id1" connTimeout := 2 * time.Second dc, err := newEEMetrics("Local") @@ -186,14 +224,14 @@ func TestParseOptSubject(t *testing.T) { t.Error(err) } - if opts.NATSSubject == nil || pstr.subject != *opts.NATSSubject { - t.Errorf("Expected %v \n but received \n %v", *opts.NATSSubject, pstr.subject) + if opts.NATS.Subject == nil || pstr.subject != *opts.NATS.Subject { + t.Errorf("Expected %v \n but received \n %v", *opts.NATS.Subject, pstr.subject) } } func TestGetNatsOptsJWT(t *testing.T) { - opts := &config.EventExporterOpts{ - NATSJWTFile: utils.StringPointer("jwtfile"), + opts := &config.NATSOpts{ + JWTFile: utils.StringPointer("jwtfile"), } nodeID := "node_id1" @@ -206,9 +244,9 @@ func TestGetNatsOptsJWT(t *testing.T) { } func TestGetNatsOptsClientCert(t *testing.T) { - opts := &config.EventExporterOpts{ - NATSClientCertificate: utils.StringPointer("client_cert"), - NATSClientKey: utils.StringPointer("client_key"), + opts := &config.NATSOpts{ + ClientCertificate: utils.StringPointer("client_cert"), + ClientKey: utils.StringPointer("client_key"), } nodeID := "node_id1" connTimeout := 2 * time.Second @@ -227,8 +265,8 @@ func TestGetNatsOptsClientCert(t *testing.T) { // } // no key error - opts = &config.EventExporterOpts{ - NATSClientCertificate: utils.StringPointer("client_cert"), + opts = &config.NATSOpts{ + ClientCertificate: utils.StringPointer("client_cert"), } _, err = GetNatsOpts(opts, nodeID, connTimeout) if err.Error() != "has certificate but no key" { @@ -236,8 +274,8 @@ func TestGetNatsOptsClientCert(t *testing.T) { } // no certificate error - opts = &config.EventExporterOpts{ - NATSClientKey: utils.StringPointer("client_key"), + opts = &config.NATSOpts{ + ClientKey: utils.StringPointer("client_key"), } _, err = GetNatsOpts(opts, nodeID, connTimeout) if err.Error() != "has key but no certificate" { diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go index 818285ec5..5f2403b93 100644 --- a/ees/poster_it_test.go +++ b/ees/poster_it_test.go @@ -73,7 +73,14 @@ func TestHttpJsonPoster(t *testing.T) { ExportPath: "http://localhost:8080/invalid", Attempts: 3, FailedPostsDir: "/tmp", - Opts: &config.EventExporterOpts{}, + Opts: &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }, }, config.CgrConfig(), nil, nil) if err != nil { t.Error(err) @@ -81,7 +88,14 @@ func TestHttpJsonPoster(t *testing.T) { if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: jsn, Header: make(http.Header)}, ""); err == nil { t.Error("Expected error") } - AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, jsn, &config.EventExporterOpts{}) + AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, jsn, &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }) time.Sleep(5 * time.Millisecond) fs, err := filepath.Glob("/tmp/EEs*") if err != nil { @@ -111,7 +125,14 @@ func TestHttpBytesPoster(t *testing.T) { ExportPath: "http://localhost:8080/invalid", Attempts: 3, FailedPostsDir: "/tmp", - Opts: &config.EventExporterOpts{}, + Opts: &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }, }, config.CgrConfig(), nil, nil) if err != nil { t.Error(err) @@ -119,7 +140,14 @@ func TestHttpBytesPoster(t *testing.T) { if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: content, Header: make(http.Header)}, ""); err == nil { t.Error("Expected error") } - AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, content, &config.EventExporterOpts{}) + AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, content, &config.EventExporterOpts{ + AMQP: &config.AMQPOpts{}, + Els: &config.ElsOpts{}, + AWS: &config.AWSOpts{}, + NATS: &config.NATSOpts{}, + Kafka: &config.KafkaOpts{}, + RPC: &config.RPCOpts{}, + }) time.Sleep(5 * time.Millisecond) fs, err := filepath.Glob("/tmp/test2*") if err != nil { @@ -156,10 +184,10 @@ func TestSQSPoster(t *testing.T) { awsSecret := "replace-this-with-your-secret" qname := "cgrates-cdrs" - opts := &config.EventExporterOpts{ - AWSRegion: utils.StringPointer(region), - AWSKey: utils.StringPointer(awsKey), - AWSSecret: utils.StringPointer(awsSecret), + opts := &config.AWSOpts{ + Region: utils.StringPointer(region), + Key: utils.StringPointer(awsKey), + Secret: utils.StringPointer(awsSecret), SQSQueueID: utils.StringPointer(qname), } //##################################### @@ -169,7 +197,9 @@ func TestSQSPoster(t *testing.T) { pstr := NewSQSee(&config.EventExporterCfg{ ExportPath: endpoint, Attempts: 5, - Opts: opts, + Opts: &config.EventExporterOpts{ + AWS: opts, + }, }, nil) if err := ExportWithAttempts(pstr, []byte(body), ""); err != nil { t.Fatal(err) @@ -239,10 +269,10 @@ func TestS3Poster(t *testing.T) { awsSecret := "replace-this-with-your-secret" qname := "cgrates-cdrs" - opts := &config.EventExporterOpts{ - AWSRegion: utils.StringPointer(region), - AWSKey: utils.StringPointer(awsKey), - AWSSecret: utils.StringPointer(awsSecret), + opts := &config.AWSOpts{ + Region: utils.StringPointer(region), + Key: utils.StringPointer(awsKey), + Secret: utils.StringPointer(awsSecret), SQSQueueID: utils.StringPointer(qname), } //##################################### @@ -252,7 +282,9 @@ func TestS3Poster(t *testing.T) { pstr := NewS3EE(&config.EventExporterCfg{ ExportPath: endpoint, Attempts: 5, - Opts: opts, + Opts: &config.EventExporterOpts{ + AWS: opts, + }, }, nil) if err := ExportWithAttempts(pstr, []byte(body), key); err != nil { t.Fatal(err) @@ -302,7 +334,9 @@ func TestAMQPv1Poster(t *testing.T) { endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net" qname := "cgrates-cdrs" opts := &config.EventExporterOpts{ - AMQPQueueID: utils.StringPointer(qname), + AMQP: &config.AMQPOpts{ + QueueID: utils.StringPointer(qname), + }, } //##################################### diff --git a/ees/poster_test.go b/ees/poster_test.go index 75ccd7c50..2282a90c4 100644 --- a/ees/poster_test.go +++ b/ees/poster_test.go @@ -38,11 +38,12 @@ func TestAMQPeeParseURL(t *testing.T) { routingKey: "CGRCDR", } opts := &config.EventExporterOpts{ - AMQPQueueID: utils.StringPointer("q1"), - AMQPExchange: utils.StringPointer("E1"), - AMQPRoutingKey: utils.StringPointer("CGRCDR"), - AMQPExchangeType: utils.StringPointer("fanout"), - } + AMQP: &config.AMQPOpts{ + QueueID: utils.StringPointer("q1"), + Exchange: utils.StringPointer("E1"), + RoutingKey: utils.StringPointer("CGRCDR"), + ExchangeType: utils.StringPointer("fanout"), + }} amqp.parseOpts(opts) if !reflect.DeepEqual(expected, amqp) { t.Errorf("Expected: %s ,received: %s", utils.ToJSON(expected), utils.ToJSON(amqp)) @@ -54,7 +55,9 @@ func TestKafkaParseURL(t *testing.T) { ExportPath: "127.0.0.1:9092", Attempts: 10, Opts: &config.EventExporterOpts{ - KafkaTopic: utils.StringPointer("cdr_billing"), + Kafka: &config.KafkaOpts{ + KafkaTopic: utils.StringPointer("cdr_billing"), + }, }, } exp := &KafkaEE{ diff --git a/ees/rpc.go b/ees/rpc.go index b3164bf54..86ddb4563 100644 --- a/ees/rpc.go +++ b/ees/rpc.go @@ -91,7 +91,8 @@ func (e *RPCee) PrepareMap(mp *utils.CGREvent) (any, error) { if mp.APIOpts == nil { mp.APIOpts = make(map[string]any) } - for i, v := range e.Cfg().Opts.RPCAPIOpts { + + for i, v := range e.Cfg().Opts.RPC.RPCAPIOpts { mp.APIOpts[i] = v } return mp, nil @@ -115,32 +116,33 @@ func (e *RPCee) PrepareOrderMap(oMp *utils.OrderedNavigableMap) (any, error) { } func (e *RPCee) parseOpts() (err error) { - if e.cfg.Opts.RPCCodec != nil { - e.codec = *e.cfg.Opts.RPCCodec + + if e.cfg.Opts.RPC.RPCCodec != nil { + e.codec = *e.cfg.Opts.RPC.RPCCodec } - if e.cfg.Opts.ServiceMethod != nil { - e.serviceMethod = *e.cfg.Opts.ServiceMethod + if e.cfg.Opts.RPC.ServiceMethod != nil { + e.serviceMethod = *e.cfg.Opts.RPC.ServiceMethod } - if e.cfg.Opts.KeyPath != nil { - e.keyPath = *e.cfg.Opts.KeyPath + if e.cfg.Opts.RPC.KeyPath != nil { + e.keyPath = *e.cfg.Opts.RPC.KeyPath } - if e.cfg.Opts.CertPath != nil { - e.certPath = *e.cfg.Opts.CertPath + if e.cfg.Opts.RPC.CertPath != nil { + e.certPath = *e.cfg.Opts.RPC.CertPath } - if e.cfg.Opts.CAPath != nil { - e.caPath = *e.cfg.Opts.CAPath + if e.cfg.Opts.RPC.CAPath != nil { + e.caPath = *e.cfg.Opts.RPC.CAPath } - if e.cfg.Opts.TLS != nil { - e.tls = *e.cfg.Opts.TLS + if e.cfg.Opts.RPC.TLS != nil { + e.tls = *e.cfg.Opts.RPC.TLS } - if e.cfg.Opts.ConnIDs != nil { - e.connIDs = *e.cfg.Opts.ConnIDs + if e.cfg.Opts.RPC.ConnIDs != nil { + e.connIDs = *e.cfg.Opts.RPC.ConnIDs } - if e.cfg.Opts.RPCConnTimeout != nil { - e.connTimeout = *e.cfg.Opts.RPCConnTimeout + if e.cfg.Opts.RPC.RPCConnTimeout != nil { + e.connTimeout = *e.cfg.Opts.RPC.RPCConnTimeout } - if e.cfg.Opts.RPCReplyTimeout != nil { - e.replyTimeout = *e.cfg.Opts.RPCReplyTimeout + if e.cfg.Opts.RPC.RPCReplyTimeout != nil { + e.replyTimeout = *e.cfg.Opts.RPC.RPCReplyTimeout } return } diff --git a/ees/rpc_test.go b/ees/rpc_test.go index 44a0f4c0e..1bbab90eb 100644 --- a/ees/rpc_test.go +++ b/ees/rpc_test.go @@ -195,15 +195,17 @@ func TestRPCParseOpts(t *testing.T) { rpcEE := &RPCee{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - RPCCodec: utils.StringPointer("RPCCodec"), - ServiceMethod: utils.StringPointer("ServiceMethod"), - KeyPath: utils.StringPointer("KeyPath"), - CertPath: utils.StringPointer("CertPath"), - CAPath: utils.StringPointer("CAPath"), - TLS: utils.BoolPointer(true), - ConnIDs: utils.SliceStringPointer([]string{"ConnID"}), - RPCConnTimeout: utils.DurationPointer(time.Second), - RPCReplyTimeout: utils.DurationPointer(time.Minute), + RPC: &config.RPCOpts{ + RPCCodec: utils.StringPointer("RPCCodec"), + ServiceMethod: utils.StringPointer("ServiceMethod"), + KeyPath: utils.StringPointer("KeyPath"), + CertPath: utils.StringPointer("CertPath"), + CAPath: utils.StringPointer("CAPath"), + TLS: utils.BoolPointer(true), + ConnIDs: utils.SliceStringPointer([]string{"ConnID"}), + RPCConnTimeout: utils.DurationPointer(time.Second), + RPCReplyTimeout: utils.DurationPointer(time.Minute), + }, }, }, } @@ -211,15 +213,17 @@ func TestRPCParseOpts(t *testing.T) { exp := &RPCee{ cfg: &config.EventExporterCfg{ Opts: &config.EventExporterOpts{ - RPCCodec: utils.StringPointer("RPCCodec"), - ServiceMethod: utils.StringPointer("ServiceMethod"), - KeyPath: utils.StringPointer("KeyPath"), - CertPath: utils.StringPointer("CertPath"), - CAPath: utils.StringPointer("CAPath"), - TLS: utils.BoolPointer(true), - ConnIDs: utils.SliceStringPointer([]string{"ConnID"}), - RPCConnTimeout: utils.DurationPointer(time.Second), - RPCReplyTimeout: utils.DurationPointer(time.Minute), + RPC: &config.RPCOpts{ + RPCCodec: utils.StringPointer("RPCCodec"), + ServiceMethod: utils.StringPointer("ServiceMethod"), + KeyPath: utils.StringPointer("KeyPath"), + CertPath: utils.StringPointer("CertPath"), + CAPath: utils.StringPointer("CAPath"), + TLS: utils.BoolPointer(true), + ConnIDs: utils.SliceStringPointer([]string{"ConnID"}), + RPCConnTimeout: utils.DurationPointer(time.Second), + RPCReplyTimeout: utils.DurationPointer(time.Minute), + }, }, }, codec: "RPCCodec", diff --git a/ees/s3.go b/ees/s3.go index d86f24a75..640279881 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -62,23 +62,25 @@ type S3EE struct { func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) { pstr.bucket = utils.DefaultQueueID - if opts.S3BucketID != nil { - pstr.bucket = *opts.S3BucketID - } - if opts.S3FolderPath != nil { - pstr.folderPath = *opts.S3FolderPath - } - if opts.AWSRegion != nil { - pstr.awsRegion = *opts.AWSRegion - } - if opts.AWSKey != nil { - pstr.awsID = *opts.AWSKey - } - if opts.AWSSecret != nil { - pstr.awsKey = *opts.AWSSecret - } - if opts.AWSToken != nil { - pstr.awsToken = *opts.AWSToken + if s3Opts := opts.AWS; s3Opts != nil { + if s3Opts.S3BucketID != nil { + pstr.bucket = *s3Opts.S3BucketID + } + if s3Opts.S3FolderPath != nil { + pstr.folderPath = *s3Opts.S3FolderPath + } + if s3Opts.Region != nil { + pstr.awsRegion = *s3Opts.Region + } + if s3Opts.Key != nil { + pstr.awsID = *s3Opts.Key + } + if s3Opts.Secret != nil { + pstr.awsKey = *s3Opts.Secret + } + if s3Opts.Token != nil { + pstr.awsToken = *s3Opts.Token + } } } diff --git a/ees/s3_it_test.go b/ees/s3_it_test.go index ee1de71e5..69f8926dc 100644 --- a/ees/s3_it_test.go +++ b/ees/s3_it_test.go @@ -77,8 +77,8 @@ func testS3LoadConfig(t *testing.T) { } for _, value := range s3Cfg.EEsCfg().Exporters { if value.ID == "sqs_test_file" { - awsKey = *value.Opts.AWSKey - awsSecret = *value.Opts.AWSSecret + awsKey = *value.Opts.AWS.Key + awsSecret = *value.Opts.AWS.Secret } } } diff --git a/ees/sql.go b/ees/sql.go index 3015f9d4d..8514e78f8 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -72,16 +72,16 @@ func (sqlEe *SQLEe) initDialector() (err error) { password, _ := u.User.Password() dbname := utils.SQLDefaultDBName - if sqlEe.Cfg().Opts.SQLDBName != nil { - dbname = *sqlEe.Cfg().Opts.SQLDBName + if sqlEe.Cfg().Opts.SQL.DBName != nil { + dbname = *sqlEe.Cfg().Opts.SQL.DBName } ssl := utils.SQLDefaultSSLMode - if sqlEe.Cfg().Opts.PgSSLMode != nil { - ssl = *sqlEe.Cfg().Opts.PgSSLMode + if sqlEe.Cfg().Opts.SQL.PgSSLMode != nil { + ssl = *sqlEe.Cfg().Opts.SQL.PgSSLMode } // tableName is mandatory in opts - if sqlEe.Cfg().Opts.SQLTableName != nil { - sqlEe.tableName = *sqlEe.Cfg().Opts.SQLTableName + if sqlEe.Cfg().Opts.SQL.TableName != nil { + sqlEe.tableName = *sqlEe.Cfg().Opts.SQL.TableName } else { return utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt) } @@ -90,7 +90,7 @@ func (sqlEe *SQLEe) initDialector() (err error) { switch u.Scheme { case utils.MySQL: sqlEe.dialect = mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", - u.User.Username(), password, u.Hostname(), u.Port(), dbname) + engine.AppendToMysqlDSNOpts(sqlEe.Cfg().Opts.MYSQLDSNParams)) + u.User.Username(), password, u.Hostname(), u.Port(), dbname) + engine.AppendToMysqlDSNOpts(sqlEe.Cfg().Opts.SQL.MYSQLDSNParams)) case utils.Postgres: sqlEe.dialect = postgres.Open(fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl)) default: @@ -99,7 +99,7 @@ func (sqlEe *SQLEe) initDialector() (err error) { return } -func openDB(dialect gorm.Dialector, opts *config.EventExporterOpts) (db *gorm.DB, sqlDB *sql.DB, err error) { +func openDB(dialect gorm.Dialector, opts *config.SQLOpts) (db *gorm.DB, sqlDB *sql.DB, err error) { if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil { return } @@ -107,14 +107,14 @@ func openDB(dialect gorm.Dialector, opts *config.EventExporterOpts) (db *gorm.DB return } - if opts.SQLMaxIdleConns != nil { - sqlDB.SetMaxIdleConns(*opts.SQLMaxIdleConns) + if opts.MaxIdleConns != nil { + sqlDB.SetMaxIdleConns(*opts.MaxIdleConns) } - if opts.SQLMaxOpenConns != nil { - sqlDB.SetMaxOpenConns(*opts.SQLMaxOpenConns) + if opts.MaxOpenConns != nil { + sqlDB.SetMaxOpenConns(*opts.MaxOpenConns) } - if opts.SQLConnMaxLifetime != nil { - sqlDB.SetConnMaxLifetime(*opts.SQLConnMaxLifetime) + if opts.ConnMaxLifetime != nil { + sqlDB.SetConnMaxLifetime(*opts.ConnMaxLifetime) } return @@ -125,7 +125,7 @@ func (sqlEe *SQLEe) Cfg() *config.EventExporterCfg { return sqlEe.cfg } func (sqlEe *SQLEe) Connect() (err error) { sqlEe.Lock() if sqlEe.db == nil || sqlEe.sqldb == nil { - sqlEe.db, sqlEe.sqldb, err = openDB(sqlEe.dialect, sqlEe.Cfg().Opts) + sqlEe.db, sqlEe.sqldb, err = openDB(sqlEe.dialect, sqlEe.Cfg().Opts.SQL) } sqlEe.Unlock() return diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go index b183e8803..25b56b6bd 100644 --- a/ees/sql_it_test.go +++ b/ees/sql_it_test.go @@ -361,8 +361,8 @@ func testSqlEeVerifyExportedEvent2(t *testing.T) { func TestOpenDB1(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, &config.EventExporterOpts{ - SQLMaxIdleConns: utils.IntPointer(2), + _, _, err := openDB(dialect, &config.SQLOpts{ + MaxIdleConns: utils.IntPointer(2), }) if err != nil { t.Error(err) @@ -372,8 +372,8 @@ func TestOpenDB1(t *testing.T) { func TestOpenDB2(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, &config.EventExporterOpts{ - SQLMaxOpenConns: utils.IntPointer(2), + _, _, err := openDB(dialect, &config.SQLOpts{ + MaxOpenConns: utils.IntPointer(2), }) if err != nil { t.Error(err) @@ -383,8 +383,8 @@ func TestOpenDB2(t *testing.T) { func TestOpenDB3(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, &config.EventExporterOpts{ - SQLConnMaxLifetime: utils.DurationPointer(2), + _, _, err := openDB(dialect, &config.SQLOpts{ + ConnMaxLifetime: utils.DurationPointer(2), }) if err != nil { t.Error(err) @@ -393,8 +393,8 @@ func TestOpenDB3(t *testing.T) { func TestSQLExportEvent1(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") - cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("cgrates") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.TableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.DBName = utils.StringPointer("cgrates") cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` sqlEe, err := NewSQLEe(cgrCfg.EEsCfg().Exporters[0], nil) if err != nil { diff --git a/ees/sql_test.go b/ees/sql_test.go index 4d787f0fe..32efbc5b8 100644 --- a/ees/sql_test.go +++ b/ees/sql_test.go @@ -46,9 +46,9 @@ func TestSqlGetMetrics(t *testing.T) { func TestNewSQLeUrl(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") - cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("postgres") - cgrCfg.EEsCfg().Exporters[0].Opts.PgSSLMode = utils.StringPointer("test") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.TableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.DBName = utils.StringPointer("postgres") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.PgSSLMode = utils.StringPointer("test") sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], reqs: newConcReq(0), @@ -61,8 +61,8 @@ func TestNewSQLeUrl(t *testing.T) { func TestNewSQLeUrlSQL(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") - cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("mysql") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.TableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.DBName = utils.StringPointer("mysql") cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], @@ -79,8 +79,8 @@ func TestNewSQLeUrlSQL(t *testing.T) { func TestNewSQLeUrlPostgres(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") - cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("postgres") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.TableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.DBName = utils.StringPointer("postgres") cgrCfg.EEsCfg().Exporters[0].ExportPath = `postgres://cgrates:CGRateS.org@127.0.0.1:3306` sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], @@ -97,8 +97,8 @@ func TestNewSQLeUrlPostgres(t *testing.T) { func TestNewSQLeExportPathError(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable") - cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("postgres") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.TableName = utils.StringPointer("expTable") + cgrCfg.EEsCfg().Exporters[0].Opts.SQL.DBName = utils.StringPointer("postgres") cgrCfg.EEsCfg().Exporters[0].ExportPath = ":foo" sqlEe := &SQLEe{ cfg: cgrCfg.EEsCfg().Exporters[0], @@ -120,7 +120,7 @@ func TestOpenDBError2(t *testing.T) { tmp := logger.Default logger.Default = logger.Default.LogMode(logger.Silent) mckDialect := new(mockDialect2) - _, _, err := openDB(mckDialect, &config.EventExporterOpts{}) + _, _, err := openDB(mckDialect, &config.SQLOpts{}) errExpect := "invalid db" if err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) @@ -140,7 +140,7 @@ func TestOpenDBError3(t *testing.T) { tmp := logger.Default logger.Default = logger.Default.LogMode(logger.Silent) mckDialect := new(mockDialectErr) - _, _, err := openDB(mckDialect, &config.EventExporterOpts{}) + _, _, err := openDB(mckDialect, &config.SQLOpts{}) errExpect := "NOT_FOUND" if err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) diff --git a/ees/sqs.go b/ees/sqs.go index f20adc0c3..2c751bf94 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -60,21 +60,23 @@ type SQSee struct { } func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) { - pstr.queueID = utils.DefaultQueueID - if opts.SQSQueueID != nil { - pstr.queueID = *opts.SQSQueueID - } - if opts.AWSRegion != nil { - pstr.awsRegion = *opts.AWSRegion - } - if opts.AWSKey != nil { - pstr.awsID = *opts.AWSKey - } - if opts.AWSSecret != nil { - pstr.awsKey = *opts.AWSSecret - } - if opts.AWSToken != nil { - pstr.awsToken = *opts.AWSToken + if sqsOpts := opts.AWS; sqsOpts != nil { + pstr.queueID = utils.DefaultQueueID + if sqsOpts.SQSQueueID != nil { + pstr.queueID = *sqsOpts.SQSQueueID + } + if sqsOpts.Region != nil { + pstr.awsRegion = *sqsOpts.Region + } + if sqsOpts.Key != nil { + pstr.awsID = *sqsOpts.Key + } + if sqsOpts.Secret != nil { + pstr.awsKey = *sqsOpts.Secret + } + if sqsOpts.Token != nil { + pstr.awsToken = *sqsOpts.Token + } } } diff --git a/ees/sqs_it_test.go b/ees/sqs_it_test.go index 2a5ff5739..a5a3e64f5 100644 --- a/ees/sqs_it_test.go +++ b/ees/sqs_it_test.go @@ -77,8 +77,8 @@ func testSQSLoadConfig(t *testing.T) { } for _, value := range sqsCfg.EEsCfg().Exporters { if value.ID == "sqs_test_file" { - awsKey = *value.Opts.AWSKey - awsSecret = *value.Opts.AWSSecret + awsKey = *value.Opts.AWS.Key + awsSecret = *value.Opts.AWS.Secret } } } diff --git a/ers/libers.go b/ers/libers.go index 64b0983ed..54c873b25 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -31,156 +31,158 @@ import ( // getProcessOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExporterOpts) { + eeOpts = new(config.EventExporterOpts) if erOpts.AMQPExchangeProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AMQP == nil { + eeOpts.AMQP = new(config.AMQPOpts) } - eeOpts.AMQPExchange = erOpts.AMQPExchangeProcessed + eeOpts.AMQP.Exchange = erOpts.AMQPExchangeProcessed } if erOpts.AMQPExchangeTypeProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AMQP == nil { + eeOpts.AMQP = new(config.AMQPOpts) } - eeOpts.AMQPExchangeType = erOpts.AMQPExchangeTypeProcessed + eeOpts.AMQP.ExchangeType = erOpts.AMQPExchangeTypeProcessed } if erOpts.AMQPQueueIDProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AMQP == nil { + eeOpts.AMQP = new(config.AMQPOpts) } - eeOpts.AMQPQueueID = erOpts.AMQPQueueIDProcessed + eeOpts.AMQP.QueueID = erOpts.AMQPQueueIDProcessed } if erOpts.AMQPRoutingKeyProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AMQP == nil { + eeOpts.AMQP = new(config.AMQPOpts) } - eeOpts.AMQPRoutingKey = erOpts.AMQPRoutingKeyProcessed + eeOpts.AMQP.RoutingKey = erOpts.AMQPRoutingKeyProcessed } if erOpts.AMQPUsernameProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AMQP == nil { + eeOpts.AMQP = new(config.AMQPOpts) } - eeOpts.AMQPUsername = erOpts.AMQPUsernameProcessed + eeOpts.AMQP.Username = erOpts.AMQPUsernameProcessed } if erOpts.AMQPPasswordProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AMQP == nil { + eeOpts.AMQP = new(config.AMQPOpts) } - eeOpts.AMQPPassword = erOpts.AMQPPasswordProcessed + eeOpts.AMQP.Password = erOpts.AMQPPasswordProcessed } if erOpts.AWSKeyProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AWS == nil { + eeOpts.AWS = new(config.AWSOpts) } - eeOpts.AWSKey = erOpts.AWSKeyProcessed + eeOpts.AWS.Key = erOpts.AWSKeyProcessed } if erOpts.AWSRegionProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AWS == nil { + eeOpts.AWS = new(config.AWSOpts) } - eeOpts.AWSRegion = erOpts.AWSRegionProcessed + eeOpts.AWS.Region = erOpts.AWSRegionProcessed } if erOpts.AWSSecretProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AWS == nil { + eeOpts.AWS = new(config.AWSOpts) } - eeOpts.AWSSecret = erOpts.AWSSecretProcessed + eeOpts.AWS.Secret = erOpts.AWSSecretProcessed } if erOpts.AWSTokenProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AWS == nil { + eeOpts.AWS = new(config.AWSOpts) } - eeOpts.AWSToken = erOpts.AWSTokenProcessed + eeOpts.AWS.Token = erOpts.AWSTokenProcessed } if erOpts.KafkaTopicProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.Kafka == nil { + eeOpts.Kafka = new(config.KafkaOpts) } - eeOpts.KafkaTopic = erOpts.KafkaTopicProcessed + eeOpts.Kafka.KafkaTopic = erOpts.KafkaTopicProcessed } if erOpts.NATSCertificateAuthorityProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.NATS == nil { + eeOpts.NATS = new(config.NATSOpts) } - eeOpts.NATSCertificateAuthority = erOpts.NATSCertificateAuthorityProcessed + eeOpts.NATS.CertificateAuthority = erOpts.NATSCertificateAuthorityProcessed } if erOpts.NATSClientCertificateProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.NATS == nil { + eeOpts.NATS = new(config.NATSOpts) } - eeOpts.NATSClientCertificate = erOpts.NATSClientCertificateProcessed + eeOpts.NATS.ClientCertificate = erOpts.NATSClientCertificateProcessed } if erOpts.NATSClientKeyProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.NATS == nil { + eeOpts.NATS = new(config.NATSOpts) } - eeOpts.NATSClientKey = erOpts.NATSClientKeyProcessed + eeOpts.NATS.ClientKey = erOpts.NATSClientKeyProcessed } if erOpts.NATSJWTFileProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.NATS == nil { + eeOpts.NATS = new(config.NATSOpts) } - eeOpts.NATSJWTFile = erOpts.NATSJWTFileProcessed + eeOpts.NATS.JWTFile = erOpts.NATSJWTFileProcessed } if erOpts.NATSJetStreamMaxWaitProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.NATS == nil { + eeOpts.NATS = new(config.NATSOpts) } - eeOpts.NATSJetStreamMaxWait = erOpts.NATSJetStreamMaxWaitProcessed + eeOpts.NATS.JetStreamMaxWait = erOpts.NATSJetStreamMaxWaitProcessed } if erOpts.NATSJetStreamProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.NATS == nil { + eeOpts.NATS = new(config.NATSOpts) } - eeOpts.NATSJetStream = erOpts.NATSJetStreamProcessed + eeOpts.NATS.JetStream = erOpts.NATSJetStreamProcessed } if erOpts.NATSSeedFileProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.NATS == nil { + eeOpts.NATS = new(config.NATSOpts) } - eeOpts.NATSSeedFile = erOpts.NATSSeedFileProcessed + eeOpts.NATS.SeedFile = erOpts.NATSSeedFileProcessed } if erOpts.NATSSubjectProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.NATS == nil { + eeOpts.NATS = new(config.NATSOpts) } - eeOpts.NATSSubject = erOpts.NATSSubjectProcessed + eeOpts.NATS.Subject = erOpts.NATSSubjectProcessed } if erOpts.S3BucketIDProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AWS == nil { + eeOpts.AWS = new(config.AWSOpts) } - eeOpts.S3BucketID = erOpts.S3BucketIDProcessed + eeOpts.AWS.S3BucketID = erOpts.S3BucketIDProcessed } if erOpts.S3FolderPathProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AWS == nil { + eeOpts.AWS = new(config.AWSOpts) } - eeOpts.S3FolderPath = erOpts.S3FolderPathProcessed + eeOpts.AWS.S3FolderPath = erOpts.S3FolderPathProcessed } if erOpts.SQLDBNameProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.SQL == nil { + eeOpts.SQL = new(config.SQLOpts) } - eeOpts.SQLDBName = erOpts.SQLDBNameProcessed + eeOpts.SQL.DBName = erOpts.SQLDBNameProcessed } if erOpts.SQLTableNameProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.SQL == nil { + eeOpts.SQL = new(config.SQLOpts) } - eeOpts.SQLTableName = erOpts.SQLTableNameProcessed + eeOpts.SQL.TableName = erOpts.SQLTableNameProcessed } if erOpts.SQSQueueIDProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.AWS == nil { + eeOpts.AWS = new(config.AWSOpts) } - eeOpts.SQSQueueID = erOpts.SQSQueueIDProcessed + eeOpts.AWS.SQSQueueID = erOpts.SQSQueueIDProcessed } if erOpts.PgSSLModeProcessed != nil { - if eeOpts == nil { - eeOpts = new(config.EventExporterOpts) + if eeOpts.SQL == nil { + eeOpts.SQL = new(config.SQLOpts) } - eeOpts.PgSSLMode = erOpts.PgSSLModeProcessed + eeOpts.SQL.PgSSLMode = erOpts.PgSSLModeProcessed } + fmt.Println(utils.ToJSON(eeOpts)) return } diff --git a/ers/libers_test.go b/ers/libers_test.go index b2bf78dad..b8d625e0a 100644 --- a/ers/libers_test.go +++ b/ers/libers_test.go @@ -32,7 +32,9 @@ func TestGetProcessOptions(t *testing.T) { } result := getProcessOptions(opts) expected := &config.EventExporterOpts{ - AMQPQueueID: utils.StringPointer("processed"), + AMQP: &config.AMQPOpts{ + QueueID: utils.StringPointer("processed"), + }, } if !reflect.DeepEqual(result, expected) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) diff --git a/ers/sql.go b/ers/sql.go index 51ac410f5..ec3fcaee6 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -325,19 +325,20 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts *config.EventReader outPort = oURL.Port() } - outDBname = utils.SQLDefaultDBName - if processedOpt.SQLDBName != nil { - outDBname = *processedOpt.SQLDBName + if processedSql := processedOpt.SQL; processedSql != nil { + outDBname = utils.SQLDefaultDBName + if processedSql.DBName != nil { + outDBname = *processedSql.DBName + } + outSSL = utils.SQLDefaultSSLMode + if processedSql.PgSSLMode != nil { + outSSL = *processedSql.PgSSLMode + } + rdr.expTableName = utils.CDRsTBL + if processedSql.TableName != nil { + rdr.expTableName = *processedSql.TableName + } } - outSSL = utils.SQLDefaultSSLMode - if processedOpt.PgSSLMode != nil { - outSSL = *processedOpt.PgSSLMode - } - rdr.expTableName = utils.CDRsTBL - if processedOpt.SQLTableName != nil { - rdr.expTableName = *processedOpt.SQLTableName - } - switch rdr.expConnType { case utils.MySQL: rdr.expConnString = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",