From 36598294a29474c85836d84e1bd3d88b50076796 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Sun, 30 Jun 2024 10:00:55 +0300 Subject: [PATCH] Revise Kafka SSL test - Updated to use the test suite - Deleted kafka_ssl sample configuration (moved to test file) - Replaced 'logger' section with 'general' in configuration - Revised Kafka server SSL setup comment - Ensured exporters are synchronous to avoid missing errors - Implemented helper function to create and clean up kafka topics --- data/conf/samples/kafka_ssl/cgrates.json | 85 ----- general_tests/kafka_ssl_it_test.go | 387 +++++++++++++++-------- general_tests/lib_test.go | 2 +- 3 files changed, 257 insertions(+), 217 deletions(-) delete mode 100644 data/conf/samples/kafka_ssl/cgrates.json diff --git a/data/conf/samples/kafka_ssl/cgrates.json b/data/conf/samples/kafka_ssl/cgrates.json deleted file mode 100644 index 8f115570b..000000000 --- a/data/conf/samples/kafka_ssl/cgrates.json +++ /dev/null @@ -1,85 +0,0 @@ -{ - -"logger": { - "type": "*syslog", - "level": 7 -}, - -"listen": { - "rpc_json": ":2012", - "rpc_gob": ":2013", - "http": ":2080" -}, - -"data_db": { - "db_type": "redis", - "db_port": 6379, - "db_name": "10" -}, - -"ees": { - "enabled": true, - "exporters": [ - { - "id": "*default", - "type": "*kafka_json_map", - "export_path": "localhost:9093", - "opts": { - "kafkaTopic": "ssl-topic", - "kafkaTLS": true, - "kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem", - "kafkaSkipTLSVerify": false - }, - "failed_posts_dir": "/var/spool/cgrates/failed_posts" - }, - { - "id": "kafka_processed", - "type": "*kafka_json_map", - "export_path": "localhost:9092", - "opts": { - "kafkaTopic": "processed-topic", - "kafkaTLS": false, - "kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem", - "kafkaSkipTLSVerify": false - }, - "failed_posts_dir": "/var/spool/cgrates/failed_posts" - } - ] -}, - -"ers": { - "enabled": true, - "sessions_conns":[], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "*default", - "type": "*kafka_json_map", - "run_delay": "-1", - "flags": ["*dryRun"], - "source_path": "localhost:9093", - "ees_success_ids": ["kafka_processed"], - "opts": { - "kafkaTopic": "ssl-topic", - "kafkaTLS": true, - "kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem", - "kafkaSkipTLSVerify": false - }, - "fields": [ - {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.ToR", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.Tenant", "mandatory": true}, - {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.Category", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true} - ] - } - ] -} - -} \ No newline at end of file diff --git a/general_tests/kafka_ssl_it_test.go b/general_tests/kafka_ssl_it_test.go index 3aa6af52d..3cf39035f 100644 --- a/general_tests/kafka_ssl_it_test.go +++ b/general_tests/kafka_ssl_it_test.go @@ -19,162 +19,287 @@ along with this program. If not, see package general_tests import ( - "path" + "fmt" + "net" + "strconv" "testing" "time" - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/segmentio/kafka-go" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -var ( - kafkaSSLConfigDir string - kafkaSSLCfgPath string - kafkaSSLCfg *config.CGRConfig - kafkaSSLRpc *birpc.Client - - sTestsKafkaSSL = []func(t *testing.T){ - testKafkaSSLLoadConfig, - testKafkaSSLResetDataDB, - - testKafkaSSLStartEngine, - testKafkaSSLRPCConn, - testKafkaSSLExportEvent, // exports event to ssl-topic, then the reader will consume said event and export it to processed-topic - testKafkaSSLVerifyProcessedExport, // checks whether ERs managed to successfully read and export the events served by Kafka server - testKafkaSSLStopEngine, - } -) - -// The test is exporting and reading from a kafka broker with the following configuration - -/* -listeners=PLAINTEXT://:9092,SSL://localhost:9093 -... -advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093 -... -ssl.truststore.location=/home/kafka/kafka/ssl/kafka.server.truststore.jks -ssl.truststore.password=123456 -ssl.keystore.type=PKCS12 -ssl.keystore.location=/home/kafka/kafka/ssl/kafka.server.keystore.p12 -ssl.keystore.password=123456 -ssl.key.password=123456 -ssl.client.auth=none -ssl.protocol=TLSv1.2 -security.inter.broker.protocol=SSL -*/ - -// How to create TLS keys and certificates: - -/* -1. Generate CA if needed (openssl req -new -x509 -keyout ca-key.pem -out ca.pem -days 365); -2. Add the generated CA to the brokers’ truststore; -3. Generate key-certificate pair using the CA from step 1 to sign it and convert the pem files to p12 format; -4. Import both the certificate of the CA and the signed certificate into the broker keystore. -*/ - +// TestKafkaSSL tests exporting to and reading from a kafka broker through an SSL connection. +// Steps to set up a local kafka server with SSL setup can be found at the bottom of the file. func TestKafkaSSL(t *testing.T) { - kafkaSSLConfigDir = "kafka_ssl" - for _, stest := range sTestsKafkaSSL { - t.Run(kafkaSSLConfigDir, stest) + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") } + + brokerPlainURL := "localhost:9092" + brokerSSLURL := "localhost:9094" + mainTopic := "cgrates-cdrs" + processedTopic := "processed" + + content := fmt.Sprintf(`{ + +"general": { + "log_level": 7 +}, + +"data_db": { + "db_type": "*internal" +}, + +"stor_db": { + "db_type": "*internal" +}, + +"ees": { + "enabled": true, + "exporters": [ + { + "id": "kafka_ssl", + "type": "*kafka_json_map", + "export_path": "%s", + "synchronous": true, + "opts": { + "kafkaTopic": "%s", + "kafkaTLS": true, + "kafkaCAPath": "/tmp/ssl/kafka/ca.crt", + "kafkaSkipTLSVerify": false + }, + "failed_posts_dir": "*none" + }, + { + "id": "kafka_processed", + "type": "*kafka_json_map", + "export_path": "%s", + "synchronous": true, + "opts": { + "kafkaTopic": "%s" + }, + "failed_posts_dir": "*none" + } + ] +}, + +"ers": { + "enabled": true, + "sessions_conns":[], + "ees_conns": ["*localhost"], + "readers": [ + { + "id": "kafka_ssl", + "type": "*kafka_json_map", + "run_delay": "-1", + "flags": ["*dryrun"], + "source_path": "%s", + "ees_success_ids": ["kafka_processed"], + "opts": { + "kafkaTopic": "%s", + "kafkaTLS": true, + "kafkaCAPath": "/tmp/ssl/kafka/ca.crt", + "kafkaSkipTLSVerify": false + }, + "fields": [ + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.ToR", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true} + ] + } + ] } -func testKafkaSSLLoadConfig(t *testing.T) { - var err error - kafkaSSLCfgPath = path.Join(*utils.DataDir, "conf", "samples", kafkaSSLConfigDir) - if kafkaSSLCfg, err = config.NewCGRConfigFromPath(kafkaSSLCfgPath); err != nil { - t.Error(err) +}`, brokerSSLURL, mainTopic, brokerPlainURL, processedTopic, brokerSSLURL, mainTopic) + + testEnv := TestEnvironment{ + ConfigJSON: content, } + client, _ := testEnv.Setup(t, 0) + + createKafkaTopics(t, brokerPlainURL, true, mainTopic, processedTopic) + + // export event to cgrates-cdrs topic, then the reader will consume it and + // export it to the 'processed' topic + t.Run("export kafka event", func(t *testing.T) { + var reply map[string]map[string]interface{} + if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, + &engine.CGREventWithEeIDs{ + EeIDs: []string{"kafka_ssl"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "KafkaEvent", + Event: map[string]interface{}{ + utils.ToR: utils.MetaVoice, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813748, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + }, + }, + }, &reply); err != nil { + t.Error(err) + } + }) + + // Check whether ERs managed to successfully consume the event from the + // 'cgrates-cdrs' topic and exported it to the 'processed' topic. + t.Run("verify kafka export", func(t *testing.T) { + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{brokerPlainURL}, + Topic: "processed", + }) + + t.Cleanup(func() { + if err := r.Close(); err != nil { + t.Error("failed to close reader:", err) + } + }) + + m, err := r.ReadMessage(context.Background()) + if err != nil { + t.Fatalf("kafka.Reader.ReadMessage() failed unexpectedly: %v", err) + } + + got := string(m.Value) + want := `{"Account":"1001","Destination":"1002","OriginID":"abcdef","ToR":"*voice","Usage":"10000000000"}` + if got != want { + t.Errorf("kafka.Reader.ReadMessage() = %v, want %v", got, want) + } + }) } -func testKafkaSSLResetDataDB(t *testing.T) { - if err := engine.InitDataDb(kafkaSSLCfg); err != nil { - t.Fatal(err) - } -} - -func testKafkaSSLStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(kafkaSSLCfgPath, *utils.WaitRater); err != nil { - t.Fatal(err) - } -} - -func testKafkaSSLRPCConn(t *testing.T) { - var err error - kafkaSSLRpc, err = newRPCClient(kafkaSSLCfg.ListenCfg()) +func createKafkaTopics(t *testing.T, brokerURL string, cleanup bool, topics ...string) { + t.Helper() + conn, err := kafka.Dial("tcp", brokerURL) if err != nil { t.Fatal(err) } -} + t.Cleanup(func() { conn.Close() }) -func testKafkaSSLExportEvent(t *testing.T) { - event := &engine.CGREventWithEeIDs{ - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "KafkaEvent", - Event: map[string]interface{}{ - utils.ToR: utils.MetaVoice, - utils.OriginID: "abcdef", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.MetaRated, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.AccountField: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813748, 0).UTC(), - utils.Usage: 10 * time.Second, - utils.RunID: utils.MetaDefault, - utils.Cost: 1.01, - }, - }, + controller, err := conn.Controller() + if err != nil { + t.Fatal(err) + } + controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + t.Fatal(err) + } + defer controllerConn.Close() + + topicConfigs := make([]kafka.TopicConfig, 0, len(topics)) + for _, topic := range topics { + topicConfigs = append(topicConfigs, kafka.TopicConfig{ + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }) } - var reply map[string]map[string]interface{} - if err := kafkaSSLRpc.Call(context.Background(), utils.EeSv1ProcessEvent, event, &reply); err != nil { - t.Error(err) - } - time.Sleep(time.Second) -} - -func testKafkaSSLVerifyProcessedExport(t *testing.T) { - r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: "processed-topic", - // MinBytes: 10e3, // 10KB - // MaxBytes: 10e6, // 10MB - }) - - ctx, cancel := context.WithCancel(context.Background()) - var rcv string - for { - m, err := r.ReadMessage(ctx) - if err != nil { - break - } - rcv = string(m.Value) - cancel() + if err := controllerConn.CreateTopics(topicConfigs...); err != nil { + t.Fatal(err) } - exp := `{"Account":"1001","AnswerTime":"2013-11-07T08:42:28Z","Category":"call","Cost":1.01,"Destination":"1002","OriginHost":"192.168.1.1","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","SetupTime":"2013-11-07T08:42:25Z","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice","Usage":10000000000}` - - if rcv != exp { - t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) - } - - if err := r.Close(); err != nil { - t.Fatal("failed to close reader:", err) + if cleanup { + t.Cleanup(func() { + if err := conn.DeleteTopics(topics...); err != nil { + t.Log(err) + } + }) } } -func testKafkaSSLStopEngine(t *testing.T) { - if err := engine.KillEngine(*utils.WaitRater); err != nil { - t.Error(err) - } -} +// Kafka broker has the following configuration: + +/* +/opt/kafka/config/kraft/server.properties +---------------------------------------- + +listeners=PLAINTEXT://:9092,CONTROLLER://:9093,SSL://:9094 +... +advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9094 +... +ssl.truststore.location=/tmp/ssl/kafka/kafka.truststore.jks +ssl.truststore.password=123456 +ssl.keystore.location=/tmp/ssl/kafka/kafka.keystore.jks +ssl.keystore.password=123456 +ssl.key.password=123456 +ssl.client.auth=none +*/ + +// Script to generate TLS keys and certificates: + +/* +#!/bin/bash + +mkdir -p /tmp/ssl/kafka + +# Generate CA key +openssl genpkey -algorithm RSA -out /tmp/ssl/kafka/ca.key + +# Generate CA certificate +openssl req -x509 -new -key /tmp/ssl/kafka/ca.key -days 3650 -out /tmp/ssl/kafka/ca.crt \ +-subj "/C=US/ST=California/L=San Francisco/O=MyOrg/CN=localhost/emailAddress=example@email.com" + +# Generate server key and CSR +openssl req -new -newkey rsa:4096 -nodes -keyout /tmp/ssl/kafka/server.key \ +-out /tmp/ssl/kafka/server.csr \ +-subj "/C=US/ST=California/L=San Francisco/O=MyOrg/CN=localhost/emailAddress=example@email.com" + +# Create SAN configuration file +echo "authorityKeyIdentifier=keyid,issuer +basicConstraints=CA:FALSE +keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment +subjectAltName = @alt_names + +[alt_names] +DNS.1=localhost +IP.1=127.0.0.1 +" > /tmp/ssl/kafka/san.cnf + +# Sign server certificate with CA +openssl x509 -req -in /tmp/ssl/kafka/server.csr -CA /tmp/ssl/kafka/ca.crt \ +-CAkey /tmp/ssl/kafka/ca.key -CAcreateserial -out /tmp/ssl/kafka/server.crt \ +-days 3650 -extfile /tmp/ssl/kafka/san.cnf + +# Convert server certificate and key to PKCS12 format +openssl pkcs12 -export \ + -in /tmp/ssl/kafka/server.crt \ + -inkey /tmp/ssl/kafka/server.key \ + -name kafka-broker \ + -out /tmp/ssl/kafka/kafka.p12 \ + -password pass:123456 + +# Import PKCS12 file into Java keystore +keytool -importkeystore \ + -srckeystore /tmp/ssl/kafka/kafka.p12 \ + -destkeystore /tmp/ssl/kafka/kafka.keystore.jks \ + -srcstoretype pkcs12 \ + -srcstorepass 123456 \ + -deststorepass 123456 \ + -noprompt + +# Create truststore and import CA certificate +keytool -keystore /tmp/ssl/kafka/kafka.truststore.jks -alias CARoot -import -file /tmp/ssl/kafka/ca.crt \ + -storepass 123456 \ + -noprompt +*/ diff --git a/general_tests/lib_test.go b/general_tests/lib_test.go index 2d796c5ea..ab00ed8c9 100644 --- a/general_tests/lib_test.go +++ b/general_tests/lib_test.go @@ -1,4 +1,4 @@ -//go:build integration || flaky +//go:build integration || flaky || kafka /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments