mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Optimize Kafka SSL integration test
- ERs sometimes took too long to receive a message. Setting kafkaGroupID to "" prevents this. - Kafka reader took 10s to close (default MaxWait). Set MaxWait to 1ms. - Exporters took 1s each to export due to BatchSize not being hit. Set BatchSize to 1 to prevent it.
This commit is contained in:
committed by
Dan Christian Bogos
parent
11b96de00a
commit
22cbaa58c7
@@ -72,6 +72,7 @@ func TestKafkaSSL(t *testing.T) {
|
||||
"synchronous": true,
|
||||
"opts": {
|
||||
"kafkaTopic": "%s",
|
||||
"kafkaBatchSize": 1,
|
||||
"kafkaTLS": true,
|
||||
"kafkaCAPath": "/tmp/ssl/kafka/ca.crt",
|
||||
"kafkaSkipTLSVerify": false
|
||||
@@ -84,8 +85,9 @@ func TestKafkaSSL(t *testing.T) {
|
||||
"export_path": "%s",
|
||||
"synchronous": true,
|
||||
"opts": {
|
||||
"kafkaTopic": "%s"
|
||||
},
|
||||
"kafkaTopic": "%s",
|
||||
"kafkaBatchSize": 1
|
||||
},
|
||||
"failed_posts_dir": "*none"
|
||||
}
|
||||
]
|
||||
@@ -104,8 +106,9 @@ func TestKafkaSSL(t *testing.T) {
|
||||
"source_path": "%s",
|
||||
"ees_success_ids": ["kafka_processed"],
|
||||
"opts": {
|
||||
"kafkaTopic": "%s",
|
||||
"kafkaTLS": true,
|
||||
"kafkaTopic": "%s",
|
||||
"kafkaGroupID": "",
|
||||
"kafkaTLS": true,
|
||||
"kafkaCAPath": "/tmp/ssl/kafka/ca.crt",
|
||||
"kafkaSkipTLSVerify": false
|
||||
},
|
||||
@@ -167,23 +170,38 @@ func TestKafkaSSL(t *testing.T) {
|
||||
r := kafka.NewReader(kafka.ReaderConfig{
|
||||
Brokers: []string{brokerPlainURL},
|
||||
Topic: "processed",
|
||||
MaxWait: time.Millisecond,
|
||||
})
|
||||
|
||||
t.Cleanup(func() {
|
||||
if err := r.Close(); err != nil {
|
||||
t.Error("failed to close reader:", err)
|
||||
}
|
||||
})
|
||||
|
||||
m, err := r.ReadMessage(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("kafka.Reader.ReadMessage() failed unexpectedly: %v", err)
|
||||
}
|
||||
|
||||
got := string(m.Value)
|
||||
want := `{"Account":"1001","Destination":"1002","OriginID":"abcdef","ToR":"*voice","Usage":"10000000000"}`
|
||||
if got != want {
|
||||
t.Errorf("kafka.Reader.ReadMessage() = %v, want %v", got, want)
|
||||
|
||||
readErr := make(chan error)
|
||||
msg := make(chan string)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
m, err := r.FetchMessage(ctx)
|
||||
if err != nil {
|
||||
readErr <- err
|
||||
return
|
||||
}
|
||||
msg <- string(m.Value)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-readErr:
|
||||
t.Errorf("kafka.Reader.ReadMessage() failed unexpectedly: %v", err)
|
||||
case got := <-msg:
|
||||
if got != want {
|
||||
t.Errorf("kafka.Reader.ReadMessage() = %v, want %v", got, want)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Errorf("kafka.Reader.ReadMessage() took too long (>%s)", 2*time.Second)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user