diff --git a/general_tests/kafka_ssl_it_test.go b/general_tests/kafka_ssl_it_test.go index 3cf39035f..6e26620f4 100644 --- a/general_tests/kafka_ssl_it_test.go +++ b/general_tests/kafka_ssl_it_test.go @@ -72,6 +72,7 @@ func TestKafkaSSL(t *testing.T) { "synchronous": true, "opts": { "kafkaTopic": "%s", + "kafkaBatchSize": 1, "kafkaTLS": true, "kafkaCAPath": "/tmp/ssl/kafka/ca.crt", "kafkaSkipTLSVerify": false @@ -84,8 +85,9 @@ func TestKafkaSSL(t *testing.T) { "export_path": "%s", "synchronous": true, "opts": { - "kafkaTopic": "%s" - }, + "kafkaTopic": "%s", + "kafkaBatchSize": 1 + }, "failed_posts_dir": "*none" } ] @@ -104,8 +106,9 @@ func TestKafkaSSL(t *testing.T) { "source_path": "%s", "ees_success_ids": ["kafka_processed"], "opts": { - "kafkaTopic": "%s", - "kafkaTLS": true, + "kafkaTopic": "%s", + "kafkaGroupID": "", + "kafkaTLS": true, "kafkaCAPath": "/tmp/ssl/kafka/ca.crt", "kafkaSkipTLSVerify": false }, @@ -167,23 +170,38 @@ func TestKafkaSSL(t *testing.T) { r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{brokerPlainURL}, Topic: "processed", + MaxWait: time.Millisecond, }) - t.Cleanup(func() { if err := r.Close(); err != nil { t.Error("failed to close reader:", err) } }) - m, err := r.ReadMessage(context.Background()) - if err != nil { - t.Fatalf("kafka.Reader.ReadMessage() failed unexpectedly: %v", err) - } - - got := string(m.Value) want := `{"Account":"1001","Destination":"1002","OriginID":"abcdef","ToR":"*voice","Usage":"10000000000"}` - if got != want { - t.Errorf("kafka.Reader.ReadMessage() = %v, want %v", got, want) + + readErr := make(chan error) + msg := make(chan string) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + m, err := r.FetchMessage(ctx) + if err != nil { + readErr <- err + return + } + msg <- string(m.Value) + }() + + select { + case err := <-readErr: + t.Errorf("kafka.Reader.ReadMessage() failed unexpectedly: %v", err) + case got := <-msg: + if got != want { + t.Errorf("kafka.Reader.ReadMessage() = %v, want %v", got, want) + } + case <-time.After(2 * time.Second): + t.Errorf("kafka.Reader.ReadMessage() took too long (>%s)", 2*time.Second) } }) }