当前位置: 首页 > ai >正文

RocketMQ 客户端编程模型

消息确认机制

Producer

  • 单向发送:Producer 只发不管成功与否,如果发送失败,无法补救,适合日志类场景。
  • 同步发送:Producer发送消息之后会阻塞当前线程,等待 Broker 返回结果。这种方式很大程度上保证了消息的安全性,但是效率较低。
  • 异步发送:Producer发送消息之后,当前线程不等待 Broker 的响应,通过回调函数处理 broker 的响应。

Broker

根据 Consumer 返回信息保证消息处理的可靠性。如果 Consumer 返回的是 RECONSUME_LATER,Broker 就会过一段时间发起消息重试。

消息重试机制
  • 需要发起重试的消息会被保存进消费者组对应的重试 Topic 中,这个 Topic 是 RocketMQ 自动生成的,这些消息不会重新放入 MessageQueue 中,因为会影响正常消息的处理。
  • 如果一个消息经过多次重试之后,消费者依然无法正常处理,那么 Broker 会把这条消息推入到消费者组对应的死信队列中。进入死信队列的消息只能人工处理。RocketMQ 默认最大重试次数是 16 次。
Consumer 自行指定起始消费位点

消费位点(Consume Offset): Consumer 当前消费到某个消息队列中的哪一条消息的位置。

Broker 记录了 Consumer 消费了哪些信息,可以决定继续把哪些消息发送给消费者。但是消费者希望自己决定拉取哪些消息,就可以通过一下参数决定拉取消息范围。

  • CONSUME_FROM_LAST_OFFSET //从对列的最后一条消息开始消费
  • CONSUME_FROM_FIRST_OFFSET //从对列的第一条消息开始消费
  • CONSUME_FROM_TIMESTAMP //从某一个时间点开始重新消费

Consumer 处理消息方式

集群(默认)

一个消息只会被消费者组中一个消费者实例处理,由消费者组中所有消费者实例共享同一个消费位点实现。

广播

一个消息会被所有消费者实例接收到并处理。

Consumer 自己记录并更新消费位点的信息,Broker 不再维护消费位点的信息,意味着如果消息消费失败了不能再次消费。

Broker 根据 Consumer 的 offset 消费位点,将对应的信息推给 Consumer。例如当 Consumer 记录的消费位点是 6,Broker 会把 6 号及之后的消息发送给 Consumer。

过滤消息

简单过滤

Producer 和 Consumer 都设置 Tag 属性,根据 Tag 值进行消息过滤。

如果希望匹配多个 Tag 值,可以通过 “||” 连接。

“*” 表示匹配所有。

SQL 过滤

是按照 SQL92 标准来执行的,同时 Broker 要把 enablePropertyFilter 设置为 true。

顺序消息

实现方式:
生产者利用代码限制将这批消息按序放入同一个 MessageQueue 中,否则默认情况下会把消息均分到所有 MessageQueue 中。

消费者实现 MessageListenerOrderly 这个接口去处理队列中的消息,使用这个接口为了给 MessageQueue 加锁,确保拿到当前 MessageQueue 所有消息后再去取另一个 MessageQueue 的消息。

这种方式是局部有序,不是全局有序。

延迟消息

Producer 向 Broker 发送消息后,Broker 隔一段时间后再把这条消息推送给 Consumer。

固定延迟级别

在这里插入图片描述

message.setDelayTimeLevel(3) //意味着等待 10s 后在发送消息

Rocket 设定了 18 个延迟级别,每个延迟级别对应一个 MessageQueue,每次扫描这些队列中的消息,进行延迟操作。

指定消息发送时间
message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L); 

这种方式是通过时间轮算法实现的。

批量消息

生产者将多条消息合并成一个批量消息,一次性发送出去,这样可以减少网络 IO,提升消息发送的吞吐量。

注意:

  • 同一批消息的 TOPIC 必须一致。
  • 批量消息不支持延迟消息。
  • 批量消息的大小不要超过 1M。

事务消息

基础目的是通过 RocketMQ 的事务机制,保证上下游数据一致性。

事务消息发送完整流程(三段式协议)

🚩1. 发送 Half 消息(即“预发送”)
Producer 向 Broker 发送一条“半消息(Half Message)”,这条消息还不会被 Consumer 消费。

