当前位置: 首页 > ds >正文

Redission 实现延迟队列

前言

之前使用redis实现了消息队列,但是没有延迟消费的功能,现在编写一个可以实现延迟消费的功能,同时也能满足及时消费,只需要将延迟时间设置0就行了,用到了Redission,不需要基于stream进行一些复杂配置。

参考文章

【redis缓存】怎么使用 Redis 实现一个延时队列?_redis实现延时队列-CSDN博客

Redisson 的延迟队列真的能用吗?一文看透原理 + 坑点_redission延时队列原理-CSDN博客

Spring Boot 集成 Redisson 实现消息队列_springboot redis消息队列-CSDN博客


引入相关依赖

         <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.3</version></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId><version>2.13.5</version></dependency>

redission配置类

@Configuration
@Slf4j
public class RedissonConfig {private final static String REDISSON_PREFIX = "redis://";@ResourceRedisProperties redisProperties;@Beanpublic RedissonClient redissonClient() {Config config = new Config();String url = REDISSON_PREFIX + redisProperties.getHost() + ":" + redisProperties.getPort();config.useSingleServer().setAddress(url).setPassword(redisProperties.getPassword()).setDatabase(redisProperties.getDatabase()).setPingConnectionInterval(2000);  //设置2秒心跳间隔config.setLockWatchdogTimeout(10000L);     //看门狗超时缩短(分布式锁自动续期更灵敏)显式设置为10秒try {return Redisson.create(config);} catch (Exception e) {log.error("RedissonClient init redis url:{}Exception:{}", url, e.getMessage());return null;}}
}

延迟队列配置类

相关的配置文件

server:port: ${SERVER_PORT:9211}# Spring
spring:application:# 应用名称name: ruoyi-redis-msg2redis:host: localhostport: 6379password: 123456mvc:pathmatch:matching-strategy: ant_path_matcher# 是否启用redis延迟队列
redission:delayqueue:enable: truequeues:- queueName: goodsDelayQueuebeanId: goodsDelayConsumeHandlerdesc: 商品消费的延迟队列- queueName: orderDelayQueuebeanId: orderDelayConsumeHandlerdesc: 订单消费的延迟队列
@Configuration
@ConfigurationProperties(prefix = "redission.delayqueue")
@Slf4j
@Data
public class RedisDelayQueueConfigProperties {private List<QueueConfig> queues;@Datapublic static class QueueConfig {private String queueName;private String desc;private String beanId;}
}

上面的配置文件中配置了2个重要参数

