死信队列完整处理方案
1. 数据库表设计(先创建异常消息表)
CREATE TABLE `dead_letter_message` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`exchange` varchar(255) NOT NULL COMMENT '原始交换机',`routing_key` varchar(255) NOT NULL COMMENT '原始路由键',`message_content` text NOT NULL COMMENT '消息内容',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',`last_retry_time` datetime DEFAULT NULL COMMENT '最后重试时间',`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态(0-待处理,1-处理中,2-处理成功,3-处理失败)',`error_msg` text COMMENT '错误信息',PRIMARY KEY (`id`),KEY `idx_status_retry` (`status`,`retry_count`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='死信消息表';
2. 死信消息处理器完整实现
@Component
@RabbitListener(queues = DLX_QUEUE_NAME)
public class DlxReceiver {private static final Logger log = LoggerFactory.getLogger(DlxReceiver.class);@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate DeadLetterMessageMapper deadLetterMessageMapper;private static final String EXCHANGE_NAME = "DlxDirectExchange";private static final String ROUTING_KEY = "DlxDirectRouting";@RabbitHandlerpublic void process(Map<String, String> message) {log.info("开始处理死信队列消息: {}", message);try {// 1. 将消息持久化到数据库DeadLetterMessageDO entity = new DeadLetterMessageDO();entity.setExchange(EXCHANGE_NAME);entity.setRoutingKey(ROUTING_KEY);entity.setMessageContent(JacksonUtil.writeValueAsString(message));entity.setStatus(0); // 待处理状态deadLetterMessageMapper.insert(entity);log.info("死信消息已持久化到数据库, ID: {}", entity.getId());// 2. 这里可以添加额外的异常处理逻辑,比如发送报警通知等} catch (Exception e) {log.error("处理死信消息异常", e);// 即使保存失败也不重新投递,避免无限循环// 可以考虑记录日志或发送系统报警}}/*** 定时任务:处理积压的死信消息*/@Scheduled(fixedDelay = 30000) // 每30秒执行一次public void processPendingMessages() {// 1. 查询待处理的死信消息(限制每次处理数量)List<DeadLetterMessageDO> messages = deadLetterMessageMapper.selectPendingMessages(100);if (messages.isEmpty()) {log.debug("没有待处理的死信消息");return;}log.info("开始批量处理死信消息,数量: {}", messages.size());for (DeadLetterMessageDO message : messages) {try {// 2. 更新为处理中状态message.setStatus(1); // 处理中message.setRetryCount(message.getRetryCount() + 1);message.setLastRetryTime(new Date());deadLetterMessageMapper.updateById(message);// 3. 重新投递消息Map<String, String> content = JacksonUtil.readMapValue(message.getMessageContent(),String.class,String.class);rabbitTemplate.convertAndSend(message.getExchange(),message.getRoutingKey(),content);// 4. 更新为处理成功状态message.setStatus(2); // 处理成功deadLetterMessageMapper.updateById(message);log.info("死信消息重新投递成功, ID: {}", message.getId());} catch (Exception e) {log.error("处理死信消息失败, ID: " + message.getId(), e);// 更新为处理失败状态message.setStatus(3); // 处理失败message.setErrorMsg(e.getMessage());deadLetterMessageMapper.updateById(message);// 可以根据重试次数决定是否放弃或发送报警if (message.getRetryCount() >= 3) {log.warn("死信消息已达到最大重试次数, ID: {}", message.getId());// 可以发送报警通知管理员}}}}
}
3. MyBatis Mapper 接口
@Mapper
public interface DeadLetterMessageMapper {@Insert("INSERT INTO dead_letter_message(exchange, routing_key, message_content, create_time, status) " +"VALUES(#{exchange}, #{routingKey}, #{messageContent}, NOW(), #{status})")@Options(useGeneratedKeys = true, keyProperty = "id")int insert(DeadLetterMessageDO entity);@Update("UPDATE dead_letter_message SET status = #{status}, retry_count = #{retryCount}, " +"last_retry_time = #{lastRetryTime}, error_msg = #{errorMsg} WHERE id = #{id}")int updateById(DeadLetterMessageDO entity);@Select("SELECT * FROM dead_letter_message WHERE status = 0 ORDER BY create_time ASC LIMIT #{limit}")List<DeadLetterMessageDO> selectPendingMessages(@Param("limit") int limit);@Select("SELECT * FROM dead_letter_message WHERE id = #{id}")DeadLetterMessageDO selectById(long id);
}
4. 实体类
@Data
public class DeadLetterMessageDO {private Long id;private String exchange;private String routingKey;private String messageContent;private Date createTime;private Integer retryCount;private Date lastRetryTime;private Integer status; // 0-待处理,1-处理中,2-处理成功,3-处理失败private String errorMsg;
}
5. 管理接口(可选)
可以添加REST接口供管理员手动重试或查看死信消息:
@RestController
@RequestMapping("/api/dead-letter")
@Slf4j
public class DeadLetterController {@Autowiredprivate DeadLetterMessageMapper deadLetterMessageMapper;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/list")public List<DeadLetterMessageDO> listPendingMessages() {return deadLetterMessageMapper.selectPendingMessages(100);}@PostMapping("/retry/{id}")public String retryMessage(@PathVariable Long id) {DeadLetterMessageDO message = deadLetterMessageMapper.selectById(id);if (message == null) {return "消息不存在";}try {Map<String, String> content = JacksonUtil.readMapValue(message.getMessageContent(),String.class,String.class);rabbitTemplate.convertAndSend(message.getExchange(),message.getRoutingKey(),content);message.setStatus(2); // 处理成功message.setRetryCount(message.getRetryCount() + 1);message.setLastRetryTime(new Date());deadLetterMessageMapper.updateById(message);return "重新投递成功";} catch (Exception e) {log.error("手动重试死信消息失败", e);return "重新投递失败: " + e.getMessage();}}
}
以上为实现RabbitMQ中死信队列的模板内容,可以复制粘贴后稍作修改使用