当前位置: 首页 > backend >正文

基于redis实现分布式锁方案实战

分布式锁的进阶实现与优化方案

作为Java高级开发工程师,我将为您提供更完善的Redis分布式锁实现方案,包含更多生产级考量。

1. 生产级Redis分布式锁实现

1.1 完整实现类(支持可重入、自动续约)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class RedisDistributedLock {private final JedisPool jedisPool;private final ScheduledExecutorService scheduler;// 本地存储锁的持有计数(用于可重入)private final ThreadLocal<Map<String, LockEntry>> lockHoldCounts = ThreadLocal.withInitial(HashMap::new);// 锁续约的Future集合private final ConcurrentMap<String, ScheduledFuture<?>> renewalFutures = new ConcurrentHashMap<>();private static class LockEntry {final String requestId;final AtomicInteger holdCount;LockEntry(String requestId) {this.requestId = requestId;this.holdCount = new AtomicInteger(1);}}public RedisDistributedLock(JedisPool jedisPool) {this.jedisPool = jedisPool;this.scheduler = Executors.newScheduledThreadPool(4);}public boolean tryLock(String lockKey, String requestId, int expireTime) {return tryLock(lockKey, requestId, expireTime, 0, 0);}public boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime, long retryInterval) {long startTime = System.currentTimeMillis();try {// 检查是否已经持有锁(可重入)LockEntry entry = lockHoldCounts.get().get(lockKey);if (entry != null && entry.requestId.equals(requestId)) {entry.holdCount.incrementAndGet();return true;}// 尝试获取锁while (true) {if (acquireLock(lockKey, requestId, expireTime)) {// 获取成功,记录持有信息lockHoldCounts.get().put(lockKey, new LockEntry(requestId));// 启动自动续约scheduleRenewal(lockKey, requestId, expireTime);return true;}// 检查是否超时if (maxWaitTime > 0 && System.currentTimeMillis() - startTime > maxWaitTime) {return false;}// 等待重试if (retryInterval > 0) {try {Thread.sleep(retryInterval);} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}}} catch (Exception e) {// 异常处理return false;}}private boolean acquireLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {SetParams params = SetParams.setParams().nx().px(expireTime);return "OK".equals(jedis.set(lockKey, requestId, params));}}private void scheduleRenewal(String lockKey, String requestId, int expireTime) {// 计算续约间隔(通常是过期时间的1/3)long renewalInterval = expireTime * 2 / 3;ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {try {renewLock(lockKey, requestId, expireTime);} catch (Exception e) {// 续约失败,取消任务renewalFutures.remove(lockKey);}}, renewalInterval, renewalInterval, TimeUnit.MILLISECONDS);renewalFutures.put(lockKey, future);}public boolean renewLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('pexpire', KEYS[1], ARGV[2]) " +"else " +"return 0 " +"end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Arrays.asList(requestId, String.valueOf(expireTime)));return "OK".equals(result);}}public boolean releaseLock(String lockKey, String requestId) {try {LockEntry entry = lockHoldCounts.get().get(lockKey);if (entry == null || !entry.requestId.equals(requestId)) {return false;}// 减少持有计数if (entry.holdCount.decrementAndGet() > 0) {return true;}// 完全释放锁lockHoldCounts.get().remove(lockKey);// 取消续约任务ScheduledFuture<?> future = renewalFutures.remove(lockKey);if (future != null) {future.cancel(false);}// 释放Redis锁try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));return Long.valueOf(1L).equals(result);}} catch (Exception e) {return false;}}public void shutdown() {scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}}
}

1.2 锁工厂模式(支持多种锁类型)

