当前位置: 首页 > news >正文

如何利用Redis实现延迟队列?

延迟队列概念解析

延迟队列(Delay Queue)是一种特殊的消息队列,核心特性是允许消息在指定的延迟时间后被消费者处理,而非立即消费。它解决了传统队列(FIFO)无法处理“定时任务”或“超时任务”的问题,常见于需要异步延迟处理的场景(如订单超时取消、定时提醒、重试机制等)。


核心要素

  1. 延迟时间:消息入队时需指定“延迟时长”或“绝对执行时间”(如“5分钟后处理”或“2024-08-01 10:00执行”)。
  2. 任务存储:需可靠存储未到期的任务(避免宕机丢失),支持快速查询到期任务。
  3. 触发机制:能高效检测并提取已到期的任务(时间精度需满足业务需求)。
  4. 处理逻辑:消费者对到期任务进行业务处理(如调用接口、更新数据库)。

与普通队列的区别

特性普通队列(FIFO)延迟队列
消费时机消息入队后立即可被消费消息需等待指定延迟时间后才被消费
排序规则按入队顺序(先进先出)按到期时间排序(时间早的优先)
核心目标解耦、异步、削峰填谷解决“定时/超时”类异步任务需求

典型应用场景

  • 订单超时取消:用户下单后未支付,15分钟后自动取消订单。
  • 重试机制:接口调用失败后,延迟30秒重试(避免立即重试加重系统负担)。
  • 定时通知:活动开始前30分钟,向用户推送提醒消息。
  • 缓存预热:每日凌晨3点触发缓存数据加载任务。

关键设计挑战

  1. 延迟精度:需平衡性能与时间精度(如Redis轮询间隔过短会增加QPS,过长可能导致任务延迟处理)。
  2. 持久化:避免因服务宕机导致未到期任务丢失(如Redis通过RDB/AOF持久化,RabbitMQ通过消息持久化)。
  3. 分布式支持:多消费者场景下需避免任务重复消费(如Redis使用Lua脚本原子化取任务)。
  4. 内存/存储限制:单机方案(如JDK DelayQueue)受内存限制,需评估任务量上限。

一、Redis 实现延迟队列(Java 代码)

Redis 延迟队列通常利用 有序集合(ZSET) 存储任务,任务的执行时间作为 score,通过轮询或阻塞方式获取到期任务。以下是核心实现:

1. 依赖引入(Maven)
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.4.3</version>
</dependency>
2. 生产者(任务入队)
import redis.clients.jedis.Jedis;
import java.util.UUID;public class RedisDelayQueueProducer {private final Jedis jedis;private final String queueKey = "delay_queue";public RedisDelayQueueProducer(Jedis jedis) {this.jedis = jedis;}// 添加延迟任务(score 为执行时间戳)public void addTask(String taskData, long executeTime) {String taskId = UUID.randomUUID().toString();jedis.zadd(queueKey, executeTime, taskId + ":" + taskData);}
}
3. 消费者(任务出队)

使用 Lua 脚本原子化获取并删除到期任务(避免多消费者竞态条件):

import redis.clients.jedis.Jedis;
import java.util.Arrays;
import java.util.List;public class RedisDelayQueueConsumer {private final Jedis jedis;private final String queueKey = "delay_queue";// Lua 脚本:获取并删除 score <= 当前时间的任务(最多取 10 个)// 使用Lua保证删除时间和任务的原子性private final String luaScript = "" +"local tasks = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 10)\n" +"if #tasks > 0 then\n" +"    redis.call('zrem', KEYS[1], unpack(tasks))\n" +"end\n" +"return tasks";public RedisDelayQueueConsumer(Jedis jedis) {this.jedis = jedis;}public List<String> pollExpiredTasks() {long currentTime = System.currentTimeMillis();return jedis.eval(luaScript, Arrays.asList(queueKey), Arrays.asList(String.valueOf(currentTime)));}
}

