当前位置: 首页 > news >正文

RabbitMQ--延时队列总结

一、延迟队列概念

        延迟队列(Delay Queue)是一种特殊类型的队列,队列中的元素需要在指定的时间点被取出和处理。简单来说,延时队列就是存放需要在某个特定时间被处理的消息。它的核心特性在于“延迟”——消息在队列中停留一段时间,直到满足设定的延迟时间才会被处理。

关键特性:
  • 延时队列中的消息会在指定时间点才被消费。

  • 适用于时间敏感的任务调度,如订单过期、任务超时等。

二、延迟队列的使用场景

延迟队列适用于以下场景:

  1. 订单支付超时自动取消

    • 例如,订单生成后 10 分钟未支付,自动取消订单。

  2. 店铺商品上传提醒

    • 新店铺如果 10 天内没有上传商品,系统自动发送提醒消息。

  3. 用户未登录短信提醒

    • 用户注册后,若 3 天内没有登录,发送短信提醒用户登录。

  4. 退款超时提醒

    • 用户发起退款请求后,如果 3 天内未处理,自动通知运营人员。

  5. 会议提醒

    • 预定会议后,提前 10 分钟通知与会人员。

这些场景的特点是:在某个事件发生后,或者在某个时间点之前,需要完成某项任务。比如,在订单生成事件发生 10 分钟后,检查订单支付状态,未支付则关闭订单。

为什么不使用定时任务?
  • 对于小规模的数据量,可以使用定时任务每秒轮询一次进行处理。

  • 但是当数据量非常庞大(如百万级别的订单检查)时,轮询的方式会给数据库和系统带来巨大压力,无法满足高效处理的需求。

  • 延时队列通过精准的延迟时间控制和异步处理,能够高效地解决这个问题。

三、 如何在 RabbitMQ 中实现延时队列?

我们有两种常用的方式来实现延时队列:

  1. 通过 TTL(消息过期时间或队列过期时间)和死信队列实现:我们可以给队列里的消息设置一个有效期(TTL),一旦消息过期,它就会被路由到一个死信队列,再由死信队列进行消费。

  2. 使用 x-delayed-message 插件:这个插件是官方提供的,它允许我们给消息指定一个延迟时间,在这个时间到期之前,消息不会被消费者消费。

四、通过 TTL(消息过期时间或队列过期时间)和死信队列实现样例

在讨论这个问题前先来了解几个知识点

知识点①:RabbitMQ 中的 TTL(Time-to-Live)

        TTL 是 RabbitMQ 中用来控制消息或队列存活时间的属性。TTL 的单位是毫秒,表示一条消息或队列中的消息在指定时间内没有被消费时,消息会过期,成为死信。

(1) TTL 的两种设置方式
  • 消息 TTL:可以在发送每条消息时指定 TTL。

    例如,发送消息时设置 TTL 为 10 秒:

    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000")  // 设置消息延迟时间为 10 秒.build();channel.basicPublish("exchange", "routing_key", properties, "message".getBytes());
    
  • 队列 TTL:在创建队列时设置该队列内所有消息的 TTL。队列的 TTL 会影响队列中所有消息的过期时间。

    例如,在队列声明时设置 x-message-ttl

    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 10000);  // 设置消息的 TTL 为 10 秒
    channel.queueDeclare("queue", true, false, false, args);
    
(2) TTL 的行为
  • 队列 TTL:如果设置了队列 TTL,则队列内所有消息的 TTL 会被统一管理。如果消息超时,它会被丢弃或者路由到死信队列。

  • 消息 TTL:如果设置了消息 TTL,那么每条消息的 TTL 都会单独管理。如果消息未能在 TTL 时间内消费,则会成为“死信”。

知识点②: 死信队列(Dead Letter Queue)

 这里死信队列以及TTL的讲解笔者可以去查看这篇博客:Rabbitmq中的死信队列-CSDN博客

