diff --git a/data/conf/samples/kafka_ssl/cgrates.json b/data/conf/samples/kafka_ssl/cgrates.json deleted file mode 100644 index 00d973f72..000000000 --- a/data/conf/samples/kafka_ssl/cgrates.json +++ /dev/null @@ -1,84 +0,0 @@ -{ - - "logger": { - "type": "*syslog", - "level": 7 - }, - - "listen": { - "rpc_json": ":2012", - "rpc_gob": ":2013", - "http": ":2080" - }, - - "data_db": { - "db_type": "redis", - "db_port": 6379, - "db_name": "10" - }, - - "ees": { - "enabled": true, - "exporters": [ - { - "id": "*default", - "type": "*kafkaJSONMap", - "export_path": "localhost:9093", - "opts": { - "kafkaTopic": "ssl-topic", - "kafkaTLS": true, - "kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem", - "kafkaSkipTLSVerify": false - }, - "failed_posts_dir": "/var/spool/cgrates/failed_posts", - "efs_conns": ["*internal"] - } - ] - }, - - "efs": { - "enabled": true - }, - - "sessions": { - "enabled": true - }, - - "ers": { - "enabled": true, - "readers": [ - { - "id": "*default", - "type": "*kafkaJSONMap", - "run_delay": "-1", - "flags": ["*dryRun"], - "source_path": "localhost:9093", - "processed_path": "localhost:9092", - "opts": { - "kafkaTopic": "ssl-topic", - "kafkaTLS": true, - "kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem", - "kafkaSkipTLSVerify": false, - "kafkaTopicProcessed": "processed-topic", - "kafkaTLSProcessed": false, - "kafkaCAPathProcessed": "/home/kafka/kafka/ssl/ca.pem", - "kafkaSkipTLSVerifyProcessed": false - }, - "fields": [ - {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.ToR", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.Tenant", "mandatory": true}, - {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.Category", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true} - ] - } - ] - } - -} \ No newline at end of file diff --git a/engine/libtest.go b/engine/libtest.go index ca94b8b78..7a4d9522f 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -1,4 +1,4 @@ -//go:build integration || flaky || call || performance +//go:build integration || flaky || call || performance || kafka /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments diff --git a/general_tests/kafka_ssl_it_test.go b/general_tests/kafka_ssl_it_test.go index ef9aac4f2..223465962 100644 --- a/general_tests/kafka_ssl_it_test.go +++ b/general_tests/kafka_ssl_it_test.go @@ -1,5 +1,4 @@ //go:build kafka -// +build kafka /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments @@ -22,165 +21,331 @@ along with this program. If not, see package general_tests import ( - "path" + "bytes" + "fmt" + "net" + "strconv" + "sync" "testing" "time" - "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/segmentio/kafka-go" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -var ( - kafkaSSLConfigDir string - kafkaSSLCfgPath string - kafkaSSLCfg *config.CGRConfig - kafkaSSLRpc *birpc.Client - - sTestsKafkaSSL = []func(t *testing.T){ - testKafkaSSLLoadConfig, - testKafkaSSLFlushDBs, - - testKafkaSSLStartEngine, - testKafkaSSLRPCConn, - testKafkaSSLExportEvent, // exports event to ssl-topic, then the reader will consume said event and export it to processed-topic - testKafkaSSLVerifyProcessedExport, // checks whether ERs managed to successfully read and export the events served by Kafka server - testKafkaSSLStopEngine, - } -) - -// The test is exporting and reading from a kafka broker with the following configuration - -/* -listeners=PLAINTEXT://:9092,SSL://localhost:9093 -... -advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093 -... -ssl.truststore.location=/home/kafka/kafka/ssl/kafka.server.truststore.jks -ssl.truststore.password=123456 -ssl.keystore.type=PKCS12 -ssl.keystore.location=/home/kafka/kafka/ssl/kafka.server.keystore.p12 -ssl.keystore.password=123456 -ssl.key.password=123456 -ssl.client.auth=none -ssl.protocol=TLSv1.2 -security.inter.broker.protocol=SSL -*/ - -// How to create TLS keys and certificates: - -/* -1. Generate CA if needed (openssl req -new -x509 -keyout ca-key.pem -out ca.pem -days 365); -2. Add the generated CA to the brokers’ truststore; -3. Generate key-certificate pair using the CA from step 1 to sign it and convert the pem files to p12 format; -4. Import both the certificate of the CA and the signed certificate into the broker keystore. -*/ - +// TestKafkaSSL tests exporting to and reading from a kafka broker through an SSL connection. +// Steps to set up a local kafka server with SSL setup can be found at the bottom of the file. func TestKafkaSSL(t *testing.T) { - kafkaSSLConfigDir = "kafka_ssl" - for _, stest := range sTestsKafkaSSL { - t.Run(kafkaSSLConfigDir, stest) + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") } + + brokerPlainURL := "localhost:9092" + brokerSSLURL := "localhost:9094" + mainTopic := "cgrates-cdrs" + processedTopic := "processed" + + content := fmt.Sprintf(`{ +"data_db": { + "db_type": "*internal" +}, +"stor_db": { + "db_type": "*internal" +}, +"ees": { + "enabled": true, + // "cache": { + // "*kafkaJSONMap": { + // "limit": -1, + // "ttl": "5s", + // "precache": false + // } + // }, + "exporters": [{ + "id": "kafka_ssl", + "type": "*kafkaJSONMap", + "export_path": "%s", + "synchronous": true, + "opts": { + "kafkaTopic": "%s", + "kafkaBatchSize": 1, + "kafkaTLS": true, + "kafkaCAPath": "/tmp/ssl/kafka/ca.crt", + "kafkaSkipTLSVerify": false + }, + "failed_posts_dir": "*none" + }, + { + "id": "kafka_processed", + "type": "*kafkaJSONMap", + "export_path": "%s", + "synchronous": true, + "opts": { + "kafkaTopic": "%s", + "kafkaBatchSize": 1 + }, + "failed_posts_dir": "*none" + } + ] +}, +"ers": { + "enabled": true, + "sessions_conns": [], + "ees_conns": ["*internal"], + "readers": [{ + "id": "kafka_ssl", + "type": "*kafkaJSONMap", + "run_delay": "-1", + "flags": ["*dryRun"], + "source_path": "%s", + "ees_success_ids": ["kafka_processed"], + "opts": { + "kafkaTopic": "%s", + "kafkaGroupID": "", + "kafkaTLS": true, + "kafkaCAPath": "/tmp/ssl/kafka/ca.crt", + "kafkaSkipTLSVerify": false + }, + "fields": [{ + "tag": "Account", + "path": "*cgreq.Account", + "type": "*variable", + "value": "~*req.Account", + "mandatory": true + }, + { + "tag": "Destination", + "path": "*cgreq.Destination", + "type": "*variable", + "value": "~*req.Destination", + "mandatory": true + }, + { + "tag": "Usage", + "path": "*cgreq.Usage", + "type": "*variable", + "value": "~*req.Usage", + "mandatory": true + } + ] + }] +} +}`, brokerSSLURL, mainTopic, brokerPlainURL, processedTopic, brokerSSLURL, mainTopic) + + ng := engine.TestEnvironment{ + ConfigJSON: content, + Encoding: *utils.Encoding, + LogBuffer: &bytes.Buffer{}, + } + defer fmt.Println(ng.LogBuffer) + client, _ := ng.Setup(t, context.Background()) + + createKafkaTopics(t, brokerPlainURL, true, mainTopic, processedTopic) + + // export event to cgrates-cdrs topic, then the reader will consume it and + // export it to the 'processed' topic + t.Run("export kafka event", func(t *testing.T) { + n := 1 + var wg sync.WaitGroup + wg.Add(n) + + var reply map[string]map[string]interface{} + for range n { + go func() { + defer wg.Done() + if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, + &utils.CGREventWithEeIDs{ + EeIDs: []string{"kafka_ssl"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "KafkaEvent", + Event: map[string]interface{}{ + utils.AccountField: "1001", + utils.Destination: "1002", + utils.Usage: 20 * time.Second, + "IgnoredField1": "test", + "IgnoredField2": "test", + }, + }, + }, &reply); err != nil { + t.Error(err) + } + }() + } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Errorf("timed out waiting for %s replies", utils.EeSv1ProcessEvent) + } + }) + + // Check whether ERs managed to successfully consume the event from the + // 'cgrates-cdrs' topic and exported it to the 'processed' topic. + t.Run("verify kafka export", func(t *testing.T) { + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{brokerPlainURL}, + Topic: "processed", + MaxWait: time.Millisecond, + }) + t.Cleanup(func() { + if err := r.Close(); err != nil { + t.Error("failed to close reader:", err) + } + }) + + want := `{"Account":"1001","Destination":"1002","Usage":"20000000000"}` + + readErr := make(chan error) + msg := make(chan string) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + m, err := r.FetchMessage(ctx) + if err != nil { + readErr <- err + return + } + msg <- string(m.Value) + }() + + select { + case err := <-readErr: + t.Errorf("kafka.Reader.ReadMessage() failed unexpectedly: %v", err) + case got := <-msg: + if got != want { + t.Errorf("kafka.Reader.ReadMessage() = %v, want %v", got, want) + } + case <-time.After(2 * time.Second): + t.Errorf("kafka.Reader.ReadMessage() took too long (>%s)", 2*time.Second) + } + }) } -func testKafkaSSLLoadConfig(t *testing.T) { - var err error - kafkaSSLCfgPath = path.Join(*utils.DataDir, "conf", "samples", kafkaSSLConfigDir) - if kafkaSSLCfg, err = config.NewCGRConfigFromPath(context.Background(), kafkaSSLCfgPath); err != nil { - t.Error(err) - } -} - -func testKafkaSSLFlushDBs(t *testing.T) { - if err := engine.InitDataDB(kafkaSSLCfg); err != nil { - t.Fatal(err) - } - if err := engine.InitStorDB(kafkaSSLCfg); err != nil { - t.Fatal(err) - } -} - -func testKafkaSSLStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(kafkaSSLCfgPath, *utils.WaitRater); err != nil { - t.Fatal(err) - } -} - -func testKafkaSSLRPCConn(t *testing.T) { - var err error - kafkaSSLRpc, err = engine.NewRPCClient(kafkaSSLCfg.ListenCfg(), *utils.Encoding) +func createKafkaTopics(t *testing.T, brokerURL string, cleanup bool, topics ...string) { + t.Helper() + conn, err := kafka.Dial("tcp", brokerURL) if err != nil { t.Fatal(err) } -} + t.Cleanup(func() { conn.Close() }) -func testKafkaSSLExportEvent(t *testing.T) { - event := &utils.CGREventWithEeIDs{ - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "KafkaEvent", - Event: map[string]any{ - utils.ToR: utils.MetaVoice, - utils.OriginID: "abcdef", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.MetaRated, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.AccountField: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813748, 0).UTC(), - utils.Usage: 10 * time.Second, - utils.RunID: utils.MetaDefault, - utils.Cost: 1.01, - }, - }, + controller, err := conn.Controller() + if err != nil { + t.Fatal(err) + } + controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + t.Fatal(err) + } + defer controllerConn.Close() + + topicConfigs := make([]kafka.TopicConfig, 0, len(topics)) + for _, topic := range topics { + topicConfigs = append(topicConfigs, kafka.TopicConfig{ + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }) } - var reply map[string]map[string]any - if err := kafkaSSLRpc.Call(context.Background(), utils.EeSv1ProcessEvent, event, &reply); err != nil { - t.Error(err) - } - time.Sleep(time.Second) -} - -func testKafkaSSLVerifyProcessedExport(t *testing.T) { - r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: "processed-topic", - // MinBytes: 10e3, // 10KB - // MaxBytes: 10e6, // 10MB - }) - - ctx, cancel := context.WithCancel(context.Background()) - var rcv string - for { - m, err := r.ReadMessage(ctx) - if err != nil { - break - } - rcv = string(m.Value) - cancel() + if err := controllerConn.CreateTopics(topicConfigs...); err != nil { + t.Fatal(err) } - exp := `{"Account":"1001","AnswerTime":"2013-11-07T08:42:28Z","Category":"call","Cost":1.01,"Destination":"1002","OriginHost":"192.168.1.1","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","SetupTime":"2013-11-07T08:42:25Z","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice","Usage":10000000000}` - - if rcv != exp { - t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) - } - - if err := r.Close(); err != nil { - t.Fatal("failed to close reader:", err) + if cleanup { + t.Cleanup(func() { + if err := conn.DeleteTopics(topics...); err != nil { + t.Log(err) + } + }) } } -func testKafkaSSLStopEngine(t *testing.T) { - if err := engine.KillEngine(*utils.WaitRater); err != nil { - t.Error(err) - } -} +// Kafka broker has the following configuration: + +/* +/opt/kafka/config/kraft/server.properties +---------------------------------------- + +listeners=PLAINTEXT://:9092,CONTROLLER://:9093,SSL://:9094 +... +advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9094 +... +ssl.truststore.location=/tmp/ssl/kafka/kafka.truststore.jks +ssl.truststore.password=123456 +ssl.keystore.location=/tmp/ssl/kafka/kafka.keystore.jks +ssl.keystore.password=123456 +ssl.key.password=123456 +ssl.client.auth=none +*/ + +// Script to generate TLS keys and certificates: + +/* +#!/bin/bash + +mkdir -p /tmp/ssl/kafka + +# Generate CA key +openssl genpkey -algorithm RSA -out /tmp/ssl/kafka/ca.key + +# Generate CA certificate +openssl req -x509 -new -key /tmp/ssl/kafka/ca.key -days 3650 -out /tmp/ssl/kafka/ca.crt \ +-subj "/C=US/ST=California/L=San Francisco/O=MyOrg/CN=localhost/emailAddress=example@email.com" + +# Generate server key and CSR +openssl req -new -newkey rsa:4096 -nodes -keyout /tmp/ssl/kafka/server.key \ +-out /tmp/ssl/kafka/server.csr \ +-subj "/C=US/ST=California/L=San Francisco/O=MyOrg/CN=localhost/emailAddress=example@email.com" + +# Create SAN configuration file +echo "authorityKeyIdentifier=keyid,issuer +basicConstraints=CA:FALSE +keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment +subjectAltName = @alt_names + +[alt_names] +DNS.1=localhost +IP.1=127.0.0.1 +" > /tmp/ssl/kafka/san.cnf + +# Sign server certificate with CA +openssl x509 -req -in /tmp/ssl/kafka/server.csr -CA /tmp/ssl/kafka/ca.crt \ +-CAkey /tmp/ssl/kafka/ca.key -CAcreateserial -out /tmp/ssl/kafka/server.crt \ +-days 3650 -extfile /tmp/ssl/kafka/san.cnf + +# Convert server certificate and key to PKCS12 format +openssl pkcs12 -export \ + -in /tmp/ssl/kafka/server.crt \ + -inkey /tmp/ssl/kafka/server.key \ + -name kafka-broker \ + -out /tmp/ssl/kafka/kafka.p12 \ + -password pass:123456 + +# Import PKCS12 file into Java keystore +keytool -importkeystore \ + -srckeystore /tmp/ssl/kafka/kafka.p12 \ + -destkeystore /tmp/ssl/kafka/kafka.keystore.jks \ + -srcstoretype pkcs12 \ + -srcstorepass 123456 \ + -deststorepass 123456 \ + -noprompt + +# Create truststore and import CA certificate +keytool -keystore /tmp/ssl/kafka/kafka.truststore.jks -alias CARoot -import -file /tmp/ssl/kafka/ca.crt \ + -storepass 123456 \ + -noprompt +*/ diff --git a/utils/testflag.go b/utils/testflag.go index a0728e1c4..d1a82e883 100644 --- a/utils/testflag.go +++ b/utils/testflag.go @@ -1,4 +1,4 @@ -//go:build integration || flaky || call || performance +//go:build integration || flaky || call || performance || kafka /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments