Make ees and ers opts into structs (incomplete)

This commit is contained in:
ionutboangiu
2021-11-24 16:32:16 +02:00
committed by Dan Christian Bogos
parent ac171f050e
commit aa0ae292a2
9 changed files with 1164 additions and 10 deletions

View File

@@ -508,12 +508,12 @@ const CGRATES_CFG_JSON = `
// SQL
// "sqlMaxIdleConns": 0, // SQLMaxIdleConns
// "sqlMaxOpenConns": 0, // SQLMaxOpenConns
// "sqlMaxConnLifetime": 0, // SQLMaxConnLifetime
// "sqlConnMaxLifetime": 0, // SQLConnMaxLifetime
// "sqlTableName":"cdrs", // the name of the table from where the events are exported
// "sqlDBName": "cgrates", // the name of the database from where the events are exported
// "sslmode": "disable", // the postgresSSLMode for postgres
// "sslMode": "disable", // the postgresSSLMode for postgres
// Kafka

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
import (
"time"
"github.com/cgrates/cgrates/utils"
)
@@ -151,6 +153,46 @@ func (eeS *EEsCfg) AsMapInterface(separator string) (initialMP map[string]interf
return
}
type EventExporterOpts struct {
CSVFieldSeparator *string
ElsIndex *string
ElsIfPrimaryTerm *int
ElsIfSeqNo *int
ElsOpType *string
ElsPipeline *string
ElsRouting *string
ElsTimeout *time.Duration
ElsVersion *int
ElsVersionType *string
ElsWaitForActiveShards *string
SQLMaxIdleConns *int
SQLMaxOpenConns *int
SQLConnMaxLifetime *time.Duration
SQLTableName *string
SQLDBName *string
SSLMode *string
KafkaTopic *string
AMQPRoutingKey *string
AMQPQueueID *string
AMQPExchange *string
AMQPExchangeType *string
AWSRegion *string
AWSKey *string
AWSSecret *string
AWSToken *string
SQSQueueID *string
S3BucketID *string
S3FolderPath *string
NATSJetStream *bool
NATSSubject *string
NATSJWTFile *string
NATSSeedFile *string
NATSCertificateAuthority *string
NATSClientCertificate *string
NATSClientKey *string
NATSJetStreamMaxWait *time.Duration
}
// EventExporterCfg the config for a Event Exporter
type EventExporterCfg struct {
ID string
@@ -172,6 +214,136 @@ type EventExporterCfg struct {
trailerFields []*FCTemplate
}
func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) {
if jsnCfg == nil {
return
}
if jsnCfg.CSVFieldSeparator != nil {
eeOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator
}
if jsnCfg.ElsIndex != nil {
eeOpts.ElsIndex = jsnCfg.ElsIndex
}
if jsnCfg.ElsIfPrimaryTerm != nil {
eeOpts.ElsIfPrimaryTerm = jsnCfg.ElsIfPrimaryTerm
}
if jsnCfg.ElsIfSeqNo != nil {
eeOpts.ElsIfSeqNo = jsnCfg.ElsIfSeqNo
}
if jsnCfg.ElsOpType != nil {
eeOpts.ElsOpType = jsnCfg.ElsOpType
}
if jsnCfg.ElsPipeline != nil {
eeOpts.ElsPipeline = jsnCfg.ElsPipeline
}
if jsnCfg.ElsRouting != nil {
eeOpts.ElsRouting = jsnCfg.ElsRouting
}
if jsnCfg.ElsTimeout != nil {
var elsTimeout time.Duration
if elsTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.ElsTimeout); err != nil {
return
}
eeOpts.ElsTimeout = utils.DurationPointer(elsTimeout)
}
if jsnCfg.ElsVersion != nil {
eeOpts.ElsVersion = jsnCfg.ElsVersion
}
if jsnCfg.ElsVersionType != nil {
eeOpts.ElsVersionType = jsnCfg.ElsVersionType
}
if jsnCfg.ElsWaitForActiveShards != nil {
eeOpts.ElsWaitForActiveShards = jsnCfg.ElsWaitForActiveShards
}
if jsnCfg.SQLMaxIdleConns != nil {
eeOpts.SQLMaxIdleConns = jsnCfg.SQLMaxIdleConns
}
if jsnCfg.SQLMaxOpenConns != nil {
eeOpts.SQLMaxOpenConns = jsnCfg.SQLMaxOpenConns
}
if jsnCfg.SQLConnMaxLifetime != nil {
var sqlConnMaxLifetime time.Duration
if sqlConnMaxLifetime, err = utils.ParseDurationWithNanosecs(*jsnCfg.SQLConnMaxLifetime); err != nil {
return
}
eeOpts.SQLConnMaxLifetime = utils.DurationPointer(sqlConnMaxLifetime)
}
if jsnCfg.SQLTableName != nil {
eeOpts.SQLTableName = jsnCfg.SQLTableName
}
if jsnCfg.SQLDBName != nil {
eeOpts.SQLDBName = jsnCfg.SQLDBName
}
if jsnCfg.SSLMode != nil {
eeOpts.SSLMode = jsnCfg.SSLMode
}
if jsnCfg.KafkaTopic != nil {
eeOpts.KafkaTopic = jsnCfg.KafkaTopic
}
if jsnCfg.AMQPQueueID != nil {
eeOpts.AMQPQueueID = jsnCfg.AMQPQueueID
}
if jsnCfg.AMQPRoutingKey != nil {
eeOpts.AMQPRoutingKey = jsnCfg.AMQPRoutingKey
}
if jsnCfg.AMQPExchange != nil {
eeOpts.AMQPExchange = jsnCfg.AMQPExchange
}
if jsnCfg.AMQPExchangeType != nil {
eeOpts.AMQPExchangeType = jsnCfg.AMQPExchangeType
}
if jsnCfg.AWSRegion != nil {
eeOpts.AWSRegion = jsnCfg.AWSRegion
}
if jsnCfg.AWSKey != nil {
eeOpts.AWSKey = jsnCfg.AWSKey
}
if jsnCfg.AWSSecret != nil {
eeOpts.AWSSecret = jsnCfg.AWSSecret
}
if jsnCfg.AWSToken != nil {
eeOpts.AWSToken = jsnCfg.AWSToken
}
if jsnCfg.SQSQueueID != nil {
eeOpts.SQSQueueID = jsnCfg.SQSQueueID
}
if jsnCfg.S3BucketID != nil {
eeOpts.S3BucketID = jsnCfg.S3BucketID
}
if jsnCfg.S3FolderPath != nil {
eeOpts.S3FolderPath = jsnCfg.S3FolderPath
}
if jsnCfg.NATSJetStream != nil {
eeOpts.NATSJetStream = jsnCfg.NATSJetStream
}
if jsnCfg.NATSSubject != nil {
eeOpts.NATSSubject = jsnCfg.NATSSubject
}
if jsnCfg.NATSJWTFile != nil {
eeOpts.NATSJWTFile = jsnCfg.NATSJWTFile
}
if jsnCfg.NATSSeedFile != nil {
eeOpts.NATSSeedFile = jsnCfg.NATSSeedFile
}
if jsnCfg.NATSCertificateAuthority != nil {
eeOpts.NATSCertificateAuthority = jsnCfg.NATSCertificateAuthority
}
if jsnCfg.NATSClientCertificate != nil {
eeOpts.NATSClientCertificate = jsnCfg.NATSClientCertificate
}
if jsnCfg.NATSClientKey != nil {
eeOpts.NATSClientKey = jsnCfg.NATSClientKey
}
if jsnCfg.NATSJetStreamMaxWait != nil {
var natsJetStreamMaxWait time.Duration
if natsJetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil {
return
}
eeOpts.NATSJetStreamMaxWait = utils.DurationPointer(natsJetStreamMaxWait)
}
return
}
func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTemplates map[string][]*FCTemplate, separator string) (err error) {
if jsnEec == nil {
return
@@ -271,6 +443,122 @@ func (eeC *EventExporterCfg) TrailerFields() []*FCTemplate {
return eeC.trailerFields
}
func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
cln := &EventExporterOpts{}
if eeOpts.CSVFieldSeparator != nil {
cln.CSVFieldSeparator = utils.StringPointer(*eeOpts.CSVFieldSeparator)
}
if eeOpts.ElsIndex != nil {
cln.ElsIndex = utils.StringPointer(*eeOpts.ElsIndex)
}
if eeOpts.ElsIfPrimaryTerm != nil {
cln.ElsIfPrimaryTerm = utils.IntPointer(*eeOpts.ElsIfPrimaryTerm)
}
if eeOpts.ElsIfSeqNo != nil {
cln.ElsIfSeqNo = utils.IntPointer(*eeOpts.ElsIfSeqNo)
}
if eeOpts.ElsOpType != nil {
cln.ElsOpType = utils.StringPointer(*eeOpts.ElsOpType)
}
if eeOpts.ElsPipeline != nil {
cln.ElsPipeline = utils.StringPointer(*eeOpts.ElsPipeline)
}
if eeOpts.ElsRouting != nil {
cln.ElsRouting = utils.StringPointer(*eeOpts.ElsRouting)
}
if eeOpts.ElsTimeout != nil {
cln.ElsTimeout = utils.DurationPointer(*eeOpts.ElsTimeout)
}
if eeOpts.ElsVersion != nil {
cln.ElsVersion = utils.IntPointer(*eeOpts.ElsVersion)
}
if eeOpts.ElsVersionType != nil {
cln.ElsVersionType = utils.StringPointer(*eeOpts.ElsVersionType)
}
if eeOpts.ElsWaitForActiveShards != nil {
cln.ElsWaitForActiveShards = utils.StringPointer(*eeOpts.ElsWaitForActiveShards)
}
if eeOpts.SQLMaxIdleConns != nil {
cln.SQLMaxIdleConns = utils.IntPointer(*eeOpts.SQLMaxIdleConns)
}
if eeOpts.SQLMaxOpenConns != nil {
cln.SQLMaxOpenConns = utils.IntPointer(*eeOpts.SQLMaxOpenConns)
}
if eeOpts.SQLConnMaxLifetime != nil {
cln.SQLConnMaxLifetime = utils.DurationPointer(*eeOpts.SQLConnMaxLifetime)
}
if eeOpts.SQLTableName != nil {
cln.SQLTableName = utils.StringPointer(*eeOpts.SQLTableName)
}
if eeOpts.SQLDBName != nil {
cln.SQLDBName = utils.StringPointer(*eeOpts.SQLDBName)
}
if eeOpts.SSLMode != nil {
cln.SSLMode = utils.StringPointer(*eeOpts.SSLMode)
}
if eeOpts.KafkaTopic != nil {
cln.KafkaTopic = utils.StringPointer(*eeOpts.KafkaTopic)
}
if eeOpts.AMQPQueueID != nil {
cln.AMQPQueueID = utils.StringPointer(*eeOpts.AMQPQueueID)
}
if eeOpts.AMQPRoutingKey != nil {
cln.AMQPRoutingKey = utils.StringPointer(*eeOpts.AMQPRoutingKey)
}
if eeOpts.AMQPExchange != nil {
cln.AMQPExchange = utils.StringPointer(*eeOpts.AMQPExchange)
}
if eeOpts.AMQPExchangeType != nil {
cln.AMQPExchangeType = utils.StringPointer(*eeOpts.AMQPExchangeType)
}
if eeOpts.AWSRegion != nil {
cln.AWSRegion = utils.StringPointer(*eeOpts.AWSRegion)
}
if eeOpts.AWSKey != nil {
cln.AWSKey = utils.StringPointer(*eeOpts.AWSKey)
}
if eeOpts.AWSSecret != nil {
cln.AWSSecret = utils.StringPointer(*eeOpts.AWSSecret)
}
if eeOpts.AWSToken != nil {
cln.AWSToken = utils.StringPointer(*eeOpts.AWSToken)
}
if eeOpts.SQSQueueID != nil {
cln.SQSQueueID = utils.StringPointer(*eeOpts.SQSQueueID)
}
if eeOpts.S3BucketID != nil {
cln.S3BucketID = utils.StringPointer(*eeOpts.S3BucketID)
}
if eeOpts.S3FolderPath != nil {
cln.S3FolderPath = utils.StringPointer(*eeOpts.S3FolderPath)
}
if eeOpts.NATSJetStream != nil {
cln.NATSJetStream = utils.BoolPointer(*eeOpts.NATSJetStream)
}
if eeOpts.NATSSubject != nil {
cln.NATSSubject = utils.StringPointer(*eeOpts.NATSSubject)
}
if eeOpts.NATSJWTFile != nil {
cln.NATSJWTFile = utils.StringPointer(*eeOpts.NATSJWTFile)
}
if eeOpts.NATSSeedFile != nil {
cln.NATSSeedFile = utils.StringPointer(*eeOpts.NATSSeedFile)
}
if eeOpts.NATSCertificateAuthority != nil {
cln.NATSCertificateAuthority = utils.StringPointer(*eeOpts.NATSCertificateAuthority)
}
if eeOpts.NATSClientCertificate != nil {
cln.NATSClientCertificate = utils.StringPointer(*eeOpts.NATSClientCertificate)
}
if eeOpts.NATSClientKey != nil {
cln.NATSClientKey = utils.StringPointer(*eeOpts.NATSClientKey)
}
if eeOpts.NATSJetStreamMaxWait != nil {
cln.NATSJetStreamMaxWait = utils.DurationPointer(*eeOpts.NATSJetStreamMaxWait)
}
return cln
}
// Clone returns a deep copy of EventExporterCfg
func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) {
cln = &EventExporterCfg{
@@ -324,6 +612,119 @@ func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) {
// AsMapInterface returns the config as a map[string]interface{}
func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) {
// opts := map[string]interface{}{}
// if eeC.Opts.CSVFieldSeparator != nil {
// opts[utils.CSVFieldSepOpt] = *eeC.Opts.CSVFieldSeparator
// }
// if eeC.Opts.ElsIndex != nil {
// opts[utils.ElsIndex] = *eeC.Opts.ElsIndex
// }
// if eeC.Opts.ElsIfPrimaryTerm != nil {
// opts[utils.ElsIfPrimaryTerm] = *eeC.Opts.ElsIfPrimaryTerm
// }
// if eeC.Opts.ElsIfSeqNo != nil {
// opts[utils.ElsIfSeqNo] = *eeC.Opts.ElsIfSeqNo
// }
// if eeC.Opts.ElsOpType != nil {
// opts[utils.ElsOpType] = *eeC.Opts.ElsOpType
// }
// if eeC.Opts.ElsPipeline != nil {
// opts[utils.ElsPipeline] = *eeC.Opts.ElsPipeline
// }
// if eeC.Opts.ElsRouting != nil {
// opts[utils.ElsRouting] = *eeC.Opts.ElsRouting
// }
// if eeC.Opts.ElsTimeout != nil {
// opts[utils.ElsTimeout] = eeC.Opts.ElsTimeout.String()
// }
// if eeC.Opts.ElsVersion != nil {
// opts[utils.ElsVersionLow] = *eeC.Opts.ElsVersion
// }
// if eeC.Opts.ElsVersionType != nil {
// opts[utils.ElsVersionType] = *eeC.Opts.ElsVersionType
// }
// if eeC.Opts.ElsWaitForActiveShards != nil {
// opts[utils.ElsWaitForActiveShards] = *eeC.Opts.ElsWaitForActiveShards
// }
// if eeC.Opts.SQLMaxIdleConns != nil {
// opts[utils.SQLMaxIdleConnsCfg] = *eeC.Opts.SQLMaxIdleConns
// }
// if eeC.Opts.SQLMaxOpenConns != nil {
// opts[utils.SQLMaxOpenConns] = *eeC.Opts.SQLMaxOpenConns
// }
// if eeC.Opts.SQLConnMaxLifetime != nil {
// opts[utils.SQLConnMaxLifetime] = eeC.Opts.SQLConnMaxLifetime.String()
// }
// if eeC.Opts.SQLTableName != nil {
// opts[utils.SQLTableNameOpt] = *eeC.Opts.SQLTableName
// }
// if eeC.Opts.SQLDBName != nil {
// opts[utils.SQLDBNameOpt] = *eeC.Opts.SQLDBName
// }
// if eeC.Opts.SSLMode != nil {
// opts[utils.SSLModeCfg] = *eeC.Opts.SSLMode
// }
// if eeC.Opts.KafkaTopic != nil {
// opts[utils.KafkaTopic] = *eeC.Opts.KafkaTopic
// }
// if eeC.Opts.AMQPQueueID != nil {
// opts[utils.AMQPQueueID] = *eeC.Opts.AMQPQueueID
// }
// if eeC.Opts.AMQPRoutingKey != nil {
// opts[utils.AMQPRoutingKey] = *eeC.Opts.AMQPRoutingKey
// }
// if eeC.Opts.AMQPExchange != nil {
// opts[utils.AMQPExchange] = *eeC.Opts.AMQPExchange
// }
// if eeC.Opts.AMQPExchangeType != nil {
// opts[utils.AMQPExchangeType] = *eeC.Opts.AMQPExchangeType
// }
// if eeC.Opts.AWSRegion != nil {
// opts[utils.AWSRegion] = *eeC.Opts.AWSRegion
// }
// if eeC.Opts.AWSKey != nil {
// opts[utils.AWSKey] = *eeC.Opts.AWSKey
// }
// if eeC.Opts.AWSSecret != nil {
// opts[utils.AWSSecret] = *eeC.Opts.AWSSecret
// }
// if eeC.Opts.AWSToken != nil {
// opts[utils.AWSToken] = *eeC.Opts.AWSToken
// }
// if eeC.Opts.SQSQueueID != nil {
// opts[utils.SQSQueueID] = *eeC.Opts.SQSQueueID
// }
// if eeC.Opts.S3BucketID != nil {
// opts[utils.S3Bucket] = *eeC.Opts.S3BucketID
// }
// if eeC.Opts.S3FolderPath != nil {
// opts[utils.S3FolderPath] = *eeC.Opts.S3FolderPath
// }
// if eeC.Opts.NATSJetStream != nil {
// opts[utils.NatsJetStream] = *eeC.Opts.NATSJetStream
// }
// if eeC.Opts.NATSSubject != nil {
// opts[utils.NatsSubject] = *eeC.Opts.NATSSubject
// }
// if eeC.Opts.NATSJWTFile != nil {
// opts[utils.NatsJWTFile] = *eeC.Opts.NATSJWTFile
// }
// if eeC.Opts.NATSSeedFile != nil {
// opts[utils.NatsSeedFile] = *eeC.Opts.NATSSeedFile
// }
// if eeC.Opts.NATSCertificateAuthority != nil {
// opts[utils.NatsCertificateAuthority] = *eeC.Opts.NATSCertificateAuthority
// }
// if eeC.Opts.NATSClientCertificate != nil {
// opts[utils.NatsClientCertificate] = *eeC.Opts.NATSClientCertificate
// }
// if eeC.Opts.NATSClientKey != nil {
// opts[utils.NatsClientKey] = *eeC.Opts.NATSClientKey
// }
// if eeC.Opts.NATSJetStreamMaxWait != nil {
// opts[utils.NatsJetStreamMaxWait] = eeC.Opts.NATSJetStreamMaxWait.String()
// }
flgs := eeC.Flags.SliceFlags()
if flgs == nil {
flgs = []string{}

View File

@@ -135,6 +135,68 @@ func (erS *ERsCfg) AsMapInterface(separator string) (initialMP map[string]interf
return
}
type EventReaderOpts struct {
PartialPath *string
PartialCacheAction *string
PartialOrderField *string
PartialCSVFieldSeparator *string
CSVRowLength *int
CSVFieldSeparator *string
CSVHeaderDefineChar *string
CSVLazyQuotes *bool
XMLRootPath *string
AMQPQueueID *string
AMQPQueueIDProcessed *string
AMQPConsumerTag *string
AMQPExchange *string
AMQPExchangeType *string
AMQPRoutingKey *string
AMQPExchangeProcessed *string
AMQPExchangeTypeProcessed *string
AMQPRoutingKeyProcessed *string
KafkaTopic *string
KafkaGroupID *string
KafkaMaxWait *time.Duration
KafkaTopicProcessed *string
SQLDBName *string
SQLTableName *string
SSLMode *string
SQLDBNameProcessed *string
SQLTableNameProcessed *string
SSLModeProcessed *string
AWSRegion *string
AWSKey *string
AWSSecret *string
AWSToken *string
AWSRegionProcessed *string
AWSKeyProcessed *string
AWSSecretProcessed *string
AWSTokenProcessed *string
SQSQueueID *string
SQSQueueIDProcessed *string
S3BucketID *string
S3FolderPathProcessed *string
S3BucketIDProcessed *string
NATSJetStream *bool
NATSConsumerName *string
NATSSubject *string
NATSQueueID *string
NATSJWTFile *string
NATSSeedFile *string
NATSCertificateAuthority *string
NATSClientCertificate *string
NATSClientKey *string
NATSJetStreamMaxWait *time.Duration
NATSJetStreamProcessed *bool
NATSSubjectProcessed *string
NATSJWTFileProcessed *string
NATSSeedFileProcessed *string
NATSCertificateAuthorityProcessed *string
NATSClientCertificateProcessed *string
NATSClientKeyProcessed *string
NATSJetStreamMaxWaitProcessed *time.Duration
}
// EventReaderCfg the event for the Event Reader
type EventReaderCfg struct {
ID string
@@ -153,6 +215,202 @@ type EventReaderCfg struct {
CacheDumpFields []*FCTemplate
}
func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
if jsnCfg == nil {
return
}
if jsnCfg.PartialPath != nil {
erOpts.PartialPath = jsnCfg.PartialPath
}
if jsnCfg.PartialCacheAction != nil {
erOpts.PartialCacheAction = jsnCfg.PartialCacheAction
}
if jsnCfg.PartialOrderField != nil {
erOpts.PartialOrderField = jsnCfg.PartialOrderField
}
if jsnCfg.PartialCSVFieldSeparator != nil {
erOpts.PartialCSVFieldSeparator = jsnCfg.PartialCSVFieldSeparator
}
if jsnCfg.CSVRowLength != nil {
erOpts.CSVRowLength = jsnCfg.CSVRowLength
}
if jsnCfg.CSVFieldSeparator != nil {
erOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator
}
if jsnCfg.CSVHeaderDefineChar != nil {
erOpts.CSVHeaderDefineChar = jsnCfg.CSVHeaderDefineChar
}
if jsnCfg.CSVLazyQuotes != nil {
erOpts.CSVLazyQuotes = jsnCfg.CSVLazyQuotes
}
if jsnCfg.XMLRootPath != nil {
erOpts.XMLRootPath = jsnCfg.XMLRootPath
}
if jsnCfg.AMQPQueueID != nil {
erOpts.AMQPQueueID = jsnCfg.AMQPQueueID
}
if jsnCfg.AMQPQueueIDProcessed != nil {
erOpts.AMQPQueueIDProcessed = jsnCfg.AMQPQueueIDProcessed
}
if jsnCfg.AMQPConsumerTag != nil {
erOpts.AMQPConsumerTag = jsnCfg.AMQPConsumerTag
}
if jsnCfg.AMQPExchange != nil {
erOpts.AMQPExchange = jsnCfg.AMQPExchange
}
if jsnCfg.AMQPExchangeType != nil {
erOpts.AMQPExchangeType = jsnCfg.AMQPExchangeType
}
if jsnCfg.AMQPRoutingKey != nil {
erOpts.AMQPRoutingKey = jsnCfg.AMQPRoutingKey
}
if jsnCfg.AMQPExchangeProcessed != nil {
erOpts.AMQPExchangeProcessed = jsnCfg.AMQPExchangeProcessed
}
if jsnCfg.AMQPExchangeTypeProcessed != nil {
erOpts.AMQPExchangeTypeProcessed = jsnCfg.AMQPExchangeTypeProcessed
}
if jsnCfg.AMQPRoutingKeyProcessed != nil {
erOpts.AMQPRoutingKeyProcessed = jsnCfg.AMQPRoutingKeyProcessed
}
if jsnCfg.KafkaTopic != nil {
erOpts.KafkaTopic = jsnCfg.KafkaTopic
}
if jsnCfg.KafkaGroupID != nil {
erOpts.KafkaGroupID = jsnCfg.KafkaGroupID
}
if jsnCfg.KafkaMaxWait != nil {
var kafkaMaxWait time.Duration
if kafkaMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.KafkaMaxWait); err != nil {
return
}
erOpts.KafkaMaxWait = utils.DurationPointer(kafkaMaxWait)
}
if jsnCfg.KafkaTopicProcessed != nil {
erOpts.KafkaTopicProcessed = jsnCfg.KafkaTopicProcessed
}
if jsnCfg.SQLDBName != nil {
erOpts.SQLDBName = jsnCfg.SQLDBName
}
if jsnCfg.SQLTableName != nil {
erOpts.SQLTableName = jsnCfg.SQLTableName
}
if jsnCfg.SSLMode != nil {
erOpts.SSLMode = jsnCfg.SSLMode
}
if jsnCfg.SQLDBNameProcessed != nil {
erOpts.SQLDBNameProcessed = jsnCfg.SQLDBNameProcessed
}
if jsnCfg.SQLTableNameProcessed != nil {
erOpts.SQLTableNameProcessed = jsnCfg.SQLTableNameProcessed
}
if jsnCfg.SSLModeProcessed != nil {
erOpts.SSLModeProcessed = jsnCfg.SSLModeProcessed
}
if jsnCfg.AWSRegion != nil {
erOpts.AWSRegion = jsnCfg.AWSRegion
}
if jsnCfg.AWSKey != nil {
erOpts.AWSKey = jsnCfg.AWSKey
}
if jsnCfg.AWSSecret != nil {
erOpts.AWSSecret = jsnCfg.AWSSecret
}
if jsnCfg.AWSToken != nil {
erOpts.AWSToken = jsnCfg.AWSToken
}
if jsnCfg.AWSRegionProcessed != nil {
erOpts.AWSRegionProcessed = jsnCfg.AWSRegionProcessed
}
if jsnCfg.AWSKeyProcessed != nil {
erOpts.AWSKeyProcessed = jsnCfg.AWSKeyProcessed
}
if jsnCfg.AWSSecretProcessed != nil {
erOpts.AWSSecretProcessed = jsnCfg.AWSSecretProcessed
}
if jsnCfg.AWSTokenProcessed != nil {
erOpts.AWSTokenProcessed = jsnCfg.AWSTokenProcessed
}
if jsnCfg.SQSQueueID != nil {
erOpts.SQSQueueID = jsnCfg.SQSQueueID
}
if jsnCfg.SQSQueueIDProcessed != nil {
erOpts.SQSQueueIDProcessed = jsnCfg.SQSQueueIDProcessed
}
if jsnCfg.S3BucketID != nil {
erOpts.S3BucketID = jsnCfg.S3BucketID
}
if jsnCfg.S3FolderPathProcessed != nil {
erOpts.S3FolderPathProcessed = jsnCfg.S3FolderPathProcessed
}
if jsnCfg.S3BucketIDProcessed != nil {
erOpts.S3BucketIDProcessed = jsnCfg.S3BucketIDProcessed
}
if jsnCfg.NATSJetStream != nil {
erOpts.NATSJetStream = jsnCfg.NATSJetStream
}
if jsnCfg.NATSConsumerName != nil {
erOpts.NATSConsumerName = jsnCfg.NATSConsumerName
}
if jsnCfg.NATSSubject != nil {
erOpts.NATSSubject = jsnCfg.NATSSubject
}
if jsnCfg.NATSQueueID != nil {
erOpts.NATSQueueID = jsnCfg.NATSQueueID
}
if jsnCfg.NATSJWTFile != nil {
erOpts.NATSJWTFile = jsnCfg.NATSJWTFile
}
if jsnCfg.NATSSeedFile != nil {
erOpts.NATSSeedFile = jsnCfg.NATSSeedFile
}
if jsnCfg.NATSCertificateAuthority != nil {
erOpts.NATSCertificateAuthority = jsnCfg.NATSCertificateAuthority
}
if jsnCfg.NATSClientCertificate != nil {
erOpts.NATSClientCertificate = jsnCfg.NATSClientCertificate
}
if jsnCfg.NATSClientKey != nil {
erOpts.NATSClientKey = jsnCfg.NATSClientKey
}
if jsnCfg.NATSJetStreamMaxWait != nil {
var jetStreamMaxWait time.Duration
if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil {
return
}
erOpts.NATSJetStreamMaxWait = utils.DurationPointer(jetStreamMaxWait)
}
if jsnCfg.NATSJetStreamProcessed != nil {
erOpts.NATSJetStreamProcessed = jsnCfg.NATSJetStreamProcessed
}
if jsnCfg.NATSSubjectProcessed != nil {
erOpts.NATSSubjectProcessed = jsnCfg.NATSSubjectProcessed
}
if jsnCfg.NATSJWTFileProcessed != nil {
erOpts.NATSJWTFileProcessed = jsnCfg.NATSJWTFileProcessed
}
if jsnCfg.NATSSeedFileProcessed != nil {
erOpts.NATSSeedFileProcessed = jsnCfg.NATSSeedFileProcessed
}
if jsnCfg.NATSCertificateAuthorityProcessed != nil {
erOpts.NATSCertificateAuthorityProcessed = jsnCfg.NATSCertificateAuthorityProcessed
}
if jsnCfg.NATSClientCertificateProcessed != nil {
erOpts.NATSClientCertificateProcessed = jsnCfg.NATSClientCertificateProcessed
}
if jsnCfg.NATSClientKeyProcessed != nil {
erOpts.NATSClientKeyProcessed = jsnCfg.NATSClientKeyProcessed
}
if jsnCfg.NATSJetStreamMaxWaitProcessed != nil {
var jetStreamMaxWait time.Duration
if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWaitProcessed); err != nil {
return
}
erOpts.NATSJetStreamMaxWaitProcessed = utils.DurationPointer(jetStreamMaxWait)
}
return
}
func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplates map[string][]*FCTemplate, sep string) (err error) {
if jsnCfg == nil {
return
@@ -232,6 +490,188 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat
return
}
func (erOpts *EventReaderOpts) Clone() *EventReaderOpts {
cln := &EventReaderOpts{}
if erOpts.PartialPath != nil {
cln.PartialPath = utils.StringPointer(*erOpts.PartialPath)
}
if erOpts.PartialCacheAction != nil {
cln.PartialCacheAction = utils.StringPointer(*erOpts.PartialCacheAction)
}
if erOpts.PartialOrderField != nil {
cln.PartialOrderField = utils.StringPointer(*erOpts.PartialOrderField)
}
if erOpts.PartialCSVFieldSeparator != nil {
cln.PartialCSVFieldSeparator = utils.StringPointer(*erOpts.PartialCSVFieldSeparator)
}
if erOpts.CSVRowLength != nil {
cln.CSVRowLength = utils.IntPointer(*erOpts.CSVRowLength)
}
if erOpts.CSVFieldSeparator != nil {
cln.CSVFieldSeparator = utils.StringPointer(*erOpts.CSVFieldSeparator)
}
if erOpts.CSVHeaderDefineChar != nil {
cln.CSVHeaderDefineChar = utils.StringPointer(*erOpts.CSVHeaderDefineChar)
}
if erOpts.CSVLazyQuotes != nil {
cln.CSVLazyQuotes = utils.BoolPointer(*erOpts.CSVLazyQuotes)
}
if erOpts.XMLRootPath != nil {
cln.XMLRootPath = utils.StringPointer(*erOpts.XMLRootPath)
}
if erOpts.AMQPQueueID != nil {
cln.AMQPQueueID = utils.StringPointer(*erOpts.AMQPQueueID)
}
if erOpts.AMQPQueueIDProcessed != nil {
cln.AMQPQueueIDProcessed = utils.StringPointer(*erOpts.AMQPQueueIDProcessed)
}
if erOpts.AMQPConsumerTag != nil {
cln.AMQPConsumerTag = utils.StringPointer(*erOpts.AMQPConsumerTag)
}
if erOpts.AMQPExchange != nil {
cln.AMQPExchange = utils.StringPointer(*erOpts.AMQPExchange)
}
if erOpts.AMQPExchangeType != nil {
cln.AMQPExchangeType = utils.StringPointer(*erOpts.AMQPExchangeType)
}
if erOpts.AMQPRoutingKey != nil {
cln.AMQPRoutingKey = utils.StringPointer(*erOpts.AMQPRoutingKey)
}
if erOpts.AMQPExchangeProcessed != nil {
cln.AMQPExchangeProcessed = utils.StringPointer(*erOpts.AMQPExchangeProcessed)
}
if erOpts.AMQPExchangeTypeProcessed != nil {
cln.AMQPExchangeTypeProcessed = utils.StringPointer(*erOpts.AMQPExchangeTypeProcessed)
}
if erOpts.AMQPRoutingKeyProcessed != nil {
cln.AMQPRoutingKeyProcessed = utils.StringPointer(*erOpts.AMQPRoutingKeyProcessed)
}
if erOpts.KafkaTopic != nil {
cln.KafkaTopic = utils.StringPointer(*erOpts.KafkaTopic)
}
if erOpts.KafkaGroupID != nil {
cln.KafkaGroupID = utils.StringPointer(*erOpts.KafkaGroupID)
}
if erOpts.KafkaMaxWait != nil {
cln.KafkaMaxWait = utils.DurationPointer(*erOpts.KafkaMaxWait)
}
if erOpts.KafkaTopicProcessed != nil {
cln.KafkaTopicProcessed = utils.StringPointer(*erOpts.KafkaTopicProcessed)
}
if erOpts.SQLDBName != nil {
cln.SQLDBName = utils.StringPointer(*erOpts.SQLDBName)
}
if erOpts.SQLTableName != nil {
cln.SQLTableName = utils.StringPointer(*erOpts.SQLTableName)
}
if erOpts.SSLMode != nil {
cln.SSLMode = utils.StringPointer(*erOpts.SSLMode)
}
if erOpts.SQLDBNameProcessed != nil {
cln.SQLDBNameProcessed = utils.StringPointer(*erOpts.SQLDBNameProcessed)
}
if erOpts.SQLTableNameProcessed != nil {
cln.SQLTableNameProcessed = utils.StringPointer(*erOpts.SQLTableNameProcessed)
}
if erOpts.SSLModeProcessed != nil {
cln.SSLModeProcessed = utils.StringPointer(*erOpts.SSLModeProcessed)
}
if erOpts.AWSRegion != nil {
cln.AWSRegion = utils.StringPointer(*erOpts.AWSRegion)
}
if erOpts.AWSKey != nil {
cln.AWSKey = utils.StringPointer(*erOpts.AWSKey)
}
if erOpts.AWSSecret != nil {
cln.AWSSecret = utils.StringPointer(*erOpts.AWSSecret)
}
if erOpts.AWSToken != nil {
cln.AWSToken = utils.StringPointer(*erOpts.AWSToken)
}
if erOpts.AWSRegionProcessed != nil {
cln.AWSRegionProcessed = utils.StringPointer(*erOpts.AWSRegionProcessed)
}
if erOpts.AWSKeyProcessed != nil {
cln.AWSKeyProcessed = utils.StringPointer(*erOpts.AWSKeyProcessed)
}
if erOpts.AWSSecretProcessed != nil {
cln.AWSSecretProcessed = utils.StringPointer(*erOpts.AWSSecretProcessed)
}
if erOpts.AWSTokenProcessed != nil {
cln.AWSTokenProcessed = utils.StringPointer(*erOpts.AWSTokenProcessed)
}
if erOpts.SQSQueueID != nil {
cln.SQSQueueID = utils.StringPointer(*erOpts.SQSQueueID)
}
if erOpts.SQSQueueIDProcessed != nil {
cln.SQSQueueIDProcessed = utils.StringPointer(*erOpts.SQSQueueIDProcessed)
}
if erOpts.S3BucketID != nil {
cln.S3BucketID = utils.StringPointer(*erOpts.S3BucketID)
}
if erOpts.S3FolderPathProcessed != nil {
cln.S3FolderPathProcessed = utils.StringPointer(*erOpts.S3FolderPathProcessed)
}
if erOpts.S3BucketIDProcessed != nil {
cln.S3BucketIDProcessed = utils.StringPointer(*erOpts.S3BucketIDProcessed)
}
if erOpts.NATSJetStream != nil {
cln.NATSJetStream = utils.BoolPointer(*erOpts.NATSJetStream)
}
if erOpts.NATSConsumerName != nil {
cln.NATSConsumerName = utils.StringPointer(*erOpts.NATSConsumerName)
}
if erOpts.NATSSubject != nil {
cln.NATSSubject = utils.StringPointer(*erOpts.NATSSubject)
}
if erOpts.NATSQueueID != nil {
cln.NATSQueueID = utils.StringPointer(*erOpts.NATSQueueID)
}
if erOpts.NATSJWTFile != nil {
cln.NATSJWTFile = utils.StringPointer(*erOpts.NATSJWTFile)
}
if erOpts.NATSSeedFile != nil {
cln.NATSSeedFile = utils.StringPointer(*erOpts.NATSSeedFile)
}
if erOpts.NATSCertificateAuthority != nil {
cln.NATSCertificateAuthority = utils.StringPointer(*erOpts.NATSCertificateAuthority)
}
if erOpts.NATSClientCertificate != nil {
cln.NATSClientCertificate = utils.StringPointer(*erOpts.NATSClientCertificate)
}
if erOpts.NATSClientKey != nil {
cln.NATSClientKey = utils.StringPointer(*erOpts.NATSClientKey)
}
if erOpts.NATSJetStreamMaxWait != nil {
cln.NATSJetStreamMaxWait = utils.DurationPointer(*erOpts.NATSJetStreamMaxWait)
}
if erOpts.NATSJetStreamProcessed != nil {
cln.NATSJetStreamProcessed = utils.BoolPointer(*erOpts.NATSJetStreamProcessed)
}
if erOpts.NATSSubjectProcessed != nil {
cln.NATSSubjectProcessed = utils.StringPointer(*erOpts.NATSSubjectProcessed)
}
if erOpts.NATSJWTFileProcessed != nil {
cln.NATSJWTFileProcessed = utils.StringPointer(*erOpts.NATSJWTFileProcessed)
}
if erOpts.NATSSeedFileProcessed != nil {
cln.NATSSeedFileProcessed = utils.StringPointer(*erOpts.NATSSeedFileProcessed)
}
if erOpts.NATSCertificateAuthorityProcessed != nil {
cln.NATSCertificateAuthorityProcessed = utils.StringPointer(*erOpts.NATSCertificateAuthorityProcessed)
}
if erOpts.NATSClientCertificateProcessed != nil {
cln.NATSClientCertificateProcessed = utils.StringPointer(*erOpts.NATSClientCertificateProcessed)
}
if erOpts.NATSClientKeyProcessed != nil {
cln.NATSClientKeyProcessed = utils.StringPointer(*erOpts.NATSClientKeyProcessed)
}
if erOpts.NATSJetStreamMaxWaitProcessed != nil {
cln.NATSJetStreamMaxWaitProcessed = utils.DurationPointer(*erOpts.NATSJetStreamMaxWaitProcessed)
}
return cln
}
// Clone returns a deep copy of EventReaderCfg
func (er EventReaderCfg) Clone() (cln *EventReaderCfg) {
cln = &EventReaderCfg{
@@ -278,6 +718,186 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) {
// AsMapInterface returns the config as a map[string]interface{}
func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) {
// opts := map[string]interface{}{}
// if er.Opts.PartialPath != nil {
// opts[utils.PartialPathOpt] = *er.Opts.PartialPath
// }
// if er.Opts.PartialCacheAction != nil {
// opts[utils.PartialCacheActionOpt] = *er.Opts.PartialCacheAction
// }
// if er.Opts.PartialOrderField != nil {
// opts[utils.PartialOrderFieldOpt] = *er.Opts.PartialOrderField
// }
// if er.Opts.PartialCSVFieldSeparator != nil {
// opts[utils.PartialCSVFieldSepartorOpt] = *er.Opts.PartialCSVFieldSeparator
// }
// if er.Opts.CSVRowLength != nil {
// opts[utils.CSVRowLengthOpt] = *er.Opts.CSVRowLength
// }
// if er.Opts.CSVFieldSeparator != nil {
// opts[utils.CSVFieldSepOpt] = *er.Opts.CSVFieldSeparator
// }
// if er.Opts.CSVHeaderDefineChar != nil {
// opts[utils.HeaderDefineCharOpt] = *er.Opts.CSVHeaderDefineChar
// }
// if er.Opts.CSVLazyQuotes != nil {
// opts[utils.CSVLazyQuotes] = *er.Opts.CSVLazyQuotes
// }
// if er.Opts.XMLRootPath != nil {
// opts[utils.XMLRootPathOpt] = *er.Opts.XMLRootPath
// }
// if er.Opts.AMQPQueueID != nil {
// opts[utils.AMQPQueueID] = *er.Opts.AMQPQueueID
// }
// if er.Opts.AMQPQueueIDProcessed != nil {
// opts[utils.AMQPQueueIDProcessedCfg] = *er.Opts.AMQPQueueIDProcessed
// }
// if er.Opts.AMQPConsumerTag != nil {
// opts[utils.AMQPConsumerTag] = *er.Opts.AMQPConsumerTag
// }
// if er.Opts.AMQPExchange != nil {
// opts[utils.AMQPExchange] = *er.Opts.AMQPExchange
// }
// if er.Opts.AMQPExchangeType != nil {
// opts[utils.AMQPExchangeType] = *er.Opts.AMQPExchangeType
// }
// if er.Opts.AMQPRoutingKey != nil {
// opts[utils.AMQPRoutingKey] = *er.Opts.AMQPRoutingKey
// }
// if er.Opts.AMQPExchangeProcessed != nil {
// opts[utils.AMQPExchangeProcessedCfg] = *er.Opts.AMQPExchangeProcessed
// }
// if er.Opts.AMQPExchangeTypeProcessed != nil {
// opts[utils.AMQPExchangeTypeProcessedCfg] = *er.Opts.AMQPExchangeTypeProcessed
// }
// if er.Opts.AMQPRoutingKeyProcessed != nil {
// opts[utils.AMQPRoutingKeyProcessedCfg] = *er.Opts.AMQPRoutingKeyProcessed
// }
// if er.Opts.KafkaTopic != nil {
// opts[utils.KafkaTopic] = *er.Opts.KafkaTopic
// }
// if er.Opts.KafkaGroupID != nil {
// opts[utils.KafkaGroupID] = *er.Opts.KafkaGroupID
// }
// if er.Opts.KafkaMaxWait != nil {
// opts[utils.KafkaMaxWait] = er.Opts.KafkaMaxWait.String()
// }
// if er.Opts.KafkaTopicProcessed != nil {
// opts[utils.KafkaTopicProcessedCfg] = *er.Opts.KafkaTopicProcessed
// }
// if er.Opts.SQLDBName != nil {
// opts[utils.SQLDBNameOpt] = *er.Opts.SQLDBName
// }
// if er.Opts.SQLTableName != nil {
// opts[utils.SQLTableNameOpt] = *er.Opts.SQLTableName
// }
// if er.Opts.SSLMode != nil {
// opts[utils.SSLModeCfg] = *er.Opts.SSLMode
// }
// if er.Opts.SQLDBNameProcessed != nil {
// opts[utils.SQLDBNameProcessedCfg] = *er.Opts.SQLDBNameProcessed
// }
// if er.Opts.SQLTableNameProcessed != nil {
// opts[utils.SQLTableNameProcessedCfg] = *er.Opts.SQLTableNameProcessed
// }
// if er.Opts.SSLModeProcessed != nil {
// opts[utils.SSLModeProcessedCfg] = *er.Opts.SSLModeProcessed
// }
// if er.Opts.AWSRegion != nil {
// opts[utils.AWSRegion] = *er.Opts.AWSRegion
// }
// if er.Opts.AWSKey != nil {
// opts[utils.AWSKey] = *er.Opts.AWSKey
// }
// if er.Opts.AWSSecret != nil {
// opts[utils.AWSSecret] = *er.Opts.AWSSecret
// }
// if er.Opts.AWSToken != nil {
// opts[utils.AWSToken] = *er.Opts.AWSToken
// }
// if er.Opts.AWSRegionProcessed != nil {
// opts[utils.AWSRegionProcessedCfg] = *er.Opts.AWSRegionProcessed
// }
// if er.Opts.AWSKeyProcessed != nil {
// opts[utils.AWSKeyProcessedCfg] = *er.Opts.AWSKeyProcessed
// }
// if er.Opts.AWSSecretProcessed != nil {
// opts[utils.AWSSecretProcessedCfg] = *er.Opts.AWSSecretProcessed
// }
// if er.Opts.AWSTokenProcessed != nil {
// opts[utils.AWSTokenProcessedCfg] = *er.Opts.AWSTokenProcessed
// }
// if er.Opts.SQSQueueID != nil {
// opts[utils.SQSQueueID] = *er.Opts.SQSQueueID
// }
// if er.Opts.SQSQueueIDProcessed != nil {
// opts[utils.SQSQueueIDProcessedCfg] = *er.Opts.SQSQueueIDProcessed
// }
// if er.Opts.S3BucketID != nil {
// opts[utils.S3Bucket] = *er.Opts.S3BucketID
// }
// if er.Opts.S3FolderPathProcessed != nil {
// opts[utils.S3FolderPathProcessedCfg] = *er.Opts.S3FolderPathProcessed
// }
// if er.Opts.S3BucketIDProcessed != nil {
// opts[utils.S3BucketIDProcessedCfg] = *er.Opts.S3BucketIDProcessed
// }
// if er.Opts.NATSJetStream != nil {
// opts[utils.NatsJetStream] = *er.Opts.NATSJetStream
// }
// if er.Opts.NATSConsumerName != nil {
// opts[utils.NatsConsumerName] = *er.Opts.NATSConsumerName
// }
// if er.Opts.NATSSubject != nil {
// opts[utils.NatsSubject] = *er.Opts.NATSSubject
// }
// if er.Opts.NATSQueueID != nil {
// opts[utils.NatsQueueID] = *er.Opts.NATSQueueID
// }
// if er.Opts.NATSJWTFile != nil {
// opts[utils.NatsJWTFile] = *er.Opts.NATSJWTFile
// }
// if er.Opts.NATSSeedFile != nil {
// opts[utils.NatsSeedFile] = *er.Opts.NATSSeedFile
// }
// if er.Opts.NATSCertificateAuthority != nil {
// opts[utils.NatsCertificateAuthority] = *er.Opts.NATSCertificateAuthority
// }
// if er.Opts.NATSClientCertificate != nil {
// opts[utils.NatsClientCertificate] = *er.Opts.NATSClientCertificate
// }
// if er.Opts.NATSClientKey != nil {
// opts[utils.NatsClientKey] = *er.Opts.NATSClientKey
// }
// if er.Opts.NATSJetStreamMaxWait != nil {
// opts[utils.NatsJetStreamMaxWait] = er.Opts.NATSJetStreamMaxWait.String()
// }
// if er.Opts.NATSJetStreamProcessed != nil {
// opts[utils.NATSJetStreamProcessedCfg] = *er.Opts.NATSJetStreamProcessed
// }
// if er.Opts.NATSSubjectProcessed != nil {
// opts[utils.NATSSubjectProcessedCfg] = *er.Opts.NATSSubjectProcessed
// }
// if er.Opts.NATSJWTFileProcessed != nil {
// opts[utils.NATSJWTFileProcessedCfg] = *er.Opts.NATSJWTFileProcessed
// }
// if er.Opts.NATSSeedFileProcessed != nil {
// opts[utils.NATSSeedFileProcessedCfg] = *er.Opts.NATSSeedFileProcessed
// }
// if er.Opts.NATSCertificateAuthorityProcessed != nil {
// opts[utils.NATSCertificateAuthorityProcessedCfg] = *er.Opts.NATSCertificateAuthorityProcessed
// }
// if er.Opts.NATSClientCertificateProcessed != nil {
// opts[utils.NATSClientCertificateProcessed] = *er.Opts.NATSClientCertificateProcessed
// }
// if er.Opts.NATSClientKeyProcessed != nil {
// opts[utils.NATSClientKeyProcessedCfg] = *er.Opts.NATSClientKeyProcessed
// }
// if er.Opts.NATSJetStreamMaxWaitProcessed != nil {
// opts[utils.NATSJetStreamMaxWaitProcessedCfg] = er.Opts.NATSJetStreamMaxWaitProcessed.String()
// }
initialMP = map[string]interface{}{
utils.IDCfg: er.ID,
utils.TypeCfg: er.Type,

View File

@@ -181,6 +181,68 @@ type ERsJsonCfg struct {
Partial_cache_ttl *string
}
type EventReaderOptsJson struct {
PartialPath *string `json:"partialPath"`
PartialCacheAction *string `json:"partialCacheAction"`
PartialOrderField *string `json:"partialOrderField"`
PartialCSVFieldSeparator *string `json:"partialcsvFieldSeparator"`
CSVRowLength *int `json:"csvRowLength"`
CSVFieldSeparator *string `json:"csvFieldSeparator"`
CSVHeaderDefineChar *string `json:"csvHeaderDefineChar"`
CSVLazyQuotes *bool `json:"csvLazyQuotes"`
XMLRootPath *string `json:"xmlRootPath"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPQueueIDProcessed *string `json:"amqpQueueIDProcessed"`
AMQPConsumerTag *string `json:"amqpConsumerTag"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
AMQPRoutingKey *string `json:"amqpRoutingKey"`
AMQPExchangeProcessed *string `json:"amqpExchangeProcessed"`
AMQPExchangeTypeProcessed *string `json:"amqpExchangeTypeProcessed"`
AMQPRoutingKeyProcessed *string `json:"amqpRoutingKeyProcessed"`
KafkaTopic *string `json:"kafkaTopic"`
KafkaGroupID *string `json:"kafkaGroupID"`
KafkaMaxWait *string `json:"kafkaMaxWait"`
KafkaTopicProcessed *string `json:"kafkaTopicProcessed"`
SQLDBName *string `json:"sqlDBName"`
SQLTableName *string `json:"sqlTableName"`
SSLMode *string `json:"postgresSSLMode"`
SQLDBNameProcessed *string `json:"sqlDBNameProcessed"`
SQLTableNameProcessed *string `json:"sqlTableNameProcessed"`
SSLModeProcessed *string `json:"postgresSSLModeProcessed"`
AWSRegion *string `json:"awsRegion"`
AWSKey *string `json:"awsKey"`
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
AWSRegionProcessed *string `json:"awsRegionProcessed"`
AWSKeyProcessed *string `json:"awsKeyProcessed"`
AWSSecretProcessed *string `json:"awsSecretProcessed"`
AWSTokenProcessed *string `json:"awsTokenProcessed"`
SQSQueueID *string `json:"sqsQueueID"`
SQSQueueIDProcessed *string `json:"sqsQueueIDProcessed"`
S3BucketID *string `json:"s3BucketID"`
S3FolderPathProcessed *string `json:"s3FolderPathProcessed"`
S3BucketIDProcessed *string `json:"s3BucketIDProcessed"`
NATSJetStream *bool `json:"natsJetStream"`
NATSConsumerName *string `json:"natsConsumerName"`
NATSSubject *string `json:"natsSubject"`
NATSQueueID *string `json:"natsQueueID"`
NATSJWTFile *string `json:"natsJWTFile"`
NATSSeedFile *string `json:"natsSeedFile"`
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
NATSClientCertificate *string `json:"natsClientCertificate"`
NATSClientKey *string `json:"natsClientKey"`
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
NATSJetStreamProcessed *bool `json:"natsJetStreamProcessed"`
NATSSubjectProcessed *string `json:"natsSubjectProcessed"`
NATSJWTFileProcessed *string `json:"natsJWTFileProcessed"`
NATSSeedFileProcessed *string `json:"natsSeedFileProcessed"`
NATSCertificateAuthorityProcessed *string `json:"natsCertificateAuthorityProcessed"`
NATSClientCertificateProcessed *string `json:"natsClientCertificateProcessed"`
NATSClientKeyProcessed *string `json:"natsClientKeyProcessed"`
NATSJetStreamMaxWaitProcessed *string `json:"natsJetStreamMaxWaitProcessed"`
}
// EventReaderSJsonCfg is the configuration of a single EventReader
type EventReaderJsonCfg struct {
Id *string
@@ -207,6 +269,46 @@ type EEsJsonCfg struct {
Exporters *[]*EventExporterJsonCfg
}
type EventExporterOptsJson struct {
CSVFieldSeparator *string `json:"csvFieldSeparator"`
ElsIndex *string `json:"elsIndex"`
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
ElsIfSeqNo *int `json:"elsIfSeqNo"`
ElsOpType *string `json:"elsOpType"`
ElsPipeline *string `json:"elsPipeline"`
ElsRouting *string `json:"elsRouting"`
ElsTimeout *string `json:"elsTimeout"`
ElsVersion *int `json:"elsVersion"`
ElsVersionType *string `json:"elsVersionType"`
ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"`
SQLMaxIdleConns *int `json:"sqlMaxIdleConns"`
SQLMaxOpenConns *int `json:"sqlMaxOpenConns"`
SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"`
SQLTableName *string `json:"sqlTableName"`
SQLDBName *string `json:"sqlDBName"`
SSLMode *string `json:"sslMode"`
KafkaTopic *string `json:"kafkaTopic"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPRoutingKey *string `json:"amqpRoutingKey"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
AWSRegion *string `json:"awsRegion"`
AWSKey *string `json:"awsKey"`
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"`
S3BucketID *string `json:"s3BucketID"`
S3FolderPath *string `json:"s3FolderPath"`
NATSJetStream *bool `json:"natsJetStream"`
NATSSubject *string `json:"natsSubject"`
NATSJWTFile *string `json:"natsJWTFile"`
NATSSeedFile *string `json:"natsSeedFile"`
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
NATSClientCertificate *string `json:"natsClientCertificate"`
NATSClientKey *string `json:"natsClientKey"`
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
}
// EventExporterJsonCfg is the configuration of a single EventExporter
type EventExporterJsonCfg struct {
Id *string

View File

@@ -487,12 +487,12 @@
// // SQL
// // "sqlMaxIdleConns": 0, // SQLMaxIdleConns
// // "sqlMaxOpenConns": 0, // SQLMaxOpenConns
// // "sqlMaxConnLifetime": 0, // SQLMaxConnLifetime
// // "sqlConnMaxLifetime": 0, // SQLConnMaxLifetime
// // "sqlTableName":"cdrs", // the name of the table from where the events are exported
// // "sqlDBName": "cgrates", // the name of the database from where the events are exported
// // "sslmode": "disable", // the postgresSSLMode for postgres
// // "sslMode": "disable", // the postgresSSLMode for postgres
// // Kafka

View File

@@ -385,7 +385,7 @@
"postgresSSLMode": "disable",
"sqlMaxIdleConns": "10",
"sqlMaxOpenConns": "100",
"sqlMaxConnLifetime": "0",
"sqlConnMaxLifetime": "0",
},
"fields":[ // in case that the path is *exp.*row user must complete all the fields one to one with his sql schema in the correct order
{"tag": "CGRID", "path": "*exp.*row", "type": "*group", "value": "~*req.CGRID"},
@@ -405,7 +405,7 @@
"postgresSSLMode": "disable",
"sqlMaxIdleConns": "10",
"sqlMaxOpenConns": "100",
"sqlMaxConnLifetime": "0",
"sqlConnMaxLifetime": "0",
},
"fields":[ // the path constains *exp.columnName
{"tag": "CGRID", "path": "*exp.cgrid", "type": "*variable", "value": "~*req.CGRID"},

View File

@@ -120,7 +120,7 @@ func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, s
}
sqlDB.SetMaxOpenConns(int(val))
}
if iface, has := opts[utils.SQLMaxConnLifetime]; has {
if iface, has := opts[utils.SQLConnMaxLifetime]; has {
val, err := utils.IfaceAsDuration(iface)
if err != nil {
return nil, nil, err

View File

@@ -271,7 +271,7 @@ func TestOpenDB2Err(t *testing.T) {
func TestOpenDB3(t *testing.T) {
dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
"cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates"))
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxConnLifetime: 2})
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: 2})
if err != nil {
t.Error(err)
}
@@ -280,7 +280,7 @@ func TestOpenDB3(t *testing.T) {
func TestOpenDB3Err(t *testing.T) {
dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
"cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates"))
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxConnLifetime: "test"})
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: "test"})
errExpect := "time: invalid duration \"test\""
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)

