diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 52a6c3b5d..59a8d1291 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -440,6 +440,22 @@ "opts": { "natsSubject": "processed_cdrs", } + }, + { + "id": "AMQPExporter", + "type": "*amqp_json_map", + "export_path": "amqp://guest:guest@localhost:5672/", + "opts": { + "amqpQueueID": "cgrates_cdrs", + "amqpExchange": "exchangename", + "amqpExchangeType": "fanout", + "amqpRoutingKey": "cgr_cdrs" + }, + "failed_posts_dir": "/var/spool/cgrates/failed_posts2", + "synchronous": true, + "fields":[ + {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} + ] } ] }, diff --git a/data/conf/samples/ees_amqp/cgrates.json b/data/conf/samples/ees_amqp/cgrates.json deleted file mode 100644 index 8db513541..000000000 --- a/data/conf/samples/ees_amqp/cgrates.json +++ /dev/null @@ -1,58 +0,0 @@ -{ - - "general": { - "log_level": 7, - "poster_attempts": 2, - "failed_posts_ttl": "1s" - }, - - "data_db": { - "db_type": "*internal" - }, - - - "stor_db": { - "db_type": "*internal" - }, - - "ees": { - "enabled": true, - "exporters": [ - { - "id": "amqp_test_file", - "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:5672/", - "opts": { - "amqpQueueID": "cgrates_cdrs", - "amqpExchange": "exchangename", - "amqpExchangeType": "fanout", - "amqpRoutingKey": "cgr_cdrs" - }, - "attempts": 1, - "failed_posts_dir": "/var/spool/cgrates/failed_posts2", - "synchronous": true, - "fields":[ - {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} - ] - } - ] - }, - - - "templates": { - "requiredFields": [ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, - {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, - {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, - {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, - {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, - {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, - {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, - {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, - {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, - {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, - {"tag": "OrderID", "path": "*exp.OrderID", "type": "*variable", "value": "~*req.OrderID","filter":"*string:~*opts.AddOrderID:true"} - ] - } - - } \ No newline at end of file diff --git a/ees/amqp_it_test.go b/ees/amqp_it_test.go index 7309df003..a24d9a1c3 100644 --- a/ees/amqp_it_test.go +++ b/ees/amqp_it_test.go @@ -34,25 +34,30 @@ import ( ) var ( - amqpConfDir string - amqpCfgPath string - amqpCfg *config.CGRConfig - amqpRPC *rpc.Client + amqpConfDir string + amqpCfgPath string + amqpCfg *config.CGRConfig + amqpRPC *rpc.Client + amqpExportPath string sTestsAMQP = []func(t *testing.T){ + testCreateDirectory, testAMQPLoadConfig, testAMQPResetDataDB, testAMQPResetStorDb, testAMQPStartEngine, testAMQPRPCConn, + testAMQPExportEvent, testAMQPVerifyExport, + testStopCgrEngine, + testCleanDirectory, } ) func TestAMQPExport(t *testing.T) { - amqpConfDir = "ees_amqp" + amqpConfDir = "ees" for _, stest := range sTestsAMQP { t.Run(amqpConfDir, stest) } @@ -64,6 +69,11 @@ func testAMQPLoadConfig(t *testing.T) { if amqpCfg, err = config.NewCGRConfigFromPath(amqpCfgPath); err != nil { t.Error(err) } + for _, value := range amqpCfg.EEsCfg().Exporters { + if value.ID == "AMQPExporter" { + amqpExportPath = value.ExportPath + } + } } func testAMQPResetDataDB(t *testing.T) { @@ -94,7 +104,7 @@ func testAMQPRPCConn(t *testing.T) { func testAMQPExportEvent(t *testing.T) { ev := &engine.CGREventWithEeIDs{ - EeIDs: []string{"amqp_test_file"}, + EeIDs: []string{"AMQPExporter"}, CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", ID: "voiceEvent", @@ -124,11 +134,11 @@ func testAMQPExportEvent(t *testing.T) { t.Error(err) } - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) } func testAMQPVerifyExport(t *testing.T) { - conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + conn, err := amqp.Dial(amqpExportPath) if err != nil { t.Fatal(err) } @@ -146,14 +156,20 @@ func testAMQPVerifyExport(t *testing.T) { if err != nil { t.Fatal(err) } - expCDR := `{"Account":"1001","CGRID":"dbafe9c8614c785a65aabd116dd3959c3c56f7f6","Category":"call","Destination":"1002","OriginID":"dsafdsaf","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice"}` + expCDR := `{"Account":"1001","AnswerTime":"2013-11-07T08:42:26Z","CGRID":"dbafe9c8614c785a65aabd116dd3959c3c56f7f6","Category":"call","Cost":"1.01","Destination":"1002","OriginID":"dsafdsaf","RequestType":"*rated","RunID":"*default","SetupTime":"2013-11-07T08:42:25Z","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice","Usage":"10000000000"}` select { case d := <-msgs: rcvCDR := string(d.Body) if rcvCDR != expCDR { - t.Errorf("unexpected CDR received: <%s>", rcvCDR) + t.Errorf("expected: <%+v>, \nreceived: <%+v>", expCDR, rcvCDR) } case <-time.After(100 * time.Millisecond): t.Error("No message received from RabbitMQ") } + + // Delete the queue after verifying if the export was successful + _, err = ch.QueueDelete("cgrates_cdrs", false, false, true) + if err != nil { + t.Error(err) + } }