当前位置: 首页 > web >正文

RabbitMQ ,消息进入死信交换机

在 RabbitMQ 中,消息会进入死信交换机(Dead Letter Exchange,简称 DLX)通常是以下几种情况之一发生时:

  1. 消息被拒绝(nack)且没有重试

    • 如果消费者拒绝了一个消息(使用 basic.rejectbasic.nack),并且该消息没有被重新排队(即 requeue=false),则该消息会进入死信队列。
  2. 消息超时

    • 如果设置了 消息过期时间x-message-ttl),当消息在队列中停留时间超过设置的 TTL 后,消息会自动被删除,并进入死信交换机。
  3. 队列满

    • 如果队列的大小超过了其最大长度限制(x-max-length),新消息将被丢弃,或者进入死信队列(取决于队列的配置)。
  4. 队列被删除

    • 如果队列在消息仍然存在时被删除,那么这些未被消费的消息也会进入死信交换机。
  5. 队列绑定到死信交换机

    • 你可以为队列设置一个死信交换机(x-dead-letter-exchange),当队列中的消息因为某些原因无法消费或被丢弃时,它们会被路由到该死信交换机中。

通过配置死信交换机,RabbitMQ 可以帮助你捕捉到这些消息并进行后续处理,比如重试或记录日志。

在 RabbitMQ 中配置死信交换机(DLX)包括以下几个步骤:

1. 创建死信交换机(DLX)

死信交换机通常是一个普通的交换机(direct, fanout, topic等),没有特殊的配置要求。可以选择一个交换机来作为消息的死信目标。

