Revise kafka poster integration test

When verifying whether the export was successful, read message by message instead of reading in batches.
This commit is contained in:
ionutboangiu
2023-03-19 12:52:45 -04:00
committed by Dan Christian Bogos
parent e822a5ae63
commit ee7a0d131a

View File

@@ -243,55 +243,37 @@ func testKafkaExportCDRs(t *testing.T) {
}
func testKafkaVerifyExport(t *testing.T) {
topic := "cgrates_cdrs"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
t.Fatal("failed to dial leader:", err)
}
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
b := make([]byte, 10e3) // 10KB max per message
var cdrs []string
msgChan := make(chan []string)
go func() {
var i int
for i < 2 {
n, err := batch.Read(b)
if err != nil {
break
}
cdrs = append(cdrs, string(b[:n]))
i++
}
msgChan <- cdrs
}()
// make a new reader that consumes from cgrates_cdrs, partition 0
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "cgrates_cdrs",
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
expCDRs := []string{
`{"Account":"1001","CGRID":"Cdr2","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR2","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"5s"}`,
`{"Account":"1001","CGRID":"Cdr3","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR3","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"30s"}`,
}
select {
case rcvCDRs := <-msgChan:
sort.Strings(rcvCDRs)
if !reflect.DeepEqual(rcvCDRs, expCDRs) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", expCDRs, rcvCDRs)
rcvCDRs := make([]string, 2)
for i := 0; i < 2; i++ {
m, err := r.ReadMessage(context.Background())
if err != nil {
t.Errorf("Failed to read message nr. %d: %s", i, err.Error())
break
}
case <-time.After(20 * time.Second):
t.Error("Timeout: Failed to consume the messages in due time")
rcvCDRs[i] = string(m.Value)
}
if err := batch.Close(); err != nil {
t.Fatal("failed to close batch:", err)
sort.Strings(rcvCDRs)
if !reflect.DeepEqual(rcvCDRs, expCDRs) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", expCDRs, rcvCDRs)
}
if err := conn.Close(); err != nil {
t.Fatal("failed to close connection:", err)
if err := r.Close(); err != nil {
t.Fatal("failed to close reader:", err)
}
}
func testKafkaDeleteTopic(t *testing.T) {