Added integration test for RabbitMQ poster

This commit is contained in:
Trial97
2018-12-19 14:05:25 +02:00
committed by Dan Christian Bogos
parent b3378d62af
commit c1aa7c5400
3 changed files with 78 additions and 22 deletions

View File

@@ -51,7 +51,7 @@
},
"amqp_localhost": {
"export_format": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs",
"attempts": 3,
"content_fields": [ // template of the exported content fields
{"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"},

View File

@@ -249,10 +249,10 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
}
for i := 0; i < pstr.attempts; i++ {
if err = chn.Publish(
pstr.exchange, // exchange
pstr.queueID, // routing key
false, // mandatory
false, // immediate
pstr.exchange, // exchange
pstr.routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: contentType,
@@ -302,7 +302,7 @@ func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) {
}
if pstr.exchange != "" {
err = postChan.ExchangeDeclare(
if err = postChan.ExchangeDeclare(
pstr.exchange, // name
pstr.exchangeType, // type
true, // durable
@@ -310,33 +310,30 @@ func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) {
false, // internal
false, // no-wait
nil, // args
)
if err != nil {
); err != nil {
return
}
}
_, err = postChan.QueueDeclare(
if _, err = postChan.QueueDeclare(
pstr.queueID, // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // args
)
if err != nil {
); err != nil {
return
}
if pstr.exchange != "" {
err = postChan.QueueBind(
pstr.queueID, // queue
routingKey, // key
pstr.exchange, // exchange
false, // no-wait
nil, // args
)
if err != nil {
if err = postChan.QueueBind(
pstr.queueID, // queue
pstr.routingKey, // key
pstr.exchange, // exchange
false, // no-wait
nil, // args
); err != nil {
return
}
}

View File

@@ -84,6 +84,32 @@ func TestCDRsOnExpStartSlaveEngine(t *testing.T) {
}
}
// Create Queues dor amq
func TestCDRsOnExpAMQPQueuesCreation(t *testing.T) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
t.Fatal(err)
}
ch, err := conn.Channel()
if err != nil {
t.Fatal(err)
}
defer ch.Close()
if err = ch.ExchangeDeclare("exchangename", "fanout", true, false, false, false, nil); err != nil {
return
}
q1, err := ch.QueueDeclare("queue1", true, false, false, false, nil)
if err != nil {
t.Fatal(err)
}
if err = ch.QueueBind(q1.Name, "cgr_cdrs", "exchangename", false, nil); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func TestCDRsOnExpHttpCdrReplication(t *testing.T) {
cdrsMasterRpc, err = rpcclient.NewRpcClient("tcp", cdrsMasterCfg.ListenCfg().RPCJSONListen, false, "", "", "", 1, 1,
@@ -171,6 +197,11 @@ func TestCDRsOnExpAMQPReplication(t *testing.T) {
if err != nil {
t.Fatal(err)
}
q1, err := ch.QueueDeclare("queue1", true, false, false, false, nil)
if err != nil {
t.Fatal(err)
}
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
t.Fatal(err)
@@ -187,6 +218,21 @@ func TestCDRsOnExpAMQPReplication(t *testing.T) {
case <-time.After(time.Duration(100 * time.Millisecond)):
t.Error("No message received from RabbitMQ")
}
if msgs, err = ch.Consume(q1.Name, "consumer", true, false, false, false, nil); err != nil {
t.Fatal(err)
}
select {
case d := <-msgs:
var rcvCDR map[string]string
if err := json.Unmarshal(d.Body, &rcvCDR); err != nil {
t.Error(err)
}
if rcvCDR[utils.CGRID] != utils.Sha1("httpjsonrpc1", time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC).String()) {
t.Errorf("Unexpected CDR received: %+v", rcvCDR)
}
case <-time.After(time.Duration(100 * time.Millisecond)):
t.Error("No message received from RabbitMQ")
}
conn.Close()
// restart RabbitMQ server so we can test reconnects
if err := exec.Command("service", "rabbitmq-server", "restart").Run(); err != nil {
@@ -230,9 +276,6 @@ func TestCDRsOnExpAMQPReplication(t *testing.T) {
}
defer ch.Close()
if q, err = ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil); err != nil {
t.Fatal(err)
}
if msgs, err = ch.Consume(q.Name, "", true, false, false, false, nil); err != nil {
t.Fatal(err)
}
@@ -249,6 +292,22 @@ func TestCDRsOnExpAMQPReplication(t *testing.T) {
t.Error("No message received from RabbitMQ")
}
if msgs, err = ch.Consume(q1.Name, "", true, false, false, false, nil); err != nil {
t.Fatal(err)
}
select {
case d := <-msgs:
var rcvCDR map[string]string
if err := json.Unmarshal(d.Body, &rcvCDR); err != nil {
t.Error(err)
}
if rcvCDR[utils.CGRID] != testCdr.CGRID {
t.Errorf("Unexpected CDR received: %s expeced: %s", utils.ToJSON(rcvCDR), utils.ToJSON(testCdr))
}
case <-time.After(150 * time.Millisecond):
t.Error("No message received from RabbitMQ")
}
}
func TestCDRsOnExpHTTPPosterFileFailover(t *testing.T) {