diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go
index 06974122e..d92ccfc58 100644
--- a/ers/kafka_it_test.go
+++ b/ers/kafka_it_test.go
@@ -22,12 +22,14 @@ along with this program. If not, see
package ers
import (
- "context"
"fmt"
+ "net"
"reflect"
+ "strconv"
"testing"
"time"
+ "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -42,27 +44,57 @@ var (
)
func TestKafkaER(t *testing.T) {
+
+ // Create kafka topic
+ conn, err := kafka.Dial("tcp", "localhost:9092")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer conn.Close()
+
+ controller, err := conn.Controller()
+ if err != nil {
+ t.Fatal(err)
+ }
+ controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer controllerConn.Close()
+
+ topicConfigs := []kafka.TopicConfig{
+ {
+ Topic: utils.KafkaDefaultTopic,
+ NumPartitions: 1,
+ ReplicationFactor: 1,
+ },
+ }
+
+ err = controllerConn.CreateTopics(topicConfigs...)
+ if err != nil {
+ t.Fatal(err)
+ }
+
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
-"ers": { // EventReaderService
- "enabled": true, // starts the EventReader service:
+"ers": {
+ "enabled": true,
"sessions_conns":["*localhost"],
"readers": [
{
- "id": "kafka", // identifier of the EventReader profile
- "type": "*kafkaJSONMap", // reader type <*fileCSV>
- "run_delay": "-1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
- "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
- "source_path": "localhost:9092", // read data from this path
- // "processed_path": "/var/spool/cgrates/ers/out", // move processed data here
- "tenant": "cgrates.org", // tenant used by import
- "filters": [], // limit parsing based on the filters
- "flags": [], // flags to influence the event processing
- "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
- {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"},
- ],
- },
- ],
-},
+ "id": "kafka",
+ "type": "*kafkaJSONMap",
+ "run_delay": "-1",
+ "concurrent_requests": 1024,
+ "source_path": "localhost:9092",
+ "tenant": "cgrates.org",
+ "filters": [],
+ "flags": [],
+ "fields":[
+ {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}
+ ]
+ }
+ ]
+}
}`)
if err != nil {
t.Fatal(err)
@@ -78,29 +110,35 @@ func TestKafkaER(t *testing.T) {
rdrErr, new(engine.FilterS), rdrExit, nil); err != nil {
t.Fatal(err)
}
- w := kafka.NewWriter(kafka.WriterConfig{
- Brokers: []string{"localhost:9092"},
- Topic: utils.KafkaDefaultTopic,
- WriteTimeout: 5 * time.Second,
- ReadTimeout: 5 * time.Second,
- })
- randomOriginID := utils.UUIDSha1Prefix()
- w.WriteMessages(context.Background(),
- kafka.Message{
- Key: []byte(randomOriginID), // for the momment we do not proccess the key
- Value: []byte(fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID)),
- },
- )
-
- w.Close()
rdr.Serve()
+ randomOriginID := utils.UUIDSha1Prefix()
+ go func(key string) {
+ w := kafka.Writer{
+ Addr: kafka.TCP("localhost:9092"),
+ Topic: utils.KafkaDefaultTopic,
+ }
+ err := w.WriteMessages(context.Background(),
+ kafka.Message{
+ Key: []byte(randomOriginID), // for the moment we do not process the key
+ Value: []byte(fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID)),
+ },
+ )
+ if err != nil {
+ t.Error("failed to write messages:", err)
+ }
+ err = w.Close()
+ if err != nil {
+ t.Error("failed to close writer:", err)
+ }
+ }(randomOriginID)
+
select {
case err = <-rdrErr:
t.Error(err)
case ev := <-rdrEvents:
if ev.rdrCfg.ID != "kafka" {
- t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID)
+ t.Errorf("expected %s, received %s", "kafka", ev.rdrCfg.ID)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
@@ -113,8 +151,28 @@ func TestKafkaER(t *testing.T) {
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
}
- case <-time.After(30 * time.Second):
+ case <-time.After(10 * time.Second):
t.Fatal("Timeout")
}
close(rdrExit)
+
+ // Delete kafka topic
+ partitions, err := conn.ReadPartitions(utils.KafkaDefaultTopic)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if len(partitions) != 1 || partitions[0].Topic != utils.KafkaDefaultTopic {
+ t.Fatal("expected topic named cgrates to exist")
+ }
+
+ if err := conn.DeleteTopics(utils.KafkaDefaultTopic); err != nil {
+ t.Fatal(err)
+ }
+
+ experr := `[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker`
+ _, err = conn.ReadPartitions(utils.KafkaDefaultTopic)
+ if err == nil || err.Error() != experr {
+ t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err)
+ }
}