当前位置: 首页 > news >正文

如何保证RocketMQ消息不丢失

目录

前言

1.哪些环节可能丢消息?

2.保证producer到MQ消息不丢失

3.保证MQ内部消息不丢失

3.1.利用同步刷盘数据可以不丢失

3.2.同步刷盘的缺点

4.保证MQ到consumer消息不丢失


前言

这是作者RocketMQ系列文章中的一篇,一切操作和API都默认基于Spring Boot+RocketMQ,详细的各种代码示例详见:

Spring Boot集成RocketMQ_springboot集成rocketmq-CSDN博客

1.哪些环节可能丢消息?

回忆一下RocketMQ的生产消费过程:

  1. producer生产message发给MQ,
  2. MQ收到消息后将message写入commit log中完成持久化
  3. MQ将message推给consumer(假设是常用的推模式)

上面的环节有丢消息可能的是:

  1. producer发送消息给MQ的路上,消息丢了,MQ没有收到消息,或者MQ挂了,或者网络波动等等原因都有可能,反正就是producer发了,MQ没有收到消息。
  2. message到了MQ内存中,还没有写入commit log中还没有落磁盘,MQ挂了,消息会丢。
  3. MQ推送消息给consumer的路上,message丢了,consumer没有收到

保证消息不丢失,其实就是保证这三步消息不会丢失。接下来分开解决。

2.保证producer到MQ消息不丢失

producer端去校验MQ返回的ACK,要是收到失败的ACK就重新投递消息:

producer在api层面有三种发送方式:

  1. 同步发送:producer同步等待,Broker 返回写入结果(明确 ACK),失败自动重试
  2. 异步发送:通过回调函数接收 Broker 的 ACK/失败结果,需要自定义补偿逻辑
  3. Oneway发送:不等待 Broker 响应(无 ACK)

以下是Sping Boot集成RocketMQ异步发送的代码示例:

public class AsyncProducerService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;
​public void sendAsync() {Message<String> message = MessageBuilder.withPayload("异步消息内容").setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 延迟级别.build();
​rocketMQTemplate.asyncSend("ASYNC_TOPIC", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步发送成功: " + sendResult.getMsgId());}
​@Overridepublic void onException(Throwable e) {System.err.println("异步发送失败: " + e.getMessage());}});}
}

3.保证MQ内部消息不丢失

3.1.利用同步刷盘数据可以不丢失

producer将message发送到consumer中,写入commit log内,commit log在内存里,message在commit log中还没有落磁盘,MQ断电挂了,消息会丢。MQ支持两种刷盘方式:

  1. 同步刷盘,message落磁盘后向producer返回成功的ACK
  2. 异步刷盘,message写入内存后向producer返回成功的ACK

很明显,只要将刷盘方式配置为同步刷盘,MQ内message就一定不会丢失。在broker的配置文件中可以配置刷盘方式:

# 同步刷盘配置
flushDiskType = SYNC_FLUSH

# 异步刷盘配置(默认)
flushDiskType = ASYNC_FLUSH

3.2.同步刷盘的缺点

MQ的刷盘要稍微展开聊一下,才能彻底明白同步刷盘和异步刷盘的不同之处:

数据的读写,即数据IO,向网络上进行读写是网络IO,向磁盘上读写是磁盘IO。操作系统底层IO就是向内存中某块具体的地方进行读写,磁盘的存储和内存地址间存在一种映射关系。

RocketMQ收到message后将消息从内存中写入硬盘中自然也是上面说的这种机制,展开来说就是,commit log是个逻辑概念,是message的一个集合,其整体是存在磁盘上的。操作系统利用内存映射(MappedByteBuffer)将磁盘上的commit映射到内存中的 PageCache,向PageCache中进行读写。

同步刷盘会进行系统调用,强制将到达PageCache的数据强行刷入磁盘中,这就会存在一个问题,每次message来都要触发一次IO,很明显这个效率和吞吐量会是很低的。所以在工业界用的都是异步刷盘。肯定这里会有个疑惑:

异步刷盘数据丢了怎么办?

只能这样说,概率太低太低了,无数前面的工程经验显示用异步刷盘不会有什么问题。

4.保证MQ到consumer消息不丢失

consumer一侧,消息丢失的可能性有两种:

  1. consumer自身就没收到,在传输过程中就丢了或者consumer挂了
  2. consumer收到了,然后用这条消息去走下面的业务流程的时候消息丢失了,比如落库,还没落库断电了,那么对MQ来说message consumer是收到了的,但是实际整个业务上来说message约等于是没有收到。

情况一不需要我们去处理,因为consumer和MQ之间的ACK是默认打开的,要是MQ没收到consumer的成功ACK,那么它会间隔一段时间进行消息的重新投递,默认重发16次,具体重试次数可在配置文件中配置:

超过重试次数,message会被MQ投入死信队列,进入死信队列的消息可由人工去手动处理。

情况二就要我们在consumer端编码的时候注意了,在消费message的时候既要考虑到给MQ返回ACK,保证消息抵达consumer,还要考虑到message真的在业务层面被消费到了,所以将自动ack改为手动ACK,与此同时要根据自己的业务有所思考。以下是作者曾经做的一个订单系统的消费者端如何进行消息的可靠消费的代码示例:

http://www.xdnf.cn/news/965143.html

相关文章:

  • TDengine 快速体验(Docker 镜像方式)
  • docker 网络管理
  • 观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
  • MyBatis的#和$符号详解(Java面试)
  • 企业产品网络安全日志6月10日-WAF资费消耗排查
  • 【大模型01---Flash Attention】
  • 常见的http状态码
  • MySQL主从复制实现指南
  • AWS Lambda Python + AWS Secrets Manager + AWS Aurora Mysql
  • AI+预测3D新模型百十个定位预测+胆码预测+去和尾2025年6月10日第104弹
  • 深度学习聊天机器人 需要考虑
  • Linux简单的操作
  • 基于算法竞赛的c++编程(29)类的概念和简单应用
  • v-bind 与 v-model 的区别与联系详解
  • python第48天打卡
  • 通过 VS Code 连接 GitLab 并上传项目
  • 第十四届蓝桥杯_省赛B组(C).冶炼金属
  • 【单片机期末】汇编试卷
  • 64页|PPT|基于华为IPD与质量管理体系融合的研发质量管理:L1-L6分层架构驱动高效运营、标准化质量管理体系
  • 【解密LSTM、GRU如何解决传统RNN梯度消失问题】
  • 详解CNN
  • node+express+jwt+sequelize+mysql+本地服务器部署前端+云服务器公网部署:入门教程
  • 线程与进程(java)
  • 解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
  • 数据库——MongoDB
  • 定时器任务——若依源码分析
  • Python包(Package)详解:模块的高级组织方式
  • DeviceNet转Modbus RTU,为纺织厂生产线赋能
  • uniapp的请求封装,如何避免重复提交请求
  • mysql-innoDB存储引擎事务的原理