当前位置: 首页 > java >正文

死信队列完整处理方案

1. 数据库表设计(先创建异常消息表)

CREATE TABLE `dead_letter_message` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`exchange` varchar(255) NOT NULL COMMENT '原始交换机',`routing_key` varchar(255) NOT NULL COMMENT '原始路由键',`message_content` text NOT NULL COMMENT '消息内容',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',`last_retry_time` datetime DEFAULT NULL COMMENT '最后重试时间',`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态(0-待处理,1-处理中,2-处理成功,3-处理失败)',`error_msg` text COMMENT '错误信息',PRIMARY KEY (`id`),KEY `idx_status_retry` (`status`,`retry_count`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='死信消息表';

2. 死信消息处理器完整实现

@Component
@RabbitListener(queues = DLX_QUEUE_NAME)
public class DlxReceiver {private static final Logger log = LoggerFactory.getLogger(DlxReceiver.class);@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate DeadLetterMessageMapper deadLetterMessageMapper;private static final String EXCHANGE_NAME = "DlxDirectExchange";private static final String ROUTING_KEY = "DlxDirectRouting";@RabbitHandlerpublic void process(Map<String, String> message) {log.info("开始处理死信队列消息: {}", message);try {// 1. 将消息持久化到数据库DeadLetterMessageDO entity = new DeadLetterMessageDO();entity.setExchange(EXCHANGE_NAME);entity.setRoutingKey(ROUTING_KEY);entity.setMessageContent(JacksonUtil.writeValueAsString(message));entity.setStatus(0); // 待处理状态deadLetterMessageMapper.insert(entity);log.info("死信消息已持久化到数据库, ID: {}", entity.getId());// 2. 这里可以添加额外的异常处理逻辑,比如发送报警通知等} catch (Exception e) {log.error("处理死信消息异常", e);// 即使保存失败也不重新投递,避免无限循环// 可以考虑记录日志或发送系统报警}}/*** 定时任务:处理积压的死信消息*/@Scheduled(fixedDelay = 30000) // 每30秒执行一次public void processPendingMessages() {// 1. 查询待处理的死信消息(限制每次处理数量)List<DeadLetterMessageDO> messages = deadLetterMessageMapper.selectPendingMessages(100);if (messages.isEmpty()) {log.debug("没有待处理的死信消息");return;}log.info("开始批量处理死信消息,数量: {}", messages.size());for (DeadLetterMessageDO message : messages) {try {// 2. 更新为处理中状态message.setStatus(1); // 处理中message.setRetryCount(message.getRetryCount() + 1);message.setLastRetryTime(new Date());deadLetterMessageMapper.updateById(message);// 3. 重新投递消息Map<String, String> content = JacksonUtil.readMapValue(message.getMessageContent(),String.class,String.class);rabbitTemplate.convertAndSend(message.getExchange(),message.getRoutingKey(),content);// 4. 更新为处理成功状态message.setStatus(2); // 处理成功deadLetterMessageMapper.updateById(message);log.info("死信消息重新投递成功, ID: {}", message.getId());} catch (Exception e) {log.error("处理死信消息失败, ID: " + message.getId(), e);// 更新为处理失败状态message.setStatus(3); // 处理失败message.setErrorMsg(e.getMessage());deadLetterMessageMapper.updateById(message);// 可以根据重试次数决定是否放弃或发送报警if (message.getRetryCount() >= 3) {log.warn("死信消息已达到最大重试次数, ID: {}", message.getId());// 可以发送报警通知管理员}}}}
}

3. MyBatis Mapper 接口

@Mapper
public interface DeadLetterMessageMapper {@Insert("INSERT INTO dead_letter_message(exchange, routing_key, message_content, create_time, status) " +"VALUES(#{exchange}, #{routingKey}, #{messageContent}, NOW(), #{status})")@Options(useGeneratedKeys = true, keyProperty = "id")int insert(DeadLetterMessageDO entity);@Update("UPDATE dead_letter_message SET status = #{status}, retry_count = #{retryCount}, " +"last_retry_time = #{lastRetryTime}, error_msg = #{errorMsg} WHERE id = #{id}")int updateById(DeadLetterMessageDO entity);@Select("SELECT * FROM dead_letter_message WHERE status = 0 ORDER BY create_time ASC LIMIT #{limit}")List<DeadLetterMessageDO> selectPendingMessages(@Param("limit") int limit);@Select("SELECT * FROM dead_letter_message WHERE id = #{id}")DeadLetterMessageDO selectById(long id);
}

4. 实体类

@Data
public class DeadLetterMessageDO {private Long id;private String exchange;private String routingKey;private String messageContent;private Date createTime;private Integer retryCount;private Date lastRetryTime;private Integer status; // 0-待处理,1-处理中,2-处理成功,3-处理失败private String errorMsg;
}

5. 管理接口(可选)

可以添加REST接口供管理员手动重试或查看死信消息:

@RestController
@RequestMapping("/api/dead-letter")
@Slf4j
public class DeadLetterController {@Autowiredprivate DeadLetterMessageMapper deadLetterMessageMapper;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/list")public List<DeadLetterMessageDO> listPendingMessages() {return deadLetterMessageMapper.selectPendingMessages(100);}@PostMapping("/retry/{id}")public String retryMessage(@PathVariable Long id) {DeadLetterMessageDO message = deadLetterMessageMapper.selectById(id);if (message == null) {return "消息不存在";}try {Map<String, String> content = JacksonUtil.readMapValue(message.getMessageContent(),String.class,String.class);rabbitTemplate.convertAndSend(message.getExchange(),message.getRoutingKey(),content);message.setStatus(2); // 处理成功message.setRetryCount(message.getRetryCount() + 1);message.setLastRetryTime(new Date());deadLetterMessageMapper.updateById(message);return "重新投递成功";} catch (Exception e) {log.error("手动重试死信消息失败", e);return "重新投递失败: " + e.getMessage();}}
}

以上为实现RabbitMQ中死信队列的模板内容,可以复制粘贴后稍作修改使用

http://www.xdnf.cn/news/1221.html

相关文章:

  • AiEditor v1.3.8 发布
  • 2023蓝帽杯初赛内存取证-3
  • vmstat指令介绍
  • 自动化测试实现容器化部署
  • C#内存管理深度解析:值类型与引用类型全解析
  • Linux命令-pidstat
  • Python简介与入门
  • 使用若依二次开发商城系统-4:商品属性
  • 无价值的劳动与暴力威胁是否会导致人性逆转?-来自DeepSeek
  • WP快主题
  • 激光SLAM算法综述
  • 滚动的足球-第16届蓝桥第4次STEMA测评Scratch真题第3题
  • Android Studio调试中的坑二
  • C++与C
  • 1.微服务拆分与通信模式
  • NLP高频面试题(五十一)——LSTM详解
  • 【机器学习】决策树算法中的 “黄金指标”:基尼系数深度剖析
  • MCP Server架构设计详解:一文掌握框架核心
  • PowerBi中REMOVEFILTERS怎么使用?
  • 虚无隧穿产生宇宙(true nothing tunneling) 是谁提出的
  • 【Spring Boot】MyBatis多表查询的操作:注解和XML实现SQL语句
  • 权限管理降维打击:AI自动生成分布式系统鉴权代码(含JWT刷新策略)
  • 如何通过证书认证安全登录堡垒机、防火墙和VPN?安当KSP密钥管理系统助力企业实现零信任身份验证
  • 【中级软件设计师】程序设计语言基础成分
  • 3.1.2 materialDesign:Card 的使用介绍
  • VUE篇之,实现锚点定位,滚动与导航联动
  • 黑盒测试——等价类划分法实验
  • 虚拟机超详细Ubuntu安装教程
  • 测试基础笔记第九天
  • Idea创建项目的搭建