项目学习总结(4)
文章目录
- 概述
- 雪花算法及序列化工厂的实现
- 雪花算法具体实现:
- 序列化工厂的建立
- 总结
概述
今天项目主要扩展几种序列化方式同时建立序列化工厂,以及实现雪花算法处理请求id(之前是直接写死成1)
雪花算法及序列化工厂的实现
雪花算法具体实现:
首先简单了解下雪花算法的原理:雪花算法的核心是将 64 位 Long 型 ID 拆分成多个固定长度的字段(时间戳、数据中心 ID、机器 ID、序列号),通过位运算拼接成唯一 ID。这里我把id分成了以下几个部分:
雪花算法ID字段划分表
字段 | 位数(代码常量) | 作用 | 范围(最大值) |
---|---|---|---|
时间戳 | 剩余位数(64 - 5 - 5 - 12 = 42 位) | 记录当前时间与 “起始时间戳” 的差值(毫秒级),确保 ID 随时间递增 | 2^42 ≈ 44 万亿毫秒(约 139 年,足够用) |
数据中心 ID | DATA_CENTER_BIT = 5L | 区分不同数据中心(如北京、上海数据中心),避免跨数据中心 ID 冲突 | 2^5 - 1 = 31(支持 32 个数据中心) |
机器 ID | MACHINE_BIT = 5L | 区分同一数据中心内的不同机器(如服务器 1、服务器 2),避免同数据中心内机器冲突 | 2^5 - 1 = 31(每个数据中心支持 32 台机器) |
序列号 | SEQUENCE_BIT = 12L | 同一毫秒内生成多个 ID 时,用序列号区分(解决同一毫秒内的并发冲突) | 2^12 - 1 = 4095(每毫秒最多生成 4096 个 ID) |
这样每个请求就会由于其不同的数据中心,机器id,时间进而产生不同的请求id,这样就可以有效去重。当然单机服务也可以使用线程安全的AtomicInteger类。下面来看下具体代码及实现流程:
public class IdGenerator {private static LongAdder longAdder = new LongAdder();//起始时间戳public static final long START_STAMP = DateUtil.get("2025-1-1").getTime();public static final long DATA_CENTER_BIT = 5L;public static final long MACHINE_BIT = 5L;public static final long SEQUENCE_BIT = 12L;public static final long DATA_CENTER_MAX = ~(-1L << DATA_CENTER_BIT);public static final long MACHINE_MAX = ~(-1L << MACHINE_BIT);public static final long SEQUENCE_MAX = ~(-1L << SEQUENCE_BIT);public static final long TIMESTAMP_LEFT = DATA_CENTER_BIT+ MACHINE_BIT + SEQUENCE_BIT;public static final long DATA_CENTER_LEFT = MACHINE_BIT + SEQUENCE_BIT;public static final long MACHINE_LEFT = SEQUENCE_BIT;private long dataCenterId;private long machineId;private LongAdder sequenceId = new LongAdder();private long lastTimeStamp = -1L;public IdGenerator(long dataCenterId, long machineId) {if(dataCenterId > DATA_CENTER_MAX || machineId > MACHINE_MAX){throw new IllegalArgumentException("传入的编号不合法");}this.dataCenterId = dataCenterId;this.machineId = machineId;}public long getId(){//1.处理时间戳long currentTime = System.currentTimeMillis();long timeStamp = currentTime - START_STAMP;if(timeStamp < lastTimeStamp){throw new RuntimeException("您的服务器进行了时钟回调");}if(timeStamp == lastTimeStamp){sequenceId.increment();if(sequenceId.sum() >= SEQUENCE_MAX){timeStamp = getNextTimeStamp();sequenceId.reset();}}else{sequenceId.reset();}lastTimeStamp = timeStamp;long sequence = sequenceId.sum();return timeStamp << TIMESTAMP_LEFT | dataCenterId << DATA_CENTER_LEFT |machineId << MACHINE_LEFT | sequence;}private long getNextTimeStamp() {long current = System.currentTimeMillis() - START_STAMP;while(current == lastTimeStamp){current = System.currentTimeMillis() - START_STAMP;}return current;}
1. 字段最大值:DATA_CENTER_MAX、MACHINE_MAX、SEQUENCE_MAX
// 数据中心ID的最大值
public static final long DATA_CENTER_MAX = ~(-1L << DATA_CENTER_BIT);
// 机器ID的最大值
public static final long MACHINE_MAX = ~(-1L << MACHINE_BIT);
// 序列号的最大值
public static final long SEQUENCE_MAX = ~(-1L << SEQUENCE_BIT);
位运算原理:以DATA_CENTER_MAX为例(DATA_CENTER_BIT=5):
(-1L << 5):-1 的二进制是全 1(64 位),左移 5 位后,低 5 位变为 0,高位保持 1(即…11111111 11100000)。
~(-1L << 5):对上述结果取反,低 5 位变为 1,高位变为 0(即000…000 00011111),对应十进制 31(2^5-1)。
作用:通过位运算计算出每个字段的最大允许值(避免 ID 超出字段位数导致重叠)。例如数据中心 ID 不能超过 31,否则会占用其他字段的位数,导致 ID 冲突。
当然这里也可以使用(1L << n) - 1来实现,因为左移1位就相当于扩大2倍。但不推荐使用Math.pow,这样计算效率比较低
2. 位移量:TIMESTAMP_LEFT、DATA_CENTER_LEFT、MACHINE_LEFT
// 时间戳需要左移的位数(跳过数据中心、机器、序列号的位数)
public static final long TIMESTAMP_LEFT = DATA_CENTER_BIT + MACHINE_BIT + SEQUENCE_BIT; // 5+5+12=22
// 数据中心ID需要左移的位数(跳过机器、序列号的位数)
public static final long DATA_CENTER_LEFT = MACHINE_BIT + SEQUENCE_BIT; // 5+12=17
// 机器ID需要左移的位数(跳过序列号的位数)
public static final long MACHINE_LEFT = SEQUENCE_BIT; // 12
作用:位移量决定了每个字段在 64 位 ID 中的 “起始位置”。例如时间戳左移 22 位后,它的二进制会占据 ID 的最高位(从第 22 位到第 63 位),不会与其他字段重叠。
三、getId()方法:ID 生成的完整流程
getId()方法是生成唯一 ID 的核心,通过 “时间戳处理→序列号管理→位运算拼接” 三个步骤生成 ID:
步骤 1:计算相对时间戳(减少 ID 长度)
// 获取当前时间戳
long currentTime = System.currentTimeMillis();
// 计算相对时间戳:当前时间 - 起始时间(START_STAMP=2025-1-1的时间戳)
long timeStamp = currentTime - START_STAMP;
这里如果直接用System.currentTimeMillis()(绝对时间戳,从 1970 年开始),数值较大(约 17 位十进制),会占用更多位数。减去起始时间后,时间戳从 0 开始计数(2025-1-1 之后的毫秒数),减少 ID 的整体长度。
步骤 2:处理时钟回拨(避免 ID 重复)
if (timeStamp < lastTimeStamp) {throw new RuntimeException("您的服务器进行了时钟回调");
}
时钟回拨的风险:如果服务器时钟被调回(如手动修改时间、NTP 同步导致时间倒退),可能会生成与之前相同的时间戳,导致 ID 重复。
处理方式:直接抛异常,提示时钟异常。
步骤 3:管理序列号(解决同一毫秒内的并发冲突)
if (timeStamp == lastTimeStamp) {// 同一毫秒内:序列号自增sequenceId.increment();// 如果序列号超过最大值(4095),等待到下一个毫秒if (sequenceId.sum() >= SEQUENCE_MAX) {timeStamp = getNextTimeStamp(); // 获取下一个毫秒的时间戳sequenceId.reset(); // 序列号重置为0}
} else {// 不同毫秒:序列号重置为0sequenceId.reset();
}
场景说明:
若当前时间戳与上一次生成 ID 的时间戳相同(同一毫秒内),说明在同一毫秒内有并发请求,通过sequenceId自增区分(最多 4096 个)。
若序列号用完(超过 4095),则调用getNextTimeStamp()等待到下一个毫秒,重置序列号后再生成。
步骤 4:通过位运算拼接所有字段(生成最终 ID)
lastTimeStamp = timeStamp; // 更新上一次时间戳
long sequence = sequenceId.sum(); // 获取当前序列号// 位运算拼接:时间戳 << 22 | 数据中心ID << 17 | 机器ID << 12 | 序列号
return timeStamp << TIMESTAMP_LEFT | dataCenterId << DATA_CENTER_LEFT | machineId << MACHINE_LEFT | sequence;
这是最关键的一步! 我用一个具体例子演示拼接过程:
假设:
相对时间戳timeStamp = 100(二进制:1100100)
数据中心 IDdataCenterId = 1(二进制:1)
机器 IDmachineId = 2(二进制:10)
序列号sequence = 3(二进制:11)
时间戳左移 22 位:
100 << 22 → 二进制中,1100100向左移动 22 位,占据 ID 的最高位(第 22-63 位),低 22 位补 0。
数据中心 ID 左移 17 位:
1 << 17 → 二进制1向左移动 17 位,占据 ID 的第 17-21 位(共 5 位)。
机器 ID 左移 12 位:
2 << 12 → 二进制10向左移动 12 位,占据 ID 的第 12-16 位(共 5 位)。
序列号不左移:
sequence = 3 → 二进制11,占据 ID 的第 0-11 位(共 12 位)。
或运算(|)拼接:
四个部分通过|运算组合,每个字段占据独立的位区域,互不重叠,最终形成一个 64 位的唯一 ID。
示意图(简化为 32 位展示):
时间戳(左移22位) 数据中心ID(左移17位) 机器ID(左移12位) 序列号
[1100100......] | [00001......] | [00010......] | [000000000011]
四、getNextTimeStamp():获取下一个有效毫秒
private long getNextTimeStamp() {long current = System.currentTimeMillis() - START_STAMP;// 循环等待,直到当前时间戳超过上一次的时间戳(进入下一个毫秒)while (current == lastTimeStamp) {current = System.currentTimeMillis() - START_STAMP;}return current;
}
作用:当同一毫秒内的序列号用完(超过 4095),该方法会阻塞等待,直到时间进入下一个毫秒,确保下一个 ID 的时间戳递增,避免序列号溢出导致的 ID 重复。
序列化工厂的建立
工厂设计模式在前面实现注册中心就已经有使用过,因此这里我们简单回顾下怎么实现工厂设计模式:
定义接口(规范)→ 实现具体类(逻辑)→ 枚举管理(映射)→ 工厂封装(入口)。
为了实现以上的功能,我定义了如下的几个类/接口:
序列化组件核心角色与作用说明
组件类名 | 核心角色 | 关键作用 |
---|---|---|
Serializer | 序列化规范接口 | 定义所有序列化方式必须实现的 “序列化(serialize)” 和 “反序列化(diserialize)” 方法 |
JdkSerializer | 具体序列化实现类 | 基于 Java 原生 IO 流(ObjectInputStream/ObjectOutputStream)实现序列化逻辑 |
SerializerWrapper | 序列化器包装类 | 封装 “序列化标识(code)、类型名(type)、序列化器实例(serializer)”,避免零散传参 |
SerializerFactory | 序列化工厂(核心) | 通过静态缓存 Map 管理所有序列化器,提供 “按类型名 / 按标识” 获取序列化器的统一入口 |
接下来看下每个组件的具体应用和实现:
1. Serializer接口:定义序列化的 “通用规范”
public interface Serializer {// 序列化:Java对象 → 字节数组byte[] serialize(Object object);// 反序列化:字节数组 → 指定类型的Java对象(泛型保证类型安全)<T> T diserialize(byte[] bytes, Class<T> clazz);
}
核心作用:为所有序列化方式定 “规矩”—— 不管是JdkSerializer、JsonSerializer,都必须实现这两个方法,确保上层代码能以统一方式调用(比如编码器不用关心是 Java 序列化还是 JSON 序列化,只需调用serialize方法)。
泛型T的意义:diserialize方法的泛型+Class clazz,是为了让反序列化后直接返回目标类型对象(无需强制类型转换)。
2. JdkSerializer:Java 原生序列化的具体实现
public class JdkSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if(object == null) return null;// 用try-with-resources自动关闭流(避免内存泄漏)try(ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream outputStream = new ObjectOutputStream(baos);) {outputStream.writeObject(object); // 原生序列化核心APIreturn baos.toByteArray(); // 转成字节数组(网络可传输)} catch (IOException e) {log.error("序列化对象【{}】时发生异常",object);throw new SerializeException(e); // 抛自定义异常(便于上层统一捕获)}}@Overridepublic <T> T diserialize(byte[] bytes, Class<T> clazz) {if(bytes == null || clazz == null) return null;try(ByteArrayInputStream bais = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bais);) {return (T) objectInputStream.readObject(); // 原生反序列化核心API} catch (IOException | ClassNotFoundException e) {log.error("反序列化对象【{}】时发生异常",clazz);throw new SerializeException(e);}}
}
核心逻辑:基于 Java 原生的ObjectInputStream/ObjectOutputStream实现序列化,这是最基础的序列化方式(无需依赖第三方库)。
这里把
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bais)
写到try块的括号中是为了自动关闭流资源(ByteArrayOutputStream、ObjectOutputStream等)。
3. SerializerWrapper:序列化器的 “统一包装类”
@Data // Lombok注解:自动生成getter/setter/toString等
@NoArgsConstructor
@AllArgsConstructor
public class SerializerWrapper {private byte code; // 序列化标识(1字节,对应报文中的serializeType字段)private String type; // 序列化类型名(如"jdk"、"json",便于配置和日志)private Serializer serializer; // 实际的序列化器实例(如JdkSerializer)
}
核心作用:把 “标识、类型名、实例” 三个关联信息封装成一个对象,避免零散传递参数。
比如:之前需要单独传byte code+String type+Serializer serializer,现在只需传一个SerializerWrapper,代码更简洁,也避免参数顺序错乱。
与报文的关联:code字段对应之前报文中的 “序列化类型” 字段(1 字节),比如code=1代表 JDK 序列化,code=2代表 JSON 序列化,接收方通过code就能找到对应的反序列化器。这样可以通过多种方式找到对应的序列化方法。
4. SerializerFactory:工厂核心(管理 + 获取序列化器)
这是整个序列化工厂的 “大脑”,负责初始化序列化器缓存和提供统一获取入口,代码逻辑如下:
(1)静态缓存 Map:预加载 + 线程安全
// 按“类型名”缓存(如key="jdk" → value=SerializerWrapper)
private final static ConcurrentHashMap<String,SerializerWrapper> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);
// 按“序列化标识”缓存(如key=1 → value=SerializerWrapper)
private final static ConcurrentHashMap<Byte,SerializerWrapper> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);
ConcurrentHashMap:线程安全的 Map,避免多线程环境下(如 Netty 的 Worker 线程同时获取序列化器)出现并发修改异常。
双缓存设计:支持两种获取方式(按类型名 / 按标识),适配不同场景:
按类型名(SERIALIZER_CACHE):适合配置文件指定序列化方式(如配置serialize.type=jdk,直接通过 “jdk” 获取)。
按标识(SERIALIZER_CACHE_CODE):适合报文解析场景(从报文中读 1 字节code,直接通过code获取)。
(2)静态代码块:初始化缓存(预加载)
static{// 1. 创建JDK序列化的包装类(code=1,type="jdk",实例=JdkSerializer)SerializerWrapper jdk = new SerializerWrapper((byte) 1, "jdk", new JdkSerializer());// 2. 创建JSON序列化的包装类(code=2,type="json",实例=JsonSerializer)SerializerWrapper json = new SerializerWrapper((byte) 2, "json", new JsonSerializer());// 3. 存入按类型名缓存的MapSERIALIZER_CACHE.put("jdk",jdk);SERIALIZER_CACHE.put("json",json);// 4. 存入按标识缓存的MapSERIALIZER_CACHE_CODE.put((byte) 1,jdk);SERIALIZER_CACHE_CODE.put((byte) 2,json);
}
静态代码块特性:类加载时执行(只执行一次),提前将所有支持的序列化器加载到缓存中,后续获取时直接从 Map 中取,无需重复创建实例(避免频繁 new 对象的性能损耗)。
扩展方便:未来新增序列化方式(如 Hessian),只需在这里新增一个SerializerWrapper并放入两个 Map,无需修改其他逻辑。
(3)工厂方法:统一获取入口
// 按“类型名”获取(如传入"jdk",返回JDK序列化的包装类)
public static SerializerWrapper getSerializer(String serializeType){return SERIALIZER_CACHE.get(serializeType);
}// 按“序列化标识”获取(如传入1,返回JDK序列化的包装类)
public static SerializerWrapper getSerializer(byte serializeCode){return SERIALIZER_CACHE_CODE.get(serializeCode);
}
核心作用:上层代码无需关心缓存 Map 的存在,只需调用工厂方法,传入 “类型名” 或 “标识”,就能拿到对应的序列化器包装类,完全屏蔽了 “序列化器如何创建、如何管理” 的细节。
三、实际使用流程(结合 RPC 场景)
下面我们以 “编码器序列化 body” 和 “解码器反序列化 body” 为例,看看这套工厂如何工作:
- 编码器中使用(客户端 / 服务端发送请求时)
// 假设从配置中拿到序列化类型名"jdk",或从htrpcRequest中拿到code=1
String serializeType = "jdk"; // 或 byte serializeCode = 1;// 1. 通过工厂获取序列化器包装类
SerializerWrapper wrapper = SerializerFactory.getSerializer(serializeType);
// 2. 从包装类中拿到实际的序列化器实例(JdkSerializer)
Serializer serializer = wrapper.getSerializer();
// 3. 调用序列化方法,将RequestPayLoad转成字节数组(用于网络传输)
RequestPayLoad payload = msg.getRequestPayLoad();
byte[] bodyBytes = serializer.serialize(payload);
- 解码器中使用(服务端 / 客户端接收响应时)
// 1. 从报文中读取序列化标识(1字节,如code=1)
byte serializeCode = byteBuf.readByte();
// 2. 通过工厂获取对应的包装类
SerializerWrapper wrapper = SerializerFactory.getSerializer(serializeCode);
// 3. 拿到反序列化器实例
Serializer serializer = wrapper.getSerializer();
// 4. 调用反序列化方法,将字节数组转成RequestPayLoad
byte[] payloadBytes = ...; // 从报文中读取的body字节
RequestPayLoad payload = serializer.diserialize(payloadBytes, RequestPayLoad.class);
总结
今天主要完成了序列化工厂的创建以及雪花算法改造请求id。