如何让rabbitmq保存服务断开重连?保证高可用?
在 Spring Boot 集成 RabbitMQ 时,可以通过以下几种方式让 RabbitMQ 保存服务断开重连,以保证高可用:
配置自动重连
-
application.properties 配置 :在 Spring Boot 的配置文件 application.properties 中,可以设置 RabbitMQ 的连接工厂相关参数来开启自动重连功能。
-
spring.rabbitmq.listener.simple.recovery-interval
:设置自动重连的时间间隔,在指定的时间间隔后会尝试重新建立连接。spring.rabbitmq.connection-timeout
:设置连接超时时间,单位为毫秒。如果在指定的时间内无法完成连接,就会认为连接失败,之后会根据其他重连机制进行尝试。
-
-
代码配置 :可以在配置类中通过编程的方式配置连接工厂的相关参数来实现自动重连。
@Bean
public ConnectionFactory rabbitConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setConnectionTimeout(5000);connectionFactory.setRequestedHeartbeat(20);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setTopologyRecoveryEnabled(true);return connectionFactory;
}
其中,setAutomaticRecoveryEnabled(true)
启用自动重连功能,setTopologyRecoveryEnabled(true)
表示在重连时会自动恢复队列、交换器等拓扑结构。
使用重试机制
-
@Retryable 注解 :在发送消息的方法上添加
@Retryable
注解,当发送消息失败时,会根据注解的配置进行重试。
@Retryable(value = {AmqpException.class}, maxAttempts = 3, backoff = @Backoff(delay = 3000))
public void sendMessage(String message) {rabbitTemplate.convertAndSend("exchange", "routingKey", message);
}
这里maxAttempts
指定了最大重试次数为 3 次,backoff
指定了重试的延迟时间为 3000 毫秒。
-
RetryTemplate 配置 :可以通过配置
RetryTemplate
来自定义重试策略,例如设置重试的间隔、重试的次数等,然后将其注入到消息发送相关的类中使用。
@Bean
public RetryTemplate retryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();fixedBackOffPolicy.setBackOffPeriod(3000);retryTemplate.setBackOffPolicy(fixedBackOffPolicy);SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3);retryTemplate.setRetryPolicy(retryPolicy);return retryTemplate;
}
在发送消息的方法中使用retryTemplate.execute
来执行发送消息的操作。
集群模式
-
搭建 RabbitMQ 集群 :将多个 RabbitMQ 服务器组成一个集群,当一个节点出现故障时,客户端可以自动连接到其他可用的节点上。在 Spring Boot 中,可以通过配置多个 RabbitMQ 节点的地址来实现与集群的连接。
@Bean
public ConnectionFactory rabbitConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();List<String> addresses = new ArrayList<>();addresses.add("amqp://guest:guest@localhost:5672");addresses.add("amqp://guest:guest@localhost:5673");addresses.add("amqp://guest:guest@localhost:5674");connectionFactory.setAddresses(addresses);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");return connectionFactory;
}
这样,当连接到某个节点失败时,会自动尝试连接其他节点。
消息持久化
-
消息持久化 :将消息设置为持久化存储,这样在 RabbitMQ 服务重启后,消息不会丢失。可以通过在发送消息时设置消息的持久化标志来实现。
rabbitTemplate.convertAndSend("exchange", "routingKey", message, messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return messagePostProcessor;
});
同时,还需要将队列设置为持久化,在声明队列时指定durable
参数为true
。
Queue queue = new Queue("queueName", true);
消费者确认机制
-
手动确认 :在消息消费完成后,手动发送确认消息,这样在服务断开重连后,RabbitMQ 会重新发送未确认的消息,保证消息的可靠性。
@RabbitListener(queues = "queueName")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {// 消息处理逻辑channel.basicAck(tag, false);} catch (Exception e) {channel.basicNack(tag, false, true);}
}
如果消息处理成功,则发送basicAck
确认消息;如果处理失败,则发送basicNack
拒绝消息,并将消息重新入队。
监控和告警
-
监控工具 :使用监控工具如 Prometheus 和 Grafana 等来监控 RabbitMQ 的运行状态,包括连接数、队列长度、消息速率等。当出现异常情况时,及时发现并进行处理。
-
告警机制 :配置告警规则,当监控指标超过设定的阈值时,触发告警通知,及时通知运维人员进行干预,确保 RabbitMQ 的稳定运行。