diff --git a/data/conf/samples/ees_s3&sqs/cgrates.json b/data/conf/samples/ees_s3&sqs/cgrates.json new file mode 100644 index 000000000..99d73d74f --- /dev/null +++ b/data/conf/samples/ees_s3&sqs/cgrates.json @@ -0,0 +1,73 @@ +{ + "general": { + "log_level": 7, + "poster_attempts": 2, + "failed_posts_ttl": "1s" + }, + + "data_db": { + "db_type": "*internal" + }, + + + "stor_db": { + "db_type": "*internal" + }, + + "ees": { + "enabled": true, + "exporters": [ + { + "id": "sqs_test_file", + "type": "*sqs_json_map", + "export_path": "https://sqs.eu-central-1.amazonaws.com/?awsRegion=eu-central-1&awsKey=AKIAYPZSIYZCZ5U45KEO&awsSecret=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", + "opts": { + "awsRegion": "eu-central-1", + "awsKey": "AKIAYPZSIYZCZ5U45KEO", + "awsSecret": "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", + "sqsQueueID": "testQueue" + }, + "attempts": 1, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "synchronous": true, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] + }, + { + "id": "s3_test_file", + "type": "*s3_json_map", + "export_path": "https://s3.eu-central-1.amazonaws.com/?awsRegion=eu-central-1&awsKey=AKIAYPZSIYZCZ5U45KEO&awsSecret=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", + "opts": { + "awsRegion": "eu-central-1", + "awsKey": "AKIAYPZSIYZCZ5U45KEO", + "awsSecret": "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", + "s3BucketID": "cgrates-cdrs" + }, + "attempts": 1, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "synchronous": true, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] + } + ] + }, + + + "templates": { + "requiredFields": [ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "OrderID", "path": "*exp.OrderID", "type": "*variable", "value": "~*req.OrderID","filter":"*string:~*opts.AddOrderID:true"} + ] + } +} \ No newline at end of file diff --git a/ees/s3_it_test.go b/ees/s3_it_test.go new file mode 100644 index 000000000..8eaebae79 --- /dev/null +++ b/ees/s3_it_test.go @@ -0,0 +1,173 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "net/rpc" + "path" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + s3ConfDir string + s3CfgPath string + s3Cfg *config.CGRConfig + s3RPC *rpc.Client + + sTestsS3 = []func(t *testing.T){ + testS3LoadConfig, + testS3ResetDataDB, + testS3ResetStorDb, + testS3StartEngine, + testS3RPCConn, + // testS3ExportEvent, + // testS3VerifyExport, + testStopCgrEngine, + } +) + +func TestS3Export(t *testing.T) { + s3ConfDir = "ees_s3&sqs" + for _, stest := range sTestsS3 { + t.Run(s3ConfDir, stest) + } +} + +func testS3LoadConfig(t *testing.T) { + var err error + s3CfgPath = path.Join(*dataDir, "conf", "samples", s3ConfDir) + if s3Cfg, err = config.NewCGRConfigFromPath(s3CfgPath); err != nil { + t.Error(err) + } +} + +func testS3ResetDataDB(t *testing.T) { + if err := engine.InitDataDb(s3Cfg); err != nil { + t.Fatal(err) + } +} + +func testS3ResetStorDb(t *testing.T) { + if err := engine.InitStorDb(s3Cfg); err != nil { + t.Fatal(err) + } +} + +func testS3StartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(s3CfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testS3RPCConn(t *testing.T) { + var err error + s3RPC, err = newRPCClient(s3Cfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testS3ExportEvent(t *testing.T) { + ev := &engine.CGREventWithEeIDs{ + EeIDs: []string{"s3_test_file"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "dataEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.MetaData, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "AnotherTenant", + utils.Category: "call", //for data CDR use different Tenant + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Nanosecond, + utils.RunID: utils.MetaDefault, + utils.Cost: 0.012, + }, + }, + } + + var reply map[string]utils.MapStorage + if err := s3RPC.Call(utils.EeSv1ProcessEvent, ev, &reply); err != nil { + t.Error(err) + } + time.Sleep(time.Second) +} + +func testS3VerifyExport(t *testing.T) { + endpoint := "https://s3.eu-central-1.amazonaws.com/?awsRegion=eu-central-1&awsKey=AKIAYPZSIYZCZ5U45KEO&awsSecret=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" + region := "eu-central-1" + awsKey := "AKIAYPZSIYZCZ5U45KEO" + awsSecret := "RIUlDyxh7qpoxSBomGOjymIZqSs/pgdXkW16HlKxt" + qname := "cgrates-cdrs" + + key := "key" + key += ".json" + var sess *session.Session + cfg := aws.Config{Endpoint: aws.String(endpoint)} + cfg.Region = aws.String(region) + + cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "") + var err error + sess, err = session.NewSessionWithOptions( + session.Options{ + Config: cfg, + }, + ) + if err != nil { + t.Error(err) + } + s3Clnt := s3.New(sess) + s3Clnt.DeleteObject(&s3.DeleteObjectInput{}) + file := aws.NewWriteAtBuffer([]byte{}) + svc := s3manager.NewDownloader(sess) + + if _, err = svc.Download(file, + &s3.GetObjectInput{ + Bucket: aws.String(qname), + Key: aws.String(key), + }); err != nil { + t.Fatalf("Unable to download item %v", err) + } + + expected := `{"Account":"1001","CGRID":"dbafe9c8614c785a65aabd116dd3959c3c56f7f6","Category":"call","Destination":"1002","OriginID":"dsafdsaf","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice"}` + if rply := string(file.Bytes()); rply != expected { + t.Errorf("Expected: %q, received: %q", expected, rply) + } +} diff --git a/ees/sqs_it_test.go b/ees/sqs_it_test.go new file mode 100644 index 000000000..15e7365ed --- /dev/null +++ b/ees/sqs_it_test.go @@ -0,0 +1,189 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "net/rpc" + "path" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + sqsConfDir string + sqsCfgPath string + sqsCfg *config.CGRConfig + sqsRPC *rpc.Client + + sTestsSQS = []func(t *testing.T){ + testSQSLoadConfig, + testSQSResetDataDB, + testSQSResetStorDb, + testSQSStartEngine, + testSQSRPCConn, + testSQSExportEvent, + // testSQSVerifyExport, + testStopCgrEngine, + } +) + +func TestSQSExport(t *testing.T) { + sqsConfDir = "ees_s3&sqs" + for _, stest := range sTestsSQS { + t.Run(sqsConfDir, stest) + } +} + +func testSQSLoadConfig(t *testing.T) { + var err error + sqsCfgPath = path.Join(*dataDir, "conf", "samples", sqsConfDir) + if sqsCfg, err = config.NewCGRConfigFromPath(sqsCfgPath); err != nil { + t.Error(err) + } +} + +func testSQSResetDataDB(t *testing.T) { + if err := engine.InitDataDb(sqsCfg); err != nil { + t.Fatal(err) + } +} + +func testSQSResetStorDb(t *testing.T) { + if err := engine.InitStorDb(sqsCfg); err != nil { + t.Fatal(err) + } +} + +func testSQSStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(sqsCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testSQSRPCConn(t *testing.T) { + var err error + sqsRPC, err = newRPCClient(sqsCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testSQSExportEvent(t *testing.T) { + ev := &engine.CGREventWithEeIDs{ + EeIDs: []string{"sqs_test_file"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "voiceEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.MetaVoice, + utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + }, + }, + } + + var reply map[string]utils.MapStorage + if err := sqsRPC.Call(utils.EeSv1ProcessEvent, ev, &reply); err != nil { + t.Error(err) + } + + time.Sleep(time.Second) +} + +func testSQSVerifyExport(t *testing.T) { + endpoint := "https://sqs.eu-central-1.amazonaws.com/?awsRegion=eu-central-1&awsKey=AKIAYPZSIYZCZ5U45KEO&awsSecret=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" + region := "eu-central-1" + awsKey := "AKIAYPZSIYZCZ5U45KEO" + awsSecret := "RIUlDyxh7qpoxSBomGOjymIZqSs/pgdXkW16HlKxt" + qname := "testQueue" + + var sess *session.Session + cfg := aws.Config{Endpoint: aws.String(endpoint)} + cfg.Region = aws.String(region) + var err error + cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "") + sess, err = session.NewSessionWithOptions( + session.Options{ + Config: cfg, + }, + ) + if err != nil { + t.Error(err) + } + + svc := sqs.New(sess) + + resultURL, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(qname), + }) + if err != nil { + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + t.Fatalf("Unable to find queue %q.", qname) + } + t.Fatalf("Unable to queue %q, %v.", qname, err) + } + + result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: resultURL.QueueUrl, + MaxNumberOfMessages: aws.Int64(1), + VisibilityTimeout: aws.Int64(30), + WaitTimeSeconds: aws.Int64(0), + }) + + if err != nil { + t.Error(err) + return + } + + expected := `{"Account":"1001","CGRID":"dbafe9c8614c785a65aabd116dd3959c3c56f7f6","Category":"call","Destination":"1002","OriginID":"dsafdsaf","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice"}` + if len(result.Messages) != 1 { + t.Fatalf("Expected 1 message received: %d", len(result.Messages)) + } + if result.Messages[0].Body == nil { + t.Fatal("No Msg Body") + } + if *result.Messages[0].Body != expected { + t.Errorf("Expected: %q, received: %q", expected, *result.Messages[0].Body) + } +}