当前位置: 首页 > news >正文

【RocketMq延迟消息操作流程】

文章目录

  • 延迟消息
  • Producer 发延迟消息
  • Broker 处理延迟消息
    • broker具体的操作流程
    • 源码分析:

延迟消息

RocketMQ 允许生产者发送一条延迟消息(延后 N 秒、N 分钟再被消费者消费)。
并不是通过业务线程 sleep,而是 RocketMQ Broker 内部,专门有一个延迟投递系统。
消息一开始不会立刻到达业务 Topic,而是先进入系统内部的 延迟Topic(名字叫 SCHEDULE_TOPIC_XXXX)。
在这里插入图片描述

Producer 发延迟消息

构建 Message。
设置 delayTimeLevel(延迟级别)。
调用 send() 发送。
延迟消息等级
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h这18个等级

Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes());
msg.setDelayTimeLevel(3); // 设置延迟级别,比如3对应10秒
producer.send(msg);

Broker 处理延迟消息

broker具体的操作流程

Broker 看到有 delayTimeLevel > 0 的消息,
不直接写入目标 Topic 的真正队列,
而是写到特殊 Topic:SCHEDULE_TOPIC_XXXX。
比如 Level 3 的消息,写到 SCHEDULE_TOPIC_XXXX 主题的第三个队列里。
Broker 定时调度(ScheduleMessageService)
Broker内部有一个后台线程服务:ScheduleMessageService,周期性扫描:
每个 delayLevel 都有一个对应的 ConsumeQueue。
按每5秒、10秒频率扫描(可配置)。
流程:
定时器触发。
遍历 SCHEDULE_TOPIC_XXXX 下的各个队列。
取出消息,看它设置的投递时间 deliverTimestamp。
如果当前时间 >= deliverTimestamp,说明到期了。
Broker 重新构建一条新消息:
主题恢复为原始的 Topic。、
清除掉 delayTimeLevel。
写入到目标 Topic 的真正队列中(比如 TopicTest-Queue0)。
🔵 小细节:
deliverTimestamp 不是 delayTimeLevel,而是具体时间戳。
调度失败的(比如 Broker崩溃)下次启动时继续恢复扫描。

源码分析:

brokerController启动时,会执行start方法,然后也会Master的角色的broker启动一个定时任务
DefaultMessageStore# handleScheduleMessageService

    public void handleScheduleMessageService(final BrokerRole brokerRole) {if (this.scheduleMessageService != null) {//判断当前的角色是从节点if (brokerRole == BrokerRole.SLAVE) {//则当前的定时任务则关闭this.scheduleMessageService.shutdown();} else {//如果是master节点,则正常启动this.scheduleMessageService.start();}}}

//指定定时任务

 public void start() {if (started.compareAndSet(false, true)) {this.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {//开启定时轮训1s钟轮训一次this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}

TimerTask() 中run方法

   @Overridepublic void run() {try {if (isStarted()) {this.executeOnTimeup();}} catch (Exception e) {// XXX: warn and notify melog.error("ScheduleMessageService, executeOnTimeup exception", e);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);}}

this.executeOnTimeup();
当定时任务(TimerTask)触发时,去扫描 延迟消息队列(也就是延迟Topic:SCHEDULE_TOPIC_XXXX),
取出当前 offset 位置之后的消息,判断是否到达投递时间:
如果到时间了,把消息恢复成普通消息(写到正常Topic队列)
如果没到时间,则设置一个新的定时器,等到真正到时间再处理。

  public void executeOnTimeup() {//todo 1. 根据延迟级别查找对应的 ConsumeQueue// 每个 delayLevel(延迟级别)对应一个 ConsumeQueue(逻辑队列)。delayLevel2QueueId() 是把延迟级别映射到一个固定 queueId。ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;if (cq != null) {//todo 从 ConsumeQueue 中按 offset 取数据从 ConsumeQueue 中拿一段连续的索引数据(每条索引 20字节)。//如果拿不到,说明 offset 无效或者数据已被删除(最小offset问题)。SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();//遍历 ConsumeQueue 中的条目for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//每一条 ConsumeQueue 条目的结构是:long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();//可能 tagsCode 记录的是一个扩展地址,需要到 ConsumeQueueExt 文件读取真正的投递时间戳。if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}//计算当前消息应投递的时间戳long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//判断是否到达投递时间到时间了:立即将消息 "还原" 成普通消息并投递。//没到时间:新建 TimerTask,countdown 毫秒后再处理,并更新 offset 退出循环。long countdown = deliverTimestamp - now;if (countdown <= 0) {MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {//恢复普通消息并重新存储MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}//通过 messageTimeup 把延迟消息还原成普通消息(恢复原本的 topic、queueId)。//写入到正常的 commitLog。//如果写失败了,重新设置定时器并退出。PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());//全部遍历完成,设置下一次TimerTaskScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}} else {//当前这条消息还未到达应投递时间;//所以不处理当前消息,只是 延迟 countdown 毫秒后再重新处理该条消息(也就是注册一个新的 TimerTask);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);//更新当前延迟队列的处理进度;//意味着 RocketMQ 会从 nextOffset 开始继续调度,不会反复扫描已过的旧 offset;//offset 其实是某个延迟等级 ConsumeQueue 的“消费进度”概念。ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of for//它是 ScheduleMessageService#executeOnTimeup() 里 处理完当前 buffer 中所有消息之后的收尾逻辑,用于// 继续调度下一个周期的处理任务。这段逻辑出现在整个消息扫描 for 循环的最后。nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {long cqMinOffset = cq.getMinOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="+ cqMinOffset + ", queueId=" + cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);}
http://www.xdnf.cn/news/219349.html

相关文章:

  • 鸟笼效应——AI与思维模型【84】
  • Canvas基础篇:概述
  • DeepSeek 本地化部署与 WebUI 配置的方法
  • Fiddler抓取APP端,HTTPS报错全解析及解决方案(一篇解决常见问题)
  • 在Ubuntu中安装python
  • 02_高并发系统问题及解决方案
  • 大模型高效化三大核心技术:量化、蒸馏与剪枝详解
  • 【AI论文】BitNet v2:针对1位LLM的原生4位激活和哈达玛变换
  • 物流新速度:数字孪生让仓库“聪明”起来
  • 民锋视角下的价格波动管理思路
  • 健康养生:拥抱活力生活
  • 【AI提示词】机会成本决策分析师
  • 理解 EKS CloudWatch Pod CPU Utilization 指标:与 `kubectl top` 及节点 CPU 的关系
  • 企业架构之旅(3):TOGAF ADM架构愿景的核心价值
  • C#学习——类型、变量
  • SpringSecurity+JWT
  • linux安装部署配置docker环境
  • 基于STM32的虚线绘制函数改造
  • linux下创建c++项目的docker镜像和容器
  • try catch + throw
  • Python小程序:上班该做点摸鱼的事情
  • plm在车间管理中的重要作用
  • 4月29号
  • 浅谈工业RFID国产化替代趋势:技术自主化与产业升级是必然!
  • 定义接口的头文件和对应库文件之间的关系
  • 为什么要学习《金刚经》
  • 【linux】当nuc连接雷达之后,连接不上网络的解决方法
  • LangGraph简单使用
  • 制作一款打飞机游戏31:敌人数据库
  • 【MySQL】内置函数