diff --git a/data/conf/samples/ees_s3&sqs/cgrates.json b/data/conf/samples/ees_cloud/cgrates.json similarity index 76% rename from data/conf/samples/ees_s3&sqs/cgrates.json rename to data/conf/samples/ees_cloud/cgrates.json index bebd43ae1..993f5bc7e 100644 --- a/data/conf/samples/ees_s3&sqs/cgrates.json +++ b/data/conf/samples/ees_cloud/cgrates.json @@ -51,6 +51,23 @@ "fields":[ {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} ] + }, + { + "id": "amqpv1_test_file", + "type": "*amqpv1_json_map", + // connection string from Azure Portal can be found here: Home > [Service Bus namespace] > + // > Shared access policies > RootManageSharedAccessKey > Primary Connection String where + // access-key-name is RootManageSharedAccessKey and access-key is the primary connection string + "export_path": "amqps://access-key-name:access-key@name-space.servicebus.windows.net", + "opts": { + "amqpQueueID": "cgrates_cdrs" + }, + "attempts": 1, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "synchronous": true, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] } ] }, diff --git a/ees/amqpv1_it_test.go b/ees/amqpv1_it_test.go new file mode 100644 index 000000000..8bee9622f --- /dev/null +++ b/ees/amqpv1_it_test.go @@ -0,0 +1,188 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "context" + "flag" + "net/rpc" + "path" + "testing" + "time" + + amqpv1 "github.com/Azure/go-amqp" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + runAMQPv1Test = flag.Bool("amqpv1_ees", false, "Run the integration test for the AMQPv1 exporter") + amqpv1ConfDir string + amqpv1CfgPath string + amqpv1Cfg *config.CGRConfig + amqpv1RPC *rpc.Client + amqpv1DialURL string + + sTestsAMQPv1 = []func(t *testing.T){ + testAMQPv1LoadConfig, + testAMQPv1ResetDataDB, + testAMQPv1ResetStorDB, + testAMQPv1StartEngine, + testAMQPv1RPCConn, + + testAMQPv1ExportEvent, + testAMQPv1VerifyExport, + + testStopCgrEngine, + } +) + +func TestAMQPv1Export(t *testing.T) { + if !*runAMQPv1Test { + t.SkipNow() + } + amqpv1ConfDir = "ees_cloud" + for _, stest := range sTestsAMQPv1 { + t.Run(amqpv1ConfDir, stest) + } +} + +func testAMQPv1LoadConfig(t *testing.T) { + var err error + amqpv1CfgPath = path.Join(*dataDir, "conf", "samples", amqpv1ConfDir) + if amqpv1Cfg, err = config.NewCGRConfigFromPath(amqpv1CfgPath); err != nil { + t.Error(err) + } + for _, value := range amqpv1Cfg.EEsCfg().Exporters { + if value.ID == "amqpv1_test_file" { + amqpv1DialURL = value.ExportPath + } + } +} + +func testAMQPv1ResetDataDB(t *testing.T) { + if err := engine.InitDataDb(amqpv1Cfg); err != nil { + t.Fatal(err) + } +} + +func testAMQPv1ResetStorDB(t *testing.T) { + if err := engine.InitStorDb(amqpv1Cfg); err != nil { + t.Fatal(err) + } +} + +func testAMQPv1StartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(amqpv1CfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testAMQPv1RPCConn(t *testing.T) { + var err error + amqpv1RPC, err = newRPCClient(amqpv1Cfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testAMQPv1ExportEvent(t *testing.T) { + ev := &engine.CGREventWithEeIDs{ + EeIDs: []string{"amqpv1_test_file"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "dataEvent", + Event: map[string]interface{}{ + utils.ToR: utils.MetaData, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "AnotherTenant", + utils.Category: "call", //for data CDR use different Tenant + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Nanosecond, + utils.RunID: utils.MetaDefault, + utils.Cost: 0.012, + }, + }, + } + + var reply map[string]utils.MapStorage + if err := amqpv1RPC.Call(utils.EeSv1ProcessEvent, ev, &reply); err != nil { + t.Error(err) + } + time.Sleep(2 * time.Second) +} + +func testAMQPv1VerifyExport(t *testing.T) { + // Create client + client, err := amqpv1.Dial(amqpv1DialURL) + /* an alternative way to create the client + client, err := amqpv1.Dial("amqps://cgratescdrs.servicebus.windows.net", + amqpv1.ConnSASLPlain("access-key-name", "access-key"), + ) + */ + if err != nil { + t.Fatal("Dialing AMQP server:", err) + } + defer client.Close() + + // Open a session + session, err := client.NewSession() + if err != nil { + t.Fatal("Creating AMQP session:", err) + } + + ctx := context.Background() + + // Create a receiver + receiver, err := session.NewReceiver( + amqpv1.LinkSourceAddress("/cgrates_cdrs"), + amqpv1.LinkCredit(10), + ) + if err != nil { + t.Fatal("Creating receiver link:", err) + } + defer func() { + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + receiver.Close(ctx) + cancel() + }() + + // Receive message + msg, err := receiver.Receive(ctx) + if err != nil { + t.Fatal("Reading message from AMQP:", err) + } + + // Accept message + if err = receiver.AcceptMessage(context.Background(), msg); err != nil { + t.Fatalf("Failure accepting message: %v", err) + } + + expected := `{"Account":"1001","Category":"call","Destination":"1002","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"AnotherTenant","ToR":"*data"}` + if rply := string(msg.GetData()); rply != expected { + t.Errorf("expected: %s, \nreceived: %s", expected, rply) + } +} diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go index 79e61158e..e4ecc0fc8 100644 --- a/ees/poster_it_test.go +++ b/ees/poster_it_test.go @@ -352,7 +352,7 @@ func TestAMQPv1Poster(t *testing.T) { } // Accept message - msg.Accept(ctx) + receiver.AcceptMessage(ctx, msg) if rply := string(msg.GetData()); rply != body { t.Errorf("Expected: %q, received: %q", body, rply) } diff --git a/ees/s3_it_test.go b/ees/s3_it_test.go index b43ef5330..210444c9e 100644 --- a/ees/s3_it_test.go +++ b/ees/s3_it_test.go @@ -63,7 +63,7 @@ func TestS3Export(t *testing.T) { if !*runS3Test { t.SkipNow() } - s3ConfDir = "ees_s3&sqs" + s3ConfDir = "ees_cloud" for _, stest := range sTestsS3 { t.Run(s3ConfDir, stest) } diff --git a/ees/sqs_it_test.go b/ees/sqs_it_test.go index f97ad5843..408889f96 100644 --- a/ees/sqs_it_test.go +++ b/ees/sqs_it_test.go @@ -63,7 +63,7 @@ func TestSQSExport(t *testing.T) { if !*runSQSTest { t.SkipNow() } - sqsConfDir = "ees_s3&sqs" + sqsConfDir = "ees_cloud" for _, stest := range sTestsSQS { t.Run(sqsConfDir, stest) }