当前位置: 首页 > news >正文

【Redisson 加锁源码解析】

Redisson 源码解析 —— 分布式锁实现过程

在分布式系统中,分布式锁 是非常常见的需求,用来保证多个节点之间的互斥操作。Redisson 是 Redis 的一个 Java 客户端,它提供了对分布式锁的良好封装。本文将从源码角度剖析 Redisson 的分布式锁实现过程。


一、分布式锁的基本需求

一个健壮的分布式锁需要满足以下条件:

  1. 互斥性:同一时间只能有一个客户端持有锁。
  2. 死锁避免:客户端宕机后,锁不会永久被占用。
  3. 可重入性:同一线程可多次获取同一把锁。
  4. 高可用性:在 Redis 集群模式下仍能正常工作。
  5. 超时释放:设置持有锁时间,时间超过锁释放,避免死锁。
  6. 锁时间续约:看门狗机制,避免业务未执行完毕锁释放,导致并发问题。

二、Redisson 分布式锁的核心实现类以及加锁方法

在源码中,Redisson 提供了多种锁的实现,最核心的是:

  • RedissonLock —— 基于 Redis 的可重入锁实现
  • RedissonReadWriteLock —— 读写锁
  • RedissonFairLock —— 公平锁

我们主要关注 RedissonLock 的实现。


 RLock lock = redissonClient.getLock("32r");lock.方法名()

常用加锁方法:
在这里插入图片描述

  1. lock():获取锁,获取不到会一致阻塞直到获取。通过看门狗机制续期,默认持有锁是30s,每隔10s续期一次。
  2. lock(long l, TimeUnit timeUnit):获取锁,获取不到会一致阻塞直到获取。持有锁时间是手动入参的timeUnit,到期释放锁。
  3. tryLock(long waite, long l1, TimeUnit timeUnit) :获取锁失败后,自旋,等待 waite 秒,获取不到返回false,获取到,持有锁时间是 l1,单位 timeUnit。
  4. tryLock():尝试获取一次锁,如果获取不到,立即返回 false,获取锁成功,触发 看门狗续期机制(和 lock() 一样)。
  5. tryLock(long waitTime, TimeUnit unit):在 waitTime 时间窗口内,不断尝试执行,范围内获取锁失败,返回false。获取成功,启动看门狗机制。
 RLock lock = redissonClient.getLock("32r");

我们可以看到 redissonClient 调用这个方法时候,客户端返回的是RedissonLock这个类
在这里插入图片描述

所以对应的我们主要关注 RedissonLock 子类和父类RedissonBaseLock
在这里插入图片描述

这里我主要分析 lock() 方法的调用,其他锁的逻辑都是参考这个去完善的。

三、加锁流程解析

1. 调用入口

当我们执行:

RLock lock = redisson.getLock("myLock");
lock.lock();

进入RedissonLock#lock方法:
在这里插入图片描述
可以看到调用lock方法其实都是调用的另外一个lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法。
对应真正调用的lock()方法:

