当前位置: 首页 > java >正文

SpringBoot集成RabbitMQ使用过期时间+死信队列实现延迟队列

有的时候呢,我们需要使用到延迟队列,RabbitMQ不像RocketMQ一样默认就支持延迟队列,RabbitMQ是不支持延迟队列的,但是呢?我们可以通过正常的队列加上消息的过期时间,配置死信队列,来模拟实现延迟队列。

一、创建普通队列(配置过期时间),绑定死信队列

# 死信交换机配置 以直连交换机为列
my:exchangeNormalName: exchange.normal.a   #正常交换机queueNormalName: queue.normal.a         #正常队列exchangeDlxName: exchange.dlx.a         #死信交换机queueDlxName: queue.dlx.a               #死信队列

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 死信交换机*/
@Configuration
public class DeadLetterExchangeConfig {@Value("${my.exchangeNormalName}")private String exchangeNormalName;@Value("${my.queueNormalName}")private String queueNormalName;@Value("${my.exchangeDlxName}")private String exchangeDlxName;@Value("${my.queueDlxName}")private String queueDlxName;/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* @return*/@Beanpublic Queue normalQueue(){Map<String, Object> arguments = new HashMap<>();//重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致arguments.put("x-dead-letter-routing-key","error");return new Queue(queueNormalName,true,false,false,arguments);}/*** 正常交换机和正常队列绑定* @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return new Queue(queueDlxName,true,false,false);}/*** 死信交换机和死信队列绑定* @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");}}

二、生产者推送消息

/*** 死信交换机-直连交换机 消息过期*/@Testpublic void sendDirectMessage2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);//给消息设置过期时间MessagePostProcessor messagePostProcessor = message -> {//20秒后过期message.getMessageProperties().setExpiration("20000");message.getMessageProperties().setContentEncoding("UTF-8");//持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//非持久化//message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);return message;};//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchangerabbitTemplate.convertAndSend("exchange.normal.a", "order", map, messagePostProcessor);}

三、死信队列消费者

消费者监听死信队列,上边我们创建的普通队列的消息过期时间是20秒,相当于我们向普通队列中推送消息之后,20秒过期则进入死信队列中,消费者监听死信队列,等待消息进入死信队列之后再进行消费处理。这样就模拟了一个延迟队列。


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;/*** 死信队列消费  手动重新投递消息*/
@Component
public class DlxReceiver_1 {@Autowiredprivate RabbitTemplate rabbitTemplate;// 监听死信队列@RabbitListener(queues = "queue.dlx.a", ackMode = "MANUAL")public void handleDlqMessage(Map<String, Object> messageBody, Channel channel, Message message) throws IOException {try {/***TODO: 业务处理*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);System.out.println("消息消费成功: " + messageBody);} catch (Exception e) {channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);System.err.println("消息消费失败: " + e.getMessage());}}}

以上大概就是SpringBoot集成RabbitMQ实现延迟队列的全过程。

http://www.xdnf.cn/news/1727.html

相关文章:

  • Linux系统----进程的状态
  • [创业之路-384]:企业法务 - 初创公司,如何做好知识产品的风险防范?
  • 质检LIMS系统在金融咨询行业的应用 金融咨询行业的实验室数字化
  • Linux下编译opencv-4.10.0(静态链接库和动态链接库)
  • Leetcode 34. 在排序数组中查找元素的第一个和最后一个位置
  • 2025-04-24 Python深度学习4—— 计算图与动态图机制
  • 极狐GitLab 如何 cherry-pick 变更?
  • STM32移植最新版FATFS
  • Godot开发2D冒险游戏——第二节:主角光环整起来!
  • C# new Bitmap(32043, 32043, PixelFormat.Format32bppArgb)报错:参数无效,如何将图像分块化处理?
  • STM32F103_HAL库+寄存器学习笔记20 - CAN发送中断+ringbuffer + CAN空闲接收中断+接收所有CAN报文+ringbuffer
  • Python爬虫去重策略:增量爬取与历史数据比对
  • VulnHub-DC-2靶机渗透教程
  • zip是 Python 中 `zip` 函数的一个用法
  • 数模学习:一,层次分析法
  • flutter 小知识
  • 在Ubuntu 18.04 和 ROS Melodic 上编译 UFOMap
  • 跨浏览器音频录制:实现兼容的音频捕获与WAV格式生成
  • Spring Security认证流程
  • LabVIEW实现Voronoi图绘制功能
  • 【MQ篇】初识RabbitMQ保证消息可靠性
  • 信息系统项目管理工程师备考计算类真题讲解七
  • KMS工作原理及其安全性分析
  • Java Agent 注入 WebSocket 篇
  • java方法引用
  • kotlin和MVVM的结合使用总结(二)
  • 一种Spark程序运行指标的采集与任务诊断实现方式
  • CE第二次作业
  • NODE_OPTIONS=--openssl-legacy-provider vue-cli-service serve
  • Git 的基本概念和使用方式