当前位置: 首页 > web >正文

RabbitMQ如何保证消息不丢失?

在RabbitMQ中,消息丢失可能发生在三个阶段:生产者发送消息时、消息在RabbitMQ服务器内部传递时、消费者接收消息时。为了保证消息不丢失,需要从这三个方面分别采取措施:

1. 生产者确保消息发送成功

  • 开启确认模式(Confirm Mode):生产者通过调用 channel.confirmSelect() 方法开启确认模式。在这种模式下,当生产者发送消息后,RabbitMQ会向生产者发送一个确认消息,告知生产者该消息是否成功到达服务器。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmListener;public class Producer {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.confirmSelect();// 开启发送确认机制channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws Exception {System.out.println("消息已确认送达,deliveryTag: " + deliveryTag + (multiple? ",多个消息" : ""));}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws Exception {System.out.println("消息未送达,deliveryTag: " + deliveryTag + (multiple? ",多个消息" : ""));// 添加消息重发逻辑,记录日志信息}});String message = "Hello, RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF - 8"));}}
}
  • 事务机制:生产者可以通过 channel.txSelect() 方法开启事务。发送消息前开启事务,发送消息后提交事务,如果发送过程中出现异常则回滚事务。但是事务机制会严重影响RabbitMQ的性能,因为它是同步阻塞的,不建议在高并发场景下使用。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ProducerWithTx {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.txSelect();String message = "Hello, RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF - 8"));channel.txCommit();} catch (Exception e) {// 如果出现异常,回滚事务if (channel != null) {try {channel.txRollback();} catch (Exception ex) {ex.printStackTrace();}}e.printStackTrace();}}
}

2. RabbitMQ服务器保证消息存储安全

  • 持久化设置
    • 交换机持久化:通过channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);声明交换机时设置 durable=true
    • 队列持久化:通过 channel.queueDeclare(queueName, true, false, false, null) 方法声明队列时,将第二个参数设为 true,这样队列在服务器重启后依然存在。
    • 消息持久化:通过 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF - 8")) 方法发送消息时,将第三个参数设为 MessageProperties.PERSISTENT_TEXT_PLAIN,这样消息会被持久化到磁盘。即使RabbitMQ服务器崩溃重启,持久化的消息和队列也不会丢失。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;public class PersistentMessageSender {private final static String EXCHANGE_NAME = "persistent_exchange";private final static String QUEUE_NAME = "persistent_queue";private final static String ROUTING_KEY = "persistent_key";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明持久化交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);// 声明持久化队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 将队列绑定到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);String message = "这是一条持久化消息";// 发送持久化消息channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF - 8"));System.out.println(" [x] Sent '" + message + "'");}}
}

3. 消费者确保消息正确接收

  • 设置手动确认(Manual Acknowledgment):消费者通过 channel.basicConsume(queueName, false, consumerTag, false, false, null, consumer) 方法消费消息时,将第二个参数设为 false,表示关闭自动确认模式,开启手动确认。当消费者成功处理完消息后,调用 channel.basicAck(deliveryTag, false) 方法向RabbitMQ服务器发送确认消息。如果消费者在处理消息过程中出现异常,没有发送确认消息,RabbitMQ会认为该消息没有被成功消费,会将其重新放入队列,分配给其他消费者(如果有多个消费者)或在下次消费者启动时重新分配给该消费者。
public class Consumer {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);boolean autoAck = false; // 关闭自动确认,改为手动确认channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag",false, false, null, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF - 8");System.out.println("收到消息: '" + message + "'");// 模拟消息处理try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}});}}
}
  • 消费端重试机制
    • 处理失败时记录日志并重试
    • 达到最大重试次数后转入死信队列

