mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated Kafka reader
This commit is contained in:
committed by
Dan Christian Bogos
parent
41cdcacff5
commit
c6f41d7003
@@ -50,4 +50,3 @@ func TestCmdTriggerRemove(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -114,7 +114,7 @@
|
||||
"topic": "cgrates_cdrs",
|
||||
},
|
||||
"tenant": "cgrates.org",
|
||||
"attempts": 1,
|
||||
"attempts": 10,
|
||||
"fields":[
|
||||
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
|
||||
],
|
||||
|
||||
@@ -119,7 +119,7 @@
|
||||
"topic": "cgrates_cdrs",
|
||||
},
|
||||
"tenant": "cgrates.org",
|
||||
"attempts": 1,
|
||||
"attempts": 10,
|
||||
"fields":[
|
||||
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
|
||||
],
|
||||
|
||||
@@ -115,7 +115,7 @@
|
||||
"topic": "cgrates_cdrs",
|
||||
},
|
||||
"tenant": "cgrates.org",
|
||||
"attempts": 1,
|
||||
"attempts": 10,
|
||||
"fields":[
|
||||
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
|
||||
],
|
||||
|
||||
@@ -110,7 +110,7 @@ func TestKafkaER(t *testing.T) {
|
||||
if !reflect.DeepEqual(ev.cgrEvent, expected) {
|
||||
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Fatal("Timeout")
|
||||
}
|
||||
close(rdrExit)
|
||||
|
||||
@@ -314,7 +314,7 @@ func testCDRsExpKafka(t *testing.T) {
|
||||
|
||||
defer reader.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
var m kafka.Message
|
||||
var err error
|
||||
if m, err = reader.ReadMessage(ctx); err != nil {
|
||||
|
||||
@@ -491,7 +491,7 @@ func testCDRsOnExpKafkaPosterFileFailover(t *testing.T) {
|
||||
defer reader.Close()
|
||||
|
||||
for i := 0; i < 2; i++ { // no raw CDR
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
if m, err := reader.ReadMessage(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !reflect.DeepEqual(failoverContent[0], m.Value) && !reflect.DeepEqual(failoverContent[1], m.Value) { // Checking just the prefix should do since some content is dynamic
|
||||
|
||||
Reference in New Issue
Block a user