Add support for kafka ssl encryption

This commit is contained in:
ionutboangiu
2022-08-18 17:45:37 +03:00
committed by Dan Christian Bogos
parent 0653e9b5d7
commit f66f05d0e3
5 changed files with 104 additions and 13 deletions

View File

@@ -525,7 +525,8 @@ const CGRATES_CFG_JSON = `
// Kafka
// "kafkaTopic": "cgrates", // the topic from where the events are exported
// "kafkaSkipTLSVerification": false, // if set to true it will skip certificate verification
// "kafkaCAPath": "", // path to certificate authority pem file
// "kafkaSkipTLSVerify": false, // if set to true it will skip certificate verification
// AMQP

View File

@@ -165,6 +165,8 @@ type EventExporterOpts struct {
SQLDBName *string
PgSSLMode *string
KafkaTopic *string
KafkaCAPath *string
KafkaSkipTLSVerify *bool
AMQPRoutingKey *string
AMQPQueueID *string
AMQPExchange *string
@@ -289,6 +291,12 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.KafkaTopic != nil {
eeOpts.KafkaTopic = jsnCfg.KafkaTopic
}
if jsnCfg.KafkaCAPath != nil {
eeOpts.KafkaCAPath = jsnCfg.KafkaCAPath
}
if jsnCfg.KafkaSkipTLSVerify != nil {
eeOpts.KafkaSkipTLSVerify = jsnCfg.KafkaSkipTLSVerify
}
if jsnCfg.AMQPQueueID != nil {
eeOpts.AMQPQueueID = jsnCfg.AMQPQueueID
}
@@ -548,6 +556,12 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
if eeOpts.KafkaTopic != nil {
cln.KafkaTopic = utils.StringPointer(*eeOpts.KafkaTopic)
}
if eeOpts.KafkaCAPath != nil {
cln.KafkaCAPath = utils.StringPointer(*eeOpts.KafkaCAPath)
}
if eeOpts.KafkaSkipTLSVerify != nil {
cln.KafkaSkipTLSVerify = utils.BoolPointer(*eeOpts.KafkaSkipTLSVerify)
}
if eeOpts.AMQPQueueID != nil {
cln.AMQPQueueID = utils.StringPointer(*eeOpts.AMQPQueueID)
}
@@ -778,6 +792,12 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]interface{} {
if optsEes.KafkaTopic != nil {
opts[utils.KafkaTopic] = *optsEes.KafkaTopic
}
if optsEes.KafkaCAPath != nil {
opts[utils.KafkaCAPath] = *optsEes.KafkaCAPath
}
if optsEes.KafkaSkipTLSVerify != nil {
opts[utils.KafkaSkipTLSVerify] = *optsEes.KafkaSkipTLSVerify
}
if optsEes.AMQPQueueID != nil {
opts[utils.AMQPQueueID] = *optsEes.AMQPQueueID
}
@@ -888,6 +908,8 @@ type EventExporterOptsJson struct {
SQLDBName *string `json:"sqlDBName"`
PgSSLMode *string `json:"pgSSLMode"`
KafkaTopic *string `json:"kafkaTopic"`
KafkaCAPath *string `json:"kafkaCAPath"`
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPRoutingKey *string `json:"amqpRoutingKey"`
AMQPExchange *string `json:"amqpExchange"`
@@ -1094,6 +1116,22 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte
} else {
d.KafkaTopic = nil
}
if v2.KafkaCAPath != nil {
if v1.KafkaCAPath == nil ||
*v1.KafkaCAPath != *v2.KafkaCAPath {
d.KafkaCAPath = v2.KafkaCAPath
}
} else {
d.KafkaCAPath = nil
}
if v2.KafkaSkipTLSVerify != nil {
if v1.KafkaSkipTLSVerify == nil ||
*v1.KafkaSkipTLSVerify != *v2.KafkaSkipTLSVerify {
d.KafkaSkipTLSVerify = v2.KafkaSkipTLSVerify
}
} else {
d.KafkaSkipTLSVerify = nil
}
if v2.AMQPQueueID != nil {
if v1.AMQPQueueID == nil ||
*v1.AMQPQueueID != *v2.AMQPQueueID {

View File

@@ -1194,6 +1194,8 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
@@ -1242,6 +1244,8 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
@@ -1297,6 +1301,8 @@ func TestEventExporterOptsClone(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
@@ -1345,6 +1351,8 @@ func TestEventExporterOptsClone(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
@@ -1401,6 +1409,8 @@ func TestLoadFromJSONCfg(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
@@ -1449,6 +1459,8 @@ func TestLoadFromJSONCfg(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
@@ -1604,6 +1616,8 @@ func TestEEsAsMapInterface(t *testing.T) {
SQLDBName: utils.StringPointer("cgrates"),
PgSSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
KafkaCAPath: utils.StringPointer("kafkaCAPath"),
KafkaSkipTLSVerify: utils.BoolPointer(false),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
@@ -1659,6 +1673,8 @@ func TestEEsAsMapInterface(t *testing.T) {
"elsVersionType": "version_type",
"elsWaitForActiveShards": "wfas",
"kafkaTopic": "topic1",
"kafkaCAPath": "kafkaCAPath",
"kafkaSkipTLSVerify": false,
"keyPath": "/path/to/key",
"natsCertificateAuthority": "ca",
"natsClientCertificate": "cc",
@@ -1687,6 +1703,6 @@ func TestEEsAsMapInterface(t *testing.T) {
rcv := eeCfg.AsMapInterface(",")
if !reflect.DeepEqual(exp[utils.OptsCfg], rcv[utils.OptsCfg]) {
t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp), utils.ToJSON(rcv))
t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp["opts"]), utils.ToJSON(rcv["opts"]))
}
}

View File

@@ -18,7 +18,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"crypto/tls"
"crypto/x509"
"net"
"os"
"sync"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -38,13 +43,21 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE
if cfg.Opts.KafkaTopic != nil {
kfkPstr.topic = *cfg.Opts.KafkaTopic
}
if cfg.Opts.KafkaCAPath != nil {
kfkPstr.caPath = *cfg.Opts.KafkaCAPath
}
if cfg.Opts.KafkaSkipTLSVerify != nil && *cfg.Opts.KafkaSkipTLSVerify {
kfkPstr.skipTLSVerify = true
}
return kfkPstr
}
// KafkaEE is a kafka poster
type KafkaEE struct {
topic string // identifier of the CDR queue where we publish
writer *kafka.Writer
topic string // identifier of the CDR queue where we publish
caPath string // path to CA pem file
skipTLSVerify bool // if true, it skips certificate validation
writer *kafka.Writer
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
@@ -55,7 +68,7 @@ type KafkaEE struct {
func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg }
func (pstr *KafkaEE) Connect() (_ error) {
func (pstr *KafkaEE) Connect() (err error) {
pstr.Lock()
if pstr.writer == nil {
pstr.writer = &kafka.Writer{
@@ -63,11 +76,32 @@ func (pstr *KafkaEE) Connect() (_ error) {
Topic: pstr.topic,
MaxAttempts: pstr.Cfg().Attempts,
}
// pstr.writer = kafka.NewWriter(kafka.WriterConfig{
// Brokers: []string{pstr.Cfg().ExportPath},
// MaxAttempts: pstr.Cfg().Attempts,
// Topic: pstr.topic,
// })
}
var rootCAs *x509.CertPool
if rootCAs, err = x509.SystemCertPool(); err != nil {
return
}
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
if pstr.caPath != "" {
var ca []byte
if ca, err = os.ReadFile(pstr.caPath); err != nil {
return
}
if !rootCAs.AppendCertsFromPEM(ca) {
return
}
}
pstr.writer.Transport = &kafka.Transport{
Dial: (&net.Dialer{
Timeout: 3 * time.Second,
DualStack: true,
}).DialContext,
TLS: &tls.Config{
RootCAs: rootCAs,
InsecureSkipVerify: pstr.skipTLSVerify,
},
}
pstr.Unlock()
return

View File

@@ -2466,9 +2466,11 @@ const (
KafkaDefaultGroupID = "cgrates"
KafkaDefaultMaxWait = time.Millisecond
KafkaTopic = "kafkaTopic"
KafkaGroupID = "kafkaGroupID"
KafkaMaxWait = "kafkaMaxWait"
KafkaTopic = "kafkaTopic"
KafkaCAPath = "kafkaCAPath"
KafkaSkipTLSVerify = "kafkaSkipTLSVerify"
KafkaGroupID = "kafkaGroupID"
KafkaMaxWait = "kafkaMaxWait"
// partial
PartialOpt = "*partial"