基于redis实现分布式锁方案实战
分布式锁的进阶实现与优化方案
作为Java高级开发工程师,我将为您提供更完善的Redis分布式锁实现方案,包含更多生产级考量。
1. 生产级Redis分布式锁实现
1.1 完整实现类(支持可重入、自动续约)
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
import java.util.concurrent.atomic.AtomicInteger;public class RedisDistributedLock {private final JedisPool jedisPool;private final ScheduledExecutorService scheduler;// 本地存储锁的持有计数(用于可重入)private final ThreadLocal<Map<String, LockEntry>> lockHoldCounts = ThreadLocal.withInitial(HashMap::new);// 锁续约的Future集合private final ConcurrentMap<String, ScheduledFuture<?>> renewalFutures = new ConcurrentHashMap<>();private static class LockEntry {final String requestId;final AtomicInteger holdCount;LockEntry(String requestId) {this.requestId = requestId;this.holdCount = new AtomicInteger(1);}}public RedisDistributedLock(JedisPool jedisPool) {this.jedisPool = jedisPool;this.scheduler = Executors.newScheduledThreadPool(4);}public boolean tryLock(String lockKey, String requestId, int expireTime) {return tryLock(lockKey, requestId, expireTime, 0, 0);}public boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime, long retryInterval) {long startTime = System.currentTimeMillis();try {// 检查是否已经持有锁(可重入)LockEntry entry = lockHoldCounts.get().get(lockKey);if (entry != null && entry.requestId.equals(requestId)) {entry.holdCount.incrementAndGet();return true;}// 尝试获取锁while (true) {if (acquireLock(lockKey, requestId, expireTime)) {// 获取成功,记录持有信息lockHoldCounts.get().put(lockKey, new LockEntry(requestId));// 启动自动续约scheduleRenewal(lockKey, requestId, expireTime);return true;}// 检查是否超时if (maxWaitTime > 0 && System.currentTimeMillis() - startTime > maxWaitTime) {return false;}// 等待重试if (retryInterval > 0) {try {Thread.sleep(retryInterval);} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}}} catch (Exception e) {// 异常处理return false;}}private boolean acquireLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {SetParams params = SetParams.setParams().nx().px(expireTime);return "OK".equals(jedis.set(lockKey, requestId, params));}}private void scheduleRenewal(String lockKey, String requestId, 
int expireTime) {// 计算续约间隔(通常是过期时间的1/3)long renewalInterval = expireTime * 2 / 3;ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {try {renewLock(lockKey, requestId, expireTime);} catch (Exception e) {// 续约失败,取消任务renewalFutures.remove(lockKey);}}, renewalInterval, renewalInterval, TimeUnit.MILLISECONDS);renewalFutures.put(lockKey, future);}public boolean renewLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('pexpire', KEYS[1], ARGV[2]) " +"else " +"return 0 " +"end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Arrays.asList(requestId, String.valueOf(expireTime)));return "OK".equals(result);}}public boolean releaseLock(String lockKey, String requestId) {try {LockEntry entry = lockHoldCounts.get().get(lockKey);if (entry == null || !entry.requestId.equals(requestId)) {return false;}// 减少持有计数if (entry.holdCount.decrementAndGet() > 0) {return true;}// 完全释放锁lockHoldCounts.get().remove(lockKey);// 取消续约任务ScheduledFuture<?> future = renewalFutures.remove(lockKey);if (future != null) {future.cancel(false);}// 释放Redis锁try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));return Long.valueOf(1L).equals(result);}} catch (Exception e) {return false;}}public void shutdown() {scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}}
}
1.2 锁工厂模式(支持多种锁类型)
public interface DistributedLock {boolean tryLock(String lockKey, String requestId, int expireTime);boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime, long retryInterval);boolean releaseLock(String lockKey, String requestId);
}public class RedisLockFactory {private final JedisPool jedisPool;public RedisLockFactory(JedisPool jedisPool) {this.jedisPool = jedisPool;}public DistributedLock createSimpleLock() {return new SimpleRedisLock(jedisPool);}public DistributedLock createReentrantLock() {return new ReentrantRedisLock(jedisPool);}public DistributedLock createReadWriteLock() {return new RedisReadWriteLock(jedisPool);}private static class SimpleRedisLock implements DistributedLock {// 简单实现(同前面基础实现)}private static class ReentrantRedisLock extends RedisDistributedLock {// 可重入实现(同前面完整实现)}private static class RedisReadWriteLock implements DistributedLock {// 读写锁实现}
}
2. 高级特性实现
2.1 读写锁实现
public class RedisReadWriteLock {private final JedisPool jedisPool;private static final String READ_LOCK_PREFIX = "READ_LOCK:";private static final String WRITE_LOCK_PREFIX = "WRITE_LOCK:";public RedisReadWriteLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}public boolean tryReadLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {// 检查是否有写锁if (jedis.exists(WRITE_LOCK_PREFIX + lockKey)) {return false;}// 获取读锁String readLockKey = READ_LOCK_PREFIX + lockKey;Long count = jedis.incr(readLockKey);if (count == 1L) {// 第一次获取读锁,设置过期时间jedis.pexpire(readLockKey, expireTime);}return true;}}public boolean tryWriteLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {// 检查是否有读锁if (jedis.exists(READ_LOCK_PREFIX + lockKey)) {return false;}// 获取写锁SetParams params = SetParams.setParams().nx().px(expireTime);return "OK".equals(jedis.set(WRITE_LOCK_PREFIX + lockKey, requestId, params));}}// 释放方法类似...
}
2.2 公平锁实现(基于Redis列表)
public class RedisFairLock {private final JedisPool jedisPool;private static final String QUEUE_PREFIX = "LOCK_QUEUE:";private static final String LOCK_PREFIX = "LOCK:";public RedisFairLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}public boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime) {long startTime = System.currentTimeMillis();String queueKey = QUEUE_PREFIX + lockKey;String lockRealKey = LOCK_PREFIX + lockKey;try (Jedis jedis = jedisPool.getResource()) {// 加入等待队列jedis.rpush(queueKey, requestId);try {while (true) {// 检查是否轮到自己String firstRequestId = jedis.lindex(queueKey, 0);if (requestId.equals(firstRequestId)) {// 尝试获取锁SetParams params = SetParams.setParams().nx().px(expireTime);if ("OK".equals(jedis.set(lockRealKey, requestId, params))) {return true;}}// 检查超时if (System.currentTimeMillis() - startTime > maxWaitTime) {// 从队列中移除自己jedis.lrem(queueKey, 0, requestId);return false;}// 短暂等待Thread.sleep(100);}} finally {// 确保最终从队列中移除(防止异常情况)jedis.lrem(queueKey, 0, requestId);}} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}public void releaseLock(String lockKey, String requestId) {try (Jedis jedis = jedisPool.getResource()) {String lockRealKey = LOCK_PREFIX + lockKey;String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";jedis.eval(script, Collections.singletonList(lockRealKey), Collections.singletonList(requestId));}}
}
3. 生产环境最佳实践
3.1 配置建议
- 锁过期时间:
  - 根据业务操作的最长时间设置,通常为业务平均耗时的3倍
  - 例如:业务平均耗时200ms,可设置锁过期时间为600ms
- 续约间隔:
  - 设置为过期时间的1/3到1/2
  - 例如:过期时间600ms,续约间隔200-300ms
- 重试策略:
  - 初始重试间隔50-100ms
  - 可考虑指数退避策略
3.2 监控与告警
/** Registers lock-related meters with a metrics registry (Micrometer is assumed). */
public class LockMonitor {