public class MessageConsumerWithRetryAndDLX {private final static String QUEUE_NAME = "main_queue";private final static String DLX_EXCHANGE_NAME = "dlx_exchange";private final static String DLX_QUEUE_NAME = "dlx_queue";private static final int MAX_RETRIES = 3;public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明死信交换机channel.exchangeDeclare(DLX_EXCHANGE_NAME, "direct", true);// 声明死信队列channel.queueDeclare(DLX_QUEUE_NAME, true, false, false, null);channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, "dlx_routing_key");// 声明主队列,并设置死信交换机和路由键Map<String, Object> args = new HashMap<>();args.put("x - dead - letter - exchange", DLX_EXCHANGE_NAME);args.put("x - dead - letter - routing - key", "dlx_routing_key");channel.queueDeclare(QUEUE_NAME, true, false, false, args);boolean autoAck = false; // 关闭自动确认channel.basicConsume(QUEUE_NAME, autoAck,"myConsumerTag", false, false, null,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF - 8");System.out.println(" [x] Received '" + message + "'");int retryCount = getRetryCount(properties);boolean success = processMessage(message, retryCount);if (success) {channel.basicAck(envelope.getDeliveryTag(), false);} else {if (retryCount < MAX_RETRIES) {// 重试,重新入队消息AMQP.BasicProperties newProps = properties.builder().headers(getUpdatedHeaders(retryCount)).build();channel.basicPublish("", envelope.getRoutingKey(), newProps, body);channel.basicAck(envelope.getDeliveryTag(), false);} else {// 达到最大重试次数,转入死信队列channel.basicPublish(DLX_EXCHANGE_NAME, "dlx_routing_key", properties, body);channel.basicAck(envelope.getDeliveryTag(), false);System.out.println("达到最大重试次数,消息转入死信队列: " + message);}}}});}}private static int getRetryCount(AMQP.BasicProperties properties) {if (properties.getHeaders() == null ||!properties.getHeaders().containsKey("retryCount")) {return 0;}return (int) properties.getHeaders().get("retryCount");}private static boolean processMessage(String message, int retryCount) {// 模拟消息处理逻辑,这里简单抛出异常模拟处理失败try {if (message.contains("error")) {throw new RuntimeException("模拟处理失败");}System.out.println("消息处理成功: " + message);return true;} catch (Exception e) {System.out.println("消息处理失败: " + e);return false;}}private static Map<String, Object> getUpdatedHeaders(int retryCount) {Map<String, Object> headers = new HashMap<>();headers.put("retryCount", retryCount + 1);return headers;}
}

http://www.xdnf.cn/news/4397.html

相关文章:

  • 【Leetcode 每日一题 - 扩展】3342. 到达最后一个房间的最少时间 II
  • 什么是 token-level 嵌入
  • JVM局部变量表和操作数栈的内存布局
  • C24-数组
  • MedCLIP-SAMv2 实验计划
  • DevExpressWinForms-AlertControl-使用教程
  • 【计算机视觉】OpenCV项目实战:OpenCV_Position 项目深度解析:基于 OpenCV 的相机定位技术
  • 深入探讨 UDP 协议与多线程 HTTP 服务器
  • python-71-基于pyecharts的通用绘图流程
  • 路由器NAT回流踩坑
  • 边缘计算:开启智能新时代的“秘密武器”
  • 性能比拼: HTTP/2 vs. HTTP/3
  • 基于大模型的输卵管妊娠全流程预测与治疗方案研究报告
  • MCP连接Agent:AI时代的TCP/IP
  • 新能源汽车中的NVM计时与RTC计时:区别与应用详解
  • XSS 攻击:深入剖析“暗藏在网页中的脚本“与防御之道
  • 怎么在非 hadoop 用户下启动 hadoop
  • PBR材质-Unity/Blender/UE
  • hadoop的运行模式
  • Web前端技术栈:从入门到进阶都需要学什么内容
  • 【Prompt工程—文生图】案例大全
  • c# LINQ-Query01
  • C 语言编码规范
  • Ubuntu也开始锈化了?Ubuntu 计划在 25.10 版本开始引入 Rust Coreutils
  • 鸿蒙开发——1.ArkTS声明式开发(UI范式基本语法)
  • kotlin一个函数返回多个值
  • 线性代数之矩阵运算:驱动深度学习模型进化的数学引擎
  • 数据可视化与数据编辑器:直观呈现数据价值
  • 在 Ubuntu 中配置 Samba 实现「特定用户可写,其他用户只读」的共享目录
  • SAP如何反查增强点的位置呢?怎么判断这个报错是增强,还是标准信息呢?