当前位置: 首页 > news >正文

RabbitMQ 消费者确认 (Ack/Nack) (With Spring Boot)

引言

在复杂的分布式系统中,我们经常面临这样的挑战:“订单支付成功后,下游的积分、通知、物流服务却因为网络抖动或自身故障,未能成功处理消息,导致数据不一致。我们该怎么办?

这些场景都指向了一个核心痛点:如果消息被消费者取出后,在处理完成前消费者就宕机了,而消息队列却认为“任务已完成”并删除了消息,那么我们就会面临严重的数据丢失风险。这将直接影响系统的稳定性和业务的可靠性。

RabbitMQ 提供了强大的 消费者确认机制 (Consumer Acknowledgement),它是解决此类问题的利器。通过该机制,我们可以让消费者在真正成功处理完消息后,才通知 RabbitMQ 删除消息,从而极大地保证了消息处理的“至少一次”语义 (At-Least-Once Delivery)。

在Spring框架下,也提供了3中消息确认的方式

  1. AcknowledgeMode.NONE

    • 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 就会自动确认消息,从RabbitMQ队列中移除消息。如果消费者处理消息失败,消息可能会丢失。
  2. AcknowledgeMode.AUTO(默认)

    • 这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息。
  3. AcknowledgeMode.MANUAL

    • 手动确认模式下,消费者必须在成功处理消息后显式调用 basicAck 方法来确认消息。如果消息未被确认,RabbitMQ 会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。

核心概念解析

消费者确认机制的核心思想是:消息的生命周期控制权部分转移给了消费者。RabbitMQ 将消息投递给消费者后,会等待消费者的一个“回执”。根据回执的不同,RabbitMQ 会决定是彻底删除这条消息,还是将其重新放回队列。

两种确认模式

  1. 自动确认 (Automatic Acknowledgement / auto-ack): 这是 RabbitMQ 的默认模式。在这种模式下,RabbitMQ 在把消息发送给消费者后,就会立即认为消息已被成功消费,并将其从队列中移除。这种方式吞吐量高,但可靠性差。如果消费者在处理消息的过程中崩溃,消息就会丢失。
  2. 手动确认 (Manual Acknowledgement / manual-ack): 在此模式下,RabbitMQ 将消息投递给消费者后,会将该消息标记为“未确认”(Unacked) 状态,并等待消费者的显式指令。消费者必须在代码中明确告知 RabbitMQ 消息处理的结果。这是构建高可靠系统的标准选择。

手动确认的三个关键操作

当采用手动确认时,消费者可以对收到的消息执行以下三种操作,这些操作都通过 Channel 对象来完成:

  • channel.basicAck(long deliveryTag, boolean multiple): 肯定确认。告知 RabbitMQ 消息已被成功处理,可以安全地丢弃了。

    • deliveryTag: 消息的唯一投递凭证。这是一个单调递增的整数,由 RabbitMQ 在投递时分配,在 Channel 范围内唯一。
    • multiple: 是否批量确认。如果为 true,则表示确认所有 deliveryTag 小于或等于当前值的消息;为 false 则只确认当前这一条。
  • channel.basicNack(long deliveryTag, boolean multiple, boolean requeue): 否定确认 (Nack)。告知 RabbitMQ 消息处理失败。这是 AMQP 0-9-1 规范的 RabbitMQ 扩展,功能更强。

    • requeue: 这是一个至关重要的布尔值。
      • 如果为 true,RabbitMQ 会将该消息重新放入队列,等待投递给下一个(或同一个)消费者。
      • 如果为 false,RabbitMQ 会将消息从队列中移除。如果该队列配置了死信交换机 (Dead-Letter-Exchange),消息会被路由到那里,否则消息将被直接丢弃。
  • channel.basicReject(long deliveryTag, boolean requeue): 单条否定确认 (Reject)。与 basicNack 类似,但 basicReject 一次只能拒绝一条消息 (multiple 参数被省略了)。在功能上,channel.basicReject(deliveryTag, requeue) 等同于 channel.basicNack(deliveryTag, false, requeue)

工作流程图

下图清晰地展示了手动确认模式下的消息处理流程:

在这里插入图片描述

demo演练

接下来,我们通过一个 Spring Boot 项目来演示如何实现手动消息确认。

项目结构

一个简单的 Spring Boot 项目结构如下:

此处的com.example.ackdemo要改成读者自己的项目路径,包括后面相关代码的路径引入也需要进行对应修改

ack-demo
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── ackdemo
│   │   │               │   └── RabbitMQConfig.java
│   │   │               │   └── MessageController.java
│   │   │               │   └── OrderConsumer.java
│   │   │               └── AckDemoApplication.java
│   │   └── resources
│   │       └── application.yml
└── pom.xml

