当前位置: 首页 > java >正文

202533 | SpringBoot集成RocketMQ

SpringBoot集成RocketMQ极简入门

一、基础配置(3步完成)
  1. 添加依赖
<!-- pom.xml -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>
  1. 配置文件
# application.yml
rocketmq:name-server: 127.0.0.1:9876  # NameServer地址producer:group: my-producer-group    # 生产者组名
  1. 启动类添加注解
@SpringBootApplication
public class MyApp {public static void main(String[] args) {SpringApplication.run(MyApp.class, args);}
}
二、发送消息(2种方式)
  1. 同步发送
@RestController
public class MyController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public String send() {rocketMQTemplate.convertAndSend("test-topic", "Hello World!");return "消息已发送";}
}
  1. 带Tag发送
// 发送到test-topic下的tagA
rocketMQTemplate.convertAndSend("test-topic:tagA", "带标签的消息");
三、接收消息
@Service
@RocketMQMessageListener(topic = "test-topic",consumerGroup = "my-consumer-group"
)
public class MyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到消息: " + message);}
}
四、测试流程
  1. 启动SpringBoot应用
  2. 访问 http://localhost:8080/send
  3. 控制台将打印:
收到消息: Hello World!

RocketMQ 发送对象和集合消息

一、发送对象消息
  1. 基础发送方式
// 1. 定义可序列化的对象
@Data // 需要实现Serializable
public class User implements Serializable {private Integer id;private String name;
}// 2. 发送对象
@RestController
public class ObjectController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send-user")public String sendUser() {User user = new User(1, "张三");rocketMQTemplate.convertAndSend("user-topic", user);return "用户对象已发送";}
}
  1. 接收对象消息
@Service
@RocketMQMessageListener(topic = "user-topic",consumerGroup = "user-consumer-group"
)
public class UserConsumer implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("收到用户: " + user.getName());}
}
二、发送集合消息
  1. 发送List集合
@GetMapping("/send-list")
public String sendList() {List<String> names = Arrays.asList("张三", "李四", "王五");rocketMQTemplate.convertAndSend("list-topic", names);return "列表已发送";
}
  1. 接收List消息
@Service
@RocketMQMessageListener(topic = "list-topic",consumerGroup = "list-consumer-group"
)
public class ListConsumer implements RocketMQListener<List<String>> {@Overridepublic void onMessage(List<String> names) {names.forEach(name -> System.out.println("收到名字: " + name));}
}
  1. 发送Map集合
@GetMapping("/send-map")
public String sendMap() {Map<String, Integer> scores = new HashMap<>();scores.put("数学", 90);scores.put("英语", 85);rocketMQTemplate.convertAndSend("map-topic", scores);return "Map已发送";
}
三、注意事项
  1. 序列化要求

    • 所有对象必须实现Serializable接口
    • 推荐使用@Data Lombok注解简化代码
  2. 集合类型限制

    // 这些集合类型都可以直接发送
    List/Set/Map/Queue...
    
  3. 消息大小限制

    • 默认最大4MB(可通过配置修改)
    rocketmq.producer.max-message-size=5242880 # 修改为5MB
    
四、完整示例
  1. 发送复杂对象集合
@Data
public class Order implements Serializable {private Long orderId;private List<Product> products;
}@GetMapping("/send-orders")
public String sendOrders() {List<Order> orders = Arrays.asList(new Order(1001L, Arrays.asList(new Product("手机"), new Product("耳机"))),new Order(1002L, Arrays.asList(new Product("电脑"))));rocketMQTemplate.convertAndSend("order-topic", orders);return "订单集合已发送";
}
  1. 接收复杂集合
@Service
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<List<Order>> {@Overridepublic void onMessage(List<Order> orders) {orders.forEach(order -> {System.out.println("订单ID: " + order.getOrderId());order.getProducts().forEach(p -> System.out.println("  商品: " + p.getName()));});}
}
五、常见问题解决
  1. 序列化错误

    org.apache.rocketmq.client.exception.MQClientException: 
    The message body cannot be null
    

    解决方法:检查对象是否实现了Serializable

  2. 集合泛型问题

    // 如果遇到泛型擦除问题,可以指定TypeReference
    rocketMQTemplate.convertAndSend("generic-topic", list, new ParameterizedTypeReference<List<User>>(){});
    

