From d2115893b88018596cb02c9f67bb97bc9c60ce12 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 3 Feb 2017 16:30:26 +0100 Subject: [PATCH] Adding tests for CDR replication over *amqp_json_map --- .../cdrsreplicationmaster.json | 37 +++++++++++++++ general_tests/cdrs_replication_it_test.go | 47 +++++++++++++++++-- utils/poster.go | 2 +- 3 files changed, 81 insertions(+), 5 deletions(-) diff --git a/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json b/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json index a86ba5782..69c9b855d 100644 --- a/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json @@ -4,6 +4,11 @@ // Used in apier_local_tests // Starts rater, cdrs and mediator connecting over internal channel +"general": { + "log_level": 7, + "poster_attempts": 1, +}, + "rals": { "enabled": true, // enable Rater service: }, @@ -36,6 +41,30 @@ {"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"}, ], }, + { + "transport": "*amqp_json_map", + "address": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs", + "attempts": 1, + "cdr_filter": "", + "content_fields": [ + {"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"}, + {"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"}, + {"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"}, + {"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"}, + {"tag":"OriginHost", "type": "*composed", "value": "OriginHost", "field_id": "OriginHost"}, + {"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"}, + {"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"}, + {"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"}, + {"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"}, + {"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"}, + {"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"}, + {"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"}, + {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"}, + {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"}, + {"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"}, + {"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"}, + ], + }, { "transport": "*http_post", "address": "http://127.0.0.1:12080/invalid", @@ -43,6 +72,14 @@ "content_fields": [ {"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"}, ], + }, + { + "transport": "*amqp_json_map", + "address": "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs", + "attempts": 1, + "content_fields": [ + {"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"}, + ], }, ], }, diff --git a/general_tests/cdrs_replication_it_test.go b/general_tests/cdrs_replication_it_test.go index e421b5881..290e6c64d 100644 --- a/general_tests/cdrs_replication_it_test.go +++ b/general_tests/cdrs_replication_it_test.go @@ -21,8 +21,9 @@ along with this program. If not, see package general_tests import ( + "encoding/json" "io/ioutil" - "os" + //"os" "path" "reflect" "strings" @@ -33,6 +34,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" + "github.com/streadway/amqp" ) var cdrsMasterCfgPath, cdrsSlaveCfgPath string @@ -136,6 +138,41 @@ func TestCdrsHttpCdrReplication(t *testing.T) { } } +func TestCdrsAMQPReplication(t *testing.T) { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + t.Fatal(err) + } + defer ch.Close() + + q, err := ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil) + if err != nil { + t.Fatal(err) + } + msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatal(err) + } + select { + case d := <-msgs: + var rcvCDR map[string]string + if err := json.Unmarshal(d.Body, &rcvCDR); err != nil { + t.Error(err) + } + if rcvCDR[utils.CGRID] != utils.Sha1("httpjsonrpc1", time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC).String()) { + t.Errorf("Unexpected CDR received: %+v", rcvCDR) + } + case <-time.After(time.Duration(100 * time.Millisecond)): + t.Error("No message received from RabbitMQ") + } +} + // Connect rpc client to rater func TestCdrsFileFailover(t *testing.T) { time.Sleep(time.Duration(2 * time.Second)) @@ -161,9 +198,11 @@ func TestCdrsFileFailover(t *testing.T) { } else if !reflect.DeepEqual(failoverContent, readBytes) { // Checking just the prefix should do since some content is dynamic t.Errorf("Expecting: %q, received: %q", string(failoverContent), string(readBytes)) } - if err := os.Remove(filePath); err != nil { - t.Error("Failed removing file: ", filePath) - } + /* + if err := os.Remove(filePath); err != nil { + t.Error("Failed removing file: ", filePath) + } + */ } /* diff --git a/utils/poster.go b/utils/poster.go index caf7b2840..e29193baf 100644 --- a/utils/poster.go +++ b/utils/poster.go @@ -230,7 +230,7 @@ func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int, fallbac return pc.cache[dialURL], nil } -// "amqp://guest:guest@localhost:5672/?queueID=cgr_cdrs" +// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) { u, err := url.Parse(dialURL) if err != nil {