  1. 队列名
  2. 每个队列的处理类

下面的配置类,会为每个队列创建一个新的线程并且循环运行,阻塞监听队列(当队列有元素则take获取,如果没有则阻塞等待)

/*** @Description: redis延迟队列配置*/
@Configuration
@ConditionalOnProperty(value = "redission.delayqueue.enable")
@Slf4j
public class RedisDelayQueueConfig {@Autowiredprivate RedisDelayQueueConfigProperties configProperties;/*** @Description 线程池*/@Bean("delayExecutor")public ExecutorService getDelayExecutor() {return Executors.newFixedThreadPool(5);}@Beanpublic List<RedisDelayQueueConfigProperties.QueueConfig> startRedisDelayQueue(ApplicationContext applicationContext,RedissonClient redissonClient,@Qualifier("delayExecutor") ExecutorService executorService) {//根据配置的队列创建对应的线程,1个队列对应1个线程List<RedisDelayQueueConfigProperties.QueueConfig> queueConfigs = configProperties.getQueues();for (RedisDelayQueueConfigProperties.QueueConfig queueConfig : queueConfigs) {startThread(applicationContext, redissonClient, executorService, queueConfig);}return queueConfigs;}private <T> void startThread(ApplicationContext applicationContext,RedissonClient redissonClient,ExecutorService executorService,RedisDelayQueueConfigProperties.QueueConfig queueConfig) {//redissonClient获取阻塞队列RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueConfig.getQueueName());// 由于此线程需要常驻,可以新建线程,不用交给线程池管理Thread thread = new Thread(() -> {log.info("启动监听队列线程:{}", queueConfig.getQueueName());while (true) {try {// 获取到执行类RedisDelayQueueHandler redisDelayQueueHandler = applicationContext.getBean(queueConfig.getBeanId(),RedisDelayQueueHandler.class);// 从阻塞队列中获取被执行对象,为空时阻塞,作为参数传递给redisDelayQueueHandlerT t = blockingFairQueue.take();log.info("监听队列成功:{},交给处理类:{}", queueConfig.getQueueName(),queueConfig.getBeanId());//池线程执行消费executorService.submit(() -> redisDelayQueueHandler.exec(t));} catch (Exception e) {log.error("监听队列错误,", e);try {Thread.sleep(10000);} catch (InterruptedException ex) {ex.fillInStackTrace();}}}});thread.setName(queueConfig.getQueueName());thread.start();}
}

消费者处理类

这里我们创建一个接口,这样在刚才配置中根据配置文件,就能根据队列名选择对应的消费者

/*** @Description: 延迟队列执行方法,需要具体实现*/
public interface RedisDelayQueueHandler<T> {/*** @Description 执行方法*/void exec(T t);
}/*** @Description: 商品延时消费处理类*/
@Component
@Slf4j
public class GoodsDelayConsumeHandler implements RedisDelayQueueHandler<String> {@Overridepublic void exec(String str) {log.info("开始消费:{}" , str);}
}/*** @Description: 订单延时消费处理类*/
@Component
@Slf4j
public class OrderDelayConsumeHandler implements RedisDelayQueueHandler<String> {@Overridepublic void exec(String str) {log.info("开始消费:{}" , str);}
}

生产者和工具类

@Data
public class Msg {String queueName;String content;long delay;
}

生产者只需要传入队列名、消息体、延迟时间

@RestController
@RequestMapping("/msg")
@Api(tags = "MsgController")
@Slf4j
public class MsgController {@ResourceRedisDelayQueueConfigProperties properties;@Resourceprivate RedisDelayQueueUtil redisDelayQueueUtil;@PostMapping("/addDelayQueue")@ApiOperation(value = "添加延时消息")public R<?> addDelayQueue(@RequestBody Msg msg) {// 模拟业务中添加延迟任务boolean b = redisDelayQueueUtil.addDelayQueue(msg.getQueueName(),msg.getContent(),msg.getDelay());return R.toR(b);}@PostMapping("/findQueues")@ApiOperation(value = "查询可用队列")public R<?> findQueues() {List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueues();return R.okList(queues,queues.size());}}
/*** @Description: 延迟队列增删工具类*/
@Slf4j
@Component
public class RedisDelayQueueUtil {@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedisDelayQueueConfigProperties properties;/*** 添加延时队列* @param queueName* @param content* @param delay* @return*/public boolean addDelayQueue(String queueName, String content,Long delay) {return addDelayQueue(queueName,content,delay,TimeUnit.SECONDS);}/*** 添加延时队列* @param queueName* @param content* @param endTime* @return*/public  boolean addDelayQueue(String queueName, String content, Date endTime) {long seconds = DateUtils.diffTime(DateUtils.getNowDate(), endTime);if (seconds <= 0) {log.error("不能小于当前时间");throw new RuntimeException("不能小于当前时间");}return addDelayQueue(queueName,content, seconds, TimeUnit.SECONDS);}/*** @Description 添加延迟队列*/private boolean addDelayQueue(String queueName,String content,Long delay,TimeUnit timeUnit) {validateParam(queueName, content);try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(content, delay, timeUnit);log.info("添加延时队列成功:{},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");} catch (Exception e) {log.info("添加延时队列失败:{},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");throw new RuntimeException(e.getMessage());}return true;}/*** 获取延迟队列** @param queueName*/public Object getDelayQueue(String queueName){if (StringUtils.isBlank(queueName)) {throw new ServiceException("队列名不能为空");}RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);Object value = delayedQueue.poll();return value;}/*** 删除指定队列中的消息** @param content 指定删除的消息对象队列值(同队列需保证唯一性)* @param queueName 指定队列键*/public boolean removeDelayedQueue(String queueName,String content) {validateParam(queueName, content);RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);return delayedQueue.remove(content);}/*** 校验参数** @param queueName 消息主题* @param content   消息体*/private void validateParam(String queueName, String content) {List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueues();if(CollUtil.isEmpty(queues)){throw new ServiceException("请配置队列名");}if (CharSequenceUtil.isBlank(queueName)) {throw new ServiceException("队列名不能为空");}boolean b = queues.stream().noneMatch(q -> q.getQueueName().equals(queueName));if (b) {throw new ServiceException("没有配置该队列");}if (CharSequenceUtil.isBlank(content)) {throw new ServiceException("消息体不能为空");}}}

测试流程

1、首先查一下可用的队列,只有配置了的才能使用

2、然后我们往goodsDelayQueue队列插入两条数据,延时时间分别为100s、60s

3、往redis插入数据

这里每个创建两个key

1、redisson_delay_queue:{队列名}

2、redisson_delay_queue_timeout:{队列名}

第1个key是由redisson_delay_queue固定前缀+队列名组成,里面的值是ist集合

第2个key是由redisson_delay_queue_timeout固定前缀+队列名组成,里面的值是Zset集合

Zset集合解释

redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),
任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,
直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。

当Zset到了执行时间,队列的线程就会从take中获取到数据,然后再开一个线程交给消费者处理

http://www.xdnf.cn/news/19428.html

相关文章:

  • Verilog 硬件描述语言自学——重温数电之典型组合逻辑电路
  • 基于 Spring Boot3 的ZKmall开源商城分层架构实践:打造高效可扩展的 Java 电商系统
  • 大语言模型的“可解释性”探究——李宏毅大模型2025第三讲笔记
  • Linux kernel 多核启动
  • Tomcat 企业级运维实战系列(六):综合项目实战:Java 前后端分离架构部署
  • 〔从零搭建〕数据中枢平台部署指南
  • 汽车加气站操作工证考试的复习重点是什么?
  • 如何取得专案/设计/设定/物件的属性
  • ETCD学习笔记
  • 手表--带屏幕音响-时间制切换12/24小时
  • 从零开始学习单片机18
  • 《云原生架构从崩溃失控到稳定自愈的实践方案》
  • 消费 $83,用Claude 实现临床护理系统记录单(所见即所得版)
  • C++三方服务异步拉起
  • MySQL函数 - String函数
  • Google Protobuf初体验
  • 深层语义在自然语言处理中的理论框架与技术融合研究
  • 使用电脑操作Android11手机,连接步骤
  • Python爬虫实战:研究统计学方法,构建电商平台数据分析系统
  • 面经分享--小米Java一面
  • 具有类人先验知识的 Affordance-觉察机器人灵巧抓取
  • STM32 之GP2Y1014AU0F的应用--基于RTOS的环境
  • 老题新解|不与最大数相同的数字之和
  • PCB 局部厚铜工艺:技术升级与新兴场景应用,猎板加工亮点
  • 同步/异步日志库
  • 响应式编程框架Reactor【4】
  • Web 聊天室消息加解密方案详解
  • open webui源码分析13-模型管理
  • 数据结构--栈(Stack) 队列(Queue)
  • Python API接口实战指南:从入门到精通