Add support for kafka reader ssl encryption

This commit is contained in:
ionutboangiu
2022-08-23 17:11:39 +03:00
committed by Dan Christian Bogos
parent f66f05d0e3
commit f3bd4b63c0
5 changed files with 194 additions and 8 deletions

View File

@@ -395,8 +395,14 @@ const CGRATES_CFG_JSON = `
// "kafkaTopic": "cgrates", // the topic from were the events are read
// "kafkaGroupID": "cgrates", // the group that reads the events
// "kafkaMaxWait": "1ms", // the maximum amount of time to wait for new data to come
// "kafkaTLS": false, // if set to true it will try to authenticate the server
// "kafkaCAPath": "",
// "kafkaSkipTLSVerify": false,
// "kafkaTopicProcessed": "", the topic were the events are sent after they are processed
// "kafkaTopicProcessed": "", //the topic were the events are sent after they are processed
// "kafkaTLSProcessed": false,
// "kafkaCAPathProcessed": "",
// "kafkaSkipTLSVerifyProcessed": false,
// SQL
// "sqlDBName": "cgrates", // the name of the database from were the events are read
@@ -525,6 +531,7 @@ const CGRATES_CFG_JSON = `
// Kafka
// "kafkaTopic": "cgrates", // the topic from where the events are exported
// "kafkaTLS": false, // if set to true it will try to authenticate the server
// "kafkaCAPath": "", // path to certificate authority pem file
// "kafkaSkipTLSVerify": false, // if set to true it will skip certificate verification

View File

