当前位置: 首页 > ops >正文

RocketMQ 消息消费 单个消费和批量消费配置实现对比(Springboot),完整实现示例对比

在 Spring Boot 中使用 RocketMQ 实现消息消费时,单个消费批量消费是两种常见的消费模式。它们的核心区别在于:单个消费一次处理一条消息,而批量消费一次处理多条消息,以提升吞吐量和性能。

以下是 单个消费 vs 批量消费 的完整对比说明与 Spring Boot 实现示例。


✅ 一、核心对比

特性单个消费(Individual Consumption)批量消费(Batch Consumption)
每次处理消息数1 条多条(可配置,最大 1024 条)
消费监听器接口RocketMQListener<T>RocketMQListener<List<T>> 或自定义批量监听
性能较低,适合实时性要求高、处理逻辑重的场景较高,适合高吞吐、可聚合处理的场景
网络开销高(每条消息确认一次)低(一批消息统一确认)
配置参数无需特殊配置consumeMessageBatchMaxSize, batchConsumeMaxAwaitDurationInSeconds
失败处理可单独重试某条消息通常整批重试或丢弃(需幂等设计)
适用场景实时订单处理、敏感业务日志聚合、批量入库、报表统计

✅ 二、Spring Boot 实现示例对比

📌 1. 项目依赖(pom.xml)

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>

✅ 场景一:单个消息消费

1. 生产者(发送单条消息)
@RestController
public class MessageController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendSingle")public String sendSingle() {for (int i = 1; i <= 10; i++) {rocketMQTemplate.convertAndSend("single-topic", "单条消息 " + i);}return "单条消息发送完成";}
}
2. 消费者(逐条消费)
@Service
@RocketMQMessageListener(topic = "single-topic",consumerGroup = "single-consumer-group"
)
public class SingleMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("【单条消费】收到消息: " + message);}
}

✅ 特点:每条消息独立处理,简单直观。


✅ 场景二:批量消息消费

1. 生产者(发送批量消息)
@RestController
public class BatchMessageController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendBatch")public String sendBatch() {List<Message<String>> messageList = new ArrayList<>();for (int i = 1; i <= 100; i++) {messageList.add(MessageBuilder.withPayload("批量消息 " + i).build());}// 发送批量消息(总大小不能超过 4MB)SendResult result = rocketMQTemplate.syncSend("batch-topic", messageList);System.out.println("批量消息发送结果: " + result);return "批量消息已发送";}
}

⚠️ 注意:批量消息总大小建议不超过 1MB~4MB,避免网络超时。


2. 消费者(批量消费)
@Service
@RocketMQMessageListener(topic = "batch-topic",consumerGroup = "batch-consumer-group",consumeMode = ConsumeMode.CONCURRENTLY,// 每批最多消费 32 条(默认值),可自定义为 1~1024consumeMessageBatchMaxSize = 32
)
public class BatchMessageConsumer implements RocketMQListener<List<String>> {@Overridepublic void onMessage(List<String> messages) {System.out.println("【批量消费】本次共消费 " + messages.size() + " 条消息:");for (String msg : messages) {System.out.println("  -> " + msg);}}
}

✅ 特点:一次接收一个 List<String>,可批量入库、聚合处理,提升吞吐量。


✅ 3. 配置文件(application.yml)

rocketmq:name-server: 127.0.0.1:9876producer:group: batch-producer-group# 批量消费相关参数(部分版本需通过代码设置)consumer:# 每批最大消费消息数consume-message-batch-max-size: 32# 批量等待最大时长(秒),达到即触发消费(即使未满批)batch-consume-max-await-duration-in-seconds: 1

🔔 注意:consume-message-batch-max-sizebatch-consume-max-await-duration-in-seconds 参数在部分 RocketMQ Spring 版本中需通过代码设置,YAML 不生效。


✅ 三、关键配置说明

参数说明
consumeMessageBatchMaxSize缓存达到该数量时触发批量消费,默认 32,最大 1024
batchConsumeMaxAwaitDurationInSeconds最大等待时间,超时即消费,即使未满批,默认 0(不等待)
触发条件任一条件满足即触发:数量达标 或 时间超时

📌 示例:设置为 1281,表示:1 秒内收到 50 条也会触发消费


✅ 四、注意事项

  1. 幂等性要求更高
    批量消费中,若处理失败,整批可能重试,需保证消息处理幂等

  2. 消费确认(ACK)是整批的
    只有当 onMessage 成功返回,整批才算成功;抛异常则整批重试。

  3. 消息大小限制
    单个批次总大小不要超过 4MB(RocketMQ 限制)。

  4. 版本兼容性
    批量消费功能依赖 RocketMQ 客户端版本,建议使用 2.2.0+ 或商业版。

  5. 线程模型
    批量消费仍使用并发线程池处理,可通过 consumeThreadMin/Max 调整。


✅ 五、总结

选择建议推荐模式
实时性强、处理复杂单个消费
吞吐量高、可聚合处理(如日志、统计)批量消费
消息量大但延迟敏感度低批量消费 + 合理设置等待时间

✅ 推荐:在 数据同步、日志采集、批量入库 等场景优先使用批量消费,性能提升显著。


🔗 参考文档

  • Apache RocketMQ 官方文档
  • Spring Boot RocketMQ Starter
  • 阿里云 RocketMQ 批量消费参数说明

如需更高级功能(如顺序批量消费、事务消息),可进一步扩展。

http://www.xdnf.cn/news/18314.html

相关文章:

  • 链表-143.重排链表-力扣(LeetCode)
  • SQL视图、存储过程和触发器
  • npm全局安装后,cmd命令行可以访问,vscode访问报错
  • Django REST框架核心:GenericAPIView详解
  • GitHub Push 认证失败 fatal Authentication failed
  • OceanBase 分区裁剪(Partition Pruning)原理解读
  • Binlog Server守护MySQL数据0丢失
  • 基于Pytochvideo训练自己的的视频分类模型
  • python中view把矩阵维度降低的时候是什么一个排序顺序
  • 机器学习——数据清洗
  • 【论文阅读】Multi-metrics adaptively identifies backdoors in Federated Learning
  • Linux 文本处理与 Shell 编程笔记:正则表达式、sed、awk 与变量脚本
  • 本地文件上传到gitee仓库的详细步骤
  • Excel表格复制到word中格式错乱
  • Nginx 的完整配置文件结构、配置语法以及模块详解
  • 【学习笔记】大话设计模式——一些心得及各设计模式思想记录
  • Vue3全局配置Loading的完整指南:从基础到实战
  • PyTorch API 4
  • Mac 4步 安装 Jenv 管理多版本JDK
  • Linux Capability 解析
  • strncpy 函数使用及其模拟实现
  • 为什么我的UI界面会突然卡顿,失去响应
  • 安装使用Conda
  • pyqt 的自动滚动区QScrollArea
  • Rust 入门 包 (二十一)
  • Ubuntu 虚拟显示器自动控制服务设置(有无显示器的切换)
  • 华为数通认证学习
  • 微算法科技(NASDAQ: MLGO)引入高级区块链DSR算法:重塑区块链网络安全新范式
  • K8S-Configmap资源
  • C++中的 Eigen库使用