方案缺点(消费者去消费这条消息只有轮询去消费,会导致大量线程空转,特别是高峰期,不太推荐使用):
由于 Redis ZSET 不支持原生的阻塞命令(如 BLPOP ),实际中需通过以下方式模拟阻塞:

  • 短轮询+休眠 :轮询间隔设置为较小值(如100ms),减少延迟但增加 Redis 压力。
  • 事件触发 :结合 Redis 的 PUBLISH/SUBSCRIBE 机制,生产者在添加任务时发布事件,消费者订阅事件后立即触发轮询(减少无效轮询)。

二、其他延迟队列实现方案(Java)

方案 1:JDK DelayQueue(单机版)

基于 java.util.concurrent.DelayQueue,任务需实现 Delayed 接口。

代码实现
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;// 延迟任务类
class DelayTask implements Delayed {private final String taskId;private final String data;private final long expireTime; // 绝对时间戳(毫秒)public DelayTask(String taskId, String data, long delayMs) {this.taskId = taskId;this.data = data;this.expireTime = System.currentTimeMillis() + delayMs;}// 剩余延迟时间@Overridepublic long getDelay(TimeUnit unit) {long diff = expireTime - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}// 按到期时间排序@Overridepublic int compareTo(Delayed o) {return Long.compare(this.expireTime, ((DelayTask) o).expireTime);}
}// 生产者与消费者
public class JdkDelayQueueDemo {private static final DelayQueue<DelayTask> queue = new DelayQueue<>();public static void main(String[] args) {// 生产者:添加延迟 5 秒的任务new Thread(() -> {queue.put(new DelayTask("task1", "data1", 5000));}).start();// 消费者:阻塞获取到期任务new Thread(() -> {while (true) {try {DelayTask task = queue.take();System.out.println("处理任务:" + task.taskId + ", 数据:" + task.data);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}).start();}
}

方案缺点:轮询不推荐

方案 2:RabbitMQ 死信队列(分布式)

通过设置消息 TTL(过期时间),过期后消息转发到死信队列(DLX),消费者监听死信队列。

代码实现(需 RabbitMQ 环境)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class RabbitMqDelayQueueDemo {private static final String NORMAL_EXCHANGE = "normal_exchange";private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 1. 配置死信队列(DLX)channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueDeclare(DEAD_LETTER_QUEUE, true, false, false, null);channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "dead_letter_key");// 2. 配置普通队列(设置 TTL 和 DLX)Map<String, Object> normalQueueArgs = new HashMap<>();normalQueueArgs.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);normalQueueArgs.put("x-dead-letter-routing-key", "dead_letter_key");normalQueueArgs.put("x-message-ttl", 5000); // 消息 5 秒后过期channel.queueDeclare("normal_queue", true, false, false, normalQueueArgs);channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueBind("normal_queue", NORMAL_EXCHANGE, "normal_key");// 3. 生产者发送消息到普通队列String message = "延迟任务数据";channel.basicPublish(NORMAL_EXCHANGE, "normal_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());// 4. 消费者监听死信队列(处理延迟任务)channel.basicConsume(DEAD_LETTER_QUEUE, false, (consumerTag, delivery) -> {String msg = new String(delivery.getBody());System.out.println("处理延迟任务:" + msg);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}, consumerTag -> {});}
}

方案特点:使用死信队列机制实现延迟队列,如果有RabbitMQ 推荐使用。


方案 3:RocketMq实现(推荐)
一、核心概念

RocketMQ 的延迟时间并非任意值,而是通过「延迟级别」控制(由 Broker 配置决定)。默认延迟级别对应的时间为:

1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h

对应级别为 1~18(级别 0 表示不延迟)。

二、实现步骤
1. 生产者发送延迟消息

在发送消息时,通过 setDelayTimeLevel(int level) 方法设置延迟级别。

示例代码(Java)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class DelayProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();// 创建消息并设置延迟级别(例如级别 3,对应 10s 延迟)Message msg = new Message("DelayTopic",  // 主题"TagA",        // 标签"Hello RocketMQ".getBytes("UTF-8")  // 消息内容);msg.setDelayTimeLevel(3);  // 设置延迟级别为 3(10秒)// 发送消息producer.send(msg);producer.shutdown();}
}
2. 消费者消费消息

