Add integration tests for kafka exporter

This commit is contained in:
ionutboangiu
2023-02-21 10:19:26 -05:00
committed by Dan Christian Bogos
parent 70c393cbb0
commit 12ed87a5e1
2 changed files with 232 additions and 0 deletions

View File

@@ -441,6 +441,15 @@
"natsSubject": "processed_cdrs",
}
},
{
"id": "KafkaExporter",
"type": "*kafka_json_map",
"export_path": "localhost:9092",
"opts": {
"kafkaTopic": "cgrates",
// "kafkaCAPath": "/home/kafka/kafka/ssl/ca-cert.pem"
}
},
{
"id": "AMQPExporter",
"type": "*amqp_json_map",

223
ees/kafka_it_test.go Normal file
View File

@@ -0,0 +1,223 @@
//go:build integration
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ees
import (
"context"
"net"
"net/rpc"
"path"
"strconv"
"testing"
"time"
kafka "github.com/segmentio/kafka-go"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
kafkaConfigDir string
kafkaCfgPath string
kafkaCfg *config.CGRConfig
kafkaRpc *rpc.Client
sTestsKafka = []func(t *testing.T){
testCreateDirectory,
testKafkaLoadConfig,
testKafkaResetDataDB,
testKafkaResetStorDB,
testKafkaStartEngine,
testKafkaRPCConn,
testKafkaCreateTopic,
testKafkaExportEvent,
testKafkaVerifyExport,
testKafkaDeleteTopic,
testStopCgrEngine,
testCleanDirectory,
}
)
func TestKafkaExport(t *testing.T) {
kafkaConfigDir = "ees"
for _, stest := range sTestsKafka {
t.Run(kafkaConfigDir, stest)
}
}
func testKafkaLoadConfig(t *testing.T) {
var err error
kafkaCfgPath = path.Join(*dataDir, "conf", "samples", kafkaConfigDir)
if kafkaCfg, err = config.NewCGRConfigFromPath(kafkaCfgPath); err != nil {
t.Error(err)
}
}
func testKafkaResetDataDB(t *testing.T) {
if err := engine.InitDataDb(kafkaCfg); err != nil {
t.Fatal(err)
}
}
func testKafkaResetStorDB(t *testing.T) {
if err := engine.InitStorDb(kafkaCfg); err != nil {
t.Fatal(err)
}
}
func testKafkaStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(kafkaCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testKafkaRPCConn(t *testing.T) {
var err error
kafkaRpc, err = newRPCClient(kafkaCfg.ListenCfg())
if err != nil {
t.Fatal(err)
}
}
func testKafkaCreateTopic(t *testing.T) {
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
t.Fatal(err)
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
t.Fatal(err)
}
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
t.Fatal(err)
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: utils.KafkaDefaultTopic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
t.Fatal(err)
}
}
func testKafkaExportEvent(t *testing.T) {
event := &engine.CGREventWithEeIDs{
EeIDs: []string{"KafkaExporter"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "KafkaEvent",
Event: map[string]interface{}{
utils.ToR: utils.MetaVoice,
utils.OriginID: "abcdef",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.MetaRated,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.AccountField: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813748, 0).UTC(),
utils.Usage: 10 * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 1.01,
},
},
}
var reply map[string]map[string]interface{}
if err := kafkaRpc.Call(utils.EeSv1ProcessEvent, event, &reply); err != nil {
t.Error(err)
}
time.Sleep(time.Second)
}
func testKafkaVerifyExport(t *testing.T) {
// make a new reader that consumes from the cgrates topic
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: utils.KafkaDefaultTopic,
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
ctx, cancel := context.WithCancel(context.Background())
m, err := r.ReadMessage(ctx)
if err != nil {
t.Error(err)
}
rcv := string(m.Value)
cancel()
exp := `{"Account":"1001","AnswerTime":"2013-11-07T08:42:28Z","Category":"call","Cost":1.01,"Destination":"1002","OriginHost":"192.168.1.1","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","SetupTime":"2013-11-07T08:42:25Z","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice","Usage":10000000000}`
if rcv != exp {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv)
}
if err := r.Close(); err != nil {
t.Fatal("failed to close reader:", err)
}
}
func testKafkaDeleteTopic(t *testing.T) {
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
t.Fatal(err)
}
defer conn.Close()
partitions, err := conn.ReadPartitions("cgrates")
if err != nil {
t.Fatal(err)
}
if len(partitions) != 1 || partitions[0].Topic != "cgrates" {
t.Fatal("expected topic named cgrates to exist")
}
if err := conn.DeleteTopics(utils.KafkaDefaultTopic); err != nil {
t.Fatal(err)
}
experr := `[5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes`
_, err = conn.ReadPartitions("cgrates")
if err == nil || err.Error() != experr {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err)
}
}