zookeeper实现分布式获取全局唯一自增ID的案例。
项目结构
所有配置写在 application.yml
文件中,代码进行了拆分,加入了相关依赖。
1. pom.xml
依赖
<dependencies><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.1</version></dependency>
</dependencies>
2. application.yml
配置文件
在 src/main/resources/application.yml
文件中配置 ZooKeeper 服务器地址、初始 workerId
路径等信息:
zookeeper:server: "localhost:2181"counter-path: "/counter"worker-id-path: "/workerId"
3. 创建 ZookeeperConfig
配置类
配置类用于初始化 Zookeeper 连接,并从配置文件读取参数。
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configurationpublic class ZookeeperConfig {@Value("${zookeeper.server}")private String zkServer;@Beanpublic ZooKeeper zooKeeper() throws Exception {System.out.println("Zookeeper server: " + zkServer);return new ZooKeeper(zkServer, 3000, event ->System.out.println("Watcher triggered: " + event.getType()));}
}
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {private String server;private String counterPath;private String workerIdPath;public String getServer() {return server;}public void setServer(String server) {this.server = server;}public String getCounterPath() {return counterPath;}public void setCounterPath(String counterPath) {this.counterPath = counterPath;}public String getWorkerIdPath() {return workerIdPath;}public void setWorkerIdPath(String workerIdPath) {this.workerIdPath = workerIdPath;}
}
4. 创建 IDGeneratorService
服务类
这是 ID 生成的核心服务类,包含简易自增 ID 和雪花算法实现。
import com.example.client.redis_test.snowflake.SnowflakeIdGenerator;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;@Service
public class IDGeneratorService {private static final Lock lock = new ReentrantLock();private final ZooKeeper zk;private final ZookeeperProperties zookeeperProperties;private final SnowflakeIdGenerator idGenerator;private final long workerId;private final long dataCenterId;public IDGeneratorService(ZooKeeper zk, ZookeeperProperties zookeeperProperties) throws Exception {this.zk = zk;this.zookeeperProperties = zookeeperProperties;this.workerId = getWorkerId();Random random = new Random();dataCenterId = random.nextInt(32);System.out.println("dataCenterId = " + dataCenterId);this.idGenerator = new SnowflakeIdGenerator(workerId, dataCenterId);}// 获取下一个自增IDpublic long getNextId() throws Exception {lock.lock();try {String counterPath = zookeeperProperties.getCounterPath();// 检查路径是否存在,如果不存在则创建Stat stat = zk.exists(counterPath, false);if (stat == null) {// 如果路径不存在,则创建该路径,并设置初始值为 1zk.create(counterPath, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}// 获取当前值byte[] data = zk.getData(counterPath, false, null);long currentId = Long.parseLong(new String(data));// 计算下一个 IDlong nextId = currentId + 1;// 更新路径数据zk.setData(counterPath, String.valueOf(nextId).getBytes(), -1);return nextId;} finally {lock.unlock();}}// 从 Zookeeper 获取并分配唯一的 workerIdprivate long getWorkerId() throws Exception {String workerIdPath = zookeeperProperties.getWorkerIdPath();Stat stat = zk.exists(workerIdPath, false);if (stat == null) {zk.create(workerIdPath, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}byte[] data = zk.getData(workerIdPath, false, null);long currentWorkerId = Long.parseLong(new String(data));if (currentWorkerId <= 31) { // 最大 Worker ID 为 31zk.setData(workerIdPath, String.valueOf(currentWorkerId + 1).getBytes(), -1);return currentWorkerId;} else {throw new RuntimeException("Exceeded max worker ID.");}}// 生成 Snowflake IDpublic long generateSnowflakeId() {return idGenerator.nextId();}
}
public class SnowflakeIdGenerator {private final long workerId;private final long dataCenterId;private final long sequenceBits = 12L;private final long workerIdBits = 5L;private final long dataCenterIdBits = 5L;private final long maxWorkerId = -1L ^ (-1L << workerIdBits);private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);private final long workerIdShift = sequenceBits;private final long dataCenterIdShift = sequenceBits + workerIdBits;private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;private final long sequenceMask = -1L ^ (-1L << sequenceBits);private long sequence = 0L;private long lastTimestamp = -1L;public SnowflakeIdGenerator(long workerId, long dataCenterId) {if (workerId > maxWorkerId || workerId < 0) {throw new IllegalArgumentException("workerId invalid");}if (dataCenterId > maxDataCenterId || dataCenterId < 0) {throw new IllegalArgumentException("dataCenterId invalid");}this.workerId = workerId;this.dataCenterId = dataCenterId;}public synchronized long nextId() {long timestamp = timeGen();if (timestamp < lastTimestamp) {throw new RuntimeException("Clock moved backwards.");}if (lastTimestamp == timestamp) {sequence = (sequence + 1) & sequenceMask;if (sequence == 0) {timestamp = tilNextMillis(lastTimestamp);}} else {sequence = 0L;}lastTimestamp = timestamp;return ((timestamp - 1609459200000L) << timestampLeftShift)| (dataCenterId << dataCenterIdShift)| (workerId << workerIdShift)| sequence;}private long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}private long timeGen() {return System.currentTimeMillis();}
}
5. 创建 IDGeneratorController
控制器
用于暴露一个简单的 API 接口,返回生成的 ID。
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@RestController
public class IDGeneratorController {@Resourceprivate IDGeneratorService idGeneratorService;public IDGeneratorController(IDGeneratorService idGeneratorService) {this.idGeneratorService = idGeneratorService;}@GetMapping("/next-id")public long getNextId() throws Exception {return idGeneratorService.getNextId();}@GetMapping("/snowflake-id")public long getSnowflakeId() {return idGeneratorService.generateSnowflakeId();}@PostConstructpublic void test() throws InterruptedException {System.out.println("==== Zookeeper 雪花ID多线程测试开始 ====");int threadCount = 20; // 模拟并发线程数int idsPerThread = 1000000; // 每个线程生成的 ID 数Set<Long> allIds = ConcurrentHashMap.newKeySet(); // 用于去重检查CountDownLatch latch = new CountDownLatch(threadCount);long start = System.currentTimeMillis();ExecutorService executor = Executors.newFixedThreadPool(threadCount);for (int i = 0; i < threadCount; i++) {executor.execute(() -> {for (int j = 0; j < idsPerThread; j++) {long id = idGeneratorService.generateSnowflakeId();allIds.add(id);}latch.countDown();});}latch.await();long end = System.currentTimeMillis();executor.shutdown();int totalGenerated = threadCount * idsPerThread;int uniqueCount = allIds.size();System.out.println("==== 雪花ID多线程测试报告 ====");System.out.println("线程数: " + threadCount);System.out.println("每线程生成ID数: " + idsPerThread);System.out.println("总生成ID数: " + totalGenerated);System.out.println("唯一ID数: " + uniqueCount);System.out.println("重复ID数: " + (totalGenerated - uniqueCount));System.out.println("执行耗时(ms): " + (end - start));System.out.printf("吞吐量: %.2f 万ID/秒%n", totalGenerated / ((end - start) / 1000.0) / 10000);System.out.println("============================");}
}
6. 启动与测试
启动 Spring Boot 应用,访问以下接口来测试 ID 生成:
GET /next-id
:获取自增 ID。GET /snowflake-id
:获取雪花算法生成的 ID。
总结
这个 Spring Boot 项目利用 Zookeeper 来确保 ID 的全局唯一性,并通过配置文件来控制相关的参数。主要拆分为以下几个部分:
ZookeeperConfig
:Zookeeper 连接配置。IDGeneratorService
:包含 ID 生成逻辑,包括自增 ID 和雪花算法。IDGeneratorController
:暴露接口供外部调用。