diff --git a/config/config_defaults.go b/config/config_defaults.go index 5dea5ae19..89df9b8d2 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -508,12 +508,12 @@ const CGRATES_CFG_JSON = ` // SQL // "sqlMaxIdleConns": 0, // SQLMaxIdleConns // "sqlMaxOpenConns": 0, // SQLMaxOpenConns - // "sqlMaxConnLifetime": 0, // SQLMaxConnLifetime + // "sqlConnMaxLifetime": 0, // SQLConnMaxLifetime // "sqlTableName":"cdrs", // the name of the table from where the events are exported // "sqlDBName": "cgrates", // the name of the database from where the events are exported - // "sslmode": "disable", // the postgresSSLMode for postgres + // "sslMode": "disable", // the postgresSSLMode for postgres // Kafka diff --git a/config/eescfg.go b/config/eescfg.go index 870c92cbc..35e49afc4 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -19,6 +19,8 @@ along with this program. If not, see package config import ( + "time" + "github.com/cgrates/cgrates/utils" ) @@ -151,6 +153,46 @@ func (eeS *EEsCfg) AsMapInterface(separator string) (initialMP map[string]interf return } +type EventExporterOpts struct { + CSVFieldSeparator *string + ElsIndex *string + ElsIfPrimaryTerm *int + ElsIfSeqNo *int + ElsOpType *string + ElsPipeline *string + ElsRouting *string + ElsTimeout *time.Duration + ElsVersion *int + ElsVersionType *string + ElsWaitForActiveShards *string + SQLMaxIdleConns *int + SQLMaxOpenConns *int + SQLConnMaxLifetime *time.Duration + SQLTableName *string + SQLDBName *string + SSLMode *string + KafkaTopic *string + AMQPRoutingKey *string + AMQPQueueID *string + AMQPExchange *string + AMQPExchangeType *string + AWSRegion *string + AWSKey *string + AWSSecret *string + AWSToken *string + SQSQueueID *string + S3BucketID *string + S3FolderPath *string + NATSJetStream *bool + NATSSubject *string + NATSJWTFile *string + NATSSeedFile *string + NATSCertificateAuthority *string + NATSClientCertificate *string + NATSClientKey *string + NATSJetStreamMaxWait *time.Duration +} + // EventExporterCfg the config for a Event Exporter type EventExporterCfg struct { ID string @@ -172,6 +214,136 @@ type EventExporterCfg struct { trailerFields []*FCTemplate } +func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { + if jsnCfg == nil { + return + } + if jsnCfg.CSVFieldSeparator != nil { + eeOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator + } + if jsnCfg.ElsIndex != nil { + eeOpts.ElsIndex = jsnCfg.ElsIndex + } + if jsnCfg.ElsIfPrimaryTerm != nil { + eeOpts.ElsIfPrimaryTerm = jsnCfg.ElsIfPrimaryTerm + } + if jsnCfg.ElsIfSeqNo != nil { + eeOpts.ElsIfSeqNo = jsnCfg.ElsIfSeqNo + } + if jsnCfg.ElsOpType != nil { + eeOpts.ElsOpType = jsnCfg.ElsOpType + } + if jsnCfg.ElsPipeline != nil { + eeOpts.ElsPipeline = jsnCfg.ElsPipeline + } + if jsnCfg.ElsRouting != nil { + eeOpts.ElsRouting = jsnCfg.ElsRouting + } + if jsnCfg.ElsTimeout != nil { + var elsTimeout time.Duration + if elsTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.ElsTimeout); err != nil { + return + } + eeOpts.ElsTimeout = utils.DurationPointer(elsTimeout) + } + if jsnCfg.ElsVersion != nil { + eeOpts.ElsVersion = jsnCfg.ElsVersion + } + if jsnCfg.ElsVersionType != nil { + eeOpts.ElsVersionType = jsnCfg.ElsVersionType + } + if jsnCfg.ElsWaitForActiveShards != nil { + eeOpts.ElsWaitForActiveShards = jsnCfg.ElsWaitForActiveShards + } + if jsnCfg.SQLMaxIdleConns != nil { + eeOpts.SQLMaxIdleConns = jsnCfg.SQLMaxIdleConns + } + if jsnCfg.SQLMaxOpenConns != nil { + eeOpts.SQLMaxOpenConns = jsnCfg.SQLMaxOpenConns + } + if jsnCfg.SQLConnMaxLifetime != nil { + var sqlConnMaxLifetime time.Duration + if sqlConnMaxLifetime, err = utils.ParseDurationWithNanosecs(*jsnCfg.SQLConnMaxLifetime); err != nil { + return + } + eeOpts.SQLConnMaxLifetime = utils.DurationPointer(sqlConnMaxLifetime) + } + if jsnCfg.SQLTableName != nil { + eeOpts.SQLTableName = jsnCfg.SQLTableName + } + if jsnCfg.SQLDBName != nil { + eeOpts.SQLDBName = jsnCfg.SQLDBName + } + if jsnCfg.SSLMode != nil { + eeOpts.SSLMode = jsnCfg.SSLMode + } + if jsnCfg.KafkaTopic != nil { + eeOpts.KafkaTopic = jsnCfg.KafkaTopic + } + if jsnCfg.AMQPQueueID != nil { + eeOpts.AMQPQueueID = jsnCfg.AMQPQueueID + } + if jsnCfg.AMQPRoutingKey != nil { + eeOpts.AMQPRoutingKey = jsnCfg.AMQPRoutingKey + } + if jsnCfg.AMQPExchange != nil { + eeOpts.AMQPExchange = jsnCfg.AMQPExchange + } + if jsnCfg.AMQPExchangeType != nil { + eeOpts.AMQPExchangeType = jsnCfg.AMQPExchangeType + } + if jsnCfg.AWSRegion != nil { + eeOpts.AWSRegion = jsnCfg.AWSRegion + } + if jsnCfg.AWSKey != nil { + eeOpts.AWSKey = jsnCfg.AWSKey + } + if jsnCfg.AWSSecret != nil { + eeOpts.AWSSecret = jsnCfg.AWSSecret + } + if jsnCfg.AWSToken != nil { + eeOpts.AWSToken = jsnCfg.AWSToken + } + if jsnCfg.SQSQueueID != nil { + eeOpts.SQSQueueID = jsnCfg.SQSQueueID + } + if jsnCfg.S3BucketID != nil { + eeOpts.S3BucketID = jsnCfg.S3BucketID + } + if jsnCfg.S3FolderPath != nil { + eeOpts.S3FolderPath = jsnCfg.S3FolderPath + } + if jsnCfg.NATSJetStream != nil { + eeOpts.NATSJetStream = jsnCfg.NATSJetStream + } + if jsnCfg.NATSSubject != nil { + eeOpts.NATSSubject = jsnCfg.NATSSubject + } + if jsnCfg.NATSJWTFile != nil { + eeOpts.NATSJWTFile = jsnCfg.NATSJWTFile + } + if jsnCfg.NATSSeedFile != nil { + eeOpts.NATSSeedFile = jsnCfg.NATSSeedFile + } + if jsnCfg.NATSCertificateAuthority != nil { + eeOpts.NATSCertificateAuthority = jsnCfg.NATSCertificateAuthority + } + if jsnCfg.NATSClientCertificate != nil { + eeOpts.NATSClientCertificate = jsnCfg.NATSClientCertificate + } + if jsnCfg.NATSClientKey != nil { + eeOpts.NATSClientKey = jsnCfg.NATSClientKey + } + if jsnCfg.NATSJetStreamMaxWait != nil { + var natsJetStreamMaxWait time.Duration + if natsJetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil { + return + } + eeOpts.NATSJetStreamMaxWait = utils.DurationPointer(natsJetStreamMaxWait) + } + return +} + func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTemplates map[string][]*FCTemplate, separator string) (err error) { if jsnEec == nil { return @@ -271,6 +443,122 @@ func (eeC *EventExporterCfg) TrailerFields() []*FCTemplate { return eeC.trailerFields } +func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { + cln := &EventExporterOpts{} + if eeOpts.CSVFieldSeparator != nil { + cln.CSVFieldSeparator = utils.StringPointer(*eeOpts.CSVFieldSeparator) + } + if eeOpts.ElsIndex != nil { + cln.ElsIndex = utils.StringPointer(*eeOpts.ElsIndex) + } + if eeOpts.ElsIfPrimaryTerm != nil { + cln.ElsIfPrimaryTerm = utils.IntPointer(*eeOpts.ElsIfPrimaryTerm) + } + if eeOpts.ElsIfSeqNo != nil { + cln.ElsIfSeqNo = utils.IntPointer(*eeOpts.ElsIfSeqNo) + } + if eeOpts.ElsOpType != nil { + cln.ElsOpType = utils.StringPointer(*eeOpts.ElsOpType) + } + if eeOpts.ElsPipeline != nil { + cln.ElsPipeline = utils.StringPointer(*eeOpts.ElsPipeline) + } + if eeOpts.ElsRouting != nil { + cln.ElsRouting = utils.StringPointer(*eeOpts.ElsRouting) + } + if eeOpts.ElsTimeout != nil { + cln.ElsTimeout = utils.DurationPointer(*eeOpts.ElsTimeout) + } + if eeOpts.ElsVersion != nil { + cln.ElsVersion = utils.IntPointer(*eeOpts.ElsVersion) + } + if eeOpts.ElsVersionType != nil { + cln.ElsVersionType = utils.StringPointer(*eeOpts.ElsVersionType) + } + if eeOpts.ElsWaitForActiveShards != nil { + cln.ElsWaitForActiveShards = utils.StringPointer(*eeOpts.ElsWaitForActiveShards) + } + if eeOpts.SQLMaxIdleConns != nil { + cln.SQLMaxIdleConns = utils.IntPointer(*eeOpts.SQLMaxIdleConns) + } + if eeOpts.SQLMaxOpenConns != nil { + cln.SQLMaxOpenConns = utils.IntPointer(*eeOpts.SQLMaxOpenConns) + } + if eeOpts.SQLConnMaxLifetime != nil { + cln.SQLConnMaxLifetime = utils.DurationPointer(*eeOpts.SQLConnMaxLifetime) + } + if eeOpts.SQLTableName != nil { + cln.SQLTableName = utils.StringPointer(*eeOpts.SQLTableName) + } + if eeOpts.SQLDBName != nil { + cln.SQLDBName = utils.StringPointer(*eeOpts.SQLDBName) + } + if eeOpts.SSLMode != nil { + cln.SSLMode = utils.StringPointer(*eeOpts.SSLMode) + } + if eeOpts.KafkaTopic != nil { + cln.KafkaTopic = utils.StringPointer(*eeOpts.KafkaTopic) + } + if eeOpts.AMQPQueueID != nil { + cln.AMQPQueueID = utils.StringPointer(*eeOpts.AMQPQueueID) + } + if eeOpts.AMQPRoutingKey != nil { + cln.AMQPRoutingKey = utils.StringPointer(*eeOpts.AMQPRoutingKey) + } + if eeOpts.AMQPExchange != nil { + cln.AMQPExchange = utils.StringPointer(*eeOpts.AMQPExchange) + } + if eeOpts.AMQPExchangeType != nil { + cln.AMQPExchangeType = utils.StringPointer(*eeOpts.AMQPExchangeType) + } + if eeOpts.AWSRegion != nil { + cln.AWSRegion = utils.StringPointer(*eeOpts.AWSRegion) + } + if eeOpts.AWSKey != nil { + cln.AWSKey = utils.StringPointer(*eeOpts.AWSKey) + } + if eeOpts.AWSSecret != nil { + cln.AWSSecret = utils.StringPointer(*eeOpts.AWSSecret) + } + if eeOpts.AWSToken != nil { + cln.AWSToken = utils.StringPointer(*eeOpts.AWSToken) + } + if eeOpts.SQSQueueID != nil { + cln.SQSQueueID = utils.StringPointer(*eeOpts.SQSQueueID) + } + if eeOpts.S3BucketID != nil { + cln.S3BucketID = utils.StringPointer(*eeOpts.S3BucketID) + } + if eeOpts.S3FolderPath != nil { + cln.S3FolderPath = utils.StringPointer(*eeOpts.S3FolderPath) + } + if eeOpts.NATSJetStream != nil { + cln.NATSJetStream = utils.BoolPointer(*eeOpts.NATSJetStream) + } + if eeOpts.NATSSubject != nil { + cln.NATSSubject = utils.StringPointer(*eeOpts.NATSSubject) + } + if eeOpts.NATSJWTFile != nil { + cln.NATSJWTFile = utils.StringPointer(*eeOpts.NATSJWTFile) + } + if eeOpts.NATSSeedFile != nil { + cln.NATSSeedFile = utils.StringPointer(*eeOpts.NATSSeedFile) + } + if eeOpts.NATSCertificateAuthority != nil { + cln.NATSCertificateAuthority = utils.StringPointer(*eeOpts.NATSCertificateAuthority) + } + if eeOpts.NATSClientCertificate != nil { + cln.NATSClientCertificate = utils.StringPointer(*eeOpts.NATSClientCertificate) + } + if eeOpts.NATSClientKey != nil { + cln.NATSClientKey = utils.StringPointer(*eeOpts.NATSClientKey) + } + if eeOpts.NATSJetStreamMaxWait != nil { + cln.NATSJetStreamMaxWait = utils.DurationPointer(*eeOpts.NATSJetStreamMaxWait) + } + return cln +} + // Clone returns a deep copy of EventExporterCfg func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) { cln = &EventExporterCfg{ @@ -324,6 +612,119 @@ func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) { // AsMapInterface returns the config as a map[string]interface{} func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) { + // opts := map[string]interface{}{} + // if eeC.Opts.CSVFieldSeparator != nil { + // opts[utils.CSVFieldSepOpt] = *eeC.Opts.CSVFieldSeparator + // } + // if eeC.Opts.ElsIndex != nil { + // opts[utils.ElsIndex] = *eeC.Opts.ElsIndex + // } + // if eeC.Opts.ElsIfPrimaryTerm != nil { + // opts[utils.ElsIfPrimaryTerm] = *eeC.Opts.ElsIfPrimaryTerm + // } + // if eeC.Opts.ElsIfSeqNo != nil { + // opts[utils.ElsIfSeqNo] = *eeC.Opts.ElsIfSeqNo + // } + // if eeC.Opts.ElsOpType != nil { + // opts[utils.ElsOpType] = *eeC.Opts.ElsOpType + // } + // if eeC.Opts.ElsPipeline != nil { + // opts[utils.ElsPipeline] = *eeC.Opts.ElsPipeline + // } + // if eeC.Opts.ElsRouting != nil { + // opts[utils.ElsRouting] = *eeC.Opts.ElsRouting + // } + // if eeC.Opts.ElsTimeout != nil { + // opts[utils.ElsTimeout] = eeC.Opts.ElsTimeout.String() + // } + // if eeC.Opts.ElsVersion != nil { + // opts[utils.ElsVersionLow] = *eeC.Opts.ElsVersion + // } + // if eeC.Opts.ElsVersionType != nil { + // opts[utils.ElsVersionType] = *eeC.Opts.ElsVersionType + // } + // if eeC.Opts.ElsWaitForActiveShards != nil { + // opts[utils.ElsWaitForActiveShards] = *eeC.Opts.ElsWaitForActiveShards + // } + // if eeC.Opts.SQLMaxIdleConns != nil { + // opts[utils.SQLMaxIdleConnsCfg] = *eeC.Opts.SQLMaxIdleConns + // } + // if eeC.Opts.SQLMaxOpenConns != nil { + // opts[utils.SQLMaxOpenConns] = *eeC.Opts.SQLMaxOpenConns + // } + // if eeC.Opts.SQLConnMaxLifetime != nil { + // opts[utils.SQLConnMaxLifetime] = eeC.Opts.SQLConnMaxLifetime.String() + // } + // if eeC.Opts.SQLTableName != nil { + // opts[utils.SQLTableNameOpt] = *eeC.Opts.SQLTableName + // } + // if eeC.Opts.SQLDBName != nil { + // opts[utils.SQLDBNameOpt] = *eeC.Opts.SQLDBName + // } + // if eeC.Opts.SSLMode != nil { + // opts[utils.SSLModeCfg] = *eeC.Opts.SSLMode + // } + // if eeC.Opts.KafkaTopic != nil { + // opts[utils.KafkaTopic] = *eeC.Opts.KafkaTopic + // } + // if eeC.Opts.AMQPQueueID != nil { + // opts[utils.AMQPQueueID] = *eeC.Opts.AMQPQueueID + // } + // if eeC.Opts.AMQPRoutingKey != nil { + // opts[utils.AMQPRoutingKey] = *eeC.Opts.AMQPRoutingKey + // } + // if eeC.Opts.AMQPExchange != nil { + // opts[utils.AMQPExchange] = *eeC.Opts.AMQPExchange + // } + // if eeC.Opts.AMQPExchangeType != nil { + // opts[utils.AMQPExchangeType] = *eeC.Opts.AMQPExchangeType + // } + // if eeC.Opts.AWSRegion != nil { + // opts[utils.AWSRegion] = *eeC.Opts.AWSRegion + // } + // if eeC.Opts.AWSKey != nil { + // opts[utils.AWSKey] = *eeC.Opts.AWSKey + // } + // if eeC.Opts.AWSSecret != nil { + // opts[utils.AWSSecret] = *eeC.Opts.AWSSecret + // } + // if eeC.Opts.AWSToken != nil { + // opts[utils.AWSToken] = *eeC.Opts.AWSToken + // } + // if eeC.Opts.SQSQueueID != nil { + // opts[utils.SQSQueueID] = *eeC.Opts.SQSQueueID + // } + // if eeC.Opts.S3BucketID != nil { + // opts[utils.S3Bucket] = *eeC.Opts.S3BucketID + // } + // if eeC.Opts.S3FolderPath != nil { + // opts[utils.S3FolderPath] = *eeC.Opts.S3FolderPath + // } + // if eeC.Opts.NATSJetStream != nil { + // opts[utils.NatsJetStream] = *eeC.Opts.NATSJetStream + // } + // if eeC.Opts.NATSSubject != nil { + // opts[utils.NatsSubject] = *eeC.Opts.NATSSubject + // } + // if eeC.Opts.NATSJWTFile != nil { + // opts[utils.NatsJWTFile] = *eeC.Opts.NATSJWTFile + // } + // if eeC.Opts.NATSSeedFile != nil { + // opts[utils.NatsSeedFile] = *eeC.Opts.NATSSeedFile + // } + // if eeC.Opts.NATSCertificateAuthority != nil { + // opts[utils.NatsCertificateAuthority] = *eeC.Opts.NATSCertificateAuthority + // } + // if eeC.Opts.NATSClientCertificate != nil { + // opts[utils.NatsClientCertificate] = *eeC.Opts.NATSClientCertificate + // } + // if eeC.Opts.NATSClientKey != nil { + // opts[utils.NatsClientKey] = *eeC.Opts.NATSClientKey + // } + // if eeC.Opts.NATSJetStreamMaxWait != nil { + // opts[utils.NatsJetStreamMaxWait] = eeC.Opts.NATSJetStreamMaxWait.String() + // } + flgs := eeC.Flags.SliceFlags() if flgs == nil { flgs = []string{} diff --git a/config/erscfg.go b/config/erscfg.go index 27d9d9118..9af36a456 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -135,6 +135,68 @@ func (erS *ERsCfg) AsMapInterface(separator string) (initialMP map[string]interf return } +type EventReaderOpts struct { + PartialPath *string + PartialCacheAction *string + PartialOrderField *string + PartialCSVFieldSeparator *string + CSVRowLength *int + CSVFieldSeparator *string + CSVHeaderDefineChar *string + CSVLazyQuotes *bool + XMLRootPath *string + AMQPQueueID *string + AMQPQueueIDProcessed *string + AMQPConsumerTag *string + AMQPExchange *string + AMQPExchangeType *string + AMQPRoutingKey *string + AMQPExchangeProcessed *string + AMQPExchangeTypeProcessed *string + AMQPRoutingKeyProcessed *string + KafkaTopic *string + KafkaGroupID *string + KafkaMaxWait *time.Duration + KafkaTopicProcessed *string + SQLDBName *string + SQLTableName *string + SSLMode *string + SQLDBNameProcessed *string + SQLTableNameProcessed *string + SSLModeProcessed *string + AWSRegion *string + AWSKey *string + AWSSecret *string + AWSToken *string + AWSRegionProcessed *string + AWSKeyProcessed *string + AWSSecretProcessed *string + AWSTokenProcessed *string + SQSQueueID *string + SQSQueueIDProcessed *string + S3BucketID *string + S3FolderPathProcessed *string + S3BucketIDProcessed *string + NATSJetStream *bool + NATSConsumerName *string + NATSSubject *string + NATSQueueID *string + NATSJWTFile *string + NATSSeedFile *string + NATSCertificateAuthority *string + NATSClientCertificate *string + NATSClientKey *string + NATSJetStreamMaxWait *time.Duration + NATSJetStreamProcessed *bool + NATSSubjectProcessed *string + NATSJWTFileProcessed *string + NATSSeedFileProcessed *string + NATSCertificateAuthorityProcessed *string + NATSClientCertificateProcessed *string + NATSClientKeyProcessed *string + NATSJetStreamMaxWaitProcessed *time.Duration +} + // EventReaderCfg the event for the Event Reader type EventReaderCfg struct { ID string @@ -153,6 +215,202 @@ type EventReaderCfg struct { CacheDumpFields []*FCTemplate } +func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) { + if jsnCfg == nil { + return + } + if jsnCfg.PartialPath != nil { + erOpts.PartialPath = jsnCfg.PartialPath + } + if jsnCfg.PartialCacheAction != nil { + erOpts.PartialCacheAction = jsnCfg.PartialCacheAction + } + if jsnCfg.PartialOrderField != nil { + erOpts.PartialOrderField = jsnCfg.PartialOrderField + } + if jsnCfg.PartialCSVFieldSeparator != nil { + erOpts.PartialCSVFieldSeparator = jsnCfg.PartialCSVFieldSeparator + } + if jsnCfg.CSVRowLength != nil { + erOpts.CSVRowLength = jsnCfg.CSVRowLength + } + if jsnCfg.CSVFieldSeparator != nil { + erOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator + } + if jsnCfg.CSVHeaderDefineChar != nil { + erOpts.CSVHeaderDefineChar = jsnCfg.CSVHeaderDefineChar + } + if jsnCfg.CSVLazyQuotes != nil { + erOpts.CSVLazyQuotes = jsnCfg.CSVLazyQuotes + } + if jsnCfg.XMLRootPath != nil { + erOpts.XMLRootPath = jsnCfg.XMLRootPath + } + if jsnCfg.AMQPQueueID != nil { + erOpts.AMQPQueueID = jsnCfg.AMQPQueueID + } + if jsnCfg.AMQPQueueIDProcessed != nil { + erOpts.AMQPQueueIDProcessed = jsnCfg.AMQPQueueIDProcessed + } + if jsnCfg.AMQPConsumerTag != nil { + erOpts.AMQPConsumerTag = jsnCfg.AMQPConsumerTag + } + if jsnCfg.AMQPExchange != nil { + erOpts.AMQPExchange = jsnCfg.AMQPExchange + } + if jsnCfg.AMQPExchangeType != nil { + erOpts.AMQPExchangeType = jsnCfg.AMQPExchangeType + } + if jsnCfg.AMQPRoutingKey != nil { + erOpts.AMQPRoutingKey = jsnCfg.AMQPRoutingKey + } + if jsnCfg.AMQPExchangeProcessed != nil { + erOpts.AMQPExchangeProcessed = jsnCfg.AMQPExchangeProcessed + } + if jsnCfg.AMQPExchangeTypeProcessed != nil { + erOpts.AMQPExchangeTypeProcessed = jsnCfg.AMQPExchangeTypeProcessed + } + if jsnCfg.AMQPRoutingKeyProcessed != nil { + erOpts.AMQPRoutingKeyProcessed = jsnCfg.AMQPRoutingKeyProcessed + } + if jsnCfg.KafkaTopic != nil { + erOpts.KafkaTopic = jsnCfg.KafkaTopic + } + if jsnCfg.KafkaGroupID != nil { + erOpts.KafkaGroupID = jsnCfg.KafkaGroupID + } + if jsnCfg.KafkaMaxWait != nil { + var kafkaMaxWait time.Duration + if kafkaMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.KafkaMaxWait); err != nil { + return + } + erOpts.KafkaMaxWait = utils.DurationPointer(kafkaMaxWait) + } + if jsnCfg.KafkaTopicProcessed != nil { + erOpts.KafkaTopicProcessed = jsnCfg.KafkaTopicProcessed + } + if jsnCfg.SQLDBName != nil { + erOpts.SQLDBName = jsnCfg.SQLDBName + } + if jsnCfg.SQLTableName != nil { + erOpts.SQLTableName = jsnCfg.SQLTableName + } + if jsnCfg.SSLMode != nil { + erOpts.SSLMode = jsnCfg.SSLMode + } + if jsnCfg.SQLDBNameProcessed != nil { + erOpts.SQLDBNameProcessed = jsnCfg.SQLDBNameProcessed + } + if jsnCfg.SQLTableNameProcessed != nil { + erOpts.SQLTableNameProcessed = jsnCfg.SQLTableNameProcessed + } + if jsnCfg.SSLModeProcessed != nil { + erOpts.SSLModeProcessed = jsnCfg.SSLModeProcessed + } + if jsnCfg.AWSRegion != nil { + erOpts.AWSRegion = jsnCfg.AWSRegion + } + if jsnCfg.AWSKey != nil { + erOpts.AWSKey = jsnCfg.AWSKey + } + if jsnCfg.AWSSecret != nil { + erOpts.AWSSecret = jsnCfg.AWSSecret + } + if jsnCfg.AWSToken != nil { + erOpts.AWSToken = jsnCfg.AWSToken + } + if jsnCfg.AWSRegionProcessed != nil { + erOpts.AWSRegionProcessed = jsnCfg.AWSRegionProcessed + } + if jsnCfg.AWSKeyProcessed != nil { + erOpts.AWSKeyProcessed = jsnCfg.AWSKeyProcessed + } + if jsnCfg.AWSSecretProcessed != nil { + erOpts.AWSSecretProcessed = jsnCfg.AWSSecretProcessed + } + if jsnCfg.AWSTokenProcessed != nil { + erOpts.AWSTokenProcessed = jsnCfg.AWSTokenProcessed + } + if jsnCfg.SQSQueueID != nil { + erOpts.SQSQueueID = jsnCfg.SQSQueueID + } + if jsnCfg.SQSQueueIDProcessed != nil { + erOpts.SQSQueueIDProcessed = jsnCfg.SQSQueueIDProcessed + } + if jsnCfg.S3BucketID != nil { + erOpts.S3BucketID = jsnCfg.S3BucketID + } + if jsnCfg.S3FolderPathProcessed != nil { + erOpts.S3FolderPathProcessed = jsnCfg.S3FolderPathProcessed + } + if jsnCfg.S3BucketIDProcessed != nil { + erOpts.S3BucketIDProcessed = jsnCfg.S3BucketIDProcessed + } + if jsnCfg.NATSJetStream != nil { + erOpts.NATSJetStream = jsnCfg.NATSJetStream + } + if jsnCfg.NATSConsumerName != nil { + erOpts.NATSConsumerName = jsnCfg.NATSConsumerName + } + if jsnCfg.NATSSubject != nil { + erOpts.NATSSubject = jsnCfg.NATSSubject + } + if jsnCfg.NATSQueueID != nil { + erOpts.NATSQueueID = jsnCfg.NATSQueueID + } + if jsnCfg.NATSJWTFile != nil { + erOpts.NATSJWTFile = jsnCfg.NATSJWTFile + } + if jsnCfg.NATSSeedFile != nil { + erOpts.NATSSeedFile = jsnCfg.NATSSeedFile + } + if jsnCfg.NATSCertificateAuthority != nil { + erOpts.NATSCertificateAuthority = jsnCfg.NATSCertificateAuthority + } + if jsnCfg.NATSClientCertificate != nil { + erOpts.NATSClientCertificate = jsnCfg.NATSClientCertificate + } + if jsnCfg.NATSClientKey != nil { + erOpts.NATSClientKey = jsnCfg.NATSClientKey + } + if jsnCfg.NATSJetStreamMaxWait != nil { + var jetStreamMaxWait time.Duration + if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil { + return + } + erOpts.NATSJetStreamMaxWait = utils.DurationPointer(jetStreamMaxWait) + } + if jsnCfg.NATSJetStreamProcessed != nil { + erOpts.NATSJetStreamProcessed = jsnCfg.NATSJetStreamProcessed + } + if jsnCfg.NATSSubjectProcessed != nil { + erOpts.NATSSubjectProcessed = jsnCfg.NATSSubjectProcessed + } + if jsnCfg.NATSJWTFileProcessed != nil { + erOpts.NATSJWTFileProcessed = jsnCfg.NATSJWTFileProcessed + } + if jsnCfg.NATSSeedFileProcessed != nil { + erOpts.NATSSeedFileProcessed = jsnCfg.NATSSeedFileProcessed + } + if jsnCfg.NATSCertificateAuthorityProcessed != nil { + erOpts.NATSCertificateAuthorityProcessed = jsnCfg.NATSCertificateAuthorityProcessed + } + if jsnCfg.NATSClientCertificateProcessed != nil { + erOpts.NATSClientCertificateProcessed = jsnCfg.NATSClientCertificateProcessed + } + if jsnCfg.NATSClientKeyProcessed != nil { + erOpts.NATSClientKeyProcessed = jsnCfg.NATSClientKeyProcessed + } + if jsnCfg.NATSJetStreamMaxWaitProcessed != nil { + var jetStreamMaxWait time.Duration + if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWaitProcessed); err != nil { + return + } + erOpts.NATSJetStreamMaxWaitProcessed = utils.DurationPointer(jetStreamMaxWait) + } + return +} + func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplates map[string][]*FCTemplate, sep string) (err error) { if jsnCfg == nil { return @@ -232,6 +490,188 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat return } +func (erOpts *EventReaderOpts) Clone() *EventReaderOpts { + cln := &EventReaderOpts{} + if erOpts.PartialPath != nil { + cln.PartialPath = utils.StringPointer(*erOpts.PartialPath) + } + if erOpts.PartialCacheAction != nil { + cln.PartialCacheAction = utils.StringPointer(*erOpts.PartialCacheAction) + } + if erOpts.PartialOrderField != nil { + cln.PartialOrderField = utils.StringPointer(*erOpts.PartialOrderField) + } + if erOpts.PartialCSVFieldSeparator != nil { + cln.PartialCSVFieldSeparator = utils.StringPointer(*erOpts.PartialCSVFieldSeparator) + } + if erOpts.CSVRowLength != nil { + cln.CSVRowLength = utils.IntPointer(*erOpts.CSVRowLength) + } + if erOpts.CSVFieldSeparator != nil { + cln.CSVFieldSeparator = utils.StringPointer(*erOpts.CSVFieldSeparator) + } + if erOpts.CSVHeaderDefineChar != nil { + cln.CSVHeaderDefineChar = utils.StringPointer(*erOpts.CSVHeaderDefineChar) + } + if erOpts.CSVLazyQuotes != nil { + cln.CSVLazyQuotes = utils.BoolPointer(*erOpts.CSVLazyQuotes) + } + if erOpts.XMLRootPath != nil { + cln.XMLRootPath = utils.StringPointer(*erOpts.XMLRootPath) + } + if erOpts.AMQPQueueID != nil { + cln.AMQPQueueID = utils.StringPointer(*erOpts.AMQPQueueID) + } + if erOpts.AMQPQueueIDProcessed != nil { + cln.AMQPQueueIDProcessed = utils.StringPointer(*erOpts.AMQPQueueIDProcessed) + } + if erOpts.AMQPConsumerTag != nil { + cln.AMQPConsumerTag = utils.StringPointer(*erOpts.AMQPConsumerTag) + } + if erOpts.AMQPExchange != nil { + cln.AMQPExchange = utils.StringPointer(*erOpts.AMQPExchange) + } + if erOpts.AMQPExchangeType != nil { + cln.AMQPExchangeType = utils.StringPointer(*erOpts.AMQPExchangeType) + } + if erOpts.AMQPRoutingKey != nil { + cln.AMQPRoutingKey = utils.StringPointer(*erOpts.AMQPRoutingKey) + } + if erOpts.AMQPExchangeProcessed != nil { + cln.AMQPExchangeProcessed = utils.StringPointer(*erOpts.AMQPExchangeProcessed) + } + if erOpts.AMQPExchangeTypeProcessed != nil { + cln.AMQPExchangeTypeProcessed = utils.StringPointer(*erOpts.AMQPExchangeTypeProcessed) + } + if erOpts.AMQPRoutingKeyProcessed != nil { + cln.AMQPRoutingKeyProcessed = utils.StringPointer(*erOpts.AMQPRoutingKeyProcessed) + } + if erOpts.KafkaTopic != nil { + cln.KafkaTopic = utils.StringPointer(*erOpts.KafkaTopic) + } + if erOpts.KafkaGroupID != nil { + cln.KafkaGroupID = utils.StringPointer(*erOpts.KafkaGroupID) + } + if erOpts.KafkaMaxWait != nil { + cln.KafkaMaxWait = utils.DurationPointer(*erOpts.KafkaMaxWait) + } + if erOpts.KafkaTopicProcessed != nil { + cln.KafkaTopicProcessed = utils.StringPointer(*erOpts.KafkaTopicProcessed) + } + if erOpts.SQLDBName != nil { + cln.SQLDBName = utils.StringPointer(*erOpts.SQLDBName) + } + if erOpts.SQLTableName != nil { + cln.SQLTableName = utils.StringPointer(*erOpts.SQLTableName) + } + if erOpts.SSLMode != nil { + cln.SSLMode = utils.StringPointer(*erOpts.SSLMode) + } + if erOpts.SQLDBNameProcessed != nil { + cln.SQLDBNameProcessed = utils.StringPointer(*erOpts.SQLDBNameProcessed) + } + if erOpts.SQLTableNameProcessed != nil { + cln.SQLTableNameProcessed = utils.StringPointer(*erOpts.SQLTableNameProcessed) + } + if erOpts.SSLModeProcessed != nil { + cln.SSLModeProcessed = utils.StringPointer(*erOpts.SSLModeProcessed) + } + if erOpts.AWSRegion != nil { + cln.AWSRegion = utils.StringPointer(*erOpts.AWSRegion) + } + if erOpts.AWSKey != nil { + cln.AWSKey = utils.StringPointer(*erOpts.AWSKey) + } + if erOpts.AWSSecret != nil { + cln.AWSSecret = utils.StringPointer(*erOpts.AWSSecret) + } + if erOpts.AWSToken != nil { + cln.AWSToken = utils.StringPointer(*erOpts.AWSToken) + } + if erOpts.AWSRegionProcessed != nil { + cln.AWSRegionProcessed = utils.StringPointer(*erOpts.AWSRegionProcessed) + } + if erOpts.AWSKeyProcessed != nil { + cln.AWSKeyProcessed = utils.StringPointer(*erOpts.AWSKeyProcessed) + } + if erOpts.AWSSecretProcessed != nil { + cln.AWSSecretProcessed = utils.StringPointer(*erOpts.AWSSecretProcessed) + } + if erOpts.AWSTokenProcessed != nil { + cln.AWSTokenProcessed = utils.StringPointer(*erOpts.AWSTokenProcessed) + } + if erOpts.SQSQueueID != nil { + cln.SQSQueueID = utils.StringPointer(*erOpts.SQSQueueID) + } + if erOpts.SQSQueueIDProcessed != nil { + cln.SQSQueueIDProcessed = utils.StringPointer(*erOpts.SQSQueueIDProcessed) + } + if erOpts.S3BucketID != nil { + cln.S3BucketID = utils.StringPointer(*erOpts.S3BucketID) + } + if erOpts.S3FolderPathProcessed != nil { + cln.S3FolderPathProcessed = utils.StringPointer(*erOpts.S3FolderPathProcessed) + } + if erOpts.S3BucketIDProcessed != nil { + cln.S3BucketIDProcessed = utils.StringPointer(*erOpts.S3BucketIDProcessed) + } + if erOpts.NATSJetStream != nil { + cln.NATSJetStream = utils.BoolPointer(*erOpts.NATSJetStream) + } + if erOpts.NATSConsumerName != nil { + cln.NATSConsumerName = utils.StringPointer(*erOpts.NATSConsumerName) + } + if erOpts.NATSSubject != nil { + cln.NATSSubject = utils.StringPointer(*erOpts.NATSSubject) + } + if erOpts.NATSQueueID != nil { + cln.NATSQueueID = utils.StringPointer(*erOpts.NATSQueueID) + } + if erOpts.NATSJWTFile != nil { + cln.NATSJWTFile = utils.StringPointer(*erOpts.NATSJWTFile) + } + if erOpts.NATSSeedFile != nil { + cln.NATSSeedFile = utils.StringPointer(*erOpts.NATSSeedFile) + } + if erOpts.NATSCertificateAuthority != nil { + cln.NATSCertificateAuthority = utils.StringPointer(*erOpts.NATSCertificateAuthority) + } + if erOpts.NATSClientCertificate != nil { + cln.NATSClientCertificate = utils.StringPointer(*erOpts.NATSClientCertificate) + } + if erOpts.NATSClientKey != nil { + cln.NATSClientKey = utils.StringPointer(*erOpts.NATSClientKey) + } + if erOpts.NATSJetStreamMaxWait != nil { + cln.NATSJetStreamMaxWait = utils.DurationPointer(*erOpts.NATSJetStreamMaxWait) + } + if erOpts.NATSJetStreamProcessed != nil { + cln.NATSJetStreamProcessed = utils.BoolPointer(*erOpts.NATSJetStreamProcessed) + } + if erOpts.NATSSubjectProcessed != nil { + cln.NATSSubjectProcessed = utils.StringPointer(*erOpts.NATSSubjectProcessed) + } + if erOpts.NATSJWTFileProcessed != nil { + cln.NATSJWTFileProcessed = utils.StringPointer(*erOpts.NATSJWTFileProcessed) + } + if erOpts.NATSSeedFileProcessed != nil { + cln.NATSSeedFileProcessed = utils.StringPointer(*erOpts.NATSSeedFileProcessed) + } + if erOpts.NATSCertificateAuthorityProcessed != nil { + cln.NATSCertificateAuthorityProcessed = utils.StringPointer(*erOpts.NATSCertificateAuthorityProcessed) + } + if erOpts.NATSClientCertificateProcessed != nil { + cln.NATSClientCertificateProcessed = utils.StringPointer(*erOpts.NATSClientCertificateProcessed) + } + if erOpts.NATSClientKeyProcessed != nil { + cln.NATSClientKeyProcessed = utils.StringPointer(*erOpts.NATSClientKeyProcessed) + } + if erOpts.NATSJetStreamMaxWaitProcessed != nil { + cln.NATSJetStreamMaxWaitProcessed = utils.DurationPointer(*erOpts.NATSJetStreamMaxWaitProcessed) + } + return cln +} + // Clone returns a deep copy of EventReaderCfg func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { cln = &EventReaderCfg{ @@ -278,6 +718,186 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { // AsMapInterface returns the config as a map[string]interface{} func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) { + // opts := map[string]interface{}{} + + // if er.Opts.PartialPath != nil { + // opts[utils.PartialPathOpt] = *er.Opts.PartialPath + // } + // if er.Opts.PartialCacheAction != nil { + // opts[utils.PartialCacheActionOpt] = *er.Opts.PartialCacheAction + // } + // if er.Opts.PartialOrderField != nil { + // opts[utils.PartialOrderFieldOpt] = *er.Opts.PartialOrderField + // } + // if er.Opts.PartialCSVFieldSeparator != nil { + // opts[utils.PartialCSVFieldSepartorOpt] = *er.Opts.PartialCSVFieldSeparator + // } + // if er.Opts.CSVRowLength != nil { + // opts[utils.CSVRowLengthOpt] = *er.Opts.CSVRowLength + // } + // if er.Opts.CSVFieldSeparator != nil { + // opts[utils.CSVFieldSepOpt] = *er.Opts.CSVFieldSeparator + // } + // if er.Opts.CSVHeaderDefineChar != nil { + // opts[utils.HeaderDefineCharOpt] = *er.Opts.CSVHeaderDefineChar + // } + // if er.Opts.CSVLazyQuotes != nil { + // opts[utils.CSVLazyQuotes] = *er.Opts.CSVLazyQuotes + // } + // if er.Opts.XMLRootPath != nil { + // opts[utils.XMLRootPathOpt] = *er.Opts.XMLRootPath + // } + // if er.Opts.AMQPQueueID != nil { + // opts[utils.AMQPQueueID] = *er.Opts.AMQPQueueID + // } + // if er.Opts.AMQPQueueIDProcessed != nil { + // opts[utils.AMQPQueueIDProcessedCfg] = *er.Opts.AMQPQueueIDProcessed + // } + // if er.Opts.AMQPConsumerTag != nil { + // opts[utils.AMQPConsumerTag] = *er.Opts.AMQPConsumerTag + // } + // if er.Opts.AMQPExchange != nil { + // opts[utils.AMQPExchange] = *er.Opts.AMQPExchange + // } + // if er.Opts.AMQPExchangeType != nil { + // opts[utils.AMQPExchangeType] = *er.Opts.AMQPExchangeType + // } + // if er.Opts.AMQPRoutingKey != nil { + // opts[utils.AMQPRoutingKey] = *er.Opts.AMQPRoutingKey + // } + // if er.Opts.AMQPExchangeProcessed != nil { + // opts[utils.AMQPExchangeProcessedCfg] = *er.Opts.AMQPExchangeProcessed + // } + // if er.Opts.AMQPExchangeTypeProcessed != nil { + // opts[utils.AMQPExchangeTypeProcessedCfg] = *er.Opts.AMQPExchangeTypeProcessed + // } + // if er.Opts.AMQPRoutingKeyProcessed != nil { + // opts[utils.AMQPRoutingKeyProcessedCfg] = *er.Opts.AMQPRoutingKeyProcessed + // } + // if er.Opts.KafkaTopic != nil { + // opts[utils.KafkaTopic] = *er.Opts.KafkaTopic + // } + // if er.Opts.KafkaGroupID != nil { + // opts[utils.KafkaGroupID] = *er.Opts.KafkaGroupID + // } + // if er.Opts.KafkaMaxWait != nil { + // opts[utils.KafkaMaxWait] = er.Opts.KafkaMaxWait.String() + // } + // if er.Opts.KafkaTopicProcessed != nil { + // opts[utils.KafkaTopicProcessedCfg] = *er.Opts.KafkaTopicProcessed + // } + // if er.Opts.SQLDBName != nil { + // opts[utils.SQLDBNameOpt] = *er.Opts.SQLDBName + // } + // if er.Opts.SQLTableName != nil { + // opts[utils.SQLTableNameOpt] = *er.Opts.SQLTableName + // } + // if er.Opts.SSLMode != nil { + // opts[utils.SSLModeCfg] = *er.Opts.SSLMode + // } + // if er.Opts.SQLDBNameProcessed != nil { + // opts[utils.SQLDBNameProcessedCfg] = *er.Opts.SQLDBNameProcessed + // } + // if er.Opts.SQLTableNameProcessed != nil { + // opts[utils.SQLTableNameProcessedCfg] = *er.Opts.SQLTableNameProcessed + // } + // if er.Opts.SSLModeProcessed != nil { + // opts[utils.SSLModeProcessedCfg] = *er.Opts.SSLModeProcessed + // } + // if er.Opts.AWSRegion != nil { + // opts[utils.AWSRegion] = *er.Opts.AWSRegion + // } + // if er.Opts.AWSKey != nil { + // opts[utils.AWSKey] = *er.Opts.AWSKey + // } + // if er.Opts.AWSSecret != nil { + // opts[utils.AWSSecret] = *er.Opts.AWSSecret + // } + // if er.Opts.AWSToken != nil { + // opts[utils.AWSToken] = *er.Opts.AWSToken + // } + // if er.Opts.AWSRegionProcessed != nil { + // opts[utils.AWSRegionProcessedCfg] = *er.Opts.AWSRegionProcessed + // } + // if er.Opts.AWSKeyProcessed != nil { + // opts[utils.AWSKeyProcessedCfg] = *er.Opts.AWSKeyProcessed + // } + // if er.Opts.AWSSecretProcessed != nil { + // opts[utils.AWSSecretProcessedCfg] = *er.Opts.AWSSecretProcessed + // } + // if er.Opts.AWSTokenProcessed != nil { + // opts[utils.AWSTokenProcessedCfg] = *er.Opts.AWSTokenProcessed + // } + // if er.Opts.SQSQueueID != nil { + // opts[utils.SQSQueueID] = *er.Opts.SQSQueueID + // } + // if er.Opts.SQSQueueIDProcessed != nil { + // opts[utils.SQSQueueIDProcessedCfg] = *er.Opts.SQSQueueIDProcessed + // } + // if er.Opts.S3BucketID != nil { + // opts[utils.S3Bucket] = *er.Opts.S3BucketID + // } + // if er.Opts.S3FolderPathProcessed != nil { + // opts[utils.S3FolderPathProcessedCfg] = *er.Opts.S3FolderPathProcessed + // } + // if er.Opts.S3BucketIDProcessed != nil { + // opts[utils.S3BucketIDProcessedCfg] = *er.Opts.S3BucketIDProcessed + // } + // if er.Opts.NATSJetStream != nil { + // opts[utils.NatsJetStream] = *er.Opts.NATSJetStream + // } + // if er.Opts.NATSConsumerName != nil { + // opts[utils.NatsConsumerName] = *er.Opts.NATSConsumerName + // } + // if er.Opts.NATSSubject != nil { + // opts[utils.NatsSubject] = *er.Opts.NATSSubject + // } + // if er.Opts.NATSQueueID != nil { + // opts[utils.NatsQueueID] = *er.Opts.NATSQueueID + // } + // if er.Opts.NATSJWTFile != nil { + // opts[utils.NatsJWTFile] = *er.Opts.NATSJWTFile + // } + // if er.Opts.NATSSeedFile != nil { + // opts[utils.NatsSeedFile] = *er.Opts.NATSSeedFile + // } + // if er.Opts.NATSCertificateAuthority != nil { + // opts[utils.NatsCertificateAuthority] = *er.Opts.NATSCertificateAuthority + // } + // if er.Opts.NATSClientCertificate != nil { + // opts[utils.NatsClientCertificate] = *er.Opts.NATSClientCertificate + // } + // if er.Opts.NATSClientKey != nil { + // opts[utils.NatsClientKey] = *er.Opts.NATSClientKey + // } + // if er.Opts.NATSJetStreamMaxWait != nil { + // opts[utils.NatsJetStreamMaxWait] = er.Opts.NATSJetStreamMaxWait.String() + // } + // if er.Opts.NATSJetStreamProcessed != nil { + // opts[utils.NATSJetStreamProcessedCfg] = *er.Opts.NATSJetStreamProcessed + // } + // if er.Opts.NATSSubjectProcessed != nil { + // opts[utils.NATSSubjectProcessedCfg] = *er.Opts.NATSSubjectProcessed + // } + // if er.Opts.NATSJWTFileProcessed != nil { + // opts[utils.NATSJWTFileProcessedCfg] = *er.Opts.NATSJWTFileProcessed + // } + // if er.Opts.NATSSeedFileProcessed != nil { + // opts[utils.NATSSeedFileProcessedCfg] = *er.Opts.NATSSeedFileProcessed + // } + // if er.Opts.NATSCertificateAuthorityProcessed != nil { + // opts[utils.NATSCertificateAuthorityProcessedCfg] = *er.Opts.NATSCertificateAuthorityProcessed + // } + // if er.Opts.NATSClientCertificateProcessed != nil { + // opts[utils.NATSClientCertificateProcessed] = *er.Opts.NATSClientCertificateProcessed + // } + // if er.Opts.NATSClientKeyProcessed != nil { + // opts[utils.NATSClientKeyProcessedCfg] = *er.Opts.NATSClientKeyProcessed + // } + // if er.Opts.NATSJetStreamMaxWaitProcessed != nil { + // opts[utils.NATSJetStreamMaxWaitProcessedCfg] = er.Opts.NATSJetStreamMaxWaitProcessed.String() + // } + initialMP = map[string]interface{}{ utils.IDCfg: er.ID, utils.TypeCfg: er.Type, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index beacaee14..648972263 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -181,6 +181,68 @@ type ERsJsonCfg struct { Partial_cache_ttl *string } +type EventReaderOptsJson struct { + PartialPath *string `json:"partialPath"` + PartialCacheAction *string `json:"partialCacheAction"` + PartialOrderField *string `json:"partialOrderField"` + PartialCSVFieldSeparator *string `json:"partialcsvFieldSeparator"` + CSVRowLength *int `json:"csvRowLength"` + CSVFieldSeparator *string `json:"csvFieldSeparator"` + CSVHeaderDefineChar *string `json:"csvHeaderDefineChar"` + CSVLazyQuotes *bool `json:"csvLazyQuotes"` + XMLRootPath *string `json:"xmlRootPath"` + AMQPQueueID *string `json:"amqpQueueID"` + AMQPQueueIDProcessed *string `json:"amqpQueueIDProcessed"` + AMQPConsumerTag *string `json:"amqpConsumerTag"` + AMQPExchange *string `json:"amqpExchange"` + AMQPExchangeType *string `json:"amqpExchangeType"` + AMQPRoutingKey *string `json:"amqpRoutingKey"` + AMQPExchangeProcessed *string `json:"amqpExchangeProcessed"` + AMQPExchangeTypeProcessed *string `json:"amqpExchangeTypeProcessed"` + AMQPRoutingKeyProcessed *string `json:"amqpRoutingKeyProcessed"` + KafkaTopic *string `json:"kafkaTopic"` + KafkaGroupID *string `json:"kafkaGroupID"` + KafkaMaxWait *string `json:"kafkaMaxWait"` + KafkaTopicProcessed *string `json:"kafkaTopicProcessed"` + SQLDBName *string `json:"sqlDBName"` + SQLTableName *string `json:"sqlTableName"` + SSLMode *string `json:"postgresSSLMode"` + SQLDBNameProcessed *string `json:"sqlDBNameProcessed"` + SQLTableNameProcessed *string `json:"sqlTableNameProcessed"` + SSLModeProcessed *string `json:"postgresSSLModeProcessed"` + AWSRegion *string `json:"awsRegion"` + AWSKey *string `json:"awsKey"` + AWSSecret *string `json:"awsSecret"` + AWSToken *string `json:"awsToken"` + AWSRegionProcessed *string `json:"awsRegionProcessed"` + AWSKeyProcessed *string `json:"awsKeyProcessed"` + AWSSecretProcessed *string `json:"awsSecretProcessed"` + AWSTokenProcessed *string `json:"awsTokenProcessed"` + SQSQueueID *string `json:"sqsQueueID"` + SQSQueueIDProcessed *string `json:"sqsQueueIDProcessed"` + S3BucketID *string `json:"s3BucketID"` + S3FolderPathProcessed *string `json:"s3FolderPathProcessed"` + S3BucketIDProcessed *string `json:"s3BucketIDProcessed"` + NATSJetStream *bool `json:"natsJetStream"` + NATSConsumerName *string `json:"natsConsumerName"` + NATSSubject *string `json:"natsSubject"` + NATSQueueID *string `json:"natsQueueID"` + NATSJWTFile *string `json:"natsJWTFile"` + NATSSeedFile *string `json:"natsSeedFile"` + NATSCertificateAuthority *string `json:"natsCertificateAuthority"` + NATSClientCertificate *string `json:"natsClientCertificate"` + NATSClientKey *string `json:"natsClientKey"` + NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` + NATSJetStreamProcessed *bool `json:"natsJetStreamProcessed"` + NATSSubjectProcessed *string `json:"natsSubjectProcessed"` + NATSJWTFileProcessed *string `json:"natsJWTFileProcessed"` + NATSSeedFileProcessed *string `json:"natsSeedFileProcessed"` + NATSCertificateAuthorityProcessed *string `json:"natsCertificateAuthorityProcessed"` + NATSClientCertificateProcessed *string `json:"natsClientCertificateProcessed"` + NATSClientKeyProcessed *string `json:"natsClientKeyProcessed"` + NATSJetStreamMaxWaitProcessed *string `json:"natsJetStreamMaxWaitProcessed"` +} + // EventReaderSJsonCfg is the configuration of a single EventReader type EventReaderJsonCfg struct { Id *string @@ -207,6 +269,46 @@ type EEsJsonCfg struct { Exporters *[]*EventExporterJsonCfg } +type EventExporterOptsJson struct { + CSVFieldSeparator *string `json:"csvFieldSeparator"` + ElsIndex *string `json:"elsIndex"` + ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"` + ElsIfSeqNo *int `json:"elsIfSeqNo"` + ElsOpType *string `json:"elsOpType"` + ElsPipeline *string `json:"elsPipeline"` + ElsRouting *string `json:"elsRouting"` + ElsTimeout *string `json:"elsTimeout"` + ElsVersion *int `json:"elsVersion"` + ElsVersionType *string `json:"elsVersionType"` + ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"` + SQLMaxIdleConns *int `json:"sqlMaxIdleConns"` + SQLMaxOpenConns *int `json:"sqlMaxOpenConns"` + SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"` + SQLTableName *string `json:"sqlTableName"` + SQLDBName *string `json:"sqlDBName"` + SSLMode *string `json:"sslMode"` + KafkaTopic *string `json:"kafkaTopic"` + AMQPQueueID *string `json:"amqpQueueID"` + AMQPRoutingKey *string `json:"amqpRoutingKey"` + AMQPExchange *string `json:"amqpExchange"` + AMQPExchangeType *string `json:"amqpExchangeType"` + AWSRegion *string `json:"awsRegion"` + AWSKey *string `json:"awsKey"` + AWSSecret *string `json:"awsSecret"` + AWSToken *string `json:"awsToken"` + SQSQueueID *string `json:"sqsQueueID"` + S3BucketID *string `json:"s3BucketID"` + S3FolderPath *string `json:"s3FolderPath"` + NATSJetStream *bool `json:"natsJetStream"` + NATSSubject *string `json:"natsSubject"` + NATSJWTFile *string `json:"natsJWTFile"` + NATSSeedFile *string `json:"natsSeedFile"` + NATSCertificateAuthority *string `json:"natsCertificateAuthority"` + NATSClientCertificate *string `json:"natsClientCertificate"` + NATSClientKey *string `json:"natsClientKey"` + NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` +} + // EventExporterJsonCfg is the configuration of a single EventExporter type EventExporterJsonCfg struct { Id *string diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 4c9c1d453..8e339dd6e 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -487,12 +487,12 @@ // // SQL // // "sqlMaxIdleConns": 0, // SQLMaxIdleConns // // "sqlMaxOpenConns": 0, // SQLMaxOpenConns -// // "sqlMaxConnLifetime": 0, // SQLMaxConnLifetime +// // "sqlConnMaxLifetime": 0, // SQLConnMaxLifetime // // "sqlTableName":"cdrs", // the name of the table from where the events are exported // // "sqlDBName": "cgrates", // the name of the database from where the events are exported -// // "sslmode": "disable", // the postgresSSLMode for postgres +// // "sslMode": "disable", // the postgresSSLMode for postgres // // Kafka diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index e40d019ae..16bd700f5 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -385,7 +385,7 @@ "postgresSSLMode": "disable", "sqlMaxIdleConns": "10", "sqlMaxOpenConns": "100", - "sqlMaxConnLifetime": "0", + "sqlConnMaxLifetime": "0", }, "fields":[ // in case that the path is *exp.*row user must complete all the fields one to one with his sql schema in the correct order {"tag": "CGRID", "path": "*exp.*row", "type": "*group", "value": "~*req.CGRID"}, @@ -405,7 +405,7 @@ "postgresSSLMode": "disable", "sqlMaxIdleConns": "10", "sqlMaxOpenConns": "100", - "sqlMaxConnLifetime": "0", + "sqlConnMaxLifetime": "0", }, "fields":[ // the path constains *exp.columnName {"tag": "CGRID", "path": "*exp.cgrid", "type": "*variable", "value": "~*req.CGRID"}, diff --git a/ees/sql.go b/ees/sql.go index 89f45edac..253f5b8d3 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -120,7 +120,7 @@ func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, s } sqlDB.SetMaxOpenConns(int(val)) } - if iface, has := opts[utils.SQLMaxConnLifetime]; has { + if iface, has := opts[utils.SQLConnMaxLifetime]; has { val, err := utils.IfaceAsDuration(iface) if err != nil { return nil, nil, err diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go index 262ddc04f..b2e3412de 100644 --- a/ees/sql_it_test.go +++ b/ees/sql_it_test.go @@ -271,7 +271,7 @@ func TestOpenDB2Err(t *testing.T) { func TestOpenDB3(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxConnLifetime: 2}) + _, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: 2}) if err != nil { t.Error(err) } @@ -280,7 +280,7 @@ func TestOpenDB3(t *testing.T) { func TestOpenDB3Err(t *testing.T) { dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates")) - _, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxConnLifetime: "test"}) + _, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: "test"}) errExpect := "time: invalid duration \"test\"" if err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) diff --git a/utils/consts.go b/utils/consts.go index b8528b1e6..96ca365d9 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2551,7 +2551,7 @@ const ( SQLTableNameOpt = "sqlTableName" SQLMaxOpenConns = "sqlMaxOpenConns" - SQLMaxConnLifetime = "sqlMaxConnLifetime" + SQLConnMaxLifetime = "sqlConnMaxLifetime" // fileCSV CSVRowLengthOpt = "csvRowLength" @@ -2611,6 +2611,37 @@ const ( NatsCertificateAuthority = "natsCertificateAuthority" NatsJetStream = "natsJetStream" NatsJetStreamMaxWait = "natsJetStreamMaxWait" + + // processed opts + AMQPQueueIDProcessedCfg = "amqpQueueIDProcessed" + AMQPExchangeProcessedCfg = "amqpExchangeProcessed" + AMQPExchangeTypeProcessedCfg = "amqpExchangeTypeProcessed" + AMQPRoutingKeyProcessedCfg = "amqpRoutingKeyProcessed" + + KafkaTopicProcessedCfg = "kafkaTopicProcessed" + + SQLDBNameProcessedCfg = "sqlDBNameProcessed" + SQLTableNameProcessedCfg = "sqlTableNameProcessed" + SSLModeProcessedCfg = "sslModeProcessed" + + AWSRegionProcessedCfg = "awsRegionProcessed" + AWSKeyProcessedCfg = "awsKeyProcessed" + AWSSecretProcessedCfg = "awsSecretProcessed" + AWSTokenProcessedCfg = "awsTokenProcessed" + + SQSQueueIDProcessedCfg = "sqsQueueIDProcessed" + + S3FolderPathProcessedCfg = "s3FolderPathProcessed" + S3BucketIDProcessedCfg = "s3BucketIDProcessed" + + NATSJetStreamProcessedCfg = "natsJetStreamProcessed" + NATSSubjectProcessedCfg = "natsSubjectProcessed" + NATSJWTFileProcessedCfg = "natsJWTFileProcessed" + NATSSeedFileProcessedCfg = "natsSeedFileProcessed" + NATSCertificateAuthorityProcessedCfg = "natsCertificateAuthorityProcessed" + NATSClientCertificateProcessed = "natsClientCertificateProcessed" + NATSClientKeyProcessedCfg = "natsClientKeyProcessed" + NATSJetStreamMaxWaitProcessedCfg = "natsJetStreamMaxWaitProcessed" ) // Analyzers constants