当前位置: 首页 > ai >正文

Zookeeper是什么?基于zookeeper实现分布式锁

zookeeper听的很多,但实际在应用开发中用的不错,主要是作为中间件配合使用的,例如:Kafka。

了解zk首先需要知道它的数据结构,可以想象为树、文件夹目录。每个节点有基本的信息,例如:创建时间、修改时间、版本,数据长度等。另外节点可以设置data,也就是数据,以字节的方式进行插入/获取,另外节点还拥有权限和状态。

状态很关键,有持久、临时(会话级别)、持久+顺序、临时+顺序、持久+TTL、临时+TTL。

顺序是给同一个节点增加一个编号,例如:path:/distributed_locks/lock

插入多个,在zk中是:/distributed_locks/lock0000000001和/distributed_locks/lock0000000002、、。

到这里数据结构已经大致清楚了,那么zk存在的意义是什么?

首先,zk的定义:是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供组服务。

关键点:集中、分布式。

在程序进行分布式、多节点部署时,传统程序内存中的变量或者锁机制等都不能在多节点中进行通用。此时,就需要一个集中式的一个中间件,在中间件上存储我们需要同时方案的变量或者其他定义。

那么,我们为什么不直接使用db数据库呢,可能是因为重?也可能是一些特殊的功能db中并不能实现?(临时会话、TTL?)。

作为目前很火热的一个中间件,存在它的意义肯定是有的。为什么说呢,zk是Java实现的,与 Hadoop、Kafka 等 Java 生态项目无缝集成。同理,可以想象,每个语言的特性不一致,都会有不同的中间件或者包。

上述,基本都是个人的一些理解,希望能给大家带来点启发。

zookeeper,咱们的扩展功能到分布式锁这里。通过节点的特性,我们采用会话级别、顺序性质的节点进行实现。

当我们的线程需要去尝试获取锁时,连接zk肯定是个会话,同时zk会根据顺序将不同的线程进行排序,线程内部只需要轮询、wait/notify等方式判断是否轮到自己得到锁了。获取到锁后,执行业务逻辑之后,随之可以将锁进行释放,以便让另外一个线程得到锁。

代码实现用2种方式实现:

原生zookeeper方法实现

