202533 | SpringBoot集成RocketMQ
SpringBoot集成RocketMQ极简入门
一、基础配置(3步完成)
- 添加依赖
<!-- pom.xml -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>
- 配置文件
# application.yml
rocketmq:name-server: 127.0.0.1:9876 # NameServer地址producer:group: my-producer-group # 生产者组名
- 启动类添加注解
@SpringBootApplication
public class MyApp {public static void main(String[] args) {SpringApplication.run(MyApp.class, args);}
}
二、发送消息(2种方式)
- 同步发送
@RestController
public class MyController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public String send() {rocketMQTemplate.convertAndSend("test-topic", "Hello World!");return "消息已发送";}
}
- 带Tag发送
// 发送到test-topic下的tagA
rocketMQTemplate.convertAndSend("test-topic:tagA", "带标签的消息");
三、接收消息
@Service
@RocketMQMessageListener(topic = "test-topic",consumerGroup = "my-consumer-group"
)
public class MyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到消息: " + message);}
}
四、测试流程
- 启动SpringBoot应用
- 访问
http://localhost:8080/send
- 控制台将打印:
收到消息: Hello World!
RocketMQ 发送对象和集合消息
一、发送对象消息
- 基础发送方式
// 1. 定义可序列化的对象
@Data // 需要实现Serializable
public class User implements Serializable {private Integer id;private String name;
}// 2. 发送对象
@RestController
public class ObjectController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send-user")public String sendUser() {User user = new User(1, "张三");rocketMQTemplate.convertAndSend("user-topic", user);return "用户对象已发送";}
}
- 接收对象消息
@Service
@RocketMQMessageListener(topic = "user-topic",consumerGroup = "user-consumer-group"
)
public class UserConsumer implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("收到用户: " + user.getName());}
}
二、发送集合消息
- 发送List集合
@GetMapping("/send-list")
public String sendList() {List<String> names = Arrays.asList("张三", "李四", "王五");rocketMQTemplate.convertAndSend("list-topic", names);return "列表已发送";
}
- 接收List消息
@Service
@RocketMQMessageListener(topic = "list-topic",consumerGroup = "list-consumer-group"
)
public class ListConsumer implements RocketMQListener<List<String>> {@Overridepublic void onMessage(List<String> names) {names.forEach(name -> System.out.println("收到名字: " + name));}
}
- 发送Map集合
@GetMapping("/send-map")
public String sendMap() {Map<String, Integer> scores = new HashMap<>();scores.put("数学", 90);scores.put("英语", 85);rocketMQTemplate.convertAndSend("map-topic", scores);return "Map已发送";
}
三、注意事项
-
序列化要求:
- 所有对象必须实现
Serializable
接口 - 推荐使用
@Data
Lombok注解简化代码
- 所有对象必须实现
-
集合类型限制:
// 这些集合类型都可以直接发送 List/Set/Map/Queue...
-
消息大小限制:
- 默认最大4MB(可通过配置修改)
rocketmq.producer.max-message-size=5242880 # 修改为5MB
四、完整示例
- 发送复杂对象集合
@Data
public class Order implements Serializable {private Long orderId;private List<Product> products;
}@GetMapping("/send-orders")
public String sendOrders() {List<Order> orders = Arrays.asList(new Order(1001L, Arrays.asList(new Product("手机"), new Product("耳机"))),new Order(1002L, Arrays.asList(new Product("电脑"))));rocketMQTemplate.convertAndSend("order-topic", orders);return "订单集合已发送";
}
- 接收复杂集合
@Service
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<List<Order>> {@Overridepublic void onMessage(List<Order> orders) {orders.forEach(order -> {System.out.println("订单ID: " + order.getOrderId());order.getProducts().forEach(p -> System.out.println(" 商品: " + p.getName()));});}
}
五、常见问题解决
-
序列化错误:
org.apache.rocketmq.client.exception.MQClientException: The message body cannot be null
解决方法:检查对象是否实现了Serializable
-
集合泛型问题:
// 如果遇到泛型擦除问题,可以指定TypeReference rocketMQTemplate.convertAndSend("generic-topic", list, new ParameterizedTypeReference<List<User>>(){});
这样就完成了对象和集合消息的发送与接收!
SpringBoot集成RocketMQ发送不同消息模式
一、同步消息(可靠但性能较低)
@RestController
public class SyncController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sync")public String sendSync() {// 同步发送会阻塞直到收到Broker确认SendResult result = rocketMQTemplate.syncSend("sync-topic", "同步消息");return "发送成功,MsgId:" + result.getMsgId();}
}
特点:适用于重要通知、支付结果等需要确认的场景
二、异步消息(高性能)
@GetMapping("/async")
public String sendAsync() {rocketMQTemplate.asyncSend("async-topic", "异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功:" + sendResult);}@Overridepublic void onException(Throwable e) {System.err.println("发送失败:" + e.getMessage());}});return "异步请求已提交";
}
特点:适合日志收集等允许少量丢失的场景
三、单向消息(只发送不等待)
@GetMapping("/oneway")
public String sendOneWay() {rocketMQTemplate.sendOneWay("oneway-topic", "单向消息");return "单向消息已发出";
}
特点:吞吐量最高,但不保证可靠性
四、延迟消息
@GetMapping("/delay")
public String sendDelay() {// 延迟级别:1=1s, 2=5s, 3=10s,..., 18=2hrocketMQTemplate.syncSend("delay-topic", MessageBuilder.withPayload("延迟消息").build(),3000, // 发送超时3 // 延迟级别3(10秒后投递));return "延迟消息已发送";
}
延迟级别对应时间:
1=1s, 2=5s, 3=10s, 4=30s, 5=1m, 6=2m...18=2h
五、顺序消息
@GetMapping("/order")
public String sendOrder() {// 相同orderId的消息会被分配到同一个队列for (int i = 1; i <= 5; i++) {rocketMQTemplate.syncSendOrderly("order-topic", "顺序消息" + i,"ORDER_001" // 保证相同订单号的消息顺序);}return "顺序消息已发送";
}// 消费者需实现顺序消费
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-group",consumeMode = ConsumeMode.ORDERLY // 关键配置
)
public class OrderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("顺序消费: " + message);}
}
六、事务消息
@Service
public class TransactionService implements RocketMQLocalTransactionListener {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/transaction")public String sendTransaction() {rocketMQTemplate.sendMessageInTransaction("tx-group", "tx-topic",MessageBuilder.withPayload("事务消息").build(),null);return "事务消息已提交";}@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 执行本地事务System.out.println("执行本地事务...");return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 事务状态检查(补偿机制)return RocketMQLocalTransactionState.COMMIT;}
}
七、消费者配置模板
@RocketMQMessageListener(topic = "${rocketmq.topic}",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "tagA || tagB", // 消息过滤consumeMode = ConsumeMode.CONCURRENTLY, // 并发模式messageModel = MessageModel.CLUSTERING // 集群模式
)
public class MyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息}
}
八、配置参数说明
rocketmq:name-server: 127.0.0.1:9876producer:group: my-groupsend-message-timeout: 3000 # 发送超时(ms)retry-times-when-send-failed: 2 # 重试次数max-message-size: 4194304 # 4MB消息限制consumer:my-consumer:topic: my-topicselector-type: tagselector-expression: "*"consume-thread-max: 20 # 最大消费线程
九、消息轨迹追踪
@Configuration
public class RocketMQConfig {@Beanpublic RocketMQTemplate rocketMQTemplate(RocketMQClientConfig config) {config.setEnableMsgTrace(true); // 开启消息轨迹config.setCustomizedTraceTopic("my-trace-topic");return new RocketMQTemplate();}
}
十、不同模式对比
消息模式 | 可靠性 | 吞吐量 | 延迟 | 典型场景 |
---|---|---|---|---|
同步消息 | 高 | 低 | 中等 | 支付结果通知 |
异步消息 | 中 | 高 | 低 | 日志记录 |
单向消息 | 低 | 最高 | 最低 | 心跳检测 |
延迟消息 | 高 | 中 | 可调 | 订单超时取消 |
顺序消息 | 高 | 中 | 中等 | 订单状态变更 |
事务消息 | 最高 | 低 | 高 | 分布式事务 |
通过以上代码示例,可以快速实现RocketMQ的各种消息模式集成。根据业务场景选择合适的方式,平衡可靠性与性能需求。
SpringBoot集成RocketMQ消息过滤指南
一、Tag过滤(最常用方式)
- 生产者发送带Tag消息
@RestController
public class TagController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 发送不同Tag的消息@GetMapping("/send-tags")public String sendTags() {rocketMQTemplate.syncSend("filter-topic:tagA", "TagA消息");rocketMQTemplate.syncSend("filter-topic:tagB", "TagB消息");return "已发送不同Tag的消息";}
}
- 消费者按Tag过滤
@Service
@RocketMQMessageListener(topic = "filter-topic",selectorExpression = "tagA", // 只接收tagA的消息consumerGroup = "tag-filter-group"
)
public class TagFilterConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到TagA消息: " + message);}
}
二、SQL表达式过滤(更灵活)
- 生产者设置消息属性
@GetMapping("/send-sql")
public String sendSqlFilter() {// 设置消息属性Message<User> message = MessageBuilder.withPayload(new User(1, "张三")).setHeader("age", 25).setHeader("region", "east").build();rocketMQTemplate.syncSend("sql-topic", message);return "已发送带属性的消息";
}
- 消费者配置SQL过滤
@Service
@RocketMQMessageListener(topic = "sql-topic",selectorType = SelectorType.SQL92,selectorExpression = "age > 20 AND region = 'east'",consumerGroup = "sql-filter-group"
)
public class SqlFilterConsumer implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("收到符合条件的用户: " + user.getName());}
}
三、多Tag组合过滤
// 消费者可以订阅多个Tag(用 || 分隔)
@RocketMQMessageListener(topic = "multi-tag-topic",selectorExpression = "tagA || tagB",consumerGroup = "multi-tag-group"
)
public class MultiTagConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到TagA或TagB消息: " + message);}
}
四、过滤注意事项
- Broker配置开启
# broker.conf 需要开启过滤支持
enablePropertyFilter=true
- 性能影响
- Tag过滤:几乎无性能损耗
- SQL过滤:会有额外计算开销(约5-10%性能下降)
- 常见SQL语法
age > 18 AND name LIKE '%张%'
IS NOT NULL
BETWEEN 10 AND 20
IN ('vip', 'svip')
五、完整配置示例
- application.yml配置
rocketmq:name-server: 127.0.0.1:9876consumer:tag-filter-group:topic: filter-topicselector-type: tagselector-expression: "tagA"sql-filter-group:topic: sql-topicselector-type: sql92selector-expression: "age > 20"
- 消息头过滤增强版
// 发送时设置更多属性
Map<String, Object> headers = new HashMap<>();
headers.put("price", 99.9);
headers.put("category", "electronics");Message<Product> msg = MessageBuilder.withPayload(product).setHeaders(headers).build();
rocketMQTemplate.syncSend("product-topic", msg);// 消费者SQL过滤
@RocketMQMessageListener(topic = "product-topic",selectorType = SelectorType.SQL92,selectorExpression = "price > 50 AND category = 'electronics'"
)
六、过滤模式对比
过滤方式 | 配置复杂度 | 灵活性 | 性能影响 | 适用场景 |
---|---|---|---|---|
Tag过滤 | 简单 | 低 | 无 | 简单分类 |
SQL过滤 | 中等 | 高 | 轻微 | 复杂条件 |
多Tag | 简单 | 中 | 无 | 多分类 |
七、常见问题解决
- 过滤不生效检查
- 确认Broker配置
enablePropertyFilter=true
- 检查Tag/属性名称是否拼写正确
- 重启Broker使配置生效
- SQL语法错误
// 错误示例(字符串需用单引号)
selectorExpression = "region = east" // 错误
selectorExpression = "region = 'east'" // 正确
- 性能优化建议
# 对高频消息增加Tag前缀过滤
selectorExpression = "tagA AND price > 100"
SpringBoot集成RocketMQ消息消费的两种核心模式
一、并发消费模式(默认)
- 基本实现方式
@Service
@RocketMQMessageListener(topic = "concurrent-topic",consumerGroup = "concurrent-group",consumeMode = ConsumeMode.CONCURRENTLY // 显式声明并发模式
)
public class ConcurrentConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 多线程并行处理System.out.println("线程" + Thread.currentThread().getId() + "处理消息: " + message);}
}
- 关键特性
- 多线程并行消费
- 不保证消息顺序
- 吞吐量高
- 自动负载均衡
- 线程池配置
rocketmq:consumer:concurrent-group:consume-thread-min: 5 # 最小线程数consume-thread-max: 20 # 最大线程数
二、顺序消费模式
- 基本实现方式
@Service
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-group",consumeMode = ConsumeMode.ORDERLY // 顺序消费模式
)
public class OrderlyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 单线程顺序处理(相同队列)System.out.println("顺序处理消息: " + message);}
}
- 关键特性
- 单队列单线程消费
- 保证同一队列消息顺序
- 吞吐量较低
- 需要业务保证分片逻辑
- 顺序消息发送
// 发送时指定相同的shardingKey(如订单ID)
rocketMQTemplate.syncSendOrderly("order-topic", "顺序消息内容","ORDER_123" // 相同订单号的消息会进入同一队列
);
三、两种模式对比
维度 | 并发模式 | 顺序模式 |
---|---|---|
消费方式 | 多线程并行 | 单队列单线程 |
消息顺序 | 不保证 | 保证同一队列顺序 |
吞吐量 | 高(万级TPS) | 较低(千级TPS) |
适用场景 | 日志收集、统计计算等 | 订单状态变更、库存扣减等 |
线程池配置 | 可调整线程数 | 固定为队列数的线程 |
异常处理 | 自动重试 | 需防止单条消息阻塞后续消息 |
四、生产注意事项
- 并发模式优化
// 批量消费配置
@RocketMQMessageListener(consumerGroup = "batch-group",topic = "batch-topic",consumeMessageBatchMaxSize = 32 // 每次拉取32条
)
public class BatchConsumer implements RocketMQListener<List<MessageExt>> {@Overridepublic void onMessage(List<MessageExt> messages) {messages.forEach(msg -> {System.out.println("批量处理: " + new String(msg.getBody()));});}
}
- 顺序模式防阻塞
@Override
public void onMessage(String message) {try {// 必须捕获所有异常,避免阻塞队列processOrder(message);} catch (Exception e) {log.error("处理失败,消息将重试", e);throw e; // 抛出异常触发重试}
}
- 混合使用案例
// 订单核心流程用顺序消费
@RocketMQMessageListener(topic = "order-core",consumeMode = ConsumeMode.ORDERLY
)
public class OrderCoreConsumer implements RocketMQListener<OrderEvent> { /*...*/ }// 订单日志用并发消费
@RocketMQMessageListener(topic = "order-log",consumeMode = ConsumeMode.CONCURRENTLY
)
public class OrderLogConsumer implements RocketMQListener<OrderLog> { /*...*/ }