const amqp = require('amqplib');async function createDLX() {//连接RabbitMQ服务器const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();// 创建一个普通交换机// 创建一个名为normal_exchange的普通交换机,类型为direct(直连交换机)。直连交换机是一种最简单的交换机类型,消息会根据路由键直接发送到与之绑定的队列const exchangeName = 'normal_exchange';// durable: true表示这个交换机是持久的,即使RabbitMQ重启,交换机也会存在。await channel.assertExchange(exchangeName, 'direct', { durable: true });// 创建死信交换机const dlxExchangeName = 'dlx_exchange';await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });// 创建死信队列(DLQ)const dlqName = 'dlq_queue';await channel.assertQueue(dlqName, { durable: true });// 将死信队列绑定到死信交换机await channel.bindQueue(dlqName, dlxExchangeName, 'dlx_routing_key');// 设置队列的死信交换机(DLX)/*arguments部分非常重要,x-dead-letter-exchange和x-dead-letter-routing-key设置了这个队列的死信交换机和死信路由键。
x-dead-letter-exchange:表示如果队列中的消息无法正常消费(例如过期,队列满了,或消费者拒绝),这些消息会被发送到dlx_exchange死信交换机。
x-dead-letter-routing-key:死信交换机的路由键,用来将消息路由到dlq_queue*/const queueName = 'normal_queue';await channel.assertQueue(queueName, {durable: true,arguments: {'x-dead-letter-exchange': dlxExchangeName, // 设置死信交换机'x-dead-letter-routing-key': 'dlx_routing_key' // 死信路由键}});// 将普通队列绑定到普通交换机await channel.bindQueue(queueName, exchangeName, 'normal_routing_key');// 关闭连接console.log('Created DLX and DLQ successfully');await channel.close();await connection.close();
}createDLX().catch(console.error);

2. 创建死信队列(DLQ)

创建一个死信队列,来接收死信交换机发送过来的消息。

3. 配置原始队列(生产者队列)

在生产者队列的声明时,你需要设置一些额外的队列参数,以便让消息能够进入死信队列。主要是配置 x-dead-letter-exchange 和(可选的)x-dead-letter-routing-key

示例:

rabbitmqctl add_queue myqueue
rabbitmqctl add_queue my_dead_letter_queue

然后配置生产者队列,使其消息能够进入死信交换机。

4. 设置生产者队列参数

在声明生产者队列时,设置 x-dead-letter-exchange(死信交换机)和(可选的)x-dead-letter-routing-key。你还可以设置其他参数,比如 x-message-ttl(消息过期时间)或 x-max-length(队列最大长度),这些都会影响消息何时被投递到死信交换机。

例如,如果你使用的是 direct 类型的交换机,配置方法如下:

channel.assertQueue('myqueue', {arguments: {'x-dead-letter-exchange': 'dlx_exchange',  // 死信交换机'x-dead-letter-routing-key': 'dlx_routing_key'  // 可选的死信路由键}
});

5. 创建死信交换机绑定死信队列

创建并绑定死信交换机到死信队列,以便死信消息能够被路由到正确的队列。

channel.assertExchange('dlx_exchange', 'direct');
channel.assertQueue('my_dead_letter_queue');
channel.bindQueue('my_dead_letter_queue', 'dlx_exchange', 'dlx_routing_key');

6. 处理死信消息

你可以在消费者端对死信队列进行监听、处理,比如进行重试,或者记录日志等。

const amqp = require('amqplib');async function consumeDLQ() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const dlqName = 'dlq_queue'; // 死信队列const normalExchangeName = 'normal_exchange'; // 普通交换机const normalQueueName = 'normal_queue'; // 普通队列// 监听死信队列 channel.assertQueue(dlqName)确保死信队列存在,并开始监听await channel.assertQueue(dlqName, { durable: true });console.log(`Waiting for messages in ${dlqName}.`);// 消费死信队列中的消息channel.consume(dlqName, async (msg) => {if (msg !== null) {console.log(`Received dead letter message: ${msg.content.toString()}`);// 假设我们想重试死信消息,将它发送回普通交换机和队列const messageContent = msg.content.toString();// 重试:将死信消息重新发送到普通交换机try {await channel.publish(normalExchangeName, 'normal_routing_key', Buffer.from(messageContent));console.log(`Re-published message: ${messageContent}`);// 如果成功消费,确认消息已处理channel.ack(msg);} catch (error) {// 处理重试失败的情况,可能会记录日志或报警console.error(`Failed to retry message: ${error.message}`);// 你可以选择不确认(不ack),并处理失败的消息channel.nack(msg);}}});
}
consumeDLQ().catch(console.error);

这样配置后,任何因消息超时、拒绝、队列溢出等原因未被消费的消息都会被路由到配置好的死信交换机和死信队列中。

总结

  • 死信交换机的配置关键在于使用 x-dead-letter-exchange 和(可选的)x-dead-letter-routing-key
  • 配置适当的消息过期时间、队列大小限制等,可以影响消息何时被送到死信队列。
  • 死信队列和交换机要确保有合适的绑定配置。
http://www.xdnf.cn/news/18249.html

相关文章:

  • React diff Vue diff介绍
  • 嵌入式学习硬件I.MX6ULL(五)按键 中断 GIC OCP原则
  • 云原生:重塑软件世界的技术浪潮与编程语言选择
  • 【每天学点‘音视频’】前向纠错 和 漏包重传
  • Flask 入门详解:从零开始构建 Web 应用
  • Linux中基于Centos7使用lamp架构搭建个人论坛(wordpress)
  • Dify web前端源码本地部署详细教程
  • 软件测试覆盖率:真相与实践
  • 【论文阅读69】-DeepHGNN复杂分层结构下的预测
  • Mybatis执行sql流程(一)
  • Dijkstra和多层图 0
  • Linux 系统(如 Ubuntu / CentOS)阿里云虚拟机(ECS)上部署 Bitnami LAMP
  • 自定义ViewPage2滑动切换效果
  • docker compose再阿里云上无法使用的问题
  • MQTT(轻量级消息中间件)基本使用指南
  • MySQL 函数大赏:聚合、日期、字符串等函数剖析
  • 用户认证与应用控制技术
  • DevExtreme Angular UI控件更新:引入全新严格类型配置组件
  • Tmux Xftp及Xshell的服务器使用方法
  • 黑马java八股文全集
  • 实时视频延迟优化实战:RTSP与RTMP播放器哪个延迟更低?
  • Python 项目里的数据清理工作(数据清洗步骤应用)
  • 《算法导论》第 27 章 - 多线程算法
  • K8S集群环境搭建(一)
  • 母猪姿态转换行为识别:计算机视觉与行为识别模型调优指南
  • ——分治——
  • 腾讯开源:视频生成框架Hunyuan-GameCraft
  • MySQL数据库初识
  • 聊聊Vuex vs Pinia
  • 【Python】Python 面向对象编程详解​