From 55c3730cda51abb781a7b57bb4df02188c1c8b48 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 16 Oct 2023 09:53:39 -0400 Subject: [PATCH] Update AMQP message processing loop Previously, msg.Ack(true) was used, which is mostly used for batch processing. It mistakenly acknowledged all previously unacknowledged messages, causing errors from the AMQP server. Now, messages are acknowledged individually after each one is processed. Messages that ERs failed to process are now rejected and requeued for future processing attempts. The reader is now closed immediately if the message delivery channel closes. Therefore, it prevents an endless loop by avoiding continuous consumption from empty or closed channels. Addresses: #4146 --- ers/amqp.go | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/ers/amqp.go b/ers/amqp.go index 9a104a13d..610009c60 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -158,21 +158,47 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { utils.ERs, rdr.dialURL)) rdr.close() return - case msg := <-msgChan: + case msg, ok := <-msgChan: + if !ok { + utils.Logger.Warning( + fmt.Sprintf("<%s> lost connection to AMQP server at %s, closing reader...", + utils.ERs, rdr.dialURL)) + rdr.close() + return + } go func(msg amqp.Delivery) { - if err := rdr.processMessage(msg.Body); err != nil { + err := rdr.processMessage(msg.Body) + if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> processing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) + + err = msg.Reject(true) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error negatively acknowledging message %s: %s", + utils.ERs, msg.MessageId, err.Error())) + } + return } - if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString); err != nil { + + // Post the message if poster is available. + if rdr.poster != nil { + err = ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString) + if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) + } } - msg.Ack(true) + + err = msg.Ack(false) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error acknowledging message %s: %s", + utils.ERs, msg.MessageId, err.Error())) + } if rdr.Config().ConcurrentReqs != -1 { rdr.cap <- struct{}{} }