当前位置: 首页 > backend >正文

《如何在 Spring 中实现 MQ 消息的自动重连:监听与发送双通道策略》

大家好,我是G探险者!

📌 背景场景

在高可用分布式系统中,我们经常面临:

  • MQ 集群重启 → 消息监听中断
  • MQ 网络短暂抖动 → 发送端连接失败
  • 一端恢复正常,另一端仍处于挂死状态

如果你只配置了“连接工厂层”的重连,却忽略了监听容器或发送客户端的容错设计,重连机制可能失效,业务陷入长时间不可用。


✅ 核心理念:监听和发送是两个不同的连接“通道”

通道用途组件
监听通道从 MQ 拉取消息Spring JMS 的 MessageListenerContainer
发送通道发送消息到 MQSpring 的 JmsTemplate

这两个通道各自有自己的连接池和生命周期,不能指望一个设置就解决全部问题


🔁 一、监听端的自动重连机制

推荐做法:使用 DefaultMessageListenerContainer 并设置重连间隔

DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDestinationName("MY.QUEUE");
container.setMessageListener(new MyListener());// ✅ 开启事务模式可选
container.setSessionTransacted(true);// ✅ 开启自动重连机制(默认是 true)
container.setRecoveryInterval(5000L); // 每 5 秒重试连接一次container.afterPropertiesSet();
container.start();

DefaultMessageListenerContainer 内部会捕获 ConnectionException 等连接中断异常,自动重试连接。


📤 二、发送端的容灾重连策略

监听容器有容器帮你维护连接,而 发送端(JmsTemplate)则需要连接池支撑

推荐:配合使用 CachingConnectionFactory

ConnectionFactory factory = createIBMConnectionFactory(); // 原始 MQ 工厂
CachingConnectionFactory cachingFactory = new CachingConnectionFactory(factory);// 可选设置缓存大小(缓存 session 的数量)
cachingFactory.setSessionCacheSize(10);JmsTemplate jmsTemplate = new JmsTemplate(cachingFactory);
jmsTemplate.convertAndSend("MY.QUEUE", "Hello MQ");

📌 为啥要用 CachingConnectionFactory

原因描述
重用连接避免每次发送都新建连接(开销大)
支持连接断开重建内部封装连接失效后重建逻辑
提供 session 缓存提升发送效率,降低资源消耗

🧰 三、JMS 厂商参数补充(IBM MQ 举例)

若你使用 IBM MQ,可以在底层工厂设置:

MQQueueConnectionFactory factory = new MQQueueConnectionFactory();factory.setHostName("192.168.1.102");
factory.setPort(1414);
factory.setQueueManager("QM1");
factory.setChannel("CHANNEL1");
factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);// ✅ 启用自动重连
factory.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_OPTIONS,WMQConstants.WMQ_CLIENT_RECONNECT);// ✅ 设置最大重连时间(秒)
factory.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT, 30);// ✅ 设置连接列表(用于集群 HA)
factory.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST,"192.168.1.102(1414),192.168.1.103(1414)");

🔗 四、总结策略建议表

场景推荐设置
MQ监听端DefaultMessageListenerContainer + setRecoveryInterval
MQ发送端JmsTemplate + CachingConnectionFactory
多 broker/集群设置 CONNECTION_NAME_LIST
事务性保障setSessionTransacted(true) + onMessage() 异常触发 rollback
监听不生效检查是否调用了 afterPropertiesSet()

📘 下一篇预告:

《JMS事务性会话彻底解析:消息监听中的 commit、rollback 和幂等设计》

我们将深入剖析如何使用事务控制 MQ 消息的消费与回滚,Spring 容器如何自动帮你 commit/rollback,以及如何设计幂等保证系统不重复处理失败消息。


http://www.xdnf.cn/news/14781.html

相关文章:

  • JavaEE初阶第五期:解锁多线程,从 “单车道” 到 “高速公路” 的编程升级(三)
  • Windows环境下Docker容器化的安装与设置指南
  • 时序数据库IoTDB监控指标采集与可视化指南
  • 基于MATLAB的SVM支持向量机的乳腺癌分类方法应用
  • 现代 JavaScript (ES6+) 入门到实战(六):异步的终极形态 - async/await 的优雅魔法
  • HTTP中常见的Content-Type
  • HybridCLR热更新实例项目及改造流程
  • 现代 JavaScript (ES6+) 入门到实战(五):告别回调地狱,Promise 完全入门
  • 免费SSL证书一键申请与自动续期
  • STM32——HAL库总结
  • 【AGI】Qwen VLo:多模态AI的范式重构与AGI演进关键里程碑
  • mac触摸板设置右键
  • 【HuggingFace】模型下载至本地访问
  • 基于Pandas和FineBI的昆明职位数据分析与可视化实现(三)- 职位数据统计分析
  • 条件概率:不确定性决策的基石
  • C#写破解rar文件密码例程
  • 【硬核数学】10. “价值标尺”-损失函数:信息论如何设计深度学习的损失函数《从零构建机器学习、深度学习到LLM的数学认知》
  • Android大图加载优化:BitmapRegionDecoder深度解析与实战
  • IDE/IoT/实践小熊派LiteOS工程配置、编译、烧录、调试(基于 bearpi-iot_std_liteos 源码)
  • 马斯克的 Neuralink:当意念突破肉体的边界,未来已来
  • 同步日志系统深度解析【链式调用】【宏定义】【固定缓冲区】【线程局部存储】【RAII】
  • 《汇编语言:基于X86处理器》第5章 过程(2)
  • C# 委托(为委托添加方法和从委托移除方法)
  • 暑假复习篇之类与对象
  • gantt-task-react的改造使用
  • 源码运行效果图(六)
  • cocos creator 3.8 - 精品源码 - 六边形消消乐(六边形叠叠乐、六边形堆叠战士)
  • 《自动控制原理 》- 第 1 章 自动控制的基本原理与方式
  • 计算机操作系统(十七)内存管理
  • OpenCV图像噪点消除五大滤波方法