public interface DistributedLock {boolean tryLock(String lockKey, String requestId, int expireTime);boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime, long retryInterval);boolean releaseLock(String lockKey, String requestId);
}public class RedisLockFactory {private final JedisPool jedisPool;public RedisLockFactory(JedisPool jedisPool) {this.jedisPool = jedisPool;}public DistributedLock createSimpleLock() {return new SimpleRedisLock(jedisPool);}public DistributedLock createReentrantLock() {return new ReentrantRedisLock(jedisPool);}public DistributedLock createReadWriteLock() {return new RedisReadWriteLock(jedisPool);}private static class SimpleRedisLock implements DistributedLock {// 简单实现(同前面基础实现)}private static class ReentrantRedisLock extends RedisDistributedLock {// 可重入实现(同前面完整实现)}private static class RedisReadWriteLock implements DistributedLock {// 读写锁实现}
}

2. 高级特性实现

2.1 读写锁实现

public class RedisReadWriteLock {private final JedisPool jedisPool;private static final String READ_LOCK_PREFIX = "READ_LOCK:";private static final String WRITE_LOCK_PREFIX = "WRITE_LOCK:";public RedisReadWriteLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}public boolean tryReadLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {// 检查是否有写锁if (jedis.exists(WRITE_LOCK_PREFIX + lockKey)) {return false;}// 获取读锁String readLockKey = READ_LOCK_PREFIX + lockKey;Long count = jedis.incr(readLockKey);if (count == 1L) {// 第一次获取读锁,设置过期时间jedis.pexpire(readLockKey, expireTime);}return true;}}public boolean tryWriteLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {// 检查是否有读锁if (jedis.exists(READ_LOCK_PREFIX + lockKey)) {return false;}// 获取写锁SetParams params = SetParams.setParams().nx().px(expireTime);return "OK".equals(jedis.set(WRITE_LOCK_PREFIX + lockKey, requestId, params));}}// 释放方法类似...
}

2.2 公平锁实现(基于Redis列表)

public class RedisFairLock {private final JedisPool jedisPool;private static final String QUEUE_PREFIX = "LOCK_QUEUE:";private static final String LOCK_PREFIX = "LOCK:";public RedisFairLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}public boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime) {long startTime = System.currentTimeMillis();String queueKey = QUEUE_PREFIX + lockKey;String lockRealKey = LOCK_PREFIX + lockKey;try (Jedis jedis = jedisPool.getResource()) {// 加入等待队列jedis.rpush(queueKey, requestId);try {while (true) {// 检查是否轮到自己String firstRequestId = jedis.lindex(queueKey, 0);if (requestId.equals(firstRequestId)) {// 尝试获取锁SetParams params = SetParams.setParams().nx().px(expireTime);if ("OK".equals(jedis.set(lockRealKey, requestId, params))) {return true;}}// 检查超时if (System.currentTimeMillis() - startTime > maxWaitTime) {// 从队列中移除自己jedis.lrem(queueKey, 0, requestId);return false;}// 短暂等待Thread.sleep(100);}} finally {// 确保最终从队列中移除(防止异常情况)jedis.lrem(queueKey, 0, requestId);}} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}public void releaseLock(String lockKey, String requestId) {try (Jedis jedis = jedisPool.getResource()) {String lockRealKey = LOCK_PREFIX + lockKey;String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";jedis.eval(script, Collections.singletonList(lockRealKey), Collections.singletonList(requestId));}}
}

3. 生产环境最佳实践

3.1 配置建议

  1. 锁过期时间

    • 根据业务操作的最长时间设置,通常为业务平均耗时的3倍
    • 例如:业务平均耗时200ms,可设置锁过期时间为600ms
  2. 续约间隔

    • 设置为过期时间的1/3到1/2
    • 例如:过期时间600ms,续约间隔200-300ms
  3. 重试策略

    • 初始重试间隔50-100ms
    • 可考虑指数退避策略

3.2 监控与告警

public class LockMonitor {private final JedisPool jedisPool;private final MeterRegistry meterRegistry; // 假设使用Micrometerpublic LockMonitor(JedisPool jedisPool, MeterRegistry meterRegistry) {this.jedisPool = jedisPool;this.meterRegistry = meterRegistry;}public void monitorLockStats() {// 监控锁获取成功率Timer lockAcquireTimer = Timer.builder("redis.lock.acquire.time").description("Time taken to acquire redis lock").register(meterRegistry);// 监控锁等待时间DistributionSummary waitTimeSummary = DistributionSummary.builder("redis.lock.wait.time").description("Time spent waiting for redis lock").register(meterRegistry);// 监控锁竞争情况Gauge.builder("redis.lock.queue.size", () -> {try (Jedis jedis = jedisPool.getResource()) {return jedis.llen("LOCK_QUEUE:important_lock");}}).description("Number of clients waiting for lock").register(meterRegistry);}
}

3.3 异常处理策略

