当前位置: 首页 > news >正文

【Fifty Project - D35】

今日完成记录

TimePlan完成情况
7:00 - 7:40爬坡
8:30 - 11:30Rabbit MQ
17:30 - 18:30羽毛球

RabbitMQ

消费者端如何保证可靠性?

  • 消息投递过程出现网络故障
  • 消费者接收到消息但是突然宕机未消费消息
  • 消费者接收到消息后处理不当抛出异常
  • 。。。

消费者确认机制

Consumer Acknowledgement:消费者处理消息结束应该给MQ发送一个回执,告知自己的消息处理状态:ack【成功处理消息,MQ从队列中删除消息】nack【消息处理失败,MQ需要重新推送消息】reject【消息处理失败并拒绝该消息,MQ从队列删除消息】

springAMQP提供了三种ACK处理方式:

  • none:不处理,消息投递给消费者后直接返回ack【不安全,不建议】
  • manual:手动处理,自己在业务代码中调用api发送ack或者reject【存在业务入侵但是更灵活】
  • auto:自动处理,利用aop自动对业务代码进行增强,正常执行则返回ack,出现异常则根据异常类型处理【业务异常返回nack, 消息处理或者校验异常返回reject】

返回reject常见异常:MessageConversionException、MethodArgumentNotValidException、MethodArgumentTypeMissmatchException、NoSuchMethodException、ClassCastException
基本上就是消息校验异常以及不匹配处理方法或者参数的异常

通过如下配置可以设置ack处理方法:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理

1、测试none处理方式,修改消费者端代码,使其抛出触发reject的异常。在抛出异常前打断点,并观察发现rabbitmq的客户端发现消息在发送到消费者则触发了自动ack并且删除了消息 ,触发异常后客户端并没有做任何处理。
在这里插入图片描述

2、修改acknowledge-mode为auto,再观察发现阻塞异常触发前消息处于uack状态,但同时观察到收到了一个manual ack。
在这里插入图片描述
(1)当代码继续执行,抛出MessageConversionException,会向MQ发送reject,删除消息。

这里发生了一个有意思的现象,因为我消息阻塞了太久触发了MQ消息重新投递,因此又出现了一个manual ack以及交替出现的ready和unack。
在这里插入图片描述

(2)当抛出异常是RuntimeException,可以观察到unack一直是1,且一直尝试重新投递。(重新投递没有触发那个自动的manual ack)
在这里插入图片描述

这里留两个小问题:为什么会自动发送了一个manual ack?这个重传是否是超时重传还是什么其他机制?

3、设置acknowledge-mode为manual,修改消费者端代码手动调用api返回消息回执

@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {System.out.println("这是第" + (cnt++) + "条消息");channel.basicAck(deliveryTag, false);
}

(1)测试ack
首先是接着2的调试代码继续调试【也就是此时消息队列中有一个消息没有被接收】,所以启动测试代码后这个消息会被重新投递,消息被消费者接收后手动回复确定,整个过程如下图
在这里插入图片描述
接下来重新投递一条消息观察正常的手动ack全过程,图中上面的图蓝色线(unacked)被红色线遮挡,它们其实是同样的走势。也就是当消息成功投递到消费者,会触发一次自动的ack(Deliver manual ack),但是消息处于uack,等到业务代码完成手动进行ack后该消息被ack并且删除。
在这里插入图片描述
(2)测试nack

@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {System.out.println("这是第" + (cnt++) + "条消息");throw new BusinessException();
//            channel.basicAck(deliveryTag, false);}catch (BusinessException e){// nack且重新入队 重新推送channel.basicNack(deliveryTag, false, true);}catch (MessageConversionException e){// reject 并且不重新入队channel.basicReject(deliveryTag, false);}
}

上面的代码抛出了自定义的业务异常,这个异常会被捕获并且返回nack,然后重新推送,如下图
在这里插入图片描述
(3)测试reject

@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {System.out.println("这是第" + (cnt++) + "条消息");throw new MessageConversionException("just a test for msg reject");
//            channel.basicAck(deliveryTag, false);}catch (MessageConversionException e){// reject 并且不重新入队channel.basicReject(deliveryTag, false);}}

这里抛出MessageConversionException,捕获后手动返回拒绝并且不重新投递,过程如下
在这里插入图片描述
**总结:**实际上SpringAMQP只是提供了三个接口basicAckbasicNackbasicReject,这三个接口何时触发,基于何种规则触发都是可以自定义的,上面的三个实现是基本与acknowledge-mode: auto一样的逻辑:业务异常nack且重新投递、消息异常reject且不重新投递、正常接收和消费则ack

消息失败重试机制


在这里插入图片描述

上面提到的两个小问题

为什么会触发一次自动的Deliver(Manual ACK)

原来这个Deliver(Manual ACK)是一个投递事件ACK,当消息进入消息队列未被消费,其状态为ready,当其被投递到消费者,状态会更新为unacked,如果被成功消费并且确认,则会被删除。
当首次投递,则会触发一个投递事件(ready变为unacked)
当消息被重新投递,不会再触发投递事件
这是因为:

  1. 性能优化:重复发送投递事件会导致网络带宽浪费、Broker的CPU浪费、监控系统负载
  2. 语义精确性:RabbitMQ的事件新系统旨在“报告状态变化的边界,而非状态本身”,重复投递的状态变化是首次投递的重复,因此没有必要重复报告
  3. 避免误导性监控:重复报告投递事件会导致消息计数错误,无法区分实际新消息以及重新投递消息

这个重传是否是超时重传还是什么其他机制?

明天再补了

http://www.xdnf.cn/news/960769.html

相关文章:

  • 在线学堂-第二章媒资管理模块上
  • 高效清理C盘
  • Quick BI 自定义组件开发 -- 第一篇 Lifecycle 接口的定义
  • esp_image: invalid segment length 0xffffffff
  • MySQL自定义函数零基础学习教程
  • FastAPI 与 JWT 身份验证:保护你的 API
  • SpringBoot配置最新的AI版本加入Maven的配置方式
  • CDBench论文精读
  • 树莓派4B, ubuntu20.04, 安装Ros Noetic[踩坑记录]
  • 当拼音文字遇上回文:英语中的诗意镜像与文化密码
  • Profinet转CAN网关如何实现profinet与can协议互转
  • 如何通过API接口获取淘宝商品列表?操作详解
  • Quick BI 自定义组件开发 -- 第二篇 添加 echart 组件,开发图表
  • Spring AMQP
  • 打造高效能技术组织的逆向法则
  • 解读新交规中关于“电动自行车能否在快车道骑行”的核心问题
  • Shellshock漏洞与永恒之蓝(WannaCry)勒索病毒深度分析
  • [大A量化专栏] 看盘界面设置(未完待续)
  • Linux进程信号(一)
  • AI Bot到底是真助手,还是又一个流量收割伎俩?
  • 软件功能测试有哪些类型?软件测评机构
  • CppCon 2015 学习:The Importance of Being const
  • 鸠摩搜书官网入口,免费电子书小说在线搜索下载网站
  • 火山 RTC 引擎10 ----远端视频 转网易视频格式
  • 镜头景深的影响因素有哪些
  • 【西门子杯工业嵌入式-7-OLED】
  • 高防CDN是什么?和传统CDN有什么区别?
  • 深入浅出 红黑树:如何手写红黑树(基于TreeMap,算法导论的实现)
  • 振动力学:复模态法和状态空间描述(一般阻尼系统的自由振动)
  • 网站维护页面Plus + HTML源码(源码下载)