From 1398cf3fd2c0fe21509a319e33049adced784f73 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 10 May 2022 17:50:11 +0300 Subject: [PATCH] Add it test for kafka exporter --- data/conf/samples/ees/cgrates.json | 21 ++- ees/kafka_it_test.go | 218 +++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 ees/kafka_it_test.go diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 24a857422..267113b64 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -510,12 +510,31 @@ {"tag": "CostDetails", "path": "*exp.CostDetails", "type": "*variable", "value": "~*req.CostDetails"} ] }, + { + "id": "KafkaExporter", + "type": "*kafkaJSONMap", + "export_path": "127.0.0.1:9092", + "concurrent_requests": 0, + "opts": { + "kafkaTopic": "cgrates" + }, + "timezone": "", + "filters": [], + "flags": [], + "attribute_ids": [], + "attribute_context": "", + "synchronous": false, + "blocker": false, + "attempts": 1, + "fields":[], + "failed_posts_dir": "/var/spool/cgrates/failed_posts" + } ] }, "admins": { - "enabled": true, + "enabled": true }, diff --git a/ees/kafka_it_test.go b/ees/kafka_it_test.go new file mode 100644 index 000000000..5cc4e2cd7 --- /dev/null +++ b/ees/kafka_it_test.go @@ -0,0 +1,218 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "net" + "net/rpc" + "path" + "strconv" + "testing" + "time" + + "github.com/cgrates/birpc/context" + kafka "github.com/segmentio/kafka-go" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + kafkaConfigDir string + kafkaCfgPath string + kafkaCfg *config.CGRConfig + kafkaRpc *rpc.Client + + sTestsKafka = []func(t *testing.T){ + testCreateDirectory, + testKafkaLoadConfig, + testKafkaResetDataDB, + + testKafkaStartEngine, + testKafkaRPCConn, + testKafkaCreateTopic, + testKafkaExportEvent, + testKafkaVerifyExport, + testKafkaDeleteTopic, + testStopCgrEngine, + testCleanDirectory, + } +) + +func TestKafkaExport(t *testing.T) { + kafkaConfigDir = "ees" + for _, stest := range sTestsKafka { + t.Run(kafkaConfigDir, stest) + } +} + +func testKafkaLoadConfig(t *testing.T) { + var err error + kafkaCfgPath = path.Join(*dataDir, "conf", "samples", kafkaConfigDir) + if kafkaCfg, err = config.NewCGRConfigFromPath(context.Background(), kafkaCfgPath); err != nil { + t.Error(err) + } +} + +func testKafkaResetDataDB(t *testing.T) { + if err := engine.InitDataDB(kafkaCfg); err != nil { + t.Fatal(err) + } +} + +func testKafkaStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(kafkaCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testKafkaRPCConn(t *testing.T) { + var err error + kafkaRpc, err = newRPCClient(kafkaCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testKafkaCreateTopic(t *testing.T) { + conn, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + t.Fatal(err) + } + controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + t.Fatal(err) + } + defer controllerConn.Close() + + topicConfigs := []kafka.TopicConfig{ + { + Topic: utils.KafkaDefaultTopic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + + err = controllerConn.CreateTopics(topicConfigs...) + if err != nil { + t.Fatal(err) + } +} + +func testKafkaExportEvent(t *testing.T) { + event := &utils.CGREventWithEeIDs{ + EeIDs: []string{"KafkaExporter"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "KafkaEvent", + Event: map[string]interface{}{ + utils.ToR: utils.MetaVoice, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813748, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + }, + }, + } + + var reply map[string]map[string]interface{} + if err := kafkaRpc.Call(utils.EeSv1ProcessEvent, event, &reply); err != nil { + t.Error(err) + } + time.Sleep(time.Second) +} + +func testKafkaVerifyExport(t *testing.T) { + // make a new reader that consumes from the cgrates topic + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: utils.KafkaDefaultTopic, + Partition: 0, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }) + + ctx, cancel := context.WithCancel(context.Background()) + var rcv string + for { + m, err := r.ReadMessage(ctx) + if err != nil { + break + } + rcv = string(m.Value) + cancel() + } + + exp := `{"Account":"1001","AnswerTime":"2013-11-07T08:42:28Z","Category":"call","Cost":1.01,"Destination":"1002","OriginHost":"192.168.1.1","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","SetupTime":"2013-11-07T08:42:25Z","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice","Usage":10000000000}` + + if rcv != exp { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) + } + + if err := r.Close(); err != nil { + t.Fatal("failed to close reader:", err) + } +} + +func testKafkaDeleteTopic(t *testing.T) { + conn, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + partitions, err := conn.ReadPartitions("cgrates") + if err != nil { + t.Fatal(err) + } + + if len(partitions) != 1 || partitions[0].Topic != "cgrates" { + t.Fatal("expected topic named cgrates to exist") + } + + if err := conn.DeleteTopics(utils.KafkaDefaultTopic); err != nil { + t.Fatal(err) + } + + experr := `[5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes` + _, err = conn.ReadPartitions("cgrates") + if err == nil || err.Error() != experr { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err) + } + +}