From 12ed87a5e1ef9f36c3584cb45740938585d0ce50 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 21 Feb 2023 10:19:26 -0500 Subject: [PATCH] Add integration tests for kafka exporter --- data/conf/samples/ees/cgrates.json | 9 ++ ees/kafka_it_test.go | 223 +++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+) create mode 100644 ees/kafka_it_test.go diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 59a8d1291..fd6d03128 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -441,6 +441,15 @@ "natsSubject": "processed_cdrs", } }, + { + "id": "KafkaExporter", + "type": "*kafka_json_map", + "export_path": "localhost:9092", + "opts": { + "kafkaTopic": "cgrates", + // "kafkaCAPath": "/home/kafka/kafka/ssl/ca-cert.pem" + } + }, { "id": "AMQPExporter", "type": "*amqp_json_map", diff --git a/ees/kafka_it_test.go b/ees/kafka_it_test.go new file mode 100644 index 000000000..51d5e9f09 --- /dev/null +++ b/ees/kafka_it_test.go @@ -0,0 +1,223 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "context" + "net" + "net/rpc" + "path" + "strconv" + "testing" + "time" + + kafka "github.com/segmentio/kafka-go" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + kafkaConfigDir string + kafkaCfgPath string + kafkaCfg *config.CGRConfig + kafkaRpc *rpc.Client + + sTestsKafka = []func(t *testing.T){ + testCreateDirectory, + testKafkaLoadConfig, + testKafkaResetDataDB, + testKafkaResetStorDB, + testKafkaStartEngine, + testKafkaRPCConn, + + testKafkaCreateTopic, + testKafkaExportEvent, + testKafkaVerifyExport, + testKafkaDeleteTopic, + + testStopCgrEngine, + testCleanDirectory, + } +) + +func TestKafkaExport(t *testing.T) { + kafkaConfigDir = "ees" + for _, stest := range sTestsKafka { + t.Run(kafkaConfigDir, stest) + } +} + +func testKafkaLoadConfig(t *testing.T) { + var err error + kafkaCfgPath = path.Join(*dataDir, "conf", "samples", kafkaConfigDir) + if kafkaCfg, err = config.NewCGRConfigFromPath(kafkaCfgPath); err != nil { + t.Error(err) + } +} + +func testKafkaResetDataDB(t *testing.T) { + if err := engine.InitDataDb(kafkaCfg); err != nil { + t.Fatal(err) + } +} + +func testKafkaResetStorDB(t *testing.T) { + if err := engine.InitStorDb(kafkaCfg); err != nil { + t.Fatal(err) + } +} + +func testKafkaStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(kafkaCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testKafkaRPCConn(t *testing.T) { + var err error + kafkaRpc, err = newRPCClient(kafkaCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testKafkaCreateTopic(t *testing.T) { + conn, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + t.Fatal(err) + } + controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + t.Fatal(err) + } + defer controllerConn.Close() + + topicConfigs := []kafka.TopicConfig{ + { + Topic: utils.KafkaDefaultTopic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + + err = controllerConn.CreateTopics(topicConfigs...) + if err != nil { + t.Fatal(err) + } +} + +func testKafkaExportEvent(t *testing.T) { + event := &engine.CGREventWithEeIDs{ + EeIDs: []string{"KafkaExporter"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "KafkaEvent", + Event: map[string]interface{}{ + utils.ToR: utils.MetaVoice, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813748, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + }, + }, + } + + var reply map[string]map[string]interface{} + if err := kafkaRpc.Call(utils.EeSv1ProcessEvent, event, &reply); err != nil { + t.Error(err) + } + time.Sleep(time.Second) +} + +func testKafkaVerifyExport(t *testing.T) { + // make a new reader that consumes from the cgrates topic + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: utils.KafkaDefaultTopic, + Partition: 0, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }) + + ctx, cancel := context.WithCancel(context.Background()) + m, err := r.ReadMessage(ctx) + if err != nil { + t.Error(err) + } + rcv := string(m.Value) + cancel() + + exp := `{"Account":"1001","AnswerTime":"2013-11-07T08:42:28Z","Category":"call","Cost":1.01,"Destination":"1002","OriginHost":"192.168.1.1","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","SetupTime":"2013-11-07T08:42:25Z","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice","Usage":10000000000}` + + if rcv != exp { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) + } + + if err := r.Close(); err != nil { + t.Fatal("failed to close reader:", err) + } +} + +func testKafkaDeleteTopic(t *testing.T) { + conn, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + partitions, err := conn.ReadPartitions("cgrates") + if err != nil { + t.Fatal(err) + } + + if len(partitions) != 1 || partitions[0].Topic != "cgrates" { + t.Fatal("expected topic named cgrates to exist") + } + + if err := conn.DeleteTopics(utils.KafkaDefaultTopic); err != nil { + t.Fatal(err) + } + + experr := `[5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes` + _, err = conn.ReadPartitions("cgrates") + if err == nil || err.Error() != experr { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err) + } + +}