当前位置: 首页 > ai >正文

RocketMQ 三大消息类型深度解析:普通消息、延迟消息、事务消息

RocketMQ 三大消息类型深度解析:普通消息、延迟消息、事务消息

本文通过完整的代码实现和场景验证,深入剖析RocketMQ三种核心消息类型的工作原理、使用场景及最佳实践。包含10个关键组件实现3种消息模式对比真实调试结果

一、消息类型核心原理

1.1 普通消息

工作流程

Producer Broker Consumer 同步发送(syncSend) 发送确认 拉取消息 返回消息 提交消费位点 Producer Broker Consumer

特点

  • 实时性高(毫秒级延迟)
  • 支持同步/异步/单向发送
  • 消费失败支持重试机制

1.2 延迟消息

实现原理

发送延迟消息
设置delayLevel
1-18级对应不同延迟时间
Broker内部SCHEDULE_TOPIC队列
定时任务扫描
转移到目标Topic
消费者可见

延迟级别对应表

级别时间级别时间
11s106m
25s117m
310s128m
430s139m
51m1410m
62m1520m
73m1630m
84m171h
95m182h

1.3 事务消息

两阶段提交流程

Producer Broker Consumer 1. 发送半消息(PREPARED) 存储成功 2. 执行本地事务 3a. 提交(COMMIT) 4. 消息可见 3b. 回滚(ROLLBACK) 丢弃消息 5. 定时回查 6. 返回事务状态 alt [事务成功] [事务失败] [未知状态] Producer Broker Consumer

二、Spring Boot完整实现

2.1 项目结构

src/main/java
└── com.coder├── framework  // 基础框架└── module└── mq└── rocketmq├── controller  // API接口├── message     // 消息实体├── producer    // 生产者├── consumer    // 消费者└── listener    // 事务监听器

2.2 核心组件实现

