mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 05:39:54 +05:00
Revise kafka ers test
The Serve call was moved before the export attempt. Revised deprecated kafka writer creation and made sure to handle errors.
This commit is contained in:
committed by
Dan Christian Bogos
parent
60929112fa
commit
f3fc6d050f
@@ -75,21 +75,29 @@ func TestKafkaER(t *testing.T) {
|
||||
rdrErr, new(engine.FilterS), rdrExit); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w := kafka.NewWriter(kafka.WriterConfig{
|
||||
Brokers: []string{"localhost:9092"},
|
||||
Topic: defaultTopic,
|
||||
})
|
||||
randomCGRID := utils.UUIDSha1Prefix()
|
||||
w.WriteMessages(context.Background(),
|
||||
kafka.Message{
|
||||
Key: []byte(randomCGRID), // for the moment we do not process the key
|
||||
Value: []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)),
|
||||
},
|
||||
)
|
||||
|
||||
w.Close()
|
||||
kfk.Serve()
|
||||
|
||||
randomCGRID := utils.UUIDSha1Prefix()
|
||||
go func(key string) {
|
||||
w := kafka.Writer{
|
||||
Addr: kafka.TCP("localhost:9092"),
|
||||
Topic: defaultTopic,
|
||||
}
|
||||
err := w.WriteMessages(context.Background(),
|
||||
kafka.Message{
|
||||
Key: []byte(randomCGRID), // for the moment we do not process the key
|
||||
Value: []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Error("failed to write messages:", err)
|
||||
}
|
||||
err = w.Close()
|
||||
if err != nil {
|
||||
t.Error("failed to close writer:", err)
|
||||
}
|
||||
}(randomCGRID)
|
||||
|
||||
select {
|
||||
case err = <-rdrErr:
|
||||
t.Error(err)
|
||||
|
||||
Reference in New Issue
Block a user