Make ers opts into structs

This commit is contained in:
ionutboangiu
2021-10-27 13:14:58 +03:00
committed by Dan Christian Bogos
parent 801c8b06c0
commit 75353e759d
2 changed files with 636 additions and 17 deletions

View File

@@ -46,7 +46,7 @@ var (
getDftRemHstCfg = func() *RemoteHost { return new(RemoteHost) }
getDftEvExpCfg = func() *EventExporterCfg { return &EventExporterCfg{Opts: &EventExporterOpts{}} }
getDftEvRdrCfg = func() *EventReaderCfg { return &EventReaderCfg{Opts: make(map[string]interface{})} }
getDftEvRdrCfg = func() *EventReaderCfg { return &EventReaderCfg{Opts: &EventReaderOpts{}} }
)
func init() {

View File

@@ -127,6 +127,68 @@ func (erS ERsCfg) AsMapInterface(separator string) interface{} {
return mp
}
type EventReaderOpts struct {
PartialPath *string
PartialCacheAction *string
PartialOrderField *string
PartialCSVFieldSeparator *string
CSVRowLength *int
CSVFieldSeparator *string
CSVHeaderDefineChar *string
CSVLazyQuotes *bool
XMLRootPath *string
AMQPQueueID *string
AMQPQueueIDProcessed *string
AMQPConsumerTag *string
AMQPExchange *string
AMQPExchangeType *string
AMQPRoutingKey *string
AMQPExchangeProcessed *string
AMQPExchangeTypeProcessed *string
AMQPRoutingKeyProcessed *string
KafkaTopic *string
KafkaGroupID *string
KafkaMaxWait *time.Duration
KafkaTopicProcessed *string
SQLDBName *string
SQLTableName *string
SSLMode *string
SQLDBNameProcessed *string
SQLTableNameProcessed *string
SSLModeProcessed *string
AWSRegion *string
AWSKey *string
AWSSecret *string
AWSToken *string
AWSRegionProcessed *string
AWSKeyProcessed *string
AWSSecretProcessed *string
AWSTokenProcessed *string
SQSQueueID *string
SQSQueueIDProcessed *string
S3BucketID *string
S3FolderPathProcessed *string
S3BucketIDProcessed *string
NATSJetStream *bool
NATSConsumerName *string
NATSSubject *string
NATSQueueID *string
NATSJWTFile *string
NATSSeedFile *string
NATSCertificateAuthority *string
NATSClientCertificate *string
NATSClientKey *string
NATSJetStreamMaxWait *time.Duration
NATSJetStreamProcessed *bool
NATSSubjectProcessed *string
NATSJWTFileProcessed *string
NATSSeedFileProcessed *string
NATSCertificateAuthorityProcessed *string
NATSClientCertificateProcessed *string
NATSClientKeyProcessed *string
NATSJetStreamMaxWaitProcessed *time.Duration
}
// EventReaderCfg the event for the Event Reader
type EventReaderCfg struct {
ID string
@@ -135,7 +197,7 @@ type EventReaderCfg struct {
ConcurrentReqs int
SourcePath string
ProcessedPath string
Opts map[string]interface{}
Opts *EventReaderOpts
Tenant RSRParsers
Timezone string
Filters []string
@@ -145,6 +207,202 @@ type EventReaderCfg struct {
CacheDumpFields []*FCTemplate
}
func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
if jsnCfg == nil {
return
}
if jsnCfg.PartialPath != nil {
erOpts.PartialPath = jsnCfg.PartialPath
}
if jsnCfg.PartialCacheAction != nil {
erOpts.PartialCacheAction = jsnCfg.PartialCacheAction
}
if jsnCfg.PartialOrderField != nil {
erOpts.PartialOrderField = jsnCfg.PartialOrderField
}
if jsnCfg.PartialCSVFieldSeparator != nil {
erOpts.PartialCSVFieldSeparator = jsnCfg.PartialCSVFieldSeparator
}
if jsnCfg.CSVRowLength != nil {
erOpts.CSVRowLength = jsnCfg.CSVRowLength
}
if jsnCfg.CSVFieldSeparator != nil {
erOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator
}
if jsnCfg.CSVHeaderDefineChar != nil {
erOpts.CSVHeaderDefineChar = jsnCfg.CSVHeaderDefineChar
}
if jsnCfg.CSVLazyQuotes != nil {
erOpts.CSVLazyQuotes = jsnCfg.CSVLazyQuotes
}
if jsnCfg.XMLRootPath != nil {
erOpts.XMLRootPath = jsnCfg.XMLRootPath
}
if jsnCfg.AMQPQueueID != nil {
erOpts.AMQPQueueID = jsnCfg.AMQPQueueID
}
if jsnCfg.AMQPQueueIDProcessed != nil {
erOpts.AMQPQueueIDProcessed = jsnCfg.AMQPQueueIDProcessed
}
if jsnCfg.AMQPConsumerTag != nil {
erOpts.AMQPConsumerTag = jsnCfg.AMQPConsumerTag
}
if jsnCfg.AMQPExchange != nil {
erOpts.AMQPExchange = jsnCfg.AMQPExchange
}
if jsnCfg.AMQPExchangeType != nil {
erOpts.AMQPExchangeType = jsnCfg.AMQPExchangeType
}
if jsnCfg.AMQPRoutingKey != nil {
erOpts.AMQPRoutingKey = jsnCfg.AMQPRoutingKey
}
if jsnCfg.AMQPExchangeProcessed != nil {
erOpts.AMQPExchangeProcessed = jsnCfg.AMQPExchangeProcessed
}
if jsnCfg.AMQPExchangeTypeProcessed != nil {
erOpts.AMQPExchangeTypeProcessed = jsnCfg.AMQPExchangeTypeProcessed
}
if jsnCfg.AMQPRoutingKeyProcessed != nil {
erOpts.AMQPRoutingKeyProcessed = jsnCfg.AMQPRoutingKeyProcessed
}
if jsnCfg.KafkaTopic != nil {
erOpts.KafkaTopic = jsnCfg.KafkaTopic
}
if jsnCfg.KafkaGroupID != nil {
erOpts.KafkaGroupID = jsnCfg.KafkaGroupID
}
if jsnCfg.KafkaMaxWait != nil {
var kafkaMaxWait time.Duration
if kafkaMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.KafkaMaxWait); err != nil {
return
}
erOpts.KafkaMaxWait = utils.DurationPointer(kafkaMaxWait)
}
if jsnCfg.KafkaTopicProcessed != nil {
erOpts.KafkaTopicProcessed = jsnCfg.KafkaTopicProcessed
}
if jsnCfg.SQLDBName != nil {
erOpts.SQLDBName = jsnCfg.SQLDBName
}
if jsnCfg.SQLTableName != nil {
erOpts.SQLTableName = jsnCfg.SQLTableName
}
if jsnCfg.SSLMode != nil {
erOpts.SSLMode = jsnCfg.SSLMode
}
if jsnCfg.SQLDBNameProcessed != nil {
erOpts.SQLDBNameProcessed = jsnCfg.SQLDBNameProcessed
}
if jsnCfg.SQLTableNameProcessed != nil {
erOpts.SQLTableNameProcessed = jsnCfg.SQLTableNameProcessed
}
if jsnCfg.SSLModeProcessed != nil {
erOpts.SSLModeProcessed = jsnCfg.SSLModeProcessed
}
if jsnCfg.AWSRegion != nil {
erOpts.AWSRegion = jsnCfg.AWSRegion
}
if jsnCfg.AWSKey != nil {
erOpts.AWSKey = jsnCfg.AWSKey
}
if jsnCfg.AWSSecret != nil {
erOpts.AWSSecret = jsnCfg.AWSSecret
}
if jsnCfg.AWSToken != nil {
erOpts.AWSToken = jsnCfg.AWSToken
}
if jsnCfg.AWSRegionProcessed != nil {
erOpts.AWSRegionProcessed = jsnCfg.AWSRegionProcessed
}
if jsnCfg.AWSKeyProcessed != nil {
erOpts.AWSKeyProcessed = jsnCfg.AWSKeyProcessed
}
if jsnCfg.AWSSecretProcessed != nil {
erOpts.AWSSecretProcessed = jsnCfg.AWSSecretProcessed
}
if jsnCfg.AWSTokenProcessed != nil {
erOpts.AWSTokenProcessed = jsnCfg.AWSTokenProcessed
}
if jsnCfg.SQSQueueID != nil {
erOpts.SQSQueueID = jsnCfg.SQSQueueID
}
if jsnCfg.SQSQueueIDProcessed != nil {
erOpts.SQSQueueIDProcessed = jsnCfg.SQSQueueIDProcessed
}
if jsnCfg.S3BucketID != nil {
erOpts.S3BucketID = jsnCfg.S3BucketID
}
if jsnCfg.S3FolderPathProcessed != nil {
erOpts.S3FolderPathProcessed = jsnCfg.S3FolderPathProcessed
}
if jsnCfg.S3BucketIDProcessed != nil {
erOpts.S3BucketIDProcessed = jsnCfg.S3BucketIDProcessed
}
if jsnCfg.NATSJetStream != nil {
erOpts.NATSJetStream = jsnCfg.NATSJetStream
}
if jsnCfg.NATSConsumerName != nil {
erOpts.NATSConsumerName = jsnCfg.NATSConsumerName
}
if jsnCfg.NATSSubject != nil {
erOpts.NATSSubject = jsnCfg.NATSSubject
}
if jsnCfg.NATSQueueID != nil {
erOpts.NATSQueueID = jsnCfg.NATSQueueID
}
if jsnCfg.NATSJWTFile != nil {
erOpts.NATSJWTFile = jsnCfg.NATSJWTFile
}
if jsnCfg.NATSSeedFile != nil {
erOpts.NATSSeedFile = jsnCfg.NATSSeedFile
}
if jsnCfg.NATSCertificateAuthority != nil {
erOpts.NATSCertificateAuthority = jsnCfg.NATSCertificateAuthority
}
if jsnCfg.NATSClientCertificate != nil {
erOpts.NATSClientCertificate = jsnCfg.NATSClientCertificate
}
if jsnCfg.NATSClientKey != nil {
erOpts.NATSClientKey = jsnCfg.NATSClientKey
}
if jsnCfg.NATSJetStreamMaxWait != nil {
var jetStreamMaxWait time.Duration
if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil {
return
}
erOpts.NATSJetStreamMaxWait = utils.DurationPointer(jetStreamMaxWait)
}
if jsnCfg.NATSJetStreamProcessed != nil {
erOpts.NATSJetStreamProcessed = jsnCfg.NATSJetStreamProcessed
}
if jsnCfg.NATSSubjectProcessed != nil {
erOpts.NATSSubjectProcessed = jsnCfg.NATSSubjectProcessed
}
if jsnCfg.NATSJWTFileProcessed != nil {
erOpts.NATSJWTFileProcessed = jsnCfg.NATSJWTFileProcessed
}
if jsnCfg.NATSSeedFileProcessed != nil {
erOpts.NATSSeedFileProcessed = jsnCfg.NATSSeedFileProcessed
}
if jsnCfg.NATSCertificateAuthorityProcessed != nil {
erOpts.NATSCertificateAuthorityProcessed = jsnCfg.NATSCertificateAuthorityProcessed
}
if jsnCfg.NATSClientCertificateProcessed != nil {
erOpts.NATSClientCertificateProcessed = jsnCfg.NATSClientCertificateProcessed
}
if jsnCfg.NATSClientKeyProcessed != nil {
erOpts.NATSClientKeyProcessed = jsnCfg.NATSClientKeyProcessed
}
if jsnCfg.NATSJetStreamMaxWaitProcessed != nil {
var jetStreamMaxWait time.Duration
if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWaitProcessed); err != nil {
return
}
erOpts.NATSJetStreamMaxWaitProcessed = utils.DurationPointer(jetStreamMaxWait)
}
return
}
func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplates map[string][]*FCTemplate, sep string) (err error) {
if jsnCfg == nil {
return
@@ -214,13 +472,75 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat
}
}
if jsnCfg.Opts != nil {
for k, v := range jsnCfg.Opts {
er.Opts[k] = v
}
err = er.Opts.loadFromJSONCfg(jsnCfg.Opts)
}
return
}
func (erOpts *EventReaderOpts) Clone() *EventReaderOpts {
return &EventReaderOpts{
PartialPath: erOpts.PartialPath,
PartialCacheAction: erOpts.PartialCacheAction,
PartialOrderField: erOpts.PartialOrderField,
PartialCSVFieldSeparator: erOpts.PartialCSVFieldSeparator,
CSVRowLength: erOpts.CSVRowLength,
CSVFieldSeparator: erOpts.CSVFieldSeparator,
CSVHeaderDefineChar: erOpts.CSVHeaderDefineChar,
CSVLazyQuotes: erOpts.CSVLazyQuotes,
XMLRootPath: erOpts.XMLRootPath,
AMQPQueueID: erOpts.AMQPQueueID,
AMQPQueueIDProcessed: erOpts.AMQPQueueIDProcessed,
AMQPConsumerTag: erOpts.AMQPConsumerTag,
AMQPExchange: erOpts.AMQPExchange,
AMQPExchangeType: erOpts.AMQPExchangeType,
AMQPRoutingKey: erOpts.AMQPRoutingKey,
AMQPExchangeProcessed: erOpts.AMQPExchangeProcessed,
AMQPExchangeTypeProcessed: erOpts.AMQPExchangeTypeProcessed,
AMQPRoutingKeyProcessed: erOpts.AMQPRoutingKeyProcessed,
KafkaTopic: erOpts.KafkaTopic,
KafkaGroupID: erOpts.KafkaGroupID,
KafkaMaxWait: erOpts.KafkaMaxWait,
KafkaTopicProcessed: erOpts.KafkaTopicProcessed,
SQLDBName: erOpts.SQLDBName,
SQLTableName: erOpts.SQLTableName,
SSLMode: erOpts.SSLMode,
SQLDBNameProcessed: erOpts.SQLDBNameProcessed,
SQLTableNameProcessed: erOpts.SQLTableNameProcessed,
SSLModeProcessed: erOpts.SSLModeProcessed,
AWSRegion: erOpts.AWSRegion,
AWSKey: erOpts.AWSKey,
AWSSecret: erOpts.AWSSecret,
AWSToken: erOpts.AWSToken,
AWSRegionProcessed: erOpts.AWSRegionProcessed,
AWSKeyProcessed: erOpts.AWSKeyProcessed,
AWSSecretProcessed: erOpts.AWSSecretProcessed,
AWSTokenProcessed: erOpts.AWSTokenProcessed,
SQSQueueID: erOpts.SQSQueueID,
SQSQueueIDProcessed: erOpts.SQSQueueIDProcessed,
S3BucketID: erOpts.S3BucketID,
S3FolderPathProcessed: erOpts.S3FolderPathProcessed,
S3BucketIDProcessed: erOpts.S3BucketIDProcessed,
NATSJetStream: erOpts.NATSJetStream,
NATSConsumerName: erOpts.NATSConsumerName,
NATSSubject: erOpts.NATSSubject,
NATSQueueID: erOpts.NATSQueueID,
NATSJWTFile: erOpts.NATSJWTFile,
NATSSeedFile: erOpts.NATSSeedFile,
NATSCertificateAuthority: erOpts.NATSCertificateAuthority,
NATSClientCertificate: erOpts.NATSClientCertificate,
NATSClientKey: erOpts.NATSClientKey,
NATSJetStreamMaxWait: erOpts.NATSJetStreamMaxWait,
NATSJetStreamProcessed: erOpts.NATSJetStreamProcessed,
NATSSubjectProcessed: erOpts.NATSSubjectProcessed,
NATSJWTFileProcessed: erOpts.NATSJWTFileProcessed,
NATSSeedFileProcessed: erOpts.NATSSeedFileProcessed,
NATSCertificateAuthorityProcessed: erOpts.NATSCertificateAuthorityProcessed,
NATSClientCertificateProcessed: erOpts.NATSClientCertificateProcessed,
NATSClientKeyProcessed: erOpts.NATSClientKeyProcessed,
NATSJetStreamMaxWaitProcessed: erOpts.NATSJetStreamMaxWaitProcessed,
}
}
// Clone returns a deep copy of EventReaderCfg
func (er EventReaderCfg) Clone() (cln *EventReaderCfg) {
cln = &EventReaderCfg{
@@ -233,7 +553,7 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) {
Tenant: er.Tenant.Clone(),
Timezone: er.Timezone,
Flags: er.Flags.Clone(),
Opts: make(map[string]interface{}),
Opts: er.Opts.Clone(),
}
if er.Filters != nil {
cln.Filters = utils.CloneStringSlice(er.Filters)
@@ -256,14 +576,72 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) {
cln.PartialCommitFields[idx] = fld.Clone()
}
}
for k, v := range er.Opts {
cln.Opts[k] = v
}
return
}
// AsMapInterface returns the config as a map[string]interface{}
func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) {
opts := map[string]interface{}{
utils.PartialPathOpt: er.Opts.PartialPath,
utils.PartialCacheActionOpt: er.Opts.PartialCacheAction,
utils.PartialOrderFieldOpt: er.Opts.PartialOrderField,
utils.PartialCSVFieldSepartorOpt: er.Opts.PartialCSVFieldSeparator,
utils.CSVRowLengthOpt: er.Opts.CSVRowLength,
utils.CSVFieldSepOpt: er.Opts.CSVFieldSeparator,
utils.HeaderDefineCharOpt: er.Opts.CSVHeaderDefineChar,
utils.CSVLazyQuotes: er.Opts.CSVLazyQuotes,
utils.XMLRootPathOpt: er.Opts.XMLRootPath,
utils.AMQPQueueID: er.Opts.AMQPQueueID,
utils.AMQPQueueIDProcessedCfg: er.Opts.AMQPQueueIDProcessed,
utils.AMQPConsumerTag: er.Opts.AMQPConsumerTag,
utils.AMQPExchange: er.Opts.AMQPExchange,
utils.AMQPExchangeType: er.Opts.AMQPExchangeType,
utils.AMQPRoutingKey: er.Opts.AMQPRoutingKey,
utils.AMQPExchangeProcessedCfg: er.Opts.AMQPExchangeProcessed,
utils.AMQPExchangeTypeProcessedCfg: er.Opts.AMQPExchangeTypeProcessed,
utils.AMQPRoutingKeyProcessedCfg: er.Opts.AMQPRoutingKeyProcessed,
utils.KafkaTopic: er.Opts.KafkaTopic,
utils.KafkaGroupID: er.Opts.KafkaGroupID,
utils.KafkaMaxWait: er.Opts.KafkaMaxWait,
utils.KafkaTopicProcessedCfg: er.Opts.KafkaTopicProcessed,
utils.SQLDBNameOpt: er.Opts.SQLDBName,
utils.SQLTableNameOpt: er.Opts.SQLTableName,
utils.SSLModeCfg: er.Opts.SSLMode,
utils.SQLDBNameProcessedCfg: er.Opts.SQLDBNameProcessed,
utils.SQLTableNameProcessedCfg: er.Opts.SQLTableNameProcessed,
utils.SSLModeProcessedCfg: er.Opts.SSLModeProcessed,
utils.AWSRegion: er.Opts.AWSRegion,
utils.AWSKey: er.Opts.AWSKey,
utils.AWSSecret: er.Opts.AWSSecret,
utils.AWSToken: er.Opts.AWSToken,
utils.AWSRegionProcessedCfg: er.Opts.AWSRegionProcessed,
utils.AWSKeyProcessedCfg: er.Opts.AWSKeyProcessed,
utils.AWSSecretProcessedCfg: er.Opts.AWSSecretProcessed,
utils.AWSTokenProcessedCfg: er.Opts.AWSTokenProcessed,
utils.SQSQueueID: er.Opts.SQSQueueID,
utils.SQSQueueIDProcessedCfg: er.Opts.SQSQueueIDProcessed,
utils.S3Bucket: er.Opts.S3BucketID,
utils.S3FolderPathProcessedCfg: er.Opts.S3FolderPathProcessed,
utils.S3BucketIDProcessedCfg: er.Opts.S3BucketIDProcessed,
utils.NatsJetStream: er.Opts.NATSJetStream,
utils.NatsConsumerName: er.Opts.NATSConsumerName,
utils.NatsSubject: er.Opts.NATSSubject,
utils.NatsQueueID: er.Opts.NATSQueueID,
utils.NatsJWTFile: er.Opts.NATSJWTFile,
utils.NatsSeedFile: er.Opts.NATSSeedFile,
utils.NatsCertificateAuthority: er.Opts.NATSCertificateAuthority,
utils.NatsClientCertificate: er.Opts.NATSClientCertificate,
utils.NatsClientKey: er.Opts.NATSClientKey,
utils.NatsJetStreamMaxWait: er.Opts.NATSJetStreamMaxWait,
utils.NATSJetStreamProcessedCfg: er.Opts.NATSJetStreamProcessed,
utils.NATSSubjectProcessedCfg: er.Opts.NATSSubjectProcessed,
utils.NATSJWTFileProcessedCfg: er.Opts.NATSJWTFileProcessed,
utils.NATSSeedFileProcessedCfg: er.Opts.NATSSeedFileProcessed,
utils.NATSCertificateAuthorityProcessedCfg: er.Opts.NATSCertificateAuthorityProcessed,
utils.NATSClientCertificateProcessed: er.Opts.NATSClientCertificateProcessed,
utils.NATSClientKeyProcessedCfg: er.Opts.NATSClientKeyProcessed,
utils.NATSJetStreamMaxWaitProcessedCfg: er.Opts.NATSJetStreamMaxWaitProcessed,
}
initialMP = map[string]interface{}{
utils.IDCfg: er.ID,
utils.TypeCfg: er.Type,
@@ -275,14 +653,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
utils.FiltersCfg: er.Filters,
utils.FlagsCfg: []string{},
utils.RunDelayCfg: "0",
utils.OptsCfg: opts,
}
opts := make(map[string]interface{})
for k, v := range er.Opts {
opts[k] = v
}
initialMP[utils.OptsCfg] = opts
if flags := er.Flags.SliceFlags(); flags != nil {
initialMP[utils.FlagsCfg] = flags
}
@@ -317,6 +690,68 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
return
}
type EventReaderOptsJson struct {
PartialPath *string
PartialCacheAction *string
PartialOrderField *string
PartialCSVFieldSeparator *string
CSVRowLength *int
CSVFieldSeparator *string
CSVHeaderDefineChar *string
CSVLazyQuotes *bool
XMLRootPath *string
AMQPQueueID *string
AMQPQueueIDProcessed *string
AMQPConsumerTag *string
AMQPExchange *string
AMQPExchangeType *string
AMQPRoutingKey *string
AMQPExchangeProcessed *string
AMQPExchangeTypeProcessed *string
AMQPRoutingKeyProcessed *string
KafkaTopic *string
KafkaGroupID *string
KafkaMaxWait *string
KafkaTopicProcessed *string
SQLDBName *string
SQLTableName *string
SSLMode *string
SQLDBNameProcessed *string
SQLTableNameProcessed *string
SSLModeProcessed *string
AWSRegion *string
AWSKey *string
AWSSecret *string
AWSToken *string
AWSRegionProcessed *string
AWSKeyProcessed *string
AWSSecretProcessed *string
AWSTokenProcessed *string
SQSQueueID *string
SQSQueueIDProcessed *string
S3BucketID *string
S3FolderPathProcessed *string
S3BucketIDProcessed *string
NATSJetStream *bool
NATSConsumerName *string
NATSSubject *string
NATSQueueID *string
NATSJWTFile *string
NATSSeedFile *string
NATSCertificateAuthority *string
NATSClientCertificate *string
NATSClientKey *string
NATSJetStreamMaxWait *string
NATSJetStreamProcessed *bool
NATSSubjectProcessed *string
NATSJWTFileProcessed *string
NATSSeedFileProcessed *string
NATSCertificateAuthorityProcessed *string
NATSClientCertificateProcessed *string
NATSClientKeyProcessed *string
NATSJetStreamMaxWaitProcessed *string
}
// EventReaderSJsonCfg is the configuration of a single EventReader
type EventReaderJsonCfg struct {
Id *string
@@ -325,7 +760,7 @@ type EventReaderJsonCfg struct {
Concurrent_requests *int
Source_path *string
Processed_path *string
Opts map[string]interface{}
Opts *EventReaderOptsJson
Tenant *string
Timezone *string
Filters *[]string
@@ -335,6 +770,190 @@ type EventReaderJsonCfg struct {
Cache_dump_fields *[]*FcTemplateJsonCfg
}
func diffEventReaderOptsJsonCfg(d *EventReaderOptsJson, v1, v2 *EventReaderOpts) *EventReaderOptsJson {
if d == nil {
d = new(EventReaderOptsJson)
}
if *v1.PartialPath != *v2.PartialPath {
d.PartialPath = v2.PartialPath
}
if *v1.PartialCacheAction != *v2.PartialCacheAction {
d.PartialCacheAction = v2.PartialCacheAction
}
if *v1.PartialOrderField != *v2.PartialOrderField {
d.PartialOrderField = v2.PartialOrderField
}
if *v1.PartialCSVFieldSeparator != *v2.PartialCSVFieldSeparator {
d.PartialCSVFieldSeparator = v2.PartialCSVFieldSeparator
}
if *v1.CSVRowLength != *v2.CSVRowLength {
d.CSVRowLength = v2.CSVRowLength
}
if *v1.CSVFieldSeparator != *v2.CSVFieldSeparator {
d.CSVFieldSeparator = v2.CSVFieldSeparator
}
if *v1.CSVHeaderDefineChar != *v2.CSVHeaderDefineChar {
d.CSVHeaderDefineChar = v2.CSVHeaderDefineChar
}
if *v1.CSVLazyQuotes != *v2.CSVLazyQuotes {
d.CSVLazyQuotes = v2.CSVLazyQuotes
}
if *v1.XMLRootPath != *v2.XMLRootPath {
d.XMLRootPath = v2.XMLRootPath
}
if *v1.AMQPQueueID != *v2.AMQPQueueID {
d.AMQPQueueID = v2.AMQPQueueID
}
if *v1.AMQPQueueIDProcessed != *v2.AMQPQueueIDProcessed {
d.AMQPQueueIDProcessed = v2.AMQPQueueIDProcessed
}
if *v1.AMQPConsumerTag != *v2.AMQPConsumerTag {
d.AMQPConsumerTag = v2.AMQPConsumerTag
}
if *v1.AMQPExchange != *v2.AMQPExchange {
d.AMQPExchange = v2.AMQPExchange
}
if *v1.AMQPExchangeType != *v2.AMQPExchangeType {
d.AMQPExchangeType = v2.AMQPExchangeType
}
if *v1.AMQPRoutingKey != *v2.AMQPRoutingKey {
d.AMQPRoutingKey = v2.AMQPRoutingKey
}
if *v1.AMQPExchangeProcessed != *v2.AMQPExchangeProcessed {
d.AMQPExchangeProcessed = v2.AMQPExchangeProcessed
}
if *v1.AMQPExchangeTypeProcessed != *v2.AMQPExchangeTypeProcessed {
d.AMQPExchangeTypeProcessed = v2.AMQPExchangeTypeProcessed
}
if *v1.AMQPRoutingKeyProcessed != *v2.AMQPRoutingKeyProcessed {
d.AMQPRoutingKeyProcessed = v2.AMQPRoutingKeyProcessed
}
if *v1.KafkaTopic != *v2.KafkaTopic {
d.KafkaTopic = v2.KafkaTopic
}
if *v1.KafkaGroupID != *v2.KafkaGroupID {
d.KafkaGroupID = v2.KafkaGroupID
}
if *v1.KafkaMaxWait != *v2.KafkaMaxWait {
d.KafkaMaxWait = utils.StringPointer(v2.KafkaMaxWait.String())
}
if *v1.KafkaTopicProcessed != *v2.KafkaTopicProcessed {
d.KafkaTopicProcessed = v2.KafkaTopicProcessed
}
if *v1.SQLDBName != *v2.SQLDBName {
d.SQLDBName = v2.SQLDBName
}
if *v1.SQLTableName != *v2.SQLTableName {
d.SQLTableName = v2.SQLTableName
}
if *v1.SSLMode != *v2.SSLMode {
d.SSLMode = v2.SSLMode
}
if *v1.SQLDBNameProcessed != *v2.SQLDBNameProcessed {
d.SQLDBNameProcessed = v2.SQLDBNameProcessed
}
if *v1.SQLTableNameProcessed != *v2.SQLTableNameProcessed {
d.SQLTableNameProcessed = v2.SQLTableNameProcessed
}
if *v1.SSLModeProcessed != *v2.SSLModeProcessed {
d.SSLModeProcessed = v2.SSLModeProcessed
}
if *v1.AWSRegion != *v2.AWSRegion {
d.AWSRegion = v2.AWSRegion
}
if *v1.AWSKey != *v2.AWSKey {
d.AWSKey = v2.AWSKey
}
if *v1.AWSSecret != *v2.AWSSecret {
d.AWSSecret = v2.AWSSecret
}
if *v1.AWSToken != *v2.AWSToken {
d.AWSToken = v2.AWSToken
}
if *v1.AWSRegionProcessed != *v2.AWSRegionProcessed {
d.AWSRegionProcessed = v2.AWSRegionProcessed
}
if *v1.AWSKeyProcessed != *v2.AWSKeyProcessed {
d.AWSKeyProcessed = v2.AWSKeyProcessed
}
if *v1.AWSSecretProcessed != *v2.AWSSecretProcessed {
d.AWSSecretProcessed = v2.AWSSecretProcessed
}
if *v1.AWSTokenProcessed != *v2.AWSTokenProcessed {
d.AWSTokenProcessed = v2.AWSTokenProcessed
}
if *v1.SQSQueueID != *v2.SQSQueueID {
d.SQSQueueID = v2.SQSQueueID
}
if *v1.SQSQueueIDProcessed != *v2.SQSQueueIDProcessed {
d.SQSQueueIDProcessed = v2.SQSQueueIDProcessed
}
if *v1.S3BucketID != *v2.S3BucketID {
d.S3BucketID = v2.S3BucketID
}
if *v1.S3FolderPathProcessed != *v2.S3FolderPathProcessed {
d.S3FolderPathProcessed = v2.S3FolderPathProcessed
}
if *v1.S3BucketIDProcessed != *v2.S3BucketIDProcessed {
d.S3BucketIDProcessed = v2.S3BucketIDProcessed
}
if *v1.NATSJetStream != *v2.NATSJetStream {
d.NATSJetStream = v2.NATSJetStream
}
if *v1.NATSConsumerName != *v2.NATSConsumerName {
d.NATSConsumerName = v2.NATSConsumerName
}
if *v1.NATSSubject != *v2.NATSSubject {
d.NATSSubject = v2.NATSSubject
}
if *v1.NATSQueueID != *v2.NATSQueueID {
d.NATSQueueID = v2.NATSQueueID
}
if *v1.NATSJWTFile != *v2.NATSJWTFile {
d.NATSJWTFile = v2.NATSJWTFile
}
if *v1.NATSSeedFile != *v2.NATSSeedFile {
d.NATSSeedFile = v2.NATSSeedFile
}
if *v1.NATSCertificateAuthority != *v2.NATSCertificateAuthority {
d.NATSCertificateAuthority = v2.NATSCertificateAuthority
}
if *v1.NATSClientCertificate != *v2.NATSClientCertificate {
d.NATSClientCertificate = v2.NATSClientCertificate
}
if *v1.NATSClientKey != *v2.NATSClientKey {
d.NATSClientKey = v2.NATSClientKey
}
if *v1.NATSJetStreamMaxWait != *v2.NATSJetStreamMaxWait {
d.NATSJetStreamMaxWait = utils.StringPointer(v2.NATSJetStreamMaxWait.String())
}
if *v1.NATSJetStreamProcessed != *v2.NATSJetStreamProcessed {
d.NATSJetStreamProcessed = v2.NATSJetStreamProcessed
}
if *v1.NATSSubjectProcessed != *v2.NATSSubjectProcessed {
d.NATSSubjectProcessed = v2.NATSSubjectProcessed
}
if *v1.NATSJWTFileProcessed != *v2.NATSJWTFileProcessed {
d.NATSJWTFileProcessed = v2.NATSJWTFileProcessed
}
if *v1.NATSSeedFileProcessed != *v2.NATSSeedFileProcessed {
d.NATSSeedFileProcessed = v2.NATSSeedFileProcessed
}
if *v1.NATSCertificateAuthorityProcessed != *v2.NATSCertificateAuthorityProcessed {
d.NATSCertificateAuthorityProcessed = v2.NATSCertificateAuthorityProcessed
}
if *v1.NATSClientCertificateProcessed != *v2.NATSClientCertificateProcessed {
d.NATSClientCertificateProcessed = v2.NATSClientCertificateProcessed
}
if *v1.NATSClientKeyProcessed != *v2.NATSClientKeyProcessed {
d.NATSClientKeyProcessed = v2.NATSClientKeyProcessed
}
if *v1.NATSJetStreamMaxWaitProcessed != *v2.NATSJetStreamMaxWaitProcessed {
d.NATSJetStreamMaxWaitProcessed = utils.StringPointer(v2.NATSJetStreamMaxWaitProcessed.String())
}
return d
}
func diffEventReaderJsonCfg(d *EventReaderJsonCfg, v1, v2 *EventReaderCfg, separator string) *EventReaderJsonCfg {
if d == nil {
d = new(EventReaderJsonCfg)
@@ -357,7 +976,7 @@ func diffEventReaderJsonCfg(d *EventReaderJsonCfg, v1, v2 *EventReaderCfg, separ
if v1.ProcessedPath != v2.ProcessedPath {
d.Processed_path = utils.StringPointer(v2.ProcessedPath)
}
d.Opts = diffMap(d.Opts, v1.Opts, v2.Opts)
d.Opts = diffEventReaderOptsJsonCfg(d.Opts, v1.Opts, v2.Opts)
tnt1 := v1.Tenant.GetRule(separator)
tnt2 := v2.Tenant.GetRule(separator)
if tnt1 != tnt2 {