基于 RabbitMQ 死信队列+TTL 实现延迟消息+延迟插件基本使用
在许多业务场景中,我们都需要处理延迟任务。例如:
- 用户下单后,30分钟内未支付则自动取消订单。
- 用户注册成功后,5分钟后发送一份引导邮件。
- 创建一个定时任务,在未来某个指定时间点执行。
这些场景的核心需求都是:在未来的某个时间点,触发一个特定的动作。RabbitMQ 本身没有直接提供延迟队列的功能,但我们可以巧妙地利用其两个核心特性——**消息存活时间(TTL)和死信交换机(Dead-Letter-Exchange)**来组合实现一个强大而可靠的延迟消息系统。
本文将通过一个完整的 Spring Boot + RabbitMQ 示例,深入剖析如何使用 DLX + TTL 实现延迟消息,分析其工作原理、优缺点,并介绍通过插件来解决的更成熟的解决方案。
核心概念
在深入代码之前,我们必须先理解两个关键概念。
1. 消息存活时间 (Time-To-Live, TTL)
TTL 用于设置消息在队列中的最大存活时间,单位为毫秒。当一条消息在一个队列中的存留时间超过了其 TTL 值,它就会“过期”。
TTL 有两种设置方式:
- 对整个队列设置 TTL:通过在
queue.declare
时添加x-message-ttl
参数。该队列中的所有消息都将拥有相同的存活时间。 - 对单条消息设置 TTL:在发送消息时,通过设置消息属性(
expiration
)来指定。这样可以为每条消息赋予不同的存活时间。
当一条消息过期后,它会变成“死信”(Dead Letter)。
2. 死信交换机 (Dead-Letter-Exchange, DLX)
死信交换机本质上也是一个普通的交换机。当一个队列中的消息满足以下任一条件时,它就会变成“死信”,并被 RabbitMQ 自动重新发布到该队列预先配置好的一个“死信交换机”上。
- 消息 TTL 过期(我们本次实现的核心)。
- 消息被消费者拒绝(
basic.reject
或basic.nack
),并且requeue
参数被设置为false
。 - 队列达到最大长度(
x-max-length
)或最大容量(x-max-length-bytes
),导致最早的消息被丢弃。
实现原理:DLX + TTL 组合拳
我们的延迟队列实现思路正是利用了上述两个特性:
- 创建一个普通的业务队列(我们称之为
normal.queue
),不设置任何消费者。 - 为这个
normal.queue
配置一个死信交换机(dead.letter.exchange
)。 - 当生产者发送一条消息时,我们为其设置一个 TTL(例如10秒),并将其发送到与
normal.queue
绑定的业务交换机(normal.exchange
)。 - 由于
normal.queue
没有消费者,消息会在队列中静静地等待。 - 10秒后,消息的 TTL 到期,它变成了“死信”。
- RabbitMQ 自动将这条死信消息从
normal.queue
中移除,并将其路由到预设的dead.letter.exchange
。 dead.letter.exchange
再根据其路由规则,将消息投递到最终的“死信队列”(dead.letter.queue
)。- 我们的消费者只监听这个死信队列。一旦收到消息,就意味着延迟时间已到,可以开始处理业务。
配置Exhange/Queue
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class DelayedConfig { // 1. 声明普通的业务交换机和队列 public static final String NORMAL_EXCHANGE = "normal.exchange"; public static final String NORMAL_QUEUE = "normal.queue"; public static final String NORMAL_ROUTING_KEY = "normal.key"; // 2. 声明死信交换机和死信队列 public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange"; public static final String DEAD_LETTER_QUEUE = "dead.letter.queue"; public static final String DEAD_LETTER_ROUTING_KEY = "dead.key"; @Bean public DirectExchange normalExchange() { return new DirectExchange(NORMAL_EXCHANGE); } // 这就是出错的队列声明 @Bean public Queue normalQueue() { return QueueBuilder.durable(NORMAL_QUEUE) .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY) .build(); } @Bean public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY); } // 3. 确保死信交换机和死信队列也作为 Bean 被声明 @Bean public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); } @Bean public Queue deadLetterQueue() { return new Queue(DEAD_LETTER_QUEUE); } @Bean public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) { // 死信队列绑定到死信交换机,使用普通队列指定的 dead-letter-routing-keyreturn BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY); }
}
配置解读:
normalQueue()
是配置的核心。我们通过.withArgument()
方法为normal.queue
设置了两个重要参数:x-dead-letter-exchange
:指定了当队列里的消息变成死信后,应该被发往哪个交换机。x-dead-letter-routing-key
:指定了死信消息被发送到死信交换机时,使用哪个路由键。这允许我们更灵活地控制死信消息的流向。
生产者
发送带TTL的消息
生产者将消息发送到业务交换机normal.exchange
,并为每条消息动态设置expiration
属性。
@RestController
@RequestMapping
public class DelayController { private final RabbitTemplate rabbitTemplate; public DelayController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @RequestMapping("/delay") public String delay(){ //发送带ttl的消息 System.out.println("发送延迟消息, 当前时间: " + new Date());rabbitTemplate.convertAndSend("normal.exchange", "normal.key", "delay test with ttl 10s..."+new Date(),message -> { message.getMessageProperties().setExpiration("10000"); return message; }); rabbitTemplate.convertAndSend("normal.exchange", "normal.key", "delay test with ttl 20s..."+new Date(), message -> { message.getMessageProperties().setExpiration("20000"); return message; }); return "success"; }
}
消费者
监听死信队列
消费者只关心最终的业务处理,所以它监听的是dead.letter.queue
。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import java.util.Date;
@Component
public class DelayConsumer { @RabbitListener(queues = "dead.letter.queue") public void ListenerDLXQueue(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n", new Date(), new String(message.getBody(),"UTF-8"), deliveryTag); }
}
结果分析与问题洞察
当我们访问 http://127.0.0.1:8080/delay
后,观察控制台输出如下:
发送延迟消息, 当前时间: 周日 8月 10 17:28:13 CST 2025周日 8月 10 17:28:23 CST 2025 死信队列接收到消息: delay test with ttl 10s...Sun Aug 10 17:28:11 CST 2025, deliveryTag: 1
周日 8月 10 17:28:34 CST 2025 死信队列接收到消息: delay test with ttl 20s...Sun Aug 10 17:28:13 CST 2025, deliveryTag: 2
但是如果将投递顺序调换
@RequestMapping("/delay")
public String delay(){ //发送带ttl的消息 System.out.println("发送延迟消息, 当前时间: " + new Date()); rabbitTemplate.convertAndSend("normal.exchange", "normal.key", "delay test with ttl 20s..."+new Date(),message -> { message.getMessageProperties().setExpiration("20000"); return message; }); rabbitTemplate.convertAndSend("normal.exchange", "normal.key", "delay test with ttl 10s..."+new Date(), message -> { message.getMessageProperties().setExpiration("10000"); return message; }); return "success";
}
再次进行请求,你会看到延迟20s的消息和延迟10s的消息是同时被处理的
发送延迟消息, 当前时间: Sun Aug 10 22:27:15 CST 2025
周日 8月 10 22:27:36 CST 2025 死信队列接收到消息: delay test with ttl 20s...Sun Aug 10 22:27:15 CST 2025, deliveryTag: 1
周日 8月 10 22:27:36 CST 2025 死信队列接收到消息: delay test with ttl 10s...Sun Aug 10 22:27:15 CST 2025, deliveryTag: 2
这是为什么呢?
这是 DLX+TTL 方案最核心的一个“陷阱”:RabbitMQ 只会检查队列头部的消息是否过期。如果队头的消息没有过期,那么后面的消息就算已经过期了,也无法被投递到死信交换机。
在我们的例子中:
20s TTL
的消息先入队,位于队头。10s TTL
的消息后入队,位于其后。- RabbitMQ 盯着队头的
20s TTL
消息。20秒后,该消息过期,被投递到死信队列。 - 此时,
10s TTL
的消息才成为新的队头。RabbitMQ 开始检查它,发现它的 TTL 已经结束了,直接进行投递
这就导致了延迟的“串行”执行,延迟时间有可能会被延后。
改进方案:使用延迟消息插件
为了解决上述的队列头部阻塞问题,并实现更精确、更灵活的延迟控制,RabbitMQ 官方提供了一个非常强大的插件:rabbitmq-delayed-message-exchange
。
插件原理
该插件提供了一种新的交换机类型:x-delayed-message
。这种交换机在接收到消息后,并不会立即投递到队列,而是会根据消息头中的 x-delay
属性(单位毫秒)来等待相应的时间,然后再进行投递。这个过程是在交换机内部完成的,不依赖于队列,因此不会产生队头阻塞问题。
插件代码实践
假设您已经在 RabbitMQ 服务器上启用了 rabbitmq-delayed-message-exchange
插件。
1. 新增插件配置
配置变得异常简单,不再需要死信队列和业务队列的区分。
// 新增一个配置类用于演示插件用法
@Configuration
public class DelayedPluginConfig {public static final String DELAYED_EXCHANGE = "delayed.plugin.exchange";public static final String DELAYED_QUEUE = "delayed.plugin.queue";public static final String DELAYED_ROUTING_KEY = "delayed.plugin.key";@Beanpublic CustomExchange delayedExchange() {// 声明一个 x-delayed-message 类型的交换机// durable: 持久化// autoDelete: 不自动删除return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false,Map.of("x-delayed-type", "direct")); // 指定基础交换机类型}@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE);}@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
2. 新增插件生产者
发送消息时,我们不再设置 expiration
,而是添加一个 x-delay
的 header。
// 在 DelayController 中新增一个方法
@RestController
public class DelayController {// ... 原有代码 ...@RequestMapping("/delay-plugin")public String delayPlugin() {System.out.println("发送延迟消息 (Plugin), 当前时间: " + new Date());// 发送延迟20秒的消息rabbitTemplate.convertAndSend(DelayedPluginConfig.DELAYED_EXCHANGE, DelayedPluginConfig.DELAYED_ROUTING_KEY,"delay test with plugin 20s..." + new Date(), message -> {message.getMessageProperties().setHeader("x-delay", 20000); // 20秒return message;});// 发送延迟10秒的消息rabbitTemplate.convertAndSend(DelayedPluginConfig.DELAYED_EXCHANGE, DelayedPluginConfig.DELAYED_ROUTING_KEY,"delay test with plugin 10s..." + new Date(), message -> {message.getMessageProperties().setHeader("x-delay", 10000); // 10秒return message;});return "success (plugin)";}
}
3. 新增插件消费者
消费者直接监听最终的业务队列即可。
// 新增一个消费者类
@Component
public class DelayedPluginConsumer {@RabbitListener(queues = DelayedPluginConfig.DELAYED_QUEUE)public void listenDelayedQueue(Message message) throws Exception {System.out.printf("插件延迟队列 %tc 接收到消息: %s%n", new Date(), new String(message.getBody(),"UTF-8"));}
}
现在访问 http://127.0.0.1:8080/delay-plugin
,你会发现,即使是后发送的10秒延迟消息,也会比先发送的20秒延迟消息先被消费,完美解决了队头阻塞问题。
结论与选型建议
本文我们详细探讨了两种实现 RabbitMQ 延迟消息的方法:
特性 | DLX + TTL 方案 | 延迟消息插件方案 |
---|---|---|
实现方式 | 依赖队列TTL和死信交换机 | 依赖特定类型的交换机 (x-delayed-message ) |
配置复杂度 | 较高,需要配置两套Exchange和Queue | 较低,一套Exchange和Queue即可 |
延迟精确性 | 受队头消息影响,不精确 | 精确,消息间延迟互不影响 |
依赖 | RabbitMQ原生功能,无需额外插件 | 需要在服务端安装并启用 rabbitmq-delayed-message-exchange 插件 |
适用场景 | 业务场景简单,队列中消息TTL固定,或能容忍延迟误差 | 对延迟时间精确性要求高的场景 |
总结建议:
- 首选延迟消息插件:对于绝大部分需要延迟消息的场景,延迟插件提供了更简单、更精确、更符合直觉的解决方案。它是目前实现延迟消息的最佳实践。
- 了解 DLX+TTL:虽然插件方案更优,但理解 DLX+TTL 的工作原理非常有价值。它不仅能让你实现延迟队列,更能加深你对 RabbitMQ 核心机制的理解,这对于问题排查和设计更复杂的系统大有裨益。