From 2867942ebe87163990c85585de8f324250e086d7 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 16 Oct 2023 11:41:50 -0400 Subject: [PATCH] Update AMQP message processing loop Previously, msg.Ack(true) was used, which is mostly used for batch processing. It mistakenly acknowledged all previously unacknowledged messages, causing errors from the AMQP server. Now, messages are acknowledged individually after each one is processed. Messages that ERs failed to process are now rejected and requeued for future processing attempts. The reader is now closed immediately if the message delivery channel closes. Therefore, it prevents an endless loop by avoiding continuous consumption from empty or closed channels. Addresses: #4146 --- ers/amqp.go | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/ers/amqp.go b/ers/amqp.go index c14c47014..028b6132d 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -161,22 +161,48 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { utils.ERs, rdr.dialURL)) rdr.close() return - case msg := <-msgChan: + case msg, ok := <-msgChan: + if !ok { + utils.Logger.Warning( + fmt.Sprintf("<%s> lost connection to AMQP server at %s, closing reader...", + utils.ERs, rdr.dialURL)) + rdr.close() + return + } go func(msg amqp.Delivery) { - if err := rdr.processMessage(msg.Body); err != nil { + err := rdr.processMessage(msg.Body) + if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> processing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) + + err = msg.Reject(true) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error negatively acknowledging message %s: %s", + utils.ERs, msg.MessageId, err.Error())) + } + return } - if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString, - rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil { + + // Post the message if poster is available. + if rdr.poster != nil { + err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString, + rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant) + if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) + } } - msg.Ack(true) + + err = msg.Ack(false) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error acknowledging message %s: %s", + utils.ERs, msg.MessageId, err.Error())) + } if rdr.Config().ConcurrentReqs != -1 { rdr.cap <- struct{}{} }