From c5a6b27ff4f41da7f4f453ae7681fc330ff4d57f Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 29 Apr 2020 12:52:56 +0300 Subject: [PATCH] Added tests for AMQPv1Poster --- engine/poster_it_test.go | 82 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 3 deletions(-) diff --git a/engine/poster_it_test.go b/engine/poster_it_test.go index 88bbf52e8..1a5cf9036 100644 --- a/engine/poster_it_test.go +++ b/engine/poster_it_test.go @@ -20,6 +20,7 @@ along with this program. If not, see package engine import ( + "context" "encoding/json" "flag" "fmt" @@ -37,6 +38,7 @@ import ( "github.com/aws/aws-sdk-go/service/sqs" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + amqpv1 "pack.ag/amqp" ) var ( @@ -47,10 +49,13 @@ var ( go test -tags=integration -run=TestSQSPoster -sqs - s3 go test -tags=integration -run=TestS3Poster -s3 + - amqpv1 + go test -tags=integration -run=TestAMQPv1Poster -amqpv1 also configure the credentials from test function */ - itTestSQS = flag.Bool("sqs", false, "Run the test for SQSPoster") - itTestS3 = flag.Bool("s3", false, "Run the test for SQSPoster") + itTestSQS = flag.Bool("sqs", false, "Run the test for SQSPoster") + itTestS3 = flag.Bool("s3", false, "Run the test for S3Poster") + itTestAMQPv1 = flag.Bool("amqpv1", false, "Run the test for AMQPv1Poster") ) type TestContent struct { @@ -222,7 +227,7 @@ func TestS3Poster(t *testing.T) { qname := "cgrates-cdrs" //##################################### - // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" + // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" dialURL := fmt.Sprintf("%s?aws_region=%s&aws_key=%s&aws_secret=%s&queue_id=%s", endpoint, region, awsKey, awsSecret, qname) body := "testString" @@ -261,3 +266,74 @@ func TestS3Poster(t *testing.T) { t.Errorf("Expected: %q, received: %q", body, rply) } } + +func TestAMQPv1Poster(t *testing.T) { + if !*itTestAMQPv1 { + return + } + cfg1, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + utils.Newlogger(utils.MetaSysLog, cfg1.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + + //##################################### + // update this variables + endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net" + qname := "cgrates-cdrs" + //##################################### + + // export_path for amqpv1: "amqps://admin:admin@endpoint?queue_id=cgrates_cdrs", + dialURL := fmt.Sprintf("%s?queue_id=%s", endpoint, qname) + + body := "testString" + + pstr, err := PostersCache.GetAMQPv1Poster(dialURL, 5) + if err != nil { + t.Fatal(err) + } + if err := pstr.Post([]byte(body), ""); err != nil { + t.Fatal(err) + } + // Create client + client, err := amqpv1.Dial(endpoint) + if err != nil { + t.Fatal("Dialing AMQP server:", err) + } + defer client.Close() + + // Open a session + session, err := client.NewSession() + if err != nil { + t.Fatal("Creating AMQP session:", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + + // Create a receiver + receiver, err := session.NewReceiver( + amqpv1.LinkSourceAddress("/" + qname), + ) + if err != nil { + t.Fatal("Creating receiver link:", err) + } + defer func() { + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + receiver.Close(ctx) + cancel() + }() + + // Receive next message + msg, err := receiver.Receive(ctx) + cancel() + if err != nil { + t.Fatal("Reading message from AMQP:", err) + } + + // Accept message + msg.Accept() + if rply := string(msg.GetData()); rply != body { + t.Errorf("Expected: %q, received: %q", body, rply) + } +}