package com.fahe.testdistrubutedlock.zk;
​
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
​
import java.util.List;
import java.util.concurrent.CountDownLatch;
​
/*** @program: test-distrubuted-lock* @description: client* @author: <linfahe-694204477@qq.com>* @create: 2025-04-23 14:05**/
@Slf4j
public class ZkClient implements Watcher {
​public static final String ZK_ADDR = "127.0.0.1:32181";public ZooKeeper zk;public CountDownLatch connectedSignal = new CountDownLatch(1);
​public ZkClient() {try {zk = new ZooKeeper(ZK_ADDR, 3000, this);connectedSignal.await(); // 等待连接成功} catch (Exception e) {throw new RuntimeException(e);}}
​@Overridepublic void process(WatchedEvent watchedEvent) {log.info("process WatchedEvent : {}", watchedEvent);if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {connectedSignal.countDown();}}
​
​// 创建持久节点public void createNode() throws KeeperException, InterruptedException {Stat existsed = zk.exists("/my-node", false);if (existsed != null) {
//            zk.delete("/my-node", -1);return;}String path = zk.create("/my-node", "data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("创建节点:" + path);}
​// 获取节点数据public void getData() throws KeeperException, InterruptedException {byte[] data = zk.getData("/my-node", false, null);System.out.println("节点数据:" + new String(data));}
​public static void main(String[] args) throws InterruptedException, KeeperException {ZkClient zkClient = new ZkClient();List<String> children = zkClient.zk.getChildren("/", true);for (String child : children) {log.info("child : {}", child);}zkClient.createNode();zkClient.getData();}
​public void close() {try {if (zk != null) {zk.close();}} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
​
package com.fahe.testdistrubutedlock.zk;
​
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
​
import java.util.Collections;
import java.util.List;
​
public class DistributedLock {private static final String LOCK_ROOT = "/locks";private static final String LOCK_NODE = LOCK_ROOT + "/lock_";private ZooKeeper zooKeeper;private String lockPath;
​public DistributedLock(ZooKeeper zooKeeper) throws Exception {this.zooKeeper = zooKeeper;Stat stat = zooKeeper.exists(LOCK_ROOT, false);if (stat == null) {zooKeeper.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}
​public void acquireLock() throws Exception {lockPath = zooKeeper.create(LOCK_NODE, "new byte[0]".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println("Lock path: " + lockPath);
​while (true) {List<String> children = zooKeeper.getChildren(LOCK_ROOT, false);Collections.sort(children);String smallestChild = LOCK_ROOT + "/" + children.get(0);
​if (lockPath.equals(smallestChild)) {System.out.println("Acquired lock: " + lockPath);return;}System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild);String watchNode = null;for (int i = children.size() - 1; i >= 0; i--) {String child = LOCK_ROOT + "/" + children.get(i);if (child.compareTo(lockPath) < 0) {watchNode = child;break;}}System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild + " ; watchNode = " + watchNode);
​if (watchNode != null) {final Object lock = new Object();Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {synchronized (lock) {lock.notifyAll();}}};
​Stat stat = zooKeeper.exists(watchNode, watcher);if (stat != null) {synchronized (lock) {lock.wait();}}}}}
​public void releaseLock() throws Exception {if (lockPath != null) {zooKeeper.delete(lockPath, -1);System.out.println("Released lock: " + lockPath);lockPath = null;}}
​public static void main(String[] args) {ZkClient client = new ZkClient();// 模拟多线程。for (int i = 0; i < 30; i++) {new Thread(() -> {try {mainTest(client);} catch (Exception e) {e.printStackTrace();}}).start();}// 模拟多实例。ZkClient client2 = new ZkClient();for (int i = 0; i < 30; i++) {new Thread(() -> {try {mainTest(client2);} catch (Exception e) {e.printStackTrace();}}).start();}}
​public static void mainTest(ZkClient client) {
//         = new ZkClient();try {ZooKeeper zooKeeper = client.zk;
​DistributedLock lock = new DistributedLock(zooKeeper);lock.acquireLock();System.out.println("Lock acquired");// 模拟业务逻辑int randomSleepTime = (int) (Math.random() * 100);System.out.println("randomSleepTime = " + randomSleepTime);Thread.sleep(randomSleepTime);System.out.println("Business logic completed");lock.releaseLock();
//            client.close();} catch (Exception e) {e.printStackTrace();}}
}
​

使用Curator三方包实现:

package com.fahe.testdistrubutedlock.zk;
​
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
​
​
/*** @program: test-distrubuted-lock* @description: curator 测试* @author: <linfahe-694204477@qq.com>* @create: 2025-04-23 15:04**/
public class CuratorMain {private final InterProcessMutex lock;private static final String LOCK_PATH = "/distributed_lock/my_lock";private static final String ZK_ADDR = "127.0.0.1:32181";
​public CuratorMain() {CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDR,new ExponentialBackoffRetry(200, 2));client.start();this.lock = new InterProcessMutex(client, LOCK_PATH);}
​public boolean acquireLock() {try {lock.acquire();return true;} catch (Exception e) {e.printStackTrace();return false;}}
​public void releaseLock() {try {if (lock.isAcquiredInThisProcess()) {lock.release();}} catch (Exception e) {e.printStackTrace();}}
​public static void main(String[] args) {CuratorMain curatorMain = new CuratorMain();for (int i = 0; i < 100; i++) {new Thread(() -> {boolean acquireLock = curatorMain.acquireLock();System.out.println("thread-" + Thread.currentThread().getName() + " is running");System.out.println("acquireLock = " + acquireLock);if (acquireLock) {curatorMain.releaseLock();}}, "thread-" + i).start();}CuratorMain curatorMain2 = new CuratorMain();for (int i = 100; i < 200; i++) {new Thread(() -> {boolean acquireLock = curatorMain2.acquireLock();System.out.println("thread-" + Thread.currentThread().getName() + " is running");System.out.println("acquireLock = " + acquireLock);if (acquireLock) {curatorMain2.releaseLock();}}, "thread-" + i).start();}try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}
}
​

http://www.xdnf.cn/news/1283.html

相关文章:

  • 软件黑盒与白盒测试详解
  • 同样的接口用postman/apifox能跑通,用jmeter跑就报错500
  • 【MCP】第二篇:IDE革命——用MCP构建下一代智能工具链
  • 【Linux】冯诺依曼体系结构及操作系统架构图的具体剖析
  • 【Ubuntu】关于系统分区、挂载点、安装位置的一些基本信息
  • 【算法笔记】动态规划基础(一):dp思想、基础线性dp
  • 【k8s】docker、k8s、虚拟机的区别以及使用场景
  • sentinel
  • CATBOOST算法总结
  • vscode如何多行同时编辑,vscode快速选中多行快捷键
  • 使用 JUnit 4在 Spring 中进行单元测试的完整步骤
  • 【数据结构入门训练DAY-21】信息学奥赛一本通T1334-围圈报数
  • 深入剖析TCP协议(内容二):从OSI与TCP/IP网络模型到三次握手、四次挥手、状态管理、性能优化及Linux内核源码实现的全面技术指南
  • 基于cubeMX的hal库STM32实现MQ2烟雾浓度检测
  • 软考软件设计师30天备考指南
  • 升级xcode16之后react-native-zip-archive不兼容,unsupported option ‘-G‘
  • The backpropagation and the brain
  • Java与C语言核心差异:从指针到内存管理的全面剖析
  • Node.js学习
  • WT2000T专业录音芯片:破解普通录音设备信息留存、合规安全与远程协作三大难题
  • 【k8s系列7-更新中】kubeadm搭建Kubernetes高可用集群-三主两从
  • .NET 6 WPF 利用CefSharp.Wpf.NETCore显示PDF文件
  • 什么是 GLTF/GLB? 3D 内容创建的基本数据格式说明,怎么下载GLB/GLTF格式模型
  • HarmonyOS 是 Android 套壳嘛?
  • 【C语言】动态内存的常见错误
  • Git远程操作与标签管理
  • Linux权限
  • 数据结构:栈
  • Multi-View Stereo for Community Photo Collections
  • 云原生--CNCF-1-云原生计算基金会介绍(云原生生态的发展目标和未来)