From ca5273243029fd6a9b5ce81bbca3dbb1fe3d1d4a Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 17 Oct 2023 08:28:01 -0400 Subject: [PATCH] Update kafka test to use the correct topic name --- ers/kafka_it_test.go | 47 ++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index bcab9507d..d24b4c242 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -64,7 +64,7 @@ func TestKafkaER(t *testing.T) { topicConfigs := []kafka.TopicConfig{ { - Topic: "cgrates_cdrs", + Topic: "cgrates", NumPartitions: 1, ReplicationFactor: 1, }, @@ -76,25 +76,24 @@ func TestKafkaER(t *testing.T) { } cfg, err := config.NewCGRConfigFromJsonStringWithDefaults(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: +"ers": { + "enabled": true, "readers": [ { - "id": "kafka", // identifier of the EventReader profile - "type": "*kafka_json_map", // reader type <*file_csv> - "run_delay": "-1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together - "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited - "source_path": "localhost:9092", // read data from this path - // "processed_path": "/var/spool/cgrates/ers/out", // move processed data here - "tenant": "cgrates.org", // tenant used by import - "filters": [], // limit parsing based on the filters - "flags": [], // flags to influence the event processing - "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - }, - ], -}, + "id": "kafka", + "type": "*kafka_json_map", + "run_delay": "-1", + "concurrent_requests": 1024, + "source_path": "localhost:9092", + "tenant": "cgrates.org", + "filters": [], + "flags": [], + "fields":[ + {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"} + ] + } + ] +} }`) if err != nil { t.Fatal(err) @@ -156,21 +155,21 @@ func TestKafkaER(t *testing.T) { // Delete kafka topic - partitions, err := conn.ReadPartitions("cgrates_cdrs") + partitions, err := conn.ReadPartitions("cgrates") if err != nil { t.Fatal(err) } - if len(partitions) != 1 || partitions[0].Topic != "cgrates_cdrs" { - t.Fatal("expected topic named cgrates_cdrs to exist") + if len(partitions) != 1 || partitions[0].Topic != "cgrates" { + t.Fatal("expected topic named cgrates to exist") } - if err := conn.DeleteTopics("cgrates_cdrs"); err != nil { + if err := conn.DeleteTopics("cgrates"); err != nil { t.Fatal(err) } - experr := `[5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes` - _, err = conn.ReadPartitions("cgrates_cdrs") + experr := `[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker` + _, err = conn.ReadPartitions("cgrates") if err == nil || err.Error() != experr { t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err) }