消费者无需特殊配置,正常订阅主题即可,Broker 会在延迟时间到达后投递消息。

示例代码(Java)

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class DelayConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer_group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("DelayTopic", "*");  // 订阅主题consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("收到消息:%s,延迟时间:%ds%n", new String(msg.getBody()), msg.getStoreTimestamp() - msg.getBornTimestamp() / 1000);  // 计算实际延迟时间(毫秒转秒)}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.println("消费者启动");}
}
三、注意事项
  1. 延迟级别限制:Broker 默认仅支持 18 个延迟级别,如需自定义延迟时间,需修改 Broker 配置文件(broker.conf)中的 messageDelayLevel 参数(格式:1s 5s 10s ...)。
  2. 消息时效性:延迟消息的存储和投递依赖 Broker 稳定性,需确保 Broker 有足够资源处理延迟队列。
  3. 版本兼容性:RocketMQ 4.2.0 及以上版本支持延迟消息,低版本需升级。

方案特点:原生api支持延迟队列,推荐此方案,实现简单易配置。


三、方案对比

方案优势劣势内聚耦合扩展性
Redis 延迟队列支持分布式、持久化(RDB/AOF)、高性能(O(logN) 插入/查询)需要维护 Redis 集群;需处理网络抖动(如 Lua 脚本原子性)低(依赖 Redis)高(可通过集群扩展)
JDK DelayQueue无额外依赖、实现简单、单机性能高单机限制(无法分布式)、无持久化(宕机任务丢失)、任务数受内存限制高(纯 JDK)低(仅单机)
RabbitMQ 死信队列天然分布式、支持持久化、消息可靠(ACK 机制)依赖 RabbitMQ 集群;配置复杂(需设置 TTL/DLX);延迟精度受 TTL 限制中(依赖 MQ)中(需扩展 MQ 集群)

总结

  • Redis:适合需要分布式、高吞吐的延迟任务(如订单超时取消)。
  • JDK DelayQueue:适合单机、小规模、对延迟精度要求不高的场景(如本地缓存清理)。
  • RabbitMQ:适合需要严格消息可靠、已集成 MQ 的分布式系统(如电商促销活动通知)。
http://www.xdnf.cn/news/505801.html

相关文章:

  • 【leetcode】2900. 最长相邻不相等子序列 I
  • 数据库索引优化:如何平衡查询与写入性能
  • 劳特巴赫trace32烧录方法
  • 【Linux网络】ARP协议
  • 使用Pinia持久化插件-persist解决刷新浏览器后数据丢失的问题
  • 使用python进行船舶轨迹跟踪
  • 编译原理7~9
  • 【Element UI】表单及其验证规则详细
  • python运算符
  • python训练营打卡第26天
  • Go语言 Gin框架 使用指南
  • js中不同循环的使用以及结束循环方法
  • 两个电机由同一个控制器控制,其中一个电机发生堵转时,另一个电机的电流会变大,是发生了倒灌现象吗?电流倒灌产生的机理是什么?
  • Gartner《How to Leverage Lakehouse Design in Your DataStrategy》学习心得
  • SAP HCM 0008数据存储逻辑
  • 《棒球万事通》球类运动有哪些项目·棒球1号位
  • c++ 运算符重载
  • 16 C 语言布尔类型与 sizeof 运算符详解:布尔类型的三种声明方式、执行时间、赋值规则
  • qt6 c++操作qtableview和yaml
  • 使用 CodeBuddy 开发一款富交互的屏幕录制与注释分享工具开发纪实
  • C语言查漏补缺
  • Codeforces Round 1024 (Div.2)
  • 【C/C++】C++返回值优化:RVO与NRVO全解析
  • 安全性(三):信息安全的五要素及其含义
  • Python-92:最大乘积区间问题
  • 从AI系统到伦理平台:技术治理的开放转向
  • docker部署第一个Go项目
  • 语音转文字并进行中英文翻译
  • 【JavaScript】 js 基础知识强化复习
  • 2025系统架构师---选择题知识点(押题)