当前位置: 首页 > web >正文

消息消费类型和具体实现

顺序消息

顺序消息是消息队列 RocketMQ 提供的一种高级消息类型。
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费。
即先发送的消息先消费,后发送的消息后消费。

顺序消息的应用场景

RocketMQ 的顺序消费(也称为有序消息或顺序消息)在某些业务场景中是至关重要的。
在常见的订单处理、金融业务系统中都有顺序消息的需求

  • 金融交易:例如,在处理用户的账户余额更新时,所有与该用户账户相关的操作必须按照发生的顺序进行处理。如果先处理了取款消息而后处理存款消息,可能会导致用户的账户余额出现错误。

  • 订单处理系统:在一个电商环境中,从下单、支付到发货等步骤都需要保证按顺序执行。比如,不能在订单未创建之前就进行支付确认,也不能在未支付之前就开始发货。

  • 工作流管理:当业务逻辑涉及多个步骤并且这些步骤之间存在依赖关系时,确保每个任务按照正确的顺序被执行是非常重要的。例如,文档审批流程可能需要依次通过不同级别的审核人员。

乱序因素

针对于RocketMQ普通消息的流转,我们前几节课已经详细介绍:

  1. 生产者异步/并行发送:消息可能以不同顺序到达MQ。
  2. MQ的分区/队列机制:消息被分散到不同分区或队列,不同队列的消费速度不一致。
  3. 消费者并行消费:多个消费者实例或线程同时处理消息,导致乱序。

如何实现顺序消息?

为了实现顺序消费,RocketMQ 提供了两种类型的顺序消息:

  • 全局顺序
  • 分区顺序

全局顺序

我们要实现全局消息的顺序消费,
可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费。
从而保证消息的全局有序,但是这种方式效率低,一般不使用。
在这里插入图片描述

分区有序

在实际开发中分区有序适用于大多数业务场景,并且很少会选择全局顺序。因为后者可能导致性能瓶颈(所有的消息都要经过同一个队列),而前者可以在一定程度上平衡顺序性和吞吐量。
什么叫分区有序呢?
在同一 Topic 下的不同分区(Queue)之间不保证顺序,但在同一分区内的消息则保持严格的顺序。

假设一个Topic分配了三个消息队列,生产者在发送消息的时候,可以对消息设置一个路由ID。
比如想保证一个订单的相关消息有序,那么就使用订单ID当做路由ID。
在发送消息的时候,通过订单ID对消息队列的个数取余,根据取余结果选择消息队列。
在这里插入图片描述

这就是RocketMQ的局部有序,保证消息在某个消息队列中有序
在这里插入图片描述

生产顺序性

在分布式环境下,保证消息的全局顺序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的情况下各自向 RocketMQ 服务端发送消息 a 和消息 b,由于分布式系统的限制,我们无法保证 a 和 b 的顺序。
因此业界消息系统通常保证的是分区的顺序性,即保证带有同一属性的消息的顺序,我们将该属性称之为 MessageGroup。(就像上面举例中Trade_1_1、Trade_1_2、Trade_1_3属于一个MessageGroup)
对于同一 MessageGroup,为了保证其发送顺序的先后性,由同一个Producer负责,并且对于每一个 Producer 而言,顺序消息是同步发送的。
同步发送的好处是显而易见的,在客户端得到上一条消息的发送结果后再发送下一条,即能准确保证发送顺序,若使用异步发送或多线程则很难保证这一点。
在这里插入图片描述

我们简单总结一下顺序发送
RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

RocketMQ通过自定义队列选择器可以实现这一点

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}
}, orderId);

消费者顺序性

与顺序消息不同的是,普通消息的消费实际上没有任何限制,消费者拉取的消息是被异步、并发消费的,而顺序消息,需要保证对于同一个 MessageGroup,同一时刻只有一个客户端在消费消息,并且在该条消息被确认消费完成之前(或者进入死信队列),消费者无法消费同一 MessageGroup 的下一条消息,否则消费的顺序性将得不到保证。

  • RocketMQ:使用 MessageListenerOrderly 监听器顺序消费。
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> messages, ConsumeOrderlyContext context) {// 单线程处理消息return ConsumeOrderlyStatus.SUCCESS;}
});

延迟消息

