diff --git a/config/config.go b/config/config.go index 1fbe9773f..61ac9dde1 100644 --- a/config/config.go +++ b/config/config.go @@ -46,7 +46,7 @@ var ( getDftRemHstCfg = func() *RemoteHost { return new(RemoteHost) } getDftEvExpCfg = func() *EventExporterCfg { return &EventExporterCfg{Opts: &EventExporterOpts{}} } - getDftEvRdrCfg = func() *EventReaderCfg { return &EventReaderCfg{Opts: make(map[string]interface{})} } + getDftEvRdrCfg = func() *EventReaderCfg { return &EventReaderCfg{Opts: &EventReaderOpts{}} } ) func init() { diff --git a/config/erscfg.go b/config/erscfg.go index aa8b78154..999443e29 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -127,6 +127,68 @@ func (erS ERsCfg) AsMapInterface(separator string) interface{} { return mp } +type EventReaderOpts struct { + PartialPath *string + PartialCacheAction *string + PartialOrderField *string + PartialCSVFieldSeparator *string + CSVRowLength *int + CSVFieldSeparator *string + CSVHeaderDefineChar *string + CSVLazyQuotes *bool + XMLRootPath *string + AMQPQueueID *string + AMQPQueueIDProcessed *string + AMQPConsumerTag *string + AMQPExchange *string + AMQPExchangeType *string + AMQPRoutingKey *string + AMQPExchangeProcessed *string + AMQPExchangeTypeProcessed *string + AMQPRoutingKeyProcessed *string + KafkaTopic *string + KafkaGroupID *string + KafkaMaxWait *time.Duration + KafkaTopicProcessed *string + SQLDBName *string + SQLTableName *string + SSLMode *string + SQLDBNameProcessed *string + SQLTableNameProcessed *string + SSLModeProcessed *string + AWSRegion *string + AWSKey *string + AWSSecret *string + AWSToken *string + AWSRegionProcessed *string + AWSKeyProcessed *string + AWSSecretProcessed *string + AWSTokenProcessed *string + SQSQueueID *string + SQSQueueIDProcessed *string + S3BucketID *string + S3FolderPathProcessed *string + S3BucketIDProcessed *string + NATSJetStream *bool + NATSConsumerName *string + NATSSubject *string + NATSQueueID *string + NATSJWTFile *string + NATSSeedFile *string + NATSCertificateAuthority *string + NATSClientCertificate *string + NATSClientKey *string + NATSJetStreamMaxWait *time.Duration + NATSJetStreamProcessed *bool + NATSSubjectProcessed *string + NATSJWTFileProcessed *string + NATSSeedFileProcessed *string + NATSCertificateAuthorityProcessed *string + NATSClientCertificateProcessed *string + NATSClientKeyProcessed *string + NATSJetStreamMaxWaitProcessed *time.Duration +} + // EventReaderCfg the event for the Event Reader type EventReaderCfg struct { ID string @@ -135,7 +197,7 @@ type EventReaderCfg struct { ConcurrentReqs int SourcePath string ProcessedPath string - Opts map[string]interface{} + Opts *EventReaderOpts Tenant RSRParsers Timezone string Filters []string @@ -145,6 +207,202 @@ type EventReaderCfg struct { CacheDumpFields []*FCTemplate } +func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) { + if jsnCfg == nil { + return + } + if jsnCfg.PartialPath != nil { + erOpts.PartialPath = jsnCfg.PartialPath + } + if jsnCfg.PartialCacheAction != nil { + erOpts.PartialCacheAction = jsnCfg.PartialCacheAction + } + if jsnCfg.PartialOrderField != nil { + erOpts.PartialOrderField = jsnCfg.PartialOrderField + } + if jsnCfg.PartialCSVFieldSeparator != nil { + erOpts.PartialCSVFieldSeparator = jsnCfg.PartialCSVFieldSeparator + } + if jsnCfg.CSVRowLength != nil { + erOpts.CSVRowLength = jsnCfg.CSVRowLength + } + if jsnCfg.CSVFieldSeparator != nil { + erOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator + } + if jsnCfg.CSVHeaderDefineChar != nil { + erOpts.CSVHeaderDefineChar = jsnCfg.CSVHeaderDefineChar + } + if jsnCfg.CSVLazyQuotes != nil { + erOpts.CSVLazyQuotes = jsnCfg.CSVLazyQuotes + } + if jsnCfg.XMLRootPath != nil { + erOpts.XMLRootPath = jsnCfg.XMLRootPath + } + if jsnCfg.AMQPQueueID != nil { + erOpts.AMQPQueueID = jsnCfg.AMQPQueueID + } + if jsnCfg.AMQPQueueIDProcessed != nil { + erOpts.AMQPQueueIDProcessed = jsnCfg.AMQPQueueIDProcessed + } + if jsnCfg.AMQPConsumerTag != nil { + erOpts.AMQPConsumerTag = jsnCfg.AMQPConsumerTag + } + if jsnCfg.AMQPExchange != nil { + erOpts.AMQPExchange = jsnCfg.AMQPExchange + } + if jsnCfg.AMQPExchangeType != nil { + erOpts.AMQPExchangeType = jsnCfg.AMQPExchangeType + } + if jsnCfg.AMQPRoutingKey != nil { + erOpts.AMQPRoutingKey = jsnCfg.AMQPRoutingKey + } + if jsnCfg.AMQPExchangeProcessed != nil { + erOpts.AMQPExchangeProcessed = jsnCfg.AMQPExchangeProcessed + } + if jsnCfg.AMQPExchangeTypeProcessed != nil { + erOpts.AMQPExchangeTypeProcessed = jsnCfg.AMQPExchangeTypeProcessed + } + if jsnCfg.AMQPRoutingKeyProcessed != nil { + erOpts.AMQPRoutingKeyProcessed = jsnCfg.AMQPRoutingKeyProcessed + } + if jsnCfg.KafkaTopic != nil { + erOpts.KafkaTopic = jsnCfg.KafkaTopic + } + if jsnCfg.KafkaGroupID != nil { + erOpts.KafkaGroupID = jsnCfg.KafkaGroupID + } + if jsnCfg.KafkaMaxWait != nil { + var kafkaMaxWait time.Duration + if kafkaMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.KafkaMaxWait); err != nil { + return + } + erOpts.KafkaMaxWait = utils.DurationPointer(kafkaMaxWait) + } + if jsnCfg.KafkaTopicProcessed != nil { + erOpts.KafkaTopicProcessed = jsnCfg.KafkaTopicProcessed + } + if jsnCfg.SQLDBName != nil { + erOpts.SQLDBName = jsnCfg.SQLDBName + } + if jsnCfg.SQLTableName != nil { + erOpts.SQLTableName = jsnCfg.SQLTableName + } + if jsnCfg.SSLMode != nil { + erOpts.SSLMode = jsnCfg.SSLMode + } + if jsnCfg.SQLDBNameProcessed != nil { + erOpts.SQLDBNameProcessed = jsnCfg.SQLDBNameProcessed + } + if jsnCfg.SQLTableNameProcessed != nil { + erOpts.SQLTableNameProcessed = jsnCfg.SQLTableNameProcessed + } + if jsnCfg.SSLModeProcessed != nil { + erOpts.SSLModeProcessed = jsnCfg.SSLModeProcessed + } + if jsnCfg.AWSRegion != nil { + erOpts.AWSRegion = jsnCfg.AWSRegion + } + if jsnCfg.AWSKey != nil { + erOpts.AWSKey = jsnCfg.AWSKey + } + if jsnCfg.AWSSecret != nil { + erOpts.AWSSecret = jsnCfg.AWSSecret + } + if jsnCfg.AWSToken != nil { + erOpts.AWSToken = jsnCfg.AWSToken + } + if jsnCfg.AWSRegionProcessed != nil { + erOpts.AWSRegionProcessed = jsnCfg.AWSRegionProcessed + } + if jsnCfg.AWSKeyProcessed != nil { + erOpts.AWSKeyProcessed = jsnCfg.AWSKeyProcessed + } + if jsnCfg.AWSSecretProcessed != nil { + erOpts.AWSSecretProcessed = jsnCfg.AWSSecretProcessed + } + if jsnCfg.AWSTokenProcessed != nil { + erOpts.AWSTokenProcessed = jsnCfg.AWSTokenProcessed + } + if jsnCfg.SQSQueueID != nil { + erOpts.SQSQueueID = jsnCfg.SQSQueueID + } + if jsnCfg.SQSQueueIDProcessed != nil { + erOpts.SQSQueueIDProcessed = jsnCfg.SQSQueueIDProcessed + } + if jsnCfg.S3BucketID != nil { + erOpts.S3BucketID = jsnCfg.S3BucketID + } + if jsnCfg.S3FolderPathProcessed != nil { + erOpts.S3FolderPathProcessed = jsnCfg.S3FolderPathProcessed + } + if jsnCfg.S3BucketIDProcessed != nil { + erOpts.S3BucketIDProcessed = jsnCfg.S3BucketIDProcessed + } + if jsnCfg.NATSJetStream != nil { + erOpts.NATSJetStream = jsnCfg.NATSJetStream + } + if jsnCfg.NATSConsumerName != nil { + erOpts.NATSConsumerName = jsnCfg.NATSConsumerName + } + if jsnCfg.NATSSubject != nil { + erOpts.NATSSubject = jsnCfg.NATSSubject + } + if jsnCfg.NATSQueueID != nil { + erOpts.NATSQueueID = jsnCfg.NATSQueueID + } + if jsnCfg.NATSJWTFile != nil { + erOpts.NATSJWTFile = jsnCfg.NATSJWTFile + } + if jsnCfg.NATSSeedFile != nil { + erOpts.NATSSeedFile = jsnCfg.NATSSeedFile + } + if jsnCfg.NATSCertificateAuthority != nil { + erOpts.NATSCertificateAuthority = jsnCfg.NATSCertificateAuthority + } + if jsnCfg.NATSClientCertificate != nil { + erOpts.NATSClientCertificate = jsnCfg.NATSClientCertificate + } + if jsnCfg.NATSClientKey != nil { + erOpts.NATSClientKey = jsnCfg.NATSClientKey + } + if jsnCfg.NATSJetStreamMaxWait != nil { + var jetStreamMaxWait time.Duration + if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil { + return + } + erOpts.NATSJetStreamMaxWait = utils.DurationPointer(jetStreamMaxWait) + } + if jsnCfg.NATSJetStreamProcessed != nil { + erOpts.NATSJetStreamProcessed = jsnCfg.NATSJetStreamProcessed + } + if jsnCfg.NATSSubjectProcessed != nil { + erOpts.NATSSubjectProcessed = jsnCfg.NATSSubjectProcessed + } + if jsnCfg.NATSJWTFileProcessed != nil { + erOpts.NATSJWTFileProcessed = jsnCfg.NATSJWTFileProcessed + } + if jsnCfg.NATSSeedFileProcessed != nil { + erOpts.NATSSeedFileProcessed = jsnCfg.NATSSeedFileProcessed + } + if jsnCfg.NATSCertificateAuthorityProcessed != nil { + erOpts.NATSCertificateAuthorityProcessed = jsnCfg.NATSCertificateAuthorityProcessed + } + if jsnCfg.NATSClientCertificateProcessed != nil { + erOpts.NATSClientCertificateProcessed = jsnCfg.NATSClientCertificateProcessed + } + if jsnCfg.NATSClientKeyProcessed != nil { + erOpts.NATSClientKeyProcessed = jsnCfg.NATSClientKeyProcessed + } + if jsnCfg.NATSJetStreamMaxWaitProcessed != nil { + var jetStreamMaxWait time.Duration + if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWaitProcessed); err != nil { + return + } + erOpts.NATSJetStreamMaxWaitProcessed = utils.DurationPointer(jetStreamMaxWait) + } + return +} + func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplates map[string][]*FCTemplate, sep string) (err error) { if jsnCfg == nil { return @@ -214,13 +472,75 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat } } if jsnCfg.Opts != nil { - for k, v := range jsnCfg.Opts { - er.Opts[k] = v - } + err = er.Opts.loadFromJSONCfg(jsnCfg.Opts) } return } +func (erOpts *EventReaderOpts) Clone() *EventReaderOpts { + return &EventReaderOpts{ + PartialPath: erOpts.PartialPath, + PartialCacheAction: erOpts.PartialCacheAction, + PartialOrderField: erOpts.PartialOrderField, + PartialCSVFieldSeparator: erOpts.PartialCSVFieldSeparator, + CSVRowLength: erOpts.CSVRowLength, + CSVFieldSeparator: erOpts.CSVFieldSeparator, + CSVHeaderDefineChar: erOpts.CSVHeaderDefineChar, + CSVLazyQuotes: erOpts.CSVLazyQuotes, + XMLRootPath: erOpts.XMLRootPath, + AMQPQueueID: erOpts.AMQPQueueID, + AMQPQueueIDProcessed: erOpts.AMQPQueueIDProcessed, + AMQPConsumerTag: erOpts.AMQPConsumerTag, + AMQPExchange: erOpts.AMQPExchange, + AMQPExchangeType: erOpts.AMQPExchangeType, + AMQPRoutingKey: erOpts.AMQPRoutingKey, + AMQPExchangeProcessed: erOpts.AMQPExchangeProcessed, + AMQPExchangeTypeProcessed: erOpts.AMQPExchangeTypeProcessed, + AMQPRoutingKeyProcessed: erOpts.AMQPRoutingKeyProcessed, + KafkaTopic: erOpts.KafkaTopic, + KafkaGroupID: erOpts.KafkaGroupID, + KafkaMaxWait: erOpts.KafkaMaxWait, + KafkaTopicProcessed: erOpts.KafkaTopicProcessed, + SQLDBName: erOpts.SQLDBName, + SQLTableName: erOpts.SQLTableName, + SSLMode: erOpts.SSLMode, + SQLDBNameProcessed: erOpts.SQLDBNameProcessed, + SQLTableNameProcessed: erOpts.SQLTableNameProcessed, + SSLModeProcessed: erOpts.SSLModeProcessed, + AWSRegion: erOpts.AWSRegion, + AWSKey: erOpts.AWSKey, + AWSSecret: erOpts.AWSSecret, + AWSToken: erOpts.AWSToken, + AWSRegionProcessed: erOpts.AWSRegionProcessed, + AWSKeyProcessed: erOpts.AWSKeyProcessed, + AWSSecretProcessed: erOpts.AWSSecretProcessed, + AWSTokenProcessed: erOpts.AWSTokenProcessed, + SQSQueueID: erOpts.SQSQueueID, + SQSQueueIDProcessed: erOpts.SQSQueueIDProcessed, + S3BucketID: erOpts.S3BucketID, + S3FolderPathProcessed: erOpts.S3FolderPathProcessed, + S3BucketIDProcessed: erOpts.S3BucketIDProcessed, + NATSJetStream: erOpts.NATSJetStream, + NATSConsumerName: erOpts.NATSConsumerName, + NATSSubject: erOpts.NATSSubject, + NATSQueueID: erOpts.NATSQueueID, + NATSJWTFile: erOpts.NATSJWTFile, + NATSSeedFile: erOpts.NATSSeedFile, + NATSCertificateAuthority: erOpts.NATSCertificateAuthority, + NATSClientCertificate: erOpts.NATSClientCertificate, + NATSClientKey: erOpts.NATSClientKey, + NATSJetStreamMaxWait: erOpts.NATSJetStreamMaxWait, + NATSJetStreamProcessed: erOpts.NATSJetStreamProcessed, + NATSSubjectProcessed: erOpts.NATSSubjectProcessed, + NATSJWTFileProcessed: erOpts.NATSJWTFileProcessed, + NATSSeedFileProcessed: erOpts.NATSSeedFileProcessed, + NATSCertificateAuthorityProcessed: erOpts.NATSCertificateAuthorityProcessed, + NATSClientCertificateProcessed: erOpts.NATSClientCertificateProcessed, + NATSClientKeyProcessed: erOpts.NATSClientKeyProcessed, + NATSJetStreamMaxWaitProcessed: erOpts.NATSJetStreamMaxWaitProcessed, + } +} + // Clone returns a deep copy of EventReaderCfg func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { cln = &EventReaderCfg{ @@ -233,7 +553,7 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { Tenant: er.Tenant.Clone(), Timezone: er.Timezone, Flags: er.Flags.Clone(), - Opts: make(map[string]interface{}), + Opts: er.Opts.Clone(), } if er.Filters != nil { cln.Filters = utils.CloneStringSlice(er.Filters) @@ -256,14 +576,72 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { cln.PartialCommitFields[idx] = fld.Clone() } } - for k, v := range er.Opts { - cln.Opts[k] = v - } return } // AsMapInterface returns the config as a map[string]interface{} func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) { + opts := map[string]interface{}{ + utils.PartialPathOpt: er.Opts.PartialPath, + utils.PartialCacheActionOpt: er.Opts.PartialCacheAction, + utils.PartialOrderFieldOpt: er.Opts.PartialOrderField, + utils.PartialCSVFieldSepartorOpt: er.Opts.PartialCSVFieldSeparator, + utils.CSVRowLengthOpt: er.Opts.CSVRowLength, + utils.CSVFieldSepOpt: er.Opts.CSVFieldSeparator, + utils.HeaderDefineCharOpt: er.Opts.CSVHeaderDefineChar, + utils.CSVLazyQuotes: er.Opts.CSVLazyQuotes, + utils.XMLRootPathOpt: er.Opts.XMLRootPath, + utils.AMQPQueueID: er.Opts.AMQPQueueID, + utils.AMQPQueueIDProcessedCfg: er.Opts.AMQPQueueIDProcessed, + utils.AMQPConsumerTag: er.Opts.AMQPConsumerTag, + utils.AMQPExchange: er.Opts.AMQPExchange, + utils.AMQPExchangeType: er.Opts.AMQPExchangeType, + utils.AMQPRoutingKey: er.Opts.AMQPRoutingKey, + utils.AMQPExchangeProcessedCfg: er.Opts.AMQPExchangeProcessed, + utils.AMQPExchangeTypeProcessedCfg: er.Opts.AMQPExchangeTypeProcessed, + utils.AMQPRoutingKeyProcessedCfg: er.Opts.AMQPRoutingKeyProcessed, + utils.KafkaTopic: er.Opts.KafkaTopic, + utils.KafkaGroupID: er.Opts.KafkaGroupID, + utils.KafkaMaxWait: er.Opts.KafkaMaxWait, + utils.KafkaTopicProcessedCfg: er.Opts.KafkaTopicProcessed, + utils.SQLDBNameOpt: er.Opts.SQLDBName, + utils.SQLTableNameOpt: er.Opts.SQLTableName, + utils.SSLModeCfg: er.Opts.SSLMode, + utils.SQLDBNameProcessedCfg: er.Opts.SQLDBNameProcessed, + utils.SQLTableNameProcessedCfg: er.Opts.SQLTableNameProcessed, + utils.SSLModeProcessedCfg: er.Opts.SSLModeProcessed, + utils.AWSRegion: er.Opts.AWSRegion, + utils.AWSKey: er.Opts.AWSKey, + utils.AWSSecret: er.Opts.AWSSecret, + utils.AWSToken: er.Opts.AWSToken, + utils.AWSRegionProcessedCfg: er.Opts.AWSRegionProcessed, + utils.AWSKeyProcessedCfg: er.Opts.AWSKeyProcessed, + utils.AWSSecretProcessedCfg: er.Opts.AWSSecretProcessed, + utils.AWSTokenProcessedCfg: er.Opts.AWSTokenProcessed, + utils.SQSQueueID: er.Opts.SQSQueueID, + utils.SQSQueueIDProcessedCfg: er.Opts.SQSQueueIDProcessed, + utils.S3Bucket: er.Opts.S3BucketID, + utils.S3FolderPathProcessedCfg: er.Opts.S3FolderPathProcessed, + utils.S3BucketIDProcessedCfg: er.Opts.S3BucketIDProcessed, + utils.NatsJetStream: er.Opts.NATSJetStream, + utils.NatsConsumerName: er.Opts.NATSConsumerName, + utils.NatsSubject: er.Opts.NATSSubject, + utils.NatsQueueID: er.Opts.NATSQueueID, + utils.NatsJWTFile: er.Opts.NATSJWTFile, + utils.NatsSeedFile: er.Opts.NATSSeedFile, + utils.NatsCertificateAuthority: er.Opts.NATSCertificateAuthority, + utils.NatsClientCertificate: er.Opts.NATSClientCertificate, + utils.NatsClientKey: er.Opts.NATSClientKey, + utils.NatsJetStreamMaxWait: er.Opts.NATSJetStreamMaxWait, + utils.NATSJetStreamProcessedCfg: er.Opts.NATSJetStreamProcessed, + utils.NATSSubjectProcessedCfg: er.Opts.NATSSubjectProcessed, + utils.NATSJWTFileProcessedCfg: er.Opts.NATSJWTFileProcessed, + utils.NATSSeedFileProcessedCfg: er.Opts.NATSSeedFileProcessed, + utils.NATSCertificateAuthorityProcessedCfg: er.Opts.NATSCertificateAuthorityProcessed, + utils.NATSClientCertificateProcessed: er.Opts.NATSClientCertificateProcessed, + utils.NATSClientKeyProcessedCfg: er.Opts.NATSClientKeyProcessed, + utils.NATSJetStreamMaxWaitProcessedCfg: er.Opts.NATSJetStreamMaxWaitProcessed, + } initialMP = map[string]interface{}{ utils.IDCfg: er.ID, utils.TypeCfg: er.Type, @@ -275,14 +653,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string utils.FiltersCfg: er.Filters, utils.FlagsCfg: []string{}, utils.RunDelayCfg: "0", + utils.OptsCfg: opts, } - opts := make(map[string]interface{}) - for k, v := range er.Opts { - opts[k] = v - } - initialMP[utils.OptsCfg] = opts - if flags := er.Flags.SliceFlags(); flags != nil { initialMP[utils.FlagsCfg] = flags } @@ -317,6 +690,68 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string return } +type EventReaderOptsJson struct { + PartialPath *string + PartialCacheAction *string + PartialOrderField *string + PartialCSVFieldSeparator *string + CSVRowLength *int + CSVFieldSeparator *string + CSVHeaderDefineChar *string + CSVLazyQuotes *bool + XMLRootPath *string + AMQPQueueID *string + AMQPQueueIDProcessed *string + AMQPConsumerTag *string + AMQPExchange *string + AMQPExchangeType *string + AMQPRoutingKey *string + AMQPExchangeProcessed *string + AMQPExchangeTypeProcessed *string + AMQPRoutingKeyProcessed *string + KafkaTopic *string + KafkaGroupID *string + KafkaMaxWait *string + KafkaTopicProcessed *string + SQLDBName *string + SQLTableName *string + SSLMode *string + SQLDBNameProcessed *string + SQLTableNameProcessed *string + SSLModeProcessed *string + AWSRegion *string + AWSKey *string + AWSSecret *string + AWSToken *string + AWSRegionProcessed *string + AWSKeyProcessed *string + AWSSecretProcessed *string + AWSTokenProcessed *string + SQSQueueID *string + SQSQueueIDProcessed *string + S3BucketID *string + S3FolderPathProcessed *string + S3BucketIDProcessed *string + NATSJetStream *bool + NATSConsumerName *string + NATSSubject *string + NATSQueueID *string + NATSJWTFile *string + NATSSeedFile *string + NATSCertificateAuthority *string + NATSClientCertificate *string + NATSClientKey *string + NATSJetStreamMaxWait *string + NATSJetStreamProcessed *bool + NATSSubjectProcessed *string + NATSJWTFileProcessed *string + NATSSeedFileProcessed *string + NATSCertificateAuthorityProcessed *string + NATSClientCertificateProcessed *string + NATSClientKeyProcessed *string + NATSJetStreamMaxWaitProcessed *string +} + // EventReaderSJsonCfg is the configuration of a single EventReader type EventReaderJsonCfg struct { Id *string @@ -325,7 +760,7 @@ type EventReaderJsonCfg struct { Concurrent_requests *int Source_path *string Processed_path *string - Opts map[string]interface{} + Opts *EventReaderOptsJson Tenant *string Timezone *string Filters *[]string @@ -335,6 +770,190 @@ type EventReaderJsonCfg struct { Cache_dump_fields *[]*FcTemplateJsonCfg } +func diffEventReaderOptsJsonCfg(d *EventReaderOptsJson, v1, v2 *EventReaderOpts) *EventReaderOptsJson { + if d == nil { + d = new(EventReaderOptsJson) + } + if *v1.PartialPath != *v2.PartialPath { + d.PartialPath = v2.PartialPath + } + if *v1.PartialCacheAction != *v2.PartialCacheAction { + d.PartialCacheAction = v2.PartialCacheAction + } + if *v1.PartialOrderField != *v2.PartialOrderField { + d.PartialOrderField = v2.PartialOrderField + } + if *v1.PartialCSVFieldSeparator != *v2.PartialCSVFieldSeparator { + d.PartialCSVFieldSeparator = v2.PartialCSVFieldSeparator + } + if *v1.CSVRowLength != *v2.CSVRowLength { + d.CSVRowLength = v2.CSVRowLength + } + if *v1.CSVFieldSeparator != *v2.CSVFieldSeparator { + d.CSVFieldSeparator = v2.CSVFieldSeparator + } + if *v1.CSVHeaderDefineChar != *v2.CSVHeaderDefineChar { + d.CSVHeaderDefineChar = v2.CSVHeaderDefineChar + } + if *v1.CSVLazyQuotes != *v2.CSVLazyQuotes { + d.CSVLazyQuotes = v2.CSVLazyQuotes + } + if *v1.XMLRootPath != *v2.XMLRootPath { + d.XMLRootPath = v2.XMLRootPath + } + if *v1.AMQPQueueID != *v2.AMQPQueueID { + d.AMQPQueueID = v2.AMQPQueueID + } + if *v1.AMQPQueueIDProcessed != *v2.AMQPQueueIDProcessed { + d.AMQPQueueIDProcessed = v2.AMQPQueueIDProcessed + } + if *v1.AMQPConsumerTag != *v2.AMQPConsumerTag { + d.AMQPConsumerTag = v2.AMQPConsumerTag + } + if *v1.AMQPExchange != *v2.AMQPExchange { + d.AMQPExchange = v2.AMQPExchange + } + if *v1.AMQPExchangeType != *v2.AMQPExchangeType { + d.AMQPExchangeType = v2.AMQPExchangeType + } + if *v1.AMQPRoutingKey != *v2.AMQPRoutingKey { + d.AMQPRoutingKey = v2.AMQPRoutingKey + } + if *v1.AMQPExchangeProcessed != *v2.AMQPExchangeProcessed { + d.AMQPExchangeProcessed = v2.AMQPExchangeProcessed + } + if *v1.AMQPExchangeTypeProcessed != *v2.AMQPExchangeTypeProcessed { + d.AMQPExchangeTypeProcessed = v2.AMQPExchangeTypeProcessed + } + if *v1.AMQPRoutingKeyProcessed != *v2.AMQPRoutingKeyProcessed { + d.AMQPRoutingKeyProcessed = v2.AMQPRoutingKeyProcessed + } + if *v1.KafkaTopic != *v2.KafkaTopic { + d.KafkaTopic = v2.KafkaTopic + } + if *v1.KafkaGroupID != *v2.KafkaGroupID { + d.KafkaGroupID = v2.KafkaGroupID + } + if *v1.KafkaMaxWait != *v2.KafkaMaxWait { + d.KafkaMaxWait = utils.StringPointer(v2.KafkaMaxWait.String()) + } + if *v1.KafkaTopicProcessed != *v2.KafkaTopicProcessed { + d.KafkaTopicProcessed = v2.KafkaTopicProcessed + } + if *v1.SQLDBName != *v2.SQLDBName { + d.SQLDBName = v2.SQLDBName + } + if *v1.SQLTableName != *v2.SQLTableName { + d.SQLTableName = v2.SQLTableName + } + if *v1.SSLMode != *v2.SSLMode { + d.SSLMode = v2.SSLMode + } + if *v1.SQLDBNameProcessed != *v2.SQLDBNameProcessed { + d.SQLDBNameProcessed = v2.SQLDBNameProcessed + } + if *v1.SQLTableNameProcessed != *v2.SQLTableNameProcessed { + d.SQLTableNameProcessed = v2.SQLTableNameProcessed + } + if *v1.SSLModeProcessed != *v2.SSLModeProcessed { + d.SSLModeProcessed = v2.SSLModeProcessed + } + if *v1.AWSRegion != *v2.AWSRegion { + d.AWSRegion = v2.AWSRegion + } + if *v1.AWSKey != *v2.AWSKey { + d.AWSKey = v2.AWSKey + } + if *v1.AWSSecret != *v2.AWSSecret { + d.AWSSecret = v2.AWSSecret + } + if *v1.AWSToken != *v2.AWSToken { + d.AWSToken = v2.AWSToken + } + if *v1.AWSRegionProcessed != *v2.AWSRegionProcessed { + d.AWSRegionProcessed = v2.AWSRegionProcessed + } + if *v1.AWSKeyProcessed != *v2.AWSKeyProcessed { + d.AWSKeyProcessed = v2.AWSKeyProcessed + } + if *v1.AWSSecretProcessed != *v2.AWSSecretProcessed { + d.AWSSecretProcessed = v2.AWSSecretProcessed + } + if *v1.AWSTokenProcessed != *v2.AWSTokenProcessed { + d.AWSTokenProcessed = v2.AWSTokenProcessed + } + if *v1.SQSQueueID != *v2.SQSQueueID { + d.SQSQueueID = v2.SQSQueueID + } + if *v1.SQSQueueIDProcessed != *v2.SQSQueueIDProcessed { + d.SQSQueueIDProcessed = v2.SQSQueueIDProcessed + } + if *v1.S3BucketID != *v2.S3BucketID { + d.S3BucketID = v2.S3BucketID + } + if *v1.S3FolderPathProcessed != *v2.S3FolderPathProcessed { + d.S3FolderPathProcessed = v2.S3FolderPathProcessed + } + if *v1.S3BucketIDProcessed != *v2.S3BucketIDProcessed { + d.S3BucketIDProcessed = v2.S3BucketIDProcessed + } + if *v1.NATSJetStream != *v2.NATSJetStream { + d.NATSJetStream = v2.NATSJetStream + } + if *v1.NATSConsumerName != *v2.NATSConsumerName { + d.NATSConsumerName = v2.NATSConsumerName + } + if *v1.NATSSubject != *v2.NATSSubject { + d.NATSSubject = v2.NATSSubject + } + if *v1.NATSQueueID != *v2.NATSQueueID { + d.NATSQueueID = v2.NATSQueueID + } + if *v1.NATSJWTFile != *v2.NATSJWTFile { + d.NATSJWTFile = v2.NATSJWTFile + } + if *v1.NATSSeedFile != *v2.NATSSeedFile { + d.NATSSeedFile = v2.NATSSeedFile + } + if *v1.NATSCertificateAuthority != *v2.NATSCertificateAuthority { + d.NATSCertificateAuthority = v2.NATSCertificateAuthority + } + if *v1.NATSClientCertificate != *v2.NATSClientCertificate { + d.NATSClientCertificate = v2.NATSClientCertificate + } + if *v1.NATSClientKey != *v2.NATSClientKey { + d.NATSClientKey = v2.NATSClientKey + } + if *v1.NATSJetStreamMaxWait != *v2.NATSJetStreamMaxWait { + d.NATSJetStreamMaxWait = utils.StringPointer(v2.NATSJetStreamMaxWait.String()) + } + if *v1.NATSJetStreamProcessed != *v2.NATSJetStreamProcessed { + d.NATSJetStreamProcessed = v2.NATSJetStreamProcessed + } + if *v1.NATSSubjectProcessed != *v2.NATSSubjectProcessed { + d.NATSSubjectProcessed = v2.NATSSubjectProcessed + } + if *v1.NATSJWTFileProcessed != *v2.NATSJWTFileProcessed { + d.NATSJWTFileProcessed = v2.NATSJWTFileProcessed + } + if *v1.NATSSeedFileProcessed != *v2.NATSSeedFileProcessed { + d.NATSSeedFileProcessed = v2.NATSSeedFileProcessed + } + if *v1.NATSCertificateAuthorityProcessed != *v2.NATSCertificateAuthorityProcessed { + d.NATSCertificateAuthorityProcessed = v2.NATSCertificateAuthorityProcessed + } + if *v1.NATSClientCertificateProcessed != *v2.NATSClientCertificateProcessed { + d.NATSClientCertificateProcessed = v2.NATSClientCertificateProcessed + } + if *v1.NATSClientKeyProcessed != *v2.NATSClientKeyProcessed { + d.NATSClientKeyProcessed = v2.NATSClientKeyProcessed + } + if *v1.NATSJetStreamMaxWaitProcessed != *v2.NATSJetStreamMaxWaitProcessed { + d.NATSJetStreamMaxWaitProcessed = utils.StringPointer(v2.NATSJetStreamMaxWaitProcessed.String()) + } + return d +} + func diffEventReaderJsonCfg(d *EventReaderJsonCfg, v1, v2 *EventReaderCfg, separator string) *EventReaderJsonCfg { if d == nil { d = new(EventReaderJsonCfg) @@ -357,7 +976,7 @@ func diffEventReaderJsonCfg(d *EventReaderJsonCfg, v1, v2 *EventReaderCfg, separ if v1.ProcessedPath != v2.ProcessedPath { d.Processed_path = utils.StringPointer(v2.ProcessedPath) } - d.Opts = diffMap(d.Opts, v1.Opts, v2.Opts) + d.Opts = diffEventReaderOptsJsonCfg(d.Opts, v1.Opts, v2.Opts) tnt1 := v1.Tenant.GetRule(separator) tnt2 := v2.Tenant.GetRule(separator) if tnt1 != tnt2 {