当前位置: 首页 > java >正文

#RabbitMQ# 消息队列进阶

目录

消息可靠性

一 生产者的可靠性

1 生产者的重连

2 生产者的确认

(1 Confirm*

(2 Return

二 MQ的可靠性

1 数据持久化

2 Lazy Queue*

三 消费者的可靠性

1 消费者确认机制

2 消费失败处理

3 业务幂等性

四 延迟消息


消息可靠性

在消息队列中,可靠性(Reliability) 是指确保消息在 生产者、Broker(消息中间件)、消费者 三者之间传输时,不会因系统故障、网络问题或程序错误而丢失或重复处理。其核心目标是保证消息从发送到最终消费的 完整性和一致性

首先整体概括一下大体的内容,首先我们研究的对象为三个,生产者消费者Broker

生产者的可靠性的保障需要有retry重试机制以及确认机制。确认机制有两个confirm和return(confirm是监听消息到达Broker,return是监听消息从交换机路由到队列)retry是生产者连接或 Confirm 失败后的重试。

Broker可靠性需要数据的持久化以及懒队列存储方式(默认)的实现

消费者的可靠性的保障需要有确认机制以及失败重试机制,确认机制主要是分为两种,一种是自动一种是手动。而失败重试机制可以借助死信队列的存在以及本地重试的方式。

一 生产者的可靠性

1 生产者的重试

可以人为配置,但是要注意配置的时长,防止影响业务性能。并且SpringAMQP的重试机制默认是阻塞式重试,会出现线程资源占用。

2 生产者的确认

首先生产者的确认有两种机制,一种是Publisher Confirm 一种是Publisher Return.

(1 Confirm*

其作用是确保将信息发送到Broke.

1 配置:开启confirm机制

correlated:使用异步回调,自己可以定义一个回调实现类用于接收回调消息,不需要阻塞等待,可以提高性能。

# 配置
spring:rabbitmq:publisher-confirm-type: correlated  # 开启Confirm模式

2 然后编写回调实现类

这个onfirm方法的作用是当生产者通过RabbitTemplate发送消息时,若启动Confirm机制,消息会异步发送到Broker,在Broke处理完消息后,会向生产者发送ack,或者nack,这个重写的confirm方法是在底层被SpringAMQP监听的,当SpringAMQP监听到Broke返回的确认结果时,会自动调用已注册的ConfirmCallBack实现类的confirm方法。

@Component
@Slf4j
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("Confirm 成功: 消息ID={}", correlationData.getId());} else {log.error("Confirm 失败: 消息ID={}, 原因={}", correlationData.getId(), cause);}}
}

3 发送消息

@Service
public class OrderProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendOrderMessage(String message) {// 1. 创建关联数据(用于 Confirm 回调)CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2. 发送消息到交换机rabbitTemplate.convertAndSend("order.direct",  // 交换机名称"order.key",    // 路由键message,         // 消息内容cd              // 关联数据);log.info("消息已发送: ID={}", cd.getId());}
}

(2 Return

其作用是确保信息到达Broke后,并能正确路由到队列。(这种情况可以避免,人为确保可以正确路由)

1 配置开启Return机制

publisher-returns: true              # 启用Return机制

2 回调实现类Return

@Component
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("Return 回调: 消息无法路由到队列 [Exchange={}, RoutingKey={}, ReplyCode={}]", returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode());}
}

二 MQ的可靠性

在默认情况下,RabbitMQ会将连接收到的消息保存在内存中以降低消息收发的延迟。

  • 一旦MQ宕机,内存当中的消息会丢失
  • 内存空间有限,当消费者故障或者处理过慢时,会导致消息积压,会触发PageOut机制存储,将消息写入磁盘,但是会引发MQ阻塞。

1 数据持久化

