RabbitMQ如何保证消息不丢失?
在RabbitMQ中,消息丢失可能发生在三个阶段:生产者发送消息时、消息在RabbitMQ服务器内部传递时、消费者接收消息时。为了保证消息不丢失,需要从这三个方面分别采取措施:
1. 生产者确保消息发送成功
- 开启确认模式(Confirm Mode):生产者通过调用
channel.confirmSelect()
方法开启确认模式。在这种模式下,当生产者发送消息后,RabbitMQ会向生产者发送一个确认消息,告知生产者该消息是否成功到达服务器。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmListener;public class Producer {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.confirmSelect();// 开启发送确认机制channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws Exception {System.out.println("消息已确认送达,deliveryTag: " + deliveryTag + (multiple? ",多个消息" : ""));}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws Exception {System.out.println("消息未送达,deliveryTag: " + deliveryTag + (multiple? ",多个消息" : ""));// 添加消息重发逻辑,记录日志信息}});String message = "Hello, RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF - 8"));}}
}
- 事务机制:生产者可以通过
channel.txSelect()
方法开启事务。发送消息前开启事务,发送消息后提交事务,如果发送过程中出现异常则回滚事务。但是事务机制会严重影响RabbitMQ的性能,因为它是同步阻塞的,不建议在高并发场景下使用。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ProducerWithTx {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.txSelect();String message = "Hello, RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF - 8"));channel.txCommit();} catch (Exception e) {// 如果出现异常,回滚事务if (channel != null) {try {channel.txRollback();} catch (Exception ex) {ex.printStackTrace();}}e.printStackTrace();}}
}
2. RabbitMQ服务器保证消息存储安全
- 持久化设置:
- 交换机持久化:通过
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
声明交换机时设置durable=true
- 队列持久化:通过
channel.queueDeclare(queueName, true, false, false, null)
方法声明队列时,将第二个参数设为true
,这样队列在服务器重启后依然存在。 - 消息持久化:通过
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF - 8"))
方法发送消息时,将第三个参数设为MessageProperties.PERSISTENT_TEXT_PLAIN
,这样消息会被持久化到磁盘。即使RabbitMQ服务器崩溃重启,持久化的消息和队列也不会丢失。
- 交换机持久化:通过
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;public class PersistentMessageSender {private final static String EXCHANGE_NAME = "persistent_exchange";private final static String QUEUE_NAME = "persistent_queue";private final static String ROUTING_KEY = "persistent_key";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明持久化交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);// 声明持久化队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 将队列绑定到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);String message = "这是一条持久化消息";// 发送持久化消息channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF - 8"));System.out.println(" [x] Sent '" + message + "'");}}
}
3. 消费者确保消息正确接收
- 设置手动确认(Manual Acknowledgment):消费者通过
channel.basicConsume(queueName, false, consumerTag, false, false, null, consumer)
方法消费消息时,将第二个参数设为false
,表示关闭自动确认模式,开启手动确认。当消费者成功处理完消息后,调用channel.basicAck(deliveryTag, false)
方法向RabbitMQ服务器发送确认消息。如果消费者在处理消息过程中出现异常,没有发送确认消息,RabbitMQ会认为该消息没有被成功消费,会将其重新放入队列,分配给其他消费者(如果有多个消费者)或在下次消费者启动时重新分配给该消费者。
public class Consumer {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);boolean autoAck = false; // 关闭自动确认,改为手动确认channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag",false, false, null, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF - 8");System.out.println("收到消息: '" + message + "'");// 模拟消息处理try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}});}}
}
- 消费端重试机制
- 处理失败时记录日志并重试
- 达到最大重试次数后转入死信队列
public class MessageConsumerWithRetryAndDLX {private final static String QUEUE_NAME = "main_queue";private final static String DLX_EXCHANGE_NAME = "dlx_exchange";private final static String DLX_QUEUE_NAME = "dlx_queue";private static final int MAX_RETRIES = 3;public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明死信交换机channel.exchangeDeclare(DLX_EXCHANGE_NAME, "direct", true);// 声明死信队列channel.queueDeclare(DLX_QUEUE_NAME, true, false, false, null);channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, "dlx_routing_key");// 声明主队列,并设置死信交换机和路由键Map<String, Object> args = new HashMap<>();args.put("x - dead - letter - exchange", DLX_EXCHANGE_NAME);args.put("x - dead - letter - routing - key", "dlx_routing_key");channel.queueDeclare(QUEUE_NAME, true, false, false, args);boolean autoAck = false; // 关闭自动确认channel.basicConsume(QUEUE_NAME, autoAck,"myConsumerTag", false, false, null,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF - 8");System.out.println(" [x] Received '" + message + "'");int retryCount = getRetryCount(properties);boolean success = processMessage(message, retryCount);if (success) {channel.basicAck(envelope.getDeliveryTag(), false);} else {if (retryCount < MAX_RETRIES) {// 重试,重新入队消息AMQP.BasicProperties newProps = properties.builder().headers(getUpdatedHeaders(retryCount)).build();channel.basicPublish("", envelope.getRoutingKey(), newProps, body);channel.basicAck(envelope.getDeliveryTag(), false);} else {// 达到最大重试次数,转入死信队列channel.basicPublish(DLX_EXCHANGE_NAME, "dlx_routing_key", properties, body);channel.basicAck(envelope.getDeliveryTag(), false);System.out.println("达到最大重试次数,消息转入死信队列: " + message);}}}});}}private static int getRetryCount(AMQP.BasicProperties properties) {if (properties.getHeaders() == null ||!properties.getHeaders().containsKey("retryCount")) {return 0;}return (int) properties.getHeaders().get("retryCount");}private static boolean processMessage(String message, int retryCount) {// 模拟消息处理逻辑,这里简单抛出异常模拟处理失败try {if (message.contains("error")) {throw new RuntimeException("模拟处理失败");}System.out.println("消息处理成功: " + message);return true;} catch (Exception e) {System.out.println("消息处理失败: " + e);return false;}}private static Map<String, Object> getUpdatedHeaders(int retryCount) {Map<String, Object> headers = new HashMap<>();headers.put("retryCount", retryCount + 1);return headers;}
}