Updated integration tests

This commit is contained in:
Trial97
2020-12-08 17:36:44 +02:00
committed by Dan Christian Bogos
parent f63dc62519
commit 6e0bf5ad70
5 changed files with 59 additions and 180 deletions

View File

@@ -32,7 +32,7 @@
"store_cdrs": false, // store cdrs in storDb
"chargers_conns":["*internal"],
"rals_conns": ["*internal"],
"online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "amqp_test_file","aws_test_file","sqs_test_file","kafka_localhost","s3_test_file", "eventcost_filter"],
"online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "kafka_localhost", "eventcost_filter"],
"ees_conns": ["*localhost"]
},
@@ -44,6 +44,7 @@
"enabled": true,
},
"ees": {
"enabled": true,
"exporters": [
@@ -52,7 +53,9 @@
"type": "*http_post",
"export_path": "http://127.0.0.1:12080/cdr_http",
"tenant": "cgrates.org",
"synchronous": true,
"attempts": 1,
"filters":["*string:~*opts.ExporterID:http_localhost"],
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
@@ -104,54 +107,12 @@
"type": "*http_post",
"export_path": "http://127.0.0.1:12080/invalid",
"tenant": "cgrates.org",
"synchronous": true,
"attempts": 1,
"fields":[
{"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
],
},
{
"id": "aws_test_file",
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:25672/",
"opts": {
"queueID": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
],
},
{
"id": "sqs_test_file",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint"
"export_path": "http://sqs.eu-west-2.amazonaws.com/",
"opts": {
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
],
},
{
"id": "amqp_test_file",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:25672/",
"opts": {
"queueID": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
],
},
{
"id": "kafka_localhost",
"type": "*kafka_json_map",
@@ -160,23 +121,7 @@
"topic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
],
},
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"synchronous": true,
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
@@ -191,6 +136,7 @@
},
"tenant": "cgrates.org",
"filters":["*string:~*ec.Cost:100"],
"synchronous": true,
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}

View File

@@ -30,7 +30,7 @@
"store_cdrs": false, // store cdrs in storDb
"chargers_conns":["*internal"],
"rals_conns": ["*internal"],
"online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "amqp_test_file","aws_test_file","sqs_test_file","kafka_localhost","s3_test_file", "eventcost_filter"],
"online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "kafka_localhost", "eventcost_filter"],
"ees_conns": ["*localhost"]
},
@@ -51,7 +51,9 @@
"type": "*http_post",
"export_path": "http://127.0.0.1:12080/cdr_http",
"tenant": "cgrates.org",
"synchronous": true,
"attempts": 1,
"filters":["*string:~*opts.ExporterID:http_localhost"],
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
@@ -103,54 +105,12 @@
"type": "*http_post",
"export_path": "http://127.0.0.1:12080/invalid",
"tenant": "cgrates.org",
"synchronous": true,
"attempts": 1,
"fields":[
{"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
],
},
{
"id": "aws_test_file",
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:25672/",
"opts": {
"queueID": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
],
},
{
"id": "sqs_test_file",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint"
"export_path": "http://sqs.eu-west-2.amazonaws.com/",
"opts": {
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
],
},
{
"id": "amqp_test_file",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:25672/",
"opts": {
"queueID": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
],
},
{
"id": "kafka_localhost",
"type": "*kafka_json_map",
@@ -159,23 +119,7 @@
"topic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
],
},
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
"awsRegion": "eu-west-2",
"awsKey": "testkey",
"awsSecret": "testsecret",
"queueID": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"synchronous": true,
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}
@@ -190,6 +134,7 @@
},
"tenant": "cgrates.org",
"filters":["*string:~*ec.Cost:100"],
"synchronous": true,
"attempts": 1,
"fields":[
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}

View File

@@ -115,6 +115,7 @@ func (pstr *HTTPPoster) do(req *http.Request) (respBody []byte, err error) {
return
}
if resp.StatusCode > 299 {
err = fmt.Errorf("unexpected status code received: <%d>", resp.StatusCode)
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", pstr.addr, resp.StatusCode))
return
}

