RabbitMQ 消费者确认 (Ack/Nack) (With Spring Boot)
引言
在复杂的分布式系统中,我们经常面临这样的挑战:“订单支付成功后,下游的积分、通知、物流服务却因为网络抖动或自身故障,未能成功处理消息,导致数据不一致。我们该怎么办?
这些场景都指向了一个核心痛点:如果消息被消费者取出后,在处理完成前消费者就宕机了,而消息队列却认为“任务已完成”并删除了消息,那么我们就会面临严重的数据丢失风险。这将直接影响系统的稳定性和业务的可靠性。
RabbitMQ 提供了强大的 消费者确认机制 (Consumer Acknowledgement),它是解决此类问题的利器。通过该机制,我们可以让消费者在真正成功处理完消息后,才通知 RabbitMQ 删除消息,从而极大地保证了消息处理的“至少一次”语义 (At-Least-Once Delivery)。
在Spring框架下,也提供了3中消息确认的方式
-
AcknowledgeMode.NONE
- 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 就会自动确认消息,从RabbitMQ队列中移除消息。如果消费者处理消息失败,消息可能会丢失。
-
AcknowledgeMode.AUTO(默认)
- 这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息。
-
AcknowledgeMode.MANUAL
- 手动确认模式下,消费者必须在成功处理消息后显式调用
basicAck
方法来确认消息。如果消息未被确认,RabbitMQ 会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。
- 手动确认模式下,消费者必须在成功处理消息后显式调用
核心概念解析
消费者确认机制的核心思想是:消息的生命周期控制权部分转移给了消费者。RabbitMQ 将消息投递给消费者后,会等待消费者的一个“回执”。根据回执的不同,RabbitMQ 会决定是彻底删除这条消息,还是将其重新放回队列。
两种确认模式
- 自动确认 (Automatic Acknowledgement /
auto-ack
): 这是 RabbitMQ 的默认模式。在这种模式下,RabbitMQ 在把消息发送给消费者后,就会立即认为消息已被成功消费,并将其从队列中移除。这种方式吞吐量高,但可靠性差。如果消费者在处理消息的过程中崩溃,消息就会丢失。 - 手动确认 (Manual Acknowledgement /
manual-ack
): 在此模式下,RabbitMQ 将消息投递给消费者后,会将该消息标记为“未确认”(Unacked) 状态,并等待消费者的显式指令。消费者必须在代码中明确告知 RabbitMQ 消息处理的结果。这是构建高可靠系统的标准选择。
手动确认的三个关键操作
当采用手动确认时,消费者可以对收到的消息执行以下三种操作,这些操作都通过 Channel
对象来完成:
-
channel.basicAck(long deliveryTag, boolean multiple)
: 肯定确认。告知 RabbitMQ 消息已被成功处理,可以安全地丢弃了。deliveryTag
: 消息的唯一投递凭证。这是一个单调递增的整数,由 RabbitMQ 在投递时分配,在Channel
范围内唯一。multiple
: 是否批量确认。如果为true
,则表示确认所有deliveryTag
小于或等于当前值的消息;为false
则只确认当前这一条。
-
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
: 否定确认 (Nack)。告知 RabbitMQ 消息处理失败。这是 AMQP 0-9-1 规范的 RabbitMQ 扩展,功能更强。requeue
: 这是一个至关重要的布尔值。- 如果为
true
,RabbitMQ 会将该消息重新放入队列,等待投递给下一个(或同一个)消费者。 - 如果为
false
,RabbitMQ 会将消息从队列中移除。如果该队列配置了死信交换机 (Dead-Letter-Exchange),消息会被路由到那里,否则消息将被直接丢弃。
- 如果为
-
channel.basicReject(long deliveryTag, boolean requeue)
: 单条否定确认 (Reject)。与basicNack
类似,但basicReject
一次只能拒绝一条消息 (multiple
参数被省略了)。在功能上,channel.basicReject(deliveryTag, requeue)
等同于channel.basicNack(deliveryTag, false, requeue)
。
工作流程图
下图清晰地展示了手动确认模式下的消息处理流程:
demo演练
接下来,我们通过一个 Spring Boot 项目来演示如何实现手动消息确认。
项目结构
一个简单的 Spring Boot 项目结构如下:
此处的
com.example.ackdemo
要改成读者自己的项目路径,包括后面相关代码的路径引入也需要进行对应修改
ack-demo
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ └── ackdemo
│ │ │ │ └── RabbitMQConfig.java
│ │ │ │ └── MessageController.java
│ │ │ │ └── OrderConsumer.java
│ │ │ └── AckDemoApplication.java
│ │ └── resources
│ │ └── application.yml
└── pom.xml
配置手动确认模式
在 src/main/resources/application.yml
文件中,进行如下配置:
spring:application:name: ack-demorabbitmq:host: localhostport: 5672username: <your_username>password: <your_password>listener:simple:# 这是开启手动确认模式的关键acknowledge-mode: manual# (可选) 设置预取数量,后面最佳实践会讲到prefetch: 1
声明 Exchange 和 Queue
在 config/RabbitMQConfig.java
中声明我们需要的交换机和队列。
注意此处引入的包为
org.springframework.amqp.core
!
package com.example.ackdemo;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig { //此处的常量提取到一个单独的静态类中更好,此处为了方便演式不单独提取 public static final String EXCHANGE_NAME = "order.exchange"; public static final String QUEUE_NAME = "order.queue"; public static final String ROUTING_KEY = "order.create"; @Bean public TopicExchange exchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build(); } @Bean public Queue queue() { return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); }
}
生产者代码 (Publisher)
创建一个简单的 Rest Controller 用于发送消息。
import com.example.ackdemo.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;@RestController
public class MessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String sendMessage(@RequestParam(defaultValue = "success") String type) {String messageContent = "order_id:" + UUID.randomUUID() + ",type:" + type;// 发送消息到指定的交换机和路由键rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY,messageContent);return "Message sent: " + messageContent;}
}
消费者代码 (Consumer)
这是我们的核心逻辑。消费者监听 order.queue
,并根据消息内容模拟成功或失败的场景。
package com.example.ackdemo.consumer;import com.example.ackdemo.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class OrderConsumer {@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)public void receiveMessage(Message message, Channel channel) throws IOException {// 从 Message 对象中获取 deliveryTaglong deliveryTag = message.getMessageProperties().getDeliveryTag();String content = new String(message.getBody());log.info("Received message with deliveryTag: {}, content: {}", deliveryTag, content);try {// 模拟业务逻辑处理if (content.contains("type:fail")) {// 如果消息内容包含 'fail',则抛出异常,模拟处理失败throw new RuntimeException("Business logic processing failed for message: " + content);}// 业务逻辑成功处理log.info("Message processed successfully. Acknowledging...");// 发送 ACK 确认// 参数1: deliveryTag,消息的唯一标识// 参数2: multiple,是否批量确认,false表示只确认当前消息channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("Error processing message. Sending NACK...", e);// 业务逻辑处理失败,发送 NACK// 参数1: deliveryTag// 参数2: multiple,是否批量拒绝// 参数3: requeue,是否重新入队。false表示不重新入队,消息会根据策略被丢弃或进入死信队列channel.basicNack(deliveryTag, false, false);}}
}
运行与验证
- 启动应用。
- 验证成功场景:
- 打开浏览器访问
http://localhost:8080/send?type=success
。 - 日志打印:
- 打开浏览器访问
2025-07-10T22:26:46.857+08:00 INFO 32516 --- [mqDemo] [ntContainer#0-1] c.doublez.mqdemo.ackdemo.OrderConsumer : Received message with deliveryTag: 3, content: order_id:4d61a833-594e-4b04-a89f-4cb2fe750d8f,type:success
2025-07-10T22:26:46.858+08:00 INFO 32516 --- [mqDemo] [ntContainer#0-1] c.doublez.mqdemo.ackdemo.OrderConsumer : Message processed successfully. Acknowledging...
- 观察 RabbitMQ 管理界面 (
http://localhost:15672
):进入 Queues 标签页,你会看到order.queue
的Ready
和Unacked
计数都为 0,Total
消息数量短暂变为 1 后迅速归零。
- 验证失败场景:
- 访问
http://localhost:8080/send?type=fail
。 - 日志打印:
- 访问
2025-07-10T22:28:43.936+08:00 INFO 32516 --- [mqDemo] [ntContainer#0-1] c.doublez.mqdemo.ackdemo.OrderConsumer : Received message with deliveryTag: 4, content: order_id:4d6aaeb9-9e4c-496e-959f-940ea57090d6,type:fail
2025-07-10T22:28:43.936+08:00 ERROR 32516 --- [mqDemo] [ntContainer#0-1] c.doublez.mqdemo.ackdemo.OrderConsumer : Error processing message. Sending NACK...java.lang.RuntimeException: Business logic processing failed for message: order_id:4d6aaeb9-9e4c-496e-959f-940ea57090d6,type:fail
....//堆栈消息
- 观察 RabbitMQ 管理界面:由于我们设置了
requeue=false
,消息同样被移除了。如果配置了死信队列,它会出现在那里。
- 如果我们把
requeue=true
,那么消息出错之后会被重新入队,又再次被取出,进而疯狂打印日志,停止程序之后我们也能在管理界面看见存在一条消息
如果把配置中的
manual
改成none
那么就不会触发requeue=true
的重试,因为none
发送消息之后就会被丢弃,如果是auto
那么消息会被保留,因为执行过程中抛出异常了
最佳实践与注意事项
-
消费幂等性是必须的:手动确认保证了“至少一次”投递,这意味着在网络异常、Broker 重启等情况下,同一条消息可能被重复投递。消费者的业务逻辑必须设计成幂等的,即多次处理同一条消息的结果和只处理一次完全相同。通常可以通过消息 ID 或业务主键来做防重判断。
-
监控
Unacked
消息和死信队列:Unacked
消息数量的持续增长,或死信队列 (DLQ) 中消息的堆积,都是严重的系统故障信号。这通常意味着消费者出现了大面积的持续性故障。必须对这些指标设置监控和告警,以便及时介入。 -
明智地处理消费失败的消息 (
requeue=false
):将requeue
设置为false
是一个明智的选择,它可以防止有“毒”的消息(那些总会导致消费失败的消息)无限循环,拖垮整个消费者集群。最佳实践是:- 配置死信队列 (DLQ):将这些处理失败的消息路由到 DLQ。
- 建立配套处理机制:为 DLQ 配备专门的消费者,用于记录错误日志、发送告警通知、或在修复问题后进行手动/自动重试。
-
合理设置
prefetch
值:spring.rabbitmq.listener.simple.prefetch
参数(对应basic.qos
)指定了 RabbitMQ 一次可以向单个消费者发送多少条Unacked
消息。- 设置一个合理的
prefetch
值 (如 5-10) 可以有效防止单个消费者囤积过多消息而其他消费者饿死的情况,起到负载均衡和限流的作用。 - 如果处理一条消息非常耗时,应将
prefetch
设为 1,确保消费者处理完一条再接收下一条。
- 设置一个合理的
-
警惕批量确认 (
multiple=true
) 的风险:虽然批量确认可以减少网络开销,但它也带来了风险。如果你确认了deliveryTag=10
的消息,但实际上deliveryTag=8
的消息处理失败了,那么这条失败的消息也会被错误地确认。除非业务场景非常明确且简单,否则坚持使用multiple=false
。
总结
RabbitMQ 的消费者确认机制是构建健壮、可靠消息系统的基石。通过将确认模式从默认的 auto
切换到 manual
,并结合 try-catch
块中的 basicAck
与 basicNack
,我们可以精确地控制每一条消息的生命周期,从而有效地防止因消费者异常导致的数据丢失。