Leave export_path as is when exporting (AMQP) + tests

This commit is contained in:
ionutboangiu
2023-02-16 05:23:31 -05:00
committed by Dan Christian Bogos
parent 682904ad9d
commit c2a824a94b
3 changed files with 316 additions and 0 deletions

View File

@@ -273,6 +273,8 @@ func (api *APIerSv1) ExportCDRs(arg ArgExportCDRs, reply *RplExportedCDRs) (err
filePath = path.Join(eDir, fileName)
case utils.DRYRUN:
filePath = utils.DRYRUN
case utils.MetaAMQPjsonMap:
filePath = eDir
default:
u, _ := url.Parse(eDir)
u.Path = path.Join(u.Path, fileName)

View File

@@ -0,0 +1,252 @@
//go:build integration
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"net/rpc"
"path"
"reflect"
"sort"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
amqpCfgPath string
amqpCfg *config.CGRConfig
amqpRPC *rpc.Client
amqpConfigDIR string
sTestsCDReAMQP = []func(t *testing.T){
testAMQPInitCfg,
testAMQPInitDataDb,
testAMQPResetStorDb,
testAMQPStartEngine,
testAMQPRPCConn,
testAMQPAddCDRs,
testAMQPExportCDRs,
testAMQPVerifyExport,
testAMQPKillEngine,
}
)
func TestAMQPExport(t *testing.T) {
amqpConfigDIR = "cdre_amqp"
for _, stest := range sTestsCDReAMQP {
t.Run(amqpConfigDIR, stest)
}
}
func testAMQPInitCfg(t *testing.T) {
var err error
amqpCfgPath = path.Join(alsPrfDataDir, "conf", "samples", amqpConfigDIR)
amqpCfg, err = config.NewCGRConfigFromPath(amqpCfgPath)
if err != nil {
t.Fatal(err)
}
amqpCfg.DataFolderPath = alsPrfDataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(amqpCfg)
}
func testAMQPInitDataDb(t *testing.T) {
if err := engine.InitDataDb(amqpCfg); err != nil {
t.Fatal(err)
}
}
func testAMQPResetStorDb(t *testing.T) {
if err := engine.InitStorDb(amqpCfg); err != nil {
t.Fatal(err)
}
}
func testAMQPStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(amqpCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testAMQPRPCConn(t *testing.T) {
var err error
amqpRPC, err = newRPCClient(amqpCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
func testAMQPAddCDRs(t *testing.T) {
storedCdrs := []*engine.CDR{
{
CGRID: "Cdr1",
OrderID: 101,
ToR: utils.VOICE,
OriginID: "OriginCDR1",
OriginHost: "192.168.1.1",
Source: "test",
RequestType: utils.META_RATED,
Tenant: "cgrates.org",
Category: "call",
Account: "1001",
Subject: "1001",
Destination: "+4986517174963",
SetupTime: time.Now(),
AnswerTime: time.Now(),
RunID: utils.MetaDefault,
Usage: time.Duration(10) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
Cost: 1.01,
},
{
CGRID: "Cdr2",
OrderID: 102,
ToR: utils.VOICE,
OriginID: "OriginCDR2",
OriginHost: "192.168.1.1",
Source: "test2",
RequestType: utils.META_RATED,
Tenant: "cgrates.org",
Category: "call",
Account: "1001",
Subject: "1001",
Destination: "+4986517174963",
SetupTime: time.Now(),
AnswerTime: time.Now(),
RunID: utils.MetaDefault,
Usage: time.Duration(5) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
Cost: 1.01,
},
{
CGRID: "Cdr3",
OrderID: 103,
ToR: utils.VOICE,
OriginID: "OriginCDR3",
OriginHost: "192.168.1.1",
Source: "test2",
RequestType: utils.META_RATED,
Tenant: "cgrates.org",
Category: "call",
Account: "1001",
Subject: "1001",
Destination: "+4986517174963",
SetupTime: time.Now(),
AnswerTime: time.Now(),
RunID: utils.MetaDefault,
Usage: time.Duration(30) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
Cost: 1.01,
},
{
CGRID: "Cdr4",
OrderID: 104,
ToR: utils.VOICE,
OriginID: "OriginCDR4",
OriginHost: "192.168.1.1",
Source: "test3",
RequestType: utils.META_RATED,
Tenant: "cgrates.org",
Category: "call",
Account: "1001",
Subject: "1001",
Destination: "+4986517174963",
SetupTime: time.Now(),
AnswerTime: time.Time{},
RunID: utils.MetaDefault,
Usage: time.Duration(0) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
}
for _, cdr := range storedCdrs {
var reply string
if err := amqpRPC.Call(utils.CDRsV1ProcessCDR, &engine.CDRWithArgDispatcher{CDR: cdr}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
}
time.Sleep(100 * time.Millisecond)
}
func testAMQPExportCDRs(t *testing.T) {
attr := ArgExportCDRs{
ExportArgs: map[string]interface{}{
utils.ExportTemplate: "AMQP_EXPORTER",
},
Verbose: true,
}
var rply RplExportedCDRs
if err := amqpRPC.Call(utils.APIerSv1ExportCDRs, attr, &rply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(rply.ExportedCGRIDs) != 2 {
t.Errorf("Unexpected number of CDR exported: %s ", utils.ToJSON(rply))
}
}
func testAMQPVerifyExport(t *testing.T) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
t.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
t.Fatal(err)
}
defer ch.Close()
q, err := ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil)
if err != nil {
t.Fatal(err)
}
msgs, err := ch.Consume(q.Name, utils.EmptyString, true, false, false, false, nil)
if err != nil {
t.Fatal(err)
}
expCDRs := []string{
`{"Account":"1001","CGRID":"Cdr2","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR2","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"5s"}`,
`{"Account":"1001","CGRID":"Cdr3","Category":"call","Cost":"-1.0000","Destination":"+4986517174963","OriginID":"OriginCDR3","RunID":"*default","Source":"test2","Tenant":"cgrates.org","Usage":"30s"}`,
}
rcvCDRs := make([]string, 0)
waiting := true
for waiting {
select {
case d := <-msgs:
rcvCDRs = append(rcvCDRs, string(d.Body))
case <-time.After(100 * time.Millisecond):
waiting = false
}
}
sort.Strings(rcvCDRs)
if !reflect.DeepEqual(rcvCDRs, expCDRs) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", expCDRs, rcvCDRs)
}
}
func testAMQPKillEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}

View File

@@ -0,0 +1,62 @@
{
"general": {
"log_level": 7,
"connect_timeout": "1h",
"reply_timeout": "1h"
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080"
},
"data_db": {
"db_type": "redis",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_password": "CGRateS.org"
},
"schedulers": {
"enabled": true
},
"cdrs": {
"enabled": true
},
"cdre": {
"AMQP_EXPORTER": {
"export_format": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&routing_key=cgrates_cdrs",
"filters" :["*string:~*req.Source:test2"],
"synchronous": true,
"fields": [
{"path": "*exp.CGRID", "type": "*composed", "value": "~*req.CGRID"},
{"path": "*exp.RunID", "type": "*composed", "value": "~*req.RunID"},
{"path": "*exp.Source", "type": "*composed", "value": "~*req.Source"},
{"path": "*exp.OriginID", "type": "*composed", "value": "~*req.OriginID"},
{"path": "*exp.Tenant", "type": "*composed", "value": "~*req.Tenant"},
{"path": "*exp.Category", "type": "*composed", "value": "~*req.Category"},
{"path": "*exp.Account", "type": "*composed", "value": "~*req.Account"},
{"path": "*exp.Destination", "type": "*composed", "value": "~*req.Destination"},
{"path": "*exp.Usage", "type": "*composed", "value": "~*req.Usage"},
{"path": "*exp.Cost", "type": "*composed", "value": "~*req.Cost", "rounding_decimals": 4}
]
}
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"]
}
}