From 9ee38516acee76ed6ec02387937841d05a9a94f6 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 1 Feb 2023 18:19:47 +0200 Subject: [PATCH] Add it test for AMQP exporter --- data/conf/samples/ees_amqp/cgrates.json | 58 +++++++++ ees/amqp_it_test.go | 159 ++++++++++++++++++++++++ 2 files changed, 217 insertions(+) create mode 100644 data/conf/samples/ees_amqp/cgrates.json create mode 100644 ees/amqp_it_test.go diff --git a/data/conf/samples/ees_amqp/cgrates.json b/data/conf/samples/ees_amqp/cgrates.json new file mode 100644 index 000000000..8db513541 --- /dev/null +++ b/data/conf/samples/ees_amqp/cgrates.json @@ -0,0 +1,58 @@ +{ + + "general": { + "log_level": 7, + "poster_attempts": 2, + "failed_posts_ttl": "1s" + }, + + "data_db": { + "db_type": "*internal" + }, + + + "stor_db": { + "db_type": "*internal" + }, + + "ees": { + "enabled": true, + "exporters": [ + { + "id": "amqp_test_file", + "type": "*amqp_json_map", + "export_path": "amqp://guest:guest@localhost:5672/", + "opts": { + "amqpQueueID": "cgrates_cdrs", + "amqpExchange": "exchangename", + "amqpExchangeType": "fanout", + "amqpRoutingKey": "cgr_cdrs" + }, + "attempts": 1, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "synchronous": true, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] + } + ] + }, + + + "templates": { + "requiredFields": [ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "OrderID", "path": "*exp.OrderID", "type": "*variable", "value": "~*req.OrderID","filter":"*string:~*opts.AddOrderID:true"} + ] + } + + } \ No newline at end of file diff --git a/ees/amqp_it_test.go b/ees/amqp_it_test.go new file mode 100644 index 000000000..9eef43e4e --- /dev/null +++ b/ees/amqp_it_test.go @@ -0,0 +1,159 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "net/rpc" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/streadway/amqp" +) + +var ( + amqpConfDir string + amqpCfgPath string + amqpCfg *config.CGRConfig + amqpRPC *rpc.Client + + sTestsAMQP = []func(t *testing.T){ + testAMQPLoadConfig, + testAMQPResetDataDB, + testAMQPResetStorDb, + testAMQPStartEngine, + testAMQPRPCConn, + testAMQPExportEvent, + testAMQPVerifyExport, + testStopCgrEngine, + } +) + +func TestAMQPExport(t *testing.T) { + amqpConfDir = "ees_amqp" + for _, stest := range sTestsAMQP { + t.Run(amqpConfDir, stest) + } +} + +func testAMQPLoadConfig(t *testing.T) { + var err error + amqpCfgPath = path.Join(*dataDir, "conf", "samples", amqpConfDir) + if amqpCfg, err = config.NewCGRConfigFromPath(amqpCfgPath); err != nil { + t.Error(err) + } +} + +func testAMQPResetDataDB(t *testing.T) { + if err := engine.InitDataDb(amqpCfg); err != nil { + t.Fatal(err) + } +} + +func testAMQPResetStorDb(t *testing.T) { + if err := engine.InitStorDb(amqpCfg); err != nil { + t.Fatal(err) + } +} + +func testAMQPStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(amqpCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testAMQPRPCConn(t *testing.T) { + var err error + amqpRPC, err = newRPCClient(amqpCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testAMQPExportEvent(t *testing.T) { + ev := &engine.CGREventWithEeIDs{ + EeIDs: []string{"amqp_test_file"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "voiceEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.MetaVoice, + utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + }, + }, + } + + var reply map[string]utils.MapStorage + if err := amqpRPC.Call(utils.EeSv1ProcessEvent, ev, &reply); err != nil { + t.Error(err) + } + + time.Sleep(2 * time.Second) +} + +func testAMQPVerifyExport(t *testing.T) { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + ch, err := conn.Channel() + if err != nil { + t.Fatal(err) + } + defer ch.Close() + q, err := ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil) + if err != nil { + t.Fatal(err) + } + msgs, err := ch.Consume(q.Name, utils.EmptyString, true, false, false, false, nil) + if err != nil { + t.Fatal(err) + } + expCDR := `{"Account":"1001","CGRID":"dbafe9c8614c785a65aabd116dd3959c3c56f7f6","Category":"call","Destination":"1002","OriginID":"dsafdsaf","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice"}` + select { + case d := <-msgs: + rcvCDR := string(d.Body) + if rcvCDR != expCDR { + t.Errorf("unexpected CDR received: <%s>", rcvCDR) + } + case <-time.After(100 * time.Millisecond): + t.Error("No message received from RabbitMQ") + } +}