RabbitMQ可靠传输——持久性、发送方确认
一、持久性
前面学习消息确认机制时,是为了保证Broker到消费者直接的可靠传输的,但是如果是Broker出现问题(如停止服务),如何保证消息可靠性?对此,RabbitMQ提供了持久化功能:
持久化分为三种:1. 交换机持久化 2. 队列持久化 3.消息持久化
1.1 交换机持久化
一、交换机持久化方法
在声明交换机时,将durable置为true即可,如果不指定,默认为true
二、交换机持久化的作用
避免了当Broker重启时,未重新执行交换机声明代码,而导致生产者消息无法路由
1.2 队列持久化
一、队列持久化方法
在声明队列时,使用durable方法声明的队列为持久化队列,使用nonDurable声明的队列为非持久化队列
对应管理界面:
二、队列持久化的作用
在RabbitMQ服务器重启时,未持久化的队列将丢失,持久化队列保留
1.3 消息持久化
一、消息持久化方法
前面在发送消息时,都是直接指定一个字符串来发送消息,如:
![]()
我们先进入convertAndSend方法,观察其源码:
接下来再进入MessageProperties,观察其源码:
可以看到,不指定消息是否持久化,默认为持久化。
也就是说,如果要指定消息为非持久化,就选哟给convertAndSend传入一个Message对象而不是仅传一个消息字符串,接下来学习如何设置消息非持久化:
1> 创建一个Message对象
Message message = new Message("persistent test...".getBytes(),new MessageProperties());
2> 获取MessageProperties对象,通过setDeliveryMode方法设置消息非持久化
/* *如果要设置为持久化,可以直接换一个String的消息,也可以将这里的 *MessageDeliveryMode.NON_PERSISTENT改为MessageDeliveryMode.PERSISTENT */ message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
3> 将Message对象作为参数传递给converAndSend方法
rabbitTemplate.convertAndSend(Constants.NO_PERSISTENT_EXCHANGE,"",message);
整体代码:
二、消息持久化作用
RabbitMQ服务器重启时,未持久化的消息将丢失(即使消息所在队列未持久化队列),持久化的消息将保留(前提是消息所在的队列是持久化队列)
但是,有了消息确认机制以及持久性就能保证消息传输的可靠性了吗?显然不是,因为消息确认机制保证的是Broker到消费者的可靠性 ,持久性保证的是Broker内部的可靠性,还有生产者到Broker的可靠性没有被保证,因此,RabbitMQ引入了publiser confirms(发送方确认)机制
二、发送方确认机制
前面已经学习了RabbitMQ核心机制之一——持久化,但是这就能保证消息传输的可靠性了吗?显然不是,如果发送方发送的消息没有到达Broker,又谈何持久化?因此,我们还需要了解RabbitMQ的发送方确认机制(通过事务也能解决,但是比较复杂,这里不谈)
publisher confirms 机制又包含两种模式:
1> confirm确认模式
2> return退回模式
2.1 confirm确认模式
一、触发机制
Producer向Broker发送消息时,需要设置一个ConfirmCallback监听,这样消息无论是否到达exchange,这个监听都会触发,如果消息到达exchange,ACK为true,如果没有到达exchange,ACK为false。
二、代码演示
1> 添加RabbitMQ配置
publisher-confirm-type: correlated #配置publisher confirm机制
2> 代码实现(队列、交换机随便声明一个就行,类型随意)
@RequestMapping("/confirm")public String confirm(){//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行confirm方法");if(ack){//ack为true,消息到达exchangeSystem.out.printf("接收到消息,消息ID:%s \n",correlationData==null ? null : correlationData.getId());}else {//ack为false,消息为到达exchangeSystem.out.printf("未接收到消息,消息ID:%s , cause: %s \n",correlationData==null ? null : correlationData.getId(),cause);}}});CorrelationData correlationData = new CorrelationData("1");//发送消息rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);return "消息发送成功";}
3>运行程序,测试接口
1.正确发送消息(交换机名、routingKey存在)
消息发送成功,接下来查看控制台信息:
可以看到,交换机成功接收到消息
2.错误发送消息(改为一个不存在的交换机名)
再次运行程序,访问接口,发送消息:
消息发送成功,查看控制台:
可以看到,消息并没有到达指定交换机,原因是不存在这个交换机。
3.错误发送消息(改为一个不存在的routingKey)
运行程序,测试接口:
消息发送成功,查看控制台:
可以看到,在routingKey不存在的情况下,消息还是到达了交换机,但是这个消息一定是无法路由到队列的,因此就需要通过publsher confirm的 return退回模式 来解决了
4> 上述代码编写存在的问题
上面我们设置了ConfirmCallback监听经过测试,似乎并没有问题,但是仔细思考就会发现,我们在上面的代码中是通过rabbitTemplate这个对象来设置的,那岂不是前面所有使用rabbitTemplate的接口都被设置了监听?访问其它接口也一样会打印回调方法中的信息?
下面我们测试一下下面的方法:
运行程序,测试接口:
消息发送成功,查看控制台:
可以看到,同样会触发监听,为了避免这个问题,我们可以在config包中自己配置一个RabbitTemplate对象并注入进来:
@Configuration public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//消息到达exchange时的回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行confirm方法");if(ack){//ack为true,表示消息到达交换机System.out.printf("接收到消息,消息ID:%s \n",correlationData==null ? null : correlationData.getId());}else{//ack为false,表示消息未到达交换机System.out.printf("未接收到消息,消息ID:%s , cause: %s \n",correlationData==null ? null : correlationData.getId(),cause);//业务逻辑,如重发等}}});//消息退回时的回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回: " + returned);}});return rabbitTemplate;} }
2.2 return退回模式
一、触发机制
当消息到达exchange后,需要路由到queue中,如果一条消息无法被任何queue消费(routingKey不存在或队列不存在),可以把消息退回给producer,退回时可以设置一个回调方法ReturnCallback,对消息进行处理
二、代码演示
//消息退回时的回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回: " + returned);}});
修啊routingKey为一个不存在的routingKey:
运行程序,测试接口:
查看控制台:
可以看到,消息被退回
2.3 总结
publisher confirms 机制可以保证消息从生产者到Broker的可靠性,其中confirm模式工作在生产者到exchange之间,return模式工作在exchange到queue之间