当消息过期或被拒绝时,消息会被发送到死信队列。死信队列用于接收那些已经过期的消息或被拒绝的消息,这样消费者可以集中处理这些需要处理的消息。

(1) 如何利用死信队列实现延时队列?
  1. 设置队列的 TTL,使消息在到期后成为死信。

  2. 配置死信队列,使过期的消息进入死信队列。

  3. 消费者从死信队列消费,定期消费这些过期的消息。

方式一:RabbitMQ 延时队列的实现方式(给消息设置TTL和死信队列)

(1)配置文件类代码

@Component
public class MsgTtlQueueConfig {public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String QUEUE_C = "QC";//声明队列 C 死信交换机@Bean("queueC")public Queue queueB(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//没有声明 TTL 属性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuecBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}

(2)消息生产者代码

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}

(3) 发送请求

  • http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
  • http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000       

发送一个 HTTP 请求,参数中包括消息内容和 TTL(过期时间)。

        http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000 表示发送消息"你好 2"并设置该消息的 TTL 为 2000 毫秒(即 2 秒)。2 秒内没有被消费者消费,该消息就会被 RabbitMQ 丢弃。

(4) 给消息设置TTL和死信队列的问题

        你当前的设计是为每条消息单独设置 TTL(通过         correlationData.getMessageProperties().setExpiration(ttlTime)),而不是为队列本身设置 TTL。这样做的目的是希望每条消息有不同的过期时间,从而实现不同的延时处理。

设计中可能存在的问题

        消息 TTL 是通过设置每条消息的 expiration 属性来控制每条消息的过期时间。每条消息可以有不同的 TTL,这样可以灵活地指定不同的消息延迟时间。    

    问题出在 RabbitMQ 的消息消费机制 上:RabbitMQ 是按照队列中的消息顺序来消费消息的,且它只会检查队列里的消息是否过期,而不是单独检查每条消息的 TTL。

  • 消费顺序问题

    假设队列中有两条消息:

    1. 第一条消息的 TTL 设置为 20 秒。

    2. 第二条消息的 TTL 设置为 2 秒。

                在这种情况下,RabbitMQ 会 按顺序检查队列中的消息,也就是说,它首先会检查第一条消息(TTL 20 秒),即使第二条消息的 TTL 很短(只有 2 秒)。如果第一条消息还没有过期,RabbitMQ 会先检查它,然后再检查第二条消息。结果就是,第二条消息可能会被延迟,即使它的 TTL 已经过期。

              也就是说即使第二条消息的 TTL 设置为 2 秒,然后此时第二条消息已经过期,它也会等待第一条消息被消费(进入死信队列后)后才会检查。这意味着 第二条消息在第一条消息未过期的情况下不会立刻进入死信队列,而是会等到第一条消息被消费,才会去检查是否过期。所以会被延迟

        

方式二:RabbitMQ 延时队列的实现方式(给队列设置TTL和死信队列)

RabbitMQ 的延时队列可以通过 TTL 配合死信队列实现,具体步骤如下:

(1) 设置队列的 TTL

在创建队列时,我们设置队列的 x-message-ttl 属性,控制消息的生存时间。例如:

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 10000);  // 设置队列消息的 TTL 为 10 秒
args.put("x-dead-letter-exchange", "dlx_exchange");  // 设置死信交换机
args.put("x-dead-letter-routing-key", "dlx_routing_key");  // 设置死信路由键
channel.queueDeclare("ttl_queue", true, false, false, args);
(2) 配置死信队列

设置死信交换机和死信路由键,当消息 TTL 到期后,它会进入死信队列。

channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "dlx_routing_key");
(3) 发送消息时设置 TTL

发送消息时,可以给消息设置 expiration 属性来控制消息的延迟时间。例如,10 秒后该消息将变为死信:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000")  // 设置消息的 TTL 为 10 秒.build();channel.basicPublish("exchange", "routing_key", properties, "message".getBytes());
(4) 消费死信队列