这样就完成了对象和集合消息的发送与接收!

SpringBoot集成RocketMQ发送不同消息模式

一、同步消息(可靠但性能较低)
@RestController
public class SyncController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sync")public String sendSync() {// 同步发送会阻塞直到收到Broker确认SendResult result = rocketMQTemplate.syncSend("sync-topic", "同步消息");return "发送成功,MsgId:" + result.getMsgId();}
}

特点:适用于重要通知、支付结果等需要确认的场景

二、异步消息(高性能)
@GetMapping("/async")
public String sendAsync() {rocketMQTemplate.asyncSend("async-topic", "异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功:" + sendResult);}@Overridepublic void onException(Throwable e) {System.err.println("发送失败:" + e.getMessage());}});return "异步请求已提交";
}

特点:适合日志收集等允许少量丢失的场景

三、单向消息(只发送不等待)
@GetMapping("/oneway")
public String sendOneWay() {rocketMQTemplate.sendOneWay("oneway-topic", "单向消息");return "单向消息已发出";
}

特点:吞吐量最高,但不保证可靠性

四、延迟消息
@GetMapping("/delay")
public String sendDelay() {// 延迟级别:1=1s, 2=5s, 3=10s,..., 18=2hrocketMQTemplate.syncSend("delay-topic", MessageBuilder.withPayload("延迟消息").build(),3000,  // 发送超时3      // 延迟级别3(10秒后投递));return "延迟消息已发送";
}

延迟级别对应时间

1=1s, 2=5s, 3=10s, 4=30s, 5=1m, 6=2m...18=2h
五、顺序消息
@GetMapping("/order")
public String sendOrder() {// 相同orderId的消息会被分配到同一个队列for (int i = 1; i <= 5; i++) {rocketMQTemplate.syncSendOrderly("order-topic", "顺序消息" + i,"ORDER_001"  // 保证相同订单号的消息顺序);}return "顺序消息已发送";
}// 消费者需实现顺序消费
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-group",consumeMode = ConsumeMode.ORDERLY  // 关键配置
)
public class OrderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("顺序消费: " + message);}
}
六、事务消息
@Service
public class TransactionService implements RocketMQLocalTransactionListener {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/transaction")public String sendTransaction() {rocketMQTemplate.sendMessageInTransaction("tx-group", "tx-topic",MessageBuilder.withPayload("事务消息").build(),null);return "事务消息已提交";}@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 执行本地事务System.out.println("执行本地事务...");return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 事务状态检查(补偿机制)return RocketMQLocalTransactionState.COMMIT;}
}
七、消费者配置模板
@RocketMQMessageListener(topic = "${rocketmq.topic}",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "tagA || tagB",  // 消息过滤consumeMode = ConsumeMode.CONCURRENTLY,  // 并发模式messageModel = MessageModel.CLUSTERING   // 集群模式
)
public class MyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息}
}
八、配置参数说明
rocketmq:name-server: 127.0.0.1:9876producer:group: my-groupsend-message-timeout: 3000    # 发送超时(ms)retry-times-when-send-failed: 2  # 重试次数max-message-size: 4194304     # 4MB消息限制consumer:my-consumer:topic: my-topicselector-type: tagselector-expression: "*"consume-thread-max: 20      # 最大消费线程
九、消息轨迹追踪
@Configuration
public class RocketMQConfig {@Beanpublic RocketMQTemplate rocketMQTemplate(RocketMQClientConfig config) {config.setEnableMsgTrace(true);  // 开启消息轨迹config.setCustomizedTraceTopic("my-trace-topic");return new RocketMQTemplate();}
}
十、不同模式对比
消息模式可靠性吞吐量延迟典型场景
同步消息中等支付结果通知
异步消息日志记录
单向消息最高最低心跳检测
延迟消息可调订单超时取消
顺序消息中等订单状态变更
事务消息最高分布式事务

通过以上代码示例,可以快速实现RocketMQ的各种消息模式集成。根据业务场景选择合适的方式,平衡可靠性与性能需求。

SpringBoot集成RocketMQ消息过滤指南

一、Tag过滤(最常用方式)
  1. 生产者发送带Tag消息
@RestController
public class TagController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 发送不同Tag的消息@GetMapping("/send-tags")public String sendTags() {rocketMQTemplate.syncSend("filter-topic:tagA", "TagA消息");rocketMQTemplate.syncSend("filter-topic:tagB", "TagB消息");return "已发送不同Tag的消息";}
}
  1. 消费者按Tag过滤
