/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see */ package config import ( "time" "github.com/cgrates/cgrates/utils" ) // EEsCfg the config for Event Exporters type EEsCfg struct { Enabled bool AttributeSConns []string Cache map[string]*CacheParamCfg FailedPosts *FailedPostsCfg Exporters []*EventExporterCfg } // ExporterCfg iterates over the Exporters slice and returns the exporter // configuration associated with the specified "id". If none were found, the // method will return nil. func (c *EEsCfg) ExporterCfg(id string) *EventExporterCfg { for _, eeCfg := range c.Exporters { if eeCfg.ID == id { return eeCfg } } return nil } func (c *EEsCfg) loadFromJSONCfg(jc *EEsJsonCfg, msgTemplates map[string][]*FCTemplate, sep string, dfltExpCfg *EventExporterCfg) error { if jc == nil { return nil } if jc.Enabled != nil { c.Enabled = *jc.Enabled } if jc.Cache != nil { for kJsn, vJsn := range *jc.Cache { val := new(CacheParamCfg) if err := val.loadFromJSONCfg(vJsn); err != nil { return err } c.Cache[kJsn] = val } } if jc.AttributeSConns != nil { c.AttributeSConns = make([]string, len(*jc.AttributeSConns)) for i, fID := range *jc.AttributeSConns { // if we have the connection internal we change the name so we can have internal rpc for each subsystem c.AttributeSConns[i] = fID if fID == utils.MetaInternal { c.AttributeSConns[i] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes) } } } if err := c.FailedPosts.loadFromJSONCfg(jc.FailedPosts); err != nil { return err } return c.appendEEsExporters(jc.Exporters, msgTemplates, sep, dfltExpCfg) } func (c *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, msgTemplates map[string][]*FCTemplate, separator string, dfltExpCfg *EventExporterCfg) (err error) { if exporters == nil { return } for _, jsnExp := range *exporters { var exp *EventExporterCfg if jsnExp.Id != nil { for _, exporter := range c.Exporters { if exporter.ID == *jsnExp.Id { exp = exporter break } } } if exp == nil { if dfltExpCfg != nil { exp = dfltExpCfg.Clone() } else { exp = new(EventExporterCfg) exp.Opts = &EventExporterOpts{} } c.Exporters = append(c.Exporters, exp) } if err = exp.loadFromJSONCfg(jsnExp, msgTemplates, separator); err != nil { return } } return } // Clone returns a deep copy of EEsCfg func (c *EEsCfg) Clone() (cln *EEsCfg) { cln = &EEsCfg{ Enabled: c.Enabled, AttributeSConns: make([]string, len(c.AttributeSConns)), Cache: make(map[string]*CacheParamCfg), FailedPosts: c.FailedPosts.Clone(), Exporters: make([]*EventExporterCfg, len(c.Exporters)), } copy(cln.AttributeSConns, c.AttributeSConns) for key, value := range c.Cache { cln.Cache[key] = value.Clone() } for idx, exp := range c.Exporters { cln.Exporters[idx] = exp.Clone() } return } // AsMapInterface returns the config as a map[string]any func (c *EEsCfg) AsMapInterface(separator string) (initialMP map[string]any) { initialMP = map[string]any{ utils.EnabledCfg: c.Enabled, utils.FailedPostsCfg: c.FailedPosts.AsMapInterface(), } if c.AttributeSConns != nil { attributeSConns := make([]string, len(c.AttributeSConns)) for i, item := range c.AttributeSConns { attributeSConns[i] = item if item == utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes) { attributeSConns[i] = utils.MetaInternal } } initialMP[utils.AttributeSConnsCfg] = attributeSConns } if c.Cache != nil { cache := make(map[string]any, len(c.Cache)) for key, value := range c.Cache { cache[key] = value.AsMapInterface() } initialMP[utils.CacheCfg] = cache } if c.FailedPosts != nil { } if c.Exporters != nil { exporters := make([]map[string]any, len(c.Exporters)) for i, item := range c.Exporters { exporters[i] = item.AsMapInterface(separator) } initialMP[utils.ExportersCfg] = exporters } return } func (c *EEsCfg) exporterIDs() []string { ids := make([]string, 0, len(c.Exporters)) for _, exporter := range c.Exporters { ids = append(ids, exporter.ID) } return ids } type ElsOpts struct { // index request opts Index *string Refresh *string OpType *string Pipeline *string Routing *string Timeout *time.Duration WaitForActiveShards *string // elasticsearch client opts CAPath *string DiscoverNodesOnStart *bool DiscoverNodeInterval *time.Duration Cloud *bool APIKey *string CertificateFingerprint *string ServiceToken *string Username *string // Username for HTTP Basic Authentication. Password *string EnableDebugLogger *bool Logger *string CompressRequestBody *bool CompressRequestBodyLevel *int RetryOnStatus *[]int MaxRetries *int DisableRetry *bool } type SQLOpts struct { MaxIdleConns *int MaxOpenConns *int ConnMaxLifetime *time.Duration MYSQLDSNParams map[string]string TableName *string DBName *string UpdateIndexedFields *[]string PgSSLMode *string } type AMQPOpts struct { RoutingKey *string QueueID *string Exchange *string ExchangeType *string Username *string Password *string } type AWSOpts struct { Region *string Key *string Secret *string Token *string SQSQueueID *string SQSForcePathStyle *bool SQSSkipTlsVerify *bool S3BucketID *string S3FolderPath *string S3ForcePathStyle *bool S3SkipTlsVerify *bool } type NATSOpts struct { JetStream *bool Subject *string JWTFile *string SeedFile *string CertificateAuthority *string ClientCertificate *string ClientKey *string JetStreamMaxWait *time.Duration } type RPCOpts struct { RPCCodec *string ServiceMethod *string KeyPath *string CertPath *string CAPath *string TLS *bool ConnIDs *[]string RPCConnTimeout *time.Duration RPCReplyTimeout *time.Duration RPCAPIOpts map[string]any } type KafkaOpts struct { Topic *string BatchSize *int TLS *bool CAPath *string SkipTLSVerify *bool } type EventExporterOpts struct { CSVFieldSeparator *string Els *ElsOpts SQL *SQLOpts AMQP *AMQPOpts AWS *AWSOpts NATS *NATSOpts RPC *RPCOpts Kafka *KafkaOpts } // EventExporterCfg the config for a Event Exporter type EventExporterCfg struct { ID string Type string ExportPath string Opts *EventExporterOpts Timezone string Filters []string Flags utils.FlagsWithParams AttributeSIDs []string // selective AttributeS profiles AttributeSCtx string // context to use when querying AttributeS Synchronous bool Attempts int FailedPostsDir string ConcurrentRequests int MetricsResetSchedule string Fields []*FCTemplate headerFields []*FCTemplate contentFields []*FCTemplate trailerFields []*FCTemplate } // NewEventExporterCfg is a constructor for the EventExporterCfg, that is needed to initialize posters that are used by the // readers and HTTP exporter actions func NewEventExporterCfg(ID, exportType, exportPath, failedPostsDir string, attempts int, opts *EventExporterOpts) *EventExporterCfg { if opts == nil { opts = new(EventExporterOpts) } return &EventExporterCfg{ ID: ID, Type: exportType, ExportPath: exportPath, FailedPostsDir: failedPostsDir, Attempts: attempts, Opts: opts, } } func (elsOpts *ElsOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg.ElsCloud != nil { elsOpts.Cloud = jsnCfg.ElsCloud } if jsnCfg.ElsAPIKey != nil { elsOpts.APIKey = jsnCfg.ElsAPIKey } if jsnCfg.ElsServiceToken != nil { elsOpts.ServiceToken = jsnCfg.ElsServiceToken } if jsnCfg.ElsCertificateFingerprint != nil { elsOpts.CertificateFingerprint = jsnCfg.ElsCertificateFingerprint } if jsnCfg.ElsEnableDebugLogger != nil { elsOpts.EnableDebugLogger = jsnCfg.ElsEnableDebugLogger } if jsnCfg.ElsLogger != nil { elsOpts.Logger = jsnCfg.ElsLogger } if jsnCfg.ElsCompressRequestBody != nil { elsOpts.CompressRequestBody = jsnCfg.ElsCompressRequestBody } if jsnCfg.ElsCompressRequestBodyLevel != nil { elsOpts.CompressRequestBodyLevel = jsnCfg.ElsCompressRequestBodyLevel } if jsnCfg.ElsUsername != nil { elsOpts.Username = jsnCfg.ElsUsername } if jsnCfg.ElsPassword != nil { elsOpts.Password = jsnCfg.ElsPassword } if jsnCfg.ElsCAPath != nil { elsOpts.CAPath = jsnCfg.ElsCAPath } if jsnCfg.ElsDiscoverNodesOnStart != nil { elsOpts.DiscoverNodesOnStart = jsnCfg.ElsDiscoverNodesOnStart } if jsnCfg.ElsDiscoverNodesInterval != nil { var nodesInterval time.Duration if nodesInterval, err = utils.ParseDurationWithSecs(*jsnCfg.ElsDiscoverNodesInterval); err != nil { return } elsOpts.DiscoverNodeInterval = utils.DurationPointer(nodesInterval) } if jsnCfg.ElsRetryOnStatus != nil { elsOpts.RetryOnStatus = jsnCfg.ElsRetryOnStatus } if jsnCfg.ElsMaxRetries != nil { elsOpts.MaxRetries = jsnCfg.ElsMaxRetries } if jsnCfg.ElsDisableRetry != nil { elsOpts.DisableRetry = jsnCfg.ElsDisableRetry } if jsnCfg.ElsIndex != nil { elsOpts.Index = jsnCfg.ElsIndex } if jsnCfg.ElsRefresh != nil { elsOpts.Refresh = jsnCfg.ElsRefresh } if jsnCfg.ElsOpType != nil { elsOpts.OpType = jsnCfg.ElsOpType } if jsnCfg.ElsPipeline != nil { elsOpts.Pipeline = jsnCfg.ElsPipeline } if jsnCfg.ElsRouting != nil { elsOpts.Routing = jsnCfg.ElsRouting } if jsnCfg.ElsTimeout != nil { var elsTimeout time.Duration if elsTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.ElsTimeout); err != nil { return } elsOpts.Timeout = utils.DurationPointer(elsTimeout) } if jsnCfg.ElsWaitForActiveShards != nil { elsOpts.WaitForActiveShards = jsnCfg.ElsWaitForActiveShards } return } func (kafkaOpts *KafkaOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg.KafkaTopic != nil { kafkaOpts.Topic = jsnCfg.KafkaTopic } if jsnCfg.KafkaBatchSize != nil { kafkaOpts.BatchSize = jsnCfg.KafkaBatchSize } if jsnCfg.KafkaTLS != nil { kafkaOpts.TLS = jsnCfg.KafkaTLS } if jsnCfg.KafkaCAPath != nil { kafkaOpts.CAPath = jsnCfg.KafkaCAPath } if jsnCfg.KafkaSkipTLSVerify != nil { kafkaOpts.SkipTLSVerify = jsnCfg.KafkaSkipTLSVerify } return } func (sqlOpts *SQLOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg.SQLMaxIdleConns != nil { sqlOpts.MaxIdleConns = jsnCfg.SQLMaxIdleConns } if jsnCfg.SQLMaxOpenConns != nil { sqlOpts.MaxOpenConns = jsnCfg.SQLMaxOpenConns } if jsnCfg.SQLConnMaxLifetime != nil { var sqlConnMaxLifetime time.Duration if sqlConnMaxLifetime, err = utils.ParseDurationWithNanosecs(*jsnCfg.SQLConnMaxLifetime); err != nil { return } sqlOpts.ConnMaxLifetime = utils.DurationPointer(sqlConnMaxLifetime) } if jsnCfg.MYSQLDSNParams != nil { sqlOpts.MYSQLDSNParams = make(map[string]string) sqlOpts.MYSQLDSNParams = jsnCfg.MYSQLDSNParams } if jsnCfg.SQLTableName != nil { sqlOpts.TableName = jsnCfg.SQLTableName } if jsnCfg.SQLDBName != nil { sqlOpts.DBName = jsnCfg.SQLDBName } if jsnCfg.SQLUpdateIndexedFields != nil { uif := make([]string, len(*jsnCfg.SQLUpdateIndexedFields)) copy(uif, *jsnCfg.SQLUpdateIndexedFields) sqlOpts.UpdateIndexedFields = &uif } if jsnCfg.PgSSLMode != nil { sqlOpts.PgSSLMode = jsnCfg.PgSSLMode } return } func (amqpOpts *AMQPOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg.AMQPQueueID != nil { amqpOpts.QueueID = jsnCfg.AMQPQueueID } if jsnCfg.AMQPRoutingKey != nil { amqpOpts.RoutingKey = jsnCfg.AMQPRoutingKey } if jsnCfg.AMQPExchange != nil { amqpOpts.Exchange = jsnCfg.AMQPExchange } if jsnCfg.AMQPExchangeType != nil { amqpOpts.ExchangeType = jsnCfg.AMQPExchangeType } if jsnCfg.AMQPUsername != nil { amqpOpts.Username = jsnCfg.AMQPUsername } if jsnCfg.AMQPPassword != nil { amqpOpts.Password = jsnCfg.AMQPPassword } return } func (awsOpts *AWSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg.AWSRegion != nil { awsOpts.Region = jsnCfg.AWSRegion } if jsnCfg.AWSKey != nil { awsOpts.Key = jsnCfg.AWSKey } if jsnCfg.AWSSecret != nil { awsOpts.Secret = jsnCfg.AWSSecret } if jsnCfg.AWSToken != nil { awsOpts.Token = jsnCfg.AWSToken } if jsnCfg.SQSQueueID != nil { awsOpts.SQSQueueID = jsnCfg.SQSQueueID } if jsnCfg.SQSForcePathStyle != nil { awsOpts.SQSForcePathStyle = jsnCfg.SQSForcePathStyle } if jsnCfg.SQSSkipTlsVerify != nil { awsOpts.SQSSkipTlsVerify = jsnCfg.SQSSkipTlsVerify } if jsnCfg.S3BucketID != nil { awsOpts.S3BucketID = jsnCfg.S3BucketID } if jsnCfg.S3FolderPath != nil { awsOpts.S3FolderPath = jsnCfg.S3FolderPath } if jsnCfg.S3ForcePathStyle != nil { awsOpts.S3ForcePathStyle = jsnCfg.S3ForcePathStyle } if jsnCfg.S3SkipTlsVerify != nil { awsOpts.S3SkipTlsVerify = jsnCfg.S3SkipTlsVerify } return } func (natsOpts *NATSOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg.NATSJetStream != nil { natsOpts.JetStream = jsnCfg.NATSJetStream } if jsnCfg.NATSSubject != nil { natsOpts.Subject = jsnCfg.NATSSubject } if jsnCfg.NATSJWTFile != nil { natsOpts.JWTFile = jsnCfg.NATSJWTFile } if jsnCfg.NATSSeedFile != nil { natsOpts.SeedFile = jsnCfg.NATSSeedFile } if jsnCfg.NATSCertificateAuthority != nil { natsOpts.CertificateAuthority = jsnCfg.NATSCertificateAuthority } if jsnCfg.NATSClientCertificate != nil { natsOpts.ClientCertificate = jsnCfg.NATSClientCertificate } if jsnCfg.NATSClientKey != nil { natsOpts.ClientKey = jsnCfg.NATSClientKey } if jsnCfg.NATSJetStreamMaxWait != nil { var natsJetStreamMaxWait time.Duration if natsJetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil { return } natsOpts.JetStreamMaxWait = utils.DurationPointer(natsJetStreamMaxWait) } return } func (rpcOpts *RPCOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg.RPCCodec != nil { rpcOpts.RPCCodec = jsnCfg.RPCCodec } if jsnCfg.ServiceMethod != nil { rpcOpts.ServiceMethod = jsnCfg.ServiceMethod } if jsnCfg.KeyPath != nil { rpcOpts.KeyPath = jsnCfg.KeyPath } if jsnCfg.CertPath != nil { rpcOpts.CertPath = jsnCfg.CertPath } if jsnCfg.CAPath != nil { rpcOpts.CAPath = jsnCfg.CAPath } if jsnCfg.TLS != nil { rpcOpts.TLS = jsnCfg.TLS } if jsnCfg.ConnIDs != nil { rpcOpts.ConnIDs = jsnCfg.ConnIDs } if jsnCfg.RPCConnTimeout != nil { var rpcConnTimeout time.Duration if rpcConnTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.RPCConnTimeout); err != nil { return } rpcOpts.RPCConnTimeout = utils.DurationPointer(rpcConnTimeout) } if jsnCfg.RPCReplyTimeout != nil { var rpcReplyTimeout time.Duration if rpcReplyTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.RPCReplyTimeout); err != nil { return } rpcOpts.RPCReplyTimeout = utils.DurationPointer(rpcReplyTimeout) } if jsnCfg.RPCAPIOpts != nil { rpcOpts.RPCAPIOpts = make(map[string]any) rpcOpts.RPCAPIOpts = jsnCfg.RPCAPIOpts } return } func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg == nil { return } if jsnCfg.CSVFieldSeparator != nil { eeOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator } if err = eeOpts.Els.loadFromJSONCfg(jsnCfg); err != nil { return } if err = eeOpts.Kafka.loadFromJSONCfg(jsnCfg); err != nil { return } if err = eeOpts.SQL.loadFromJSONCfg(jsnCfg); err != nil { return } if err = eeOpts.AMQP.loadFromJSONCfg(jsnCfg); err != nil { return } if err = eeOpts.AWS.loadFromJSONCfg(jsnCfg); err != nil { return } if err = eeOpts.NATS.loadFromJSONCfg(jsnCfg); err != nil { return } if err = eeOpts.RPC.loadFromJSONCfg(jsnCfg); err != nil { return } return } func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTemplates map[string][]*FCTemplate, separator string) (err error) { if jsnEec == nil { return } if jsnEec.Id != nil { eeC.ID = *jsnEec.Id } if jsnEec.Type != nil { eeC.Type = *jsnEec.Type } if jsnEec.Export_path != nil { eeC.ExportPath = *jsnEec.Export_path } if jsnEec.Timezone != nil { eeC.Timezone = *jsnEec.Timezone } if jsnEec.Filters != nil { eeC.Filters = make([]string, len(*jsnEec.Filters)) copy(eeC.Filters, *jsnEec.Filters) } if jsnEec.Flags != nil { eeC.Flags = utils.FlagsWithParamsFromSlice(*jsnEec.Flags) } if jsnEec.Attribute_context != nil { eeC.AttributeSCtx = *jsnEec.Attribute_context } if jsnEec.Attribute_ids != nil { eeC.AttributeSIDs = make([]string, len(*jsnEec.Attribute_ids)) copy(eeC.AttributeSIDs, *jsnEec.Attribute_ids) } if jsnEec.Synchronous != nil { eeC.Synchronous = *jsnEec.Synchronous } if jsnEec.Attempts != nil { eeC.Attempts = *jsnEec.Attempts } if jsnEec.Concurrent_requests != nil { eeC.ConcurrentRequests = *jsnEec.Concurrent_requests } if jsnEec.MetricsResetSchedule != nil { eeC.MetricsResetSchedule = *jsnEec.MetricsResetSchedule } if jsnEec.Fields != nil { eeC.Fields, err = FCTemplatesFromFCTemplatesJSONCfg(*jsnEec.Fields, separator) if err != nil { return } if tpls, err := InflateTemplates(eeC.Fields, msgTemplates); err != nil { return err } else if tpls != nil { eeC.Fields = tpls } eeC.ComputeFields() } if jsnEec.Failed_posts_dir != nil { eeC.FailedPostsDir = *jsnEec.Failed_posts_dir } if jsnEec.Opts != nil { err = eeC.Opts.loadFromJSONCfg(jsnEec.Opts) } return } // ComputeFields will split the fields in header trailer or content // exported for ees testing func (eeC *EventExporterCfg) ComputeFields() { eeC.headerFields = make([]*FCTemplate, 0) eeC.contentFields = make([]*FCTemplate, 0) eeC.trailerFields = make([]*FCTemplate, 0) for _, field := range eeC.Fields { switch field.GetPathSlice()[0] { case utils.MetaHdr: eeC.headerFields = append(eeC.headerFields, field) case utils.MetaExp, utils.MetaUCH: eeC.contentFields = append(eeC.contentFields, field) case utils.MetaTrl: eeC.trailerFields = append(eeC.trailerFields, field) } } } // HeaderFields returns the fields that have *hdr prefix func (eeC *EventExporterCfg) HeaderFields() []*FCTemplate { return eeC.headerFields } // ContentFields returns the fields that do not have *hdr or *trl prefix func (eeC *EventExporterCfg) ContentFields() []*FCTemplate { return eeC.contentFields } // TrailerFields returns the fields that have *trl prefix func (eeC *EventExporterCfg) TrailerFields() []*FCTemplate { return eeC.trailerFields } func (elsOpts *ElsOpts) Clone() *ElsOpts { cln := &ElsOpts{} if elsOpts.Index != nil { cln.Index = new(string) *cln.Index = *elsOpts.Index } if elsOpts.Refresh != nil { cln.Refresh = new(string) *cln.Refresh = *elsOpts.Refresh } if elsOpts.OpType != nil { cln.OpType = new(string) *cln.OpType = *elsOpts.OpType } if elsOpts.Pipeline != nil { cln.Pipeline = new(string) *cln.Pipeline = *elsOpts.Pipeline } if elsOpts.Routing != nil { cln.Routing = new(string) *cln.Routing = *elsOpts.Routing } if elsOpts.Timeout != nil { cln.Timeout = new(time.Duration) *cln.Timeout = *elsOpts.Timeout } if elsOpts.WaitForActiveShards != nil { cln.WaitForActiveShards = new(string) *cln.WaitForActiveShards = *elsOpts.WaitForActiveShards } if elsOpts.CAPath != nil { cln.CAPath = new(string) *cln.CAPath = *elsOpts.CAPath } if elsOpts.DiscoverNodesOnStart != nil { cln.DiscoverNodesOnStart = new(bool) *cln.DiscoverNodesOnStart = *elsOpts.DiscoverNodesOnStart } if elsOpts.DiscoverNodeInterval != nil { cln.DiscoverNodeInterval = new(time.Duration) *cln.DiscoverNodeInterval = *elsOpts.DiscoverNodeInterval } if elsOpts.Cloud != nil { cln.Cloud = new(bool) *cln.Cloud = *elsOpts.Cloud } if elsOpts.APIKey != nil { cln.APIKey = new(string) *cln.APIKey = *elsOpts.APIKey } if elsOpts.CertificateFingerprint != nil { cln.CertificateFingerprint = new(string) *cln.CertificateFingerprint = *elsOpts.CertificateFingerprint } if elsOpts.ServiceToken != nil { cln.ServiceToken = new(string) *cln.ServiceToken = *elsOpts.ServiceToken } if elsOpts.Username != nil { cln.Username = new(string) *cln.Username = *elsOpts.Username } if elsOpts.Password != nil { cln.Password = new(string) *cln.Password = *elsOpts.Password } if elsOpts.EnableDebugLogger != nil { cln.EnableDebugLogger = new(bool) *cln.EnableDebugLogger = *elsOpts.EnableDebugLogger } if elsOpts.Logger != nil { cln.Logger = new(string) *cln.Logger = *elsOpts.Logger } if elsOpts.CompressRequestBody != nil { cln.CompressRequestBody = new(bool) *cln.CompressRequestBody = *elsOpts.CompressRequestBody } if elsOpts.CompressRequestBodyLevel != nil { cln.CompressRequestBodyLevel = new(int) *cln.CompressRequestBodyLevel = *elsOpts.CompressRequestBodyLevel } if elsOpts.RetryOnStatus != nil { cln.RetryOnStatus = new([]int) *cln.RetryOnStatus = *elsOpts.RetryOnStatus } if elsOpts.MaxRetries != nil { cln.MaxRetries = new(int) *cln.MaxRetries = *elsOpts.MaxRetries } if elsOpts.DisableRetry != nil { cln.DisableRetry = new(bool) *cln.DisableRetry = *elsOpts.DisableRetry } return cln } func (kafkaOpts *KafkaOpts) Clone() *KafkaOpts { cln := &KafkaOpts{} if kafkaOpts.Topic != nil { cln.Topic = new(string) *cln.Topic = *kafkaOpts.Topic } if kafkaOpts.BatchSize != nil { cln.BatchSize = new(int) *cln.BatchSize = *kafkaOpts.BatchSize } if kafkaOpts.TLS != nil { cln.TLS = new(bool) *cln.TLS = *kafkaOpts.TLS } if kafkaOpts.CAPath != nil { cln.CAPath = new(string) *cln.CAPath = *kafkaOpts.CAPath } if kafkaOpts.SkipTLSVerify != nil { cln.SkipTLSVerify = new(bool) *cln.SkipTLSVerify = *kafkaOpts.SkipTLSVerify } return cln } func (sqlOpts *SQLOpts) Clone() *SQLOpts { cln := &SQLOpts{} if sqlOpts.MaxIdleConns != nil { cln.MaxIdleConns = new(int) *cln.MaxIdleConns = *sqlOpts.MaxIdleConns } if sqlOpts.MaxOpenConns != nil { cln.MaxOpenConns = new(int) *cln.MaxOpenConns = *sqlOpts.MaxOpenConns } if sqlOpts.ConnMaxLifetime != nil { cln.ConnMaxLifetime = new(time.Duration) *cln.ConnMaxLifetime = *sqlOpts.ConnMaxLifetime } if sqlOpts.MYSQLDSNParams != nil { cln.MYSQLDSNParams = make(map[string]string) cln.MYSQLDSNParams = sqlOpts.MYSQLDSNParams } if sqlOpts.TableName != nil { cln.TableName = new(string) *cln.TableName = *sqlOpts.TableName } if sqlOpts.DBName != nil { cln.DBName = new(string) *cln.DBName = *sqlOpts.DBName } if sqlOpts.UpdateIndexedFields != nil { idx := make([]string, len(*sqlOpts.UpdateIndexedFields)) copy(idx, *sqlOpts.UpdateIndexedFields) cln.UpdateIndexedFields = &idx } if sqlOpts.PgSSLMode != nil { cln.PgSSLMode = new(string) *cln.PgSSLMode = *sqlOpts.PgSSLMode } return cln } func (amqpOpts *AMQPOpts) Clone() *AMQPOpts { cln := &AMQPOpts{} if amqpOpts.QueueID != nil { cln.QueueID = new(string) *cln.QueueID = *amqpOpts.QueueID } if amqpOpts.RoutingKey != nil { cln.RoutingKey = new(string) *cln.RoutingKey = *amqpOpts.RoutingKey } if amqpOpts.Exchange != nil { cln.Exchange = new(string) *cln.Exchange = *amqpOpts.Exchange } if amqpOpts.ExchangeType != nil { cln.ExchangeType = new(string) *cln.ExchangeType = *amqpOpts.ExchangeType } if amqpOpts.Username != nil { cln.Username = new(string) *cln.Username = *amqpOpts.Username } if amqpOpts.Password != nil { cln.Password = new(string) *cln.Password = *amqpOpts.Password } return cln } func (awsOpts *AWSOpts) Clone() *AWSOpts { cln := &AWSOpts{} if awsOpts.Region != nil { cln.Region = new(string) *cln.Region = *awsOpts.Region } if awsOpts.Key != nil { cln.Key = new(string) *cln.Key = *awsOpts.Key } if awsOpts.Secret != nil { cln.Secret = new(string) *cln.Secret = *awsOpts.Secret } if awsOpts.Token != nil { cln.Token = new(string) *cln.Token = *awsOpts.Token } if awsOpts.SQSQueueID != nil { cln.SQSQueueID = new(string) *cln.SQSQueueID = *awsOpts.SQSQueueID } if awsOpts.SQSForcePathStyle != nil { cln.SQSForcePathStyle = new(bool) *cln.SQSForcePathStyle = *awsOpts.SQSForcePathStyle } if awsOpts.SQSSkipTlsVerify != nil { cln.SQSSkipTlsVerify = new(bool) *cln.SQSSkipTlsVerify = *awsOpts.SQSSkipTlsVerify } if awsOpts.S3BucketID != nil { cln.S3BucketID = new(string) *cln.S3BucketID = *awsOpts.S3BucketID } if awsOpts.S3FolderPath != nil { cln.S3FolderPath = new(string) *cln.S3FolderPath = *awsOpts.S3FolderPath } if awsOpts.S3ForcePathStyle != nil { cln.S3ForcePathStyle = new(bool) *cln.S3ForcePathStyle = *awsOpts.S3ForcePathStyle } if awsOpts.S3SkipTlsVerify != nil { cln.S3SkipTlsVerify = new(bool) *cln.S3SkipTlsVerify = *awsOpts.S3SkipTlsVerify } return cln } func (natsOpts *NATSOpts) Clone() *NATSOpts { cln := &NATSOpts{} if natsOpts.JetStream != nil { cln.JetStream = new(bool) *cln.JetStream = *natsOpts.JetStream } if natsOpts.Subject != nil { cln.Subject = new(string) *cln.Subject = *natsOpts.Subject } if natsOpts.JWTFile != nil { cln.JWTFile = new(string) *cln.JWTFile = *natsOpts.JWTFile } if natsOpts.SeedFile != nil { cln.SeedFile = new(string) *cln.SeedFile = *natsOpts.SeedFile } if natsOpts.CertificateAuthority != nil { cln.CertificateAuthority = new(string) *cln.CertificateAuthority = *natsOpts.CertificateAuthority } if natsOpts.ClientCertificate != nil { cln.ClientCertificate = new(string) *cln.ClientCertificate = *natsOpts.ClientCertificate } if natsOpts.ClientKey != nil { cln.ClientKey = new(string) *cln.ClientKey = *natsOpts.ClientKey } if natsOpts.JetStreamMaxWait != nil { cln.JetStreamMaxWait = new(time.Duration) *cln.JetStreamMaxWait = *natsOpts.JetStreamMaxWait } return cln } func (rpcOpts *RPCOpts) Clone() *RPCOpts { cln := &RPCOpts{} if rpcOpts.RPCCodec != nil { cln.RPCCodec = new(string) *cln.RPCCodec = *rpcOpts.RPCCodec } if rpcOpts.ServiceMethod != nil { cln.ServiceMethod = new(string) *cln.ServiceMethod = *rpcOpts.ServiceMethod } if rpcOpts.KeyPath != nil { cln.KeyPath = new(string) *cln.KeyPath = *rpcOpts.KeyPath } if rpcOpts.CertPath != nil { cln.CertPath = new(string) *cln.CertPath = *rpcOpts.CertPath } if rpcOpts.CAPath != nil { cln.CAPath = new(string) *cln.CAPath = *rpcOpts.CAPath } if rpcOpts.TLS != nil { cln.TLS = new(bool) *cln.TLS = *rpcOpts.TLS } if rpcOpts.ConnIDs != nil { cln.ConnIDs = new([]string) *cln.ConnIDs = *rpcOpts.ConnIDs } if rpcOpts.RPCConnTimeout != nil { cln.RPCConnTimeout = new(time.Duration) *cln.RPCConnTimeout = *rpcOpts.RPCConnTimeout } if rpcOpts.RPCReplyTimeout != nil { cln.RPCReplyTimeout = new(time.Duration) *cln.RPCReplyTimeout = *rpcOpts.RPCReplyTimeout } if rpcOpts.RPCAPIOpts != nil { cln.RPCAPIOpts = make(map[string]any) cln.RPCAPIOpts = rpcOpts.RPCAPIOpts } return cln } func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { cln := &EventExporterOpts{} if eeOpts.CSVFieldSeparator != nil { cln.CSVFieldSeparator = new(string) *cln.CSVFieldSeparator = *eeOpts.CSVFieldSeparator } if eeOpts.Els != nil { cln.Els = eeOpts.Els.Clone() } if eeOpts.SQL != nil { cln.SQL = eeOpts.SQL.Clone() } if eeOpts.Kafka != nil { cln.Kafka = eeOpts.Kafka.Clone() } if eeOpts.AMQP != nil { cln.AMQP = eeOpts.AMQP.Clone() } if eeOpts.AWS != nil { cln.AWS = eeOpts.AWS.Clone() } if eeOpts.NATS != nil { cln.NATS = eeOpts.NATS.Clone() } if eeOpts.RPC != nil { cln.RPC = eeOpts.RPC.Clone() } return cln } // Clone returns a deep copy of EventExporterCfg func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) { cln = &EventExporterCfg{ ID: eeC.ID, Type: eeC.Type, ExportPath: eeC.ExportPath, Timezone: eeC.Timezone, Flags: eeC.Flags.Clone(), AttributeSCtx: eeC.AttributeSCtx, Synchronous: eeC.Synchronous, Attempts: eeC.Attempts, ConcurrentRequests: eeC.ConcurrentRequests, MetricsResetSchedule: eeC.MetricsResetSchedule, Fields: make([]*FCTemplate, len(eeC.Fields)), headerFields: make([]*FCTemplate, len(eeC.headerFields)), contentFields: make([]*FCTemplate, len(eeC.contentFields)), trailerFields: make([]*FCTemplate, len(eeC.trailerFields)), Opts: eeC.Opts.Clone(), FailedPostsDir: eeC.FailedPostsDir, } if eeC.Filters != nil { cln.Filters = make([]string, len(eeC.Filters)) copy(cln.Filters, eeC.Filters) } if eeC.AttributeSIDs != nil { cln.AttributeSIDs = make([]string, len(eeC.AttributeSIDs)) copy(cln.AttributeSIDs, eeC.AttributeSIDs) } for idx, fld := range eeC.Fields { cln.Fields[idx] = fld.Clone() } for idx, fld := range eeC.headerFields { cln.headerFields[idx] = fld.Clone() } for idx, fld := range eeC.contentFields { cln.contentFields[idx] = fld.Clone() } for idx, fld := range eeC.trailerFields { cln.trailerFields[idx] = fld.Clone() } return } // AsMapInterface returns the config as a map[string]any func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[string]any) { opts := map[string]any{} if eeC.Opts.CSVFieldSeparator != nil { opts[utils.CSVFieldSepOpt] = *eeC.Opts.CSVFieldSeparator } if elsOpts := eeC.Opts.Els; elsOpts != nil { if elsOpts.Index != nil { opts[utils.ElsIndex] = *elsOpts.Index } if elsOpts.Refresh != nil { opts[utils.ElsRefresh] = *elsOpts.Refresh } if elsOpts.OpType != nil { opts[utils.ElsOpType] = *elsOpts.OpType } if elsOpts.Pipeline != nil { opts[utils.ElsPipeline] = *elsOpts.Pipeline } if elsOpts.Routing != nil { opts[utils.ElsRouting] = *elsOpts.Routing } if elsOpts.Timeout != nil { opts[utils.ElsTimeout] = elsOpts.Timeout.String() } if elsOpts.WaitForActiveShards != nil { opts[utils.ElsWaitForActiveShards] = *elsOpts.WaitForActiveShards } if elsOpts.CAPath != nil { opts[utils.ElsCAPath] = *elsOpts.CAPath } if elsOpts.DiscoverNodesOnStart != nil { opts[utils.ElsDiscoverNodesOnStart] = *elsOpts.DiscoverNodesOnStart } if elsOpts.DiscoverNodeInterval != nil { opts[utils.ElsDiscoverNodeInterval] = *elsOpts.DiscoverNodeInterval } if elsOpts.Cloud != nil { opts[utils.ElsCloud] = *elsOpts.Cloud } if elsOpts.APIKey != nil { opts[utils.ElsAPIKey] = *elsOpts.APIKey } if elsOpts.CertificateFingerprint != nil { opts[utils.ElsCertificateFingerprint] = *elsOpts.CertificateFingerprint } if elsOpts.ServiceToken != nil { opts[utils.ElsServiceToken] = *elsOpts.ServiceToken } if elsOpts.Username != nil { opts[utils.ElsUsername] = *elsOpts.Username } if elsOpts.Password != nil { opts[utils.ElsPassword] = *elsOpts.Password } if elsOpts.EnableDebugLogger != nil { opts[utils.ElsEnableDebugLogger] = *elsOpts.EnableDebugLogger } if elsOpts.Logger != nil { opts[utils.ElsLogger] = *elsOpts.Logger } if elsOpts.CompressRequestBody != nil { opts[utils.ElsCompressRequestBody] = *elsOpts.CompressRequestBody } if elsOpts.CompressRequestBodyLevel != nil { opts[utils.ElsCompressRequestBodyLevel] = *elsOpts.CompressRequestBodyLevel } if elsOpts.RetryOnStatus != nil { opts[utils.ElsRetryOnStatus] = *elsOpts.RetryOnStatus } if elsOpts.MaxRetries != nil { opts[utils.ElsMaxRetries] = *elsOpts.MaxRetries } if elsOpts.DisableRetry != nil { opts[utils.ElsDisableRetry] = *elsOpts.DisableRetry } } if sqlOpts := eeC.Opts.SQL; sqlOpts != nil { if sqlOpts.MaxIdleConns != nil { opts[utils.SQLMaxIdleConnsCfg] = *sqlOpts.MaxIdleConns } if sqlOpts.MaxOpenConns != nil { opts[utils.SQLMaxOpenConns] = *sqlOpts.MaxOpenConns } if sqlOpts.ConnMaxLifetime != nil { opts[utils.SQLConnMaxLifetime] = sqlOpts.ConnMaxLifetime.String() } if sqlOpts.MYSQLDSNParams != nil { opts[utils.MYSQLDSNParams] = sqlOpts.MYSQLDSNParams } if sqlOpts.TableName != nil { opts[utils.SQLTableNameOpt] = *sqlOpts.TableName } if sqlOpts.DBName != nil { opts[utils.SQLDBNameOpt] = *sqlOpts.DBName } if sqlOpts.UpdateIndexedFields != nil { updateIndexedFields := make([]string, len(*sqlOpts.UpdateIndexedFields)) copy(updateIndexedFields, *sqlOpts.UpdateIndexedFields) opts[utils.SQLUpdateIndexedFieldsOpt] = updateIndexedFields } if sqlOpts.PgSSLMode != nil { opts[utils.PgSSLModeCfg] = *sqlOpts.PgSSLMode } } if kafkaOpts := eeC.Opts.Kafka; kafkaOpts != nil { if kafkaOpts.Topic != nil { opts[utils.KafkaTopic] = *kafkaOpts.Topic } if kafkaOpts.BatchSize != nil { opts[utils.KafkaBatchSize] = *kafkaOpts.BatchSize } if kafkaOpts.TLS != nil { opts[utils.KafkaTLS] = *kafkaOpts.TLS } if kafkaOpts.CAPath != nil { opts[utils.KafkaCAPath] = *kafkaOpts.CAPath } if kafkaOpts.SkipTLSVerify != nil { opts[utils.KafkaSkipTLSVerify] = *kafkaOpts.SkipTLSVerify } } if amOpts := eeC.Opts.AMQP; amOpts != nil { if amOpts.QueueID != nil { opts[utils.AMQPQueueID] = *amOpts.QueueID } if amOpts.RoutingKey != nil { opts[utils.AMQPRoutingKey] = *amOpts.RoutingKey } if amOpts.Exchange != nil { opts[utils.AMQPExchange] = *amOpts.Exchange } if amOpts.ExchangeType != nil { opts[utils.AMQPExchangeType] = *amOpts.ExchangeType } if amOpts.Username != nil { opts[utils.AMQPUsername] = *amOpts.Username } if amOpts.Password != nil { opts[utils.AMQPPassword] = *amOpts.Password } } if awsOpts := eeC.Opts.AWS; awsOpts != nil { if awsOpts.Region != nil { opts[utils.AWSRegion] = *awsOpts.Region } if awsOpts.Key != nil { opts[utils.AWSKey] = *awsOpts.Key } if awsOpts.Secret != nil { opts[utils.AWSSecret] = *awsOpts.Secret } if awsOpts.Token != nil { opts[utils.AWSToken] = *awsOpts.Token } if awsOpts.SQSQueueID != nil { opts[utils.SQSQueueID] = *awsOpts.SQSQueueID } if awsOpts.SQSForcePathStyle != nil { opts[utils.SQSForcePathStyle] = *awsOpts.SQSForcePathStyle } if awsOpts.SQSSkipTlsVerify != nil { opts[utils.SQSSkipTlsVerify] = *awsOpts.SQSSkipTlsVerify } if awsOpts.S3BucketID != nil { opts[utils.S3Bucket] = *awsOpts.S3BucketID } if awsOpts.S3FolderPath != nil { opts[utils.S3FolderPath] = *awsOpts.S3FolderPath } if awsOpts.S3ForcePathStyle != nil { opts[utils.S3ForcePathStyle] = *awsOpts.S3ForcePathStyle } if awsOpts.S3SkipTlsVerify != nil { opts[utils.S3SkipTlsVerify] = *awsOpts.S3SkipTlsVerify } } if natOpts := eeC.Opts.NATS; natOpts != nil { if natOpts.JetStream != nil { opts[utils.NatsJetStream] = *natOpts.JetStream } if natOpts.Subject != nil { opts[utils.NatsSubject] = *natOpts.Subject } if natOpts.JWTFile != nil { opts[utils.NatsJWTFile] = *natOpts.JWTFile } if natOpts.SeedFile != nil { opts[utils.NatsSeedFile] = *natOpts.SeedFile } if natOpts.CertificateAuthority != nil { opts[utils.NatsCertificateAuthority] = *natOpts.CertificateAuthority } if natOpts.ClientCertificate != nil { opts[utils.NatsClientCertificate] = *natOpts.ClientCertificate } if natOpts.ClientKey != nil { opts[utils.NatsClientKey] = *natOpts.ClientKey } if natOpts.JetStreamMaxWait != nil { opts[utils.NatsJetStreamMaxWait] = natOpts.JetStreamMaxWait.String() } } if rpcOpts := eeC.Opts.RPC; rpcOpts != nil { if rpcOpts.RPCCodec != nil { opts[utils.RpcCodec] = *rpcOpts.RPCCodec } if rpcOpts.ServiceMethod != nil { opts[utils.ServiceMethod] = *rpcOpts.ServiceMethod } if rpcOpts.KeyPath != nil { opts[utils.KeyPath] = *rpcOpts.KeyPath } if rpcOpts.CertPath != nil { opts[utils.CertPath] = *rpcOpts.CertPath } if rpcOpts.CAPath != nil { opts[utils.CaPath] = *rpcOpts.CAPath } if rpcOpts.TLS != nil { opts[utils.Tls] = *rpcOpts.TLS } if rpcOpts.ConnIDs != nil { opts[utils.ConnIDs] = *rpcOpts.ConnIDs } if rpcOpts.RPCConnTimeout != nil { opts[utils.RpcConnTimeout] = rpcOpts.RPCConnTimeout.String() } if rpcOpts.RPCReplyTimeout != nil { opts[utils.RpcReplyTimeout] = rpcOpts.RPCReplyTimeout.String() } if rpcOpts.RPCAPIOpts != nil { opts[utils.RPCAPIOpts] = rpcOpts.RPCAPIOpts } } flgs := eeC.Flags.SliceFlags() if flgs == nil { flgs = []string{} } initialMP = map[string]any{ utils.IDCfg: eeC.ID, utils.TypeCfg: eeC.Type, utils.ExportPathCfg: eeC.ExportPath, utils.TimezoneCfg: eeC.Timezone, utils.FiltersCfg: eeC.Filters, utils.FlagsCfg: flgs, utils.AttributeContextCfg: eeC.AttributeSCtx, utils.AttributeIDsCfg: eeC.AttributeSIDs, utils.SynchronousCfg: eeC.Synchronous, utils.AttemptsCfg: eeC.Attempts, utils.ConcurrentRequestsCfg: eeC.ConcurrentRequests, utils.MetricsResetScheduleCfg: eeC.MetricsResetSchedule, utils.FailedPostsDirCfg: eeC.FailedPostsDir, utils.OptsCfg: opts, } if eeC.Fields != nil { fields := make([]map[string]any, 0, len(eeC.Fields)) for _, fld := range eeC.Fields { fields = append(fields, fld.AsMapInterface(separator)) } initialMP[utils.FieldsCfg] = fields } return } type FailedPostsCfg struct { Dir string TTL time.Duration StaticTTL bool } func (c *FailedPostsCfg) loadFromJSONCfg(jc *FailedPostsJsonCfg) error { if jc == nil { return nil } if jc.Dir != nil { c.Dir = *jc.Dir } if jc.TTL != nil { var err error if c.TTL, err = utils.ParseDurationWithNanosecs(*jc.TTL); err != nil { return err } } if jc.StaticTTL != nil { c.StaticTTL = *jc.StaticTTL } return nil } func (c *FailedPostsCfg) Clone() *FailedPostsCfg { if c == nil { return nil } return &FailedPostsCfg{ Dir: c.Dir, TTL: c.TTL, StaticTTL: c.StaticTTL, } } func (c *FailedPostsCfg) AsMapInterface() map[string]any { return map[string]any{ utils.DirCfg: c.Dir, utils.TTLCfg: c.TTL.String(), utils.StaticTTLCfg: c.StaticTTL, } }