Mirror of https://github.com/cgrates/cgrates.git (synced 2026-02-21 07:08:45 +05:00)
Handle kafka topic creation/deletion within test
committed by Dan Christian Bogos
parent 5f079fbca8
commit 2869728de0
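The change, in short: instead of assuming the Kafka topic already exists on the broker, the test now creates it through the cluster controller before the reader starts, and deletes it again at the end, verifying both existence and removal. A standalone sketch of that create/run/delete lifecycle with the segmentio/kafka-go client the test uses; the withTopic helper and the literal broker/topic values are illustrative, not part of the commit:

package main

import (
	"fmt"
	"log"
	"net"
	"strconv"

	kafka "github.com/segmentio/kafka-go"
)

// withTopic creates a topic on the cluster controller, runs fn, then deletes
// the topic again. Helper name and signature are illustrative only.
func withTopic(broker, topic string, fn func() error) error {
	conn, err := kafka.Dial("tcp", broker)
	if err != nil {
		return err
	}
	defer conn.Close()

	// Topic creation/deletion must be issued against the controller broker.
	controller, err := conn.Controller()
	if err != nil {
		return err
	}
	ctl, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		return err
	}
	defer ctl.Close()

	if err := ctl.CreateTopics(kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     1,
		ReplicationFactor: 1,
	}); err != nil {
		return err
	}
	defer ctl.DeleteTopics(topic) // best-effort cleanup

	return fn()
}

func main() {
	err := withTopic("localhost:9092", "cgrates", func() error {
		fmt.Println("topic exists for the duration of this callback")
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}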
@@ -22,12 +22,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
 package ers
 
 import (
-	"context"
 	"fmt"
+	"net"
 	"reflect"
+	"strconv"
 	"testing"
 	"time"
 
+	"github.com/cgrates/birpc/context"
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/engine"
 	"github.com/cgrates/cgrates/utils"
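One subtle point in the import changes: the standard library's context gives way to github.com/cgrates/birpc/context. The rewritten writer code below still calls context.Background(), so birpc's package evidently exposes the same constructor and its result satisfies the standard context.Context interface, which is all kafka-go's WriteMessages requires. A minimal illustration of why the swap is source-compatible (assuming birpc/context.Background() returns a standard-context implementation, as the diff's own usage implies):

package main

import (
	stdctx "context"
	"fmt"

	"github.com/cgrates/birpc/context"
)

// takesStdContext stands in for any API (such as kafka-go's
// Writer.WriteMessages) that accepts a standard context.Context.
func takesStdContext(ctx stdctx.Context) {
	fmt.Println(ctx.Err()) // <nil> for a background context
}

func main() {
	// birpc's Background() satisfies the standard interface,
	// so call sites in the test compile unchanged after the import swap.
	takesStdContext(context.Background())
}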
@@ -42,27 +44,57 @@ var (
 )
 
 func TestKafkaER(t *testing.T) {
 
+	// Create kafka topic
+	conn, err := kafka.Dial("tcp", "localhost:9092")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer conn.Close()
+
+	controller, err := conn.Controller()
+	if err != nil {
+		t.Fatal(err)
+	}
+	controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer controllerConn.Close()
+
+	topicConfigs := []kafka.TopicConfig{
+		{
+			Topic:             utils.KafkaDefaultTopic,
+			NumPartitions:     1,
+			ReplicationFactor: 1,
+		},
+	}
+
+	err = controllerConn.CreateTopics(topicConfigs...)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
-"ers": {                          // EventReaderService
-	"enabled": true,              // starts the EventReader service: <true|false>
+"ers": {
+	"enabled": true,
+	"sessions_conns":["*localhost"],
 	"readers": [
 		{
-			"id": "kafka",                                     // identifier of the EventReader profile
-			"type": "*kafkaJSONMap",                           // reader type <*fileCSV>
-			"run_delay": "-1",                                 // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
-			"concurrent_requests": 1024,                       // maximum simultaneous requests/files to process, 0 for unlimited
-			"source_path": "localhost:9092",                   // read data from this path
-			// "processed_path": "/var/spool/cgrates/ers/out", // move processed data here
-			"tenant": "cgrates.org",                           // tenant used by import
-			"filters": [],                                     // limit parsing based on the filters
-			"flags": [],                                       // flags to influence the event processing
-			"fields":[                                         // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
-				{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
-			],
-		},
-	],
-},
+			"id": "kafka",
+			"type": "*kafkaJSONMap",
+			"run_delay": "-1",
+			"concurrent_requests": 1024,
+			"source_path": "localhost:9092",
+			"tenant": "cgrates.org",
+			"filters": [],
+			"flags": [],
+			"fields":[
+				{"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}
+			]
+		}
+	]
+}
 }`)
 	if err != nil {
 		t.Fatal(err)
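The new config block drops the tutorial comments and adds "sessions_conns". Internally, a *kafkaJSONMap reader consumes JSON messages from source_path and maps each payload into a CGREvent through the fields template. A minimal sketch of that consuming side with kafka-go, to show the mechanism being tested; this is an illustration, not CGRateS's actual reader implementation, and the GroupID is hypothetical:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // the test's source_path
		Topic:   "cgrates",                  // utils.KafkaDefaultTopic in the test
		GroupID: "ers-example",              // hypothetical consumer group
	})
	defer r.Close()

	// Block until one message arrives, as the ER reader effectively does.
	m, err := r.ReadMessage(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// *kafkaJSONMap treats the message value as a JSON map; the fields
	// template then composes ~*req.OriginID into the outgoing CGREvent.
	var req map[string]any
	if err := json.Unmarshal(m.Value, &req); err != nil {
		log.Fatal(err)
	}
	fmt.Println("OriginID:", req["OriginID"])
}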
@@ -78,29 +110,35 @@ func TestKafkaER(t *testing.T) {
 		rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
 		t.Fatal(err)
 	}
-	w := kafka.NewWriter(kafka.WriterConfig{
-		Brokers:      []string{"localhost:9092"},
-		Topic:        utils.KafkaDefaultTopic,
-		WriteTimeout: 5 * time.Second,
-		ReadTimeout:  5 * time.Second,
-	})
-	randomOriginID := utils.UUIDSha1Prefix()
-	w.WriteMessages(context.Background(),
-		kafka.Message{
-			Key:   []byte(randomOriginID), // for the momment we do not proccess the key
-			Value: []byte(fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID)),
-		},
-	)
-
-	w.Close()
 	rdr.Serve()
+
+	randomOriginID := utils.UUIDSha1Prefix()
+	go func(key string) {
+		w := kafka.Writer{
+			Addr:  kafka.TCP("localhost:9092"),
+			Topic: utils.KafkaDefaultTopic,
+		}
+		err := w.WriteMessages(context.Background(),
+			kafka.Message{
+				Key:   []byte(randomOriginID), // for the moment we do not process the key
+				Value: []byte(fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID)),
+			},
+		)
+		if err != nil {
+			t.Error("failed to write messages:", err)
+		}
+		err = w.Close()
+		if err != nil {
+			t.Error("failed to close writer:", err)
+		}
+	}(randomOriginID)
 
 	select {
 	case err = <-rdrErr:
 		t.Error(err)
 	case ev := <-rdrEvents:
 		if ev.rdrCfg.ID != "kafka" {
-			t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID)
+			t.Errorf("expected %s, received %s", "kafka", ev.rdrCfg.ID)
 		}
 		expected := &utils.CGREvent{
 			Tenant: "cgrates.org",
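Besides moving the produce step into a goroutine after rdr.Serve() and checking the errors the old code silently dropped, the rewrite constructs kafka.Writer field by field with Addr: kafka.TCP(...), the style kafka-go's documentation now recommends over the older kafka.NewWriter(kafka.WriterConfig{...}) constructor. A standalone version of the new producing pattern; the key and payload literals are illustrative, the broker and timeout match the test:

package main

import (
	"context"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Direct construction replaces the config-struct constructor the
	// test used before this commit.
	w := kafka.Writer{
		Addr:         kafka.TCP("localhost:9092"),
		Topic:        "cgrates", // utils.KafkaDefaultTopic in the test
		WriteTimeout: 5 * time.Second,
	}

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("some-key"), // the reader does not process the key
			Value: []byte(`{"OriginID": "some-origin-id"}`),
		},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}
	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}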
@@ -113,8 +151,28 @@ func TestKafkaER(t *testing.T) {
 		if !reflect.DeepEqual(ev.cgrEvent, expected) {
 			t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
 		}
-	case <-time.After(30 * time.Second):
+	case <-time.After(10 * time.Second):
 		t.Fatal("Timeout")
 	}
 	close(rdrExit)
+
+	// Delete kafka topic
+	partitions, err := conn.ReadPartitions(utils.KafkaDefaultTopic)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(partitions) != 1 || partitions[0].Topic != utils.KafkaDefaultTopic {
+		t.Fatal("expected topic named cgrates to exist")
+	}
+
+	if err := conn.DeleteTopics(utils.KafkaDefaultTopic); err != nil {
+		t.Fatal(err)
+	}
+
+	experr := `[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker`
+	_, err = conn.ReadPartitions(utils.KafkaDefaultTopic)
+	if err == nil || err.Error() != experr {
+		t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err)
+	}
 }
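One caveat with the cleanup as committed: DeleteTopics only runs at the end of the test body, so any earlier t.Fatal leaves the topic behind for the next run. A variant that registers deletion with t.Cleanup as soon as the topic is created would survive early failures. A sketch under the same kafka-go API; the helper name is hypothetical, not from the commit:

package ers

import (
	"testing"

	kafka "github.com/segmentio/kafka-go"
)

// registerTopicCleanup schedules best-effort topic deletion right after
// creation, so a failed assertion cannot leak the topic between test runs.
// Illustrative helper, not part of the commit.
func registerTopicCleanup(t *testing.T, conn *kafka.Conn, topic string) {
	t.Helper()
	t.Cleanup(func() {
		if err := conn.DeleteTopics(topic); err != nil {
			t.Logf("failed to delete topic %s: %v", topic, err)
		}
	})
}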