From f3bd4b63c0bced09b3e7784c53cec7bd2c6da174 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 23 Aug 2022 17:11:39 +0300 Subject: [PATCH] Add support for kafka reader ssl encryption --- config/config_defaults.go | 9 ++- config/erscfg.go | 114 ++++++++++++++++++++++++++++++++++++++ ers/kafka.go | 55 ++++++++++++++++-- ers/libers.go | 18 ++++++ utils/consts.go | 6 +- 5 files changed, 194 insertions(+), 8 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 311bc3928..a9c3c1ad2 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -395,8 +395,14 @@ const CGRATES_CFG_JSON = ` // "kafkaTopic": "cgrates", // the topic from were the events are read // "kafkaGroupID": "cgrates", // the group that reads the events // "kafkaMaxWait": "1ms", // the maximum amount of time to wait for new data to come + // "kafkaTLS": false, // if set to true it will try to authenticate the server + // "kafkaCAPath": "", + // "kafkaSkipTLSVerify": false, - // "kafkaTopicProcessed": "", the topic were the events are sent after they are processed + // "kafkaTopicProcessed": "", //the topic were the events are sent after they are processed + // "kafkaTLSProcessed": false, + // "kafkaCAPathProcessed": "", + // "kafkaSkipTLSVerifyProcessed": false, // SQL // "sqlDBName": "cgrates", // the name of the database from were the events are read @@ -525,6 +531,7 @@ const CGRATES_CFG_JSON = ` // Kafka // "kafkaTopic": "cgrates", // the topic from where the events are exported + // "kafkaTLS": false, // if set to true it will try to authenticate the server // "kafkaCAPath": "", // path to certificate authority pem file // "kafkaSkipTLSVerify": false, // if set to true it will skip certificate verification diff --git a/config/erscfg.go b/config/erscfg.go index 79641758a..87812acda 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -149,7 +149,13 @@ type EventReaderOpts struct { KafkaTopic *string KafkaGroupID *string KafkaMaxWait *time.Duration + KafkaTLS *bool + KafkaCAPath *string + KafkaSkipTLSVerify *bool KafkaTopicProcessed *string + KafkaTLSProcessed *bool + KafkaCAPathProcessed *string + KafkaSkipTLSVerifyProcessed *bool SQLDBName *string SQLTableName *string PgSSLMode *string @@ -278,9 +284,27 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err } erOpts.KafkaMaxWait = utils.DurationPointer(kafkaMaxWait) } + if jsnCfg.KafkaTLS != nil { + erOpts.KafkaTLS = jsnCfg.KafkaTLS + } + if jsnCfg.KafkaCAPath != nil { + erOpts.KafkaCAPath = jsnCfg.KafkaCAPath + } + if jsnCfg.KafkaSkipTLSVerify != nil { + erOpts.KafkaSkipTLSVerify = jsnCfg.KafkaSkipTLSVerify + } if jsnCfg.KafkaTopicProcessed != nil { erOpts.KafkaTopicProcessed = jsnCfg.KafkaTopicProcessed } + if jsnCfg.KafkaTLSProcessed != nil { + erOpts.KafkaTLSProcessed = jsnCfg.KafkaTLSProcessed + } + if jsnCfg.KafkaCAPathProcessed != nil { + erOpts.KafkaCAPathProcessed = jsnCfg.KafkaCAPathProcessed + } + if jsnCfg.KafkaSkipTLSVerifyProcessed != nil { + erOpts.KafkaSkipTLSVerifyProcessed = jsnCfg.KafkaSkipTLSVerifyProcessed + } if jsnCfg.SQLDBName != nil { erOpts.SQLDBName = jsnCfg.SQLDBName } @@ -542,9 +566,27 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts { if erOpts.KafkaMaxWait != nil { cln.KafkaMaxWait = utils.DurationPointer(*erOpts.KafkaMaxWait) } + if erOpts.KafkaTLS != nil { + cln.KafkaTLS = utils.BoolPointer(*erOpts.KafkaTLS) + } + if erOpts.KafkaCAPath != nil { + cln.KafkaCAPath = utils.StringPointer(*erOpts.KafkaCAPath) + } + if erOpts.KafkaSkipTLSVerify != nil { + cln.KafkaSkipTLSVerify = utils.BoolPointer(*erOpts.KafkaSkipTLSVerify) + } if erOpts.KafkaTopicProcessed != nil { cln.KafkaTopicProcessed = utils.StringPointer(*erOpts.KafkaTopicProcessed) } + if erOpts.KafkaTLSProcessed != nil { + cln.KafkaTLSProcessed = utils.BoolPointer(*erOpts.KafkaTLSProcessed) + } + if erOpts.KafkaCAPathProcessed != nil { + cln.KafkaCAPathProcessed = utils.StringPointer(*erOpts.KafkaCAPathProcessed) + } + if erOpts.KafkaSkipTLSVerify != nil { + cln.KafkaSkipTLSVerify = utils.BoolPointer(*erOpts.KafkaSkipTLSVerify) + } if erOpts.SQLDBName != nil { cln.SQLDBName = utils.StringPointer(*erOpts.SQLDBName) } @@ -764,9 +806,27 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string if er.Opts.KafkaMaxWait != nil { opts[utils.KafkaMaxWait] = er.Opts.KafkaMaxWait.String() } + if er.Opts.KafkaTLS != nil { + opts[utils.KafkaTLS] = *er.Opts.KafkaTLS + } + if er.Opts.KafkaCAPath != nil { + opts[utils.KafkaCAPath] = *er.Opts.KafkaCAPath + } + if er.Opts.KafkaSkipTLSVerify != nil { + opts[utils.KafkaSkipTLSVerify] = *er.Opts.KafkaSkipTLSVerify + } if er.Opts.KafkaTopicProcessed != nil { opts[utils.KafkaTopicProcessedCfg] = *er.Opts.KafkaTopicProcessed } + if er.Opts.KafkaTLSProcessed != nil { + opts[utils.KafkaTLSProcessedCfg] = *er.Opts.KafkaTLSProcessed + } + if er.Opts.KafkaCAPathProcessed != nil { + opts[utils.KafkaCAPathProcessedCfg] = *er.Opts.KafkaCAPathProcessed + } + if er.Opts.KafkaSkipTLSVerifyProcessed != nil { + opts[utils.KafkaSkipTLSVerifyProcessedCfg] = *er.Opts.KafkaSkipTLSVerifyProcessed + } if er.Opts.SQLDBName != nil { opts[utils.SQLDBNameOpt] = *er.Opts.SQLDBName } @@ -949,7 +1009,13 @@ type EventReaderOptsJson struct { KafkaTopic *string `json:"kafkaTopic"` KafkaGroupID *string `json:"kafkaGroupID"` KafkaMaxWait *string `json:"kafkaMaxWait"` + KafkaTLS *bool `json:"kafkaTLS"` + KafkaCAPath *string `json:"kafkaCAPath"` + KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` KafkaTopicProcessed *string `json:"kafkaTopicProcessed"` + KafkaTLSProcessed *bool `json:"kafkaTLSProcessed"` + KafkaCAPathProcessed *string `json:"kafkaCAPathProcessed"` + KafkaSkipTLSVerifyProcessed *bool `json:"kafkaSkipTLSVerifyProcessed"` SQLDBName *string `json:"sqlDBName"` SQLTableName *string `json:"sqlTableName"` PgSSLMode *string `json:"pgSSLMode"` @@ -1179,6 +1245,30 @@ func diffEventReaderOptsJsonCfg(d *EventReaderOptsJson, v1, v2 *EventReaderOpts) } else { d.KafkaMaxWait = nil } + if v2.KafkaTLS != nil { + if v1.KafkaTLS == nil || + *v1.KafkaTLS != *v2.KafkaTLS { + d.KafkaTLS = v2.KafkaTLS + } + } else { + d.KafkaTLS = nil + } + if v2.KafkaCAPath != nil { + if v1.KafkaCAPath == nil || + *v1.KafkaCAPath != *v2.KafkaCAPath { + d.KafkaCAPath = v2.KafkaCAPath + } + } else { + d.KafkaCAPath = nil + } + if v2.KafkaSkipTLSVerify != nil { + if v1.KafkaSkipTLSVerify == nil || + *v1.KafkaSkipTLSVerify != *v2.KafkaSkipTLSVerify { + d.KafkaSkipTLSVerify = v2.KafkaSkipTLSVerify + } + } else { + d.KafkaSkipTLSVerify = nil + } if v2.KafkaTopicProcessed != nil { if v1.KafkaTopicProcessed == nil || *v1.KafkaTopicProcessed != *v2.KafkaTopicProcessed { @@ -1187,6 +1277,30 @@ func diffEventReaderOptsJsonCfg(d *EventReaderOptsJson, v1, v2 *EventReaderOpts) } else { d.KafkaTopicProcessed = nil } + if v2.KafkaTLSProcessed != nil { + if v1.KafkaTLSProcessed == nil || + *v1.KafkaTLSProcessed != *v2.KafkaTLSProcessed { + d.KafkaTLSProcessed = v2.KafkaTLSProcessed + } + } else { + d.KafkaTLSProcessed = nil + } + if v2.KafkaCAPathProcessed != nil { + if v1.KafkaCAPathProcessed == nil || + *v1.KafkaCAPathProcessed != *v2.KafkaCAPathProcessed { + d.KafkaCAPathProcessed = v2.KafkaCAPathProcessed + } + } else { + d.KafkaCAPathProcessed = nil + } + if v2.KafkaSkipTLSVerifyProcessed != nil { + if v1.KafkaSkipTLSVerifyProcessed == nil || + *v1.KafkaSkipTLSVerifyProcessed != *v2.KafkaSkipTLSVerifyProcessed { + d.KafkaSkipTLSVerifyProcessed = v2.KafkaSkipTLSVerifyProcessed + } + } else { + d.KafkaSkipTLSVerifyProcessed = nil + } if v2.SQLDBName != nil { if v1.SQLDBName == nil || *v1.SQLDBName != *v2.SQLDBName { diff --git a/ers/kafka.go b/ers/kafka.go index c578a05a0..5ce7230f1 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -19,9 +19,12 @@ along with this program. If not, see package ers import ( + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" "io" + "os" "time" "github.com/cgrates/birpc/context" @@ -71,10 +74,13 @@ type KafkaER struct { fltrS *engine.FilterS connMgr *engine.ConnManager - dialURL string - topic string - groupID string - maxWait time.Duration + dialURL string + topic string + groupID string + maxWait time.Duration + TLS bool // if true, it will attempt to authentica the server it connects to + caPath string // path to CA pem file + skipTLSVerify bool // if true, it skips certificate validation rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to @@ -92,12 +98,40 @@ func (rdr *KafkaER) Config() *config.EventReaderCfg { // Serve will start the gorutines needed to watch the kafka topic func (rdr *KafkaER) Serve() (err error) { - r := kafka.NewReader(kafka.ReaderConfig{ + readerCfg := kafka.ReaderConfig{ Brokers: []string{rdr.dialURL}, GroupID: rdr.groupID, Topic: rdr.topic, MaxWait: rdr.maxWait, - }) + } + if rdr.TLS { + var rootCAs *x509.CertPool + if rootCAs, err = x509.SystemCertPool(); err != nil { + return + } + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + if rdr.caPath != "" { + var ca []byte + if ca, err = os.ReadFile(rdr.caPath); err != nil { + return + } + if !rootCAs.AppendCertsFromPEM(ca) { + return + } + } + readerCfg.Dialer = &kafka.Dialer{ + Timeout: 10 * time.Second, + DualStack: true, + TLS: &tls.Config{ + RootCAs: rootCAs, + InsecureSkipVerify: rdr.skipTLSVerify, + }, + } + } + + r := kafka.NewReader(readerCfg) if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API return @@ -203,6 +237,15 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) { if opts.KafkaMaxWait != nil { rdr.maxWait = *opts.KafkaMaxWait } + if opts.KafkaTLS != nil && *opts.KafkaTLS { + rdr.TLS = true + } + if opts.KafkaCAPath != nil { + rdr.caPath = *opts.KafkaCAPath + } + if opts.KafkaSkipTLSVerify != nil && *opts.KafkaSkipTLSVerify { + rdr.skipTLSVerify = true + } return } diff --git a/ers/libers.go b/ers/libers.go index c23643826..717e9cbf7 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -83,6 +83,24 @@ func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExpo } eeOpts.KafkaTopic = erOpts.KafkaTopicProcessed } + if erOpts.KafkaTLSProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.KafkaTLS = erOpts.KafkaTLSProcessed + } + if erOpts.KafkaCAPathProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.KafkaCAPath = erOpts.KafkaCAPathProcessed + } + if erOpts.KafkaSkipTLSVerifyProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + eeOpts.KafkaSkipTLSVerify = erOpts.KafkaSkipTLSVerifyProcessed + } if erOpts.NATSCertificateAuthorityProcessed != nil { if eeOpts == nil { eeOpts = new(config.EventExporterOpts) diff --git a/utils/consts.go b/utils/consts.go index 167e18d2d..95624a300 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2467,6 +2467,7 @@ const ( KafkaDefaultMaxWait = time.Millisecond KafkaTopic = "kafkaTopic" + KafkaTLS = "kafkaTLS" KafkaCAPath = "kafkaCAPath" KafkaSkipTLSVerify = "kafkaSkipTLSVerify" KafkaGroupID = "kafkaGroupID" @@ -2522,7 +2523,10 @@ const ( AMQPExchangeTypeProcessedCfg = "amqpExchangeTypeProcessed" AMQPRoutingKeyProcessedCfg = "amqpRoutingKeyProcessed" - KafkaTopicProcessedCfg = "kafkaTopicProcessed" + KafkaTopicProcessedCfg = "kafkaTopicProcessed" + KafkaTLSProcessedCfg = "kafkaTLSProcessed" + KafkaCAPathProcessedCfg = "kafkaCAPathProcessed" + KafkaSkipTLSVerifyProcessedCfg = "kafkaSkipTLSVerifyProcessed" SQLDBNameProcessedCfg = "sqlDBNameProcessed" SQLTableNameProcessedCfg = "sqlTableNameProcessed"