当前位置: 首页 > ds >正文

RocketMq如何保证消息的顺序性

文章目录

  • 1.顺序消息的全流程
    • 1.1 发送阶段:消息分区
    • 1.2.存储阶段:顺序写入
    • 1.3.消费阶段:串行消费
  • 2.第三把锁有什么用?
  • 3.顺序消费存在的问题

Kafka只支持同一个Partition内消息的顺序性一样,RocketMQ中也提供了基于队列(分区)的顺序消费。即同个队列内的消息可以做到有序,但是不同队列内的消息是无序的!

在RocketMq中,它的顺序消息通过客户端的三把锁以及同一队列顺序写入协同工作,确保消息从发送、存储到消费的全流程严格有序。其核心在于:

  • 分区一致性(同一业务逻辑的消息发送到同一个分区)
  • 存储顺序性(单线程顺序写入)
  • 消费串行化(单线程消费分区)

这种设计在需要严格顺序性的场景(如金融交易、订单处理)中非常关键。
RocketMQ 的顺序消息需要从 发送、存储、消费 三个环节严格控制顺序性,因此引入了三把锁:

锁类型作用阶段实现方式目标
发送端消息发送阶段, 固定 MessageQueue,确保同一业务逻辑的消息发送到同一个 MessageQueue,为后续的存储和消费顺序性奠定基础MessageQueueSelector消息分区一致性
Broker消息存储阶段, 确保同一 MessageQueue的消息按顺序写入磁盘CommmitLog顺序写机制+ConsumeQueue 分区索引,每个 MessageQueue 对应一个 ConsumeQueue 文件,记录消息在 CommitLog 中的位置(偏移量),确保同一 MessageQueue 的消息在 ConsumeQueue 中的顺序性消息存储顺序性
消费锁消息消费阶段,通过三把锁确保同一 MessageQueue的消息单线程串行消费分布式锁 + 本地锁 + ProcessQueue 锁
分布式锁:消费者向 Broker 申请分布式锁(默认每 20 秒续签),确保同一消费组内只有一个消费者能消费该 MessageQueue
本地锁:通过 MessageQueueLock 的 Synchronized 锁,确保同一消费者线程池中只有一个线程处理该 MessageQueue
ProcessQueue 锁:通过 ProcessQueue 的 ReentrantLock(consumeLock),防止消费过程中因负载均衡或重平衡导致 ProcessQueue 被删除
通过这三个组合确保同一 MessageQueue 的消息由单线程串行消费,避免多线程并发导致顺序错乱
消息消费顺序性

1.顺序消息的全流程

1.1 发送阶段:消息分区

生产者通过 MessageQueueSelector(如 ShardingKeySelector)将 同一业务逻辑的消息(如同一订单 ID)发送到 同一个 MessageQueue

当我们作为MQ的生产者需要发送顺序消息时,需要在Send方法中,传入一个MessageQueueSelector
MessageQueueSelector中需要实现一个select方法,这个方法就是用来定义要把消息发送到哪个MessageQueue的,通常可以使用取模法进行路由:

    public void send() {SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;//根据参数,计算出一个要接收消息的MessageQueue的下标int index = id % mqs.size();//返回这个MessageQueuereturn mqs.get(index);}}, orderId);}

通过以上形式就可以将需要有序的消息发送到同一个队列中。需要注意的是,这里需要使用同步发送的方式!

1.2.存储阶段:顺序写入

Broker 接收到消息后,根据 MessageQueue 的物理分区(CommitLog + ConsumeQueue)进行 顺序写入。

同一 MessageQueue 的消息在物理文件中 RocketMQ 通过单线程写入 CommitLog 保证顺序性,按顺序追加写入,保证存储顺序性。

消息按照顺序发送的消息队列中之后,那么,消费者如何按照发送顺序进行消费呢?

1.3.消费阶段:串行消费

RocketMQ的MessageListener回调函数提供了两种消费模式:

  • 有序消费模式MessageListenerOrderly
  • 并发消费模式MessageListenerConcurrently。

所以,想要实现顺序消费,需要使用MessageListenerOrderly模式接收消息:

        consumer.registerMessagelistener(new MessagelistenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {System.out.printf("Receive order msg:" + new String(msgs.get(0) .getBody( )));return ConsumeOrderlyStatus.SUCCESS;}}, new ConsumeOrderlyContext());

当我们用以上方式注册一个消费之后,为了保证同一个队列中的有序消息可以被顺序消费,就要保证RocketMQ Broker只会把消息发送到同一个消费者上,这时候就需要加锁了。

