diff --git a/ers/amqp.go b/ers/amqp.go index c14c47014..028b6132d 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -161,22 +161,48 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { utils.ERs, rdr.dialURL)) rdr.close() return - case msg := <-msgChan: + case msg, ok := <-msgChan: + if !ok { + utils.Logger.Warning( + fmt.Sprintf("<%s> lost connection to AMQP server at %s, closing reader...", + utils.ERs, rdr.dialURL)) + rdr.close() + return + } go func(msg amqp.Delivery) { - if err := rdr.processMessage(msg.Body); err != nil { + err := rdr.processMessage(msg.Body) + if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> processing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) + + err = msg.Reject(true) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error negatively acknowledging message %s: %s", + utils.ERs, msg.MessageId, err.Error())) + } + return } - if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString, - rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil { + + // Post the message if poster is available. + if rdr.poster != nil { + err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString, + rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant) + if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) + } } - msg.Ack(true) + + err = msg.Ack(false) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error acknowledging message %s: %s", + utils.ERs, msg.MessageId, err.Error())) + } if rdr.Config().ConcurrentReqs != -1 { rdr.cap <- struct{}{} }