mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Update kafka test to use the correct topic name
This commit is contained in:
committed by
Dan Christian Bogos
parent
0651f3cb8f
commit
ca52732430
@@ -64,7 +64,7 @@ func TestKafkaER(t *testing.T) {
|
||||
|
||||
topicConfigs := []kafka.TopicConfig{
|
||||
{
|
||||
Topic: "cgrates_cdrs",
|
||||
Topic: "cgrates",
|
||||
NumPartitions: 1,
|
||||
ReplicationFactor: 1,
|
||||
},
|
||||
@@ -76,25 +76,24 @@ func TestKafkaER(t *testing.T) {
|
||||
}
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJsonStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
"enabled": true, // starts the EventReader service: <true|false>
|
||||
"ers": {
|
||||
"enabled": true,
|
||||
"readers": [
|
||||
{
|
||||
"id": "kafka", // identifier of the EventReader profile
|
||||
"type": "*kafka_json_map", // reader type <*file_csv>
|
||||
"run_delay": "-1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
|
||||
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
|
||||
"source_path": "localhost:9092", // read data from this path
|
||||
// "processed_path": "/var/spool/cgrates/ers/out", // move processed data here
|
||||
"tenant": "cgrates.org", // tenant used by import
|
||||
"filters": [], // limit parsing based on the filters
|
||||
"flags": [], // flags to influence the event processing
|
||||
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
"id": "kafka",
|
||||
"type": "*kafka_json_map",
|
||||
"run_delay": "-1",
|
||||
"concurrent_requests": 1024,
|
||||
"source_path": "localhost:9092",
|
||||
"tenant": "cgrates.org",
|
||||
"filters": [],
|
||||
"flags": [],
|
||||
"fields":[
|
||||
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}`)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -156,21 +155,21 @@ func TestKafkaER(t *testing.T) {
|
||||
|
||||
// Delete kafka topic
|
||||
|
||||
partitions, err := conn.ReadPartitions("cgrates_cdrs")
|
||||
partitions, err := conn.ReadPartitions("cgrates")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(partitions) != 1 || partitions[0].Topic != "cgrates_cdrs" {
|
||||
t.Fatal("expected topic named cgrates_cdrs to exist")
|
||||
if len(partitions) != 1 || partitions[0].Topic != "cgrates" {
|
||||
t.Fatal("expected topic named cgrates to exist")
|
||||
}
|
||||
|
||||
if err := conn.DeleteTopics("cgrates_cdrs"); err != nil {
|
||||
if err := conn.DeleteTopics("cgrates"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
experr := `[5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes`
|
||||
_, err = conn.ReadPartitions("cgrates_cdrs")
|
||||
experr := `[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker`
|
||||
_, err = conn.ReadPartitions("cgrates")
|
||||
if err == nil || err.Error() != experr {
|
||||
t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user