@@ -149,7 +149,13 @@ type EventReaderOpts struct {
KafkaTopic *string
KafkaGroupID *string
KafkaMaxWait *time.Duration
KafkaTLS *bool
KafkaCAPath *string
KafkaSkipTLSVerify *bool
KafkaTopicProcessed *string
KafkaTLSProcessed *bool
KafkaCAPathProcessed *string
KafkaSkipTLSVerifyProcessed *bool
SQLDBName *string
SQLTableName *string
PgSSLMode *string
@@ -278,9 +284,27 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err
}
erOpts.KafkaMaxWait = utils.DurationPointer(kafkaMaxWait)
}
if jsnCfg.KafkaTLS != nil {
erOpts.KafkaTLS = jsnCfg.KafkaTLS
}
if jsnCfg.KafkaCAPath != nil {
erOpts.KafkaCAPath = jsnCfg.KafkaCAPath
}
if jsnCfg.KafkaSkipTLSVerify != nil {
erOpts.KafkaSkipTLSVerify = jsnCfg.KafkaSkipTLSVerify
}
if jsnCfg.KafkaTopicProcessed != nil {
erOpts.KafkaTopicProcessed = jsnCfg.KafkaTopicProcessed
}
if jsnCfg.KafkaTLSProcessed != nil {
erOpts.KafkaTLSProcessed = jsnCfg.KafkaTLSProcessed
}
if jsnCfg.KafkaCAPathProcessed != nil {
erOpts.KafkaCAPathProcessed = jsnCfg.KafkaCAPathProcessed
}
if jsnCfg.KafkaSkipTLSVerifyProcessed != nil {
erOpts.KafkaSkipTLSVerifyProcessed = jsnCfg.KafkaSkipTLSVerifyProcessed
}
if jsnCfg.SQLDBName != nil {
erOpts.SQLDBName = jsnCfg.SQLDBName
}
@@ -542,9 +566,27 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts {
if erOpts.KafkaMaxWait != nil {
cln.KafkaMaxWait = utils.DurationPointer(*erOpts.KafkaMaxWait)
}
if erOpts.KafkaTLS != nil {
cln.KafkaTLS = utils.BoolPointer(*erOpts.KafkaTLS)
}
if erOpts.KafkaCAPath != nil {
cln.KafkaCAPath = utils.StringPointer(*erOpts.KafkaCAPath)
}
if erOpts.KafkaSkipTLSVerify != nil {
cln.KafkaSkipTLSVerify = utils.BoolPointer(*erOpts.KafkaSkipTLSVerify)
}
if erOpts.KafkaTopicProcessed != nil {
cln.KafkaTopicProcessed = utils.StringPointer(*erOpts.KafkaTopicProcessed)
}
if erOpts.KafkaTLSProcessed != nil {
cln.KafkaTLSProcessed = utils.BoolPointer(*erOpts.KafkaTLSProcessed)
}
if erOpts.KafkaCAPathProcessed != nil {
cln.KafkaCAPathProcessed = utils.StringPointer(*erOpts.KafkaCAPathProcessed)
}
if erOpts.KafkaSkipTLSVerify != nil {
cln.KafkaSkipTLSVerify = utils.BoolPointer(*erOpts.KafkaSkipTLSVerify)
}
if erOpts.SQLDBName != nil {
cln.SQLDBName = utils.StringPointer(*erOpts.SQLDBName)
}
@@ -764,9 +806,27 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
if er.Opts.KafkaMaxWait != nil {
opts[utils.KafkaMaxWait] = er.Opts.KafkaMaxWait.String()
}
if er.Opts.KafkaTLS != nil {
opts[utils.KafkaTLS] = *er.Opts.KafkaTLS
}
if er.Opts.KafkaCAPath != nil {
opts[utils.KafkaCAPath] = *er.Opts.KafkaCAPath
}
if er.Opts.KafkaSkipTLSVerify != nil {
opts[utils.KafkaSkipTLSVerify] = *er.Opts.KafkaSkipTLSVerify
}
if er.Opts.KafkaTopicProcessed != nil {
opts[utils.KafkaTopicProcessedCfg] = *er.Opts.KafkaTopicProcessed
}
if er.Opts.KafkaTLSProcessed != nil {
opts[utils.KafkaTLSProcessedCfg] = *er.Opts.KafkaTLSProcessed
}
if er.Opts.KafkaCAPathProcessed != nil {
opts[utils.KafkaCAPathProcessedCfg] = *er.Opts.KafkaCAPathProcessed
}
if er.Opts.KafkaSkipTLSVerifyProcessed != nil {
opts[utils.KafkaSkipTLSVerifyProcessedCfg] = *er.Opts.KafkaSkipTLSVerifyProcessed
}
if er.Opts.SQLDBName != nil {
opts[utils.SQLDBNameOpt] = *er.Opts.SQLDBName
}
@@ -949,7 +1009,13 @@ type EventReaderOptsJson struct {
KafkaTopic *string `json:"kafkaTopic"`
KafkaGroupID *string `json:"kafkaGroupID"`
KafkaMaxWait *string `json:"kafkaMaxWait"`
KafkaTLS *bool `json:"kafkaTLS"`
KafkaCAPath *string `json:"kafkaCAPath"`
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`
KafkaTopicProcessed *string `json:"kafkaTopicProcessed"`
KafkaTLSProcessed *bool `json:"kafkaTLSProcessed"`
KafkaCAPathProcessed *string `json:"kafkaCAPathProcessed"`
KafkaSkipTLSVerifyProcessed *bool `json:"kafkaSkipTLSVerifyProcessed"`
SQLDBName *string `json:"sqlDBName"`
SQLTableName *string `json:"sqlTableName"`
PgSSLMode *string `json:"pgSSLMode"`
@@ -1179,6 +1245,30 @@ func diffEventReaderOptsJsonCfg(d *EventReaderOptsJson, v1, v2 *EventReaderOpts)
} else {
d.KafkaMaxWait = nil
}
if v2.KafkaTLS != nil {
if v1.KafkaTLS == nil ||
*v1.KafkaTLS != *v2.KafkaTLS {
d.KafkaTLS = v2.KafkaTLS
}
} else {
d.KafkaTLS = nil
}
if v2.KafkaCAPath != nil {
if v1.KafkaCAPath == nil ||
*v1.KafkaCAPath != *v2.KafkaCAPath {
d.KafkaCAPath = v2.KafkaCAPath
}
} else {
d.KafkaCAPath = nil
}
if v2.KafkaSkipTLSVerify != nil {
if v1.KafkaSkipTLSVerify == nil ||
*v1.KafkaSkipTLSVerify != *v2.KafkaSkipTLSVerify {
d.KafkaSkipTLSVerify = v2.KafkaSkipTLSVerify
}
} else {
d.KafkaSkipTLSVerify = nil
}
if v2.KafkaTopicProcessed != nil {
if v1.KafkaTopicProcessed == nil ||
*v1.KafkaTopicProcessed != *v2.KafkaTopicProcessed {
@@ -1187,6 +1277,30 @@ func diffEventReaderOptsJsonCfg(d *EventReaderOptsJson, v1, v2 *EventReaderOpts)
} else {
d.KafkaTopicProcessed = nil
}
if v2.KafkaTLSProcessed != nil {
if v1.KafkaTLSProcessed == nil ||
*v1.KafkaTLSProcessed != *v2.KafkaTLSProcessed {
d.KafkaTLSProcessed = v2.KafkaTLSProcessed
}
} else {
d.KafkaTLSProcessed = nil
}
if v2.KafkaCAPathProcessed != nil {
if v1.KafkaCAPathProcessed == nil ||
*v1.KafkaCAPathProcessed != *v2.KafkaCAPathProcessed {
d.KafkaCAPathProcessed = v2.KafkaCAPathProcessed
}
} else {
d.KafkaCAPathProcessed = nil
}
if v2.KafkaSkipTLSVerifyProcessed != nil {
if v1.KafkaSkipTLSVerifyProcessed == nil ||
*v1.KafkaSkipTLSVerifyProcessed != *v2.KafkaSkipTLSVerifyProcessed {
d.KafkaSkipTLSVerifyProcessed = v2.KafkaSkipTLSVerifyProcessed
}
} else {
d.KafkaSkipTLSVerifyProcessed = nil
}
if v2.SQLDBName != nil {
if v1.SQLDBName == nil ||
*v1.SQLDBName != *v2.SQLDBName {

View File

@@ -19,9 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ers
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"os"
"time"
"github.com/cgrates/birpc/context"
@@ -71,10 +74,13 @@ type KafkaER struct {
fltrS *engine.FilterS
connMgr *engine.ConnManager
dialURL string
topic string
groupID string
maxWait time.Duration
dialURL string
topic string
groupID string
maxWait time.Duration
TLS bool // if true, it will attempt to authentica the server it connects to
caPath string // path to CA pem file
skipTLSVerify bool // if true, it skips certificate validation
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
@@ -92,12 +98,40 @@ func (rdr *KafkaER) Config() *config.EventReaderCfg {
// Serve will start the gorutines needed to watch the kafka topic
func (rdr *KafkaER) Serve() (err error) {
r := kafka.NewReader(kafka.ReaderConfig{
readerCfg := kafka.ReaderConfig{
Brokers: []string{rdr.dialURL},
GroupID: rdr.groupID,
Topic: rdr.topic,
MaxWait: rdr.maxWait,
})
}
if rdr.TLS {
var rootCAs *x509.CertPool
if rootCAs, err = x509.SystemCertPool(); err != nil {
return
}
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
if rdr.caPath != "" {
var ca []byte
if ca, err = os.ReadFile(rdr.caPath); err != nil {
return
}
if !rootCAs.AppendCertsFromPEM(ca) {
return
}
}
readerCfg.Dialer = &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{
RootCAs: rootCAs,
InsecureSkipVerify: rdr.skipTLSVerify,
},
}
}
r := kafka.NewReader(readerCfg)
if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
return
@@ -203,6 +237,15 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) {
if opts.KafkaMaxWait != nil {
rdr.maxWait = *opts.KafkaMaxWait
}
if opts.KafkaTLS != nil && *opts.KafkaTLS {
rdr.TLS = true
}
if opts.KafkaCAPath != nil {
rdr.caPath = *opts.KafkaCAPath
}
if opts.KafkaSkipTLSVerify != nil && *opts.KafkaSkipTLSVerify {
rdr.skipTLSVerify = true
}
return
}

View File

@@ -83,6 +83,24 @@ func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExpo
}
eeOpts.KafkaTopic = erOpts.KafkaTopicProcessed
}
if erOpts.KafkaTLSProcessed != nil {
if eeOpts == nil {
eeOpts = new(config.EventExporterOpts)
}
eeOpts.KafkaTLS = erOpts.KafkaTLSProcessed
}
if erOpts.KafkaCAPathProcessed != nil {
if eeOpts == nil {
eeOpts = new(config.EventExporterOpts)
}
eeOpts.KafkaCAPath = erOpts.KafkaCAPathProcessed
}
if erOpts.KafkaSkipTLSVerifyProcessed != nil {
if eeOpts == nil {
eeOpts = new(config.EventExporterOpts)
}
eeOpts.KafkaSkipTLSVerify = erOpts.KafkaSkipTLSVerifyProcessed
}
if erOpts.NATSCertificateAuthorityProcessed != nil {
if eeOpts == nil {
eeOpts = new(config.EventExporterOpts)

View File

@@ -2467,6 +2467,7 @@ const (
KafkaDefaultMaxWait = time.Millisecond
KafkaTopic = "kafkaTopic"
KafkaTLS = "kafkaTLS"
KafkaCAPath = "kafkaCAPath"
KafkaSkipTLSVerify = "kafkaSkipTLSVerify"
KafkaGroupID = "kafkaGroupID"
@@ -2522,7 +2523,10 @@ const (
AMQPExchangeTypeProcessedCfg = "amqpExchangeTypeProcessed"
AMQPRoutingKeyProcessedCfg = "amqpRoutingKeyProcessed"
KafkaTopicProcessedCfg = "kafkaTopicProcessed"
KafkaTopicProcessedCfg = "kafkaTopicProcessed"
KafkaTLSProcessedCfg = "kafkaTLSProcessed"
KafkaCAPathProcessedCfg = "kafkaCAPathProcessed"
KafkaSkipTLSVerifyProcessedCfg = "kafkaSkipTLSVerifyProcessed"
SQLDBNameProcessedCfg = "sqlDBNameProcessed"
SQLTableNameProcessedCfg = "sqlTableNameProcessed"