Add kafka exporter option that signals whether or not to attempt TLS connection

This commit is contained in:
ionutboangiu
2022-08-23 17:13:54 +03:00
committed by Dan Christian Bogos
parent f3bd4b63c0
commit 5056400751
2 changed files with 47 additions and 22 deletions

View File

@@ -165,6 +165,7 @@ type EventExporterOpts struct {
SQLDBName *string
PgSSLMode *string
KafkaTopic *string
KafkaTLS *bool
KafkaCAPath *string
KafkaSkipTLSVerify *bool
AMQPRoutingKey *string
@@ -291,6 +292,9 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.KafkaTopic != nil {
eeOpts.KafkaTopic = jsnCfg.KafkaTopic
}
if jsnCfg.KafkaTLS != nil {
eeOpts.KafkaTLS = jsnCfg.KafkaTLS
}
if jsnCfg.KafkaCAPath != nil {
eeOpts.KafkaCAPath = jsnCfg.KafkaCAPath
}
@@ -556,6 +560,9 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
if eeOpts.KafkaTopic != nil {
cln.KafkaTopic = utils.StringPointer(*eeOpts.KafkaTopic)
}
if eeOpts.KafkaTLS != nil {
cln.KafkaTLS = utils.BoolPointer(*eeOpts.KafkaTLS)
}
if eeOpts.KafkaCAPath != nil {
cln.KafkaCAPath = utils.StringPointer(*eeOpts.KafkaCAPath)
}
@@ -792,6 +799,9 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]interface{} {
if optsEes.KafkaTopic != nil {
opts[utils.KafkaTopic] = *optsEes.KafkaTopic
}
if optsEes.KafkaTLS != nil {
opts[utils.KafkaTLS] = *optsEes.KafkaTLS
}
if optsEes.KafkaCAPath != nil {
opts[utils.KafkaCAPath] = *optsEes.KafkaCAPath
}
@@ -908,6 +918,7 @@ type EventExporterOptsJson struct {
SQLDBName *string `json:"sqlDBName"`
PgSSLMode *string `json:"pgSSLMode"`
KafkaTopic *string `json:"kafkaTopic"`
KafkaTLS *bool `json:"kafkaTLS"`
KafkaCAPath *string `json:"kafkaCAPath"`
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`
AMQPQueueID *string `json:"amqpQueueID"`
@@ -1116,6 +1127,14 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte
} else {
d.KafkaTopic = nil
}
if v2.KafkaTLS != nil {
if v1.KafkaTLS == nil ||
*v1.KafkaTLS != *v2.KafkaTLS {
d.KafkaTLS = v2.KafkaTLS
}
} else {
d.KafkaTLS = nil
}
if v2.KafkaCAPath != nil {
if v1.KafkaCAPath == nil ||
*v1.KafkaCAPath != *v2.KafkaCAPath {

View File

@@ -43,6 +43,9 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE
if cfg.Opts.KafkaTopic != nil {
kfkPstr.topic = *cfg.Opts.KafkaTopic
}
if cfg.Opts.KafkaTLS != nil && *cfg.Opts.KafkaTLS {
kfkPstr.TLS = true
}
if cfg.Opts.KafkaCAPath != nil {
kfkPstr.caPath = *cfg.Opts.KafkaCAPath
}
@@ -55,6 +58,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE
// KafkaEE is a kafka poster
type KafkaEE struct {
topic string // identifier of the CDR queue where we publish
TLS bool // if true, it will attempt to authenticate the server
caPath string // path to CA pem file
skipTLSVerify bool // if true, it skips certificate validation
writer *kafka.Writer
@@ -77,31 +81,33 @@ func (pstr *KafkaEE) Connect() (err error) {
MaxAttempts: pstr.Cfg().Attempts,
}
}
var rootCAs *x509.CertPool
if rootCAs, err = x509.SystemCertPool(); err != nil {
return
}
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
if pstr.caPath != "" {
var ca []byte
if ca, err = os.ReadFile(pstr.caPath); err != nil {
if pstr.TLS {
var rootCAs *x509.CertPool
if rootCAs, err = x509.SystemCertPool(); err != nil {
return
}
if !rootCAs.AppendCertsFromPEM(ca) {
return
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
if pstr.caPath != "" {
var ca []byte
if ca, err = os.ReadFile(pstr.caPath); err != nil {
return
}
if !rootCAs.AppendCertsFromPEM(ca) {
return
}
}
pstr.writer.Transport = &kafka.Transport{
Dial: (&net.Dialer{
Timeout: 3 * time.Second,
DualStack: true,
}).DialContext,
TLS: &tls.Config{
RootCAs: rootCAs,
InsecureSkipVerify: pstr.skipTLSVerify,
},
}
}
pstr.writer.Transport = &kafka.Transport{
Dial: (&net.Dialer{
Timeout: 3 * time.Second,
DualStack: true,
}).DialContext,
TLS: &tls.Config{
RootCAs: rootCAs,
InsecureSkipVerify: pstr.skipTLSVerify,
},
}
pstr.Unlock()
return