当前位置: 首页 > ds >正文

RabbitMQ延时队列的两种实现方式

目录

一、延时插件实现

1、版本要求

2、为运行新容器时安装

3、为已运行的容器安装

4、验证安装

5、代码编写

1. 配置类

2. 生产者

3. 消费者

二、死信队列实现

1、代码编写

1. 配置类

2. 生产者

3. 消费者

三、踩坑记录

1、发送消息失败

2、消息过期后未能转发到死信队列

3、消费者消费报错


一、延时插件实现

1、版本要求

RabbitMQ 3.5.7以上

2、为运行新容器时安装

# 1. 拉取带管理界面的镜像
docker pull rabbitmq:3.11-management
​
# 2. 启动容器并启用插件
docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=password \rabbitmq:3.11-management \bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"

3、为已运行的容器安装

# 1. 进入正在运行的容器
docker exec -it rabbitmq /bin/bash
​
# 2. 在容器内执行插件安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
​
# 3. 退出容器
exit
​
# 4. 重启容器使插件生效
docker restart rabbitmq

4、验证安装

# 方法1:检查插件列表
docker exec rabbitmq rabbitmq-plugins list | grep delayed
​
# 方法2:登录管理界面
# 访问 http://localhost:15672 (使用设置的账号密码登录)
# 在 "Exchanges" 标签页创建交换机时,Type 下拉框会出现 "x-delayed-message" 选项

5、代码编写

1. 配置类

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 交换机类型return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", // 固定类型true,false,args);}
​@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE, true);}
​@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}

2. 生产者

public void send(String exchange, String routing_key,Object data, Integer delayMillis) {// 消息后处理器:设置延时和持久化MessagePostProcessor processor = message -> {// 毫秒message.getMessageProperties().setDelay(delayMillis);// 持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;};
​rabbitTemplate.convertAndSend(exchange, routingKey, data, processor);
}

3. 消费者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
​@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
​
}

二、死信队列实现

1、代码编写

1. 配置类

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
​public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_ROUTING_KEY = "normal_routing_key";// 死信队列(延时队列)@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(DELAYED_QUEUE).build();}
​// 死信交换机@Beanpublic DirectExchange delayedExchange() {return new DirectExchange(DELAYED_EXCHANGE);}
​// 绑定死信队列到死信交换机@Beanpublic Binding delayedBinding(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY);}
​// 普通队列@Beanpublic Queue normalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE).deadLetterRoutingKey(DELAYED_ROUTING_KEY).build();}
​// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}
​// 绑定普通队列到普通交换机@Beanpublic Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);}}

2. 生产者

public void send(String exchange, String routing_key, Object data, Integer delayMillis) {String uuid = IdUtil.simpleUUID();// 消息入库略,uuid为主键MessageProperties properties = new MessageProperties();// 设置TTL,单位毫秒properties.setExpiration(String.valueOf(delayMillis));// 消息持久化(2 表示持久化)properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
​Message msg = rabbitTemplate.getMessageConverter().toMessage(data, properties);rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));
}

3. 消费者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
​@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
​
}

三、踩坑记录

1、发送消息失败

原因RabbitTemplate 配置了消息抵达确认,消息ID没有传值。

RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 消息抵达确认通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {String msgId = data.getId();if (ack) {log.info("消息抵达队列成功:{}", data);} else {log.error("消息未能发送成功,消息ID:{}", data.getId(), cause);}
});

生产者实际发送消息未传消息ID:

错误格式

rabbitTemplate.convertAndSend(exchange, routingKey, data);

正确格式

String uuid = IdUtil.simpleUUID();
rabbitTemplate.convertAndSend(exchange, routingKey, data, new CorrelationData(uuid));

2、消息过期后未能转发到死信队列

原因:正常消息未绑定死信队列,消息过期自动删除,而不会转发到死信队列中。

错误格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).build();
}

正确格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE) // 指定死信交换机.deadLetterRoutingKey(DELAYED_ROUTING_KEY) // 指定死信路由键.build();
}

3、消费者消费报错

原因:发送的消息由于自定义的 MessageProperties ,其中缺失了 contentType 参数,需要使用转化器进行转换,而不是直接发送消息。

错误格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
​
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(uuid));

正确格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
​
Message msg = rabbitTemplate.getMessageConverter().toMessage(message, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));

http://www.xdnf.cn/news/17261.html

相关文章:

  • NLP自然语言处理 03 Transformer架构
  • 数据集相关类代码回顾理解 | sns.distplot\%matplotlib inline\sns.scatterplot
  • 翻译的本质:人工翻译vs机器翻译的核心差异与互补性
  • 自然语言处理×第三卷:文本数据分析——她不再只是贴着你听,而开始学会分析你语言的结构
  • 最长连续序列(每天刷力扣hot100系列)
  • FANCU发那科机器人双脉冲焊接省气
  • 【STM32】HAL库中的实现(三):PWM(脉冲宽度调制)
  • 信用机制的发展与货币演进
  • 机器学习算法系列专栏:决策树算法(初学者)
  • golang的切片
  • 电子秤利用Websocket做为Client向MES系统推送数据
  • python的教务管理系统
  • 利用链上数据进行数字资产量化因子发现
  • 【Day 16】Linux-性能查看
  • Linux内核C语言代码规范
  • LangGraph学习笔记 — LangGraph中State状态模式
  • 数据安全治理——解读数据安全治理与评估服务业务介绍【附全文阅读】
  • oelove奥壹新版v11.7旗舰版婚恋系统微信原生小程序源码上架容易遇到的几个坑,避免遗漏参数白屏显示等问题
  • 相机拍摄的DNG格式照片日期如何修改?你可以用这款工具修改
  • vue3 子组件和子组件的通讯 mitt
  • 分布式选举算法:Bully、Raft、ZAB
  • 私有云盘新体验:FileRise在cpolar的加持下如何让数据管理更自由?
  • golang实现支持100万个并发连接(例如,HTTP长连接或WebSocket连接)系统架构设计详解
  • 第13届蓝桥杯Scratch_选拔赛_真题2021年11月27日
  • Guava 与 Caffeine 本地缓存系统详解
  • 2048小游戏
  • 【java】大数据insert的几种技术方案和优缺点
  • (ZipList入门笔记一)ZipList的节点介绍
  • Windows 电脑远程访问,ZeroTier 实现内网穿透完整指南(含原理讲解)
  • Spring Boot 整合 Web 开发全攻略