    /** Redis list whose length is reported as the number of waiting clients. */
    private static final String MONITORED_QUEUE_KEY = "LOCK_QUEUE:important_lock";

    private final JedisPool jedisPool;
    private final MeterRegistry meterRegistry;

    public LockMonitor(JedisPool jedisPool, MeterRegistry meterRegistry) {
        this.jedisPool = jedisPool;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Registers three meters: a timer for lock acquisition time, a distribution summary for lock
     * wait time, and a gauge that reads the wait-queue length from Redis whenever it is sampled.
     * Nothing is recorded into the timer or the summary here; this method only registers them.
     */
    public void monitorLockStats() {
        // Time taken to acquire the lock.
        Timer.builder("redis.lock.acquire.time")
                .description("Time taken to acquire redis lock")
                .register(meterRegistry);

        // Time spent waiting for the lock.
        DistributionSummary.builder("redis.lock.wait.time")
                .description("Time spent waiting for redis lock")
                .register(meterRegistry);

        // Lock contention: current length of the wait queue.
        Gauge.builder("redis.lock.queue.size", () -> {
            try (Jedis jedis = jedisPool.getResource()) {
                return jedis.llen(MONITORED_QUEUE_KEY);
            }
        }).description("Number of clients waiting for lock").register(meterRegistry);
    }
}
3.3 异常处理策略
- Redis不可用时的降级策略:
- 本地降级锁(仅适用于单机或可以接受短暂不一致的场景)
- 快速失败,避免系统雪崩
public class DegradableRedisLock implements DistributedLock {private final DistributedLock redisLock;private final ReentrantLock localLock = new ReentrantLock();private final CircuitBreaker circuitBreaker;public DegradableRedisLock(DistributedLock redisLock) {this.redisLock = redisLock;this.circuitBreaker = CircuitBreaker.ofDefaults("redisLock");}@Overridepublic boolean tryLock(String lockKey, String requestId, int expireTime) {return CircuitBreaker.decorateSupplier(circuitBreaker, () -> {try {return redisLock.tryLock(lockKey, requestId, expireTime);} catch (Exception e) {// Redis不可用,降级到本地锁return localLock.tryLock();}}).get();}// 其他方法实现类似...
}
4. 测试方案
4.1 单元测试
public class RedisDistributedLockTest {private RedisDistributedLock lock;private JedisPool jedisPool;@BeforeEachvoid setUp() {jedisPool = new JedisPool("localhost");lock = new RedisDistributedLock(jedisPool);}@Testvoid testLockAndUnlock() {String lockKey = "test_lock";String requestId = UUID.randomUUID().toString();assertTrue(lock.tryLock(lockKey, requestId, 10000));assertTrue(lock.releaseLock(lockKey, requestId));}@Testvoid testReentrantLock() {String lockKey = "test_reentrant_lock";String requestId = UUID.randomUUID().toString();assertTrue(lock.tryLock(lockKey, requestId, 10000));assertTrue(lock.tryLock(lockKey, requestId, 10000)); // 可重入assertTrue(lock.releaseLock(lockKey, requestId));assertTrue(lock.releaseLock(lockKey, requestId)); // 需要释放两次}// 更多测试用例...
}
4.2 并发测试
@Test
void testConcurrentLock() throws InterruptedException {String lockKey = "concurrent_test_lock";int threadCount = 10;CountDownLatch latch = new CountDownLatch(threadCount);AtomicInteger successCount = new AtomicInteger();for (int i = 0; i < threadCount; i++) {new Thread(() -> {String requestId = UUID.randomUUID().toString();if (lock.tryLock(lockKey, requestId, 1000, 5000, 100)) {try {successCount.incrementAndGet();Thread.sleep(100); // 模拟业务处理} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {lock.releaseLock(lockKey, requestId);}}latch.countDown();}).start();}latch.await();assertEquals(1, successCount.get()); // 确保只有一个线程获取到锁
}
5. 性能优化建议
- 连接池优化:
  - 合理配置Jedis连接池大小
  - 使用try-with-resources确保连接释放
- Lua脚本优化:
  - 预加载常用Lua脚本
  - 减少脚本复杂度
- 批量操作:
  - 对于RedLock等多节点场景,应并行向各节点发起请求;管道(pipeline)只适用于合并发往同一节点的多条命令
- 本地缓存:
  - 对于频繁使用的锁信息,可考虑本地缓存
6. 替代方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Redis单节点 | 实现简单,性能高 | 单点故障,可靠性较低 | 对可靠性要求不高的场景 |
| Redis集群+RedLock | 可靠性较高 | 实现复杂,性能较低 | 对可靠性要求高的场景 |
| Zookeeper | 可靠性高,原生支持临时节点 | 性能较低,依赖Zookeeper | 强一致性要求的场景 |
| 数据库实现 | 无需额外组件 | 性能差,容易成为瓶颈 | 简单场景,并发量低 |
这个进阶方案提供了生产环境所需的完整功能,包括可重入锁、读写锁、公平锁等高级特性,以及监控、降级等生产级考量。您可以根据实际项目需求选择合适的实现方式。