当前位置: 首页 > news >正文

springboot java开发的rocketmq 顺序消息保证

首先要明确一个关键点:RocketMQ 保证的是一种局部顺序(Partially Ordered)​,而非全局顺序(Globally Ordered)。这意味着消息的顺序性只在某个特定维度(比如同一个订单ID)下保证,而不是整个 Topic 下的所有消息都严格有序。

保证顺序的核心在于:​将需要保证顺序的一批消息,通过相同的策略,路由到同一个消息队列(MessageQueue)中,然后由同一个消费者单线程地消费这个队列。​

保证顺序消费的三大步骤

整个过程涉及三个环节:​顺序发送顺序存储​ 和 ​顺序消费。任何一个环节出错都会导致顺序紊乱。

步骤

角色

关键动作

目的

1. 顺序发送

Producer

使用 ​MessageQueueSelector,将同一业务ID(如OrderId)的消息发送到同一个MessageQueue

从源头上保证具有顺序性的消息被集中到同一队列

2. 顺序存储

Broker

自然保证,Broker 会顺序地将消息写入到其持有的 MessageQueue 中

存储层面维持消息的写入顺序

3. 顺序消费

Consumer

以 ​MessageListenerOrderly​ 方式消费,对每个 MessageQueue ​加锁并进行单线程消费

消费层面确保一个队列只被一个线程顺序处理

1. 顺序发送 (Producer)

生产者必须将具有顺序性的一批消息(例如,同一个订单的创建、付款、发货)发送到同一个 MessageQueue。

// 示例:发送顺序消息
public class OrderProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();for (int i = 0; i < 10; i++) {// 假设这是订单ID,同一订单的操作会有相同的orderIdint orderId = i % 3; // 共有3个订单,订单ID为0, 1, 2String body = "Order " + orderId + " - Step " + i;Message msg = new Message("OrderTopic", "TagA", body.getBytes(RemotingHelper.DEFAULT_CHARSET));// 关键:使用MessageQueueSelector,根据orderId选择发送到哪个队列SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// arg 就是传入的orderIdInteger id = (Integer) arg;int index = id % mqs.size(); // 通过订单ID取模,固定选择某个队列return mqs.get(index);}}, orderId); // 将orderId作为参数传入System.out.printf("SendResult status:%s, queue:%s, body:%s%n",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body);}producer.shutdown();}
}
2. 顺序存储 (Broker)

Broker 接收到消息后,会将其顺序追加(Append)到指定的 MessageQueue 对应的日志文件上。这个过程是天然顺序的,先到的消息先存储。

3. 顺序消费 (Consumer)

这是最复杂的一环。消费者必须使用 MessageListenerOrderly来监听消息。

// 示例:顺序消费消息
public class OrderConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");consumer.setNamesrvAddr("localhost:9876");// 关键:注册顺序消息监听器 MessageListenerOrderlyconsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true); // 通常设置为自动提交for (MessageExt msg : msgs) {System.out.println("Received msg: " + new String(msg.getBody()) + ", QueueId: " + msg.getQueueId());// 模拟业务处理// 如果处理失败,返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;// RocketMQ 会暂停当前队列片刻,然后重试这批消息,而不是跳过。}return ConsumeOrderlyStatus.SUCCESS;}});consumer.subscribe("OrderTopic", "*");consumer.start();System.out.println("Consumer Started.");}
}

总结

保证 RocketMQ 顺序消费的秘诀:

  1. 发送端​:使用 MessageQueueSelector,将同一业务ID的消息发送到同一个队列

  2. 消费端​:使用 MessageListenerOrderly进行消费,依靠 Broker 的队列锁机制实现单线程顺序处理失败暂停重试

    注意事项与局限性

  3. 性能牺牲​:顺序消费意味着并发度的下降。​一个队列只能被一个线程消费。如果要提高吞吐量,通常需要增加队列数(Topic 的 queueNum),但这会使得顺序的“局部”范围变小(例如,能保证的订单顺序从1000个变成了100个)。

  4. 失败重试​:如果某条消息消费失败,它会阻塞后续同一队列的所有消息,直到它自己被成功消费或超过最大重试次数进入死信队列。​业务逻辑需要保证幂等性,因为同一条消息可能会被重复消费。

  5. 局部顺序​:只能保证同一个 MessageQueue 内的消息顺序,​不同 MessageQueue 之间的消息消费顺序是无法保证的。这就是“局部顺序”的含义。

  6. 广播模式不支持​:顺序消费只在集群模式(CLUSTERING)​​ 下有效。在广播模式(BROADCASTING)下,每个消费者都会消费所有消息,无法保证顺序。

http://www.xdnf.cn/news/1386901.html

相关文章:

  • CAN总线(Controller Area Network Bus)控制器局域网总线(二)
  • 无人机图传模块原理及作用——开启飞行视野的关键技术
  • 第二阶段WinForm-9:委托复习
  • 应用转生APP:无需Root权限的应用双开和Xposed模块加载工具
  • 计算机是如何运行的
  • [AI人脸替换] docs | 环境部署指南 | 用户界面解析
  • c++ template
  • OpenCSG月度更新2025.8
  • 电影交流|基于SprinBoot+vue的电影交流平台小程序系统(源码+数据库+文档)
  • C++基础(④链表反转(链表 + 迭代 / 递归))
  • 公司内网部署离线deepseek+docker+ragflow本地模型实战
  • SpringBoot整合Spring WebFlux弃用自带的logback,使用log4j2,并启动异步日志处理
  • Python计算点云的均值、方差、标准差、凸点(顶点)、质心和去中心化
  • Docker03-知识点整理
  • Go Vendor 和 Go Modules:管理和扩展依赖的最佳实践
  • 项目一系列-第9章 集成AI千帆大模型
  • C/C++---预定义常量
  • iCloud 备份与 iTunes 备份:有何不同
  • Jenkins Pipeline(二)-设置Docker Agent
  • Python中的匿名函数详解(lambda)
  • 无人机固件升级与技术要点解析
  • 命令行操作:逻辑运算符、重定向与管道
  • Cesium 入门教程(十二):时间动画实例
  • AI共链·智存未来 | 绿算技术受邀出席华为AI SSD发布会
  • 预测模型及超参数:3.集成学习:[1]LightGBM
  • TDengine 3.3.7.0 新增性能基准工具 taosgen
  • Django开发规范:构建可维护的AWS资源管理应用
  • LRU 内存淘汰策略
  • 扩展中国剩余定理脚本(恢复密文c)
  • 匠心传承,古韵新生——记木雕名家龙巍的艺术人生