@Service
@RocketMQMessageListener(topic = "filter-topic",selectorExpression = "tagA",  // 只接收tagA的消息consumerGroup = "tag-filter-group"
)
public class TagFilterConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到TagA消息: " + message);}
}
二、SQL表达式过滤(更灵活)
  1. 生产者设置消息属性
@GetMapping("/send-sql")
public String sendSqlFilter() {// 设置消息属性Message<User> message = MessageBuilder.withPayload(new User(1, "张三")).setHeader("age", 25).setHeader("region", "east").build();rocketMQTemplate.syncSend("sql-topic", message);return "已发送带属性的消息";
}
  1. 消费者配置SQL过滤
@Service
@RocketMQMessageListener(topic = "sql-topic",selectorType = SelectorType.SQL92,selectorExpression = "age > 20 AND region = 'east'",consumerGroup = "sql-filter-group"
)
public class SqlFilterConsumer implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("收到符合条件的用户: " + user.getName());}
}
三、多Tag组合过滤
// 消费者可以订阅多个Tag(用 || 分隔)
@RocketMQMessageListener(topic = "multi-tag-topic",selectorExpression = "tagA || tagB",consumerGroup = "multi-tag-group"
)
public class MultiTagConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到TagA或TagB消息: " + message);}
}
四、过滤注意事项
  1. Broker配置开启
# broker.conf 需要开启过滤支持
enablePropertyFilter=true
  1. 性能影响
  • Tag过滤:几乎无性能损耗
  • SQL过滤:会有额外计算开销(约5-10%性能下降)
  1. 常见SQL语法
age > 18 AND name LIKE '%张%'
IS NOT NULL
BETWEEN 10 AND 20
IN ('vip', 'svip')
五、完整配置示例
  1. application.yml配置
rocketmq:name-server: 127.0.0.1:9876consumer:tag-filter-group:topic: filter-topicselector-type: tagselector-expression: "tagA"sql-filter-group:topic: sql-topicselector-type: sql92selector-expression: "age > 20"
  1. 消息头过滤增强版
// 发送时设置更多属性
Map<String, Object> headers = new HashMap<>();
headers.put("price", 99.9);
headers.put("category", "electronics");Message<Product> msg = MessageBuilder.withPayload(product).setHeaders(headers).build();
rocketMQTemplate.syncSend("product-topic", msg);// 消费者SQL过滤
@RocketMQMessageListener(topic = "product-topic",selectorType = SelectorType.SQL92,selectorExpression = "price > 50 AND category = 'electronics'"
)
六、过滤模式对比
过滤方式配置复杂度灵活性性能影响适用场景
Tag过滤简单简单分类
SQL过滤中等轻微复杂条件
多Tag简单多分类
七、常见问题解决
  1. 过滤不生效检查
  • 确认Broker配置enablePropertyFilter=true
  • 检查Tag/属性名称是否拼写正确
  • 重启Broker使配置生效
  1. SQL语法错误
// 错误示例(字符串需用单引号)
selectorExpression = "region = east"  // 错误
selectorExpression = "region = 'east'" // 正确
  1. 性能优化建议
# 对高频消息增加Tag前缀过滤
selectorExpression = "tagA AND price > 100"

SpringBoot集成RocketMQ消息消费的两种核心模式

一、并发消费模式(默认)
  1. 基本实现方式
@Service
@RocketMQMessageListener(topic = "concurrent-topic",consumerGroup = "concurrent-group",consumeMode = ConsumeMode.CONCURRENTLY  // 显式声明并发模式
)
public class ConcurrentConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 多线程并行处理System.out.println("线程" + Thread.currentThread().getId() + "处理消息: " + message);}
}
  1. 关键特性
  • 多线程并行消费
  • 不保证消息顺序
  • 吞吐量高
  • 自动负载均衡
  1. 线程池配置
rocketmq:consumer:concurrent-group:consume-thread-min: 5   # 最小线程数consume-thread-max: 20  # 最大线程数
二、顺序消费模式
  1. 基本实现方式
