Files
cgrates/config/ers.go
2025-10-29 19:42:40 +01:00

1063 lines
29 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package config
import (
"slices"
"time"
"github.com/cgrates/cgrates/utils"
)
// ERsCfg the config for ERs (EventReaderService).
type ERsCfg struct {
// Enabled starts the ERs service when true.
Enabled bool
// SessionSConns holds connection IDs towards SessionS; *internal entries
// are tagged with the subsystem name when loaded from JSON.
SessionSConns []string
// EEsConns holds connection IDs towards EEs (internal entries tagged on load).
EEsConns []string
// StatSConns holds connection IDs towards StatS (internal entries tagged on load).
StatSConns []string
// ThresholdSConns holds connection IDs towards ThresholdS (internal entries tagged on load).
ThresholdSConns []string
// ConcurrentEvents is clamped to a minimum of 1 when loaded from JSON.
ConcurrentEvents int
// Readers holds one configuration per event reader.
Readers []*EventReaderCfg
// PartialCacheTTL is the TTL for partially-read events; 0 means unset
// (rendered as "0" by AsMapInterface).
PartialCacheTTL time.Duration
}
// ReaderCfg looks up the reader configuration with the given "id" inside
// the Readers slice and returns it; when no reader matches, nil is
// returned instead.
func (c *ERsCfg) ReaderCfg(id string) *EventReaderCfg {
	for i := range c.Readers {
		if c.Readers[i].ID == id {
			return c.Readers[i]
		}
	}
	return nil
}
// loadFromJSONCfg populates ERsCfg from its JSON counterpart. Only fields
// present (non-nil) in jc overwrite current values. Connection lists have
// their *internal entries tagged with the target subsystem so each
// subsystem can use a dedicated internal RPC connection. Readers are
// merged/appended via appendERsReaders.
func (c *ERsCfg) loadFromJSONCfg(jc *ERsJsonCfg, msgTemplates map[string][]*FCTemplate, sep string, dfltRdrCfg *EventReaderCfg) (err error) {
	if jc == nil {
		return
	}
	if jc.Enabled != nil {
		c.Enabled = *jc.Enabled
	}
	if jc.SessionSConns != nil {
		// Use the shared helper, consistent with the other conn lists below;
		// it mirrors the manual tagging previously done here (*internal ->
		// *internal:*sessions).
		c.SessionSConns = tagInternalConns(*jc.SessionSConns, utils.MetaSessionS)
	}
	if jc.EEsConns != nil {
		c.EEsConns = tagInternalConns(*jc.EEsConns, utils.MetaEEs)
	}
	if jc.StatSConns != nil {
		c.StatSConns = tagInternalConns(*jc.StatSConns, utils.MetaStats)
	}
	if jc.ThresholdSConns != nil {
		c.ThresholdSConns = tagInternalConns(*jc.ThresholdSConns, utils.MetaThresholds)
	}
	if jc.ConcurrentEvents != nil {
		// guard against configured values below 1
		c.ConcurrentEvents = max(*jc.ConcurrentEvents, 1)
	}
	if jc.PartialCacheTTL != nil {
		if c.PartialCacheTTL, err = utils.ParseDurationWithNanosecs(*jc.PartialCacheTTL); err != nil {
			return
		}
	}
	return c.appendERsReaders(jc.Readers, msgTemplates, sep, dfltRdrCfg)
}
// appendERsReaders merges the JSON reader configs into c.Readers: a reader
// with a matching ID is updated in place; otherwise a new reader (cloned
// from dfltRdrCfg when available) is appended before being populated.
func (c *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, msgTemplates map[string][]*FCTemplate, sep string,
	dfltRdrCfg *EventReaderCfg) (err error) {
	if jsnReaders == nil {
		return
	}
	for _, jsnRdr := range *jsnReaders {
		var rdr *EventReaderCfg
		if jsnRdr.Id != nil {
			if idx := slices.IndexFunc(c.Readers, func(r *EventReaderCfg) bool {
				return r.ID == *jsnRdr.Id
			}); idx != -1 {
				rdr = c.Readers[idx]
			}
		}
		if rdr == nil {
			if dfltRdrCfg != nil {
				rdr = dfltRdrCfg.Clone()
			} else {
				rdr = &EventReaderCfg{Opts: new(EventReaderOpts)}
			}
			c.Readers = append(c.Readers, rdr)
		}
		if err = rdr.loadFromJSONCfg(jsnRdr, msgTemplates, sep); err != nil {
			return
		}
	}
	return
}
// Clone returns a deep copy of ERsCfg
func (c *ERsCfg) Clone() *ERsCfg {
	readers := make([]*EventReaderCfg, len(c.Readers))
	for i, rdr := range c.Readers {
		readers[i] = rdr.Clone()
	}
	return &ERsCfg{
		Enabled:          c.Enabled,
		SessionSConns:    slices.Clone(c.SessionSConns),
		EEsConns:         slices.Clone(c.EEsConns),
		StatSConns:       slices.Clone(c.StatSConns),
		ThresholdSConns:  slices.Clone(c.ThresholdSConns),
		ConcurrentEvents: c.ConcurrentEvents,
		PartialCacheTTL:  c.PartialCacheTTL,
		Readers:          readers,
	}
}
// AsMapInterface returns the config as a map[string]any
func (c *ERsCfg) AsMapInterface(sep string) map[string]any {
	mp := map[string]any{
		utils.EnabledCfg:          c.Enabled,
		utils.EEsConnsCfg:         stripInternalConns(c.EEsConns),
		utils.StatSConnsCfg:       stripInternalConns(c.StatSConns),
		utils.ThresholdSConnsCfg:  stripInternalConns(c.ThresholdSConns),
		utils.ConcurrentEventsCfg: c.ConcurrentEvents,
		utils.PartialCacheTTLCfg:  "0",
	}
	if c.PartialCacheTTL != 0 {
		mp[utils.PartialCacheTTLCfg] = c.PartialCacheTTL.String()
	}
	if c.SessionSConns != nil {
		// map the tagged internal SessionS conn back to plain *internal
		internalSessionS := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)
		conns := make([]string, len(c.SessionSConns))
		for i, conn := range c.SessionSConns {
			if conn == internalSessionS {
				conns[i] = utils.MetaInternal
			} else {
				conns[i] = conn
			}
		}
		mp[utils.SessionSConnsCfg] = conns
	}
	if c.Readers != nil {
		rdrs := make([]map[string]any, len(c.Readers))
		for i, rdr := range c.Readers {
			rdrs[i] = rdr.AsMapInterface(sep)
		}
		mp[utils.ReadersCfg] = rdrs
	}
	return mp
}
// AMQPROpts holds the AMQP-specific reader options. Every field is a
// pointer so an unset option (nil) can be told apart from an explicit
// empty value.
type AMQPROpts struct {
QueueID *string // mapped from the AMQPQueueID JSON option
Username *string // mapped from AMQPUsername
Password *string // mapped from AMQPPassword
ConsumerTag *string // mapped from AMQPConsumerTag
Exchange *string // mapped from AMQPExchange
ExchangeType *string // mapped from AMQPExchangeType
RoutingKey *string // mapped from AMQPRoutingKey
}
// loadFromJSONCfg copies every AMQP option that is set (non-nil) in the
// JSON config into the receiver; unset options keep their current value.
func (amqpr *AMQPROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
	if v := jsnCfg.AMQPQueueID; v != nil {
		amqpr.QueueID = v
	}
	if v := jsnCfg.AMQPUsername; v != nil {
		amqpr.Username = v
	}
	if v := jsnCfg.AMQPPassword; v != nil {
		amqpr.Password = v
	}
	if v := jsnCfg.AMQPConsumerTag; v != nil {
		amqpr.ConsumerTag = v
	}
	if v := jsnCfg.AMQPExchange; v != nil {
		amqpr.Exchange = v
	}
	if v := jsnCfg.AMQPExchangeType; v != nil {
		amqpr.ExchangeType = v
	}
	if v := jsnCfg.AMQPRoutingKey; v != nil {
		amqpr.RoutingKey = v
	}
	return
}
// KafkaROpts holds the Kafka-specific reader options; nil means "not set".
type KafkaROpts struct {
Topic *string // mapped from the KafkaTopic JSON option
GroupID *string // mapped from KafkaGroupID
MaxWait *time.Duration // mapped from KafkaMaxWait, parsed as a duration string
TLS *bool // mapped from KafkaTLS
CAPath *string // mapped from KafkaCAPath
SkipTLSVerify *bool // mapped from KafkaSkipTLSVerify
}
// loadFromJSONCfg copies every Kafka option that is set (non-nil) in the
// JSON config into the receiver; unset options keep their current value.
func (kafkaROpts *KafkaROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
	if v := jsnCfg.KafkaTopic; v != nil {
		kafkaROpts.Topic = v
	}
	if v := jsnCfg.KafkaGroupID; v != nil {
		kafkaROpts.GroupID = v
	}
	if jsnCfg.KafkaMaxWait != nil {
		// the JSON value is a duration string; parse before storing
		maxWait, parseErr := utils.ParseDurationWithNanosecs(*jsnCfg.KafkaMaxWait)
		if parseErr != nil {
			return parseErr
		}
		kafkaROpts.MaxWait = &maxWait
	}
	if v := jsnCfg.KafkaTLS; v != nil {
		kafkaROpts.TLS = v
	}
	if v := jsnCfg.KafkaCAPath; v != nil {
		kafkaROpts.CAPath = v
	}
	if v := jsnCfg.KafkaSkipTLSVerify; v != nil {
		kafkaROpts.SkipTLSVerify = v
	}
	return
}
// SQLROpts holds the SQL-specific reader options; nil means "not set".
type SQLROpts struct {
DBName *string // mapped from the SQLDBName JSON option
TableName *string // mapped from SQLTableName
BatchSize *int // mapped from SQLBatchSize
DeleteIndexedFields *[]string // mapped from SQLDeleteIndexedFields (deep-copied on load)
PgSSLMode *string // mapped from PgSSLMode — NOTE(review): presumably PostgreSQL-only; confirm in reader impl
}
// loadFromJSONCfg copies every SQL option that is set (non-nil) in the
// JSON config into the receiver; unset options keep their current value.
func (sqlOpts *SQLROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
	if jsnCfg.SQLDBName != nil {
		sqlOpts.DBName = jsnCfg.SQLDBName
	}
	if jsnCfg.SQLTableName != nil {
		sqlOpts.TableName = jsnCfg.SQLTableName
	}
	if jsnCfg.SQLBatchSize != nil {
		sqlOpts.BatchSize = jsnCfg.SQLBatchSize
	}
	if jsnCfg.SQLDeleteIndexedFields != nil {
		// deep-copy so later mutations of the JSON slice cannot alias into
		// the config; slices.Clone replaces the manual make+copy
		dif := slices.Clone(*jsnCfg.SQLDeleteIndexedFields)
		sqlOpts.DeleteIndexedFields = &dif
	}
	if jsnCfg.PgSSLMode != nil {
		sqlOpts.PgSSLMode = jsnCfg.PgSSLMode
	}
	return
}
// AWSROpts holds the AWS-specific reader options (shared by the SQS and S3
// readers); nil means "not set".
type AWSROpts struct {
Region *string // mapped from the AWSRegion JSON option
Key *string // mapped from AWSKey
Secret *string // mapped from AWSSecret
Token *string // mapped from AWSToken
SQSQueueID *string // mapped from SQSQueueID
S3BucketID *string // mapped from S3BucketID
}
// loadFromJSONCfg copies every AWS option that is set (non-nil) in the
// JSON config into the receiver; unset options keep their current value.
func (awsROpts *AWSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
	if v := jsnCfg.AWSRegion; v != nil {
		awsROpts.Region = v
	}
	if v := jsnCfg.AWSKey; v != nil {
		awsROpts.Key = v
	}
	if v := jsnCfg.AWSSecret; v != nil {
		awsROpts.Secret = v
	}
	if v := jsnCfg.AWSToken; v != nil {
		awsROpts.Token = v
	}
	if v := jsnCfg.SQSQueueID; v != nil {
		awsROpts.SQSQueueID = v
	}
	if v := jsnCfg.S3BucketID; v != nil {
		awsROpts.S3BucketID = v
	}
	return
}
// NATSROpts holds the NATS-specific reader options; nil means "not set".
type NATSROpts struct {
JetStream *bool // mapped from the NATSJetStream JSON option
ConsumerName *string // mapped from NATSConsumerName
StreamName *string // mapped from NATSStreamName
Subject *string // mapped from NATSSubject
QueueID *string // mapped from NATSQueueID
JWTFile *string // mapped from NATSJWTFile
SeedFile *string // mapped from NATSSeedFile
CertificateAuthority *string // mapped from NATSCertificateAuthority
ClientCertificate *string // mapped from NATSClientCertificate
ClientKey *string // mapped from NATSClientKey
JetStreamMaxWait *time.Duration // mapped from NATSJetStreamMaxWait, parsed as a duration string
}
// loadFromJSONCfg copies every NATS option that is set (non-nil) in the
// JSON config into the receiver; unset options keep their current value.
func (natsOpts *NATSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
	if v := jsnCfg.NATSJetStream; v != nil {
		natsOpts.JetStream = v
	}
	if v := jsnCfg.NATSConsumerName; v != nil {
		natsOpts.ConsumerName = v
	}
	if v := jsnCfg.NATSStreamName; v != nil {
		natsOpts.StreamName = v
	}
	if v := jsnCfg.NATSSubject; v != nil {
		natsOpts.Subject = v
	}
	if v := jsnCfg.NATSQueueID; v != nil {
		natsOpts.QueueID = v
	}
	if v := jsnCfg.NATSJWTFile; v != nil {
		natsOpts.JWTFile = v
	}
	if v := jsnCfg.NATSSeedFile; v != nil {
		natsOpts.SeedFile = v
	}
	if v := jsnCfg.NATSCertificateAuthority; v != nil {
		natsOpts.CertificateAuthority = v
	}
	if v := jsnCfg.NATSClientCertificate; v != nil {
		natsOpts.ClientCertificate = v
	}
	if v := jsnCfg.NATSClientKey; v != nil {
		natsOpts.ClientKey = v
	}
	if jsnCfg.NATSJetStreamMaxWait != nil {
		// the JSON value is a duration string; parse before storing
		maxWait, parseErr := utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait)
		if parseErr != nil {
			return parseErr
		}
		natsOpts.JetStreamMaxWait = &maxWait
	}
	return
}
// CSVROpts holds the CSV-specific reader options; nil means "not set".
type CSVROpts struct {
PartialCSVFieldSeparator *string // mapped from the PartialCSVFieldSeparator JSON option
RowLength *int // mapped from CSVRowLength
FieldSeparator *string // mapped from CSVFieldSeparator
HeaderDefineChar *string // mapped from CSVHeaderDefineChar
LazyQuotes *bool // mapped from CSVLazyQuotes
}
// loadFromJSONCfg copies every CSV option that is set (non-nil) in the
// JSON config into the receiver; unset options keep their current value.
func (csvROpts *CSVROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
	if v := jsnCfg.PartialCSVFieldSeparator; v != nil {
		csvROpts.PartialCSVFieldSeparator = v
	}
	if v := jsnCfg.CSVRowLength; v != nil {
		csvROpts.RowLength = v
	}
	if v := jsnCfg.CSVFieldSeparator; v != nil {
		csvROpts.FieldSeparator = v
	}
	if v := jsnCfg.CSVHeaderDefineChar; v != nil {
		csvROpts.HeaderDefineChar = v
	}
	if v := jsnCfg.CSVLazyQuotes; v != nil {
		csvROpts.LazyQuotes = v
	}
	return
}
// EventReaderOpts aggregates the generic reader options plus the
// source-specific option groups (CSV, AMQP, AWS, NATS, Kafka, SQL).
// Pointer fields are nil when the related option was never configured.
type EventReaderOpts struct {
PartialPath *string // mapped from the PartialPath JSON option
PartialCacheAction *string // mapped from PartialCacheAction
PartialOrderField *string // mapped from PartialOrderField
XMLRootPath *string // mapped from XMLRootPath
CSV *CSVROpts
AMQP *AMQPROpts
AWS *AWSROpts
NATS *NATSROpts
Kafka *KafkaROpts
SQL *SQLROpts
}
// EventReaderCfg the configuration of a single event reader.
type EventReaderCfg struct {
// ID uniquely identifies the reader within ERsCfg.Readers.
ID string
// Type selects the reader implementation.
Type string
// RunDelay determines how the Serve method initiates the reading process.
// - A value of 0 disables automatic reading, allowing manual control, possibly through an API.
// - A value of -1 enables watching directory changes indefinitely, applicable for file-based readers.
// - Any positive duration sets a fixed time interval for automatic reading cycles.
RunDelay time.Duration
// StartDelay adds a delay before starting reading loop
StartDelay time.Duration
// ConcurrentReqs — NOTE(review): presumably limits in-flight requests per reader; confirm in reader impl.
ConcurrentReqs int
// SourcePath points to where events are read from.
SourcePath string
// ProcessedPath points to where events go after processing — NOTE(review): presumably file readers only; confirm.
ProcessedPath string
// Tenant is the parsed tenant rule (see NewRSRParsers in loadFromJSONCfg).
Tenant RSRParsers
Timezone string
// Filters holds filter IDs applied to incoming events.
Filters []string
Flags utils.FlagsWithParams
// Reconnects and MaxReconnectInterval drive reconnection behavior of
// connection-based readers.
Reconnects int
MaxReconnectInterval time.Duration
EEsIDs []string
EEsSuccessIDs []string
EEsFailedIDs []string
// Opts holds generic plus source-specific reader options.
Opts *EventReaderOpts
// Fields are the templates building the processed event.
Fields []*FCTemplate
PartialCommitFields []*FCTemplate
CacheDumpFields []*FCTemplate
}
// loadFromJSONCfg populates the reader options from their JSON counterpart,
// delegating each source-specific group to its own loader. Only options
// present (non-nil) in jsnCfg overwrite current values.
func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
	if jsnCfg == nil {
		return
	}
	// Readers built without a default template (see appendERsReaders) start
	// with nil per-source option holders, while the sub-loaders below write
	// through their receiver. Allocate any missing holder up-front to avoid
	// a nil-pointer dereference when the JSON carries options for that source.
	if erOpts.AMQP == nil {
		erOpts.AMQP = new(AMQPROpts)
	}
	if erOpts.AWS == nil {
		erOpts.AWS = new(AWSROpts)
	}
	if erOpts.Kafka == nil {
		erOpts.Kafka = new(KafkaROpts)
	}
	if erOpts.NATS == nil {
		erOpts.NATS = new(NATSROpts)
	}
	if erOpts.SQL == nil {
		erOpts.SQL = new(SQLROpts)
	}
	if erOpts.CSV == nil {
		erOpts.CSV = new(CSVROpts)
	}
	if err = erOpts.AMQP.loadFromJSONCfg(jsnCfg); err != nil {
		return
	}
	if err = erOpts.AWS.loadFromJSONCfg(jsnCfg); err != nil {
		return
	}
	if err = erOpts.Kafka.loadFromJSONCfg(jsnCfg); err != nil {
		return
	}
	if err = erOpts.NATS.loadFromJSONCfg(jsnCfg); err != nil {
		return
	}
	if err = erOpts.SQL.loadFromJSONCfg(jsnCfg); err != nil {
		return
	}
	if err = erOpts.CSV.loadFromJSONCfg(jsnCfg); err != nil {
		return
	}
	if jsnCfg.PartialPath != nil {
		erOpts.PartialPath = jsnCfg.PartialPath
	}
	if jsnCfg.PartialCacheAction != nil {
		erOpts.PartialCacheAction = jsnCfg.PartialCacheAction
	}
	if jsnCfg.PartialOrderField != nil {
		erOpts.PartialOrderField = jsnCfg.PartialOrderField
	}
	if jsnCfg.XMLRootPath != nil {
		erOpts.XMLRootPath = jsnCfg.XMLRootPath
	}
	return
}
// loadFromJSONCfg populates the reader config from its JSON counterpart.
// Only fields present (non-nil) in jsnCfg overwrite current values, so the
// method can be applied on top of a cloned default reader config.
func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplates map[string][]*FCTemplate, sep string) (err error) {
if jsnCfg == nil {
return
}
if jsnCfg.Id != nil {
er.ID = *jsnCfg.Id
}
if jsnCfg.Type != nil {
er.Type = *jsnCfg.Type
}
// durations arrive as strings and are parsed with nanosecond support
if jsnCfg.Run_delay != nil {
if er.RunDelay, err = utils.ParseDurationWithNanosecs(*jsnCfg.Run_delay); err != nil {
return
}
}
if jsnCfg.Start_delay != nil {
if er.StartDelay, err = utils.ParseDurationWithNanosecs(*jsnCfg.Start_delay); err != nil {
return
}
}
if jsnCfg.Concurrent_requests != nil {
er.ConcurrentReqs = *jsnCfg.Concurrent_requests
}
if jsnCfg.Source_path != nil {
er.SourcePath = *jsnCfg.Source_path
}
if jsnCfg.Processed_path != nil {
er.ProcessedPath = *jsnCfg.Processed_path
}
// the tenant rule is compiled with the configured RSR separator
if jsnCfg.Tenant != nil {
if er.Tenant, err = NewRSRParsers(*jsnCfg.Tenant, sep); err != nil {
return err
}
}
if jsnCfg.Timezone != nil {
er.Timezone = *jsnCfg.Timezone
}
// string slices are copied so the config does not alias the JSON input
if jsnCfg.Filters != nil {
er.Filters = make([]string, len(*jsnCfg.Filters))
copy(er.Filters, *jsnCfg.Filters)
}
if jsnCfg.Flags != nil {
er.Flags = utils.FlagsWithParamsFromSlice(*jsnCfg.Flags)
}
if jsnCfg.Reconnects != nil {
er.Reconnects = *jsnCfg.Reconnects
}
if jsnCfg.Max_reconnect_interval != nil {
if er.MaxReconnectInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Max_reconnect_interval); err != nil {
return err
}
}
if jsnCfg.Ees_ids != nil {
er.EEsIDs = make([]string, len(*jsnCfg.Ees_ids))
copy(er.EEsIDs, *jsnCfg.Ees_ids)
}
if jsnCfg.Ees_success_ids != nil {
er.EEsSuccessIDs = make([]string, len(*jsnCfg.Ees_success_ids))
copy(er.EEsSuccessIDs, *jsnCfg.Ees_success_ids)
}
if jsnCfg.Ees_failed_ids != nil {
er.EEsFailedIDs = make([]string, len(*jsnCfg.Ees_failed_ids))
copy(er.EEsFailedIDs, *jsnCfg.Ees_failed_ids)
}
// field templates are first built from JSON, then inflated against
// msgTemplates; a nil inflation result keeps the parsed templates
if jsnCfg.Fields != nil {
if er.Fields, err = FCTemplatesFromFCTemplatesJSONCfg(*jsnCfg.Fields, sep); err != nil {
return err
}
if tpls, err := InflateTemplates(er.Fields, msgTemplates); err != nil {
return err
} else if tpls != nil {
er.Fields = tpls
}
}
if jsnCfg.Cache_dump_fields != nil {
if er.CacheDumpFields, err = FCTemplatesFromFCTemplatesJSONCfg(*jsnCfg.Cache_dump_fields, sep); err != nil {
return err
}
if tpls, err := InflateTemplates(er.CacheDumpFields, msgTemplates); err != nil {
return err
} else if tpls != nil {
er.CacheDumpFields = tpls
}
}
if jsnCfg.Partial_commit_fields != nil {
if er.PartialCommitFields, err = FCTemplatesFromFCTemplatesJSONCfg(*jsnCfg.Partial_commit_fields, sep); err != nil {
return err
}
if tpls, err := InflateTemplates(er.PartialCommitFields, msgTemplates); err != nil {
return err
} else if tpls != nil {
er.PartialCommitFields = tpls
}
}
if jsnCfg.Opts != nil {
err = er.Opts.loadFromJSONCfg(jsnCfg.Opts)
}
return
}
// Clone returns a deep copy of AMQPROpts; nil option fields stay nil.
func (amqpOpts *AMQPROpts) Clone() *AMQPROpts {
	// copyStr duplicates a *string so the clone shares no storage
	copyStr := func(src *string) *string {
		if src == nil {
			return nil
		}
		v := *src
		return &v
	}
	return &AMQPROpts{
		QueueID:      copyStr(amqpOpts.QueueID),
		Username:     copyStr(amqpOpts.Username),
		Password:     copyStr(amqpOpts.Password),
		ConsumerTag:  copyStr(amqpOpts.ConsumerTag),
		Exchange:     copyStr(amqpOpts.Exchange),
		ExchangeType: copyStr(amqpOpts.ExchangeType),
		RoutingKey:   copyStr(amqpOpts.RoutingKey),
	}
}
// Clone returns a deep copy of CSVROpts; nil option fields stay nil.
func (csvOpts *CSVROpts) Clone() *CSVROpts {
	cln := new(CSVROpts)
	if csvOpts.PartialCSVFieldSeparator != nil {
		v := *csvOpts.PartialCSVFieldSeparator
		cln.PartialCSVFieldSeparator = &v
	}
	if csvOpts.RowLength != nil {
		v := *csvOpts.RowLength
		cln.RowLength = &v
	}
	if csvOpts.FieldSeparator != nil {
		v := *csvOpts.FieldSeparator
		cln.FieldSeparator = &v
	}
	if csvOpts.HeaderDefineChar != nil {
		v := *csvOpts.HeaderDefineChar
		cln.HeaderDefineChar = &v
	}
	if csvOpts.LazyQuotes != nil {
		v := *csvOpts.LazyQuotes
		cln.LazyQuotes = &v
	}
	return cln
}
// Clone returns a deep copy of KafkaROpts; nil option fields stay nil.
func (kafkaOpts *KafkaROpts) Clone() *KafkaROpts {
	cln := new(KafkaROpts)
	if kafkaOpts.Topic != nil {
		v := *kafkaOpts.Topic
		cln.Topic = &v
	}
	if kafkaOpts.GroupID != nil {
		v := *kafkaOpts.GroupID
		cln.GroupID = &v
	}
	if kafkaOpts.MaxWait != nil {
		v := *kafkaOpts.MaxWait
		cln.MaxWait = &v
	}
	if kafkaOpts.TLS != nil {
		v := *kafkaOpts.TLS
		cln.TLS = &v
	}
	if kafkaOpts.CAPath != nil {
		v := *kafkaOpts.CAPath
		cln.CAPath = &v
	}
	if kafkaOpts.SkipTLSVerify != nil {
		v := *kafkaOpts.SkipTLSVerify
		cln.SkipTLSVerify = &v
	}
	return cln
}
// Clone returns a deep copy of SQLROpts; nil option fields stay nil.
func (sqlOpts *SQLROpts) Clone() *SQLROpts {
	cln := &SQLROpts{}
	if sqlOpts.DBName != nil {
		cln.DBName = new(string)
		*cln.DBName = *sqlOpts.DBName
	}
	if sqlOpts.TableName != nil {
		cln.TableName = new(string)
		*cln.TableName = *sqlOpts.TableName
	}
	if sqlOpts.BatchSize != nil {
		cln.BatchSize = new(int)
		*cln.BatchSize = *sqlOpts.BatchSize
	}
	if sqlOpts.DeleteIndexedFields != nil {
		// slices.Clone replaces the manual make+copy (idiomatic since Go 1.21)
		idx := slices.Clone(*sqlOpts.DeleteIndexedFields)
		cln.DeleteIndexedFields = &idx
	}
	if sqlOpts.PgSSLMode != nil {
		cln.PgSSLMode = new(string)
		*cln.PgSSLMode = *sqlOpts.PgSSLMode
	}
	return cln
}
// Clone returns a deep copy of AWSROpts; nil option fields stay nil.
func (awsOpt *AWSROpts) Clone() *AWSROpts {
	// copyStr duplicates a *string so the clone shares no storage
	copyStr := func(src *string) *string {
		if src == nil {
			return nil
		}
		v := *src
		return &v
	}
	return &AWSROpts{
		Region:     copyStr(awsOpt.Region),
		Key:        copyStr(awsOpt.Key),
		Secret:     copyStr(awsOpt.Secret),
		Token:      copyStr(awsOpt.Token),
		SQSQueueID: copyStr(awsOpt.SQSQueueID),
		S3BucketID: copyStr(awsOpt.S3BucketID),
	}
}
// Clone returns a deep copy of NATSROpts; nil option fields stay nil.
func (natOpts *NATSROpts) Clone() *NATSROpts {
	// copyStr duplicates a *string so the clone shares no storage
	copyStr := func(src *string) *string {
		if src == nil {
			return nil
		}
		v := *src
		return &v
	}
	cln := &NATSROpts{
		ConsumerName:         copyStr(natOpts.ConsumerName),
		StreamName:           copyStr(natOpts.StreamName),
		Subject:              copyStr(natOpts.Subject),
		QueueID:              copyStr(natOpts.QueueID),
		JWTFile:              copyStr(natOpts.JWTFile),
		SeedFile:             copyStr(natOpts.SeedFile),
		CertificateAuthority: copyStr(natOpts.CertificateAuthority),
		ClientCertificate:    copyStr(natOpts.ClientCertificate),
		ClientKey:            copyStr(natOpts.ClientKey),
	}
	if natOpts.JetStream != nil {
		v := *natOpts.JetStream
		cln.JetStream = &v
	}
	if natOpts.JetStreamMaxWait != nil {
		v := *natOpts.JetStreamMaxWait
		cln.JetStreamMaxWait = &v
	}
	return cln
}
// Clone returns a deep copy of EventReaderOpts; nil fields (generic options
// and source-specific groups alike) stay nil.
func (erOpts *EventReaderOpts) Clone() *EventReaderOpts {
	cln := new(EventReaderOpts)
	if erOpts.PartialPath != nil {
		v := *erOpts.PartialPath
		cln.PartialPath = &v
	}
	if erOpts.PartialCacheAction != nil {
		v := *erOpts.PartialCacheAction
		cln.PartialCacheAction = &v
	}
	if erOpts.PartialOrderField != nil {
		v := *erOpts.PartialOrderField
		cln.PartialOrderField = &v
	}
	if erOpts.XMLRootPath != nil {
		v := *erOpts.XMLRootPath
		cln.XMLRootPath = &v
	}
	if erOpts.CSV != nil {
		cln.CSV = erOpts.CSV.Clone()
	}
	if erOpts.AMQP != nil {
		cln.AMQP = erOpts.AMQP.Clone()
	}
	if erOpts.AWS != nil {
		cln.AWS = erOpts.AWS.Clone()
	}
	if erOpts.NATS != nil {
		cln.NATS = erOpts.NATS.Clone()
	}
	if erOpts.Kafka != nil {
		cln.Kafka = erOpts.Kafka.Clone()
	}
	if erOpts.SQL != nil {
		cln.SQL = erOpts.SQL.Clone()
	}
	return cln
}
// Clone returns a deep copy of EventReaderCfg
func (er EventReaderCfg) Clone() (cln *EventReaderCfg) {
	// cloneTemplates deep-copies a template slice, preserving nil
	cloneTemplates := func(src []*FCTemplate) []*FCTemplate {
		if src == nil {
			return nil
		}
		out := make([]*FCTemplate, len(src))
		for i, tpl := range src {
			out[i] = tpl.Clone()
		}
		return out
	}
	cln = &EventReaderCfg{
		ID:                   er.ID,
		Type:                 er.Type,
		RunDelay:             er.RunDelay,
		StartDelay:           er.StartDelay,
		ConcurrentReqs:       er.ConcurrentReqs,
		SourcePath:           er.SourcePath,
		ProcessedPath:        er.ProcessedPath,
		Filters:              slices.Clone(er.Filters),
		Tenant:               er.Tenant.Clone(),
		Timezone:             er.Timezone,
		Flags:                er.Flags.Clone(),
		Reconnects:           er.Reconnects,
		MaxReconnectInterval: er.MaxReconnectInterval,
		EEsIDs:               slices.Clone(er.EEsIDs),
		EEsSuccessIDs:        slices.Clone(er.EEsSuccessIDs),
		EEsFailedIDs:         slices.Clone(er.EEsFailedIDs),
		Opts:                 er.Opts.Clone(),
		Fields:               cloneTemplates(er.Fields),
		CacheDumpFields:      cloneTemplates(er.CacheDumpFields),
		PartialCommitFields:  cloneTemplates(er.PartialCommitFields),
	}
	return
}
// AsMapInterface returns the config as a map[string]any. Option entries are
// emitted only for the options that were explicitly set (non-nil); duration
// fields are rendered as strings with "0" defaults.
func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string]any) {
	// collect only the explicitly-set (non-nil) reader options
	opts := map[string]any{}
	if er.Opts.PartialPath != nil {
		opts[utils.PartialPathOpt] = *er.Opts.PartialPath
	}
	if er.Opts.PartialCacheAction != nil {
		opts[utils.PartialCacheActionOpt] = *er.Opts.PartialCacheAction
	}
	if er.Opts.PartialOrderField != nil {
		opts[utils.PartialOrderFieldOpt] = *er.Opts.PartialOrderField
	}
	if csvOpts := er.Opts.CSV; csvOpts != nil {
		if csvOpts.PartialCSVFieldSeparator != nil {
			opts[utils.PartialCSVFieldSepartorOpt] = *csvOpts.PartialCSVFieldSeparator
		}
		if csvOpts.RowLength != nil {
			opts[utils.CSVRowLengthOpt] = *csvOpts.RowLength
		}
		if csvOpts.FieldSeparator != nil {
			opts[utils.CSVFieldSepOpt] = *csvOpts.FieldSeparator
		}
		if csvOpts.HeaderDefineChar != nil {
			opts[utils.HeaderDefineCharOpt] = *csvOpts.HeaderDefineChar
		}
		if csvOpts.LazyQuotes != nil {
			opts[utils.CSVLazyQuotes] = *csvOpts.LazyQuotes
		}
	}
	if er.Opts.XMLRootPath != nil {
		opts[utils.XMLRootPathOpt] = *er.Opts.XMLRootPath
	}
	if amqpOpts := er.Opts.AMQP; amqpOpts != nil {
		if amqpOpts.QueueID != nil {
			opts[utils.AMQPQueueID] = *amqpOpts.QueueID
		}
		if amqpOpts.Username != nil {
			opts[utils.AMQPUsername] = *amqpOpts.Username
		}
		if amqpOpts.Password != nil {
			opts[utils.AMQPPassword] = *amqpOpts.Password
		}
		if amqpOpts.ConsumerTag != nil {
			opts[utils.AMQPConsumerTag] = *amqpOpts.ConsumerTag
		}
		if amqpOpts.Exchange != nil {
			opts[utils.AMQPExchange] = *amqpOpts.Exchange
		}
		if amqpOpts.ExchangeType != nil {
			opts[utils.AMQPExchangeType] = *amqpOpts.ExchangeType
		}
		if amqpOpts.RoutingKey != nil {
			opts[utils.AMQPRoutingKey] = *amqpOpts.RoutingKey
		}
	}
	if kafkaOpts := er.Opts.Kafka; kafkaOpts != nil {
		if kafkaOpts.Topic != nil {
			opts[utils.KafkaTopic] = *kafkaOpts.Topic
		}
		if kafkaOpts.GroupID != nil {
			opts[utils.KafkaGroupID] = *kafkaOpts.GroupID
		}
		if kafkaOpts.MaxWait != nil {
			opts[utils.KafkaMaxWait] = kafkaOpts.MaxWait.String()
		}
		if kafkaOpts.TLS != nil {
			opts[utils.KafkaTLS] = *kafkaOpts.TLS
		}
		if kafkaOpts.CAPath != nil {
			opts[utils.KafkaCAPath] = *kafkaOpts.CAPath
		}
		if kafkaOpts.SkipTLSVerify != nil {
			opts[utils.KafkaSkipTLSVerify] = *kafkaOpts.SkipTLSVerify
		}
	}
	if sqlOpts := er.Opts.SQL; sqlOpts != nil {
		if sqlOpts.DBName != nil {
			opts[utils.SQLDBNameOpt] = *sqlOpts.DBName
		}
		if sqlOpts.TableName != nil {
			opts[utils.SQLTableNameOpt] = *sqlOpts.TableName
		}
		if sqlOpts.BatchSize != nil {
			opts[utils.SQLBatchSize] = *sqlOpts.BatchSize
		}
		if sqlOpts.DeleteIndexedFields != nil {
			// copy so map consumers cannot mutate the config's slice
			deleteIndexedFields := make([]string, len(*sqlOpts.DeleteIndexedFields))
			copy(deleteIndexedFields, *sqlOpts.DeleteIndexedFields)
			opts[utils.SQLDeleteIndexedFieldsOpt] = deleteIndexedFields
		}
		if sqlOpts.PgSSLMode != nil {
			opts[utils.PgSSLModeCfg] = *sqlOpts.PgSSLMode
		}
	}
	if awsOpts := er.Opts.AWS; awsOpts != nil {
		if awsOpts.Region != nil {
			opts[utils.AWSRegion] = *awsOpts.Region
		}
		if awsOpts.Key != nil {
			opts[utils.AWSKey] = *awsOpts.Key
		}
		if awsOpts.Secret != nil {
			opts[utils.AWSSecret] = *awsOpts.Secret
		}
		if awsOpts.Token != nil {
			opts[utils.AWSToken] = *awsOpts.Token
		}
		if awsOpts.SQSQueueID != nil {
			opts[utils.SQSQueueID] = *awsOpts.SQSQueueID
		}
		if awsOpts.S3BucketID != nil {
			opts[utils.S3Bucket] = *awsOpts.S3BucketID
		}
	}
	if natsOpts := er.Opts.NATS; natsOpts != nil {
		if natsOpts.JetStream != nil {
			opts[utils.NatsJetStream] = *natsOpts.JetStream
		}
		if natsOpts.ConsumerName != nil {
			opts[utils.NatsConsumerName] = *natsOpts.ConsumerName
		}
		if natsOpts.StreamName != nil {
			opts[utils.NatsStreamName] = *natsOpts.StreamName
		}
		if natsOpts.Subject != nil {
			opts[utils.NatsSubject] = *natsOpts.Subject
		}
		if natsOpts.QueueID != nil {
			opts[utils.NatsQueueID] = *natsOpts.QueueID
		}
		if natsOpts.JWTFile != nil {
			opts[utils.NatsJWTFile] = *natsOpts.JWTFile
		}
		if natsOpts.SeedFile != nil {
			opts[utils.NatsSeedFile] = *natsOpts.SeedFile
		}
		if natsOpts.CertificateAuthority != nil {
			opts[utils.NatsCertificateAuthority] = *natsOpts.CertificateAuthority
		}
		if natsOpts.ClientCertificate != nil {
			opts[utils.NatsClientCertificate] = *natsOpts.ClientCertificate
		}
		if natsOpts.ClientKey != nil {
			opts[utils.NatsClientKey] = *natsOpts.ClientKey
		}
		if natsOpts.JetStreamMaxWait != nil {
			opts[utils.NatsJetStreamMaxWait] = natsOpts.JetStreamMaxWait.String()
		}
	}
	// defaults first ("0" / empty flags), then overwrite where values are set
	initialMP = map[string]any{
		utils.IDCfg:                   er.ID,
		utils.TypeCfg:                 er.Type,
		utils.ConcurrentRequestsCfg:   er.ConcurrentReqs,
		utils.SourcePathCfg:           er.SourcePath,
		utils.ProcessedPathCfg:        er.ProcessedPath,
		utils.TenantCfg:               er.Tenant.GetRule(separator),
		utils.TimezoneCfg:             er.Timezone,
		utils.FiltersCfg:              er.Filters,
		utils.FlagsCfg:                []string{},
		utils.RunDelayCfg:             "0",
		utils.StartDelayCfg:           "0",
		utils.ReconnectsCfg:           er.Reconnects,
		utils.MaxReconnectIntervalCfg: "0",
		utils.OptsCfg:                 opts,
	}
	if len(er.EEsIDs) != 0 {
		initialMP[utils.EEsIDsCfg] = er.EEsIDs
	}
	if len(er.EEsSuccessIDs) != 0 {
		initialMP[utils.EEsSuccessIDsCfg] = er.EEsSuccessIDs
	}
	if len(er.EEsFailedIDs) != 0 {
		initialMP[utils.EEsFailedIDsCfg] = er.EEsFailedIDs
	}
	if er.MaxReconnectInterval != 0 {
		initialMP[utils.MaxReconnectIntervalCfg] = er.MaxReconnectInterval.String()
	}
	// NOTE: opts is already stored under utils.OptsCfg in the literal above;
	// a redundant second assignment was removed here.
	if flags := er.Flags.SliceFlags(); flags != nil {
		initialMP[utils.FlagsCfg] = flags
	}
	if er.Fields != nil {
		fields := make([]map[string]any, len(er.Fields))
		for i, item := range er.Fields {
			fields[i] = item.AsMapInterface(separator)
		}
		initialMP[utils.FieldsCfg] = fields
	}
	if er.CacheDumpFields != nil {
		cacheDumpFields := make([]map[string]any, len(er.CacheDumpFields))
		for i, item := range er.CacheDumpFields {
			cacheDumpFields[i] = item.AsMapInterface(separator)
		}
		initialMP[utils.CacheDumpFieldsCfg] = cacheDumpFields
	}
	if er.PartialCommitFields != nil {
		parCFields := make([]map[string]any, len(er.PartialCommitFields))
		for i, item := range er.PartialCommitFields {
			parCFields[i] = item.AsMapInterface(separator)
		}
		initialMP[utils.PartialCommitFieldsCfg] = parCFields
	}
	// RunDelay: positive -> duration string, negative -> "-1" (watch mode)
	if er.RunDelay > 0 {
		initialMP[utils.RunDelayCfg] = er.RunDelay.String()
	} else if er.RunDelay < 0 {
		initialMP[utils.RunDelayCfg] = "-1"
	}
	if er.StartDelay > 0 {
		initialMP[utils.StartDelayCfg] = er.StartDelay.String()
	}
	return
}