Added tests for AMQPv1Poster

This commit is contained in:
Trial97
2020-04-29 12:52:56 +03:00
committed by Dan Christian Bogos
parent af9014e8aa
commit c5a6b27ff4

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"context"
"encoding/json"
"flag"
"fmt"
@@ -37,6 +38,7 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
amqpv1 "pack.ag/amqp"
)
var (
@@ -47,10 +49,13 @@ var (
go test -tags=integration -run=TestSQSPoster -sqs
- s3
go test -tags=integration -run=TestS3Poster -s3
- amqpv1
go test -tags=integration -run=TestAMQPv1Poster -amqpv1
also configure the credentials from test function
*/
itTestSQS = flag.Bool("sqs", false, "Run the test for SQSPoster")
itTestS3 = flag.Bool("s3", false, "Run the test for SQSPoster")
itTestSQS = flag.Bool("sqs", false, "Run the test for SQSPoster")
itTestS3 = flag.Bool("s3", false, "Run the test for S3Poster")
itTestAMQPv1 = flag.Bool("amqpv1", false, "Run the test for AMQPv1Poster")
)
type TestContent struct {
@@ -222,7 +227,7 @@ func TestS3Poster(t *testing.T) {
qname := "cgrates-cdrs"
//#####################################
// export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
dialURL := fmt.Sprintf("%s?aws_region=%s&aws_key=%s&aws_secret=%s&queue_id=%s", endpoint, region, awsKey, awsSecret, qname)
body := "testString"
@@ -261,3 +266,74 @@ func TestS3Poster(t *testing.T) {
t.Errorf("Expected: %q, received: %q", body, rply)
}
}
func TestAMQPv1Poster(t *testing.T) {
if !*itTestAMQPv1 {
return
}
cfg1, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
utils.Newlogger(utils.MetaSysLog, cfg1.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
//#####################################
// update this variables
endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net"
qname := "cgrates-cdrs"
//#####################################
// export_path for amqpv1: "amqps://admin:admin@endpoint?queue_id=cgrates_cdrs",
dialURL := fmt.Sprintf("%s?queue_id=%s", endpoint, qname)
body := "testString"
pstr, err := PostersCache.GetAMQPv1Poster(dialURL, 5)
if err != nil {
t.Fatal(err)
}
if err := pstr.Post([]byte(body), ""); err != nil {
t.Fatal(err)
}
// Create client
client, err := amqpv1.Dial(endpoint)
if err != nil {
t.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession()
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// Create a receiver
receiver, err := session.NewReceiver(
amqpv1.LinkSourceAddress("/" + qname),
)
if err != nil {
t.Fatal("Creating receiver link:", err)
}
defer func() {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
receiver.Close(ctx)
cancel()
}()
// Receive next message
msg, err := receiver.Receive(ctx)
cancel()
if err != nil {
t.Fatal("Reading message from AMQP:", err)
}
// Accept message
msg.Accept()
if rply := string(msg.GetData()); rply != body {
t.Errorf("Expected: %q, received: %q", body, rply)
}
}