接口适配器模式实现令牌桶算法和漏桶算法
以下是令牌桶算法、漏桶算法和雪花算法的清晰对比解析。它们属于完全不同的技术领域,前两者用于流量控制,后者用于分布式ID生成:
1. 令牌桶算法(Token Bucket)
- 领域:流量整形 / 速率限制
- 核心目标:控制请求处理速率,允许突发流量
- 工作原理:
- 系统以固定速率生成令牌存入桶中(桶有容量上限)。
- 请求到达时消耗令牌:
- 令牌足够 → 立即处理
- 令牌不足 → 拒绝或等待
- 关键特性:
- 应用场景:
- API接口限流(如允许瞬间高并发)
- 网络流量突发传输(视频直播)
- 微服务熔断降级
2. 漏桶算法(Leaky Bucket)
- 领域:流量整形 / 请求平滑
- 核心目标:强制请求以恒定速率处理,消除突发性
- 工作原理:
- 请求进入桶中(桶有容量上限)。
- 桶底以固定速率“漏出”请求进行处理(FIFO)。
- 桶满时新请求被拒绝。
- 关键特性:
- 应用场景:
- 网络设备发包速率控制
- 打印机任务队列调度
- 保护下游服务不被流量冲垮
3. 雪花算法(Snowflake)
- 领域:分布式系统 全局唯一ID生成
- 核心目标:在分布式环境下生成趋势递增、不重复的ID
- ID结构(64位二进制):
0 | 0000000000 0000000000 0000000000 0000000000 0 | 0000000000 | 000000000000 └─符号位(恒0) └─────41位时间戳(毫秒)─────┘ └─10位机器ID─┘ └──12位序列号──┘
- 工作原理:
- 时间戳:记录ID生成时间(可支持69年)
- 机器ID:区分不同节点(防止集群冲突)
- 序列号:同一毫秒内的自增序号(支持4096/ms/节点)
- 关键特性:
- 高性能:单机每秒可生成400万+ ID
- 去中心化:无需协调服务(如数据库、ZooKeeper)
- 有序性:ID按时间趋势递增(利于数据库索引)
- 应用场景:
- 分布式数据库主键(MySQL、MongoDB)
- 消息队列消息ID(Kafka、RocketMQ)
- 业务订单号/用户ID生成
三者的本质区别总结
维度 | 令牌桶算法 | 漏桶算法 | 雪花算法 |
---|---|---|---|
领域 | 流量控制 | 流量控制 | 分布式ID生成 |
核心目标 | 允许突发,限平均速率 | 强制平滑输出请求 | 生成全局唯一有序ID |
关键技术 | 令牌生成与消耗 | 请求队列+恒定速率泄漏 | 时间戳+机器ID+序列号 |
是否存储状态 | 是(令牌数) | 是(请求队列) | 是(时间戳/序列号计数) |
典型应用 | API限流、网络流量整形 | 打印机调度、流量平滑 | 数据库主键、分布式业务ID |
为什么雪花算法不用于流量控制?
- 雪花算法的“桶”是逻辑结构(用于组合ID),而令牌桶/漏桶的“桶”是请求缓冲容器,两者解决的问题维度完全不同。
💡 技术选型建议:
- 需要应对突发流量 → 令牌桶(如电商秒杀)
- 需严格平滑输出 → 漏桶(如支付回调)
- 需分布式唯一ID → 雪花算法(如订单系统)
接口:
public interface SourceAble {void tokenBucket() throws Exception;void leakyBucket() throws Exception;
}
桶的适配器:
public abstract class BucketWrapper implements SourceAble {@Overridepublic void tokenBucket() throws Exception {System.out.println("抽象类的令牌桶算法");}@Overridepublic void leakyBucket() throws Exception {System.out.println("抽象类的漏桶算法");}}
令牌桶与漏桶分别实现桶抽象类:令牌桶
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TokenBucket extends BucketWrapper {/*** 平均流量(每秒生成的令牌数量)*/private int avgFlowRate = 512;/*** 一个令牌允许的数据包大小 1字节*/private int everyTokenSize = 1;/*** 桶内令牌上限1024个,最大流量峰值(瞬间最大流量)为 everyTokenSize*maxFlowRate =1024Byte=1K*/private int maxFlowRate = 1024;/*** 队列来缓存桶数量*/private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<>(maxFlowRate);/*** 任务调度,固定速率产生令牌*/private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();/*** volatile: 当一个变量被volatile修饰时,对该变量的写操作会立即被其他线程可见,即保证了可见性。* volatile关键字还可以禁止编译器和处理器对被修饰变量的操作进行重排序优化。*/private volatile boolean isStart = false;/*** ReentrantLock(可重入锁)是Java中提供的一种高级并发锁机制,用于解决多线程并发访问共享资源时的同步和互斥控制问题。* ReentrantLock与synchronized关键字相比,提供了更灵活、更强大的锁定机制。*/private ReentrantLock lock = new ReentrantLock(true);public static TokenBucket newBuilder() {return new TokenBucket();}/*** 获取足够的令牌个数** @return boolean*/public boolean getTokens(byte[] dataSize) {// 传输内容大小对应的桶个数int needTokenNum = dataSize.length / everyTokenSize + 1;final ReentrantLock lock = this.lock;lock.lock();try {// 是否存在足够的桶数量boolean result = needTokenNum <= tokenQueue.size();System.out.println("桶内令牌数" + (result ? "充足" : "不足") + ",总令牌数:" +tokenQueue.size() + " 需要:" + needTokenNum);if (!result) {return false;}int tokenCount = 0;for (int i = 0; i < needTokenNum; i++) {Byte poll = tokenQueue.poll();if (poll != null) {tokenCount++;}}return tokenCount == needTokenNum;} finally {lock.unlock();}}/*** 添加令牌** @param tokenNum*/public void addTokens(Integer tokenNum) {for (int i = 0; i < tokenNum; i++) {// 若是桶已经满了,就不再加入新的令牌tokenQueue.offer((byte) 1);}}/*** build,完成初始化,并开始任务调度(始按照固定速率生产令牌)** @return*/public TokenBucket build() {start();return this;}public void start() {// 初始化桶队列大小if (maxFlowRate != 0) {tokenQueue = new ArrayBlockingQueue<>(maxFlowRate);}// 初始化令牌生产者TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);isStart = true;}@AllArgsConstructorstatic class TokenProducer implements Runnable {private int avgFlowRate;private TokenBucket tokenBucket;@Overridepublic void run() {tokenBucket.addTokens(avgFlowRate);}}public TokenBucket maxFlowRate(int maxFlowRate) {this.maxFlowRate = maxFlowRate;return this;}public TokenBucket avgFlowRate(int avgFlowRate) {this.avgFlowRate = avgFlowRate;return this;}@Overridepublic void tokenBucket() throws Exception {// 平均流量512,瞬时流量1024TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();// 四个字节String data = "0000";for (int i = 1; i <= 20; i++) {// 生成随机大小的流量请求int copyNum = new Random().nextInt(100);StringBuilder stringBuilder = new StringBuilder(data.length() * copyNum);for (int j = 0; j < copyNum; j++) {stringBuilder.append(data);}// 获取对应数量的令牌boolean tokens = tokenBucket.getTokens(stringBuilder.toString().getBytes());if (tokens) {// 拿到令牌,通过System.out.println("第 " + i + " 次请求通过,传输数据:" + copyNum+ "字节 桶内令牌数量:" + tokenBucket.tokenQueue.size());} else {// 没有拿到令牌,拒绝System.out.println("第 " + i + " 次请求拒绝,传输数据:" + copyNum+ "字节 桶内令牌数量:" + tokenBucket.tokenQueue.size());}TimeUnit.MILLISECONDS.sleep(100);}}
}
漏桶:
@EqualsAndHashCode(callSuper = false)
@Data
@NoArgsConstructor
public class LeakyBucket extends BucketWrapper {/*** 当前时间*/private Long lastLeakyTimeStamp = System.currentTimeMillis();/*** 桶的容量*/private Integer burst;/*** 水漏出的速度,每毫秒处理的数量*/private Double rate;/*** 当前水量(当前累积请求数)*/private Integer currentWater = 0;public LeakyBucket(int burst, double rate) {this.burst = burst;this.rate = rate;}/*** 漏水*/public void leaky() {long now = System.currentTimeMillis();// 间隔的时间long deltaTs = now - this.lastLeakyTimeStamp;// 漏掉的水int deltaQuota = (int) (deltaTs * rate);// 桶内剩余的水int leftWater = currentWater - deltaQuota;// 先执行漏水,计算剩余水量currentWater = Math.max(0, leftWater);//更新上一次漏水时间为当前时间lastLeakyTimeStamp = now;}/*** 加水(新请求)** @return 是否满水*/public boolean acquire(int size) {leaky();if (currentWater + size <= burst) {// 尝试加水,并且水还未满currentWater += size;return true;} else {// 水满,拒绝加水return false;}}@Overridepublic void leakyBucket() throws Exception {// 每100ms,流入10的水,流出5的水for (int i = 0; i < 100; i++) {Boolean acquire = acquire(10);System.out.println("本次申请:" + (acquire ? "成功" : "失败") +"当前桶内水量:" + currentWater +" 剩余空间:" + (burst - currentWater));TimeUnit.MILLISECONDS.sleep(100);}}
}
雪花算法:
@EqualsAndHashCode(callSuper = false)
@Data
public class SnowFlake extends BucketWrapper {/*** 因为二进制里第一个 bit 为如果是 1,那么都是负数,但是我们生成的 id 都是正数,所以第一个 bit 统一都是 0。*/private long sequence = 0L;/*** 代表一毫秒内生成的多个id的最新序号 12位 4096 -1 = 4095 个,每毫秒生产的序列号之从0开始递增;*/private long sequenceBits = 12L;/*** 最大值4095,到最大后从0开始*/private long sequenceMask = ~(-1L << sequenceBits);/*** 10位机器码看成是“5位dataCenterId+5位workerId”*/private long workerId;/*** 机器ID 2进制5位 32位减掉1位 31个*/private long workerIdBits = 5L;private long workerIdShift = sequenceBits;/*** 机房ID 2进制5位 32位减掉1位 31个*/private long dataCenterId;private long dataCenterIdBits = 5L;private long dataCenterIdShift = sequenceBits + workerIdBits;private long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;/*** 记录上一个id的产生时间戳,判断是否在 同一毫秒内*/private long lastTimestamp = -1L;SnowFlake(long dataCenterId, long workerId) {// 检查机房id和机器id是否超过31 不能小于0// 就是5 bit最多只能有31个数字,也就是说机器id最多只能是32以内long maxWorkerId = ~(-1L << workerIdBits);long maxDataCenterId = ~(-1 << dataCenterIdBits);if ((dataCenterId > maxDataCenterId || dataCenterId < 0) || (workerId > maxWorkerId || workerId < 0)) {throw new IllegalArgumentException("dataCenterId/workerId值非法");}this.dataCenterId = dataCenterId;this.workerId = workerId;}/*** 通过SnowFlake生成id的核心算法,通过调用nextId()方法,让当前这台机器上的snowflake算法程序生成一个全局唯一的id** @return*/private synchronized long nextId() {//获取计算id时刻的时间戳long timestamp = System.currentTimeMillis();//保证递增if (timestamp < lastTimestamp) {throw new RuntimeException("时间戳值非法");}//如果在同一个毫秒内,又发送了一个请求生成一个id,这时把seqence序号给递增1,最多就是4096个(0-4095)if (lastTimestamp == timestamp) {/*通过位运算保证sequence不会超出序列号所能容纳的最大值4095如果到达最大值4095此时再增加一个(即sequence + 1),就会使sequence恢复到0,所以可用sequence==0,表示sequence已满。*/sequence = (sequence + 1) & sequenceMask;//当某一毫秒的时间,产生的id数 超过4095,系统会进入等待,直到下一毫秒,系统继续产生IDif (sequence == 0) {timestamp = tilNextMillis(lastTimestamp);}} else {//如果此次生成id的时间戳,与上次的时间戳不同,就已经可以根据时间戳区分id值,sequence从0开始sequence = 0L;}//更新最近一次生成id的时间戳lastTimestamp = timestamp;/* 最核心的二进制位运算操作,生成id1.将41位时间戳左移动22位;2.将5位dataCenterId向左移动17位,并将5位workerId向左移动12位;3.sequence本来就在最低位,因此不需要移动。以下<<和|运算,实际就是将时间戳、机器码和序列号移动到snowflake中相应的位置。最后拼接起来成一个64 bit的二进制数字,转换成10进制就是个long型*//* 时间戳 2^41 - 1 差不多可以用69年设置一个时间初始值 1288834974657L(1970-01-01 00:00:00到2010年11月04日01:42:54所经过的毫秒数)现在的某一时刻时间戳减去1288834974657L的值,正好在2^41内。*/long twepoch = 1288834974657L;return ((timestamp - twepoch) << timestampLeftShift) | (dataCenterId << dataCenterIdShift) | (workerId << workerIdShift) | sequence;}/*** 当某一毫秒的时间,产生的id数 超过4095,系统会进入等待,直到下一毫秒,系统继续产生ID** @param lastTimestamp* @return*/private long tilNextMillis(long lastTimestamp) {long timestamp = System.currentTimeMillis();/*如果当前时刻的时间戳<=上一次生成id的时间戳,就重新生成当前时间。即确保当前时刻的时间戳,与上一次的时间戳不会重复。*/while (timestamp <= lastTimestamp) {timestamp = System.currentTimeMillis();}return timestamp;}@Overridepublic void snowFlake() throws Exception {for (int i = 0; i < 22; i++) {long id = nextId();String biStr = Long.toBinaryString(id);System.out.println(id + "\t" + biStr);}}
}