RabbitMQ实现数据持久化包括3个方面

  • 交换机持久化
  • 队列持久化
  • 消息持久化
  1. 三要素需同时持久化

    • 队列(durable=true)、交换机(durable=true)、消息(deliveryMode=2三者缺一不可。
    • 任意一环未持久化,消息可能丢失。
    • 队列持久化与交换机持久化都可以通过注解当中的配置属性指定。
  2. 消费者手动确认

    • 消费端需关闭自动确认(autoAck=false),并手动发送ACK(channel.basicAck())后,消息才会从队列删除。

代码:

举例说明

@Configuration
public class RabbitMQConfig {// 持久化交换机@Beanpublic DirectExchange durableExchange() {return new DirectExchange("durable.direct", true, false); // durable=true}// 持久化队列@Beanpublic Queue durableQueue() {return new Queue("durable.queue", true); // durable=true}// 绑定关系@Beanpublic Binding durableBinding() {return BindingBuilder.bind(durableQueue()).to(durableExchange()).with("routing.key");}
}
@Service
@RequiredArgsConstructor
public class MessageProducer {private final RabbitTemplate rabbitTemplate;public void sendPersistentMessage() {// 消息内容String message = "hello everyone!";// 创建消息属性并设置持久化MessageProperties properties = new MessageProperties();properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // deliveryMode=2// 构建消息Message amqpMessage = new Message(message.getBytes(), properties);// 发送消息(交换机、路由键、消息、关联数据)rabbitTemplate.convertAndSend("durable.direct",   // 交换机名称"routing.key",      // 路由键amqpMessage,        // 消息对象new CorrelationData(UUID.randomUUID().toString()));}
}

持久化的本质是通过磁盘写入实现数据的持久存储

2 Lazy Queue*

从 RabbitMQ 3.6.0 版本开始,引入了惰性队列 Lazy Queue 的概念,也就是惰性队列。

惰性队列的特性:

  • 接收到消息后直接存入磁盘而非内存。(内存中会存储少量消息)
  • 消费者要获取消息时才会从磁盘中读取并加载到内存当中。
  • 支持数百万条的消息存储。
  • 在3.12版本之后所有的队列都是LazyQueue模式,无法更改。
  • 开启持久化和生产者确认时,只有在消息持久化完成后才会给生产者返回ACK

如果是老版本的话需要手动再指定,新版本不需指定,默认

@RabbitListener(queuesToDeclare = @Queue(name = "explicit.lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy") // 显式设置惰性队列
))
public void listen(String msg) {// 处理消息
}
@Bean
public Queue explicitLazyQueue() {return QueueBuilder.durable("explicit.lazy.queue").lazy() // 显式声明惰性队列(兼容低版本 RabbitMQ).build();
}

三 消费者的可靠性

1 消费者确认机制

概念:为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制。当消费者消息处理结束时向Broker发送一个回执,告知消息处理的状态。

  • ack:成功处理消息,RabbitMQ从队列当中删除消息。
  • nack:消息处理失败,RabbitMQ需要再次投递消息。
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。

配置

spring:rabbitmq:host: 192.168.100.100port: 5672virtual-host: /hmallusername: hmallpassword: 123456connection-timeout: 1slistener:simple:prefetch: 1acknowledge-mode: auto #开启消费者确认机制publisher-confirm-type: correlated # 开启confirm模式

2 消费失败处理

自动确认

当消费者出现异常后,消息会不断requeue重新入队,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。

    listener:simple:prefetch: 1acknowledge-mode: auto # 默认自动确认retry:enabled: true # 开启消费者重试机制max-attempts: 3 # 最大重试次数multiplier: 1 # 重试时间间隔initial-interval: 1000ms # 初始重试时间间隔

本地重试(Local Retry)
Spring 的 retry 机制通过 本地重试 替代 RabbitMQ 的 requeue,避免消息反复入队。从而提升性能。

将失败策略修改为RepublicMessageRecoverer,首先先定义接收失败的交换机队列机器绑定关系,然后定义RepublicMessageRecoverer

@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled",havingValue = "true")
public class RabbitListenerConfig {@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {// 关键点:失败时将消息重新发布到死信交换机return new RepublishMessageRecoverer(rabbitTemplate,"dlx.direct",   // 死信交换机名称"dlx.routing.key" // 死信路由键);}
}

然后再定义:死信队列+死信交换机+正常队列

首先定义死信交换机死信队列并绑定,然后定义队列规则,将死信队列交换机与队列绑定

可在yml配置中编写重试规则

    // 1. 定义死信交换机(DLX)@Beanpublic DirectExchange dlxExchange() {return new DirectExchange("dlx.direct", true, false);}// 2. 定义死信队列(DLQ)@Beanpublic Queue dlq() {return new Queue("dlq.queue", true);}// 3. 绑定死信队列到死信交换机@Beanpublic Binding dlqBinding() {return BindingBuilder.bind(dlq()).to(dlxExchange()).with("dlx.routing.key");}// 4. 定义原始队列,并绑定到死信交换机@Beanpublic Queue originQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dlx.direct");    // 指定死信交换机args.put("x-dead-letter-routing-key", "dlx.routing.key"); // 指定死信路由键return new Queue("origin.queue", true, false, false, args);}

如果这个队列出现重试依旧出错将会进入死信交换机后再路由进入死信队列

生产者发送消息 → 原始队列(origin.queue)↓
消费者尝试处理消息 → 成功 → ACK↓
消费者处理失败 → 重试 → 仍失败 → 拒绝消息(requeue=false)↓
消息变为死信 → 转发到死信交换机(dlx.direct)↓
死信交换机根据路由键(dlx.routing.key) → 路由到死信队列(dlx.queue)↓
死信队列中的消息被专门的消费者处理(如人工干预、日志记录、重试等)

手动确认

1 xml文件

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual # 手动确认模式retry:enabled: true         # 开启消费者重试max-attempts: 3       # 最大重试次数(含首次消费)initial-interval: 1000 # 初始重试间隔(ms)multiplier: 2          # 重试间隔乘数(1s → 2s → 4s)max-interval: 10000    # 最大重试间隔(ms)

2 声明队列,交换机以及绑定关系

@Configuration
public class RabbitMQConfig {// 正常业务交换机public static final String ORDER_EXCHANGE = "order_exchange";// 正常业务队列public static final String ORDER_QUEUE = "order_queue";// 死信交换机public static final String DLX_EXCHANGE = "dlx_exchange";// 死信队列public static final String DLX_QUEUE = "dlx_queue";/*** 声明正常业务交换机(直连交换机)*/@Beanpublic DirectExchange orderExchange() {return new DirectExchange(ORDER_EXCHANGE);}/*** 声明正常业务队列,并绑定死信交换机*/@Beanpublic Queue orderQueue() {return QueueBuilder.durable(ORDER_QUEUE).deadLetterExchange(DLX_EXCHANGE) // 绑定死信交换机.deadLetterRoutingKey("dlx.routing.key") // 死信路由键.build();}/*** 绑定队列与交换机*/@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.routing.key");}/*** 声明死信交换机(直连交换机)*/@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE);}/*** 声明死信队列*/@Beanpublic Queue dlxQueue() {return new Queue(DLX_QUEUE, true);}/*** 绑定死信队列与交换机*/@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");}
}

3 配置重试策略与失败消息转发

@Configuration
public class RetryConfig {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 配置重试策略*/@Beanpublic RetryPolicy retryPolicy() {return new SimpleRetryPolicy();}/*** 配置重试模板*/@Beanpublic RetryTemplate retryTemplate(RetryPolicy retryPolicy) {RetryTemplate retryTemplate = new RetryTemplate();retryTemplate.setRetryPolicy(retryPolicy);// 指数退避策略ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();backOffPolicy.setInitialInterval(1000);    // 初始间隔 1sbackOffPolicy.setMultiplier(2);            // 间隔乘数backOffPolicy.setMaxInterval(10000);       // 最大间隔 10sretryTemplate.setBackOffPolicy(backOffPolicy);return retryTemplate;}/*** 配置消息恢复器(失败后投递到死信交换机)*/@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate,RabbitMQConfig.DLX_EXCHANGE,  // 死信交换机"dlx.routing.key"             // 死信路由键);}/*** 配置监听器容器工厂(核心)*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,RetryTemplate retryTemplate,MessageRecoverer messageRecoverer) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认// 配置重试factory.setAdviceChain(RetryInterceptorBuilder.stateless().retryOperations(retryTemplate).recoverer(messageRecoverer) // 失败后转发到死信队列.build());return factory;}
}

4 消费者实现

@Component
public class OrderConsumer {private static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);@RabbitListener(queues = RabbitMQConfig.ORDER_QUEUE)public void handleOrderMessage(@Payload Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {// 业务处理逻辑(模拟可能抛出异常)processOrder(order);// 处理成功,手动确认channel.basicAck(deliveryTag, false);log.info("订单处理成功: {}", order.getId());} catch (Exception e) {// 记录异常日志log.error("订单处理失败: {}", order.getId(), e);// 手动拒绝消息(不重新入队,由重试机制处理)channel.basicNack(deliveryTag, false, false);}}private void processOrder(Order order) throws Exception {// 模拟业务处理(如数据库操作、远程调用等)if (order.getAmount() < 0) {throw new IllegalArgumentException("订单金额非法");}// ... 其他业务逻辑}
}

死信队列消费者

@Component
public class DlxConsumer {@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)public void handleDlxMessage(Order failedOrder) {// 记录日志或人工处理System.err.println("收到死信消息: " + failedOrder);// 可在此处发送告警邮件或记录到数据库}
}

3 业务幂等性

解决方案:

1 唯一id

代码:

    @Beanpublic MessageConverter jacksonMessageConvertor() {Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();converter.setCreateMessageIds(true);return converter;}

为每一条消息设置一个全局唯一id以达到幂等性

2 业务判断

四 延迟消息

1 死信交换机

2 延迟消息插件

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列_rabbitmq延迟队列插件-CSDN博客

spring:rabbitmq:host: 127.0.0.1  # RabbitMQ服务器地址port: 5672         # RabbitMQ默认端口username: guest    # RabbitMQ用户名password: guest    # RabbitMQ密码virtual-host: /    # 虚拟主机路径(默认为/)# 连接工厂配置connection-timeout: 5000ms  # 连接超时时间requested-heartbeat: 60s    # 心跳间隔requested-channel-max: 200  # 最大通道数publisher-confirm-type: correlated  # 启用生产者确认publisher-returns: true             # 启用未路由消息返回template:retry:enabled: true                   # 启用重试机制max-attempts: 3                 # 最大重试次数initial-interval: 1000ms        # 初始重试间隔listener:simple:acknowledge-mode: manual      # 消费者手动确认模式auto-startup: true            # 自动启动监听器concurrency: 1-5              # 消费者线程池范围(最小1~最大5)max-concurrency: 10           # 最大并发数prefetch-count: 10            # 每次从MQ拉取的消息数量(防止内存溢出)default-requeue-rejected: false  # 拒绝的消息不重新入队retry:enabled: true               # 启用消费者重试max-attempts: 3             # 最大重试次数initial-interval: 2000ms    # 初始重试间隔# 消息转换器配置(JSON序列化)message-converter: jackson2JsonMessageConverter  # 使用Jackson2JsonMessageConverter# 高级配置(惰性队列)lazy-queues: true  # 启用惰性队列(RabbitMQ 3.12+ 默认开启)queue:mode: lazy       # 显式声明队列为惰性队列(兼容旧版本)version: 2       # 使用Classic Queue v2(CQv2)优化性能# RabbitMQ连接超时相关配置(可选)
rabbitmq:connection:timeout: 5000msblocking-io-timeout: 5000ms  # 阻塞连接检查超时时间

http://www.xdnf.cn/news/9305.html

相关文章:

  • yolo最终笔记
  • 《棒球特长生》棒球升学途径·棒球1号位
  • 梯度消失和梯度爆炸的原因及解决办法
  • torch cuda 版本安装
  • Java 各版本核心新特性的详细说明
  • 2025软考软件设计师题目
  • 【CATIA的二次开发12】根对象Application的Documents集合概述
  • IEEE出版|2025人工智能驱动图像处理与计算机视觉技术国际学术研讨会 (AIPCVT 2025)
  • MobaXterm连接Docker Desktop中的容器(shell)
  • 人脸识别打卡项目
  • MySQL问题:什么是MySQL的中的最左匹配原则?
  • RY2200 One Cell Li-ion and Li-poly Battery Protection IC
  • 【运维实战】Linux 内存调优之进程内存深度监控
  • 基于深度学习双塔模型的食堂菜品推荐系统
  • 【MQTT】TLS证书双向验证
  • 天大《电视原理》背诵考点整理+计算/框图/作业题 (个人整理)
  • FPGA中的“BPI“指什么
  • 软件项目交付阶段,验收报告记录了什么?有哪些标准要求?
  • centos7.5安装kubernetes1.25.0
  • 随叫随到的电力补给:移动充电服务如何重塑用户体验?
  • cursor-stats 实时监控 Cursor IDE 的使用情况和订阅状态
  • 线代第四章线性方程组第三节:齐次线性方程组
  • JDK21深度解密 Day 7:FFM与VarHandle底层剖析
  • langchain 0.3.x 版本如何初始化本地模型
  • js-day3
  • Tailwind css实战,基于Kooboo构建AI对话框页面(二)
  • 鸿蒙OSUniApp 开发支持图片和视频的多媒体展示组件#三方框架 #Uniapp
  • AI学习搭档:开启终身学习新时代
  • 强大的免费工具,集合了30+功能
  • 科技赋能建筑行业,智能楼宇自控系统崭露头角成发展新势力