消费者从死信队列中获取消息进行处理:

channel.basicConsume("dlx_queue", true, (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received expired message: " + message);
}, consumerTag -> {});
(5)总结
  • 延迟队列 通过让消息在指定时间后再被消费,解决了定时任务和轮询检查的性能问题。

  • TTL死信队列 是实现 RabbitMQ 延时队列的关键技术,通过控制消息的存活时间和让过期消息进入死信队列,消费者可以按需处理这些消息。

  • 适用场景包括:订单超时、任务调度、消息提醒等。

  • 延时队列的核心需求是让消息在指定时间后被处理,而 RabbitMQ 中的 TTL(过期时间)正好能实现这一点。当消息的 TTL 到期后,它会变成死信并被投递到死信队列。这样,消费者只需要持续从死信队列消费消息即可,因为队列中的消息都是等待被及时处理的。这种方式实现了高效的延时处理,同时避免了轮询和重复检查。

(6) 给队列设置TTL和死信队列的问题
        如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10秒1个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,需要要增加无数个队列才能满足需求

五、使用 x-delayed-message 插件实现延时队列实现样例

5.1 安装插件

要启用延时队列,首先要安装 rabbitmq-delayed-message-exchange 插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
5.2 创建延时交换机

创建一个交换机时,指定它是一个 x-delayed-message 类型的交换机。通过这个交换机来处理延时消息:

Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");  // 设定延时交换机的类型,通常是 direct 类型channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args);
5.3 发送延时消息

发送消息时,我们需要指定延迟的时间。这个时间通过设置消息的 expiration 属性来实现,单位是毫秒:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000")  // 设置消息延迟时间为 10 秒.build();channel.basicPublish("delayed_exchange", "routing_key", properties, "Hello, delayed message".getBytes());
5.4 消费消息

消费者与普通消息的消费方式一样,消息会在延迟时间到期后被消费:

channel.basicConsume("delayed_queue", true, (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received message: " + message);
}, consumerTag -> {});
http://www.xdnf.cn/news/1487107.html

相关文章:

  • Linux 周期性用户作业计划:crontab
  • Python 2025:高性能计算与科学智能的新纪元
  • CEEMDAN-PSO-CNN-GRU 锂电池健康状态预测matlab
  • 华为IP(9)
  • Compose笔记(五十)--stickyHeader
  • 超越模仿,探寻智能的本源:从人类认知机制到下一代自然语言处理
  • MySQL 锁机制解析
  • 【高并发内存池】五、页缓存的设计
  • 【多模态学习】QA2:Tokenize和Embedding?BPE算法?交叉熵损失函数?
  • 算法:链表
  • 【开题答辩全过程】以 线上助农系统为例,包含答辩的问题和答案
  • 10 qml教程-自定义属性
  • 860章:使用Scrapy框架构建分布式爬虫
  • browser_use event_bus订阅机制详解
  • AUTOSAR进阶图解==>AUTOSAR_SWS_TimeSyncOverFlexRay
  • 轻松Linux-8.动静态库的制作及原理
  • SoundSource for Mac 音频控制工具
  • PyTorch Lightning(训练评估框架)
  • Python+DRVT 从外部调用 Revit:批量创建楼板
  • 基于SpringBoot+Vue的健身房管理系统的设计与实现(代码+数据库+LW)
  • 多环境配置切换机制能否让开发与生产无缝衔接?
  • 【论文阅读】自我进化的AI智能体综述
  • Unity学习----【进阶】Input System学习(一)--导入与基础的设备调用API
  • 《探索C++11:现代语法的内存管理优化“性能指针”(下篇)》
  • LeetCode 面试经典 150 题:移除元素(双指针思想优化解法详解)
  • RICOH理光 Priport DX4443c速印机 印A3的问题
  • 数据结构之二叉树(2)
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘setuptools’问题
  • 嵌入式学习---(ARM)
  • AutoHotkey将脚本编译为exe文件