Redission 实现延迟队列
前言
之前使用redis实现了消息队列,但是没有延迟消费的功能,现在编写一个可以实现延迟消费的功能,同时也能满足及时消费,只需要将延迟时间设置0就行了,用到了Redission,不需要基于stream进行一些复杂配置。
参考文章
【redis缓存】怎么使用 Redis 实现一个延时队列?_redis实现延时队列-CSDN博客
Redisson 的延迟队列真的能用吗?一文看透原理 + 坑点_redission延时队列原理-CSDN博客
Spring Boot 集成 Redisson 实现消息队列_springboot redis消息队列-CSDN博客
引入相关依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.3</version></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId><version>2.13.5</version></dependency>
redission配置类
@Configuration
@Slf4j
public class RedissonConfig {private final static String REDISSON_PREFIX = "redis://";@ResourceRedisProperties redisProperties;@Beanpublic RedissonClient redissonClient() {Config config = new Config();String url = REDISSON_PREFIX + redisProperties.getHost() + ":" + redisProperties.getPort();config.useSingleServer().setAddress(url).setPassword(redisProperties.getPassword()).setDatabase(redisProperties.getDatabase()).setPingConnectionInterval(2000); //设置2秒心跳间隔config.setLockWatchdogTimeout(10000L); //看门狗超时缩短(分布式锁自动续期更灵敏)显式设置为10秒try {return Redisson.create(config);} catch (Exception e) {log.error("RedissonClient init redis url:{}Exception:{}", url, e.getMessage());return null;}}
}
延迟队列配置类
相关的配置文件
server:port: ${SERVER_PORT:9211}# Spring
spring:application:# 应用名称name: ruoyi-redis-msg2redis:host: localhostport: 6379password: 123456mvc:pathmatch:matching-strategy: ant_path_matcher# 是否启用redis延迟队列
redission:delayqueue:enable: truequeues:- queueName: goodsDelayQueuebeanId: goodsDelayConsumeHandlerdesc: 商品消费的延迟队列- queueName: orderDelayQueuebeanId: orderDelayConsumeHandlerdesc: 订单消费的延迟队列
@Configuration
@ConfigurationProperties(prefix = "redission.delayqueue")
@Slf4j
@Data
public class RedisDelayQueueConfigProperties {private List<QueueConfig> queues;@Datapublic static class QueueConfig {private String queueName;private String desc;private String beanId;}
}
上面的配置文件中配置了2个重要参数
- 队列名
- 每个队列的处理类
下面的配置类,会为每个队列创建一个新的线程并且循环运行,阻塞监听队列(当队列有元素则take获取,如果没有则阻塞等待)
/*** @Description: redis延迟队列配置*/
@Configuration
@ConditionalOnProperty(value = "redission.delayqueue.enable")
@Slf4j
public class RedisDelayQueueConfig {@Autowiredprivate RedisDelayQueueConfigProperties configProperties;/*** @Description 线程池*/@Bean("delayExecutor")public ExecutorService getDelayExecutor() {return Executors.newFixedThreadPool(5);}@Beanpublic List<RedisDelayQueueConfigProperties.QueueConfig> startRedisDelayQueue(ApplicationContext applicationContext,RedissonClient redissonClient,@Qualifier("delayExecutor") ExecutorService executorService) {//根据配置的队列创建对应的线程,1个队列对应1个线程List<RedisDelayQueueConfigProperties.QueueConfig> queueConfigs = configProperties.getQueues();for (RedisDelayQueueConfigProperties.QueueConfig queueConfig : queueConfigs) {startThread(applicationContext, redissonClient, executorService, queueConfig);}return queueConfigs;}private <T> void startThread(ApplicationContext applicationContext,RedissonClient redissonClient,ExecutorService executorService,RedisDelayQueueConfigProperties.QueueConfig queueConfig) {//redissonClient获取阻塞队列RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueConfig.getQueueName());// 由于此线程需要常驻,可以新建线程,不用交给线程池管理Thread thread = new Thread(() -> {log.info("启动监听队列线程:{}", queueConfig.getQueueName());while (true) {try {// 获取到执行类RedisDelayQueueHandler redisDelayQueueHandler = applicationContext.getBean(queueConfig.getBeanId(),RedisDelayQueueHandler.class);// 从阻塞队列中获取被执行对象,为空时阻塞,作为参数传递给redisDelayQueueHandlerT t = blockingFairQueue.take();log.info("监听队列成功:{},交给处理类:{}", queueConfig.getQueueName(),queueConfig.getBeanId());//池线程执行消费executorService.submit(() -> redisDelayQueueHandler.exec(t));} catch (Exception e) {log.error("监听队列错误,", e);try {Thread.sleep(10000);} catch (InterruptedException ex) {ex.fillInStackTrace();}}}});thread.setName(queueConfig.getQueueName());thread.start();}
}
消费者处理类
这里我们创建一个接口,这样在刚才配置中根据配置文件,就能根据队列名选择对应的消费者
/*** @Description: 延迟队列执行方法,需要具体实现*/
public interface RedisDelayQueueHandler<T> {/*** @Description 执行方法*/void exec(T t);
}/*** @Description: 商品延时消费处理类*/
@Component
@Slf4j
public class GoodsDelayConsumeHandler implements RedisDelayQueueHandler<String> {@Overridepublic void exec(String str) {log.info("开始消费:{}" , str);}
}/*** @Description: 订单延时消费处理类*/
@Component
@Slf4j
public class OrderDelayConsumeHandler implements RedisDelayQueueHandler<String> {@Overridepublic void exec(String str) {log.info("开始消费:{}" , str);}
}
生产者和工具类
@Data
public class Msg {String queueName;String content;long delay;
}
生产者只需要传入队列名、消息体、延迟时间
@RestController
@RequestMapping("/msg")
@Api(tags = "MsgController")
@Slf4j
public class MsgController {@ResourceRedisDelayQueueConfigProperties properties;@Resourceprivate RedisDelayQueueUtil redisDelayQueueUtil;@PostMapping("/addDelayQueue")@ApiOperation(value = "添加延时消息")public R<?> addDelayQueue(@RequestBody Msg msg) {// 模拟业务中添加延迟任务boolean b = redisDelayQueueUtil.addDelayQueue(msg.getQueueName(),msg.getContent(),msg.getDelay());return R.toR(b);}@PostMapping("/findQueues")@ApiOperation(value = "查询可用队列")public R<?> findQueues() {List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueues();return R.okList(queues,queues.size());}}
/*** @Description: 延迟队列增删工具类*/
@Slf4j
@Component
public class RedisDelayQueueUtil {@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedisDelayQueueConfigProperties properties;/*** 添加延时队列* @param queueName* @param content* @param delay* @return*/public boolean addDelayQueue(String queueName, String content,Long delay) {return addDelayQueue(queueName,content,delay,TimeUnit.SECONDS);}/*** 添加延时队列* @param queueName* @param content* @param endTime* @return*/public boolean addDelayQueue(String queueName, String content, Date endTime) {long seconds = DateUtils.diffTime(DateUtils.getNowDate(), endTime);if (seconds <= 0) {log.error("不能小于当前时间");throw new RuntimeException("不能小于当前时间");}return addDelayQueue(queueName,content, seconds, TimeUnit.SECONDS);}/*** @Description 添加延迟队列*/private boolean addDelayQueue(String queueName,String content,Long delay,TimeUnit timeUnit) {validateParam(queueName, content);try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(content, delay, timeUnit);log.info("添加延时队列成功:{},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");} catch (Exception e) {log.info("添加延时队列失败:{},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");throw new RuntimeException(e.getMessage());}return true;}/*** 获取延迟队列** @param queueName*/public Object getDelayQueue(String queueName){if (StringUtils.isBlank(queueName)) {throw new ServiceException("队列名不能为空");}RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);Object value = delayedQueue.poll();return value;}/*** 删除指定队列中的消息** @param content 指定删除的消息对象队列值(同队列需保证唯一性)* @param queueName 指定队列键*/public boolean removeDelayedQueue(String queueName,String content) {validateParam(queueName, content);RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueName);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);return delayedQueue.remove(content);}/*** 校验参数** @param queueName 消息主题* @param content 消息体*/private void validateParam(String queueName, String content) {List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueues();if(CollUtil.isEmpty(queues)){throw new ServiceException("请配置队列名");}if (CharSequenceUtil.isBlank(queueName)) {throw new ServiceException("队列名不能为空");}boolean b = queues.stream().noneMatch(q -> q.getQueueName().equals(queueName));if (b) {throw new ServiceException("没有配置该队列");}if (CharSequenceUtil.isBlank(content)) {throw new ServiceException("消息体不能为空");}}}
测试流程
1、首先查一下可用的队列,只有配置了的才能使用
2、然后我们往goodsDelayQueue队列插入两条数据,延时时间分别为100s、60s
3、往redis插入数据
这里每个创建两个key
1、redisson_delay_queue:{队列名}
2、redisson_delay_queue_timeout:{队列名}
第1个key是由redisson_delay_queue固定前缀+队列名组成,里面的值是ist集合
第2个key是由redisson_delay_queue_timeout固定前缀+队列名组成,里面的值是Zset集合
Zset集合解释
redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),
任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,
直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。
当Zset到了执行时间,队列的线程就会从take中获取到数据,然后再开一个线程交给消费者处理