diff --git a/apier/v1/cdre_kafka_it_test.go b/apier/v1/cdre_kafka_it_test.go index cbbdc8375..1e0123782 100644 --- a/apier/v1/cdre_kafka_it_test.go +++ b/apier/v1/cdre_kafka_it_test.go @@ -243,55 +243,37 @@ func testKafkaExportCDRs(t *testing.T) { } func testKafkaVerifyExport(t *testing.T) { - topic := "cgrates_cdrs" - partition := 0 - - conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) - if err != nil { - t.Fatal("failed to dial leader:", err) - } - - conn.SetReadDeadline(time.Now().Add(10 * time.Second)) - batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max - - b := make([]byte, 10e3) // 10KB max per message - var cdrs []string - msgChan := make(chan []string) - go func() { - var i int - for i < 2 { - n, err := batch.Read(b) - if err != nil { - break - } - cdrs = append(cdrs, string(b[:n])) - i++ - } - msgChan <- cdrs - }() + // make a new reader that consumes from cgrates_cdrs, partition 0 + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "cgrates_cdrs", + Partition: 0, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }) expCDRs := []string{ `{"Account":"1001","CGRID":"Cdr2","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR2","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"5s"}`, `{"Account":"1001","CGRID":"Cdr3","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR3","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"30s"}`, } - select { - case rcvCDRs := <-msgChan: - sort.Strings(rcvCDRs) - if !reflect.DeepEqual(rcvCDRs, expCDRs) { - t.Errorf("expected: <%+v>, \nreceived: <%+v>", expCDRs, rcvCDRs) + rcvCDRs := make([]string, 2) + for i := 0; i < 2; i++ { + m, err := r.ReadMessage(context.Background()) + if err != nil { + t.Errorf("Failed to read message nr. %d: %s", i, err.Error()) + break } - case <-time.After(20 * time.Second): - t.Error("Timeout: Failed to consume the messages in due time") + rcvCDRs[i] = string(m.Value) } - if err := batch.Close(); err != nil { - t.Fatal("failed to close batch:", err) + sort.Strings(rcvCDRs) + if !reflect.DeepEqual(rcvCDRs, expCDRs) { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", expCDRs, rcvCDRs) } - if err := conn.Close(); err != nil { - t.Fatal("failed to close connection:", err) + if err := r.Close(); err != nil { + t.Fatal("failed to close reader:", err) } - } func testKafkaDeleteTopic(t *testing.T) {