Update AMQP message processing loop

Previously, msg.Ack(true) was used, which is mostly used for batch
processing. It mistakenly acknowledged all previously unacknowledged
messages, causing errors from the AMQP server. Now, messages are
acknowledged individually after each one is processed.

Messages that ERs failed to process are now rejected and requeued
for future processing attempts.

The reader is now closed immediately if the message delivery
channel closes. Therefore, it prevents an endless loop by avoiding
continuous consumption from empty or closed channels.

Addresses: #4146
This commit is contained in:
ionutboangiu
2023-10-16 09:53:39 -04:00
committed by Dan Christian Bogos
parent d7aabe20e3
commit 55c3730cda

View File

@@ -158,21 +158,47 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) {
utils.ERs, rdr.dialURL))
rdr.close()
return
case msg := <-msgChan:
case msg, ok := <-msgChan:
if !ok {
utils.Logger.Warning(
fmt.Sprintf("<%s> lost connection to AMQP server at %s, closing reader...",
utils.ERs, rdr.dialURL))
rdr.close()
return
}
go func(msg amqp.Delivery) {
if err := rdr.processMessage(msg.Body); err != nil {
err := rdr.processMessage(msg.Body)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing message %s error: %s",
utils.ERs, msg.MessageId, err.Error()))
err = msg.Reject(true)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error negatively acknowledging message %s: %s",
utils.ERs, msg.MessageId, err.Error()))
}
return
}
if rdr.poster != nil { // post it
if err := ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString); err != nil {
// Post the message if poster is available.
if rdr.poster != nil {
err = ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, msg.MessageId, err.Error()))
}
}
msg.Ack(true)
err = msg.Ack(false)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error acknowledging message %s: %s",
utils.ERs, msg.MessageId, err.Error()))
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
}