From 309b9c998377b1309865044ea6d821a9dd7f97d8 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 14 Aug 2020 16:52:07 +0300 Subject: [PATCH] Added test for CDRServer.ProcessEvent with exporters --- config/config_defaults.go | 2 +- data/conf/cgrates/cgrates.json | 20 +- .../samples/cdrsexport_internal/cgrates.json | 139 +++++++ .../samples/cdrsexport_mongo/cgrates.json | 144 +++++++ .../samples/cdrsexport_mysql/cgrates.json | 141 +++++++ .../cdrsv_failover_internal/cgrates.json | 2 +- general_tests/cdrs_exp_it_test.go | 387 ++++++++++++++++++ general_tests/cdrs_onlexp_it_test.go | 19 +- 8 files changed, 833 insertions(+), 21 deletions(-) create mode 100644 data/conf/samples/cdrsexport_internal/cgrates.json create mode 100644 data/conf/samples/cdrsexport_mongo/cgrates.json create mode 100644 data/conf/samples/cdrsexport_mysql/cgrates.json create mode 100644 general_tests/cdrs_exp_it_test.go diff --git a/config/config_defaults.go b/config/config_defaults.go index 021bb3f9e..a2e083bb9 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -357,7 +357,7 @@ const CGRATES_CFG_JSON = ` "ees": { // EventExporterService "enabled": false, // starts the EventReader service: - "attributes_conns":["*internal"], // RPC Connections IDs + "attributes_conns":[], // RPC Connections IDs "cache": { "*file_csv": {"limit": -1, "ttl": "5s", "static_ttl": false}, }, diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 301d10036..ba2d43cca 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -48,7 +48,10 @@ // "db_name": "10", // data_db database name to connect to // "db_user": "cgrates", // username to use when connecting to data_db // "db_password": "", // password to use when connecting to data_db -// "redis_sentinel":"", // the name of sentinel when used +// "redis_sentinel": "", // the name of sentinel when used +// "redis_cluster": false, // if enabled the datadb will try to connect to the redis cluster +// "cluster_sync": "5s", // the sync interval for the redis cluster +// "cluster_ondown_delay": "0", // the delay before executing the commands if the redis cluster is in the CLUSTERDOWN state // "query_timeout":"10s", // "remote_conns":[], // "replication_conns":[], @@ -294,6 +297,7 @@ // "ers": { // EventReaderService // "enabled": false, // starts the EventReader service: +// "templates":{}, // default templates for ERs // "sessions_conns":["*internal"], // RPC Connections IDs // "readers": [ // { @@ -304,8 +308,8 @@ // "header_define_character": ":", // the starting character for header definition used in case of CSV files // "run_delay": "0", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together // "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited -// "source_path": "/var/spool/cgrates/ers/in", // read data from this path -// "processed_path": "/var/spool/cgrates/ers/out", // move processed data here +// "source_path": "/var/spool/cgrates/ers/in", // read data from this path +// "processed_path": "/var/spool/cgrates/ers/out", // move processed data here // "xml_root_path": "", // path towards one event in case of XML CDRs // "tenant": "", // tenant used by import // "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> @@ -332,10 +336,11 @@ // "ees": { // EventExporterService // "enabled": false, // starts the EventReader service: -// "attributes_conns":["*internal"], // RPC Connections IDs +// "attributes_conns":[], // RPC Connections IDs // "cache": { // "*file_csv": {"limit": -1, "ttl": "5s", "static_ttl": false}, // }, +// "templates":{}, // default templates for EEs // "exporters": [ // { // "id": "*default", // identifier of the EventReader profile @@ -413,6 +418,9 @@ // "enabled": false, // starts the Asterisk agent: // "sessions_conns": ["*internal"], // "create_cdr": false, // create CDR out of events and sends it to CDRS component +// "low_balance_ann_file": "", // file to be played when low balance is reached for prepaid calls +// "empty_balance_context": "", // if defined, prepaid calls will be transferred to this context on empty balance +// "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined) // "asterisk_conns":[ // instantiate connections to multiple Asterisk servers // {"address": "127.0.0.1:8088", "user": "cgrates", "password": "CGRateS.org", "connect_attempts": 3,"reconnects": 5} // ], @@ -877,6 +885,10 @@ // "out_datadb_user": "cgrates", // "out_datadb_password": "", // "out_datadb_encoding" : "msgpack", +// "out_datadb_redis_sentinel": "", +// "out_datadb_redis_cluster": false, +// "out_datadb_cluster_sync": "5s", +// "out_datadb_cluster_ondown_delay": "0", // "out_stordb_type": "mysql", // "out_stordb_host": "127.0.0.1", // "out_stordb_port": "3306", diff --git a/data/conf/samples/cdrsexport_internal/cgrates.json b/data/conf/samples/cdrsexport_internal/cgrates.json new file mode 100644 index 000000000..ec9deacb5 --- /dev/null +++ b/data/conf/samples/cdrsexport_internal/cgrates.json @@ -0,0 +1,139 @@ +{ +// Sample CGRateS Configuration file for EEs +// +// Copyright (C) ITsysCOM GmbH + +"general": { + "log_level": 7, + "poster_attempts": 2, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "failed_posts_ttl": "1s" +}, + +"data_db": { + "db_type": "*internal", +}, + + +"stor_db": { + "db_type": "*internal", +}, + + +"rals": { + "enabled": true +}, + + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"cdrs": { + "enabled": true, + "chargers_conns":["*internal"], + "rals_conns": ["*internal"], + "session_cost_retries": 0, + "online_cdr_exports": ["http_localhost", "amqp_localhost", "aws_test_file", "sqs_test_file", "kafka_localhost", "s3_test_file"], + "ees_conns": ["*localhost"] +}, + + +"ees": { + "enabled": true, + "templates": { + "requiredFields": [ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + + {"tag": "OrderID", "path": "*exp.OrderID", "type": "*variable", "value": "~*req.OrderID","filter":"*string:~*opts.AddOrderID:true"} + ] + }, + "exporters": [ + { + "id": "http_localhost", + "type": "*http_post", + "export_path": "http://127.0.0.1:12081/cdr_http", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "amqp_localhost", + "type": "*amqp_json_map", + "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs", + "tenant": "cgrates.org", + "attempts": 3, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "aws_test_file", + "type": "*amqpv1_json_map", + "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "sqs_test_file", + "type": "*sqs_json_map", + // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" + "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "kafka_localhost", + "type": "*kafka_json_map", + "export_path": "localhost:9092?topic=cgrates_cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "s3_test_file", + "type": "*s3_json_map", + // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" + "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + ], +}, + + +"apiers": { + "enabled": true, +}, + + +} diff --git a/data/conf/samples/cdrsexport_mongo/cgrates.json b/data/conf/samples/cdrsexport_mongo/cgrates.json new file mode 100644 index 000000000..12a7bb6bc --- /dev/null +++ b/data/conf/samples/cdrsexport_mongo/cgrates.json @@ -0,0 +1,144 @@ +{ +// Sample CGRateS Configuration file for EEs +// +// Copyright (C) ITsysCOM GmbH + +"general": { + "log_level": 7, + "poster_attempts": 2, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "failed_posts_ttl": "1s" +}, + + +"data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, +}, + + +"stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, +}, + + +"rals": { + "enabled": true +}, + + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"cdrs": { + "enabled": true, + "chargers_conns":["*internal"], + "rals_conns": ["*internal"], + "session_cost_retries": 0, + "online_cdr_exports": ["http_localhost", "amqp_localhost", "aws_test_file", "sqs_test_file", "kafka_localhost", "s3_test_file"], + "ees_conns": ["*localhost"] +}, + + +"ees": { + "enabled": true, + "templates": { + "requiredFields": [ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + + {"tag": "OrderID", "path": "*exp.OrderID", "type": "*variable", "value": "~*req.OrderID","filter":"*string:~*opts.AddOrderID:true"} + ] + }, + "exporters": [ + { + "id": "http_localhost", + "type": "*http_post", + "export_path": "http://127.0.0.1:12081/cdr_http", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "amqp_localhost", + "type": "*amqp_json_map", + "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs", + "tenant": "cgrates.org", + "attempts": 3, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "aws_test_file", + "type": "*amqpv1_json_map", + "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "sqs_test_file", + "type": "*sqs_json_map", + // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" + "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "kafka_localhost", + "type": "*kafka_json_map", + "export_path": "localhost:9092?topic=cgrates_cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "s3_test_file", + "type": "*s3_json_map", + // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" + "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + ], +}, + + +"apiers": { + "enabled": true, +}, + + +} diff --git a/data/conf/samples/cdrsexport_mysql/cgrates.json b/data/conf/samples/cdrsexport_mysql/cgrates.json new file mode 100644 index 000000000..e6e54bbae --- /dev/null +++ b/data/conf/samples/cdrsexport_mysql/cgrates.json @@ -0,0 +1,141 @@ +{ +// Sample CGRateS Configuration file for EEs +// +// Copyright (C) ITsysCOM GmbH + +"general": { + "log_level": 7, + "poster_attempts": 1, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "failed_posts_ttl": "1s" +}, + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10" +}, + + +"stor_db": { + "db_password": "CGRateS.org" +}, + + +"rals": { + "enabled": true +}, + + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"cdrs": { + "enabled": true, + "chargers_conns":["*internal"], + "rals_conns": ["*internal"], + "session_cost_retries": 0, + "online_cdr_exports": ["http_localhost", "amqp_localhost", "aws_test_file", "sqs_test_file", "kafka_localhost", "s3_test_file"], + "ees_conns": ["*localhost"] +}, + + +"ees": { + "enabled": true, + "templates": { + "requiredFields": [ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + + {"tag": "OrderID", "path": "*exp.OrderID", "type": "*variable", "value": "~*req.OrderID","filter":"*string:~*opts.AddOrderID:true"} + ] + }, + "exporters": [ + { + "id": "http_localhost", + "type": "*http_post", + "export_path": "http://127.0.0.1:12081/cdr_http", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "amqp_localhost", + "type": "*amqp_json_map", + "export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs", + "tenant": "cgrates.org", + "attempts": 3, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "aws_test_file", + "type": "*amqpv1_json_map", + "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "sqs_test_file", + "type": "*sqs_json_map", + // export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" + "export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "kafka_localhost", + "type": "*kafka_json_map", + "export_path": "localhost:9092?topic=cgrates_cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + { + "id": "s3_test_file", + "type": "*s3_json_map", + // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" + "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, + ], + }, + ], +}, + + +"apiers": { + "enabled": true, +}, + + +} diff --git a/data/conf/samples/cdrsv_failover_internal/cgrates.json b/data/conf/samples/cdrsv_failover_internal/cgrates.json index 74e8ec85e..8f1758b70 100644 --- a/data/conf/samples/cdrsv_failover_internal/cgrates.json +++ b/data/conf/samples/cdrsv_failover_internal/cgrates.json @@ -7,7 +7,7 @@ "general": { "log_level": 7, "node_id": "TestFailCDRS", - "poster_attempts": 1, // number of attempts before considering post request failed (eg: *http_post, CDR exports) + "poster_attempts": 1, // number of attempts before considering post request failed (eg: *http_post, CDR exports) "failed_posts_ttl": "1s", // time to wait before writing the failed posts in a single file }, diff --git a/general_tests/cdrs_exp_it_test.go b/general_tests/cdrs_exp_it_test.go new file mode 100644 index 000000000..f80056bcf --- /dev/null +++ b/general_tests/cdrs_exp_it_test.go @@ -0,0 +1,387 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package general_tests + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/rpc" + "net/url" + "os" + "os/exec" + "path" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + kafka "github.com/segmentio/kafka-go" + "github.com/streadway/amqp" +) + +var ( + cdrsExpCfgPath string + cdrsExpCfgDir string + cdrsExpCfg *config.CGRConfig + cdrsExpRPC *rpc.Client + + cdrsExpHTTPEv = make(chan map[string]interface{}, 1) + cdrsExpHTTPServer *http.Server + + cdrsExpAMQPCon *amqp.Connection + + cdrsExpEv = &utils.CGREventWithOpts{ + Opts: map[string]interface{}{}, + CGREvent: &utils.CGREvent{ + ID: "Export", + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.CGRID: "TestCGRID", + utils.ToR: utils.VOICE, + utils.OriginID: "TestCDRsExp", + utils.OriginHost: "192.168.1.0", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC), + utils.AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), + utils.Usage: 10 * time.Second, + utils.Cost: 1.201, + }, + }, + } + + cdrsExpEvExp = map[string]interface{}{ + utils.CGRID: "TestCGRID", + utils.ToR: utils.VOICE, + utils.OriginID: "TestCDRsExp", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.RunID: utils.MetaRaw, + utils.OrderID: "0", + } + + cdrsExpTests = []func(t *testing.T){ + testCDRsExpInitConfig, + testCDRsExpInitDB, + testCDRsExpPrepareHTTP, + testCDRsExpPrepareAMQP, + testCDRsExpStartEngine, + testCDRsExpInitRPC, + testCDRsExpLoadAddCharger, + testCDRsExpExportEvent, + testCDRsExpHTTP, + testCDRsExpAMQP, + testCDRsExpKafka, + testCDRsExpFileFailover, + testCDRsExpStopEngine, + testCDRsExpStopHTTPServer, + testCDRsExpCloseAMQP, + } +) + +func TestCDRsExp(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + cdrsExpCfgDir = "cdrsexport_internal" + case utils.MetaMySQL: + cdrsExpCfgDir = "cdrsexport_mysql" + case utils.MetaMongo: + cdrsExpCfgDir = "cdrsexport_mongo" + case utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } + + for _, stest := range cdrsExpTests { + t.Run(cdrsExpCfgDir, stest) + } +} + +func testCDRsExpInitConfig(t *testing.T) { + var err error + cdrsExpCfgPath = path.Join(*dataDir, "conf", "samples", cdrsExpCfgDir) + if cdrsExpCfg, err = config.NewCGRConfigFromPath(cdrsExpCfgPath); err != nil { + t.Fatal(err) + } +} + +func testCDRsExpInitDB(t *testing.T) { + if err := engine.InitDataDb(cdrsExpCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDb(cdrsExpCfg); err != nil { + t.Fatal(err) + } + if err := os.RemoveAll(cdrsExpCfg.GeneralCfg().FailedPostsDir); err != nil { + t.Fatal("Error removing folder: ", cdrsExpCfg.GeneralCfg().FailedPostsDir, err) + } + if err := os.MkdirAll(cdrsExpCfg.GeneralCfg().FailedPostsDir, 0700); err != nil { + t.Error(err) + } + +} + +func testCDRsExpPrepareHTTP(t *testing.T) { + srvMux := http.NewServeMux() + srvMux.HandleFunc("/cdr_http", func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + var buf bytes.Buffer + if _, err := buf.ReadFrom(r.Body); err != nil { + t.Logf("Error reading the body:%s", err) + return + } + vals, err := url.ParseQuery(buf.String()) + if err != nil { + t.Logf("Error parsing the values: %s", err) + return + } + + ev := make(map[string]interface{}) + for k, v := range vals { + ev[k] = v[0] + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, utils.OK) + cdrsExpHTTPEv <- ev + }) + cdrsExpHTTPServer = &http.Server{Addr: ":12081", Handler: srvMux} + + go func(t2 *testing.T) { + if err := cdrsExpHTTPServer.ListenAndServe(); err != nil && + err != http.ErrServerClosed { + t2.Log(err) + } + }(t) +} + +func testCDRsExpPrepareAMQP(t *testing.T) { + var err error + if cdrsExpAMQPCon, err = amqp.Dial("amqp://guest:guest@localhost:5672/"); err != nil { + t.Fatal(err) + } + defer cdrsExpAMQPCon.Close() + + var ch *amqp.Channel + if ch, err = cdrsExpAMQPCon.Channel(); err != nil { + t.Fatal(err) + } + defer ch.Close() + + if err = ch.ExchangeDeclare("exchangename", "fanout", true, false, false, false, nil); err != nil { + t.Fatal(err) + } +} + +func testCDRsExpStartEngine(t *testing.T) { + time.Sleep(1) + if _, err := engine.StopStartEngine(cdrsExpCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testCDRsExpInitRPC(t *testing.T) { + var err error + if cdrsExpRPC, err = newRPCClient(cdrsExpCfg.ListenCfg()); err != nil { + t.Fatal(err) + } +} + +func testCDRsExpLoadAddCharger(t *testing.T) { + // //add a default charger + chargerProfile := &engine.ChargerProfile{ + Tenant: "cgrates.org", + ID: "*raw", + RunID: utils.MetaRaw, + AttributeIDs: []string{"*constant:*opts.AddOrderID:true"}, + Weight: 20, + } + var result string + if err := cdrsExpRPC.Call(utils.APIerSv1SetChargerProfile, chargerProfile, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testCDRsExpExportEvent(t *testing.T) { + // stop RabbitMQ server so we can test reconnects + if err := exec.Command("service", "rabbitmq-server", "stop").Run(); err != nil { + t.Error(err) + } + var reply string + if err := cdrsExpRPC.Call(utils.CDRsV1ProcessEvent, + &engine.ArgV1ProcessEvent{ + Flags: []string{"*export:true", utils.MetaRALs}, + CGREventWithOpts: *cdrsExpEv, + }, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if reply != utils.OK { + t.Error("Unexpected reply received: ", reply) + } + time.Sleep(100 * time.Millisecond) + filesInDir, _ := ioutil.ReadDir(cdrsExpCfg.GeneralCfg().FailedPostsDir) + if len(filesInDir) != 0 { + t.Fatalf("Should be no files in directory: %s", cdrsExpCfg.GeneralCfg().FailedPostsDir) + } + // start RabbitMQ server so we can test reconnects + if err := exec.Command("service", "rabbitmq-server", "start").Run(); err != nil { + t.Error(err) + } + time.Sleep(time.Second) + var err error + if cdrsExpAMQPCon, err = amqp.Dial("amqp://guest:guest@localhost:5672/"); err != nil { + t.Fatal(err) + } +} + +func testCDRsExpHTTP(t *testing.T) { + select { + case rcvCDR := <-cdrsExpHTTPEv: + if !reflect.DeepEqual(cdrsExpEvExp, rcvCDR) { + t.Errorf("Expected %s received %s", utils.ToJSON(cdrsExpEvExp), utils.ToJSON(rcvCDR)) + } + case <-time.After(time.Duration(100 * time.Millisecond)): + t.Error("No message received from RabbitMQ") + } +} + +func testCDRsExpAMQP(t *testing.T) { + ch, err := cdrsExpAMQPCon.Channel() + if err != nil { + t.Fatal(err) + } + defer ch.Close() + + msgs, err := ch.Consume("cgrates_cdrs", "", true, false, false, false, nil) + if err != nil { + t.Fatal(err) + } + select { + case d := <-msgs: + var rcvCDR map[string]interface{} + if err := json.Unmarshal(d.Body, &rcvCDR); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(cdrsExpEvExp, rcvCDR) { + t.Errorf("Expected %s received %s", utils.ToJSON(cdrsExpEvExp), utils.ToJSON(rcvCDR)) + } + case <-time.After(time.Duration(100 * time.Millisecond)): + t.Error("No message received from RabbitMQ") + } +} + +func testCDRsExpKafka(t *testing.T) { + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "cgrates_cdrs", + GroupID: "tmp", + MaxWait: time.Millisecond, + }) + + defer reader.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + var m kafka.Message + var err error + if m, err = reader.ReadMessage(ctx); err != nil { + t.Fatal(err) + } + var rcvCDR map[string]interface{} + if err := json.Unmarshal(m.Value, &rcvCDR); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(cdrsExpEvExp, rcvCDR) { + t.Errorf("Expected %s received %s", utils.ToJSON(cdrsExpEvExp), utils.ToJSON(rcvCDR)) + } + cancel() +} + +func testCDRsExpFileFailover(t *testing.T) { + time.Sleep(time.Second) + filesInDir, _ := ioutil.ReadDir(cdrsExpCfg.GeneralCfg().FailedPostsDir) + if len(filesInDir) == 0 { + t.Fatalf("No files in directory: %s", cdrsExpCfg.GeneralCfg().FailedPostsDir) + } + expectedFormats := utils.NewStringSet([]string{utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaS3jsonMap}) + rcvFormats := utils.StringSet{} + for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config + fileName := file.Name() + filePath := path.Join(cdrsExpCfg.GeneralCfg().FailedPostsDir, fileName) + + ev, err := engine.NewExportEventsFromFile(filePath) + if err != nil { + t.Errorf("<%s> for file <%s>", err, fileName) + continue + } else if len(ev.Events) == 0 { + t.Error("Expected at least one event") + continue + } + rcvFormats.Add(ev.Format) + if err := checkContent(ev, []interface{}{[]byte(utils.ToJSON(cdrsExpEvExp))}); err != nil { + t.Errorf("For file <%s> and event <%s> received %s", filePath, utils.ToJSON(ev), err) + } + } + if !reflect.DeepEqual(expectedFormats, rcvFormats) { + t.Errorf("Missing format expecting: %s received: %s", utils.ToJSON(expectedFormats), utils.ToJSON(rcvFormats)) + } +} + +func testCDRsExpStopEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} + +func testCDRsExpStopHTTPServer(t *testing.T) { + if err = cdrsExpHTTPServer.Shutdown(context.Background()); err != nil { + t.Fatal(err) + } +} + +func testCDRsExpCloseAMQP(t *testing.T) { + ch, err := cdrsExpAMQPCon.Channel() + if err != nil { + t.Fatal(err) + } + if _, err = ch.QueueDelete("cgrates_cdrs", false, false, true); err != nil { + ch.Close() + t.Fatal(err) + } + ch.Close() + if err := cdrsExpAMQPCon.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index 72dd11e16..d504a58d3 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -324,18 +324,7 @@ func testCDRsOnExpAMQPReplication(t *testing.T) { } defer ch.Close() - q, err := ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil) - if err != nil { - conn.Close() - t.Fatal(err) - } - q1, err := ch.QueueDeclare("queue1", true, false, false, false, nil) - if err != nil { - conn.Close() - t.Fatal(err) - } - - msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + msgs, err := ch.Consume("cgrates_cdrs", "", true, false, false, false, nil) if err != nil { conn.Close() t.Fatal(err) @@ -352,7 +341,7 @@ func testCDRsOnExpAMQPReplication(t *testing.T) { case <-time.After(time.Duration(100 * time.Millisecond)): t.Error("No message received from RabbitMQ") } - if msgs, err = ch.Consume(q1.Name, "consumer", true, false, false, false, nil); err != nil { + if msgs, err = ch.Consume("queue1", "consumer", true, false, false, false, nil); err != nil { conn.Close() t.Fatal(err) } @@ -421,7 +410,7 @@ func testCDRsOnExpAMQPReplication(t *testing.T) { } defer ch.Close() - if msgs, err = ch.Consume(q.Name, "", true, false, false, false, nil); err != nil { + if msgs, err = ch.Consume("cgrates_cdrs", "", true, false, false, false, nil); err != nil { t.Fatal(err) } select { @@ -437,7 +426,7 @@ func testCDRsOnExpAMQPReplication(t *testing.T) { t.Error("No message received from RabbitMQ") } - if msgs, err = ch.Consume(q1.Name, "", true, false, false, false, nil); err != nil { + if msgs, err = ch.Consume("queue1", "", true, false, false, false, nil); err != nil { t.Fatal(err) } select {