消息实体定义
/*** 短信发送消息** @author L.ty*/
@Data
public class RocketMQSmsSendMessage {public static final String TOPIC = "STO_SMS_TOPIC"; // 重点:需要增加消息对应的 Topicpublic static final String TOPIC_1 = "STO_SMS_TOPIC_1"; // 重点:需要增加消息对应的 Topic/*** 短信日志编号*/@NotNull(message = "短信日志编号不能为空")private Long logId;/*** 手机号*/@NotNull(message = "手机号不能为空")private String mobile;/*** 短信渠道编号*/@NotNull(message = "短信渠道编号不能为空")private Long channelId;/*** 短信 API 的模板编号*/@NotNull(message = "短信 API 的模板编号不能为空")private String apiTemplateId;/*** 短信模板参数*/private List<KeyValue<String, Object>> templateParams;
}
普通消息生产者(含延迟消息)
@Slf4j
@Component
@RequiredArgsConstructor
public class RocketMQSmsProducer {private final RocketMQTemplate rocketMQTemplate;public void sendMessage(RocketMQSmsSendMessage message) {// 立即发送(普通消息)rocketMQTemplate.syncSend(RocketMQSmsSendMessage.TOPIC, message);// 延迟发送(10秒后消费)rocketMQTemplate.syncSendDelayTimeSeconds(RocketMQSmsSendMessage.TOPIC, message, 10);log.info("普通消息发送成功:{}", message);}
}
事务消息生产者
@Slf4j
@Component
@RequiredArgsConstructor
public class RocketMQSmsTransactionProducer {private final RocketMQTemplate rocketMQTemplate;public void sendTransactionMessage(RocketMQSmsSendMessage message) {TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(RocketMQSmsSendMessage.TOPIC,MessageBuilder.withPayload(message).setHeader("logId", message.getLogId()) // 事务ID.build(), null // 自定义参数);log.info("事务消息发送状态:{} | 事务状态:{}", result.getSendStatus(),result.getLocalTransactionState());}
}
事务策略工厂(关键设计)
@Component
public class RocketMQTransactionStrategyFactory {private static final Map<String, RocketMQTransactionStrategy> strategies = new ConcurrentHashMap<>();@Autowiredpublic RocketMQTransactionStrategyFactory(List<RocketMQTransactionStrategy> strategyList) {strategyList.forEach(strategy -> {if (strategies.containsKey(strategy.topic())) {throw new IllegalStateException("重复的Topic策略: " + strategy.topic());}strategies.put(strategy.topic(), strategy);});}public RocketMQTransactionStrategy getStrategy(String topic) {return Optional.ofNullable(strategies.get(topic)).orElseThrow(() -> new UnsupportedOperationException("未找到Topic策略: " + topic));}
}
事务消息监听器
@Slf4j
@RocketMQTransactionListener
public class RocketMQTransactionMsgListener implements RocketMQLocalTransactionListener {private final RocketMQTransactionStrategyFactory factory;@Override@Transactional(rollbackFor = Exception.class)public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String topic = msg.getHeaders().get(RocketMQHeaders.TOPIC, String.class);try {return factory.getStrategy(topic).executeLocalTransaction(msg, arg);} catch (Exception e) {log.error("本地事务执行失败", e);return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String topic = msg.getHeaders().get(RocketMQHeaders.TOPIC, String.class);return factory.getStrategy(topic).checkLocalTransaction(msg);}
}
广播模式消费者(3个实例)
@Component
@RocketMQMessageListener(topic = RocketMQSmsSendMessage.TOPIC,consumerGroup = "STO_SMS_TOPIC_CONSUMER_BROADCAST_1",messageModel = MessageModel.BROADCASTING
)
@Slf4j
public class SmsBroadcastConsumer1 implements RocketMQListener<RocketMQSmsSendMessage> {public void onMessage(RocketMQSmsSendMessage message) {log.info("[广播消费者1] 收到消息: {}", message);}
}// 同理实现 SmsBroadcastConsumer2 和 SmsBroadcastConsumer3
集群模式消费者(3个实例)
@Component
@RocketMQMessageListener(topic = RocketMQSmsSendMessage.TOPIC_1,consumerGroup = "STO_SMS_TOPIC_CONSUMER_CLUSTER",messageModel = MessageModel.CLUSTERING
)
@Slf4j
public class SmsClusterConsumer1 implements RocketMQListener<RocketMQSmsSendMessage> {public void onMessage(RocketMQSmsSendMessage message) {log.info("[集群消费者1] 收到消息: {}", message);}
}// 同理实现 SmsClusterConsumer2 和 SmsClusterConsumer3

2.3 REST API接口

@RestController
@RequestMapping("/mq/rocketMQ")
@AllArgsConstructor
public class RocketMQController {private final RocketMQSmsProducer producer;private final RocketMQSmsTransactionProducer transactionProducer;@PostMapping("/sendMessage")public CommonResult<?> sendMessage(@RequestBody RocketMQSmsSendMessage message) {producer.sendMessage(message);return CommonResult.success("普通消息发送成功");}@PostMapping("/sendTransactionMessage")public CommonResult<?> sendTransactionMessage(@RequestBody RocketMQSmsSendMessage message) {transactionProducer.sendTransactionMessage(message);return CommonResult.success("事务消息发送成功");}
}

三、调试结果与场景验证

3.1 普通消息测试

请求

POST /mq/rocketMQ/sendMessage
{"mobile": "13800138000","channelId": 1001,"apiTemplateId": "TPL_ORDER_PAY","templateParams": [{"key": "orderNo", "value": "202405300001"}]
}

日志输出

[普通消息生产者] 普通消息发送成功:RocketMQSmsSendMessage(...)
[广播消费者1] 收到消息: RocketMQSmsSendMessage(...)  # 立即消费
[广播消费者2] 收到消息: RocketMQSmsSendMessage(...)
[广播消费者3] 收到消息: RocketMQSmsSendMessage(...)[10秒后]
[广播消费者1] 收到消息: RocketMQSmsSendMessage(...)  # 延迟消息
[广播消费者2] 收到消息: RocketMQSmsSendMessage(...)
[广播消费者3] 收到消息: RocketMQSmsSendMessage(...)

3.2 事务消息测试

请求

POST /mq/rocketMQ/sendTransactionMessage
{"mobile": "13900139000","channelId": 1002,"apiTemplateId": "TPL_ACCOUNT_UPDATE","templateParams": [{"key": "amount", "value": "100.00"}]
}

日志输出

[事务生产者] 事务消息发送状态:SEND_OK | 事务状态:COMMIT_MESSAGE
[事务监听器] 执行本地事务,Topic: STO_SMS_TOPIC, MsgId: MSGID_001
[策略实现] 【本地事务执行】数据体: {mobile:13900139000,...}
[集群消费者1] 收到消息: RocketMQSmsSendMessage(...)  # 只有1个消费者处理

3.3 消费模式对比测试

测试场景广播模式集群模式
同组消费者数量33
发送消息数量11
总消费次数31
消费者负载均衡×
适用场景配置同步订单处理

四、生产环境最佳实践

4.1 消息设计规范

