Revise Kafka SSL test

- Updated the test to use the TestEnvironment test-suite helper
- Deleted the kafka_ssl sample configuration (its contents moved into the test file)
- Replaced the 'logger' section with 'general' in the configuration
- Revised the Kafka server SSL setup comment
- Made the exporters synchronous so that export errors are not missed
- Implemented a helper function to create and clean up Kafka topics
Author: ionutboangiu (2024-06-30 10:00:55 +03:00)
Committed by: Dan Christian Bogos
parent 4d8a2d7bdd
commit 36598294a2
3 changed files with 257 additions and 217 deletions

View File

@@ -1,85 +0,0 @@
{
"logger": {
"type": "*syslog",
"level": 7
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080"
},
"data_db": {
"db_type": "redis",
"db_port": 6379,
"db_name": "10"
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "*default",
"type": "*kafka_json_map",
"export_path": "localhost:9093",
"opts": {
"kafkaTopic": "ssl-topic",
"kafkaTLS": true,
"kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem",
"kafkaSkipTLSVerify": false
},
"failed_posts_dir": "/var/spool/cgrates/failed_posts"
},
{
"id": "kafka_processed",
"type": "*kafka_json_map",
"export_path": "localhost:9092",
"opts": {
"kafkaTopic": "processed-topic",
"kafkaTLS": false,
"kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem",
"kafkaSkipTLSVerify": false
},
"failed_posts_dir": "/var/spool/cgrates/failed_posts"
}
]
},
"ers": {
"enabled": true,
"sessions_conns":[],
"ees_conns": ["*internal"],
"readers": [
{
"id": "*default",
"type": "*kafka_json_map",
"run_delay": "-1",
"flags": ["*dryRun"],
"source_path": "localhost:9093",
"ees_success_ids": ["kafka_processed"],
"opts": {
"kafkaTopic": "ssl-topic",
"kafkaTLS": true,
"kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem",
"kafkaSkipTLSVerify": false
},
"fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.ToR", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.Tenant", "mandatory": true},
{"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.Category", "mandatory": true},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "mandatory": true},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}
]
}
]
}
}

View File

@@ -19,162 +19,287 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package general_tests
import (
"path"
"fmt"
"net"
"strconv"
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/segmentio/kafka-go"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
kafkaSSLConfigDir string
kafkaSSLCfgPath string
kafkaSSLCfg *config.CGRConfig
kafkaSSLRpc *birpc.Client
sTestsKafkaSSL = []func(t *testing.T){
testKafkaSSLLoadConfig,
testKafkaSSLResetDataDB,
testKafkaSSLStartEngine,
testKafkaSSLRPCConn,
testKafkaSSLExportEvent, // exports event to ssl-topic, then the reader will consume said event and export it to processed-topic
testKafkaSSLVerifyProcessedExport, // checks whether ERs managed to successfully read and export the events served by Kafka server
testKafkaSSLStopEngine,
}
)
// The test exports to and reads from a kafka broker with the following configuration:
/*
listeners=PLAINTEXT://:9092,SSL://localhost:9093
...
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
...
ssl.truststore.location=/home/kafka/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=123456
ssl.keystore.type=PKCS12
ssl.keystore.location=/home/kafka/kafka/ssl/kafka.server.keystore.p12
ssl.keystore.password=123456
ssl.key.password=123456
ssl.client.auth=none
ssl.protocol=TLSv1.2
security.inter.broker.protocol=SSL
*/
// How to create TLS keys and certificates:
/*
1. Generate CA if needed (openssl req -new -x509 -keyout ca-key.pem -out ca.pem -days 365);
2. Add the generated CA to the broker's truststore;
3. Generate key-certificate pair using the CA from step 1 to sign it and convert the pem files to p12 format;
4. Import both the certificate of the CA and the signed certificate into the broker keystore.
*/
// TestKafkaSSL tests exporting to and reading from a kafka broker through an SSL connection.
// Steps to set up a local kafka server with SSL can be found at the bottom of the file.
func TestKafkaSSL(t *testing.T) {
kafkaSSLConfigDir = "kafka_ssl"
for _, stest := range sTestsKafkaSSL {
t.Run(kafkaSSLConfigDir, stest)
switch *utils.DBType {
case utils.MetaInternal:
case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("unsupported dbtype value")
}
brokerPlainURL := "localhost:9092"
brokerSSLURL := "localhost:9094"
mainTopic := "cgrates-cdrs"
processedTopic := "processed"
content := fmt.Sprintf(`{
"general": {
"log_level": 7
},
"data_db": {
"db_type": "*internal"
},
"stor_db": {
"db_type": "*internal"
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "kafka_ssl",
"type": "*kafka_json_map",
"export_path": "%s",
"synchronous": true,
"opts": {
"kafkaTopic": "%s",
"kafkaTLS": true,
"kafkaCAPath": "/tmp/ssl/kafka/ca.crt",
"kafkaSkipTLSVerify": false
},
"failed_posts_dir": "*none"
},
{
"id": "kafka_processed",
"type": "*kafka_json_map",
"export_path": "%s",
"synchronous": true,
"opts": {
"kafkaTopic": "%s"
},
"failed_posts_dir": "*none"
}
]
},
"ers": {
"enabled": true,
"sessions_conns":[],
"ees_conns": ["*localhost"],
"readers": [
{
"id": "kafka_ssl",
"type": "*kafka_json_map",
"run_delay": "-1",
"flags": ["*dryrun"],
"source_path": "%s",
"ees_success_ids": ["kafka_processed"],
"opts": {
"kafkaTopic": "%s",
"kafkaTLS": true,
"kafkaCAPath": "/tmp/ssl/kafka/ca.crt",
"kafkaSkipTLSVerify": false
},
"fields": [
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.ToR", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true}
]
}
]
}
func testKafkaSSLLoadConfig(t *testing.T) {
var err error
kafkaSSLCfgPath = path.Join(*utils.DataDir, "conf", "samples", kafkaSSLConfigDir)
if kafkaSSLCfg, err = config.NewCGRConfigFromPath(kafkaSSLCfgPath); err != nil {
t.Error(err)
}
}
}`, brokerSSLURL, mainTopic, brokerPlainURL, processedTopic, brokerSSLURL, mainTopic)
testEnv := TestEnvironment{
ConfigJSON: content,
}
client, _ := testEnv.Setup(t, 0)
createKafkaTopics(t, brokerPlainURL, true, mainTopic, processedTopic)
// export event to cgrates-cdrs topic, then the reader will consume it and
// export it to the 'processed' topic
t.Run("export kafka event", func(t *testing.T) {
var reply map[string]map[string]interface{}
if err := client.Call(context.Background(), utils.EeSv1ProcessEvent,
&engine.CGREventWithEeIDs{
EeIDs: []string{"kafka_ssl"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "KafkaEvent",
Event: map[string]interface{}{
utils.ToR: utils.MetaVoice,
utils.OriginID: "abcdef",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.MetaRated,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.AccountField: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813748, 0).UTC(),
utils.Usage: 10 * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 1.01,
},
},
}, &reply); err != nil {
t.Error(err)
}
})
// Check whether ERs managed to successfully consume the event from the
// 'cgrates-cdrs' topic and export it to the 'processed' topic.
t.Run("verify kafka export", func(t *testing.T) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{brokerPlainURL},
Topic: "processed",
})
t.Cleanup(func() {
if err := r.Close(); err != nil {
t.Error("failed to close reader:", err)
}
})
m, err := r.ReadMessage(context.Background())
if err != nil {
t.Fatalf("kafka.Reader.ReadMessage() failed unexpectedly: %v", err)
}
got := string(m.Value)
want := `{"Account":"1001","Destination":"1002","OriginID":"abcdef","ToR":"*voice","Usage":"10000000000"}`
if got != want {
t.Errorf("kafka.Reader.ReadMessage() = %v, want %v", got, want)
}
})
}
func testKafkaSSLResetDataDB(t *testing.T) {
if err := engine.InitDataDb(kafkaSSLCfg); err != nil {
t.Fatal(err)
}
}
func testKafkaSSLStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(kafkaSSLCfgPath, *utils.WaitRater); err != nil {
t.Fatal(err)
}
}
func testKafkaSSLRPCConn(t *testing.T) {
var err error
kafkaSSLRpc, err = newRPCClient(kafkaSSLCfg.ListenCfg())
func createKafkaTopics(t *testing.T, brokerURL string, cleanup bool, topics ...string) {
t.Helper()
conn, err := kafka.Dial("tcp", brokerURL)
if err != nil {
t.Fatal(err)
}
}
t.Cleanup(func() { conn.Close() })
func testKafkaSSLExportEvent(t *testing.T) {
event := &engine.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "KafkaEvent",
Event: map[string]interface{}{
utils.ToR: utils.MetaVoice,
utils.OriginID: "abcdef",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.MetaRated,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.AccountField: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813748, 0).UTC(),
utils.Usage: 10 * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 1.01,
},
},
controller, err := conn.Controller()
if err != nil {
t.Fatal(err)
}
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
t.Fatal(err)
}
defer controllerConn.Close()
topicConfigs := make([]kafka.TopicConfig, 0, len(topics))
for _, topic := range topics {
topicConfigs = append(topicConfigs, kafka.TopicConfig{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
})
}
var reply map[string]map[string]interface{}
if err := kafkaSSLRpc.Call(context.Background(), utils.EeSv1ProcessEvent, event, &reply); err != nil {
t.Error(err)
}
time.Sleep(time.Second)
}
func testKafkaSSLVerifyProcessedExport(t *testing.T) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "processed-topic",
// MinBytes: 10e3, // 10KB
// MaxBytes: 10e6, // 10MB
})
ctx, cancel := context.WithCancel(context.Background())
var rcv string
for {
m, err := r.ReadMessage(ctx)
if err != nil {
break
}
rcv = string(m.Value)
cancel()
if err := controllerConn.CreateTopics(topicConfigs...); err != nil {
t.Fatal(err)
}
exp := `{"Account":"1001","AnswerTime":"2013-11-07T08:42:28Z","Category":"call","Cost":1.01,"Destination":"1002","OriginHost":"192.168.1.1","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","SetupTime":"2013-11-07T08:42:25Z","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice","Usage":10000000000}`
if rcv != exp {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
if err := r.Close(); err != nil {
t.Fatal("failed to close reader:", err)
if cleanup {
t.Cleanup(func() {
if err := conn.DeleteTopics(topics...); err != nil {
t.Log(err)
}
})
}
}
func testKafkaSSLStopEngine(t *testing.T) {
if err := engine.KillEngine(*utils.WaitRater); err != nil {
t.Error(err)
}
}
// Kafka broker has the following configuration:
/*
/opt/kafka/config/kraft/server.properties
----------------------------------------
listeners=PLAINTEXT://:9092,CONTROLLER://:9093,SSL://:9094
...
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9094
...
ssl.truststore.location=/tmp/ssl/kafka/kafka.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/tmp/ssl/kafka/kafka.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.client.auth=none
*/
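// For reference, a minimal sketch of how a kafka-go client could be pointed
// at the SSL listener directly, assuming the CA generated by the script
// below. This mirrors what the kafkaTLS/kafkaCAPath options configure for
// the exporters/readers above. The helper name newSSLReader and the extra
// imports it would need (crypto/tls, crypto/x509, errors, os) are
// illustrative, not part of the test:
/*
func newSSLReader(topic string) (*kafka.Reader, error) {
	caCert, err := os.ReadFile("/tmp/ssl/kafka/ca.crt")
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caCert) {
		return nil, errors.New("failed to parse CA certificate")
	}
	// Dial the broker's SSL listener, verifying its certificate against our CA.
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9094"},
		Topic:   topic,
		Dialer: &kafka.Dialer{
			Timeout: 10 * time.Second,
			TLS:     &tls.Config{RootCAs: pool},
		},
	}), nil
}
*/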
// Script to generate TLS keys and certificates:
/*
#!/bin/bash
mkdir -p /tmp/ssl/kafka
# Generate CA key
openssl genpkey -algorithm RSA -out /tmp/ssl/kafka/ca.key
# Generate CA certificate
openssl req -x509 -new -key /tmp/ssl/kafka/ca.key -days 3650 -out /tmp/ssl/kafka/ca.crt \
-subj "/C=US/ST=California/L=San Francisco/O=MyOrg/CN=localhost/emailAddress=example@email.com"
# Generate server key and CSR
openssl req -new -newkey rsa:4096 -nodes -keyout /tmp/ssl/kafka/server.key \
-out /tmp/ssl/kafka/server.csr \
-subj "/C=US/ST=California/L=San Francisco/O=MyOrg/CN=localhost/emailAddress=example@email.com"
# Create SAN configuration file
echo "authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
subjectAltName = @alt_names
[alt_names]
DNS.1=localhost
IP.1=127.0.0.1
" > /tmp/ssl/kafka/san.cnf
# Sign server certificate with CA
openssl x509 -req -in /tmp/ssl/kafka/server.csr -CA /tmp/ssl/kafka/ca.crt \
-CAkey /tmp/ssl/kafka/ca.key -CAcreateserial -out /tmp/ssl/kafka/server.crt \
-days 3650 -extfile /tmp/ssl/kafka/san.cnf
# Convert server certificate and key to PKCS12 format
openssl pkcs12 -export \
-in /tmp/ssl/kafka/server.crt \
-inkey /tmp/ssl/kafka/server.key \
-name kafka-broker \
-out /tmp/ssl/kafka/kafka.p12 \
-password pass:123456
# Import PKCS12 file into Java keystore
keytool -importkeystore \
-srckeystore /tmp/ssl/kafka/kafka.p12 \
-destkeystore /tmp/ssl/kafka/kafka.keystore.jks \
-srcstoretype pkcs12 \
-srcstorepass 123456 \
-deststorepass 123456 \
-noprompt
# Create truststore and import CA certificate
keytool -keystore /tmp/ssl/kafka/kafka.truststore.jks -alias CARoot -import -file /tmp/ssl/kafka/ca.crt \
-storepass 123456 \
-noprompt
*/
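// A quick sanity check that the generated chain works before running the
// test: a TLS handshake against the SSL listener should succeed when the
// client trusts ca.crt. Sketch only; verifyBrokerTLS is a hypothetical
// helper using the paths and port from this file:
/*
func verifyBrokerTLS() error {
	pem, err := os.ReadFile("/tmp/ssl/kafka/ca.crt")
	if err != nil {
		return err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pem) {
		return errors.New("failed to parse CA certificate")
	}
	// A successful handshake means the broker presented a certificate on
	// port 9094 that chains to our CA and matches the localhost SAN.
	conn, err := tls.Dial("tcp", "localhost:9094", &tls.Config{RootCAs: pool})
	if err != nil {
		return err
	}
	return conn.Close()
}
*/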

View File

@@ -1,4 +1,4 @@
//go:build integration || flaky
//go:build integration || flaky || kafka
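// Note: the added kafka tag allows running this test on its own, e.g. with
// `go test -tags=kafka -run TestKafkaSSL` (invocation shown for illustration).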
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments