【RocketMq延迟消息操作流程】
文章目录
- 延迟消息
- Producer 发延迟消息
- Broker 处理延迟消息
- broker具体的操作流程
- 源码分析:
延迟消息
RocketMQ 允许生产者发送一条延迟消息(延后 N 秒、N 分钟再被消费者消费)。
并不是通过业务线程 sleep,而是 RocketMQ Broker 内部,专门有一个延迟投递系统。
消息一开始不会立刻到达业务 Topic,而是先进入系统内部的 延迟Topic(名字叫 SCHEDULE_TOPIC_XXXX)。
Producer 发延迟消息
构建 Message。
设置 delayTimeLevel(延迟级别)。
调用 send() 发送。
延迟消息等级
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h这18个等级
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes());
msg.setDelayTimeLevel(3); // 设置延迟级别,比如3对应10秒
producer.send(msg);
Broker 处理延迟消息
broker具体的操作流程
Broker 看到有 delayTimeLevel > 0 的消息,
不直接写入目标 Topic 的真正队列,
而是写到特殊 Topic:SCHEDULE_TOPIC_XXXX。
比如 Level 3 的消息,写到 SCHEDULE_TOPIC_XXXX 主题的第三个队列里。
Broker 定时调度(ScheduleMessageService)
Broker内部有一个后台线程服务:ScheduleMessageService,周期性扫描:
每个 delayLevel 都有一个对应的 ConsumeQueue。
按每5秒、10秒频率扫描(可配置)。
流程:
定时器触发。
遍历 SCHEDULE_TOPIC_XXXX 下的各个队列。
取出消息,看它设置的投递时间 deliverTimestamp。
如果当前时间 >= deliverTimestamp,说明到期了。
Broker 重新构建一条新消息:
主题恢复为原始的 Topic。、
清除掉 delayTimeLevel。
写入到目标 Topic 的真正队列中(比如 TopicTest-Queue0)。
🔵 小细节:
deliverTimestamp 不是 delayTimeLevel,而是具体时间戳。
调度失败的(比如 Broker崩溃)下次启动时继续恢复扫描。
源码分析:
brokerController启动时,会执行start方法,然后也会Master的角色的broker启动一个定时任务
DefaultMessageStore# handleScheduleMessageService
public void handleScheduleMessageService(final BrokerRole brokerRole) {if (this.scheduleMessageService != null) {//判断当前的角色是从节点if (brokerRole == BrokerRole.SLAVE) {//则当前的定时任务则关闭this.scheduleMessageService.shutdown();} else {//如果是master节点,则正常启动this.scheduleMessageService.start();}}}
//指定定时任务
public void start() {if (started.compareAndSet(false, true)) {this.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {//开启定时轮训1s钟轮训一次this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}
TimerTask() 中run方法
@Overridepublic void run() {try {if (isStarted()) {this.executeOnTimeup();}} catch (Exception e) {// XXX: warn and notify melog.error("ScheduleMessageService, executeOnTimeup exception", e);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);}}
this.executeOnTimeup();
当定时任务(TimerTask)触发时,去扫描 延迟消息队列(也就是延迟Topic:SCHEDULE_TOPIC_XXXX),
取出当前 offset 位置之后的消息,判断是否到达投递时间:
如果到时间了,把消息恢复成普通消息(写到正常Topic队列)
如果没到时间,则设置一个新的定时器,等到真正到时间再处理。
public void executeOnTimeup() {//todo 1. 根据延迟级别查找对应的 ConsumeQueue// 每个 delayLevel(延迟级别)对应一个 ConsumeQueue(逻辑队列)。delayLevel2QueueId() 是把延迟级别映射到一个固定 queueId。ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;if (cq != null) {//todo 从 ConsumeQueue 中按 offset 取数据从 ConsumeQueue 中拿一段连续的索引数据(每条索引 20字节)。//如果拿不到,说明 offset 无效或者数据已被删除(最小offset问题)。SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();//遍历 ConsumeQueue 中的条目for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//每一条 ConsumeQueue 条目的结构是:long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();//可能 tagsCode 记录的是一个扩展地址,需要到 ConsumeQueueExt 文件读取真正的投递时间戳。if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}//计算当前消息应投递的时间戳long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//判断是否到达投递时间到时间了:立即将消息 "还原" 成普通消息并投递。//没到时间:新建 TimerTask,countdown 毫秒后再处理,并更新 offset 退出循环。long countdown = deliverTimestamp - now;if (countdown <= 0) {MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {//恢复普通消息并重新存储MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}//通过 messageTimeup 把延迟消息还原成普通消息(恢复原本的 topic、queueId)。//写入到正常的 commitLog。//如果写失败了,重新设置定时器并退出。PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());//全部遍历完成,设置下一次TimerTaskScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}} else {//当前这条消息还未到达应投递时间;//所以不处理当前消息,只是 延迟 countdown 毫秒后再重新处理该条消息(也就是注册一个新的 TimerTask);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);//更新当前延迟队列的处理进度;//意味着 RocketMQ 会从 nextOffset 开始继续调度,不会反复扫描已过的旧 offset;//offset 其实是某个延迟等级 ConsumeQueue 的“消费进度”概念。ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of for//它是 ScheduleMessageService#executeOnTimeup() 里 处理完当前 buffer 中所有消息之后的收尾逻辑,用于// 继续调度下一个周期的处理任务。这段逻辑出现在整个消息扫描 for 循环的最后。nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {long cqMinOffset = cq.getMinOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="+ cqMinOffset + ", queueId=" + cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);}