View File

@@ -25,6 +25,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"os/exec"
@@ -48,7 +49,7 @@ var (
cdrsMasterRpc *rpcclient.RPCClient
httpCGRID = utils.UUIDSha1Prefix()
amqpCGRID = utils.UUIDSha1Prefix()
failoverContent = []interface{}{[]byte(fmt.Sprintf(`{"CGRID":"%s"}`, httpCGRID)), []byte(fmt.Sprintf(`{"CGRID":"%s"}`, amqpCGRID))}
failoverContent = [][]byte{[]byte(fmt.Sprintf(`{"CGRID":"%s"}`, httpCGRID)), []byte(fmt.Sprintf(`{"CGRID":"%s"}`, amqpCGRID))}
sTestsCDRsOnExp = []func(t *testing.T){
testCDRsOnExpInitConfig,
@@ -69,7 +70,7 @@ var (
func TestCDRsOnExp(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
case utils.MetaInternal, utils.MetaPostgres:
t.SkipNow()
case utils.MetaMySQL:
cdrsMasterCfgDIR = "cdrsonexpmaster_mysql"
@@ -77,8 +78,6 @@ func TestCDRsOnExp(t *testing.T) {
case utils.MetaMongo:
cdrsMasterCfgDIR = "cdrsonexpmaster_mongo"
cdrsSlaveCfgDIR = "cdrsonexpslave_mongo"
case utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("Unknown Database type")
}
@@ -160,13 +159,33 @@ func testCDRsOnExpAMQPQueuesCreation(t *testing.T) {
if err = ch.QueueBind(q1.Name, "cgr_cdrs", "exchangename", false, nil); err != nil {
t.Fatal(err)
}
if err = ch.Close(); err != nil {
t.Error(err)
}
if err = conn.Close(); err != nil {
t.Error(err)
}
v, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
t.Fatal(err)
}
if err := v.CreateTopics(kafka.TopicConfig{
Topic: "cgrates_cdrs",
NumPartitions: 1,
ReplicationFactor: 1,
}); err != nil {
t.Fatal(err)
}
if err = v.Close(); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func testCDRsOnExpInitMasterRPC(t *testing.T) {
var err error
cdrsMasterRpc, err = rpcclient.NewRPCClient(utils.TCP, cdrsMasterCfg.ListenCfg().RPCJSONListen, false, "", "", "", 1, 1,
time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false)
time.Second, 5*time.Second, rpcclient.JSONrpc, nil, false)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
@@ -221,7 +240,7 @@ func testCDRsOnExpDisableOnlineExport(t *testing.T) {
var reply string
if err := cdrsMasterRpc.Call(utils.CDRsV1ProcessEvent,
&engine.ArgV1ProcessEvent{
Flags: []string{"*export:false"},
Flags: []string{"*export:false", "*chargers:false"},
CGREventWithOpts: utils.CGREventWithOpts{
CGREvent: testCdr.AsCGREvent(),
},
@@ -249,7 +268,7 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) {
OriginID: "httpjsonrpc1",
OriginHost: "192.168.1.1",
Source: "UNKNOWN",
RequestType: utils.META_PSEUDOPREPAID,
RequestType: utils.META_NONE,
Tenant: "cgrates.org",
Category: "call",
Account: "1001",
@@ -269,8 +288,12 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) {
var reply string
// we expect that the cdr export to fail and go into the failed post directory
if err := cdrsMasterRpc.Call(utils.CDRsV1ProcessEvent,
&engine.ArgV1ProcessEvent{CGREventWithOpts: utils.CGREventWithOpts{
CGREvent: testCdr1.AsCGREvent()}}, &reply); err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() {
&engine.ArgV1ProcessEvent{
CGREventWithOpts: utils.CGREventWithOpts{
CGREvent: testCdr1.AsCGREvent(),
Opts: map[string]interface{}{"ExporterID": "http_localhost"},
},
}, &reply); err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() {
t.Error("Unexpected error: ", err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
@@ -280,7 +303,6 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) {
t.Fatal("Could not connect to rater: ", err.Error())
}
// ToDo: Fix cdr_http to be compatible with rest of processCdr methods
time.Sleep(200 * time.Millisecond)
var rcvedCdrs []*engine.ExternalCDR
if err := cdrsSlaveRpc.Call(utils.APIerSv2GetCDRs,
&utils.RPCCDRsFilter{CGRIDs: []string{testCdr1.CGRID}, RunIDs: []string{utils.MetaDefault}}, &rcvedCdrs); err != nil {
@@ -290,10 +312,10 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) {
} else {
rcvSetupTime, _ := utils.ParseTimeDetectLayout(rcvedCdrs[0].SetupTime, "")
rcvAnswerTime, _ := utils.ParseTimeDetectLayout(rcvedCdrs[0].AnswerTime, "")
//rcvUsage, _ := utils.ParseDurationWithSecs(rcvedCdrs[0].Usage)
if rcvedCdrs[0].CGRID != testCdr1.CGRID ||
rcvedCdrs[0].RunID != testCdr1.RunID ||
rcvedCdrs[0].ToR != testCdr1.ToR ||
rcvedCdrs[0].OriginHost != testCdr1.OriginHost ||
rcvedCdrs[0].OriginID != testCdr1.OriginID ||
rcvedCdrs[0].RequestType != testCdr1.RequestType ||
rcvedCdrs[0].Tenant != testCdr1.Tenant ||
rcvedCdrs[0].Category != testCdr1.Category ||
@@ -302,10 +324,8 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) {
rcvedCdrs[0].Destination != testCdr1.Destination ||
!rcvSetupTime.Equal(testCdr1.SetupTime) ||
!rcvAnswerTime.Equal(testCdr1.AnswerTime) ||
//rcvUsage != 10 ||
rcvedCdrs[0].RunID != testCdr1.RunID {
//rcvedCdrs[0].Cost != testCdr1.Cost ||
//!reflect.DeepEqual(rcvedCdrs[0].ExtraFields, testCdr1.ExtraFields) {
rcvedCdrs[0].Usage != testCdr1.Usage.String() ||
rcvedCdrs[0].Cost != testCdr1.Cost {
t.Errorf("Expected: %+v, received: %+v", utils.ToJSON(testCdr1), utils.ToJSON(rcvedCdrs[0]))
}
}
@@ -357,7 +377,6 @@ func testCDRsOnExpAMQPReplication(t *testing.T) {
t.Error("No message received from RabbitMQ")
}
conn.Close()
time.Sleep(500 * time.Millisecond)
// restart RabbitMQ server so we can test reconnects
if err := exec.Command("service", "rabbitmq-server", "restart").Run(); err != nil {
t.Error(err)
@@ -463,25 +482,22 @@ func checkContent(ev *engine.ExportEvents, content []interface{}) error {
for i, con := range ev.Events {
recv[i] = utils.IfaceAsString(con)
}
return fmt.Errorf("Expecting: one of %q, received: %q", utils.ToJSON(exp), utils.ToJSON(recv))
return fmt.Errorf("Expecting: one of %s, received: %s", utils.ToJSON(exp), utils.ToJSON(recv))
}
return nil
}
func testCDRsOnExpFileFailover(t *testing.T) {
time.Sleep(5 * time.Second)
v1 := url.Values{}
v2 := url.Values{}
v1.Set("OriginID", "httpjsonrpc1")
v2.Set("OriginID", "amqpreconnect")
httpContent := []interface{}{v1, v2}
httpContent := []interface{}{&engine.HTTPPosterRequest{Body: v1, Header: http.Header{"Content-Type": []string{"application/x-www-form-urlencoded"}}},
&engine.HTTPPosterRequest{Body: v2, Header: http.Header{"Content-Type": []string{"application/x-www-form-urlencoded"}}}}
filesInDir, _ := ioutil.ReadDir(cdrsMasterCfg.GeneralCfg().FailedPostsDir)
if len(filesInDir) == 0 {
t.Fatalf("No files in directory: %s", cdrsMasterCfg.GeneralCfg().FailedPostsDir)
}
expectedFormats := utils.NewStringSet([]string{utils.MetaHTTPPost, utils.MetaAMQPjsonMap,
utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaS3jsonMap})
rcvFormats := utils.NewStringSet([]string{})
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
fileName := file.Name()
filePath := path.Join(cdrsMasterCfg.GeneralCfg().FailedPostsDir, fileName)
@@ -494,18 +510,13 @@ func testCDRsOnExpFileFailover(t *testing.T) {
t.Error("Expected at least one event")
continue
}
rcvFormats.Add(ev.Format)
content := failoverContent
if ev.Format == utils.MetaHTTPPost {
content = httpContent
if ev.Format != utils.MetaHTTPPost {
t.Errorf("Expected %s to be only failed exporter,received <%s>", utils.MetaHTTPPost, ev.Format)
}
if err := checkContent(ev, content); err != nil {
if err := checkContent(ev, httpContent); err != nil {
t.Errorf("For file <%s> and event <%s> received %s", filePath, utils.ToJSON(ev), err)
}
}
if !reflect.DeepEqual(expectedFormats, rcvFormats) {
t.Errorf("Missing format expecting: %s received: %s", utils.ToJSON(expectedFormats), utils.ToJSON(rcvFormats))
}
}
func testCDRsOnExpKafkaPosterFileFailover(t *testing.T) {
@@ -522,37 +533,13 @@ func testCDRsOnExpKafkaPosterFileFailover(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if m, err := reader.ReadMessage(ctx); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(failoverContent[0].([]byte), m.Value) && !reflect.DeepEqual(failoverContent[1].([]byte), m.Value) { // Checking just the prefix should do since some content is dynamic
t.Errorf("Expecting: %v or %v, received: %v", utils.IfaceAsString(failoverContent[0]), utils.IfaceAsString(failoverContent[1]), string(m.Value))
} else if !reflect.DeepEqual(failoverContent[0], m.Value) && !reflect.DeepEqual(failoverContent[1], m.Value) { // Checking just the prefix should do since some content is dynamic
t.Errorf("Expecting: %v or %v, received: %v", string(failoverContent[0]), string(failoverContent[1]), string(m.Value))
}
cancel()
}
}
/*
// Performance test, check `lsof -a -p 8427 | wc -l`
func testCdrsHttpCdrReplication2(t *testing.T) {
cdrs := make([]*engine.CDR, 0)
for i := 0; i < 10000; i++ {
cdr := &engine.CDR{OriginID: fmt.Sprintf("httpjsonrpc_%d", i),
ToR: utils.VOICE, OriginHost: "192.168.1.1", Source: "UNKNOWN", RequestType: utils.META_PSEUDOPREPAID,
Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
SetupTime: time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC),
Usage: 10 * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}}
cdrs = append(cdrs, cdr)
}
var reply string
for _, cdr := range cdrs {
if err := cdrsMasterRpc.Call(utils.CDRsV1ProcessCDR, cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
}
}
*/
func testCDRsOnExpStopEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)

View File

@@ -318,7 +318,7 @@ func testV1DataDataDebitUsage1G0(t *testing.T) {
&engine.UsageRecordWithOpts{UsageRecord: usageRecord}, &reply); err != nil {
t.Error(err)
}
if time.Now().Sub(tStart) > 50*time.Millisecond {
if time.Now().Sub(tStart) > 100*time.Millisecond {
t.Errorf("Take's too long for GetDataCost : %+v", time.Now().Sub(tStart))
}