From f66f05d0e3ff8d539646566ffb223664f07062ca Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 18 Aug 2022 17:45:37 +0300 Subject: [PATCH] Add support for kafka ssl encryption --- config/config_defaults.go | 3 ++- config/eescfg.go | 38 +++++++++++++++++++++++++++++ config/eescfg_test.go | 18 +++++++++++++- ees/kafka.go | 50 ++++++++++++++++++++++++++++++++------- utils/consts.go | 8 ++++--- 5 files changed, 104 insertions(+), 13 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index be662d966..311bc3928 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -525,7 +525,8 @@ const CGRATES_CFG_JSON = ` // Kafka // "kafkaTopic": "cgrates", // the topic from where the events are exported - // "kafkaSkipTLSVerification": false, // if set to true it will skip certificate verification + // "kafkaCAPath": "", // path to certificate authority pem file + // "kafkaSkipTLSVerify": false, // if set to true it will skip certificate verification // AMQP diff --git a/config/eescfg.go b/config/eescfg.go index ed0b41685..fde4d7212 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -165,6 +165,8 @@ type EventExporterOpts struct { SQLDBName *string PgSSLMode *string KafkaTopic *string + KafkaCAPath *string + KafkaSkipTLSVerify *bool AMQPRoutingKey *string AMQPQueueID *string AMQPExchange *string @@ -289,6 +291,12 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.KafkaTopic != nil { eeOpts.KafkaTopic = jsnCfg.KafkaTopic } + if jsnCfg.KafkaCAPath != nil { + eeOpts.KafkaCAPath = jsnCfg.KafkaCAPath + } + if jsnCfg.KafkaSkipTLSVerify != nil { + eeOpts.KafkaSkipTLSVerify = jsnCfg.KafkaSkipTLSVerify + } if jsnCfg.AMQPQueueID != nil { eeOpts.AMQPQueueID = jsnCfg.AMQPQueueID } @@ -548,6 +556,12 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { if eeOpts.KafkaTopic != nil { cln.KafkaTopic = utils.StringPointer(*eeOpts.KafkaTopic) } + if eeOpts.KafkaCAPath != nil { + cln.KafkaCAPath = utils.StringPointer(*eeOpts.KafkaCAPath) + } + if eeOpts.KafkaSkipTLSVerify != nil { + cln.KafkaSkipTLSVerify = utils.BoolPointer(*eeOpts.KafkaSkipTLSVerify) + } if eeOpts.AMQPQueueID != nil { cln.AMQPQueueID = utils.StringPointer(*eeOpts.AMQPQueueID) } @@ -778,6 +792,12 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]interface{} { if optsEes.KafkaTopic != nil { opts[utils.KafkaTopic] = *optsEes.KafkaTopic } + if optsEes.KafkaCAPath != nil { + opts[utils.KafkaCAPath] = *optsEes.KafkaCAPath + } + if optsEes.KafkaSkipTLSVerify != nil { + opts[utils.KafkaSkipTLSVerify] = *optsEes.KafkaSkipTLSVerify + } if optsEes.AMQPQueueID != nil { opts[utils.AMQPQueueID] = *optsEes.AMQPQueueID } @@ -888,6 +908,8 @@ type EventExporterOptsJson struct { SQLDBName *string `json:"sqlDBName"` PgSSLMode *string `json:"pgSSLMode"` KafkaTopic *string `json:"kafkaTopic"` + KafkaCAPath *string `json:"kafkaCAPath"` + KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` AMQPQueueID *string `json:"amqpQueueID"` AMQPRoutingKey *string `json:"amqpRoutingKey"` AMQPExchange *string `json:"amqpExchange"` @@ -1094,6 +1116,22 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte } else { d.KafkaTopic = nil } + if v2.KafkaCAPath != nil { + if v1.KafkaCAPath == nil || + *v1.KafkaCAPath != *v2.KafkaCAPath { + d.KafkaCAPath = v2.KafkaCAPath + } + } else { + d.KafkaCAPath = nil + } + if v2.KafkaSkipTLSVerify != nil { + if v1.KafkaSkipTLSVerify == nil || + *v1.KafkaSkipTLSVerify != *v2.KafkaSkipTLSVerify { + d.KafkaSkipTLSVerify = v2.KafkaSkipTLSVerify + } + } else { + d.KafkaSkipTLSVerify = nil + } if v2.AMQPQueueID != nil { if v1.AMQPQueueID == nil || *v1.AMQPQueueID != *v2.AMQPQueueID { diff --git a/config/eescfg_test.go b/config/eescfg_test.go index aab833906..065072321 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -1194,6 +1194,8 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaCAPath: utils.StringPointer("kafkaCAPath"), + KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), AMQPQueueID: utils.StringPointer("queue_id"), AMQPExchange: utils.StringPointer("amqp_exchange"), @@ -1242,6 +1244,8 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaCAPath: utils.StringPointer("kafkaCAPath"), + KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), AMQPQueueID: utils.StringPointer("queue_id"), AMQPExchange: utils.StringPointer("amqp_exchange"), @@ -1297,6 +1301,8 @@ func TestEventExporterOptsClone(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaCAPath: utils.StringPointer("kafkaCAPath"), + KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), AMQPQueueID: utils.StringPointer("queue_id"), AMQPExchange: utils.StringPointer("amqp_exchange"), @@ -1345,6 +1351,8 @@ func TestEventExporterOptsClone(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaCAPath: utils.StringPointer("kafkaCAPath"), + KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), AMQPQueueID: utils.StringPointer("queue_id"), AMQPExchange: utils.StringPointer("amqp_exchange"), @@ -1401,6 +1409,8 @@ func TestLoadFromJSONCfg(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaCAPath: utils.StringPointer("kafkaCAPath"), + KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), AMQPQueueID: utils.StringPointer("queue_id"), AMQPExchange: utils.StringPointer("amqp_exchange"), @@ -1449,6 +1459,8 @@ func TestLoadFromJSONCfg(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaCAPath: utils.StringPointer("kafkaCAPath"), + KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), AMQPQueueID: utils.StringPointer("queue_id"), AMQPExchange: utils.StringPointer("amqp_exchange"), @@ -1604,6 +1616,8 @@ func TestEEsAsMapInterface(t *testing.T) { SQLDBName: utils.StringPointer("cgrates"), PgSSLMode: utils.StringPointer("sslm"), KafkaTopic: utils.StringPointer("topic1"), + KafkaCAPath: utils.StringPointer("kafkaCAPath"), + KafkaSkipTLSVerify: utils.BoolPointer(false), AMQPRoutingKey: utils.StringPointer("routing_key"), AMQPQueueID: utils.StringPointer("queue_id"), AMQPExchange: utils.StringPointer("amqp_exchange"), @@ -1659,6 +1673,8 @@ func TestEEsAsMapInterface(t *testing.T) { "elsVersionType": "version_type", "elsWaitForActiveShards": "wfas", "kafkaTopic": "topic1", + "kafkaCAPath": "kafkaCAPath", + "kafkaSkipTLSVerify": false, "keyPath": "/path/to/key", "natsCertificateAuthority": "ca", "natsClientCertificate": "cc", @@ -1687,6 +1703,6 @@ func TestEEsAsMapInterface(t *testing.T) { rcv := eeCfg.AsMapInterface(",") if !reflect.DeepEqual(exp[utils.OptsCfg], rcv[utils.OptsCfg]) { - t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp), utils.ToJSON(rcv)) + t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp["opts"]), utils.ToJSON(rcv["opts"])) } } diff --git a/ees/kafka.go b/ees/kafka.go index 73b7874e8..f096fd14d 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -18,7 +18,12 @@ along with this program. If not, see package ees import ( + "crypto/tls" + "crypto/x509" + "net" + "os" "sync" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" @@ -38,13 +43,21 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE if cfg.Opts.KafkaTopic != nil { kfkPstr.topic = *cfg.Opts.KafkaTopic } + if cfg.Opts.KafkaCAPath != nil { + kfkPstr.caPath = *cfg.Opts.KafkaCAPath + } + if cfg.Opts.KafkaSkipTLSVerify != nil && *cfg.Opts.KafkaSkipTLSVerify { + kfkPstr.skipTLSVerify = true + } return kfkPstr } // KafkaEE is a kafka poster type KafkaEE struct { - topic string // identifier of the CDR queue where we publish - writer *kafka.Writer + topic string // identifier of the CDR queue where we publish + caPath string // path to CA pem file + skipTLSVerify bool // if true, it skips certificate validation + writer *kafka.Writer cfg *config.EventExporterCfg dc *utils.SafeMapStorage @@ -55,7 +68,7 @@ type KafkaEE struct { func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg } -func (pstr *KafkaEE) Connect() (_ error) { +func (pstr *KafkaEE) Connect() (err error) { pstr.Lock() if pstr.writer == nil { pstr.writer = &kafka.Writer{ @@ -63,11 +76,32 @@ func (pstr *KafkaEE) Connect() (_ error) { Topic: pstr.topic, MaxAttempts: pstr.Cfg().Attempts, } - // pstr.writer = kafka.NewWriter(kafka.WriterConfig{ - // Brokers: []string{pstr.Cfg().ExportPath}, - // MaxAttempts: pstr.Cfg().Attempts, - // Topic: pstr.topic, - // }) + } + var rootCAs *x509.CertPool + if rootCAs, err = x509.SystemCertPool(); err != nil { + return + } + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + if pstr.caPath != "" { + var ca []byte + if ca, err = os.ReadFile(pstr.caPath); err != nil { + return + } + if !rootCAs.AppendCertsFromPEM(ca) { + return + } + } + pstr.writer.Transport = &kafka.Transport{ + Dial: (&net.Dialer{ + Timeout: 3 * time.Second, + DualStack: true, + }).DialContext, + TLS: &tls.Config{ + RootCAs: rootCAs, + InsecureSkipVerify: pstr.skipTLSVerify, + }, } pstr.Unlock() return diff --git a/utils/consts.go b/utils/consts.go index c1fb96c4c..167e18d2d 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2466,9 +2466,11 @@ const ( KafkaDefaultGroupID = "cgrates" KafkaDefaultMaxWait = time.Millisecond - KafkaTopic = "kafkaTopic" - KafkaGroupID = "kafkaGroupID" - KafkaMaxWait = "kafkaMaxWait" + KafkaTopic = "kafkaTopic" + KafkaCAPath = "kafkaCAPath" + KafkaSkipTLSVerify = "kafkaSkipTLSVerify" + KafkaGroupID = "kafkaGroupID" + KafkaMaxWait = "kafkaMaxWait" // partial PartialOpt = "*partial"