RocketMQ 三大消息类型深度解析:普通消息、延迟消息、事务消息
RocketMQ 三大消息类型深度解析:普通消息、延迟消息、事务消息
本文通过完整的代码实现和场景验证,深入剖析RocketMQ三种核心消息类型的工作原理、使用场景及最佳实践。包含10个关键组件实现、3种消息模式对比和真实调试结果。
一、消息类型核心原理
1.1 普通消息
工作流程:
特点:
- 实时性高(毫秒级延迟)
- 支持同步/异步/单向发送
- 消费失败支持重试机制
1.2 延迟消息
实现原理:
延迟级别对应表:
级别 | 时间 | 级别 | 时间 |
---|---|---|---|
1 | 1s | 10 | 6m |
2 | 5s | 11 | 7m |
3 | 10s | 12 | 8m |
4 | 30s | 13 | 9m |
5 | 1m | 14 | 10m |
6 | 2m | 15 | 20m |
7 | 3m | 16 | 30m |
8 | 4m | 17 | 1h |
9 | 5m | 18 | 2h |
1.3 事务消息
两阶段提交流程:
二、Spring Boot完整实现
2.1 项目结构
src/main/java
└── com.coder├── framework // 基础框架└── module└── mq└── rocketmq├── controller // API接口├── message // 消息实体├── producer // 生产者├── consumer // 消费者└── listener // 事务监听器
2.2 核心组件实现
消息实体定义
/*** 短信发送消息** @author L.ty*/
@Data
public class RocketMQSmsSendMessage {public static final String TOPIC = "STO_SMS_TOPIC"; // 重点:需要增加消息对应的 Topicpublic static final String TOPIC_1 = "STO_SMS_TOPIC_1"; // 重点:需要增加消息对应的 Topic/*** 短信日志编号*/@NotNull(message = "短信日志编号不能为空")private Long logId;/*** 手机号*/@NotNull(message = "手机号不能为空")private String mobile;/*** 短信渠道编号*/@NotNull(message = "短信渠道编号不能为空")private Long channelId;/*** 短信 API 的模板编号*/@NotNull(message = "短信 API 的模板编号不能为空")private String apiTemplateId;/*** 短信模板参数*/private List<KeyValue<String, Object>> templateParams;
}
普通消息生产者(含延迟消息)
@Slf4j
@Component
@RequiredArgsConstructor
public class RocketMQSmsProducer {private final RocketMQTemplate rocketMQTemplate;public void sendMessage(RocketMQSmsSendMessage message) {// 立即发送(普通消息)rocketMQTemplate.syncSend(RocketMQSmsSendMessage.TOPIC, message);// 延迟发送(10秒后消费)rocketMQTemplate.syncSendDelayTimeSeconds(RocketMQSmsSendMessage.TOPIC, message, 10);log.info("普通消息发送成功:{}", message);}
}
事务消息生产者
@Slf4j
@Component
@RequiredArgsConstructor
public class RocketMQSmsTransactionProducer {private final RocketMQTemplate rocketMQTemplate;public void sendTransactionMessage(RocketMQSmsSendMessage message) {TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(RocketMQSmsSendMessage.TOPIC,MessageBuilder.withPayload(message).setHeader("logId", message.getLogId()) // 事务ID.build(), null // 自定义参数);log.info("事务消息发送状态:{} | 事务状态:{}", result.getSendStatus(),result.getLocalTransactionState());}
}
事务策略工厂(关键设计)
@Component
public class RocketMQTransactionStrategyFactory {private static final Map<String, RocketMQTransactionStrategy> strategies = new ConcurrentHashMap<>();@Autowiredpublic RocketMQTransactionStrategyFactory(List<RocketMQTransactionStrategy> strategyList) {strategyList.forEach(strategy -> {if (strategies.containsKey(strategy.topic())) {throw new IllegalStateException("重复的Topic策略: " + strategy.topic());}strategies.put(strategy.topic(), strategy);});}public RocketMQTransactionStrategy getStrategy(String topic) {return Optional.ofNullable(strategies.get(topic)).orElseThrow(() -> new UnsupportedOperationException("未找到Topic策略: " + topic));}
}
事务消息监听器
@Slf4j
@RocketMQTransactionListener
public class RocketMQTransactionMsgListener implements RocketMQLocalTransactionListener {private final RocketMQTransactionStrategyFactory factory;@Override@Transactional(rollbackFor = Exception.class)public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String topic = msg.getHeaders().get(RocketMQHeaders.TOPIC, String.class);try {return factory.getStrategy(topic).executeLocalTransaction(msg, arg);} catch (Exception e) {log.error("本地事务执行失败", e);return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String topic = msg.getHeaders().get(RocketMQHeaders.TOPIC, String.class);return factory.getStrategy(topic).checkLocalTransaction(msg);}
}
广播模式消费者(3个实例)
@Component
@RocketMQMessageListener(topic = RocketMQSmsSendMessage.TOPIC,consumerGroup = "STO_SMS_TOPIC_CONSUMER_BROADCAST_1",messageModel = MessageModel.BROADCASTING
)
@Slf4j
public class SmsBroadcastConsumer1 implements RocketMQListener<RocketMQSmsSendMessage> {public void onMessage(RocketMQSmsSendMessage message) {log.info("[广播消费者1] 收到消息: {}", message);}
}// 同理实现 SmsBroadcastConsumer2 和 SmsBroadcastConsumer3
集群模式消费者(3个实例)
@Component
@RocketMQMessageListener(topic = RocketMQSmsSendMessage.TOPIC_1,consumerGroup = "STO_SMS_TOPIC_CONSUMER_CLUSTER",messageModel = MessageModel.CLUSTERING
)
@Slf4j
public class SmsClusterConsumer1 implements RocketMQListener<RocketMQSmsSendMessage> {public void onMessage(RocketMQSmsSendMessage message) {log.info("[集群消费者1] 收到消息: {}", message);}
}// 同理实现 SmsClusterConsumer2 和 SmsClusterConsumer3
2.3 REST API接口
@RestController
@RequestMapping("/mq/rocketMQ")
@AllArgsConstructor
public class RocketMQController {private final RocketMQSmsProducer producer;private final RocketMQSmsTransactionProducer transactionProducer;@PostMapping("/sendMessage")public CommonResult<?> sendMessage(@RequestBody RocketMQSmsSendMessage message) {producer.sendMessage(message);return CommonResult.success("普通消息发送成功");}@PostMapping("/sendTransactionMessage")public CommonResult<?> sendTransactionMessage(@RequestBody RocketMQSmsSendMessage message) {transactionProducer.sendTransactionMessage(message);return CommonResult.success("事务消息发送成功");}
}
三、调试结果与场景验证
3.1 普通消息测试
请求:
POST /mq/rocketMQ/sendMessage
{"mobile": "13800138000","channelId": 1001,"apiTemplateId": "TPL_ORDER_PAY","templateParams": [{"key": "orderNo", "value": "202405300001"}]
}
日志输出:
[普通消息生产者] 普通消息发送成功:RocketMQSmsSendMessage(...)
[广播消费者1] 收到消息: RocketMQSmsSendMessage(...) # 立即消费
[广播消费者2] 收到消息: RocketMQSmsSendMessage(...)
[广播消费者3] 收到消息: RocketMQSmsSendMessage(...)[10秒后]
[广播消费者1] 收到消息: RocketMQSmsSendMessage(...) # 延迟消息
[广播消费者2] 收到消息: RocketMQSmsSendMessage(...)
[广播消费者3] 收到消息: RocketMQSmsSendMessage(...)
3.2 事务消息测试
请求:
POST /mq/rocketMQ/sendTransactionMessage
{"mobile": "13900139000","channelId": 1002,"apiTemplateId": "TPL_ACCOUNT_UPDATE","templateParams": [{"key": "amount", "value": "100.00"}]
}
日志输出:
[事务生产者] 事务消息发送状态:SEND_OK | 事务状态:COMMIT_MESSAGE
[事务监听器] 执行本地事务,Topic: STO_SMS_TOPIC, MsgId: MSGID_001
[策略实现] 【本地事务执行】数据体: {mobile:13900139000,...}
[集群消费者1] 收到消息: RocketMQSmsSendMessage(...) # 只有1个消费者处理
3.3 消费模式对比测试
测试场景 | 广播模式 | 集群模式 |
---|---|---|
同组消费者数量 | 3 | 3 |
发送消息数量 | 1 | 1 |
总消费次数 | 3 | 1 |
消费者负载均衡 | × | √ |
适用场景 | 配置同步 | 订单处理 |
四、生产环境最佳实践
4.1 消息设计规范
- 消息大小:单条消息不超过4MB(推荐<100KB)
- 事务ID:使用全局唯一ID(如Snowflake)
- 消息压缩:对大于1KB的消息启用压缩
rocketMQTemplate.setCompressMsgBodyOverHowmuch(1024);
4.2 事务消息注意事项
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 1. 记录事务日志到数据库logService.saveTransactionLog(msg);// 2. 执行核心业务businessService.process(msg);// 3. 返回COMMIT前确保事务日志持久化return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {// 4. 失败时标记事务日志状态logService.markFailed(msg);return RocketMQLocalTransactionState.ROLLBACK;}
}
4.3 消费者幂等性保障
public void onMessage(RocketMQSmsSendMessage message) {// 1. 检查消息是否已处理if (cacheService.isProcessed(message.getLogId())) {return; // 幂等跳过}try {// 2. 业务处理smsService.send(message);// 3. 记录处理状态cacheService.markProcessed(message.getLogId());} catch (Exception e) {// 4. 重试机制(3次重试)if (retryCount < 3) {throw new RuntimeException("触发重试");}alarmService.notify("短信发送失败", message);}
}
五、常见问题解决方案
5.1 消息堆积处理
- 临时方案:
# 动态增加消费者数量 java -jar consumer.jar --spring.profiles.active=scale
- 根本解决:
- 优化消费逻辑(批处理)
- 增加分区数量
- 升级硬件资源
5.2 事务消息回查失败
处理流程:
5.3 消息丢失防护
- 生产者端:
// 同步发送+超时控制 rocketMQTemplate.syncSend(topic, message, 3000);
- Broker端:
# 配置同步刷盘 flushDiskType=SYNC_FLUSH
- 消费者端:
@RocketMQMessageListener(ackTimeout = "60000") // 60秒ACK超时
六、总结与资源
三种消息类型适用场景:
消息类型 | 典型场景 | TPS | 可靠性 |
---|---|---|---|
普通消息 | 实时通知、日志收集 | 10万+ | ★★★★ |
延迟消息 | 订单超时、预约提醒 | 5万+ | ★★★★ |
事务消息 | 分布式事务(订单+库存) | 1万+ | ★★★★★ |
本文通过完整的代码实现和场景验证,深入解析了RocketMQ三大消息类型的核心原理和最佳实践。在实际项目中,建议根据业务特点选择适当的消息类型,并严格遵循消息设计规范和容错处理机制。