当前位置: 首页 > ai >正文

Redis——Redisson篇

Redisson

特性Jedis/LettuceRedisson
分布式锁需手动实现(如 SETNX + Lua✅ 内置 RLock,支持可重入、公平锁
高可用依赖 Redis 哨兵/集群,但客户端需手动处理故障转移✅ 自动监听 Redis 节点变化,支持哨兵/集群自动切换
分布式集合仅支持基础 Redis 命令✅ 提供 RMapRListRSet 等高级 API
发布/订阅基础 PUB/SUB 命令✅ 提供 RTopic,支持消息确认、持久化
性能优化需手动优化(如连接池、管道)✅ 内置连接池、异步/响应式编程支持
Spring 集成需手动配置 RedisTemplate✅ 直接集成 Spring Cache、Spring Data Redis

分布式集合

  • 特点:分布式部署redis时,对数据的操作会自动同步到其他节点上
  1. RBucket(分布式 String

    • set(V value):设置值
    • set(V value, long time, TimeUnit unit):设置有效期
    • setIfAbsent(V value):setnx
    • get():获取值
    • delete():删除值
    RBucket<String> bucket = redissonClient.getBucket("myKey"); //不要忘记设置泛型
    bucket.set("Hello Redisson"); // 设置值
    String value = bucket.get();   // 获取值
    bucket.delete();              // 删除键
    
  2. RMap(分布式 Hash

    • put(K key, V value):添加键值对,覆盖已有键
    • put(K key, V value, long time, TimeUnit unit):设置键值对及过期时间
    • putIfAbsent(K key, V value):原子性添加,仅当键不存在时生效
    • get(Object key):获取键对应的值
    • remove(Object key):删除键值对
    • addAndGet(key, delta):自增
    RMap<String, String> map = redissonClient.getMap("userMap");
    map.put("user1", "Alice");
    String value = map.get("user1");
    map.expire(10, TimeUnit.SECONDS); // 设置10秒过期时间
    
  3. RList(分布式 List

    • add(E element):在列表末尾添加元素
    • get(int index):根据索引获取元素
    • remove(int index):删除指定索引的元素
    RList<String> list = redissonClient.getList("myList");
    list.add("item1");          // 尾部插入
    list.addFirst("item0");     // 头部插入
    String firstItem = list.get(0); // 通过索引访问
    
  4. RSet(分布式 Set

    • add(E element):添加元素(自动去重)
    • contains(Object o):判断元素是否存在
    • remove(Object o):删除元素
    RSet<String> set = redissonClient.getSet("mySet");
    set.add("user1");
    set.add("user2");
    boolean contains = set.contains("user1"); // 检查元素是否存在
    

分布式锁

  • setnx的分布式锁存在的问题

    1. 不可重入:需要加锁前先检查
    2. 无重试机制:需要手动实现
    3. 超时释放锁可能导致误删:需要释放锁前先检查
    4. 集群架构下,节点宕机可能导致其他节点未同步分布式锁:需要使用Redlock算法
  • redisson分布式锁特点

    1. redisson提供可重入机制:使用Hash存储锁信息,键为锁名称,字段为UUID+线程ID的组合,值为重入次数
    2. redisson提供可自旋机制:调用tryLock()时,若锁被占用,线程会循环尝试获取,每次尝试间隔可通过参数配置
    3. redisson提供了看门狗机制:封装LUA脚本检查锁,避免了锁误删的问题
    4. redisson提供了mutiLock机制:使用Redlock算法,仅当超过半数节点成功获取锁时,才认为加锁成功
  public class RedissonConfigTest extends BaseTest{@Autowiredprivate RedissonClient redissonClient;@Testpublic void testRedisson() throws Exception{try{// 分布式锁必须声明锁名称以区分锁RLock lock = redissonClient.getLock("myLock"); /* 尝试获取锁,入参说明* tryLock(long waitTime, TimeUnit unit):waitTime+unit指定获取锁的最大等待时间、默认锁不过期* tryLock(long waitTime, long leaseTime, TimeUnit unit):leaseTime指定锁过期时间* tryLock():不等待,锁不过期*/boolean isLock = lock.tryLock(2, 10, TimeUnit.SECONDS);if (isLock){// TODO}}catch (InterruptedException e) {throw new RuntimeException(e);}finally{if (isLock){// 释放锁lock.unlock();}}}}
  • 举例:高并发抢购商品,使用redisson分布式锁防止超卖

    1. 较复杂的逻辑下,LUA脚本可能无法一次性完成,此时还是得使用分布式锁
    2. 用户抢购时没有拿到锁会自动重试,而不是返回错误
    3. 极端情况下,业务发送阻塞导致分布式锁自动释放,仍然有超卖的风险,可以加LUA脚本辅助
@Service
public class OrderServiceImpl implements IOrderService {@Autowiredprivate RedissonClient redissonClient;/** 抢购商品* @param goodId 商品id* @return 抢购结果*/@Overridepublic Result panicBuy(Long goodId) {Long userId = ThreadLocalDto.threadLocal.get().getUserId();RMap<String, String> goodMap = redissonClient.getMap("good:" + goodId); //商品信息bucket// 1.不在抢购期内直接失败Long begin = Long.parseLong(goodMap.get("beginTime"));Long end = Long.parseLong(goodMap.get("endTime"));if (LocalDateTime.ofInstant(Instant.ofEpochMilli(begin),ZoneOffset.of("+8")).isAfter(LocalDateTime.now()) || LocalDateTime.ofInstant(Instant.ofEpochMilli(end),ZoneOffset.of("+8")).isBefore(LocalDateTime.now())) {return Result.fail("不在秒杀时间中");}// 分布式锁:用商品ID作为锁的KeyRLock lock = redissonClient.getLock("lock:good:" + goodId);try{// 2.尝试拿redisson分布式锁boolean isLock = lock.tryLock(2, 10, TimeUnit.SECONDS);if (!isLock) { //重试后最终拿锁失败,返回错误信息return Result.fail("系统繁忙,请重试");}// 3.拿锁成功,开始尝试扣减库存if (Long.parseLong(goodMap.get("stoke")) <= 0){return Result.fail("没有库存");}goodMap.addAndGet("stoke", -1); //扣减库存// 4.库存扣减成功,创建临时订单(这一步也可以异步完成)Long orderId = RedisIdUtil.getId("order"); //分布式ID作为订单idRSet<String> orderIdSet = redissonClient.getSet("order:user:" + userId); //订单编号bucketorderIdSet.add(String.valueOf(orderId));OrderDto orderDto = OrderDto.builder().orderId(orderId).goodId(goodId).userId(userId).createTime(LocalDateTime.now()).build();RBucket<String> orderValue = redissonClient.getBucket("order:"+orderId); //订单信息bucketorderValue.set(JSON.toJSONString(orderDto));return Result.ok(orderId);}finally{if(isLock){// 5.释放锁lock.unlock();}}    }
}

分布式队列

  • 特点

    1. 消息持久化
    2. 封装了阻塞功能
    3. 封装了自动轮询功能
    4. 任务队列会自动同步到其他节点

消息队列

  • 原理:redisson提供了RQueue普通队列,使用List 类型(通过 LPUSH/RPOPRPUSH/LPOP )存储队列元素

  • 常用API

    1. redissonClient.getQueue("myQueue"):获取/创建消息队列
    2. RQueue.offer(E e):向队列尾部添加元素(非阻塞)
    3. RQueue.poll():从队列头部获取并移除元素(非阻塞)
    4. RQueue.poll(int var1):批量获取元素(非阻塞)
@Service
public class QueueProducer { //生产者@Autowiredprivate RedissonClient redissonClient;public void sendTask(String task) {RQueue<String> queue = redisson.getQueue("taskQueue");queue.offer(task);System.out.println("Sent task: " + task);}
}
@Component
public class OrderQueueConsumer { //消费者private final RedissonClient redissonClient;private final ThreadPoolTaskExecutor consumerPool;private final RQueue<String> taskQueue;@Autowiredpublic OrderQueueConsumer(RedissonClient redissonClient, @Qualifier("consumerPool") ThreadPoolTaskExecutor consumerPool) {this.redissonClient = redissonClient;this.consumerPool = consumerPool;taskQueue = redissonClient.getQueue("taskQueue");}@PostConstructpublic void init() {consumerPool.execute(this::consumeTasks);}public void consumeTasks() {while (true) {try {String task = taskQueue.poll(5, TimeUnit.SECONDS);if (task != null) {System.out.println("Processing task: " + task);// 模拟处理时间Thread.sleep(1000);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}

阻塞队列

  • 原理:redisson提供了RBlockingQueue阻塞队列

    1. 使用 Redis 的 List 类型(通过 LPUSH/RPOPRPUSH/LPOP 命令)存储队列元素
    2. 当队列为空时,消费者可通过 Redis 的 Pub/Sub 机制监听队列变化,避免频繁轮询
  • 常用API

    1. RedissonClient.getBlockingQueue("taskQueue"):获取/创建阻塞队列
    2. RBlockingQueue.add(E e):任务入队(阻塞)
    3. RBlockingQueue.take():获取任务(阻塞)
@Service
public class TaskProducer { //生产者@Autowiredprivate final RedissonClient redissonClient;public void submitTask(String task) {RBlockingQueue<String> taskQueue = redissonClient.getBlockingQueue("taskQueue");taskQueue.add(task); // 非阻塞入队System.out.println("Submitted task: " + task);}
}
@Service
public class TaskConsumer { //消费者private final RedissonClient redissonClient;private final ThreadPoolTaskExecutor consumerPool;private final RBlockingQueue<String> taskQueue;@Autowiredpublic OrderQueueConsumer(StringRedisTemplate redisTemplate, @Qualifier("consumerPool") ThreadPoolTaskExecutor consumerPool) {this.redisTemplate = redisTemplate;this.consumerPool = consumerPool;consumerPool.execute(this::consumeTasks);}public void consumeTasks() {while (true) {try {String task = taskQueue.take(); // 阻塞获取任务if (task != null) { // 处理任务System.out.println("Processing task: " + task);// 模拟处理时间Thread.sleep(1000);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}

延迟队列

  • 原理: redisson提供了RDelayedQueue,自动将延迟任务保存到RBlockingQueue

    1. 延迟消息存储:将消息和延迟时间戳存入 Sorted Set,键为 delay_queue:{targetQueueName}
    2. 定时转移:后台线程(TransferService)每秒扫描一次 Sorted Set,将到期消息转移到目标队列(RBlockingQueue
    3. 消费者获取:消费者从阻塞队列中消费消息
  • 常用API

    1. redissonClient.getDelayedQueue(RBlockingQueue blockingQueue):获取/创建延迟队列
    2. RDelayedQueue.offer("message", 10, TimeUnit.SECONDS):添加延迟任务
    3. RDelayedQueue.destroy():销毁延迟队列
@Service
public class TemporaryOrdersDelayed {private RedissonClient redissonClient;// 延迟队列和阻塞队列作为成员变量(避免重复创建)private final RBlockingQueue<String> blockingQueue;private final RDelayedQueue<String> delayedQueue;public TemporaryOrdersDelayed(RedissonClient redissonClient) {this.redissonClient = redissonClient;// 初始化队列(确保只创建一次)this.blockingQueue = redissonClient.getBlockingQueue("myDelayedQueue");this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);}/*** 添加延迟任务* @param orderId 订单ID* @param userId 用户ID* @param delaySeconds 延迟时长(秒)*/public void addDelayedTask(Long orderId, Long userId, long delaySeconds) {// 构造消息内容(建议用JSON或序列化对象)String taskData = String.format("%d:%d", orderId, userId);// 添加到延迟队列delayedQueue.offer(taskData, delaySeconds, TimeUnit.SECONDS);}// 必须提供销毁方法,防止内存泄漏!@PreDestroy //@PreDestroy表示在 Spring Bean 销毁之前被自动调用public void destroy() {if (delayedQueue !=) {// 1. 尝试处理队列中已到期但未转移的任务(非阻塞)Collection<String> readyTasks = delayedQueue.pollAll(); // 获取所有到期任务for (String task : readyTasks) {try {blockingQueue.put(task); // 手动转移到目标队列(让消费者处理)} catch (InterruptedException e) {Thread.currentThread().interrupt();}}// 2. 销毁延迟队列(停止后台线程)delayedQueue.destroy();}}
}

分布式计数器

  • 目的:原理和JUC计数器类似,但RCountDownLatch可以在分布式环境下,实现任务协调
// 分布式环境:服务A等待服务B和服务C的任务
RCountDownLatch latch = redisson.getCountDownLatch("distributed_task");
latch.trySetCount(2); // 初始化计数器为2// 服务B(独立JVM)
new Thread(() -> {System.out.println("服务B任务完成");latch.countDown();
}).start();// 服务C(独立JVM)
new Thread(() -> {System.out.println("服务C任务完成");latch.countDown();
}).start();// 服务A(主线程)
latch.await(); // 阻塞直到服务B和服务C都调用countDown()
System.out.println("所有分布式任务完成");
http://www.xdnf.cn/news/18048.html

相关文章:

  • Oracle algorithm的含义
  • 【Unity3D实例-功能-拔枪】角色拔枪(二)分割上身和下身
  • 【前端面试题】JavaScript核心面试题解析
  • 计算机网络---跳板机与堡垒机
  • Pytorch模型复现笔记-VGG讲解+架构搭建(可直接copy运行)+冒烟测试
  • 三维重建-动手学计算机视觉19(完结)
  • openEuler等Linux系统中如何复制移动硬盘的数据
  • 豆包 Java的23种设计模式
  • 力扣3:无重复字符的最长子串
  • 【LeetCode题解】LeetCode 33. 搜索旋转排序数组
  • Java研学-SpringCloud(二)
  • 从零到一:打包并发布你的第一个MCP AI工具服务
  • DNS总结
  • 从CVPR到NeurIPS,可变形卷积+可变形空间注意力如何斩获最佳论文
  • python+flask后端开发~项目实战 | 博客问答项目--模块化文件架构的基础搭建
  • 灰色预测模型
  • matlab tlc的文件、字符串操作
  • 【力扣热题100】双指针—— 接雨水
  • redis和cdn的相似性和区别
  • Android中切换语言的方法
  • Perf使用详解
  • 黑马商城day08-Elasticsearch作业(个人记录、仅供参考、详细图解)
  • 解决 SECURE_PCI_CONFIG_SPACE_ACCESS_VIOLATION蓝屏报错
  • 大模型提示词(Prompt)终极指南:从原理到实战,让AI输出质量提升300%
  • 为什么TCP连接是三次握手?不是四次两次?
  • ruoyi-vue(十一)——代码生成
  • ansible管理变量和事实
  • Chrome插件开发实战:todoList 插件
  • 影刀初级B级考试大题2
  • Java ArraysParallelSortHelpers 并行排序