当前位置: 首页 > ops >正文

RabbitMQ学习(第三天)

文章目录

  • 延迟消息概述
  • 1、死信交换机
  • 2、延迟消息插件

今天主要学习Rabbit延迟消息相关的知识。

延迟消息概述

生产者发送一个消息之后,消费者不会立即收到消息,而是在执行时间才会收到消息。

延迟消息主要为了实现延迟任务,应用场景一般是秒杀情境下,有一个倒计时判断用户是否已经付款,配合定时任务来检查订单状态。

1、死信交换机

首先来看看死信的定义:

当一个队列中的消息满足下列情况之一,就会成为死信 (dead letter):

  • 消费者使用basic.reject或basic.nack声明消息失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息 (达到了队列或消息本身设定的过期时间),超过无人消费
  • 容量超的队列被积满了,最早的消息可能会成为死信

死信交换机,简单来说就是存放死信的交换机,通过dead-letter-exchange参数指定。

而我们通过这个方式,可以模拟出来延迟消息的效果:

在这里插入图片描述
图中,我们首先设定了消息过期时间为30s,如果过期了,就将消息放到dix.direct交换机,即死信交换机,随后将消息投给Consumer,这样就实现了延迟消息的效果,30s后才让消费者收到消息。

在这里插入图片描述
控制台点击这个即可设置死信队列,参数后面跟的是死信交换机的名称,比如dlx.direct

我们就照着图中实例来定义交换机和队列,演示一下.

消息接收者:

    @RabbitListener(queues = "dlx.queue")public void DLXQueueConsumer(String msg) {log.info("dlx.queue消费了消息[ " + msg + " ]");throw new RuntimeException("故意的");}

注意不要有监听simple.queue的监听器,不然的话会导致消息被消费,从而无法进行测试。

测试代码:

    @Testpublic void testTTLMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setExpiration("10000").build();rabbitTemplate.convertAndSend("simple.direct","hi", message);log.info("消息发送成功");}

我们这里测试消息就设置成10秒后过期,具体时间可以自己调整。
发送方运行结果:

05-11 11:55:34:549  INFO 27744 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.138.133:5672]
05-11 11:55:34:593  INFO 27744 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#24361cfc:0/SimpleConnection@55e42449 [delegate=amqp://root@192.168.138.133:5672/, localPort= 61846]
05-11 11:55:34:608  INFO 27744 --- [           main] com.rabbitmq.publisher.publisher         : 消息发送成功

消费方运行结果:

05-11 11:55:44:619  INFO 22540 --- [ntContainer#0-1] com.rabbitmq.consumer.listener.Consumer  : dlx.queue消费了消息[ hello ]

从运行结果的时间来看,可以发现间隔正好差了10秒,实现了延迟消息。

2、延迟消息插件

RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息的交换机,当消息投递到该交换机后可以选择延迟一定时间,过期后再次投递到队列。

下面是插件安装流程:

在linux环境输入:

docker inspect mq

下图中的就是挂载目录:
在这里插入图片描述

插件下载流程:

①、首先输入网址:
https://www.rabbitmq.com/community-plugins.html

②、界面中点击对应插件的Release:
在这里插入图片描述

③、点击下面下载即可,选择想要的版本,这里下的是3.8.17
在这里插入图片描述

④、将插件放到指定插件目录下

cd进入该目录:

cd /var/lib/docker/volumes/mq-plugins/_data

将下载好的插件放入该目录

⑤、接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

⑥、重启镜像

docker restart mq

即可。

java代码中编写延迟消息接收案例:

消费者方代码:

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "delay.queue", durable = "true"),exchange = @Exchange(value = "delay.direct", delayed = "true"),key = "hello"))public void listenDelayQueue(String msg) {log.info("接收到delay.queue的消息: [ " + msg + " ]");}

发送者方代码:

@Testpublic void testDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "hello", "message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000);return message;}});log.info("消息发送成功");}

运行结果:

发送方:
05-11 13:21:42:518  INFO 32628 --- [           main] com.rabbitmq.publisher.publisher         : 消息发送成功接收方:
05-11 13:21:52:531  INFO 23616 --- [ntContainer#0-1] com.rabbitmq.consumer.listener.Consumer  : 接收到delay.queue的消息: [ "message" ]

可以看到发收消息间隔正好10秒。

http://www.xdnf.cn/news/5305.html

相关文章:

  • Spark任务调度流程详解
  • Java大师成长计划之第18天:Java Memory Model与Volatile关键字
  • 游戏引擎学习第273天:动画预览
  • BGP联盟
  • MNIST DDP 分布式数据并行
  • 「OC」源码学习—— 消息发送、动态方法解析和消息转发
  • Docker拉取ubuntu22.04镜像使用ROS2 humble及仿真工具可视化进行导航
  • 【大模型面试每日一题】Day 15:流水线并行的Bubble问题及其缓解方法
  • Apache Flink 与 Flink CDC:概念、联系、区别及版本演进解析
  • 花朵识别系统Python+深度学习+卷积神经网络算法+TensorFlow+人工智能
  • Newton GPU 机器人仿真器入门教程(零)— NVIDIA、DeepMind、Disney 联合推出
  • 【目标检测系列】YOLOV1解读
  • openjdk底层汇编指令调用(一)——汇编指令及指令编码基础
  • 通过 Azure DevOps 探索 Helm 和 Azure AKS
  • Spark 中RDD、Job,stage,task的关系
  • ActiveMQ 生产环境问题排查与调优指南(一)
  • 编程日志5.3
  • 智能语音助手的未来:从交互到融合
  • 实战项目3(04)
  • 画立方体软件开发笔记 js-pytorch xlsx 导出 excel pnpm安装
  • uni-app学习笔记(二)--vue页面代码的构成和新建页面
  • Pandas学习笔记(四)
  • 嵌入式硬件篇---UART
  • 外网访问内网海康威视监控视频的方案:WebRTC + Coturn 搭建
  • Python OpenCV性能优化与部署实战指南
  • python 实现文件批量重命名
  • “frame stacking”---帮助强化学习稳定提升和收敛技巧
  • Nipype 简单使用教程
  • 5 从众效应
  • Spring Boot 集成 Flink CDC 实现 MySQL 到 Kafka 实时同步