SpringBoot集成RabbitMQ使用过期时间+死信队列实现延迟队列
有的时候呢,我们需要使用到延迟队列,RabbitMQ不像RocketMQ一样默认就支持延迟队列,RabbitMQ是不支持延迟队列的,但是呢?我们可以通过正常的队列加上消息的过期时间,配置死信队列,来模拟实现延迟队列。
一、创建普通队列(配置过期时间),绑定死信队列
# 死信交换机配置 以直连交换机为列
my:exchangeNormalName: exchange.normal.a #正常交换机queueNormalName: queue.normal.a #正常队列exchangeDlxName: exchange.dlx.a #死信交换机queueDlxName: queue.dlx.a #死信队列
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 死信交换机*/
@Configuration
public class DeadLetterExchangeConfig {@Value("${my.exchangeNormalName}")private String exchangeNormalName;@Value("${my.queueNormalName}")private String queueNormalName;@Value("${my.exchangeDlxName}")private String exchangeDlxName;@Value("${my.queueDlxName}")private String queueDlxName;/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* @return*/@Beanpublic Queue normalQueue(){Map<String, Object> arguments = new HashMap<>();//重点:设置这两个参数//设置队列的死信交换机arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置死信路由key,要跟死信交换机和死信队列绑定的路由key一致arguments.put("x-dead-letter-routing-key","error");return new Queue(queueNormalName,true,false,false,arguments);}/*** 正常交换机和正常队列绑定* @param normalExchange* @param normalQueue* @return*/@Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return new Queue(queueDlxName,true,false,false);}/*** 死信交换机和死信队列绑定* @param dlxExchange* @param dlxQueue* @return*/@Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");}}
二、生产者推送消息
/*** 死信交换机-直连交换机 消息过期*/@Testpublic void sendDirectMessage2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);//给消息设置过期时间MessagePostProcessor messagePostProcessor = message -> {//20秒后过期message.getMessageProperties().setExpiration("20000");message.getMessageProperties().setContentEncoding("UTF-8");//持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//非持久化//message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);return message;};//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchangerabbitTemplate.convertAndSend("exchange.normal.a", "order", map, messagePostProcessor);}
三、死信队列消费者
消费者监听死信队列,上边我们创建的普通队列的消息过期时间是20秒,相当于我们向普通队列中推送消息之后,20秒过期则进入死信队列中,消费者监听死信队列,等待消息进入死信队列之后再进行消费处理。这样就模拟了一个延迟队列。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;/*** 死信队列消费 手动重新投递消息*/
@Component
public class DlxReceiver_1 {@Autowiredprivate RabbitTemplate rabbitTemplate;// 监听死信队列@RabbitListener(queues = "queue.dlx.a", ackMode = "MANUAL")public void handleDlqMessage(Map<String, Object> messageBody, Channel channel, Message message) throws IOException {try {/***TODO: 业务处理*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);System.out.println("消息消费成功: " + messageBody);} catch (Exception e) {channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);System.err.println("消息消费失败: " + e.getMessage());}}}
以上大概就是SpringBoot集成RabbitMQ实现延迟队列的全过程。