  1. 消息大小:单条消息不超过4MB(推荐<100KB)
  2. 事务ID:使用全局唯一ID(如Snowflake)
  3. 消息压缩:对大于1KB的消息启用压缩
    rocketMQTemplate.setCompressMsgBodyOverHowmuch(1024);
    

4.2 事务消息注意事项

public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 1. 记录事务日志到数据库logService.saveTransactionLog(msg);// 2. 执行核心业务businessService.process(msg);// 3. 返回COMMIT前确保事务日志持久化return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {// 4. 失败时标记事务日志状态logService.markFailed(msg);return RocketMQLocalTransactionState.ROLLBACK;}
}

4.3 消费者幂等性保障

public void onMessage(RocketMQSmsSendMessage message) {// 1. 检查消息是否已处理if (cacheService.isProcessed(message.getLogId())) {return; // 幂等跳过}try {// 2. 业务处理smsService.send(message);// 3. 记录处理状态cacheService.markProcessed(message.getLogId());} catch (Exception e) {// 4. 重试机制(3次重试)if (retryCount < 3) {throw new RuntimeException("触发重试");}alarmService.notify("短信发送失败", message);}
}

五、常见问题解决方案

5.1 消息堆积处理

  1. 临时方案
    # 动态增加消费者数量
    java -jar consumer.jar --spring.profiles.active=scale
    
  2. 根本解决
    • 优化消费逻辑(批处理)
    • 增加分区数量
    • 升级硬件资源

5.2 事务消息回查失败

处理流程

存在
不存在
业务成功
业务失败
状态未知
Broker发起回查
事务日志是否存在
根据日志返回状态
检查业务状态
返回COMMIT
返回ROLLBACK
返回UNKNOWN
等待下次回查

5.3 消息丢失防护

  1. 生产者端
    // 同步发送+超时控制
    rocketMQTemplate.syncSend(topic, message, 3000);
    
  2. Broker端
    # 配置同步刷盘
    flushDiskType=SYNC_FLUSH
    
  3. 消费者端
    @RocketMQMessageListener(ackTimeout = "60000") // 60秒ACK超时
    

六、总结与资源

三种消息类型适用场景

消息类型典型场景TPS可靠性
普通消息实时通知、日志收集10万+★★★★
延迟消息订单超时、预约提醒5万+★★★★
事务消息分布式事务(订单+库存)1万+★★★★★

本文通过完整的代码实现和场景验证,深入解析了RocketMQ三大消息类型的核心原理和最佳实践。在实际项目中,建议根据业务特点选择适当的消息类型,并严格遵循消息设计规范和容错处理机制。

http://www.xdnf.cn/news/9985.html

相关文章:

  • C++ —— B/类与对象(中)
  • Python字典键的使用与应用:从基础到高级实践
  • OCC笔记:BRepMesh_IncrementalMesh的使用
  • python打卡day40@浙大疏锦行
  • 汽车高速通信的EMC挑战
  • Langchain4j Function Calling (5)
  • 关于ffplay在macos上运行奔溃的问题
  • 嵌入式开发学习日志(linux系统编程--进程(4)——线程锁)Day30
  • Google car key:安全、便捷的汽车解锁新选择
  • day40打卡
  • Netty 实战篇:为 Netty RPC 框架引入调用链追踪,实现链路透明化
  • 特伦斯 S75 电钢琴:奏响音乐新时代的华章
  • mongodb集群之分片集群
  • Ubuntu 22.04 系统下 Docker 安装与配置全指南
  • Android JNI开发
  • 大语言模型的技术原理与应用前景:从Transformer到ChatGPT
  • 技术原理简析:卫星遥感如何感知水体环境?
  • 基于Matlab实现卫星轨道模拟仿真
  • 云计算Linux Rocky day02(安装Linux系统、设备表示方式、Linux基本操作)
  • vue2 + webpack 老项目升级 node v22 + vite + vue2 实战全记录
  • 【OpenSearch】高性能 OpenSearch 数据导入
  • OpenTelemetry × Elastic Observability 系列(一):整体架构介绍
  • rm删除到回收站
  • 【设计模式】策略模式
  • 【软件】在 macOS 上安装 MySQL
  • Python学习(5) ----- Python的JSON处理
  • 分布式存储技术全景解析:从架构演进到场景实践
  • 私有云大数据部署:从开发到生产(Docker、K8s、HDFS/Flink on K8s)
  • docker部署ELK,ES开启安全认证
  • 基于RK3568/RK3588/全志H3/飞腾芯片/音视频通话程序/语音对讲/视频对讲/实时性好/极低延迟