/*** 获取分布式锁的核心方法* @param leaseTime 锁的租约时间* @param unit 时间单位* @param interruptibly 是否允许中断* @throws InterruptedException 当线程被中断时抛出*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 获取当前线程ID,用于标识锁的持有者long threadId = Thread.currentThread().getId();// 尝试获取锁,返回剩余的TTL(生存时间)// 如果返回null表示获取锁成功,否则返回锁的剩余过期时间Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);// 如果ttl不为null,说明锁获取失败,需要等待if (ttl != null) {// 订阅锁释放的通知,返回一个Future对象CompletableFuture<RedissonLockEntry> future = this.subscribe(threadId);// 设置订阅操作的超时时间this.pubSub.timeout(future);// 根据是否允许中断来获取订阅结果RedissonLockEntry entry;if (interruptibly) {// 允许中断的方式获取结果entry = (RedissonLockEntry)this.commandExecutor.getInterrupted(future);} else {// 不允许中断的方式获取结果entry = (RedissonLockEntry)this.commandExecutor.get(future);}try {// 自旋等待锁释放while(true) {// 再次尝试获取锁ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);// 如果获取锁成功(ttl为null),则退出循环if (ttl == null) {return;}// 如果ttl大于等于0,说明锁还存在,需要等待指定的时间if (ttl >= 0L) {try {// 使用信号量等待指定的ttl时间entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {// 如果允许中断,直接抛出异常if (interruptibly) {throw e;}// 如果不允许中断,继续等待entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 如果ttl小于0,表示需要无限等待if (interruptibly) {// 允许中断的无限等待entry.getLatch().acquire();} else {// 不允许中断的无限等待entry.getLatch().acquireUninterruptibly();}}}} finally {// 无论成功与否,都要取消订阅,释放资源this.unsubscribe(entry, threadId);}}
}

这时候我们只需要重点关注对应的this.tryAcquire(-1L, leaseTime, unit, threadId);这个方法。
源码图如下:
在这里插入图片描述
对应的Java代码解释:

/*** 异步尝试获取锁* @param waitTime 等待时间* @param leaseTime 锁的租约时间* @param unit 时间单位* @param threadId 线程ID* @return 返回锁的剩余TTL时间,null表示获取锁成功*/
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 声明TTL剩余时间的Future对象RFuture<Long> ttlRemainingFuture;// 判断是否指定了租约时间if (leaseTime > 0L) {// 使用指定的租约时间尝试获取锁ttlRemainingFuture = this.<Long>tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 使用默认的内部锁租约时间尝试获取锁ttlRemainingFuture = this.<Long>tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 对获取锁的结果进行后续处理CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {// 如果ttlRemaining为null,说明成功获取到锁if (ttlRemaining == null) {// 判断是否指定了租约时间if (leaseTime > 0L) {// 将指定的租约时间转换为毫秒并存储到内部锁租约时间this.internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 如果没有指定租约时间,启动锁的自动续期机制// 防止锁因过期而被误释放this.scheduleExpirationRenewal(threadId);}}// 返回TTL剩余时间(null表示获取锁成功,非null表示需要等待的时间)return ttlRemaining;});// 将CompletionStage包装成RFuture并返回return new CompletableFutureWrapper(f);
}

这里最重要的是调用对应的tryAcquire里面的tryLockInnerAsync方法,方法详解如下:

 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) thenredis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});}

这个tryLockInnerAsync方法主要是执行对应的脚本,然后返回剩余的时间,如果获取锁成功返回 nil ,获取锁失败会返回 持有锁的锁过期时间

核心 Lua 脚本详解如下:

Redisson 并不是简单地 SETNX,而是使用 Lua 脚本 来保证操作的原子性
加锁脚本大致逻辑如下:

if (redis.call('exists', KEYS[1]) == 0) then-- 锁不存在,设置锁并绑定到线程redis.call('hset', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;-- 锁已存在,判断是否是当前线程重入
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) thenredis.call('hincrby', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;return redis.call('pttl', KEYS[1]);

解释:

  • KEYS[1]: 锁的 key (如 myLock)
  • ARGV[1]: 锁的过期时间(默认 30s)
  • ARGV[2]: 当前线程标识(由 UUID + 线程 ID 组成)

执行流程:

  1. 如果锁不存在,设置 hash,key = 线程标识,value = 1。
  2. 如果锁存在且是自己线程,则递增重入次数。
  3. 否则返回锁的剩余过期时间。

问题延伸:
Redis不是单线程吗,高并发线程下不是线程安全吗?为什么还需要使用Lua脚本保证原子性
想想为什么使用lua脚本,你可以想象一下高并发场景下,Redis执行命令是单线程的,Redis只能保证对应的单条命令是原子性的,不能保证多条命令的原子性,假设线程A执行:redis.call('exists', KEYS[1]) == 0结束后,线程B抢到执行权,然后线程B也执行:redis.call('exists', KEYS[1]) == 0,然后后续大家都会进行对应的锁设置,导致线程A上锁可能会被覆盖,不过可以用hsetnx解决,但是后续可能判断还是会有并发问题。使用 lua 脚本可以将多条命令整合成类似一条命令,redis执行,从而保证原子性

WatchDog 自动续期机制

Redisson 的一大亮点是 锁续期机制

  • 当线程获取锁后,会启动一个 看门狗定时任务,默认每隔 lockWatchdogTimeout / 3 秒续期一次(默认 30s → 10s)。
  • 如果业务逻辑执行很久,不用担心锁被提前释放。
  • 如果线程宕机,定时任务不再执行,锁会在超时后自动释放。

判断对应的leasetime有没有指定,然后执行对应的续期或不续期的方法
源码关键点在:scheduleExpirationRenewal() 方法。
关键代码

   CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {if (ttlRemaining == null) {if (leaseTime > 0L) {this.internalLockLeaseTime = unit.toMillis(leaseTime);} else {this.scheduleExpirationRenewal(threadId);}}return ttlRemaining;});

