当前位置: 首页 > news >正文

RabbitMQ 高级特性之延迟队列

1. 简介

在某些场景下,当生产者发送消息后,可能不需要让消费者立即接收到,而是让消息延迟一段时间后再发送给消费者。

2. 实现方式

2.1 TTL + 死信队列

给消息设置过期时间后,若消息在这段时间内没有被消费,就会将消息发送到死信队列中,我们可以利用这一特性,将需要延迟发送的消息设置过期时间,然后再让消费者从死信队列中获取消息,这样就实现了消息的延迟发送。

队列与交换机配置如下:

@Configuration
public class DLConfig {/*** 正常队列、交换机* @return*/@Bean("norQueue")public Queue norQueue() {return QueueBuilder.durable(Constants.NOR_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) //绑定死信交换机.deadLetterRoutingKey(Constants.DL_ROUTINGKEY).build();}@Bean("norExchange")public DirectExchange norExchange() {return ExchangeBuilder.directExchange(Constants.NOR_EXCHANGE).build();}@Bean("norBind")public Binding norBind(@Qualifier("norExchange") DirectExchange directExchange,@Qualifier("norQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.NOR_ROUTINGKEY);}/*** 死信队列、交换机*/@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBind")public Binding dlBind(@Qualifier("dlExchange") DirectExchange directExchange,@Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.DL_ROUTINGKEY);}
}

生产者代码如下:

    @RequestMapping("/dl1")public String dl1() {String messageInfo = "dl... " + new Date();MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000"); //10s 后过期return message;}};rabbitTemplate.convertAndSend(Constants.NOR_EXCHANGE, Constants.NOR_ROUTINGKEY, messageInfo, messagePostProcessor);return "消息发送成功";}

消费者代码如下:

@Component
@Slf4j
public class DLListener {/*** ttl + 死信队列 -> 延时队列* @param message*/@RabbitListener(queues = Constants.DL_QUEUE)public void listener(Message message) {String messageInfo = new String(message.getBody());log.info("接收到消息: {}, time: {}", messageInfo, new Date());}
}

由于消息发送到了死信队列,于是我们只需要从死信队列中获取消息即可。

代码运行结果如下:

从运行结果中可以看出,消息延迟了 10s 才被消费。

这种实现方式的问题:

但是,当我们连续发送两条消息,第一条消息的过期时间为 15s,第二条消息的过期时间为 10s,代码运行结果如下:

这里我们看到,虽然第二条消息先过期,但却和第一条消息一起被消费,按照正常情况下第二条消息应该率先被消费,于是这种实现方式存在一定的问题。

2.2 使用插件 

2.2.1 安装插件

虽然 RabbitMQ 没有提供延迟队列的使用方式,但是提供了延迟队列的插件,我们可以安装插件并使用。

插件安装地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

需要下载 .ez 的插件。

需要根据本机的 RabbitMQ 版本选择匹配的插件版本,不然无法使用。

插件下载完成后,需要将插件放到 /usr/lib/rabbitmq/plugins 目录下,若没有需要进行创建。

安装完成后,使用下面这行命令启动插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动完成后,需要重启 RabbitMQ 服务,这样插件就能正常运行。

2.2.2 使用插件

插件安装完成后,交换机的类型就会多出下面一种:

即延迟队列,于是我们在声明交换机是,就能够声明这个类型的交换机。

队列与交换机配置如下:

@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}/*** 延迟交换机* @return*/@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBind")public Binding delayBind(@Qualifier("delayExchange") DirectExchange directExchange,@Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.DELAY_ROUTINGKEY);}
}

 在声明交换机时,使用了 delayed 来声明该队列是延迟队列。

生产者代码如下:

    @RequestMapping("/delay")public String delay() {String messageInfo = "delay ...";rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_ROUTINGKEY, messageInfo + "25000ms", message -> {message.getMessageProperties().setDelayLong(20000L); //过期时间,单位为 msreturn message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_ROUTINGKEY, messageInfo + "10000ms", message -> {message.getMessageProperties().setDelayLong(10000L); //过期时间,单位为 msreturn message;});return "消息发送成功";}

消费者代码如下:

@Component
@Slf4j
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void listener(Message message) {log.info("接收到消息: {}, time: {}", new String(message.getBody()), new Date());}
}

运行结果如下:

从结果中可以看出,虽然第二条消息的过期时间是后入队列的,但是却会先被消费,这就解决了 TTL + 死信队列实现方式的不足。

http://www.xdnf.cn/news/1090873.html

相关文章:

  • SQL Server通过存储过程实现HTML页面生成
  • mac m1安装大模型工具vllm
  • 迁移Oracle SH 示例 schema 到 PostgreSQL
  • 双指针-15.三数之和-力扣(LeetCode)
  • 算法核心知识复习:排序算法对比 + 递归与递推深度解析(根据GESP四级题目总结)
  • Oracle 数据库升级踩坑:DBLink ORA-02019 问题解决思路
  • 使用 Docker 搭建 Rust Web 应用开发环境——AI教你学Docker
  • 工程改Mvvm
  • 一天一道Sql题(day04)
  • 基于lottie的微信小程序动画开发指南
  • CSS中的Element语法
  • 仓颉语言 1.0.0 升级指南:工具链适配、collection 操作重构与 Map 遍历删除避坑
  • ali linux 安装libreoffice
  • 《重构项目》基于Apollo架构设计的项目重构方案(多种地图、多阶段、多任务、状态机管理)
  • Context Engineering:从Prompt Engineering到上下文工程的演进
  • Ragas的Prompt Object
  • 微软 Bluetooth LE Explorer 实用工具的详细使用分析
  • JVM字节码加载与存储中的细节
  • 川翔云电脑:突破硬件极限,重构设计生产力范式
  • 【vim中替换】
  • 【自动驾驶】经典LSS算法解析——深度估计
  • BEV感知算法:自动驾驶的“上帝视角“革命
  • django 一个表中包括id和parentid,如何通过parentid找到全部父爷id
  • 免费扫描软件NAPS2:跨平台支持 旋转裁剪 + 多页合并,纸质文档变 PDF / 图片
  • 详解Kafka重平衡机制详解
  • Python(30)基于itertools生成器的量子计算模拟技术深度解析
  • 18-C#改变形参内容
  • 《设计模式之禅》笔记摘录 - 5.代理模式
  • AI应用实践:制作一个支持超长计算公式的计算器,计算内容只包含加减乘除算法,保存在一个HTML文件中
  • 设计模式(行为型)-责任链模式