diff --git a/config/eescfg.go b/config/eescfg.go index fde4d7212..272e4a745 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -165,6 +165,7 @@ type EventExporterOpts struct { SQLDBName *string PgSSLMode *string KafkaTopic *string + KafkaTLS *bool KafkaCAPath *string KafkaSkipTLSVerify *bool AMQPRoutingKey *string @@ -291,6 +292,9 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.KafkaTopic != nil { eeOpts.KafkaTopic = jsnCfg.KafkaTopic } + if jsnCfg.KafkaTLS != nil { + eeOpts.KafkaTLS = jsnCfg.KafkaTLS + } if jsnCfg.KafkaCAPath != nil { eeOpts.KafkaCAPath = jsnCfg.KafkaCAPath } @@ -556,6 +560,9 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { if eeOpts.KafkaTopic != nil { cln.KafkaTopic = utils.StringPointer(*eeOpts.KafkaTopic) } + if eeOpts.KafkaTLS != nil { + cln.KafkaTLS = utils.BoolPointer(*eeOpts.KafkaTLS) + } if eeOpts.KafkaCAPath != nil { cln.KafkaCAPath = utils.StringPointer(*eeOpts.KafkaCAPath) } @@ -792,6 +799,9 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]interface{} { if optsEes.KafkaTopic != nil { opts[utils.KafkaTopic] = *optsEes.KafkaTopic } + if optsEes.KafkaTLS != nil { + opts[utils.KafkaTLS] = *optsEes.KafkaTLS + } if optsEes.KafkaCAPath != nil { opts[utils.KafkaCAPath] = *optsEes.KafkaCAPath } @@ -908,6 +918,7 @@ type EventExporterOptsJson struct { SQLDBName *string `json:"sqlDBName"` PgSSLMode *string `json:"pgSSLMode"` KafkaTopic *string `json:"kafkaTopic"` + KafkaTLS *bool `json:"kafkaTLS"` KafkaCAPath *string `json:"kafkaCAPath"` KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` AMQPQueueID *string `json:"amqpQueueID"` @@ -1116,6 +1127,14 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte } else { d.KafkaTopic = nil } + if v2.KafkaTLS != nil { + if v1.KafkaTLS == nil || + *v1.KafkaTLS != *v2.KafkaTLS { + d.KafkaTLS = v2.KafkaTLS + } + } else { + d.KafkaTLS = nil + } if v2.KafkaCAPath != nil { if v1.KafkaCAPath == nil || *v1.KafkaCAPath != *v2.KafkaCAPath { diff --git a/ees/kafka.go b/ees/kafka.go index f096fd14d..84c8f194e 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -43,6 +43,9 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE if cfg.Opts.KafkaTopic != nil { kfkPstr.topic = *cfg.Opts.KafkaTopic } + if cfg.Opts.KafkaTLS != nil && *cfg.Opts.KafkaTLS { + kfkPstr.TLS = true + } if cfg.Opts.KafkaCAPath != nil { kfkPstr.caPath = *cfg.Opts.KafkaCAPath } @@ -55,6 +58,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE // KafkaEE is a kafka poster type KafkaEE struct { topic string // identifier of the CDR queue where we publish + TLS bool // if true, it will attempt to authenticate the server caPath string // path to CA pem file skipTLSVerify bool // if true, it skips certificate validation writer *kafka.Writer @@ -77,31 +81,33 @@ func (pstr *KafkaEE) Connect() (err error) { MaxAttempts: pstr.Cfg().Attempts, } } - var rootCAs *x509.CertPool - if rootCAs, err = x509.SystemCertPool(); err != nil { - return - } - if rootCAs == nil { - rootCAs = x509.NewCertPool() - } - if pstr.caPath != "" { - var ca []byte - if ca, err = os.ReadFile(pstr.caPath); err != nil { + if pstr.TLS { + var rootCAs *x509.CertPool + if rootCAs, err = x509.SystemCertPool(); err != nil { return } - if !rootCAs.AppendCertsFromPEM(ca) { - return + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + if pstr.caPath != "" { + var ca []byte + if ca, err = os.ReadFile(pstr.caPath); err != nil { + return + } + if !rootCAs.AppendCertsFromPEM(ca) { + return + } + } + pstr.writer.Transport = &kafka.Transport{ + Dial: (&net.Dialer{ + Timeout: 3 * time.Second, + DualStack: true, + }).DialContext, + TLS: &tls.Config{ + RootCAs: rootCAs, + InsecureSkipVerify: pstr.skipTLSVerify, + }, } - } - pstr.writer.Transport = &kafka.Transport{ - Dial: (&net.Dialer{ - Timeout: 3 * time.Second, - DualStack: true, - }).DialContext, - TLS: &tls.Config{ - RootCAs: rootCAs, - InsecureSkipVerify: pstr.skipTLSVerify, - }, } pstr.Unlock() return