RabbitMQ ④-持久化 || 死信队列 || 延迟队列 || 事务
消息确认机制
简单介绍
RabbitMQ Broker 发送消息给消费者后,消费者处理该消息时可能会发生异常,导致消费失败。
如果 Broker 在发送消息后就直接删了,就会导致消息的丢失。
为了保证消息可靠到达消费者并且成功处理了该消息,RabbitMQ 提供了消息确认机制。
消费者在订阅队列时,可以指定 autoAck
参数,该参数指定是否自动确认消息。
autoAck=true
:消费者接收到消息后,自动确认消息,RabbitMQ Broker 立即删除该消息。autoAck=false
:消费者接收到消息后,不自动确认消息,需要消费者调用channel.basicAck()
方法确认消息。如果消费者处理消息时发生异常,则可以调用channel.basicNack()
方法,表示不确认该消息的接收。
Spring AMQP 提供了三种模式的消息确认
AcknowledgeMode.NONE
:消息一经发送,就不管它了,不管消费者是否处理成功,都直接确认消息。AcknowledgeMode.AUTO
(默认):自动确认,消息接收后,消费者处理成功时自动确认该消息,如果处理时发送异常,则不会确认消息。AcknowledgeMode.MANUAL
:手动确认,消息接收后,消费者处理成功时,需要调用channel.basicAck()
方法确认消息,如果处理时发送异常,则需要调用channel.basicNack()
方法,表示不确认该消息的接收。
代码示例
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: manual
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Bean("ackExchange")public DirectExchange ackExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}
// @Bean("binding")
// public Binding binding(Exchange exchange, Queue queue) {
// return BindingBuilder.bind(queue)
// .to(exchange)
// .with("ack")
// .noargs();
// }@Bean("binding1")public Binding binding1(@Qualifier("ackExchange") DirectExchange exchange, @Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消费者消息确认喵~");return "发送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模拟处理业务逻辑");int a = 3 / 0;System.out.println("模拟处理业务完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}
持久性机制
简单介绍
前面讲了消费端处理消息时,消息如何不丢失,但是如何保证 RabbitMQ
服务停掉以后,生产者发送的消息不丢失呢。默认情况下, RabbitMQ
退出或者由于某种原因崩溃时,会忽视队列和消息。
为了保证消息持久化,RabbitMQ 提供了持久化机制,分别是:交换机持久化、队列持久化和消息持久化。
- 交换机持久化:使用
ExchangeBuilder.durable(true)
方法创建的交换机,RabbitMQ 会将交换机信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。 - 队列持久化:使用
QueueBuilder.durable(true)
方法创建的队列,RabbitMQ 会将队列信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。 - 消息持久化:消息持久化可以保证消息不丢失,即使 RabbitMQ 重启或者崩溃,消息也不会丢失。
将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能,这是因为写入磁盘的速度相比于写入内存的速度还是很慢的,对于可靠性不是那么高的消息,可以不采用持久化处理以提高整体的吞吐量。
在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。
尽管设置了持久化,也不能保证就一定可以持久化。这是因为在将这些持久化信息写入磁盘时也是需要时间的,如果 RabbitMQ 在这段时间内崩溃,那么这些信息也会丢失。
代码示例
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("persistQueue")public Queue persistQueue() {return QueueBuilder.nonDurable(Constants.PERSIST_QUEUE).build();}@Bean("persistExchange")public DirectExchange persistExchange() {return ExchangeBuilder.directExchange(Constants.PERSIST_EXCHANGE).durable(false).build();}@Bean("binding2")public Binding binding2(@Qualifier("persistExchange") Exchange exchange, @Qualifier("persistQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("persist").noargs();}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/persist")public String persist() {Message message = new Message("消费者消息确认喵~".getBytes(StandardCharsets.UTF_8), new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);rabbitTemplate.convertAndSend(Constants.PERSIST_EXCHANGE, "persist", message);return "发送成功";}
}
发送方确认机制
简单介绍
在发送方将消息发送至 RabbitMQ Broker 时,也有可能出现消息丢失的情况。
为了保证消息可靠到达 Broker,RabbitMQ 提供了发送方确认机制。
发送方确认机制是指,在消息发送到 Broker
后,发送方会等待 Broker
回应,如果发送方收到消息,则发送方认为消息发送成功,如果发送方未收到消息,则发送方认为消息发送失败,可以重新发送。
RabbitMQ 提供了两种方式保证发送方发送的消息的可靠传输
confirm 确认模式
:发送方在发送消息后,对发送方设置一个ConfirmCallback
的监听,无论消息是否抵达Exchange
,这个监听都会被执行,如果消息抵达了Exchange
,则ACK
为true
,如果消息没有抵达Exchange
,则ACK
为false
。returns 退回模式
:尽管确认消息发送至Exchange
后,也依然不能完全保证消息的可靠传输。在Exchange
和Queue
会有一个Routing Key(Binding Key)
的绑定关系,如果消息没有匹配到任何一个Queue
,则通过returns
模式则会退回到发送方。
代码示例
confirm 确认模式
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息发送确认机制publisher-confirm-type: correlated
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange() {return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();}@Bean("binding3")public Binding binding3(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("confirm");}
}
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 设置确认消息机制rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack) {System.out.printf("接收到消息,消息ID:%s\n",correlationData == null ? null : correlationData.getId());} else {System.out.printf("未接收到消息,消息ID:%s;原因:%s\n",correlationData == null ? null : correlationData.getId(), cause);}}});return rabbitTemplate;}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {// ! 直接使用 setConfirmCallback 会影响其他接口的调用// ! 且只能设置一个确认回调,多次发起请求会报错
// rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
// @Override
// public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// System.out.println("执行了confirm方法");
// if (ack) {
// System.out.printf("接收到消息,消息ID:%s\n",
// correlationData == null ? null : correlationData.getId());
// } else {
// System.out.printf("未接收到消息,消息ID:%s\n;原因:%s",
// correlationData == null ? null : correlationData.getId(), cause);
// }
// }
// });CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + "1", "confirm", "confirm test...", correlationData);return "消息发送成功";}
}
returns 退回模式
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息发送退回机制publisher-returns: true
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 设置消息退回机制rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息被退回:" + returned);}});return rabbitTemplate;}
}
总结:如何确保消息的可靠性传输
- 发送方 => 服务端:通过发送方确认机制,confirm 确认模式 和 returns 退回模式,确保消息可靠到达。
- 服务端:通过持久化机制,保证消息不丢失。
- 服务端 => 接收方:通过消息确认机制,确保消息被消费者正确消费。
重试机制
简单介绍
消息在处理失败后,重新发送,重新处理,这便是消息重试机制。
RabbitMQ 提供了消息重试机制,可以设置消息最大重试次数,超过最大重试次数还未成功消费,则消息会被丢弃。
代码示例
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:
# 消息接收确认机制# acknowledge-mode: manual # 手动确认时,重发机制无效acknowledge-mode: autoretry:enabled: true # 开启重试机制initial-interval: 5000ms # 重发时间间隔max-attempts: 5 # 最大重试次数
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("retryQueue")public Queue retryQueue() {return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange() {return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();}@Bean("binding4")public Binding binding4(@Qualifier("retryExchange") DirectExchange exchange, @Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("retry");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/retry")public String retry() {rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");return "消息发送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void process(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.RETRY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);int num = 3 / 0;System.out.println("业务处理完成");}
}
TTL 机制
简单介绍
TTL(Time To Live)机制,可以设置消息的存活时间,超过存活时间还未消费,则消息会被丢弃。
RabbitMQ 提供了 TTL 机制,可以设置队列和消息的 TTL 值,超过 TTL 值还未消费,则消息会被丢弃。
两者区别:
- 设置队列 TTL 值,一旦消息过期,就会从队列中删除。设置队列过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期扫描对头的消息是否过期即可。
- 设置消息 TTL 值,即使消息过期,也不会马上删除,只有在发送至消费者时才会检测其是否已经过期,如果过期才会删除。设置消息过期时间,每个消息的过期时间都可能不尽相同,所以需要扫描整个队列的消息才可确定是否过期,为了确保性能,所以采取类似于
懒汉模式
的方式。
将队列 TTL 设置为 30s,第一个消息的 TTL 设置为 30s,第二个消息的 TTL 设置为 10s。
理论上说,在 10s 后,第二个消息应该被丢弃。但由于设置了队列 TTL 值的机制,只会扫描队头的消息是否过期,所以在第一个消息过期之前,第二个消息不会被删除。
代码示例
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ttlQueue")public Queue ttlQueue() {return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20_000) // ? 设置队列的 TTL 值.build();}@Bean("ttlExchange")public DirectExchange ttlExchange() {return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}@Bean("binding5")public Binding binding5(@Qualifier("ttlExchange") DirectExchange exchange, @Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// ? 设置消息的 TTL 值message.getMessageProperties().setExpiration("10000");return message;}};rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...",messagePostProcessor);return "消息发送成功";}
}
死信队列
简单介绍
死信(Dead Letter),就是因为某种原因,导致消费者消费失败的消息,称之为死信。
死信队列,当消息在一个队列中变成死信时,它就被重新被发送到另一个交换机,该交换机就是死信交换机(Dead Letter Exchange)。
该死信交换机绑定死信队列,当消息被重新发送到死信交换机时,它就被重新投递到死信队列。
消息变成死信会有如下几种原因:
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue 参数设置为 false。
- 消息过期。
- 队列达到最大长度。
代码示例
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DlConfig {@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) // ? 配置该队列的死信交换机.deadLetterRoutingKey("dl") // ? 死信交换机绑定死信队列的 Routing Key.ttl(10_000).maxLength(10L) // ? 设置队列最大长度.build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") DirectExchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dl");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DlListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void processNormal(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.NORMAL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = Constants.DL_QUEUE)public void processDl(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "normal test...");return "消息发送成功:" + new Date();}
}
延迟队列
简单介绍
延迟队列(Delay Queue),即消息被发送以后,并不想让消费者立刻消费该消息,而是等待一段时间后再消费。
延迟队列的使用场景有很多,比如:
- 智能家居:智能设备产生的事件,如开关、温度变化等,可以先存放在延迟队列中,等待一段时间后再消费。
- 日常管理:预定会议,需要在会议开始前 15 分钟通知参会人员。
- 订单处理:订单创建后,需要 30 分钟后才会发货。
RabbitMQ 本身没有提供延迟队列的功能,但是基于消息过期后会变成死信的特性,可以通过设置 TTL 和死信队列来实现延迟队列的功能。
代码示例
@RequestMapping("/delay")
public String delay() {//发送带ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000");//10s过期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000");//20s过期return messagePostProcessor;});return "发送成功!";
}
由于 RabbitMQ 检查消息是否过期的机制,如果 20s 的消息先到队列,那么 10s 的消息只会在 20s 后才会被检查到过期。
延迟队列插件
RabbitMQ 官方提供了延迟队列插件,可以实现延迟队列的功能。
延迟队列插件
安装队列插件
延迟队列插件下载地址
下载插件后,需要将插件放到 RabbitMQ 的插件目录(/usr/lib/rabbitmq/plugins
)下,然后重启 RabbitMQ 服务。
代码示例
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).durable(true).delayed() // ? 设置队列为延迟队列.build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") DirectExchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delay");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void processDelay(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DELAY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(20_000L); // ? 设置消息的延迟发送时间return message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(10_000L); // ? 设置消息的延迟发送时间return message;});return "消息发送成功:" + new Date();}
}
事务机制
简单介绍
RabbitMQ
是基于 AMQP
协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制事务。
Spring AMQP
也提供了对事务相关的操作。RabbitMQ
事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。
代码示例
配置事务管理器:
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;
}@Bean
public RabbitTransactionManager rabbitTransactionManager (ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
}
@Bean("transQueue")
public Queue transQueue() {return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
@Transactional
@RequestMapping("/trans")
public String trans() {String msg = "trans test...";System.out.println("发送第一条消息:" + msg + 1);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);int a = 3 / 0;System.out.println("发送第二条消息:" + msg + 2);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);return "消息发送成功:";
}
消息分发
简单介绍
一个队列可以给多个消费者消费,默认情况下,RabbitMQ 是以轮询的方式将消息分发给这些消费者,不管该消费是否已经消费并且确认。
这种情况是不太合理的,如果每个消费者消费的能力都不同,有的消费者消费快,有的慢,这会极大降低整体系统的吞吐量和处理速度。
我们可以使用 channel.basicQos(int prefetchCount)
来限制当前信
道上的消费者所能保持的最大未确认消息的数量。
当该消费者达到最大的 prefetchCount
限制时,RabbitMQ 会停止向该消费者分发消息,直到该消费者的未确认消息数量小于 prefetchCount
时。
代码示例
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5 # 队列最大接收五条消息
@Bean("qosQueue")
public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();
}
@Bean("qosExchange")
public DirectExchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosExchange") DirectExchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("qos");
}
@RequestMapping("/qos")
public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test...");}return "消息发送成功:" + new Date();
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模拟处理业务逻辑");System.out.println("模拟处理业务完成");// channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}
}
应用场景
- 限流:可以根据消费者的处理能力,设置
prefetchCount
限制每个消费者所能接收的消息数量,从而达到限流的目的。 - 负载均衡:通过将
prefetchCount
设置为1
,通过设 prefetch = 1 的方式,告诉 RabbitMQ 一次只给一个消费者一条消息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。相反,它会将它分派给下一个不忙的消费者。