RocketMQ 客户端编程模型
消息确认机制
Producer
- 单向发送:Producer 只发不管成功与否,如果发送失败,无法补救,适合日志类场景。
- 同步发送:Producer发送消息之后会阻塞当前线程,等待 Broker 返回结果。这种方式很大程度上保证了消息的安全性,但是效率较低。
- 异步发送:Producer发送消息之后,当前线程不等待 Broker 的响应,通过回调函数处理 broker 的响应。
Broker
根据 Consumer 返回信息保证消息处理的可靠性。如果 Consumer 返回的是 RECONSUME_LATER,Broker 就会过一段时间发起消息重试。
消息重试机制
- 需要发起重试的消息会被保存进消费者组对应的重试 Topic 中,这个 Topic 是 RocketMQ 自动生成的,这些消息不会重新放入 MessageQueue 中,因为会影响正常消息的处理。
- 如果一个消息经过多次重试之后,消费者依然无法正常处理,那么 Broker 会把这条消息推入到消费者组对应的死信队列中。进入死信队列的消息只能人工处理。RocketMQ 默认最大重试次数是 16 次。
Consumer 自行指定起始消费位点
消费位点(Consume Offset): Consumer 当前消费到某个消息队列中的哪一条消息的位置。
Broker 记录了 Consumer 消费了哪些信息,可以决定继续把哪些消息发送给消费者。但是消费者希望自己决定拉取哪些消息,就可以通过一下参数决定拉取消息范围。
- CONSUME_FROM_LAST_OFFSET //从对列的最后一条消息开始消费
- CONSUME_FROM_FIRST_OFFSET //从对列的第一条消息开始消费
- CONSUME_FROM_TIMESTAMP //从某一个时间点开始重新消费
Consumer 处理消息方式
集群(默认)
一个消息只会被消费者组中一个消费者实例处理,由消费者组中所有消费者实例共享同一个消费位点实现。
广播
一个消息会被所有消费者实例接收到并处理。
Consumer 自己记录并更新消费位点的信息,Broker 不再维护消费位点的信息,意味着如果消息消费失败了不能再次消费。
Broker 根据 Consumer 的 offset 消费位点,将对应的信息推给 Consumer。例如当 Consumer 记录的消费位点是 6,Broker 会把 6 号及之后的消息发送给 Consumer。
过滤消息
简单过滤
Producer 和 Consumer 都设置 Tag 属性,根据 Tag 值进行消息过滤。
如果希望匹配多个 Tag 值,可以通过 “||” 连接。
“*” 表示匹配所有。
SQL 过滤
是按照 SQL92 标准来执行的,同时 Broker 要把 enablePropertyFilter 设置为 true。
顺序消息
实现方式:
生产者利用代码限制将这批消息按序放入同一个 MessageQueue 中,否则默认情况下会把消息均分到所有 MessageQueue 中。
消费者实现 MessageListenerOrderly 这个接口去处理队列中的消息,使用这个接口为了给 MessageQueue 加锁,确保拿到当前 MessageQueue 所有消息后再去取另一个 MessageQueue 的消息。
这种方式是局部有序,不是全局有序。
延迟消息
Producer 向 Broker 发送消息后,Broker 隔一段时间后再把这条消息推送给 Consumer。
固定延迟级别
message.setDelayTimeLevel(3) //意味着等待 10s 后在发送消息
Rocket 设定了 18 个延迟级别,每个延迟级别对应一个 MessageQueue,每次扫描这些队列中的消息,进行延迟操作。
指定消息发送时间
message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);
这种方式是通过时间轮算法实现的。
批量消息
生产者将多条消息合并成一个批量消息,一次性发送出去,这样可以减少网络 IO,提升消息发送的吞吐量。
注意:
- 同一批消息的 TOPIC 必须一致。
- 批量消息不支持延迟消息。
- 批量消息的大小不要超过 1M。
事务消息
基础目的是通过 RocketMQ 的事务机制,保证上下游数据一致性。
事务消息发送完整流程(三段式协议)
🚩1. 发送 Half 消息(即“预发送”)
Producer 向 Broker 发送一条“半消息(Half Message)”,这条消息还不会被 Consumer 消费。
Broker 将其持久化存储。
返回给 Producer 成功响应。
🚩2. 执行本地事务逻辑
Producer 拿到 Half 消息写入成功后,立即调用用户实现的 executeLocalTransaction() 方法。
这个方法中你可以写如:下订单、扣库存、写日志等本地业务操作。
事务执行完毕后返回状态(COMMIT / ROLLBACK / UNKNOWN)。
🚩3. Broker 事务回查(Check)
如果 Producer 返回 UNKNOWN(比如宕机了、网络断了),Broker 就会定时主动回查事务状态。
调用 checkLocalTransaction() 方法,询问 Producer:这条事务到底成功了吗?
Producer 响应后,Broker 决定是提交消息(Consumer 可见)还是回滚消息(删除消息)。
举个栗子 🌰:
在下单场景中,你希望先写订单入库,再发送 “订单创建成功” 消息。但如果两步之间宕机,就可能出现:
-
订单的数据入库了,但订单创建成功的消息没发 → 下游系统不知道有订单
-
订单创建成功的消息发出去了,但订单数据没写入库成功 → 下游处理了一个不存在的订单
为了解决这种问题,就需要事务消息来绑定消息发送和本地事务执行的一致性。
消息流转过程(下单场景)
🧩 1. Producer 发送 Half 消息
- Producer 构建一条“订单创建成功”的消息。
- Broker 接收到 Half 消息并持久化,但这时候消息是 不可投递状态。
- Broker 返回发送成功。
此时,消息有了备份,但不会投递给消费者,属于“准备状态”。
🧩2. 执行本地事务逻辑
- Producer 在 Half 消息写入成功后,执行本地事务逻辑(调用用户实现的 executeLocalTransaction() 方法):
-
- 把订单数据写入数据库
-
- 插入成功后返回状态: UNKNOWN
🧩3. Broker 事务回查(Check)
- Broker 启动回查机制:
- 定时调用 checkLocalTransaction(MessageExt msg) 方法
- Producer 查询数据库订单记录:
- 如果发现订单存在 → 返回 COMMIT
- 如果订单不存在或插入失败 → 返回 ROLLBACK
- Broker 最终据此决定:提交消息 or 丢弃消息。
当订单写入数据库后,Producer 返回 COMMIT,Broker 将消息发送给 Consumer。
当订单写入数据库后,Producer 返回 ROLLBACK ,Broker 将消息删除,此条消息不会被消费。