From ce512854f24604c4da8e425353ee7cff8bf19a65 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 23 Jan 2023 17:54:36 +0200 Subject: [PATCH] Port sqs and s3 tests to 1.0 --- data/conf/samples/ees_s3&sqs/cgrates.json | 75 +++++++++ ees/s3_it_test.go | 177 ++++++++++++++++++++ ees/sqs_it_test.go | 192 ++++++++++++++++++++++ 3 files changed, 444 insertions(+) create mode 100644 data/conf/samples/ees_s3&sqs/cgrates.json create mode 100644 ees/s3_it_test.go create mode 100644 ees/sqs_it_test.go diff --git a/data/conf/samples/ees_s3&sqs/cgrates.json b/data/conf/samples/ees_s3&sqs/cgrates.json new file mode 100644 index 000000000..98da10ed9 --- /dev/null +++ b/data/conf/samples/ees_s3&sqs/cgrates.json @@ -0,0 +1,75 @@ +{ + +"general": { + "log_level": 7, + "poster_attempts": 2, + "failed_posts_ttl": "1s" +}, + +"data_db": { + "db_type": "*internal" +}, + + +"stor_db": { + "db_type": "*internal" +}, + +"ees": { + "enabled": true, + "exporters": [ + { + "id": "sqs_test_file", + "type": "*sqsJSONMap", + "export_path": "sqs.eu-central-1.amazonaws.com", + "opts": { + "awsRegion": "eu-central-1", + "awsKey": "AKIAYPZSIYZCW22KRSVY", + "awsSecret": "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", + "sqsQueueID": "testQueue" + }, + "attempts": 1, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "synchronous": true, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] + }, + { + "id": "s3_test_file", + "type": "*s3JSONMap", + "export_path": "s3.eu-central-1.amazonaws.com", + "opts": { + "awsRegion": "eu-central-1", + "awsKey": "AKIAYPZSIYZCW22KRSVY", + "awsSecret": "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", + "s3BucketID": "cgrates-cdrs" + }, + "attempts": 1, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "synchronous": true, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] + } + ] +}, + + +"templates": { + "requiredFields": [ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "OrderID", "path": "*exp.OrderID", "type": "*variable", "value": "~*req.OrderID","filter":"*string:~*opts.AddOrderID:true"} + ] +} + +} \ No newline at end of file diff --git a/ees/s3_it_test.go b/ees/s3_it_test.go new file mode 100644 index 000000000..3879b623f --- /dev/null +++ b/ees/s3_it_test.go @@ -0,0 +1,177 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "flag" + "fmt" + "net/rpc" + "path" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + runS3Test = flag.Bool("s3_ees", false, "Run the integration test for the S3 exporter") + originID string // needed to compute the key when verifying export + s3ConfDir string + s3CfgPath string + s3Cfg *config.CGRConfig + s3RPC *rpc.Client + + sTestsS3 = []func(t *testing.T){ + testS3LoadConfig, + testS3ResetDataDB, + testS3StartEngine, + testS3RPCConn, + testS3ExportEvent, + testS3VerifyExport, + testStopCgrEngine, + } +) + +func TestS3Export(t *testing.T) { + if !*runS3Test { + t.SkipNow() + } + s3ConfDir = "ees_s3&sqs" + for _, stest := range sTestsS3 { + t.Run(s3ConfDir, stest) + } +} + +func testS3LoadConfig(t *testing.T) { + var err error + s3CfgPath = path.Join(*dataDir, "conf", "samples", s3ConfDir) + if s3Cfg, err = config.NewCGRConfigFromPath(context.Background(), s3CfgPath); err != nil { + t.Error(err) + } + for _, value := range s3Cfg.EEsCfg().Exporters { + if value.ID == "sqs_test_file" { + awsKey = *value.Opts.AWSKey + awsSecret = *value.Opts.AWSSecret + } + } +} + +func testS3ResetDataDB(t *testing.T) { + if err := engine.InitDataDB(s3Cfg); err != nil { + t.Fatal(err) + } +} + +func testS3StartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(s3CfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testS3RPCConn(t *testing.T) { + var err error + s3RPC, err = newRPCClient(s3Cfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testS3ExportEvent(t *testing.T) { + originID = utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()) + ev := &utils.CGREventWithEeIDs{ + EeIDs: []string{"s3_test_file"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "dataEvent", + Event: map[string]interface{}{ + utils.ToR: utils.MetaData, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "AnotherTenant", + utils.Category: "call", //for data CDR use different Tenant + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Nanosecond, + utils.RunID: utils.MetaDefault, + utils.Cost: 0.012, + }, + APIOpts: map[string]interface{}{ + utils.MetaOriginID: originID, + }, + }, + } + + var reply map[string]utils.MapStorage + if err := s3RPC.Call(utils.EeSv1ProcessEvent, ev, &reply); err != nil { + t.Error(err) + } + time.Sleep(2 * time.Second) +} + +func testS3VerifyExport(t *testing.T) { + endpoint := "s3.eu-central-1.amazonaws.com" + region := "eu-central-1" + qname := "cgrates-cdrs" + + key := fmt.Sprintf("%s/%s:%s.json", "", originID, utils.MetaDefault) + + var sess *session.Session + cfg := aws.Config{Endpoint: aws.String(endpoint)} + cfg.Region = aws.String(region) + + cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "") + var err error + sess, err = session.NewSessionWithOptions( + session.Options{ + Config: cfg, + }, + ) + if err != nil { + t.Error(err) + } + s3Clnt := s3.New(sess) + s3Clnt.DeleteObject(&s3.DeleteObjectInput{}) + file := aws.NewWriteAtBuffer([]byte{}) + svc := s3manager.NewDownloader(sess) + + if _, err = svc.Download(file, + &s3.GetObjectInput{ + Bucket: aws.String(qname), + Key: aws.String(key), + }); err != nil { + t.Fatalf("Unable to download item %v", err) + } + + expected := `{"Account":"1001","Category":"call","Destination":"1002","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"AnotherTenant","ToR":"*data"}` + if rply := string(file.Bytes()); rply != expected { + t.Errorf("Expected: %q, received: %q", expected, rply) + } +} diff --git a/ees/sqs_it_test.go b/ees/sqs_it_test.go new file mode 100644 index 000000000..475d1399a --- /dev/null +++ b/ees/sqs_it_test.go @@ -0,0 +1,192 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "flag" + "net/rpc" + "path" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + runSQSTest = flag.Bool("sqs_ees", false, "Run the integration test for the SQS exporter") + awsKey string + awsSecret string + sqsConfDir string + sqsCfgPath string + sqsCfg *config.CGRConfig + sqsRPC *rpc.Client + + sTestsSQS = []func(t *testing.T){ + testSQSLoadConfig, + testSQSResetDataDB, + testSQSStartEngine, + testSQSRPCConn, + testSQSExportEvent, + testSQSVerifyExport, + testStopCgrEngine, + } +) + +func TestSQSExport(t *testing.T) { + if !*runSQSTest { + t.SkipNow() + } + sqsConfDir = "ees_s3&sqs" + for _, stest := range sTestsSQS { + t.Run(sqsConfDir, stest) + } +} + +func testSQSLoadConfig(t *testing.T) { + var err error + sqsCfgPath = path.Join(*dataDir, "conf", "samples", sqsConfDir) + if sqsCfg, err = config.NewCGRConfigFromPath(context.Background(), sqsCfgPath); err != nil { + t.Error(err) + } + for _, value := range sqsCfg.EEsCfg().Exporters { + if value.ID == "sqs_test_file" { + awsKey = *value.Opts.AWSKey + awsSecret = *value.Opts.AWSSecret + } + } +} + +func testSQSResetDataDB(t *testing.T) { + if err := engine.InitDataDB(sqsCfg); err != nil { + t.Fatal(err) + } +} + +func testSQSStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(sqsCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testSQSRPCConn(t *testing.T) { + var err error + sqsRPC, err = newRPCClient(sqsCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testSQSExportEvent(t *testing.T) { + ev := &utils.CGREventWithEeIDs{ + EeIDs: []string{"sqs_test_file"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "voiceEvent", + Event: map[string]interface{}{ + utils.ToR: utils.MetaVoice, + utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + }, + APIOpts: map[string]interface{}{ + utils.MetaOriginID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), + }, + }, + } + + var reply map[string]utils.MapStorage + if err := sqsRPC.Call(utils.EeSv1ProcessEvent, ev, &reply); err != nil { + t.Error(err) + } + + time.Sleep(2 * time.Second) +} + +func testSQSVerifyExport(t *testing.T) { + endpoint := "sqs.eu-central-1.amazonaws.com" + region := "eu-central-1" + qname := "testQueue" + + var sess *session.Session + cfg := aws.Config{Endpoint: aws.String(endpoint)} + cfg.Region = aws.String(region) + var err error + cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "") + sess, err = session.NewSessionWithOptions( + session.Options{ + Config: cfg, + }, + ) + if err != nil { + t.Error(err) + } + + svc := sqs.New(sess) + + resultURL, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(qname), + }) + if err != nil { + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + t.Fatalf("Unable to find queue %q.", qname) + } + t.Fatalf("Unable to queue %q, %v.", qname, err) + } + + result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: resultURL.QueueUrl, + MaxNumberOfMessages: aws.Int64(1), + VisibilityTimeout: aws.Int64(30), + WaitTimeSeconds: aws.Int64(0), + }) + + if err != nil { + t.Error(err) + return + } + + expected := `{"Account":"1001","Category":"call","Destination":"1002","OriginID":"dsafdsaf","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice"}` + if len(result.Messages) != 1 { + t.Fatalf("Expected 1 message received: %d", len(result.Messages)) + } + if result.Messages[0].Body == nil { + t.Fatal("No Msg Body") + } + if *result.Messages[0].Body != expected { + t.Errorf("Expected: %q, received: %q", expected, *result.Messages[0].Body) + } +}