配置手动确认模式

src/main/resources/application.yml 文件中,进行如下配置:

spring:application:name: ack-demorabbitmq:host: localhostport: 5672username: <your_username>password: <your_password>listener:simple:# 这是开启手动确认模式的关键acknowledge-mode: manual# (可选) 设置预取数量,后面最佳实践会讲到prefetch: 1 

声明 Exchange 和 Queue

config/RabbitMQConfig.java 中声明我们需要的交换机和队列。

注意此处引入的包为org.springframework.amqp.core

package com.example.ackdemo;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration  
public class RabbitMQConfig {  //此处的常量提取到一个单独的静态类中更好,此处为了方便演式不单独提取  public static final String EXCHANGE_NAME = "order.exchange";  public static final String QUEUE_NAME = "order.queue";  public static final String ROUTING_KEY = "order.create";  @Bean  public TopicExchange exchange() {  return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();  }  @Bean  public Queue queue() {  return QueueBuilder.durable(QUEUE_NAME).build();  }  @Bean  public Binding binding(Queue queue, TopicExchange exchange) {  return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);  }  
}

生产者代码 (Publisher)

创建一个简单的 Rest Controller 用于发送消息。

import com.example.ackdemo.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;@RestController
public class MessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String sendMessage(@RequestParam(defaultValue = "success") String type) {String messageContent = "order_id:" + UUID.randomUUID() + ",type:" + type;// 发送消息到指定的交换机和路由键rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY,messageContent);return "Message sent: " + messageContent;}
}

消费者代码 (Consumer)

这是我们的核心逻辑。消费者监听 order.queue,并根据消息内容模拟成功或失败的场景。

package com.example.ackdemo.consumer;import com.example.ackdemo.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class OrderConsumer {@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)public void receiveMessage(Message message, Channel channel) throws IOException {// 从 Message 对象中获取 deliveryTaglong deliveryTag = message.getMessageProperties().getDeliveryTag();String content = new String(message.getBody());log.info("Received message with deliveryTag: {}, content: {}", deliveryTag, content);try {// 模拟业务逻辑处理if (content.contains("type:fail")) {// 如果消息内容包含 'fail',则抛出异常,模拟处理失败throw new RuntimeException("Business logic processing failed for message: " + content);}// 业务逻辑成功处理log.info("Message processed successfully. Acknowledging...");// 发送 ACK 确认// 参数1: deliveryTag,消息的唯一标识// 参数2: multiple,是否批量确认,false表示只确认当前消息channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("Error processing message. Sending NACK...", e);// 业务逻辑处理失败,发送 NACK// 参数1: deliveryTag// 参数2: multiple,是否批量拒绝// 参数3: requeue,是否重新入队。false表示不重新入队,消息会根据策略被丢弃或进入死信队列channel.basicNack(deliveryTag, false, false);}}
}

运行与验证

  1. 启动应用。
  2. 验证成功场景
    • 打开浏览器访问 http://localhost:8080/send?type=success
    • 日志打印
