Redis——Redisson篇
Redisson
特性 | Jedis/Lettuce | Redisson |
---|---|---|
分布式锁 | 需手动实现(如 SETNX + Lua ) | ✅ 内置 RLock ,支持可重入、公平锁 |
高可用 | 依赖 Redis 哨兵/集群,但客户端需手动处理故障转移 | ✅ 自动监听 Redis 节点变化,支持哨兵/集群自动切换 |
分布式集合 | 仅支持基础 Redis 命令 | ✅ 提供 RMap 、RList 、RSet 等高级 API |
发布/订阅 | 基础 PUB/SUB 命令 | ✅ 提供 RTopic ,支持消息确认、持久化 |
性能优化 | 需手动优化(如连接池、管道) | ✅ 内置连接池、异步/响应式编程支持 |
Spring 集成 | 需手动配置 RedisTemplate | ✅ 直接集成 Spring Cache、Spring Data Redis |
分布式集合
- 特点:分布式部署redis时,对数据的操作会自动同步到其他节点上
-
RBucket
(分布式String
)set(V value)
:设置值set(V value, long time, TimeUnit unit)
:设置有效期setIfAbsent(V value)
:setnxget()
:获取值delete()
:删除值
RBucket<String> bucket = redissonClient.getBucket("myKey"); //不要忘记设置泛型 bucket.set("Hello Redisson"); // 设置值 String value = bucket.get(); // 获取值 bucket.delete(); // 删除键
-
RMap
(分布式Hash
)put(K key, V value)
:添加键值对,覆盖已有键put(K key, V value, long time, TimeUnit unit)
:设置键值对及过期时间putIfAbsent(K key, V value)
:原子性添加,仅当键不存在时生效get(Object key)
:获取键对应的值remove(Object key)
:删除键值对addAndGet(key, delta)
:自增
RMap<String, String> map = redissonClient.getMap("userMap"); map.put("user1", "Alice"); String value = map.get("user1"); map.expire(10, TimeUnit.SECONDS); // 设置10秒过期时间
-
RList
(分布式List
)add(E element)
:在列表末尾添加元素get(int index)
:根据索引获取元素remove(int index)
:删除指定索引的元素
RList<String> list = redissonClient.getList("myList"); list.add("item1"); // 尾部插入 list.addFirst("item0"); // 头部插入 String firstItem = list.get(0); // 通过索引访问
-
RSet
(分布式Set
)add(E element)
:添加元素(自动去重)contains(Object o)
:判断元素是否存在remove(Object o)
:删除元素
RSet<String> set = redissonClient.getSet("mySet"); set.add("user1"); set.add("user2"); boolean contains = set.contains("user1"); // 检查元素是否存在
分布式锁
-
setnx
的分布式锁存在的问题- 不可重入:需要加锁前先检查
- 无重试机制:需要手动实现
- 超时释放锁可能导致误删:需要释放锁前先检查
- 集群架构下,节点宕机可能导致其他节点未同步分布式锁:需要使用Redlock算法
-
redisson
分布式锁特点- redisson提供可重入机制:使用Hash存储锁信息,键为锁名称,字段为
UUID+线程ID
的组合,值为重入次数 - redisson提供可自旋机制:调用
tryLock()
时,若锁被占用,线程会循环尝试获取,每次尝试间隔可通过参数配置 - redisson提供了看门狗机制:封装LUA脚本检查锁,避免了锁误删的问题
- redisson提供了
mutiLock
机制:使用Redlock
算法,仅当超过半数节点成功获取锁时,才认为加锁成功
- redisson提供可重入机制:使用Hash存储锁信息,键为锁名称,字段为
public class RedissonConfigTest extends BaseTest{@Autowiredprivate RedissonClient redissonClient;@Testpublic void testRedisson() throws Exception{try{// 分布式锁必须声明锁名称以区分锁RLock lock = redissonClient.getLock("myLock"); /* 尝试获取锁,入参说明* tryLock(long waitTime, TimeUnit unit):waitTime+unit指定获取锁的最大等待时间、默认锁不过期* tryLock(long waitTime, long leaseTime, TimeUnit unit):leaseTime指定锁过期时间* tryLock():不等待,锁不过期*/boolean isLock = lock.tryLock(2, 10, TimeUnit.SECONDS);if (isLock){// TODO}}catch (InterruptedException e) {throw new RuntimeException(e);}finally{if (isLock){// 释放锁lock.unlock();}}}}
-
举例:高并发抢购商品,使用redisson分布式锁防止超卖
- 较复杂的逻辑下,LUA脚本可能无法一次性完成,此时还是得使用分布式锁
- 用户抢购时没有拿到锁会自动重试,而不是返回错误
- 极端情况下,业务发送阻塞导致分布式锁自动释放,仍然有超卖的风险,可以加LUA脚本辅助
@Service
public class OrderServiceImpl implements IOrderService {@Autowiredprivate RedissonClient redissonClient;/** 抢购商品* @param goodId 商品id* @return 抢购结果*/@Overridepublic Result panicBuy(Long goodId) {Long userId = ThreadLocalDto.threadLocal.get().getUserId();RMap<String, String> goodMap = redissonClient.getMap("good:" + goodId); //商品信息bucket// 1.不在抢购期内直接失败Long begin = Long.parseLong(goodMap.get("beginTime"));Long end = Long.parseLong(goodMap.get("endTime"));if (LocalDateTime.ofInstant(Instant.ofEpochMilli(begin),ZoneOffset.of("+8")).isAfter(LocalDateTime.now()) || LocalDateTime.ofInstant(Instant.ofEpochMilli(end),ZoneOffset.of("+8")).isBefore(LocalDateTime.now())) {return Result.fail("不在秒杀时间中");}// 分布式锁:用商品ID作为锁的KeyRLock lock = redissonClient.getLock("lock:good:" + goodId);try{// 2.尝试拿redisson分布式锁boolean isLock = lock.tryLock(2, 10, TimeUnit.SECONDS);if (!isLock) { //重试后最终拿锁失败,返回错误信息return Result.fail("系统繁忙,请重试");}// 3.拿锁成功,开始尝试扣减库存if (Long.parseLong(goodMap.get("stoke")) <= 0){return Result.fail("没有库存");}goodMap.addAndGet("stoke", -1); //扣减库存// 4.库存扣减成功,创建临时订单(这一步也可以异步完成)Long orderId = RedisIdUtil.getId("order"); //分布式ID作为订单idRSet<String> orderIdSet = redissonClient.getSet("order:user:" + userId); //订单编号bucketorderIdSet.add(String.valueOf(orderId));OrderDto orderDto = OrderDto.builder().orderId(orderId).goodId(goodId).userId(userId).createTime(LocalDateTime.now()).build();RBucket<String> orderValue = redissonClient.getBucket("order:"+orderId); //订单信息bucketorderValue.set(JSON.toJSONString(orderDto));return Result.ok(orderId);}finally{if(isLock){// 5.释放锁lock.unlock();}} }
}
分布式队列
-
特点
- 消息持久化
- 封装了阻塞功能
- 封装了自动轮询功能
- 任务队列会自动同步到其他节点
消息队列
-
原理:redisson提供了
RQueue
普通队列,使用List 类型(通过LPUSH
/RPOP
或RPUSH
/LPOP
)存储队列元素 -
常用API
redissonClient.getQueue("myQueue")
:获取/创建消息队列RQueue.offer(E e)
:向队列尾部添加元素(非阻塞)RQueue.poll()
:从队列头部获取并移除元素(非阻塞)RQueue.poll(int var1)
:批量获取元素(非阻塞)
@Service
public class QueueProducer { //生产者@Autowiredprivate RedissonClient redissonClient;public void sendTask(String task) {RQueue<String> queue = redisson.getQueue("taskQueue");queue.offer(task);System.out.println("Sent task: " + task);}
}
@Component
public class OrderQueueConsumer { //消费者private final RedissonClient redissonClient;private final ThreadPoolTaskExecutor consumerPool;private final RQueue<String> taskQueue;@Autowiredpublic OrderQueueConsumer(RedissonClient redissonClient, @Qualifier("consumerPool") ThreadPoolTaskExecutor consumerPool) {this.redissonClient = redissonClient;this.consumerPool = consumerPool;taskQueue = redissonClient.getQueue("taskQueue");}@PostConstructpublic void init() {consumerPool.execute(this::consumeTasks);}public void consumeTasks() {while (true) {try {String task = taskQueue.poll(5, TimeUnit.SECONDS);if (task != null) {System.out.println("Processing task: " + task);// 模拟处理时间Thread.sleep(1000);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
阻塞队列
-
原理:redisson提供了
RBlockingQueue
阻塞队列- 使用 Redis 的 List 类型(通过
LPUSH
/RPOP
或RPUSH
/LPOP
命令)存储队列元素 - 当队列为空时,消费者可通过 Redis 的 Pub/Sub 机制监听队列变化,避免频繁轮询
- 使用 Redis 的 List 类型(通过
-
常用API
RedissonClient.getBlockingQueue("taskQueue")
:获取/创建阻塞队列RBlockingQueue.add(E e)
:任务入队(阻塞)RBlockingQueue.take()
:获取任务(阻塞)
@Service
public class TaskProducer { //生产者@Autowiredprivate final RedissonClient redissonClient;public void submitTask(String task) {RBlockingQueue<String> taskQueue = redissonClient.getBlockingQueue("taskQueue");taskQueue.add(task); // 非阻塞入队System.out.println("Submitted task: " + task);}
}
@Service
public class TaskConsumer { //消费者private final RedissonClient redissonClient;private final ThreadPoolTaskExecutor consumerPool;private final RBlockingQueue<String> taskQueue;@Autowiredpublic OrderQueueConsumer(StringRedisTemplate redisTemplate, @Qualifier("consumerPool") ThreadPoolTaskExecutor consumerPool) {this.redisTemplate = redisTemplate;this.consumerPool = consumerPool;consumerPool.execute(this::consumeTasks);}public void consumeTasks() {while (true) {try {String task = taskQueue.take(); // 阻塞获取任务if (task != null) { // 处理任务System.out.println("Processing task: " + task);// 模拟处理时间Thread.sleep(1000);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
延迟队列
-
原理: redisson提供了
RDelayedQueue
,自动将延迟任务保存到RBlockingQueue
- 延迟消息存储:将消息和延迟时间戳存入
Sorted Set
,键为delay_queue:{targetQueueName}
- 定时转移:后台线程(
TransferService
)每秒扫描一次Sorted Set
,将到期消息转移到目标队列(RBlockingQueue
) - 消费者获取:消费者从阻塞队列中消费消息
- 延迟消息存储:将消息和延迟时间戳存入
-
常用API
redissonClient.getDelayedQueue(RBlockingQueue blockingQueue)
:获取/创建延迟队列RDelayedQueue.offer("message", 10, TimeUnit.SECONDS)
:添加延迟任务RDelayedQueue.destroy()
:销毁延迟队列
@Service
public class TemporaryOrdersDelayed {private RedissonClient redissonClient;// 延迟队列和阻塞队列作为成员变量(避免重复创建)private final RBlockingQueue<String> blockingQueue;private final RDelayedQueue<String> delayedQueue;public TemporaryOrdersDelayed(RedissonClient redissonClient) {this.redissonClient = redissonClient;// 初始化队列(确保只创建一次)this.blockingQueue = redissonClient.getBlockingQueue("myDelayedQueue");this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);}/*** 添加延迟任务* @param orderId 订单ID* @param userId 用户ID* @param delaySeconds 延迟时长(秒)*/public void addDelayedTask(Long orderId, Long userId, long delaySeconds) {// 构造消息内容(建议用JSON或序列化对象)String taskData = String.format("%d:%d", orderId, userId);// 添加到延迟队列delayedQueue.offer(taskData, delaySeconds, TimeUnit.SECONDS);}// 必须提供销毁方法,防止内存泄漏!@PreDestroy //@PreDestroy表示在 Spring Bean 销毁之前被自动调用public void destroy() {if (delayedQueue !=) {// 1. 尝试处理队列中已到期但未转移的任务(非阻塞)Collection<String> readyTasks = delayedQueue.pollAll(); // 获取所有到期任务for (String task : readyTasks) {try {blockingQueue.put(task); // 手动转移到目标队列(让消费者处理)} catch (InterruptedException e) {Thread.currentThread().interrupt();}}// 2. 销毁延迟队列(停止后台线程)delayedQueue.destroy();}}
}
分布式计数器
- 目的:原理和JUC计数器类似,但
RCountDownLatch
可以在分布式环境下,实现任务协调
// 分布式环境:服务A等待服务B和服务C的任务
RCountDownLatch latch = redisson.getCountDownLatch("distributed_task");
latch.trySetCount(2); // 初始化计数器为2// 服务B(独立JVM)
new Thread(() -> {System.out.println("服务B任务完成");latch.countDown();
}).start();// 服务C(独立JVM)
new Thread(() -> {System.out.println("服务C任务完成");latch.countDown();
}).start();// 服务A(主线程)
latch.await(); // 阻塞直到服务B和服务C都调用countDown()
System.out.println("所有分布式任务完成");