Broker 将其持久化存储。

返回给 Producer 成功响应。

🚩2. 执行本地事务逻辑
Producer 拿到 Half 消息写入成功后,立即调用用户实现的 executeLocalTransaction() 方法。

这个方法中你可以写如:下订单、扣库存、写日志等本地业务操作。

事务执行完毕后返回状态(COMMIT / ROLLBACK / UNKNOWN)。

🚩3. Broker 事务回查(Check)
如果 Producer 返回 UNKNOWN(比如宕机了、网络断了),Broker 就会定时主动回查事务状态。

调用 checkLocalTransaction() 方法,询问 Producer:这条事务到底成功了吗?

Producer 响应后,Broker 决定是提交消息(Consumer 可见)还是回滚消息(删除消息)。


举个栗子 🌰:

在下单场景中,你希望先写订单入库,再发送 “订单创建成功” 消息。但如果两步之间宕机,就可能出现:

  • 订单的数据入库了,但订单创建成功的消息没发 → 下游系统不知道有订单

  • 订单创建成功的消息发出去了,但订单数据没写入库成功 → 下游处理了一个不存在的订单

为了解决这种问题,就需要事务消息来绑定消息发送和本地事务执行的一致性

消息流转过程(下单场景)
🧩 1. Producer 发送 Half 消息

  • Producer 构建一条“订单创建成功”的消息。
  • Broker 接收到 Half 消息并持久化,但这时候消息是 不可投递状态。
  • Broker 返回发送成功。

此时,消息有了备份,但不会投递给消费者,属于“准备状态”。

🧩2. 执行本地事务逻辑

  • Producer 在 Half 消息写入成功后,执行本地事务逻辑(调用用户实现的 executeLocalTransaction() 方法):
    • 把订单数据写入数据库
    • 插入成功后返回状态: UNKNOWN

🧩3. Broker 事务回查(Check)

  • Broker 启动回查机制:
    • 定时调用 checkLocalTransaction(MessageExt msg) 方法
    • Producer 查询数据库订单记录:
      • 如果发现订单存在 → 返回 COMMIT
      • 如果订单不存在或插入失败 → 返回 ROLLBACK
  • Broker 最终据此决定:提交消息 or 丢弃消息。

当订单写入数据库后,Producer 返回 COMMIT,Broker 将消息发送给 Consumer。
当订单写入数据库后,Producer 返回 ROLLBACK ,Broker 将消息删除,此条消息不会被消费。

http://www.xdnf.cn/news/13742.html

相关文章:

  • 第28节 Node.js 文件系统
  • SAP调用deepseek 的API
  • 成像细节丢失如何解决?OAS 矩孔衍射聚焦模型来解困
  • JY901-ROS2驱动代码
  • 力扣-70.爬楼梯
  • 解决蓝牙MAC 地址倒序问题
  • 第十四届蓝桥杯大赛软件赛国赛Java大学A组答案整理
  • 968. Binary Tree Cameras
  • [架构之美]深入优化Spring Boot WebFlux应用
  • 力扣HOT100之技巧:75. 颜色分类
  • 《拆解问题的技术》笔记
  • 常用三款解压软件对比
  • Python6.12打卡(day44)
  • Dify Python调用API
  • 从基础镜像到自定义镜像Docker容器化镜像的演变之路
  • double + double会有精度问题(通过BigDecimal解决)
  • 力扣HOT100之贪心算法:763. 划分字母区间
  • Nacos服务注册与发现原理
  • 关于安卓dialogFragment中,EditText无法删除文字的问题
  • 103. Java 继承 - 状态、实现和类型的多重继承
  • 全球/中国降水量数据集(1940-2024年)
  • 图像解码失败检测
  • 健康管理实训室建设方案:构建智慧康养人才培养生态体系
  • PERST#、Hot Reset、Link Disable
  • React16,17,18,19更新对比
  • slam--高斯分布
  • 《树状数组》
  • 消除信息屏障推动系统联动,IBMS系统成为建筑智能控制核心枢纽
  • EtherCAT转Modbus TCP网关实现倍福CX9020与科尔摩根NDC8AGV控制器设备之间的通讯案例
  • C语言入门教程