在实现中,ConsumeMessageOrderlyService 初始化的时候,会启动一个定时任务,会尝试向 Broker 为当前消费者客户端申请分布式锁(第一把锁)。如果获取成功,那么后续消息将会只发给这个Consumer。

接下来在消息拉取的过程中,消费者会一次性拉取多条消息的,并且会将拉取到的消息放入 ProcessQueue,同时将消息提交到消费线程池进行执行。

那么拉取之后的消费过程,怎么保证顺序消费呢?这里就需要更多的锁了

RocketMQ在消费的过程中,需要申请 MessageQueue 锁,消费线程通过 MessageQueueLock 的 synchronized 锁(第二把锁)住当前 MessageQueue 的消费过程,确保在同一时间,一个队列中只有一个线程能处理列中的消息。

获取到 MessageQueue 的锁后,就可以从ProcessQueue中依次拉取一批消息处理了,但是这个过程中,为了保证消息不会出现重复消费,还需要对ProcessQueue进行加锁,通过 ProcessQueue 的 ReentrantLock。然后就可以开始处理业务逻辑了

总结下来就是三次加锁:

  • 首先锁定Broker上的MessageQueue,确保消息只会投递到唯一的消费者(分布式锁)
  • 然后对本地的MessageQueue加锁,确保只有一个线程能处理这个消息队列。(synchronized)
  • 最后对存储消息的ProcessQueue加锁,确保在重平衡的过程中不会出现消息的重复消费。(ReentrantLock)

在这里插入图片描述

里面有几个点需要大家注意下:

2.第三把锁有什么用?

前面介绍客户端加锁过程中,一共加了三把锁,那么,有没有想过这样一个问题,第三把锁如果不加的话,是不是也没问题?

因为我们已经对MessageQueue加锁了,为啥还需要对ProcessQueue再次加锁呢?

这里其实主要考虑的是重平衡(Rebalance)的问题

当我们的消费者集群,新增了一些消费者,发生重平衡的时候,某个队列可能会原来属于客户端A消费的,但是现在要重新分配给客户端B了。

这时候客户端A就需要把自己加在Broker上的锁解掉,而在这个解锁的过程中,就需要确保消息不能在消费过程中就被移除了,因为如果客户端A可能正在处理一部分消息,但是位点信息还没有提交,如果客户端B立马去消费队列中的消息,那存在一部分数据会被重复消费

那么如何判断消息是否正在消费中呢,就需要通过这个ProcessQueue上面的锁来判断了,也就是说在解锁的线程也需要尝试对ProcessQueue进行加锁,加锁成功才能进行解锁操作。以避免过程中有消息消费。

3.顺序消费存在的问题

通过上面的介绍,我们知道了RocketMQ的顺序消费是通过在消费者上多次加锁实现的,这种方式带来的问题就是会降低吞吐量,并且如果前面的消息阻寨,会导致更多消息阻塞。所以,顺序消息需要慎用。

http://www.xdnf.cn/news/17220.html

相关文章:

  • 面向对象的七大设计原则
  • Kotlin属性委托
  • 探秘MOBILITY China 2026,新能源汽车与智慧出行的未来盛宴
  • React18 严格模式下的双重渲染之谜
  • 嵌入式硬件中运放的基本控制原理
  • 2025金九银十Java后端面试攻略
  • 天津大学2024-2025 预推免 机试题目(第二批)
  • 400V降24V,200mA,应用领域:从生活到工业的 “全能电源管家”
  • C++面向对象编程基础:从类定义到封装机制详解
  • 深度学习-卷积神经网络CNN-填充与步幅
  • 最新基于Python科研数据可视化实践技术
  • 【人工智能99问】什么是Post-Training,包含哪些内容?(19/99)
  • Next Terminal 实战:内网无密码安全登录
  • MCP进阶:工业协议与AI智能体的融合革命
  • Redis之Hash和List类型常用命令
  • VGMP(VRRP Group Management Protocol)VRRP组管理协议
  • Druid学习笔记 02、快速使用Druid的SqlParser解析
  • Solidity全局变量与安全实践指南
  • python中的字典
  • 雷达系统工程学习:自制极化合成孔径雷达无人机
  • bypass
  • SelectDB:新一代实时数仓的核心引擎与应用实战
  • 机器学习——基本算法
  • 笛卡尔坐标
  • Java 中 BigDecimal、Float、Double 的取整与保留小数处理方法详解
  • 简要探讨大型语言模型(LLMs)的发展历史
  • Android进程基础:Zygote
  • Linux 磁盘管理与分区配置
  • 【2025WACV-最佳论文】RayGauss:基于体积高斯的光线投射,用于逼真的小说视图合成
  • (JAVA)自建应用调用企业微信API接口,设置企业可信IP