当前位置: 首页 > java >正文

【Fifty Project - D34】

今日完成记录

TimePlan完成情况
7:00 - 7:40爬坡
8:30 - 11:30Rabbit MQ
17:30 - 18:30羽毛球

RabbitMQ

消费者端如何保证可靠性?

  • 消息投递过程出现网络故障
  • 消费者接收到消息但是突然宕机未消费消息
  • 消费者接收到消息后处理不当抛出异常
  • 。。。

消费者确认机制

Consumer Acknowledgement:消费者处理消息结束应该给MQ发送一个回执,告知自己的消息处理状态:ack【成功处理消息,MQ从队列中删除消息】nack【消息处理失败,MQ需要重新推送消息】reject【消息处理失败并拒绝该消息,MQ从队列删除消息】

springAMQP提供了三种ACK处理方式:

  • none:不处理,消息投递给消费者后直接返回ack【不安全,不建议】
  • manual:手动处理,自己在业务代码中调用api发送ack或者reject【存在业务入侵但是更灵活】
  • auto:自动处理,利用aop自动对业务代码进行增强,正常执行则返回ack,出现异常则根据异常类型处理【业务异常返回nack, 消息处理或者校验异常返回reject】

返回reject常见异常:MessageConversionException、MethodArgumentNotValidException、MethodArgumentTypeMissmatchException、NoSuchMethodException、ClassCastException
基本上就是消息校验异常以及不匹配处理方法或者参数的异常

通过如下配置可以设置ack处理方法:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理

1、测试none处理方式,修改消费者端代码,使其抛出触发reject的异常。在抛出异常前打断点,并观察发现rabbitmq的客户端发现消息在发送到消费者则触发了自动ack并且删除了消息 ,触发异常后客户端并没有做任何处理。
在这里插入图片描述

2、修改acknowledge-mode为auto,再观察发现阻塞异常触发前消息处于uack状态,但同时观察到收到了一个manual ack。
在这里插入图片描述
(1)当代码继续执行,抛出MessageConversionException,会向MQ发送reject,删除消息。

这里发生了一个有意思的现象,因为我消息阻塞了太久触发了MQ消息重新投递,因此又出现了一个manual ack以及交替出现的ready和unack。
在这里插入图片描述

(2)当抛出异常是RuntimeException,可以观察到unack一直是1,且一直尝试重新投递。(重新投递没有触发那个自动的manual ack)
在这里插入图片描述

这里留两个小问题:为什么会自动发送了一个manual ack?这个重传是否是超时重传还是什么其他机制?

3、设置acknowledge-mode为manual,修改消费者端代码手动调用api返回消息回执

@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {System.out.println("这是第" + (cnt++) + "条消息");channel.basicAck(deliveryTag, false);
}

(1)测试ack
首先是接着2的调试代码继续调试【也就是此时消息队列中有一个消息没有被接收】,所以启动测试代码后这个消息会被重新投递,消息被消费者接收后手动回复确定,整个过程如下图
在这里插入图片描述
接下来重新投递一条消息观察正常的手动ack全过程,图中上面的图蓝色线(unacked)被红色线遮挡,它们其实是同样的走势。也就是当消息成功投递到消费者,会触发一次自动的ack(Deliver manual ack),但是消息处于uack,等到业务代码完成手动进行ack后该消息被ack并且删除。
在这里插入图片描述
(2)测试nack

@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {System.out.println("这是第" + (cnt++) + "条消息");throw new BusinessException();
//            channel.basicAck(deliveryTag, false);}catch (BusinessException e){// nack且重新入队 重新推送channel.basicNack(deliveryTag, false, true);}catch (MessageConversionException e){// reject 并且不重新入队channel.basicReject(deliveryTag, false);}
}

上面的代码抛出了自定义的业务异常,这个异常会被捕获并且返回nack,然后重新推送,如下图
在这里插入图片描述
(3)测试reject

@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {System.out.println("这是第" + (cnt++) + "条消息");throw new MessageConversionException("just a test for msg reject");
//            channel.basicAck(deliveryTag, false);}catch (MessageConversionException e){// reject 并且不重新入队channel.basicReject(deliveryTag, false);}}

这里抛出MessageConversionException,捕获后手动返回拒绝并且不重新投递,过程如下
在这里插入图片描述
**总结:**实际上SpringAMQP只是提供了三个接口basicAckbasicNackbasicReject,这三个接口何时触发,基于何种规则触发都是可以自定义的,上面的三个实现是基本与acknowledge-mode: auto一样的逻辑:业务异常nack且重新投递、消息异常reject且不重新投递、正常接收和消费则ack

消息失败重试机制


在这里插入图片描述

上面提到的两个小问题

为什么会触发一次自动的Deliver(Manual ACK)

原来这个Deliver(Manual ACK)是一个投递事件ACK,当消息进入消息队列未被消费,其状态为ready,当其被投递到消费者,状态会更新为unacked,如果被成功消费并且确认,则会被删除。
当首次投递,则会触发一个投递事件(ready变为unacked)
当消息被重新投递,不会再触发投递事件
这是因为:

  1. 性能优化:重复发送投递事件会导致网络带宽浪费、Broker的CPU浪费、监控系统负载
  2. 语义精确性:RabbitMQ的事件新系统旨在“报告状态变化的边界,而非状态本身”,重复投递的状态变化是首次投递的重复,因此没有必要重复报告
  3. 避免误导性监控:重复报告投递事件会导致消息计数错误,无法区分实际新消息以及重新投递消息

这个重传是否是超时重传还是什么其他机制?

明天再补了

http://www.xdnf.cn/news/13117.html

相关文章:

  • C++.OpenGL (19/64)模板测试(Stencil Testing)
  • Vue3监听浏览器刷新/关闭/前进/后退事件【兼容pc+h5】
  • 2.2 传输介质
  • ArcPy扩展模块的使用(3)
  • Niushop商城系统
  • 38.第二阶段x64游戏实战-封包-改造封包劫持工具(一)
  • 若依登录用户名和密码加密
  • 门静脉高压——治疗
  • 【ubuntu24.04】普通用户如何操作samba挂载的文件夹
  • 深入探索CDC之Canal:解锁全量与增量复制的奥秘
  • SmolVLA: A vision-language-action model for affordable and efficient robotics
  • 日拱一卒 | awk的基本操作
  • 从0到1构建我的AI星逻系统: LLM智能控制 + Streamlit前端实战
  • 达梦数据库EXISTS子查询实战指南
  • 鸿蒙图片缓存(二)
  • Day09_刷题niuke20250609
  • riscv操作系统记录(一)
  • 缓存一致性性的 实现等价
  • Element Plus 表单(el-form)中关于正整数输入的校验规则
  • DeepSeek辅助实现的DuckDB copy to自定义函数
  • SHW汽车SAP系统拆分实战:24小时停机完成重组 | SNP全球案例
  • Brup Suite 2025.5简单暴力猜解攻击手记
  • 安装便捷、维护省心,强力巨彩租赁屏助力视觉体验升级
  • Win系统权限提升篇计算机用户进程注入令牌窃取服务启动远程管理
  • 基于51单片机的篮球计分器
  • C++ 时间处理指南:深入剖析<ctime>库
  • 医疗器械研发、质量与注册:全流程指南(简)
  • nnUNet V2代码——图像增强(四)
  • Android Jetpack Compose开发纯自定义表盘【可用于体重,温度计等项目】
  • 十一(3) 类,加深对拷贝构造函数的理解