From 37ac937f974978f5e4f8f9cceb5988c2565451d1 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 17 Oct 2023 09:12:56 -0400 Subject: [PATCH] Handle kafka topic creation/deletion within test --- ers/kafka_it_test.go | 124 +++++++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 33 deletions(-) diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index a867263c3..aa7326bc7 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -23,7 +23,9 @@ package ers import ( "fmt" + "net" "reflect" + "strconv" "testing" "time" @@ -43,27 +45,57 @@ var ( ) func TestKafkaER(t *testing.T) { + + // Create kafka topic + conn, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + t.Fatal(err) + } + controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + t.Fatal(err) + } + defer controllerConn.Close() + + topicConfigs := []kafka.TopicConfig{ + { + Topic: utils.KafkaDefaultTopic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + + err = controllerConn.CreateTopics(topicConfigs...) + if err != nil { + t.Fatal(err) + } + cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: +"ers": { + "enabled": true, "sessions_conns":["*localhost"], "readers": [ { - "id": "kafka", // identifier of the EventReader profile - "type": "*kafka_json_map", // reader type <*file_csv> - "run_delay": "-1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together - "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited - "source_path": "localhost:9092", // read data from this path - // "processed_path": "/var/spool/cgrates/ers/out", // move processed data here - "tenant": "cgrates.org", // tenant used by import - "filters": [], // limit parsing based on the filters - "flags": [], // flags to influence the event processing - "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - }, - ], -}, + "id": "kafka", + "type": "*kafka_json_map", + "run_delay": "-1", + "concurrent_requests": 1024, + "source_path": "localhost:9092", + "tenant": "cgrates.org", + "filters": [], + "flags": [], + "fields":[ + {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"} + ] + } + ] +} }`) if err != nil { t.Fatal(err) @@ -79,29 +111,35 @@ func TestKafkaER(t *testing.T) { rdrErr, new(engine.FilterS), rdrExit); err != nil { t.Fatal(err) } - w := kafka.NewWriter(kafka.WriterConfig{ - Brokers: []string{"localhost:9092"}, - Topic: utils.KafkaDefaultTopic, - WriteTimeout: 5 * time.Second, - ReadTimeout: 5 * time.Second, - }) - randomCGRID := utils.UUIDSha1Prefix() - w.WriteMessages(context.Background(), - kafka.Message{ - Key: []byte(randomCGRID), // for the momment we do not proccess the key - Value: []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)), - }, - ) - - w.Close() rdr.Serve() + randomCGRID := utils.UUIDSha1Prefix() + go func(key string) { + w := kafka.Writer{ + Addr: kafka.TCP("localhost:9092"), + Topic: utils.KafkaDefaultTopic, + } + err := w.WriteMessages(context.Background(), + kafka.Message{ + Key: []byte(randomCGRID), // for the moment we do not process the key + Value: []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)), + }, + ) + if err != nil { + t.Error("failed to write messages:", err) + } + err = w.Close() + if err != nil { + t.Error("failed to close writer:", err) + } + }(randomCGRID) + select { case err = <-rdrErr: t.Error(err) case ev := <-rdrEvents: if ev.rdrCfg.ID != "kafka" { - t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID) + t.Errorf("expected %s, received %s", "kafka", ev.rdrCfg.ID) } expected := &utils.CGREvent{ Tenant: "cgrates.org", @@ -119,4 +157,24 @@ func TestKafkaER(t *testing.T) { t.Fatal("Timeout") } close(rdrExit) + + // Delete kafka topic + partitions, err := conn.ReadPartitions(utils.KafkaDefaultTopic) + if err != nil { + t.Fatal(err) + } + + if len(partitions) != 1 || partitions[0].Topic != utils.KafkaDefaultTopic { + t.Fatal("expected topic named cgrates to exist") + } + + if err := conn.DeleteTopics(utils.KafkaDefaultTopic); err != nil { + t.Fatal(err) + } + + experr := `[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker` + _, err = conn.ReadPartitions(utils.KafkaDefaultTopic) + if err == nil || err.Error() != experr { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err) + } }