Handle kafka topic creation/deletion within test

This commit is contained in:
ionutboangiu
2023-10-17 09:12:56 -04:00
committed by Dan Christian Bogos
parent 5ce0e8dfa5
commit 37ac937f97

View File

@@ -23,7 +23,9 @@ package ers
import (
"fmt"
"net"
"reflect"
"strconv"
"testing"
"time"
@@ -43,27 +45,57 @@ var (
)
func TestKafkaER(t *testing.T) {
// Create kafka topic
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
t.Fatal(err)
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
t.Fatal(err)
}
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
t.Fatal(err)
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: utils.KafkaDefaultTopic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
t.Fatal(err)
}
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
"enabled": true, // starts the EventReader service: <true|false>
"ers": {
"enabled": true,
"sessions_conns":["*localhost"],
"readers": [
{
"id": "kafka", // identifier of the EventReader profile
"type": "*kafka_json_map", // reader type <*file_csv>
"run_delay": "-1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "localhost:9092", // read data from this path
// "processed_path": "/var/spool/cgrates/ers/out", // move processed data here
"tenant": "cgrates.org", // tenant used by import
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
],
},
],
},
"id": "kafka",
"type": "*kafka_json_map",
"run_delay": "-1",
"concurrent_requests": 1024,
"source_path": "localhost:9092",
"tenant": "cgrates.org",
"filters": [],
"flags": [],
"fields":[
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}
]
}
]
}
}`)
if err != nil {
t.Fatal(err)
@@ -79,29 +111,35 @@ func TestKafkaER(t *testing.T) {
rdrErr, new(engine.FilterS), rdrExit); err != nil {
t.Fatal(err)
}
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: utils.KafkaDefaultTopic,
WriteTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
})
randomCGRID := utils.UUIDSha1Prefix()
w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte(randomCGRID), // for the momment we do not proccess the key
Value: []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)),
},
)
w.Close()
rdr.Serve()
randomCGRID := utils.UUIDSha1Prefix()
go func(key string) {
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: utils.KafkaDefaultTopic,
}
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte(randomCGRID), // for the moment we do not process the key
Value: []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)),
},
)
if err != nil {
t.Error("failed to write messages:", err)
}
err = w.Close()
if err != nil {
t.Error("failed to close writer:", err)
}
}(randomCGRID)
select {
case err = <-rdrErr:
t.Error(err)
case ev := <-rdrEvents:
if ev.rdrCfg.ID != "kafka" {
t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID)
t.Errorf("expected %s, received %s", "kafka", ev.rdrCfg.ID)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
@@ -119,4 +157,24 @@ func TestKafkaER(t *testing.T) {
t.Fatal("Timeout")
}
close(rdrExit)
// Delete kafka topic
partitions, err := conn.ReadPartitions(utils.KafkaDefaultTopic)
if err != nil {
t.Fatal(err)
}
if len(partitions) != 1 || partitions[0].Topic != utils.KafkaDefaultTopic {
t.Fatal("expected topic named cgrates to exist")
}
if err := conn.DeleteTopics(utils.KafkaDefaultTopic); err != nil {
t.Fatal(err)
}
experr := `[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker`
_, err = conn.ReadPartitions(utils.KafkaDefaultTopic)
if err == nil || err.Error() != experr {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err)
}
}