diff --git a/ers/amqp.go b/ers/amqp.go index 9a104a13d..610009c60 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -158,21 +158,47 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { utils.ERs, rdr.dialURL)) rdr.close() return - case msg := <-msgChan: + case msg, ok := <-msgChan: + if !ok { + utils.Logger.Warning( + fmt.Sprintf("<%s> lost connection to AMQP server at %s, closing reader...", + utils.ERs, rdr.dialURL)) + rdr.close() + return + } go func(msg amqp.Delivery) { - if err := rdr.processMessage(msg.Body); err != nil { + err := rdr.processMessage(msg.Body) + if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> processing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) + + err = msg.Reject(true) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error negatively acknowledging message %s: %s", + utils.ERs, msg.MessageId, err.Error())) + } + return } - if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString); err != nil { + + // Post the message if poster is available. + if rdr.poster != nil { + err = ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString) + if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) + } } - msg.Ack(true) + + err = msg.Ack(false) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error acknowledging message %s: %s", + utils.ERs, msg.MessageId, err.Error())) + } if rdr.Config().ConcurrentReqs != -1 { rdr.cap <- struct{}{} }