根据对应的没指定leaseTime ,然后执行对应的RedissonBaseLock#scheduleExpirationRenewal对应的方法逻辑如下:

  /*** 调度锁的过期时间续期任务* 为指定线程启动自动续期机制,防止锁因过期而被误释放* @param threadId 需要续期的线程ID*/
protected void scheduleExpirationRenewal(long threadId) {// 创建新的过期时间管理条目ExpirationEntry entry = new ExpirationEntry();// 尝试将新条目放入续期映射表中,如果已存在则返回旧条目// 使用putIfAbsent确保原子性操作,避免并发问题ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);// 判断是否已经存在续期任务if (oldEntry != null) {// 如果已存在续期任务,只需将当前线程ID添加到现有条目中// 这种情况发生在同一个锁被多个线程(可重入锁)或同一线程多次获取时oldEntry.addThreadId(threadId);} else {// 如果是首次为这个锁创建续期任务// 将当前线程ID添加到新创建的条目中entry.addThreadId(threadId);try {// 启动实际的续期任务// 这会创建定时任务,定期延长锁的过期时间this.renewExpiration();} finally {// 检查当前线程是否被中断if (Thread.currentThread().isInterrupted()) {// 如果线程被中断,取消刚刚启动的续期任务// 防止资源泄漏和无效的续期操作this.cancelExpirationRenewal(threadId);}}}
}

这个通过一个创建一个ExpirationEntry 然后通过EXPIRATION_RENEWAL_MAP判断是否存在,如果条目不存在就启动对应的自动续期机制任务 renewExpiration()

RedissonBaseLock#renewExpiration()方法如下:

/*** 启动锁的自动续期机制* 创建定时任务,定期延长锁的过期时间,防止锁因超时而被释放*/
private void renewExpiration() {// 从续期映射表中获取当前锁的过期时间管理条目ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());// 如果条目存在,说明需要为这个锁设置续期任务if (ee != null) {// 创建定时任务,在锁租约时间的1/3处执行续期操作// 选择1/3时间点是为了在锁过期前有足够的时间进行续期Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {// 定时任务执行时,重新获取续期条目(防止在延迟期间被移除)ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());// 双重检查:确保续期条目仍然存在if (ent != null) {// 获取需要续期的第一个线程ID// 对于可重入锁,可能有多个线程ID,取第一个进行续期Long threadId = ent.getFirstThreadId();// 如果线程ID有效,执行续期操作if (threadId != null) {// 异步执行锁的续期操作CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);// 处理续期结果future.whenComplete((res, e) -> {// 如果续期过程中发生异常if (e != null) {// 记录错误日志RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e);// 从续期映射表中移除条目,停止续期RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());} else {// 续期操作成功完成if (res) {// 如果续期成功(返回true),递归调用继续下一轮续期// 这样就形成了持续的自动续期循环RedissonBaseLock.this.renewExpiration();} else {// 如果续期失败(返回false),说明锁已经不存在或不属于当前线程// 取消续期任务,清理资源RedissonBaseLock.this.cancelExpirationRenewal((Long)null);}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 在租约时间的1/3处执行续期// 将定时任务保存到条目中,用于后续的取消操作ee.setTimeout(task);}
}