@Service
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-group",consumeMode = ConsumeMode.ORDERLY  // 顺序消费模式
)
public class OrderlyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 单线程顺序处理(相同队列)System.out.println("顺序处理消息: " + message);}
}
  1. 关键特性
  • 单队列单线程消费
  • 保证同一队列消息顺序
  • 吞吐量较低
  • 需要业务保证分片逻辑
  1. 顺序消息发送
// 发送时指定相同的shardingKey(如订单ID)
rocketMQTemplate.syncSendOrderly("order-topic", "顺序消息内容","ORDER_123"  // 相同订单号的消息会进入同一队列
);
三、两种模式对比
维度并发模式顺序模式
消费方式多线程并行单队列单线程
消息顺序不保证保证同一队列顺序
吞吐量高(万级TPS)较低(千级TPS)
适用场景日志收集、统计计算等订单状态变更、库存扣减等
线程池配置可调整线程数固定为队列数的线程
异常处理自动重试需防止单条消息阻塞后续消息
四、生产注意事项
  1. 并发模式优化
// 批量消费配置
@RocketMQMessageListener(consumerGroup = "batch-group",topic = "batch-topic",consumeMessageBatchMaxSize = 32  // 每次拉取32条
)
public class BatchConsumer implements RocketMQListener<List<MessageExt>> {@Overridepublic void onMessage(List<MessageExt> messages) {messages.forEach(msg -> {System.out.println("批量处理: " + new String(msg.getBody()));});}
}
  1. 顺序模式防阻塞
@Override
public void onMessage(String message) {try {// 必须捕获所有异常,避免阻塞队列processOrder(message);} catch (Exception e) {log.error("处理失败,消息将重试", e);throw e; // 抛出异常触发重试}
}
  1. 混合使用案例
// 订单核心流程用顺序消费
@RocketMQMessageListener(topic = "order-core",consumeMode = ConsumeMode.ORDERLY
)
public class OrderCoreConsumer implements RocketMQListener<OrderEvent> { /*...*/ }// 订单日志用并发消费
@RocketMQMessageListener(topic = "order-log",consumeMode = ConsumeMode.CONCURRENTLY
)
public class OrderLogConsumer implements RocketMQListener<OrderLog> { /*...*/ }
五、模式选择决策树
需要保证消息顺序?
顺序模式
高吞吐需求?
并发模式+批量消费
并发模式
http://www.xdnf.cn/news/3953.html

相关文章:

  • 2025 年 408 真题及答案
  • 深入探索Anthropic Claude与Spring AI的融合应用
  • 虚幻引擎5-Unreal Engine笔记之显卡环境设置使开发流畅
  • 【LaTeX+VSCode本地Win11编译教程】
  • Debezium TableSchemaBuilder详解
  • (一)Modular Monolith Architecture(项目结构/.net项目初始化/垂直切片架构)
  • 洛谷 P1440 求m区间内的最小值
  • 8.5/Q1,Charls高分经典文章解读
  • 【Web3】上市公司利用RWA模式融资和促进业务发展案例
  • Spring Boot多模块划分设计
  • C++访问MySQL
  • 《Python星球日记》第31天:Django 框架入门
  • opencv+opencv_contrib+cuda和VS2022编译
  • 202531 | RocketMQ 消息过滤 + 消息重试机制 + 死信消息 + 重复消费问题
  • zotero pdf中英翻译插件使用
  • epub格式转txt格式工具,txt批量转PDF
  • 设计模式(结构型)-组合模式
  • 【Java ee初阶】多线程(6)
  • item_get_app_pro - 获得淘宝app商品详情原数据操作流程
  • 使用 vllm 部署 Llama3-8b-Instruct
  • 【C++】grpc(一):安装
  • 【Python】Python好玩的第三方库之二维码生成,操作xlsx文件,以及音频控制器
  • 从零开始学Flink:开启实时计算的魔法之旅
  • CSS知识总结
  • Socket 编程 TCP
  • OpenGl实战笔记(1)基于qt5.15.2+mingw64+opengl绘制三角形
  • 解决因字段过长使MYSQL数据解析超时导致线上CPU告警问题
  • 技术犯规计入个人犯规吗·棒球1号位
  • [C语言]第一章-初识
  • 【Linux】深入理解Linux基础IO:从文件描述符到缓冲区设计