当前位置: 首页 > web >正文

聊聊redisson的lockWatchdogTimeout

本文主要研究一下redisson的lockWatchdogTimeout

lockWatchdogTimeout

redisson/src/main/java/org/redisson/config/Config.java

private long lockWatchdogTimeout = 30 * 1000;/*** This parameter is only used if lock has been acquired without leaseTimeout parameter definition.* Lock expires after <code>lockWatchdogTimeout</code> if watchdog* didn't extend it to next <code>lockWatchdogTimeout</code> time interval.* <p>* This prevents against infinity locked locks due to Redisson client crush or* any other reason when lock can't be released in proper way.* <p>* Default is 30000 milliseconds** @param lockWatchdogTimeout timeout in milliseconds* @return config*/public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {this.lockWatchdogTimeout = lockWatchdogTimeout;return this;}public long getLockWatchdogTimeout() {return lockWatchdogTimeout;}

Config定义了lockWatchdogTimeout属性,默认30s

tryAcquireOnceAsync

redisson/src/main/java/org/redisson/RedissonLock.java

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.internalLockLeaseTime = getServiceManager().getCfg().getLockWatchdogTimeout();this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {CompletionStage<Boolean> acquiredFuture;if (leaseTime > 0) {acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}acquiredFuture = handleNoSync(threadId, acquiredFuture);CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {// lock acquiredif (acquired) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return acquired;});return new CompletableFutureWrapper<>(f);}

tryAcquireOnceAsync对于leaseTime小于等于0的,使用默认的internalLockLeaseTime,并在获取到锁之后执行scheduleExpirationRenewal

tryLockInnerAsync

redisson/src/main/java/org/redisson/RedissonLock.java

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,"if ((redis.call('exists', KEYS[1]) == 0) " +"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}

这里使用pexpire命令,参数为leaseTime,获取到锁的返回nil,获取不到锁的通过pttl返回该锁的毫秒级的剩余存活时间

scheduleExpirationRenewal

redisson/src/main/java/org/redisson/RedissonBaseLock.java

    protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}}

scheduleExpirationRenewal对于刚放进EXPIRATION_RENEWAL_MAP的执行renewExpiration

renewExpiration

redisson/src/main/java/org/redisson/RedissonBaseLock.java

    private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = getServiceManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}CompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock {} expiration", getRawName(), e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// reschedule itselfrenewExpiration();} else {cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}

renewExpiration通过getServiceManager().newTimeout创建一个timerTask,delay为internalLockLeaseTime/3,该task执行renewExpirationAsync,若有异常则从EXPIRATION_RENEWAL_MAP移除,若续期成功则再次执行renewExpiration调度timerTask,续期不成功(锁不存在)则执行cancelExpirationRenewal

cancelExpirationRenewal

redisson/src/main/java/org/redisson/RedissonBaseLock.java

    protected void cancelExpirationRenewal(Long threadId) {ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task == null) {return;}if (threadId != null) {task.removeThreadId(threadId);}if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();if (timeout != null) {timeout.cancel();}EXPIRATION_RENEWAL_MAP.remove(getEntryName());}}

cancelExpirationRenewal则取出ExpirationEntry,移除指定的threadId,若没有其他threads的话再取出Timeout执行cancel,最后从EXPIRATION_RENEWAL_MAP移除

unlockAsync0

redisson/src/main/java/org/redisson/RedissonBaseLock.java

    private RFuture<Void> unlockAsync0(long threadId) {CompletionStage<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {cancelExpirationRenewal(threadId);if (e != null) {if (e instanceof CompletionException) {throw (CompletionException) e;}throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}

unlockAsync0会执行cancelExpirationRenewal去取消自动续期

小结

redisson提供了lockWatchdogTimeout参数,默认为30s,对于加锁时没有指定leaseTime的,会使用默认的lockWatchdogTimeout作为过期时间,并且获取到锁之后会启动定时任务scheduleExpirationRenewal去给锁续期。需要自动续期的,可以使用没有leaseTime参数的方法,或者leaseTime传-1。

在unlock、续期时发现锁不存在、renewExpiration的finally中发现Thread.currentThread().isInterrupted()这几种情况会执行cancelExpirationRenewal去取消自动续期,若续期时redis异常则直接从EXPIRATION_RENEWAL_MAP中移除间接取消自动续期

doc

  • locks-and-synchronizers
http://www.xdnf.cn/news/6442.html

相关文章:

  • AWS Elastic Beanstalk部署极简Spring工程(EB CLI失败版)
  • 基于OpenCV的人脸微笑检测实现
  • 乘法口诀练习神器
  • 富文本编辑器:链接功能
  • 基于 Python Requests + Pytest + Allure 构建接口自动化测试框架的最优实践
  • 编程日志5.8
  • 【测试】测试分类
  • WebRTC 通话原理:从协商到通信
  • Intellij报错:the file size(3.47M) exceeds configured limit (2.56MB)
  • websocket入门详解
  • 第28周——InceptionV1实现猴痘识别
  • 鸿蒙OSUniApp实现个性化的搜索框与搜索历史记录#三方框架 #Uniapp
  • STM32单片机内存分配详细讲解
  • Android Studio中Gradle 7.0上下项目配置及镜像修改
  • 游戏引擎学习第280天:精简化的流式实体sim
  • 毕设设计 | 管理系统图例
  • ET EntityRef EntityWeakRef 类分析
  • 基于EFISH-SCB-RK3576/SAIL-RK3576的消防机器人控制器技术方案‌
  • VSTO(C#)Excel开发进阶2:操作图片 改变大小 滚动到可视区
  • 产品更新丨谷云科技 iPaaS 集成平台 V7.5 版本发布
  • [特殊字符] 苍穹外卖项目中的 WebSocket 实战:实现来单与催单提醒功能
  • Parsec解决PnP连接失败的问题
  • 星巴克中国要卖在高点
  • sqli-labs靶场第七关——文件导出注入
  • ISP中拖影问题的处理
  • 嵌入式学习笔记DAY21(双向链表、Makefile)
  • C++11(2)
  • MySQL DBA数据运维管理经验分享:新手入门快速提升效率的新工具与技巧
  • 基于AH1101芯片的5V升18.6V LED恒流背光供电方案设计
  • 【免费分享】虚拟机VM(适用于 Windows)17.6.3