延迟消息使用场景

  1. 电商交易系统的订单超时未支付,自动取消订单
    在电商交易系统中,像淘宝、京东,我们提交了一个订单之后,在支付时都会提示,需要在指定时间内(例如30分钟)完成支付,否则订单将被取消的消息,实际上这个超时未支付功能就可以使用延时消息来实现。在下单成功之后,就发送一个延时消息,然后指定消息的延时时间为30分钟,这条消息将会在30分钟后投递给后台业务系统(Consumer),此时才能被消费者进行消费,消费消息的时候会再去检查这个订单的状态,确认下是否支付成功,如果支付成功,则忽略不处理;如果订单还是未支付,则进行取消订单、释放库存等操作;

  2. 活动场景
    比如B站视频投稿经常会发起一些活动,Up主在活动期间可以按照活动规则投稿视频,在活动时间截止后,后台根据Up主完成任务的情况以及结合投稿视频的播放量等进行判定,然后派发对应的奖励。这种场景我们也可以采用延时消息来实现,即在发起活动后,同时发送一条延时消息,延时时间设置为本次活动周期的时间。当活动结束后,这条延时消息刚好可以被消费者进行消费,这样就可以消费消息然后执行一系列的逻辑处理。

消息延迟级别

延迟消息通过设置延迟级别来指定的。RocketMQ最多支持18个延迟级别,每个延迟级别对应的延迟时间可以通过配置messageDelayLevel自定义。该配置属于Broker,对所有Topic有效!默认的level值为0,代表非延迟消息,超过18按最大值18计算。
在这里插入图片描述

修改Broker配置文件broker.conf:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h

注意:需要重启Broker生效,且必须保持18个级别。

如何实现延迟消息

首先消息发送是可以设置一个延迟级别

Message msg = new Message("TestTopic", "TagA", "OrderID001", "Hello world".getBytes());
// 设置延时级别3(对应10秒延迟)
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);

具体Broker的流程如下:

  1. Producer 将消息投递到Broker的commitLog服务。
  2. commitLog服务判断消息是否为延迟消息,如果是,则将实际的topic和queueId保存到消息的属性中,并将topic设置成延迟topic(SCHEDULE_TOPIC_XXXX),queueId对应的延迟级别(queueId = level-1)和消息投递时间保存在tagCode中。
  3. 消息延迟服务(DelayMessageService)从SCHEDULE_TOPIC_XXXX主题循环拉取消息。
  4. DelayMessageService根据tagCode找到对应的延迟队列,并按照延迟级别进行排序。
  5. 当达到指定的延迟时间后,DelayMessageService会将消息重新推送到commitLog服务。
  6. commitLog服务将消息推到Producer 指定的目标 Topic 中。
  7. Consumer从 目标 Topic 中拉取消息。

在这里插入图片描述

事务消息

应用场景

RocketMQ事务消息主要解决分布式事务场景问题
分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。
在这里插入图片描述

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

事务消息实现原理

  1. 发送方将半事务消息发送至消息队列RocketMQ版服务端。
  2. 消息队列RocketMQ版服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。
    在这里插入图片描述

事务消息回查步骤如下:
当步骤1中半事务消息发送完成,但本地事务返回状态为TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从Broker的角度看,这条半事务消息的状态是未知的。因此Broker会定期向消息发送方即消息生产者集群中的任意一生产者实例发起消息回查,要求发送方回查该Half状态消息,并上报其最终状态。

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对消息发送方即生产者集群中任意一生产者实例发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

代码实现

生产者发送事务消息

  • 需创建事务类型的生产者TransactionMQProducer;
  • 需调用setTransactionListener()方法设置事务监听器;
  • 使用sendMessageInTransaction()以事务方式发送消息;
public class TransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {// 创建事务类型的生产者TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group");// 设置NameServer的地址producer.setNamesrvAddr("10.0.90.211:9876");// 设置事务监听器producer.setTransactionListener(new TransactionListenerImpl());// 启动生产者producer.start();// 发送10条消息for (int i = 0; i < 10; i++) {try {Message msg = new Message("TransactionTopic", "", ("Hello RocketMQ Transaction Message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置消息Keymsg.setKeys("Num" + i);// 使用事务方式发送消息SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.println("sendResult = " + sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}// 阻塞,目的是为了在消息发送完成后才关闭生产者Thread.sleep(10000);producer.shutdown();}
}

