当前位置: 首页 > ops >正文

RocketMQ核心编程模型

RocketMQ核心编程模型与SpringBoot整合深度解析

笔记整理自RocketMQ官方文档与实战经验 | 图灵楼兰出品
配套视频课程学习效果更佳

一、RocketMQ架构核心回顾

RocketMQ采用经典发布-订阅模型,核心组件包括:

  • NameServer:轻量级服务发现中心(无状态)
  • Broker:消息存储与转发节点(主从架构)
  • Producer:消息生产者
  • Consumer:消息消费者

二、深入消息模型

1. 客户端基础流程

生产者固定步骤
// 1. 创建生产者(指定组名)
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 2. 配置NameServer地址
producer.setNamesrvAddr("192.168.65.112:9876"); 
// 3. 启动服务
producer.start();
// 4. 构建消息(Topic/Tag/Body)
Message msg = new Message("TopicTest", "TagA", "Hello".getBytes());
// 5. 发送消息
SendResult result = producer.send(msg);
// 6. 关闭生产者
producer.shutdown();
消费者固定步骤
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("192.168.65.112:9876");
consumer.subscribe("TopicTest", "*"); // 订阅Topic
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {msgs.forEach(msg -> System.out.println(new String(msg.getBody())));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费状态
});
consumer.start();

2. 消息确认机制

三种发送方式对比
发送方式特点适用场景
单向发送不关心结果,吞吐量最高日志收集等低可靠性场景
同步发送阻塞等待Broker响应金融交易等高可靠性场景
异步发送回调处理结果,平衡性能与可靠性电商下单等并发场景
消费端重试策略
  • 返回RECONSUME_LATER触发重试
  • 最大重试次数默认16次(可配置)
  • 重试消息进入专属重试Topic:%RETRY%+ConsumerGroup

3. 高级消息类型

顺序消息(局部有序)
// 生产者:相同订单号的消息发往同一队列
Message msg = new Message("OrderTopic", "PAY", orderId.getBytes(), body);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;return mqs.get(id % mqs.size()); // 自定义队列选择}
}, orderId);// 消费者:实现MessageListenerOrderly
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {// 处理顺序消息...
});
事务消息(两阶段提交)
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {return LocalTransactionState.UNKNOW; // 执行本地事务}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return LocalTransactionState.COMMIT_MESSAGE; // 事务回查}
});
延迟消息
// 指定延迟级别(1-18对应预设时间)
message.setDelayTimeLevel(3); // 10秒后投递// 指定精确时间点(5.0+版本)
message.setDeliverTimeMs(System.currentTimeMillis() + 30_000); // 30秒后

4. ACL权限控制

启用步骤:

  1. Broker端开启aclEnable=true
  2. 配置plain_acl.yml
accounts:
- accessKey: RocketMQsecretKey: 12345678topicPerms:- topicA=DENY- topicB=PUB|SUB
  1. 客户端添加认证Hook:
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials("RocketMQ", "12345678"));
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);

三、SpringBoot整合实战

1. 快速集成

依赖配置:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version>
</dependency>

配置文件application.yml

rocketmq:name-server: 192.168.65.112:9876producer:group: springboot-group

2. 消息生产与消费

// 生产者模板
@Autowired private RocketMQTemplate rocketMQTemplate;public void sendMessage() {rocketMQTemplate.convertAndSend("TestTopic", "Hello SpringBoot");
}// 消费者监听
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode = ConsumeMode.CONCURRENTLY
)
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received: " + message);}
}

3. 事务消息整合

@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {return RocketMQLocalTransactionState.COMMIT;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {return RocketMQLocalTransactionState.UNKNOWN;}
}

四、客户端最佳实践

1. 消息三要素规范

属性作用
MessageIdBroker生成的消息唯一标识(不建议作业务主键)
Key业务唯一键(如订单ID),用于消息追踪
Tag消息标签,用于高效过滤(性能远高于SQL过滤

2. 消费者幂等设计

重复消息场景:

  • 网络闪断导致生产者重试
  • 消费端ACK失败触发重投
  • Rebalance过程消息重复

解决方案示例:

consumer.registerMessageListener((msgs, context) -> {MessageExt msg = msgs.get(0);String orderId = msg.getKeys(); // 获取业务主键// 分布式锁或数据库唯一索引校验if (orderService.isProcessed(orderId)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 处理业务逻辑...
});

3. 死信队列处理

  • 命名规则:%DLQ%+ConsumerGroup
  • 运维注意:默认权限禁读,需手动改为可读
  • 处理方式:
    1. 查询死信原因:sh mqadmin queryMsgById
    2. 修复后重新投递到正常Topic
    3. 设置单独消费者处理死信

4. 重试策略优化

// 调整最大重试次数(超过16次间隔固定2小时)
consumer.setMaxReconsumeTimes(10); // 重试间隔配置(需Broker端配合)
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 30m 1h

官方说明:RocketMQ 仅保证 At Least Once,业务需自行实现幂等
源码位置:org.apache.rocketmq.common.consumer.ConsumeFromWhere


总结
RocketMQ的客户端设计充分考虑了金融级场景需求,通过多种消息模型组合可满足复杂业务场景。在实际使用中需特别注意:

  1. 生产环境务必启用ACL
  2. 顺序消息避免单队列堆积
  3. 死信队列监控不可或缺
  4. 消费者幂等是系统稳定性的生命线
http://www.xdnf.cn/news/15852.html

相关文章:

  • 自动找客户软件有那些?
  • 【Linux性能优化】常用工具和实战指令
  • 深入理解浏览器解析机制和XSS向量编码
  • 在Ubutu22系统上面离线安装Go语言环境【教程】
  • 《P2680 [NOIP 2015 提高组] 运输计划》
  • RPG62.制作敌人攻击波数二:攻击ui
  • 不只是“能用”:从语义化到 ARIA,打造“信息无障碍”Web 应用的实战清单
  • 在vue中遇到Uncaught TypeError: Assignment to constant variable(常亮无法修改)
  • ubuntu24.04安装CUDA和VLLM
  • #SVA语法滴水穿石# (014)关于链式蕴含的陷阱
  • 学习C++、QT---30(QT库中如何自定义控件(自定义按钮)讲解)
  • Python桌面版数独(二版)-增加4X4、6X6
  • 元宇宙经济的四个要素
  • python 字典中取值
  • SpringBoot的配置文件
  • python的pywebview库结合Flask和waitress开发桌面应用程序简介
  • 反欺诈业务 Elasticsearch 分页与导出问题分析及解决方案
  • 基于单片机的智能家居安防系统设计
  • Linux文件系统三要素:块划分、分区管理与inode结构解析
  • Linux: rsync+inotify实时同步及rsync+sersync实时同步
  • Claude Code 逆向工程分析,探索最新Agent设计
  • 【机器学习深度学习】量化与选择小模型的区别:如何理解两者的优势与局限?
  • Day1||Vue指令学习
  • PyTorch的基础概念和复杂模型的基本使用
  • Facebook 开源多季节性时间序列数据预测工具:Prophet 快速入门 Quick Start
  • macOs上交叉编译ffmpeg及安装ffmpeg工具
  • 测试中的bug
  • 基于深度学习的自然语言处理:构建情感分析模型
  • urllib.parse.urlencode 的使用详解
  • AI+预测3D新模型百十个定位预测+胆码预测+去和尾2025年7月20日第144弹