mirror of https://github.com/cgrates/cgrates.git
Add support for kafka ssl encryption
For both poster and reader. Added integration test.
commit 36bdc41e97 (parent e0d15f1a9e), committed by Dan Christian Bogos
@@ -414,6 +414,9 @@ const CGRATES_CFG_JSON = `
 // "kafkaTopic": "cgrates",			// the topic from where the events are read
 // "kafkaGroupID": "cgrates",			// the group that reads the events
 // "kafkaMaxWait": "1ms",			// the maximum amount of time to wait for new data to come
+// "kafkaTLS": false,				// if true it will try to authenticate the server
+// "kafkaCAPath": "",				// path to certificate authority pem
+// "kafkaSkipTLSVerify": false,			// if true it will skip certificate verification

 // SQL
 // "sqlDBName": "cgrates",			// the name of the database from where the events are read
@@ -534,6 +537,9 @@ const CGRATES_CFG_JSON = `

 // Kafka
 // "kafkaTopic": "cgrates",			// the topic to which the events are exported
+// "kafkaTLS": false,				// if true, it will try to authenticate the server
+// "kafkaCAPath": "",				// path to certificate authority pem
+// "kafkaSkipTLSVerify": false,			// if true it will skip certificate verification


 // AMQP
@@ -242,7 +242,10 @@ type RPCOpts struct {
 }

 type KafkaOpts struct {
-	KafkaTopic *string
+	Topic         *string
+	TLS           *bool
+	CAPath        *string
+	SkipTLSVerify *bool
 }

 type EventExporterOpts struct {
@@ -381,7 +384,16 @@ func (elsOpts *ElsOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err erro

 func (kafkaOpts *KafkaOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) {
 	if jsnCfg.KafkaTopic != nil {
-		kafkaOpts.KafkaTopic = jsnCfg.KafkaTopic
+		kafkaOpts.Topic = jsnCfg.KafkaTopic
 	}
+	if jsnCfg.KafkaTLS != nil {
+		kafkaOpts.TLS = jsnCfg.KafkaTLS
+	}
+	if jsnCfg.KafkaCAPath != nil {
+		kafkaOpts.CAPath = jsnCfg.KafkaCAPath
+	}
+	if jsnCfg.KafkaSkipTLSVerify != nil {
+		kafkaOpts.SkipTLSVerify = jsnCfg.KafkaSkipTLSVerify
+	}
 	return
 }
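The pointer fields are what let loadFromJSONCfg tell an option that is absent from the JSON apart from one explicitly set to its zero value: only keys present in the input end up non-nil. A self-contained sketch of the same pattern, using trimmed stand-ins for EventExporterOptsJson and KafkaOpts rather than the real config types:

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed stand-in for EventExporterOptsJson: every field is a pointer, so a
// key missing from the JSON stays nil instead of silently becoming false/"".
type exporterOptsJSON struct {
	KafkaTopic         *string `json:"kafkaTopic"`
	KafkaTLS           *bool   `json:"kafkaTLS"`
	KafkaCAPath        *string `json:"kafkaCAPath"`
	KafkaSkipTLSVerify *bool   `json:"kafkaSkipTLSVerify"`
}

// Trimmed stand-in for KafkaOpts.
type kafkaOpts struct {
	Topic         *string
	TLS           *bool
	CAPath        *string
	SkipTLSVerify *bool
}

// loadFromJSONCfg mirrors the nil-guard pattern from the diff:
// copy only the options that were actually present in the JSON.
func (k *kafkaOpts) loadFromJSONCfg(j *exporterOptsJSON) {
	if j.KafkaTopic != nil {
		k.Topic = j.KafkaTopic
	}
	if j.KafkaTLS != nil {
		k.TLS = j.KafkaTLS
	}
	if j.KafkaCAPath != nil {
		k.CAPath = j.KafkaCAPath
	}
	if j.KafkaSkipTLSVerify != nil {
		k.SkipTLSVerify = j.KafkaSkipTLSVerify
	}
}

func main() {
	var j exporterOptsJSON
	if err := json.Unmarshal([]byte(`{"kafkaTopic":"ssl-topic","kafkaTLS":true}`), &j); err != nil {
		panic(err)
	}
	var k kafkaOpts
	k.loadFromJSONCfg(&j)
	fmt.Println(*k.Topic, *k.TLS, k.CAPath == nil) // ssl-topic true true
}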
@@ -711,10 +723,21 @@ func (elsOpts *ElsOpts) Clone() *ElsOpts {

 func (kafkaOpts *KafkaOpts) Clone() *KafkaOpts {
 	cln := &KafkaOpts{}
-	if kafkaOpts.KafkaTopic != nil {
-		cln.KafkaTopic = new(string)
-		*cln.KafkaTopic = *kafkaOpts.KafkaTopic
+	if kafkaOpts.Topic != nil {
+		cln.Topic = new(string)
+		*cln.Topic = *kafkaOpts.Topic
 	}
+	if kafkaOpts.TLS != nil {
+		cln.TLS = new(bool)
+		*cln.TLS = *kafkaOpts.TLS
+	}
+	if kafkaOpts.CAPath != nil {
+		cln.CAPath = new(string)
+		*cln.CAPath = *kafkaOpts.CAPath
+	}
+	if kafkaOpts.SkipTLSVerify != nil {
+		cln.SkipTLSVerify = new(bool)
+		*cln.SkipTLSVerify = *kafkaOpts.SkipTLSVerify
+	}
 	return cln
 }
@@ -1032,8 +1055,17 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
 		}
 	}
 	if kafkaOpts := eeC.Opts.Kafka; kafkaOpts != nil {
-		if kafkaOpts.KafkaTopic != nil {
-			opts[utils.KafkaTopic] = *kafkaOpts.KafkaTopic
+		if kafkaOpts.Topic != nil {
+			opts[utils.KafkaTopic] = *kafkaOpts.Topic
 		}
+		if kafkaOpts.TLS != nil {
+			opts[utils.KafkaTLS] = *kafkaOpts.TLS
+		}
+		if kafkaOpts.CAPath != nil {
+			opts[utils.KafkaCAPath] = *kafkaOpts.CAPath
+		}
+		if kafkaOpts.SkipTLSVerify != nil {
+			opts[utils.KafkaSkipTLSVerify] = *kafkaOpts.SkipTLSVerify
+		}
 	}
 	if amOpts := eeC.Opts.AMQP; amOpts != nil {
@@ -255,7 +255,7 @@ func TestEESClone(t *testing.T) {
 			WaitForActiveShards: utils.StringPointer("test6"),
 		},
 		Kafka: &KafkaOpts{
-			KafkaTopic: utils.StringPointer("kafka"),
+			Topic: utils.StringPointer("kafka"),
 		},
 		AWS: &AWSOpts{
 			Token: utils.StringPointer("token"),
@@ -197,9 +197,12 @@ func (amqpr *AMQPROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error)
 }

 type KafkaROpts struct {
-	Topic   *string
-	GroupID *string
-	MaxWait *time.Duration
+	Topic         *string
+	GroupID       *string
+	MaxWait       *time.Duration
+	TLS           *bool
+	CAPath        *string
+	SkipTLSVerify *bool
 }

 func (kafkaROpts *KafkaROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
@@ -216,6 +219,15 @@ func (kafkaROpts *KafkaROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err
 		}
 		kafkaROpts.MaxWait = utils.DurationPointer(kafkaMaxWait)
 	}
+	if jsnCfg.KafkaTLS != nil {
+		kafkaROpts.TLS = jsnCfg.KafkaTLS
+	}
+	if jsnCfg.KafkaCAPath != nil {
+		kafkaROpts.CAPath = jsnCfg.KafkaCAPath
+	}
+	if jsnCfg.KafkaSkipTLSVerify != nil {
+		kafkaROpts.SkipTLSVerify = jsnCfg.KafkaSkipTLSVerify
+	}
 	return
 }
@@ -592,6 +604,18 @@ func (kafkaOpts *KafkaROpts) Clone() *KafkaROpts {
 		cln.MaxWait = new(time.Duration)
 		*cln.MaxWait = *kafkaOpts.MaxWait
 	}
+	if kafkaOpts.TLS != nil {
+		cln.TLS = new(bool)
+		*cln.TLS = *kafkaOpts.TLS
+	}
+	if kafkaOpts.CAPath != nil {
+		cln.CAPath = new(string)
+		*cln.CAPath = *kafkaOpts.CAPath
+	}
+	if kafkaOpts.SkipTLSVerify != nil {
+		cln.SkipTLSVerify = new(bool)
+		*cln.SkipTLSVerify = *kafkaOpts.SkipTLSVerify
+	}
 	return cln
 }
@@ -837,6 +861,15 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
 		if kafkaOpts.MaxWait != nil {
 			opts[utils.KafkaMaxWait] = kafkaOpts.MaxWait.String()
 		}
+		if kafkaOpts.TLS != nil {
+			opts[utils.KafkaTLS] = *kafkaOpts.TLS
+		}
+		if kafkaOpts.CAPath != nil {
+			opts[utils.KafkaCAPath] = *kafkaOpts.CAPath
+		}
+		if kafkaOpts.SkipTLSVerify != nil {
+			opts[utils.KafkaSkipTLSVerify] = *kafkaOpts.SkipTLSVerify
+		}
 	}

 	if sqlOpts := er.Opts.SQL; sqlOpts != nil {
@@ -229,6 +229,9 @@ type EventReaderOptsJson struct {
 	KafkaTopic         *string `json:"kafkaTopic"`
 	KafkaGroupID       *string `json:"kafkaGroupID"`
 	KafkaMaxWait       *string `json:"kafkaMaxWait"`
+	KafkaTLS           *bool   `json:"kafkaTLS"`
+	KafkaCAPath        *string `json:"kafkaCAPath"`
+	KafkaSkipTLSVerify *bool   `json:"kafkaSkipTLSVerify"`
 	SQLDBName          *string `json:"sqlDBName"`
 	SQLTableName       *string `json:"sqlTableName"`
 	PgSSLMode          *string `json:"pgSSLMode"`
@@ -316,6 +319,9 @@ type EventExporterOptsJson struct {
 	SQLDBName          *string `json:"sqlDBName"`
 	PgSSLMode          *string `json:"pgSSLMode"`
 	KafkaTopic         *string `json:"kafkaTopic"`
+	KafkaTLS           *bool   `json:"kafkaTLS"`
+	KafkaCAPath        *string `json:"kafkaCAPath"`
+	KafkaSkipTLSVerify *bool   `json:"kafkaSkipTLSVerify"`
 	AMQPQueueID        *string `json:"amqpQueueID"`
 	AMQPRoutingKey     *string `json:"amqpRoutingKey"`
 	AMQPExchange       *string `json:"amqpExchange"`
data/conf/samples/kafka_ssl/cgrates.json (new file, 85 lines)
@@ -0,0 +1,85 @@
{

"logger": {
	"type": "*syslog",
	"level": 7
},

"listen": {
	"rpc_json": ":2012",
	"rpc_gob": ":2013",
	"http": ":2080"
},

"data_db": {
	"db_type": "redis",
	"db_port": 6379,
	"db_name": "10"
},

"ees": {
	"enabled": true,
	"exporters": [
		{
			"id": "*default",
			"type": "*kafka_json_map",
			"export_path": "localhost:9093",
			"opts": {
				"kafkaTopic": "ssl-topic",
				"kafkaTLS": true,
				"kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem",
				"kafkaSkipTLSVerify": false
			},
			"failed_posts_dir": "/var/spool/cgrates/failed_posts"
		},
		{
			"id": "kafka_processed",
			"type": "*kafka_json_map",
			"export_path": "localhost:9092",
			"opts": {
				"kafkaTopic": "processed-topic",
				"kafkaTLS": false,
				"kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem",
				"kafkaSkipTLSVerify": false
			},
			"failed_posts_dir": "/var/spool/cgrates/failed_posts"
		}
	]
},

"ers": {
	"enabled": true,
	"sessions_conns": [],
	"ees_conns": ["*internal"],
	"readers": [
		{
			"id": "*default",
			"type": "*kafka_json_map",
			"run_delay": "-1",
			"flags": ["*dryRun"],
			"source_path": "localhost:9093",
			"ees_success_ids": ["kafka_processed"],
			"opts": {
				"kafkaTopic": "ssl-topic",
				"kafkaTLS": true,
				"kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem",
				"kafkaSkipTLSVerify": false
			},
			"fields": [
				{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.ToR", "mandatory": true},
				{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true},
				{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true},
				{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.Tenant", "mandatory": true},
				{"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.Category", "mandatory": true},
				{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true},
				{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true},
				{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true},
				{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "mandatory": true},
				{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true},
				{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}
			]
		}
	]
}

}
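This sample can be loaded programmatically the same way the integration test below does it, via config.NewCGRConfigFromPath. A minimal sketch, assuming a cgrates source checkout where the relative path resolves, and assuming EEsCfg() as the accessor that exposes the parsed "ees" section:

package main

import (
	"fmt"

	"github.com/cgrates/cgrates/config"
)

func main() {
	// Path assumes a cgrates source checkout; adjust to your environment.
	cfg, err := config.NewCGRConfigFromPath("data/conf/samples/kafka_ssl")
	if err != nil {
		panic(err)
	}
	// Expect "localhost:9093" for the overridden *default exporter.
	fmt.Println(cfg.EEsCfg().Exporters[0].ExportPath)
}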
ees/kafka.go (58 lines changed)
@@ -19,7 +19,12 @@ package ees

 import (
 	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"net"
+	"os"
 	"sync"
+	"time"

 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/utils"
@@ -34,18 +39,28 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE
 		topic: utils.DefaultQueueID,
 		reqs:  newConcReq(cfg.ConcurrentRequests),
 	}
 	if kafkaOpts := cfg.Opts.Kafka; kafkaOpts != nil {
-		if kafkaOpts.KafkaTopic != nil {
-			kfkPstr.topic = *cfg.Opts.Kafka.KafkaTopic
-		}
+		if cfg.Opts.Kafka.Topic != nil {
+			kfkPstr.topic = *cfg.Opts.Kafka.Topic
+		}
+		if cfg.Opts.Kafka.TLS != nil && *cfg.Opts.Kafka.TLS {
+			kfkPstr.tls = true
+		}
+		if cfg.Opts.Kafka.CAPath != nil {
+			kfkPstr.caPath = *cfg.Opts.Kafka.CAPath
+		}
+		if cfg.Opts.Kafka.SkipTLSVerify != nil && *cfg.Opts.Kafka.SkipTLSVerify {
+			kfkPstr.skipTLSVerify = true
+		}
 	}
 	return kfkPstr
 }

 // KafkaEE is a kafka poster
 type KafkaEE struct {
-	topic  string // identifier of the CDR queue where we publish
-	writer *kafka.Writer
+	topic         string // identifier of the CDR queue where we publish
+	tls           bool   // if true, it will attempt to authenticate the server
+	caPath        string // path to CA pem file
+	skipTLSVerify bool   // if true, it skips certificate verification
+	writer        *kafka.Writer

 	cfg *config.EventExporterCfg
 	dc  *utils.SafeMapStorage
@@ -58,6 +73,7 @@ func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg }

 func (pstr *KafkaEE) Connect() (_ error) {
 	pstr.Lock()
+	defer pstr.Unlock()
 	if pstr.writer == nil {
 		pstr.writer = &kafka.Writer{
 			Addr: kafka.TCP(pstr.Cfg().ExportPath),
@@ -65,7 +81,35 @@ func (pstr *KafkaEE) Connect() (_ error) {
 			MaxAttempts: pstr.Cfg().Attempts,
 		}
 	}
-	pstr.Unlock()
+	if pstr.tls {
+		rootCAs, err := x509.SystemCertPool()
+		if err != nil {
+			return err
+		}
+		if rootCAs == nil {
+			rootCAs = x509.NewCertPool()
+		}
+		if pstr.caPath != "" {
+			ca, err := os.ReadFile(pstr.caPath)
+			if err != nil {
+				return err
+			}
+			if !rootCAs.AppendCertsFromPEM(ca) {
+				return
+			}
+		}
+		pstr.writer.Transport = &kafka.Transport{
+			Dial: (&net.Dialer{
+				Timeout:   3 * time.Second,
+				DualStack: true,
+			}).DialContext,
+			TLS: &tls.Config{
+				RootCAs:            rootCAs,
+				InsecureSkipVerify: pstr.skipTLSVerify,
+			},
+		}
+	}
 	return
 }
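For reference, the poster's TLS wiring can be exercised outside the engine. The sketch below is not part of the commit: it assembles a segmentio/kafka-go Writer the way Connect() does, but surfaces a CA-parsing failure as an explicit error; broker address, topic, and CA path are placeholders taken from the sample config:

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"net"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)

// newTLSWriter builds a kafka.Writer with a TLS transport that trusts the
// system pool plus an optional extra CA, mirroring the poster's Connect().
func newTLSWriter(brokerAddr, topic, caPath string, skipVerify bool) (*kafka.Writer, error) {
	rootCAs, err := x509.SystemCertPool()
	if err != nil {
		return nil, err
	}
	if rootCAs == nil {
		rootCAs = x509.NewCertPool()
	}
	if caPath != "" {
		ca, err := os.ReadFile(caPath)
		if err != nil {
			return nil, err
		}
		if !rootCAs.AppendCertsFromPEM(ca) {
			return nil, fmt.Errorf("could not parse CA pem at %s", caPath)
		}
	}
	return &kafka.Writer{
		Addr:  kafka.TCP(brokerAddr),
		Topic: topic,
		Transport: &kafka.Transport{
			Dial: (&net.Dialer{Timeout: 3 * time.Second}).DialContext,
			TLS: &tls.Config{
				RootCAs:            rootCAs,
				InsecureSkipVerify: skipVerify, // kafkaSkipTLSVerify
			},
		},
	}, nil
}

func main() {
	w, err := newTLSWriter("localhost:9093", "ssl-topic",
		"/home/kafka/kafka/ssl/ca.pem", false) // placeholder values
	if err != nil {
		panic(err)
	}
	defer w.Close()
	if err := w.WriteMessages(context.Background(),
		kafka.Message{Value: []byte(`{"hello":"kafka over TLS"}`)}); err != nil {
		panic(err)
	}
}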
@@ -79,8 +79,8 @@ func AddFailedPost(failedPostsDir, expPath, format string, ev any, opts *config.
 		}
 	}
 	if kfkOpts := opts.Kafka; kfkOpts != nil {
-		if opts.Kafka.KafkaTopic != nil {
-			kafkaTopic = *opts.Kafka.KafkaTopic
+		if opts.Kafka.Topic != nil {
+			kafkaTopic = *opts.Kafka.Topic
 		}
 	}
 	if qID := utils.FirstNonEmpty(amqpQueueID, s3BucketID, sqsQueueID,
@@ -56,7 +56,7 @@ func TestKafkaParseURL(t *testing.T) {
 		Attempts: 10,
 		Opts: &config.EventExporterOpts{
 			Kafka: &config.KafkaOpts{
-				KafkaTopic: utils.StringPointer("cdr_billing"),
+				Topic: utils.StringPointer("cdr_billing"),
 			},
 		},
 	}
@@ -274,15 +274,15 @@ func TestDebitCreditBlocker(t *testing.T) {
 	var err error
 	cc, err = rifsBalance.debitCreditBalance(cd, false, true, true, nil)
 	if err != nil {
-		t.Error("Error debiting balance: ", err)
+		t.Fatal("Error debiting balance: ", err)
 	}
 	if len(cc.Timespans) != 0 {
-		t.Error("Wrong call cost: ", utils.ToIJSON(cc))
+		t.Error("Wrong call cost: ", utils.ToJSON(cc))
 	}
 	if rifsBalance.BalanceMap[utils.MetaMonetary][0].GetValue() != 0.1152 ||
 		rifsBalance.BalanceMap[utils.MetaMonetary][1].GetValue() != 1.5 {
 		t.Error("should not have touched the balances: ",
-			utils.ToIJSON(rifsBalance.BalanceMap[utils.MetaMonetary]))
+			utils.ToJSON(rifsBalance.BalanceMap[utils.MetaMonetary]))
 	}
 }
ers/kafka.go (56 lines changed)
@@ -20,9 +20,12 @@ package ers

 import (
 	"context"
+	"crypto/tls"
+	"crypto/x509"
 	"encoding/json"
 	"fmt"
 	"io"
+	"os"
 	"time"

 	"github.com/cgrates/cgrates/agents"
@@ -67,10 +70,13 @@ type KafkaER struct {
 	cfgIdx int // index of config instance within ERsCfg.Readers
 	fltrS  *engine.FilterS

-	dialURL string
-	topic   string
-	groupID string
-	maxWait time.Duration
+	dialURL       string
+	topic         string
+	groupID       string
+	maxWait       time.Duration
+	tls           bool   // if true it will attempt to authenticate the server it connects to
+	caPath        string // path to CA pem file
+	skipTLSVerify bool   // if true it skips certificate validation

 	rdrEvents     chan *erEvent // channel to dispatch the events created to
 	partialEvents chan *erEvent // channel to dispatch the partial events created to
@@ -86,12 +92,41 @@ func (rdr *KafkaER) Config() *config.EventReaderCfg {

 // Serve will start the goroutines needed to watch the kafka topic
 func (rdr *KafkaER) Serve() (err error) {
-	r := kafka.NewReader(kafka.ReaderConfig{
+	readerCfg := kafka.ReaderConfig{
 		Brokers: []string{rdr.dialURL},
 		GroupID: rdr.groupID,
 		Topic:   rdr.topic,
 		MaxWait: rdr.maxWait,
-	})
+	}
+
+	if rdr.tls {
+		var rootCAs *x509.CertPool
+		if rootCAs, err = x509.SystemCertPool(); err != nil {
+			return
+		}
+		if rootCAs == nil {
+			rootCAs = x509.NewCertPool()
+		}
+		if rdr.caPath != "" {
+			var ca []byte
+			if ca, err = os.ReadFile(rdr.caPath); err != nil {
+				return
+			}
+			if !rootCAs.AppendCertsFromPEM(ca) {
+				return
+			}
+		}
+		readerCfg.Dialer = &kafka.Dialer{
+			Timeout:   10 * time.Second,
+			DualStack: true,
+			TLS: &tls.Config{
+				RootCAs:            rootCAs,
+				InsecureSkipVerify: rdr.skipTLSVerify,
+			},
+		}
+	}
+
+	r := kafka.NewReader(readerCfg)

 	if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
 		return
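The reader-side counterpart can be tried in isolation as well. This sketch is likewise not part of the commit; it consumes one message from the TLS listener through a kafka.Dialer, reusing the placeholder broker, topic, and CA path from the sample config:

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Trust the system pool plus the broker's CA (placeholder path).
	rootCAs, err := x509.SystemCertPool()
	if err != nil || rootCAs == nil {
		rootCAs = x509.NewCertPool()
	}
	ca, err := os.ReadFile("/home/kafka/kafka/ssl/ca.pem")
	if err != nil {
		panic(err)
	}
	if !rootCAs.AppendCertsFromPEM(ca) {
		panic("could not parse CA pem")
	}
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9093"}, // SSL listener
		GroupID: "cgrates",
		Topic:   "ssl-topic",
		MaxWait: time.Millisecond,
		Dialer: &kafka.Dialer{
			Timeout:   10 * time.Second,
			DualStack: true,
			TLS:       &tls.Config{RootCAs: rootCAs},
		},
	})
	defer r.Close()
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	m, err := r.ReadMessage(ctx) // blocks until a message arrives or ctx expires
	if err != nil {
		panic(err)
	}
	fmt.Printf("offset %d: %s\n", m.Offset, m.Value)
}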
@@ -184,6 +219,15 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) {
 		if kfkOpts.MaxWait != nil {
 			rdr.maxWait = *kfkOpts.MaxWait
 		}
+		if kfkOpts.TLS != nil && *kfkOpts.TLS {
+			rdr.tls = true
+		}
+		if kfkOpts.CAPath != nil {
+			rdr.caPath = *kfkOpts.CAPath
+		}
+		if kfkOpts.SkipTLSVerify != nil && *kfkOpts.SkipTLSVerify {
+			rdr.skipTLSVerify = true
+		}
 	}
 	return
 }
general_tests/kafka_ssl_it_test.go (new file, 180 lines)
@@ -0,0 +1,180 @@
//go:build kafka
// +build kafka

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/

package general_tests

import (
	"path"
	"testing"
	"time"

	"github.com/cgrates/birpc"
	"github.com/cgrates/birpc/context"
	"github.com/segmentio/kafka-go"

	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/engine"
	"github.com/cgrates/cgrates/utils"
)

var (
	kafkaSSLConfigDir string
	kafkaSSLCfgPath   string
	kafkaSSLCfg       *config.CGRConfig
	kafkaSSLRpc       *birpc.Client

	sTestsKafkaSSL = []func(t *testing.T){
		testKafkaSSLLoadConfig,
		testKafkaSSLResetDataDB,
		testKafkaSSLStartEngine,
		testKafkaSSLRPCConn,
		testKafkaSSLExportEvent,           // exports event to ssl-topic; the reader then consumes it and exports it to processed-topic
		testKafkaSSLVerifyProcessedExport, // checks whether ERs managed to successfully read and export the events served by the Kafka server
		testKafkaSSLStopEngine,
	}
)
// The test exports to and reads from a kafka broker with the following
// configuration:

/*
listeners=PLAINTEXT://:9092,SSL://localhost:9093
...
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
...
ssl.truststore.location=/home/kafka/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=123456
ssl.keystore.type=PKCS12
ssl.keystore.location=/home/kafka/kafka/ssl/kafka.server.keystore.p12
ssl.keystore.password=123456
ssl.key.password=123456
ssl.client.auth=none
ssl.protocol=TLSv1.2
security.inter.broker.protocol=SSL
*/

// How to create TLS keys and certificates:

/*
1. Generate a CA if needed (openssl req -new -x509 -keyout ca-key.pem -out ca.pem -days 365);
2. Add the generated CA to the brokers' truststore;
3. Generate a key-certificate pair, sign it with the CA from step 1, and convert the pem files to p12 format;
4. Import both the CA certificate and the signed certificate into the broker keystore.
*/
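Before running the integration test, it can help to confirm that the broker's SSL listener actually completes a handshake with the generated CA. A small sketch (not part of the commit; the address and CA path are the placeholders used above):

package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// Dials the broker's SSL listener and completes a TLS handshake using the
// generated ca.pem, mirroring what the poster and reader will do at runtime.
func main() {
	ca, err := os.ReadFile("/home/kafka/kafka/ssl/ca.pem") // placeholder path
	if err != nil {
		panic(err)
	}
	rootCAs := x509.NewCertPool()
	if !rootCAs.AppendCertsFromPEM(ca) {
		panic("could not parse ca.pem")
	}
	conn, err := tls.Dial("tcp", "localhost:9093", &tls.Config{RootCAs: rootCAs})
	if err != nil {
		panic(err) // handshake failed: wrong CA, hostname mismatch, or listener down
	}
	defer conn.Close()
	fmt.Println("TLS handshake OK, version TLS1.2:",
		conn.ConnectionState().Version == tls.VersionTLS12)
}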
func TestKafkaSSL(t *testing.T) {
	kafkaSSLConfigDir = "kafka_ssl"
	for _, stest := range sTestsKafkaSSL {
		t.Run(kafkaSSLConfigDir, stest)
	}
}

func testKafkaSSLLoadConfig(t *testing.T) {
	var err error
	kafkaSSLCfgPath = path.Join(*dataDir, "conf", "samples", kafkaSSLConfigDir)
	if kafkaSSLCfg, err = config.NewCGRConfigFromPath(kafkaSSLCfgPath); err != nil {
		t.Error(err)
	}
}

func testKafkaSSLResetDataDB(t *testing.T) {
	if err := engine.InitDataDb(kafkaSSLCfg); err != nil {
		t.Fatal(err)
	}
}

func testKafkaSSLStartEngine(t *testing.T) {
	if _, err := engine.StopStartEngine(kafkaSSLCfgPath, *waitRater); err != nil {
		t.Fatal(err)
	}
}

func testKafkaSSLRPCConn(t *testing.T) {
	var err error
	kafkaSSLRpc, err = newRPCClient(kafkaSSLCfg.ListenCfg())
	if err != nil {
		t.Fatal(err)
	}
}

func testKafkaSSLExportEvent(t *testing.T) {
	event := &engine.CGREventWithEeIDs{
		CGREvent: &utils.CGREvent{
			Tenant: "cgrates.org",
			ID:     "KafkaEvent",
			Event: map[string]interface{}{
				utils.ToR:          utils.MetaVoice,
				utils.OriginID:     "abcdef",
				utils.OriginHost:   "192.168.1.1",
				utils.RequestType:  utils.MetaRated,
				utils.Tenant:       "cgrates.org",
				utils.Category:     "call",
				utils.AccountField: "1001",
				utils.Subject:      "1001",
				utils.Destination:  "1002",
				utils.SetupTime:    time.Unix(1383813745, 0).UTC(),
				utils.AnswerTime:   time.Unix(1383813748, 0).UTC(),
				utils.Usage:        10 * time.Second,
				utils.RunID:        utils.MetaDefault,
				utils.Cost:         1.01,
			},
		},
	}

	var reply map[string]map[string]interface{}
	if err := kafkaSSLRpc.Call(context.Background(), utils.EeSv1ProcessEvent, event, &reply); err != nil {
		t.Error(err)
	}
	time.Sleep(time.Second)
}

func testKafkaSSLVerifyProcessedExport(t *testing.T) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "processed-topic",
		// MinBytes: 10e3, // 10KB
		// MaxBytes: 10e6, // 10MB
	})

	ctx, cancel := context.WithCancel(context.Background())
	var rcv string
	for {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			break
		}
		rcv = string(m.Value)
		cancel()
	}

	exp := `{"Account":"1001","AnswerTime":"2013-11-07T08:42:28Z","Category":"call","Cost":1.01,"Destination":"1002","OriginHost":"192.168.1.1","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","SetupTime":"2013-11-07T08:42:25Z","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice","Usage":10000000000}`

	if rcv != exp {
		t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
	}

	if err := r.Close(); err != nil {
		t.Fatal("failed to close reader:", err)
	}
}

func testKafkaSSLStopEngine(t *testing.T) {
	if err := engine.KillEngine(*waitRater); err != nil {
		t.Error(err)
	}
}
@@ -2655,9 +2655,12 @@ const (
 	KafkaDefaultGroupID = "cgrates"
 	KafkaDefaultMaxWait = time.Millisecond

-	KafkaTopic   = "kafkaTopic"
-	KafkaGroupID = "kafkaGroupID"
-	KafkaMaxWait = "kafkaMaxWait"
+	KafkaTopic         = "kafkaTopic"
+	KafkaTLS           = "kafkaTLS"
+	KafkaCAPath        = "kafkaCAPath"
+	KafkaSkipTLSVerify = "kafkaSkipTLSVerify"
+	KafkaGroupID       = "kafkaGroupID"
+	KafkaMaxWait       = "kafkaMaxWait"

 	// partial
 	PartialOpt = "*partial"