View File

@@ -2551,7 +2551,7 @@ const (
SQLTableNameOpt = "sqlTableName"
SQLMaxOpenConns = "sqlMaxOpenConns"
SQLMaxConnLifetime = "sqlMaxConnLifetime"
SQLConnMaxLifetime = "sqlConnMaxLifetime"
// fileCSV
CSVRowLengthOpt = "csvRowLength"
@@ -2611,6 +2611,37 @@ const (
NatsCertificateAuthority = "natsCertificateAuthority"
NatsJetStream = "natsJetStream"
NatsJetStreamMaxWait = "natsJetStreamMaxWait"
// processed opts
AMQPQueueIDProcessedCfg = "amqpQueueIDProcessed"
AMQPExchangeProcessedCfg = "amqpExchangeProcessed"
AMQPExchangeTypeProcessedCfg = "amqpExchangeTypeProcessed"
AMQPRoutingKeyProcessedCfg = "amqpRoutingKeyProcessed"
KafkaTopicProcessedCfg = "kafkaTopicProcessed"
SQLDBNameProcessedCfg = "sqlDBNameProcessed"
SQLTableNameProcessedCfg = "sqlTableNameProcessed"
SSLModeProcessedCfg = "sslModeProcessed"
AWSRegionProcessedCfg = "awsRegionProcessed"
AWSKeyProcessedCfg = "awsKeyProcessed"
AWSSecretProcessedCfg = "awsSecretProcessed"
AWSTokenProcessedCfg = "awsTokenProcessed"
SQSQueueIDProcessedCfg = "sqsQueueIDProcessed"
S3FolderPathProcessedCfg = "s3FolderPathProcessed"
S3BucketIDProcessedCfg = "s3BucketIDProcessed"
NATSJetStreamProcessedCfg = "natsJetStreamProcessed"
NATSSubjectProcessedCfg = "natsSubjectProcessed"
NATSJWTFileProcessedCfg = "natsJWTFileProcessed"
NATSSeedFileProcessedCfg = "natsSeedFileProcessed"
NATSCertificateAuthorityProcessedCfg = "natsCertificateAuthorityProcessed"
NATSClientCertificateProcessed = "natsClientCertificateProcessed"
NATSClientKeyProcessedCfg = "natsClientKeyProcessed"
NATSJetStreamMaxWaitProcessedCfg = "natsJetStreamMaxWaitProcessed"
)
// Analyzers constants