最后完美结束对应的获取锁的过程,返回一个对应的时间值 ttl
在这里插入图片描述
如果返回的是null代表加锁成功,否则是加锁失败,此时会进行订阅持有锁者this.subscribe(threadId),如果释放锁会通知这个获取锁失败的线程,会将这个线程唤醒。

四、解锁流程解析

解锁的流程

解锁时同样使用 Lua 脚本,保证原子性:

if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) thenreturn nil;
end;local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1);if (counter > 0) thenreturn 0;
elseredis.call('del', KEYS[1]);return 1;
end;

解释:

  1. 检查当前线程是否持有锁。
  2. 如果是可重入锁,计数 -1。
  3. 如果计数为 0,则删除锁。

六、源码设计亮点

  1. Lua 脚本保证原子性,避免分布式并发问题。
  2. 可重入性设计:使用 hash 结构存储线程标识和重入次数。
  3. 锁超时释放设计:避免死锁问题。
  4. 看门狗机制:保证长时间任务也能安全持有锁。
  5. 异步化设计:Redisson 提供 lockAsync() 等方法,方便高并发场景。

七、总结

  • Redisson 的分布式锁实现基于 Redis + Lua 脚本,解决了互斥、可重入和死锁问题。
  • 看门狗续期机制 是 Redisson 的亮点,保证了业务执行时间不可预测的情况下的安全性。
  • 在生产环境中,Redisson 的分布式锁相较于 SETNX + EXPIRE 的手写版本,更加健壮和可靠。
http://www.xdnf.cn/news/1399249.html

相关文章:

  • VuePress添加自定义组件
  • 【MySQL数据库】索引 - 结构 学习记录
  • 加速智能经济发展:如何助力“人工智能+”战略在实时视频领域的落地
  • Swift 解法详解:LeetCode 367《有效的完全平方数》
  • Kafka入门
  • 开源 C++ QT Widget 开发(八)网络--Http文件下载
  • 《微服务架构从故障频发到自愈可控的实战突围方案》
  • CSDN博客语法(不常用但有用)
  • 谷歌 “Nano Banana“ 深度解析:AI 图像的未来是精准编辑,而非从零生成
  • ⚡ Linux find 命令参数详解
  • MySQL基础理解入门
  • 嵌入式硬件电路分析---AD采集电路
  • Spring Boot 自动配置原理深度解析:从启动流程到监听机制
  • 【Java EE进阶 --- SpringBoot】Spring Web MVC(Spring MVC)(二)
  • 设计模式之代理模式!
  • 深度学习基础:前馈网络、反向传播与梯度问题
  • 基于IEC61499开放自动化PLC数据储存方案
  • 在 WSL2-NVIDIA-Workbench 中安装Anaconda、CUDA 13.0、cuDNN 9.12 及 PyTorch(含完整环境验证)
  • 第 8 篇:量化交易之tradeUI和webserverUI 区别?
  • 系统分析师考试大纲新旧版本深度分析与备考策略
  • 捡捡java——2、基础07
  • 开发指南136-设置零值不显示
  • vue中的与,或,非
  • Ansible 核心运维场景落地:YUM 仓库、SSH 公钥、固定 IP 配置技巧
  • [Windows] 剪映国际版CapCut 6.7.0 视频编辑处理,免费使用素材和滤镜
  • 拼团小程序源码分享拼团余额提现小程序定制教程开发源码二开
  • LeetCode 136. 只出现一次的数字
  • [论文阅读] 人工智能 + 软件工程 | 从“法律条文”到“Gherkin脚本”:Claude与Llama谁更懂合规开发?
  • 普蓝自研AutoTrack-4X导航套件平台适配高校机器人实操应用
  • k8s(自写)