2025-07-10T22:26:46.857+08:00  INFO 32516 --- [mqDemo] [ntContainer#0-1] c.doublez.mqdemo.ackdemo.OrderConsumer   : Received message with deliveryTag: 3, content: order_id:4d61a833-594e-4b04-a89f-4cb2fe750d8f,type:success
2025-07-10T22:26:46.858+08:00  INFO 32516 --- [mqDemo] [ntContainer#0-1] c.doublez.mqdemo.ackdemo.OrderConsumer   : Message processed successfully. Acknowledging...
  • 观察 RabbitMQ 管理界面 (http://localhost:15672):进入 Queues 标签页,你会看到 order.queueReadyUnacked 计数都为 0,Total 消息数量短暂变为 1 后迅速归零。
  1. 验证失败场景
    • 访问 http://localhost:8080/send?type=fail
    • 日志打印
2025-07-10T22:28:43.936+08:00  INFO 32516 --- [mqDemo] [ntContainer#0-1] c.doublez.mqdemo.ackdemo.OrderConsumer   : Received message with deliveryTag: 4, content: order_id:4d6aaeb9-9e4c-496e-959f-940ea57090d6,type:fail
2025-07-10T22:28:43.936+08:00 ERROR 32516 --- [mqDemo] [ntContainer#0-1] c.doublez.mqdemo.ackdemo.OrderConsumer   : Error processing message. Sending NACK...java.lang.RuntimeException: Business logic processing failed for message: order_id:4d6aaeb9-9e4c-496e-959f-940ea57090d6,type:fail
....//堆栈消息
  • 观察 RabbitMQ 管理界面:由于我们设置了 requeue=false,消息同样被移除了。如果配置了死信队列,它会出现在那里。
  • 如果我们把requeue=true,那么消息出错之后会被重新入队,又再次被取出,进而疯狂打印日志,停止程序之后我们也能在管理界面看见存在一条消息
    在这里插入图片描述

如果把配置中的manual 改成none 那么就不会触发requeue=true的重试,因为none发送消息之后就会被丢弃,如果是auto那么消息会被保留,因为执行过程中抛出异常了

最佳实践与注意事项

  1. 消费幂等性是必须的:手动确认保证了“至少一次”投递,这意味着在网络异常、Broker 重启等情况下,同一条消息可能被重复投递。消费者的业务逻辑必须设计成幂等的,即多次处理同一条消息的结果和只处理一次完全相同。通常可以通过消息 ID 或业务主键来做防重判断。

  2. 监控 Unacked 消息和死信队列Unacked 消息数量的持续增长,或死信队列 (DLQ) 中消息的堆积,都是严重的系统故障信号。这通常意味着消费者出现了大面积的持续性故障。必须对这些指标设置监控和告警,以便及时介入。

  3. 明智地处理消费失败的消息 (requeue=false):将 requeue 设置为 false 是一个明智的选择,它可以防止有“毒”的消息(那些总会导致消费失败的消息)无限循环,拖垮整个消费者集群。最佳实践是:

    • 配置死信队列 (DLQ):将这些处理失败的消息路由到 DLQ。
    • 建立配套处理机制:为 DLQ 配备专门的消费者,用于记录错误日志、发送告警通知、或在修复问题后进行手动/自动重试。
  4. 合理设置 prefetchspring.rabbitmq.listener.simple.prefetch 参数(对应 basic.qos)指定了 RabbitMQ 一次可以向单个消费者发送多少条 Unacked 消息。

    • 设置一个合理的 prefetch 值 (如 5-10) 可以有效防止单个消费者囤积过多消息而其他消费者饿死的情况,起到负载均衡限流的作用。
    • 如果处理一条消息非常耗时,应将 prefetch 设为 1,确保消费者处理完一条再接收下一条。
  5. 警惕批量确认 (multiple=true) 的风险:虽然批量确认可以减少网络开销,但它也带来了风险。如果你确认了 deliveryTag=10 的消息,但实际上 deliveryTag=8 的消息处理失败了,那么这条失败的消息也会被错误地确认。除非业务场景非常明确且简单,否则坚持使用 multiple=false

总结

RabbitMQ 的消费者确认机制是构建健壮、可靠消息系统的基石。通过将确认模式从默认的 auto 切换到 manual,并结合 try-catch 块中的 basicAckbasicNack,我们可以精确地控制每一条消息的生命周期,从而有效地防止因消费者异常导致的数据丢失。

http://www.xdnf.cn/news/1219141.html

相关文章:

  • 10. NAT,代理服务,内网穿透
  • DoRA详解:从LoRA到权重分解的进化
  • coze 开源版 coze-studio 配置域名 处理上传问题 教程
  • JAVA后端开发:使用 MapStruct 实现 Java 对象映射
  • uni-app用css编写族谱树家谱树
  • USRP捕获手机/路由器数据传输信号波形(下)
  • Java试题-选择题(2)
  • Python爬虫07_Requests爬取图片
  • 如何将照片从 realme 手机传输到电脑?
  • 设计模式之代理模式
  • 网关 + MDC 过滤器方案,5分钟集成 日志 traceid
  • React中的this绑定
  • node.js之Koa框架
  • Linux Flathub软件管理方法 使用指南
  • [12月考试] E
  • 进程控制:从创建到终结的完整指南
  • 【Django】-1- 开发项目搭建
  • MongoDB系列教程-第四章:MongoDB Compass可视化和管理MongoDB数据库
  • 抓大鹅小游戏微信抖音流量主小程序开源
  • HUD抬头显示器-杂散光测试设备 太阳光模拟器
  • AI学习笔记三十三:基于Opencv的单目标跟踪
  • 对git 熟悉时,常用操作
  • day36 力扣1049.最后一块石头的重量II 力扣494.目标和 力扣474.一和零
  • 【LeetCode 热题 100】4. 寻找两个正序数组的中位数——(解法一)线性扫描
  • [论文阅读] 人工智能 + 软件工程 | KnowledgeMind:基于MCTS的微服务故障定位新方案——告别LLM幻觉,提升根因分析准确率
  • SFT最佳实践教程 —— 基于方舟直接进行模型精调
  • 构型空间(Configuration Space,简称C-space)
  • 全基因组关联分析(GWAS)中模型参数选择:MLM、GLM与FarmCPU的深度解析
  • 数据库中使用SQL作分组处理01(简单分组)
  • 【worklist】worklist的hl7、dicom是什么关系