事务监听器

  • executeLocalTransaction:执行本地事务;
  • checkLocalTransaction:回查本地事务状态,根据这次回查的结果来决定此次事务是提交还是回滚;
/*** 事务监听器,重写执行本地事务方法以及事务回查方法*/
public class TransactionListenerImpl implements TransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {String msgKey = msg.getKeys();switch (msgKey) {case "Num0":case "Num1":// 明确回复回滚操作,消息将会被删除,不允许被消费。return LocalTransactionState.ROLLBACK_MESSAGE;case "Num8":case "Num9":// 消息无响应,代表需要回查本地事务状态来决定是提交还是回滚事务return LocalTransactionState.UNKNOW;default:// 消息通过,允许消费者消费消息return LocalTransactionState.COMMIT_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("回查本地事务状态,消息Key: " + msg.getKeys() + ",消息内容: " + new String(msg.getBody()));// 需要根据业务,查询本地事务是否执行成功,这里直接返回COMMITreturn LocalTransactionState.COMMIT_MESSAGE;}}

RocketMQ事务消息实现订单系统、积分系统、物流系统

[图片]

RocketMQ 的事务消息,并不是“把消息发送给多个下游服务保证多方一致”。
它实际上是这样的:
✅ 只用于生产者本地事务 和 Broker 之间的“一致性”确认。
✅ 下游服务(如积分服务、物流服务)并不是 RocketMQ 事务消息里的一部分。它们是消息的“消费者”,只能消费提交成功的消息。
也就是说:

  • 你发出半消息后,RocketMQ 只等你确认“这个消息能不能提交”。
  • 下游服务 并不直接参与 RocketMQ 的事务。他们只是普通消费者,拿到已提交消息后去处理自己的业务逻辑。

传统分布式事务(2PC / XA)

✅ 原理
基于两阶段提交协议(2PC):
1️⃣ 第一阶段(Prepare):

  • 协调者让所有参与者(订单库、积分库、物流库)预提交(即锁定资源)。
  • 参与者答复是否准备好(资源锁定完成)。
    2️⃣ 第二阶段(Commit/Rollback):
  • 如果全部成功,协调者让所有参与者提交。
  • 如果有失败,协调者让所有参与者回滚。
    🔔 这里使用全局事务管理器(如 XA,或者分布式中间件 Seata),对所有数据库/资源的写操作进行全局锁定。
    暂时无法在飞书文档外展示此内容
http://www.xdnf.cn/news/13322.html

相关文章:

  • nsswitch.conf配置文件内容解析
  • 生产安全与设备管理如何分清界限?如何正确用设备管理系统?
  • 微机原理与接口技术,期末冲刺复习资料(五)
  • 3.1 数据链路层的功能
  • 商品中心—2.商品生命周期和状态的技术文档
  • HTML 、CSS 、JavaScript基本简单介绍
  • 大型活动交通拥堵治理的视觉算法应用
  • ceph集群调整pg数量实战(下)
  • 【如何用Python调用DeepSeek的API接口?】
  • JavaSec-RCE
  • Python爬虫实战:爬取知乎回答详情
  • WebRTC(二):工作机制
  • CARSIM-车速、油门、刹车练习
  • 【计网】作业7
  • 金属矫平机:塑造平整与精度的工业利器
  • 【机器视觉】单目测距——运动结构恢复
  • synchronized 学习
  • 计算机网络笔记(三十四)——5.6TCP可靠传输的实现
  • 【持续更新】linux网络编程试题
  • 优化篇 | 网络时延优化有哪些项
  • ARM 单片机定义变量绝对地址方法
  • umask命令详解
  • 如何在Debian中提高phpstorm的稳定性
  • PostgreSQL 安装与配置全指南(适用于 Windows、macOS 与主流 Linux 发行版)
  • <6>-MySQL表的增删查改
  • Fastapi的swagger文档响应超时无法访问的解决办法
  • mybatisplus.BaseMapper#selectPage 使用方法
  • 服务器内存能不能用在台式机上
  • 模板语法中的插值语法详解
  • innodb 数据页结构