From f3fc6d050f0d4c9da70bda1378b5978e6d43a796 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 11 Aug 2023 10:38:49 -0400 Subject: [PATCH] Revise kafka ers test The Serve call was moved before the export attempt. Revised deprecated kafka writer creation and made sure to handle errors. --- ers/kafka_it_test.go | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index d2067114b..fb9366cab 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -75,21 +75,29 @@ func TestKafkaER(t *testing.T) { rdrErr, new(engine.FilterS), rdrExit); err != nil { t.Fatal(err) } - w := kafka.NewWriter(kafka.WriterConfig{ - Brokers: []string{"localhost:9092"}, - Topic: defaultTopic, - }) - randomCGRID := utils.UUIDSha1Prefix() - w.WriteMessages(context.Background(), - kafka.Message{ - Key: []byte(randomCGRID), // for the moment we do not process the key - Value: []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)), - }, - ) - - w.Close() kfk.Serve() + randomCGRID := utils.UUIDSha1Prefix() + go func(key string) { + w := kafka.Writer{ + Addr: kafka.TCP("localhost:9092"), + Topic: defaultTopic, + } + err := w.WriteMessages(context.Background(), + kafka.Message{ + Key: []byte(randomCGRID), // for the moment we do not process the key + Value: []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)), + }, + ) + if err != nil { + t.Error("failed to write messages:", err) + } + err = w.Close() + if err != nil { + t.Error("failed to close writer:", err) + } + }(randomCGRID) + select { case err = <-rdrErr: t.Error(err)