当前位置: 首页 > news >正文

如何让rabbitmq保存服务断开重连?保证高可用?

在 Spring Boot 集成 RabbitMQ 时,可以通过以下几种方式让 RabbitMQ 保存服务断开重连,以保证高可用:

配置自动重连

  • application.properties 配置 :在 Spring Boot 的配置文件 application.properties 中,可以设置 RabbitMQ 的连接工厂相关参数来开启自动重连功能。

    • spring.rabbitmq.listener.simple.recovery-interval:设置自动重连的时间间隔,在指定的时间间隔后会尝试重新建立连接。 spring.rabbitmq.connection-timeout:设置连接超时时间,单位为毫秒。如果在指定的时间内无法完成连接,就会认为连接失败,之后会根据其他重连机制进行尝试。

  • 代码配置 :可以在配置类中通过编程的方式配置连接工厂的相关参数来实现自动重连。

@Bean
public ConnectionFactory rabbitConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setConnectionTimeout(5000);connectionFactory.setRequestedHeartbeat(20);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setTopologyRecoveryEnabled(true);return connectionFactory;
}

其中,setAutomaticRecoveryEnabled(true) 启用自动重连功能,setTopologyRecoveryEnabled(true) 表示在重连时会自动恢复队列、交换器等拓扑结构。

使用重试机制

  • @Retryable 注解 :在发送消息的方法上添加@Retryable注解,当发送消息失败时,会根据注解的配置进行重试。

@Retryable(value = {AmqpException.class}, maxAttempts = 3, backoff = @Backoff(delay = 3000))
public void sendMessage(String message) {rabbitTemplate.convertAndSend("exchange", "routingKey", message);
}

这里maxAttempts指定了最大重试次数为 3 次,backoff指定了重试的延迟时间为 3000 毫秒。

  • RetryTemplate 配置 :可以通过配置RetryTemplate来自定义重试策略,例如设置重试的间隔、重试的次数等,然后将其注入到消息发送相关的类中使用。

@Bean
public RetryTemplate retryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();fixedBackOffPolicy.setBackOffPeriod(3000);retryTemplate.setBackOffPolicy(fixedBackOffPolicy);SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3);retryTemplate.setRetryPolicy(retryPolicy);return retryTemplate;
}

在发送消息的方法中使用retryTemplate.execute来执行发送消息的操作。

集群模式

  • 搭建 RabbitMQ 集群 :将多个 RabbitMQ 服务器组成一个集群,当一个节点出现故障时,客户端可以自动连接到其他可用的节点上。在 Spring Boot 中,可以通过配置多个 RabbitMQ 节点的地址来实现与集群的连接。

@Bean
public ConnectionFactory rabbitConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();List<String> addresses = new ArrayList<>();addresses.add("amqp://guest:guest@localhost:5672");addresses.add("amqp://guest:guest@localhost:5673");addresses.add("amqp://guest:guest@localhost:5674");connectionFactory.setAddresses(addresses);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");return connectionFactory;
}

这样,当连接到某个节点失败时,会自动尝试连接其他节点。

消息持久化

  • 消息持久化 :将消息设置为持久化存储,这样在 RabbitMQ 服务重启后,消息不会丢失。可以通过在发送消息时设置消息的持久化标志来实现。

rabbitTemplate.convertAndSend("exchange", "routingKey", message, messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return messagePostProcessor;
});

 同时,还需要将队列设置为持久化,在声明队列时指定durable参数为true

Queue queue = new Queue("queueName", true);

消费者确认机制

  • 手动确认 :在消息消费完成后,手动发送确认消息,这样在服务断开重连后,RabbitMQ 会重新发送未确认的消息,保证消息的可靠性。

@RabbitListener(queues = "queueName")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {// 消息处理逻辑channel.basicAck(tag, false);} catch (Exception e) {channel.basicNack(tag, false, true);}
}

如果消息处理成功,则发送basicAck确认消息;如果处理失败,则发送basicNack拒绝消息,并将消息重新入队。

监控和告警

  • 监控工具 :使用监控工具如 Prometheus 和 Grafana 等来监控 RabbitMQ 的运行状态,包括连接数、队列长度、消息速率等。当出现异常情况时,及时发现并进行处理。

  • 告警机制 :配置告警规则,当监控指标超过设定的阈值时,触发告警通知,及时通知运维人员进行干预,确保 RabbitMQ 的稳定运行。

http://www.xdnf.cn/news/398467.html

相关文章:

  • 付费专栏·Python潮流周刊电子书合集(epub、pdf、markdown)下载
  • 基于微信小程序的城市特色旅游推荐应用的设计与实现
  • DVWA靶场保姆级通关教程--07SQL注入下
  • 机器学习第七讲:概率统计 → 预测可能性,下雨概率70%就是典型应用
  • 药物抓取准确率97.3%!YOLO-EASB+IAFFGA-Net:如何让智能药房机器人靠视觉算法征服杂乱场景?
  • 搭建大数据学习的平台
  • 服务网格的“解剖学” - 控制平面与数据平面
  • 支付宝API-SKD-GO版
  • 打破GPU显存墙:FlashAttention-2算法在LLM训练中的极致优化实践
  • OpenCV CUDA 模块中在 GPU 上对图像或矩阵进行 翻转(镜像)操作的一个函数 flip()
  • Dockerfile 常见语法和指令
  • 青少年编程与数学 02-019 Rust 编程基础 08课题、字面量、运算符和表达式
  • RDD的五大特征
  • DICOM 网络服务实现:医学影像传输与管理的技术实践
  • Hadoop的组成,HDFS架构,YARN架构概述
  • 互联网大厂Java求职面试实战:Spring Boot与微服务场景深度解析
  • 学习日志03 java
  • 【Java继承】——面向对象编程的基石
  • ngx_http_limit_conn_module精准连接控制
  • C#里WPF使用触发器实现鼠标点击响应
  • 谷歌Gemini生图升级:与GPT-4o的对决,谁更胜一筹?
  • 克隆虚拟机组成集群
  • Python爬虫第20节-使用 Selenium 爬取小米商城空调商品
  • Electron学习大纲
  • 从零开始的python学习(七)P89+P90+P91+P92+P93+P94
  • 关于高并发GIS数据处理的一点经验分享
  • flutter 的 json序列化和反序列化
  • 南京邮电大学金工实习答案
  • 全模态具身智能:从 VLM 到 MLLM
  • Multisim14使用教程详尽版--(2025最新版)