  1. Redis不可用时的降级策略
    • 本地降级锁(仅适用于单机或可以接受短暂不一致的场景)
    • 快速失败,避免系统雪崩
public class DegradableRedisLock implements DistributedLock {private final DistributedLock redisLock;private final ReentrantLock localLock = new ReentrantLock();private final CircuitBreaker circuitBreaker;public DegradableRedisLock(DistributedLock redisLock) {this.redisLock = redisLock;this.circuitBreaker = CircuitBreaker.ofDefaults("redisLock");}@Overridepublic boolean tryLock(String lockKey, String requestId, int expireTime) {return CircuitBreaker.decorateSupplier(circuitBreaker, () -> {try {return redisLock.tryLock(lockKey, requestId, expireTime);} catch (Exception e) {// Redis不可用,降级到本地锁return localLock.tryLock();}}).get();}// 其他方法实现类似...
}

4. 测试方案

4.1 单元测试

public class RedisDistributedLockTest {private RedisDistributedLock lock;private JedisPool jedisPool;@BeforeEachvoid setUp() {jedisPool = new JedisPool("localhost");lock = new RedisDistributedLock(jedisPool);}@Testvoid testLockAndUnlock() {String lockKey = "test_lock";String requestId = UUID.randomUUID().toString();assertTrue(lock.tryLock(lockKey, requestId, 10000));assertTrue(lock.releaseLock(lockKey, requestId));}@Testvoid testReentrantLock() {String lockKey = "test_reentrant_lock";String requestId = UUID.randomUUID().toString();assertTrue(lock.tryLock(lockKey, requestId, 10000));assertTrue(lock.tryLock(lockKey, requestId, 10000)); // 可重入assertTrue(lock.releaseLock(lockKey, requestId));assertTrue(lock.releaseLock(lockKey, requestId)); // 需要释放两次}// 更多测试用例...
}

4.2 并发测试

@Test
void testConcurrentLock() throws InterruptedException {String lockKey = "concurrent_test_lock";int threadCount = 10;CountDownLatch latch = new CountDownLatch(threadCount);AtomicInteger successCount = new AtomicInteger();for (int i = 0; i < threadCount; i++) {new Thread(() -> {String requestId = UUID.randomUUID().toString();if (lock.tryLock(lockKey, requestId, 1000, 5000, 100)) {try {successCount.incrementAndGet();Thread.sleep(100); // 模拟业务处理} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {lock.releaseLock(lockKey, requestId);}}latch.countDown();}).start();}latch.await();assertEquals(1, successCount.get()); // 确保只有一个线程获取到锁
}

5. 性能优化建议

  1. 连接池优化

    • 合理配置Jedis连接池大小
    • 使用try-with-resources确保连接释放
  2. Lua脚本优化

    • 预加载常用Lua脚本
    • 减少脚本复杂度
  3. 批量操作

    • 对于RedLock等多节点场景,考虑使用管道(pipeline)
  4. 本地缓存

    • 对于频繁使用的锁信息,可考虑本地缓存

6. 替代方案对比

方案优点缺点适用场景
Redis单节点实现简单,性能高单点故障,可靠性较低对可靠性要求不高的场景
Redis集群+RedLock可靠性较高实现复杂,性能较低对可靠性要求高的场景
Zookeeper可靠性高,原生支持临时节点性能较低,依赖Zookeeper强一致性要求的场景
数据库实现无需额外组件性能差,容易成为瓶颈简单场景,并发量低

这个进阶方案提供了生产环境所需的完整功能,包括可重入锁、读写锁、公平锁等高级特性,以及监控、降级等生产级考量。您可以根据实际项目需求选择合适的实现方式。

http://www.xdnf.cn/news/6399.html

相关文章:

  • Linux:理解文件系统
  • 网络损伤仪功能介绍与应用场景剖析
  • Java详解LeetCode 热题 100(17):LeetCode 41. 缺失的第一个正数(First Missing Positive)详解
  • JavaScript的BOM、DOM编程
  • Java并发编程:CAS操作
  • java调用get请求和post请求
  • 无人机屏蔽与滤波技术模块运行方式概述!
  • Git命令总结
  • 视频质量分析时,遇到不同分辨率的对照视频和源视频,分辨率对齐的正确顺序。
  • Linux515 rsync定时备份
  • 使用LoRA微调Qwen2.5-VL-7B-Instruct完成电气主接线图识别
  • Android 图片自动拉伸不变形,点九
  • Linux 系统中的文件系统层次结构和重要目录的用途。
  • 隆重推荐(Android 和 iOS)UI 自动化工具—Maestro
  • 浏览器宝塔访问不了给的面板地址
  • CSS图片垂直居中问题解决方案
  • 【数据结构入门训练DAY-35】棋盘问题
  • 本地文件操作 MCP (多通道处理) 使用案例
  • 使用 TypeScript + dhtmlx-gantt 在 Next.js 中实现
  • docker(四)使用篇一:docker 镜像仓库
  • 全球宠物经济新周期下的亚马逊跨境采购策略革新——宠物用品赛道成本优化三维路径
  • SQL练习(3/81)
  • 【Python】【面试凉经】Fastapi为什么Fast
  • uniapp,小程序中实现文本“展开/收起“功能的最佳实践
  • 5G + 区块链:技术巨浪下的新型数字生态!
  • 【生活相关-日语-日本-东京-搬家后-引越(ひっこし)(3)-踩坑点:国民健康保险】
  • Cloudflare防火墙拦截谷歌爬虫|导致收录失败怎么解决?
  • 国产化中间件 替换 nginx
  • MySQL索引优化面试高频考点解析(附实战场景)
  • 16.2 VDMA视频转发实验之模拟源