分布式锁的几种实现
前几天看一个面试视频,提到了分布式锁一直想写写,但奈何考试太多,直到今天才有时间。好啦,开始今天的文章吧。
一.定义
分布式锁:当多个进程不在同一个系统中(比如分布式系统中控制共享资源访问),用分布式锁控制多个进程对资源的访问。
二.实现
- 基于数据库实现分布式锁 主要时依赖数据库支持的锁实现的
全局锁,表锁(排他锁,共享锁) ,行锁(记录锁,间隙锁,临键锁),意向锁,悲观锁,乐观锁
基于数据库表 可以通过唯一索引实现,我给一个唯一索引的字段插入数据时,其他线程是无法给这个字段继续插入数据(唯一索引特性),从而保证安全与一直性。
同样也可以通过主键索引实现(唯一+非空约束) 同上插入数据时相当于加锁,删除数据时相当于释放锁。
乐观锁(基于版本号)
如果存在一条数据插入时,另一条数据也要操作,就会检查版本号有没有更新,更新就再次获取锁,没有就插入数据。(如下为转账操作)
CREATE TABLE account (id INT PRIMARY KEY,balance DECIMAL(10,2),version INT NOT NULL
);
public boolean transferWithOptimisticLock(int id, double amount) throws SQLException {Connection conn = dataSource.getConnection();conn.setAutoCommit(false);try {// 查询当前余额与版本号String selectSql = "SELECT balance, version FROM account WHERE id = ?";PreparedStatement selectStmt = conn.prepareStatement(selectSql);selectStmt.setInt(1, id);ResultSet rs = selectStmt.executeQuery();if (!rs.next()) return false;double currentBalance = rs.getDouble("balance");int currentVersion = rs.getInt("version");if (currentBalance < amount) {throw new RuntimeException("余额不足");}double newBalance = currentBalance - amount;int newVersion = currentVersion + 1;// 更新并检查版本号String updateSql = "UPDATE account SET balance = ?, version = ? WHERE id = ? AND version = ?";PreparedStatement updateStmt = conn.prepareStatement(updateSql);updateStmt.setDouble(1, newBalance);updateStmt.setInt(2, newVersion);updateStmt.setInt(3, id);updateStmt.setInt(4, currentVersion);int rowsAffected = updateStmt.executeUpdate();if (rowsAffected == 0) {// 版本不一致,说明其他人已修改System.out.println("乐观锁失败:数据已被修改,请重试");return false;}conn.commit();return true;} catch (SQLException e) {conn.rollback();throw e;} finally {conn.setAutoCommit(true);}
}
悲观锁(基于排它锁)
一条数据插入时就直接加锁,然后等到数据插入完成就释放锁,持有锁期间不允许又任何读写操作作用于该数据。
CREATE TABLE account (id INT PRIMARY KEY,balance DECIMAL(10,2)
);
public void transferWithPessimisticLock(int id, double amount) throws SQLException {Connection conn = dataSource.getConnection();conn.setAutoCommit(false);try {// 加锁查询(FOR UPDATE)String lockSql = "SELECT balance FROM account WHERE id = ? FOR UPDATE";PreparedStatement lockStmt = conn.prepareStatement(lockSql);lockStmt.setInt(1, id);ResultSet rs = lockStmt.executeQuery();if (!rs.next()) throw new RuntimeException("账户不存在");double currentBalance = rs.getDouble("balance");if (currentBalance < amount) {throw new RuntimeException("余额不足");}double newBalance = currentBalance - amount;// 执行更新String updateSql = "UPDATE account SET balance = ? WHERE id = ?";PreparedStatement updateStmt = conn.prepareStatement(updateSql);updateStmt.setDouble(1, newBalance);updateStmt.setInt(2, id);updateStmt.executeUpdate();conn.commit();} catch (SQLException e) {conn.rollback();throw e;} finally {conn.setAutoCommit(true);}
}
- 基于 redis 实现分布式锁:(常用)
单个Redis实例:setnx(key,当前时间+过期时间) + Lua脚本
这个太常见了,黑马外卖不就是通过Redis结合防重Token和Lua脚本来实现幂等性校验。lua脚本保证了读取用户token与删除token是一步完成的。
Maven导入:
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.0.1</version>
</dependency>
代码实现:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;import java.util.Collections;
import java.util.UUID;public class RedisDistributedLock {private final Jedis jedis;private final String lockKey;private final String requestId;private final int expireTime; // 毫秒public RedisDistributedLock(Jedis jedis, String lockKey, int expireTime) {this.jedis = jedis;this.lockKey = lockKey;this.expireTime = expireTime;this.requestId = UUID.randomUUID().toString(); // 唯一标识}/*** 获取锁(带自动过期)*/public boolean acquire() {SetParams params = new SetParams();params.nx(); // 仅当 key 不存在时才设置params.px(expireTime); // 设置过期时间(毫秒)String result = jedis.set(lockKey, requestId, params);return "OK".equals(result);}/*** 释放锁(通过 Lua 脚本保证原子性)*/public boolean release() {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Object result = jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));return "1".equals(result.toString());}public static void main(String[] args) {try (Jedis jedis = new Jedis("localhost", 6379)) {RedisDistributedLock lock = new RedisDistributedLock(jedis, "my:lock:key", 30000);if (lock.acquire()) {System.out.println("【线程:" + Thread.currentThread().getName() + "】获取锁成功");try {// 模拟业务逻辑Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();} finally {if (lock.release()) {System.out.println("【线程:" + Thread.currentThread().getName() + "】释放锁成功");} else {System.out.println("【线程:" + Thread.currentThread().getName() + "】释放锁失败,可能已被他人释放");}}} else {System.out.println("【线程:" + Thread.currentThread().getName() + "】获取锁失败");}} catch (Exception e) {e.printStackTrace();}}
}
Redis集群模式:Redlock(红锁,存在争议)
原理:
- 使用 N 个独立的 Redis 节点 (推荐为奇数个,如 5 个)
- 每个节点都尝试获取相同的锁
- 客户端通过多数节点加锁成功来判断是否获得锁
4. 关键问题与争议
问题 | 描述 |
---|---|
时间依赖 | Redlock 假设时间是同步的,但现实中 NTP 或虚拟机暂停会导致时间跳跃 |
锁安全性 | Martin Kleppmann 指出,在某些异常场景下可能多个客户端同时持有锁 |
实用性 | Antirez 强调 Redlock 更适合工程实践而非理论完美性 |
Maven导入:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.20.1</version>
</dependency>
代码实现(基于Redission):
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;import java.util.concurrent.TimeUnit;public class RedlockDemo {public static void main(String[] args) {// 配置多个 Redis 节点(模拟集群)Config config1 = new Config();config1.useSingleServer().setAddress("redis://127.0.0.1:6379");Config config2 = new Config();config2.useSingleServer().setAddress("redis://127.0.0.1:6380");Config config3 = new Config();config3.useSingleServer().setAddress("redis://127.0.0.1:6381");// 创建三个 Redisson 客户端实例RedissonClient redisson1 = Redisson.create(config1);RedissonClient redisson2 = Redisson.create(config2);RedissonClient redisson3 = Redisson.create(config3);// 获取每个节点上的锁对象RLock lock1 = redisson1.getLock("redlock:key");RLock lock2 = redisson2.getLock("redlock:key");RLock lock3 = redisson3.getLock("redlock:key");// 创建 Redlock 对象RLock redLock = redisson1.getRedLock(lock1, lock2, lock3);boolean isLocked = false;try {// 尝试加锁,等待最多 100 秒,上锁后 30 秒自动解锁isLocked = redLock.tryLock(100, 30, TimeUnit.SECONDS);if (isLocked) {System.out.println("【加锁成功】当前线程:" + Thread.currentThread().getName());// 执行业务逻辑Thread.sleep(5000);} else {System.out.println("【加锁失败】");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (isLocked) {redLock.unlock(); // 释放锁System.out.println("【锁已释放】");}// 关闭客户端redisson1.shutdown();redisson2.shutdown();redisson3.shutdown();}}
}
- 基于 zookeeper实现分布式锁(zookeeper相对与redis而言舍弃了部分高性能而保证了强一致性)
临时有序节点来实现的分布式锁,Curator
核心思想:
- 每个客户端在 ZooKeeper 中尝试创建一个临时有序子节点 ,如:
/locks/my_lock/lock_0000000001
- 获取当前所有子节点并排序,判断自己是否是最小序号的节点:
- 如果是 → 成功获取锁;
- 如果不是 → 监听前一个节点(Watch),等待它被删除;
- 执行完业务逻辑后,删除自己的节点 → 释放锁;
- 利用 ZooKeeper 的 Watcher 机制自动通知下一个节点尝试加锁;
String ourPath = zk.createEphemeralSequential(root + "/lock_", data);
List<String> children = zk.getChildren(root);
Collections.sort(children); // 排序所有子节点
//伪代码
if (ourPath is the first in sorted list) {return true; // 成功获取锁
} else {String prevNode = findPrevNode(ourPath, children);watch(prevNode); // 监听前一个节点waitUntilPrevNodeDeleted(); // 等待删除后再次尝试
}
Maven导入:
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.7.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.7.0</version>
</dependency>
代码实现:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ZookeeperDistributedLockExample {// ZooKeeper 地址(单机模式)private static final String ZK_ADDRESS = "localhost:2181";// 锁路径(可以理解为资源名)private static final String LOCK_PATH = "/distributed_lock";public static void main(String[] args) {// 创建 ZooKeeper 客户端CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new ExponentialBackoffRetry(1000, 3));client.start();// 创建分布式锁对象InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);// 模拟多个线程并发请求锁ExecutorService executor = Executors.newFixedThreadPool(5);for (int i = 0; i < 5; i++) {final int threadNum = i;executor.submit(() -> {try {System.out.println("线程 " + threadNum + " 正在尝试获取锁...");if (lock.acquire(10, TimeUnit.SECONDS)) { // 等待最多10秒try {System.out.println("线程 " + threadNum + " 成功获得锁");Thread.sleep(3000); // 模拟业务处理} finally {lock.release(); // 释放锁System.out.println("线程 " + threadNum + " 已释放锁");}} else {System.out.println("线程 " + threadNum + " 获取锁失败");}} catch (Exception e) {e.printStackTrace();}});}executor.shutdown();}
}
- 基于 Consul 实现分布式锁
核心思想:
- 每个客户端尝试在 Consul KV 中创建一个键,并带上当前会话(Session);
- Consul 使用
acquire
原子操作保证只有第一个成功设置该 key 的客户端才能获得锁; - 如果 key 已存在且关联了一个未过期的 session,则其他客户端无法获取锁;
- 当持有锁的客户端释放锁或 session 失效时,key 被清除,其他客户端可以重新竞争;
注:以下代码需要实现启动 Consul Agent
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;public class ConsulDistributedLock {private static final String CONSUL_URL = "http://localhost:8500/v1/kv/";private final String lockKey; // 锁路径,如 "/locks/my_resource"private final String sessionId; // 当前会话 IDprivate final String serviceName; // 客户端标识public ConsulDistributedLock(String lockKey, String serviceName) throws Exception {this.lockKey = lockKey;this.serviceName = serviceName;this.sessionId = createSession();}/*** 创建一个 Session*/private String createSession() throws Exception {String sessionJson = String.format("{\"Name\":\"%s\",\"TTL\":\"15s\"}", serviceName);String response = sendPost("http://localhost:8500/v1/session/create", sessionJson);return response.split("\"")[3]; // 简单提取 session ID}/*** 尝试获取锁*/public boolean acquire() throws IOException {String urlStr = CONSUL_URL + lockKey + "?acquire=" + sessionId;String payload = "locked-by:" + serviceName;String result = sendPut(urlStr, payload);return Boolean.parseBoolean(result);}/*** 释放锁*/public boolean release() throws IOException {String urlStr = CONSUL_URL + lockKey + "?release=" + sessionId;String payload = "locked-by:" + serviceName;String result = sendPut(urlStr, payload);return Boolean.parseBoolean(result);}/*** 心跳保持 Session 活跃*/public void renewSession() throws IOException {String urlStr = "http://localhost:8500/v1/session/renew/" + sessionId;sendPut(urlStr, "");}/*** 发送 PUT 请求*/private String sendPut(String urlStr, String body) throws IOException {return sendRequest(urlStr, "PUT", body);}/*** 发送 POST 请求*/private String sendPost(String urlStr, String body) throws IOException {return sendRequest(urlStr, "POST", body);}/*** 发送 HTTP 请求通用方法*/private String sendRequest(String urlStr, String method, String body) throws IOException {URL url = new URL(urlStr);HttpURLConnection conn = (HttpURLConnection) url.openConnection();conn.setRequestMethod(method);conn.setDoOutput(true);try (OutputStream os = conn.getOutputStream()) {byte[] input = body.getBytes(StandardCharsets.UTF_8);os.write(input, 0, input.length);}StringBuilder response = new StringBuilder();try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {String responseLine;while ((responseLine = br.readLine()) != null) {response.append(responseLine.trim());}}return response.toString();}public static void main(String[] args) throws Exception {ConsulDistributedLock lock = new ConsulDistributedLock("/locks/my_lock", "service-A");System.out.println("尝试获取锁...");if (lock.acquire()) {try {System.out.println("【线程:" + Thread.currentThread().getName() + "】获取锁成功");// 模拟业务逻辑执行for (int i = 0; i < 10; i++) {lock.renewSession(); // 保持心跳Thread.sleep(5000);}} finally {lock.release();System.out.println("锁已释放");}} else {System.out.println("获取锁失败");}}
}
感谢你看到这里,喜欢的可以点点关注哦。