Java面试实战系列【并发篇】- Semaphore深度解析与实战
文章目录
- 一、引言
- 1.1 什么是Semaphore信号量
- 1.2 Semaphore的核心概念:许可证机制
- 1.3 为什么需要Semaphore
- 二、Semaphore核心原理
- 2.1 信号量的理论基础
- 2.2 基于AQS的底层实现机制
- 2.3 公平与非公平模式原理
- 2.4 许可证的获取与释放流程
- 2.5 源码关键片段深度解析
- 三、Semaphore核心API详解
- 3.1 构造方法详解
- 3.2 acquire()系列方法深度剖析
- 3.3 release()系列方法详解
- 3.4 tryAcquire()尝试获取许可证
- 3.5 其他重要方法介绍
- 四、Semaphore典型使用场景
- 4.1 连接池资源管理
- 4.2 限流与流量控制
- 4.3 生产者-消费者模式
一、引言
1.1 什么是Semaphore信号量
Semaphore(信号量)是Java并发包java.util.concurrent
中的一个重要同步工具类,它用于控制同时访问某个资源的线程数量。Semaphore维护了一组许可证(permits),线程在访问资源前需要获取许可证,访问完成后释放许可证。
核心概念:
- 许可证(Permits):代表可以同时访问资源的线程数量
- 获取(Acquire):线程请求获得一个或多个许可证
- 释放(Release):线程释放持有的许可证,供其他线程使用
图解说明:上图展示了Semaphore的基本工作原理。许可证池中有固定数量的许可证,线程获取许可证后才能访问资源,当许可证用完时,新来的线程需要等待。
1.2 Semaphore的核心概念:许可证机制
许可证机制是Semaphore的核心设计思想,它类似于现实生活中的停车场管理:
时序图说明:
- Semaphore初始化时设定许可证数量(本例为2)
- 线程1和线程2成功获取许可证并访问资源
- 线程3因为无可用许可证而等待
- 当线程1释放许可证后,线程3立即获得许可证
- 所有线程使用完资源后都会释放许可证
1.3 为什么需要Semaphore
在实际开发中,我们经常遇到需要限制并发访问数量的场景:
典型应用场景:
- 数据库连接池:限制同时连接数据库的连接数
- HTTP连接池:控制同时发起的HTTP请求数量
- 线程池控制:限制同时执行的任务数量
- 限流保护:保护系统不被过多请求压垮
- 资源池管理:管理有限的系统资源(如文件句柄、内存等)
不使用Semaphore的问题:
// 问题示例:无控制的并发访问
public class UncontrolledAccess {private final ExpensiveResource resource = new ExpensiveResource();public void accessResource() {// 所有线程都可以同时访问,可能导致:// 1. 系统资源耗尽// 2. 性能急剧下降// 3. 系统崩溃resource.doWork();}
}
使用Semaphore的解决方案:
// 解决方案:使用Semaphore控制并发
public class ControlledAccess {private final Semaphore semaphore = new Semaphore(5); // 最多5个线程同时访问private final ExpensiveResource resource = new ExpensiveResource();public void accessResource() throws InterruptedException {semaphore.acquire(); // 获取许可证try {resource.doWork(); // 安全访问资源} finally {semaphore.release(); // 确保释放许可证}}
}
二、Semaphore核心原理
2.1 信号量的理论基础
信号量(Semaphore)概念最初由计算机科学家Edsger Dijkstra在1965年提出,是解决并发编程中资源竞争问题的重要工具。
信号量的数学模型:
P操作(获取):
if (S > 0) {S = S - 1; // 获取成功,减少可用资源
} else {block(); // 获取失败,线程阻塞等待
}V操作(释放):
S = S + 1; // 增加可用资源
wakeup(); // 唤醒等待的线程
Java中的对应关系:
- P操作 ↔
acquire()
方法 - V操作 ↔
release()
方法 - S值 ↔ 可用许可证数量
2.2 基于AQS的底层实现机制
Semaphore基于AbstractQueuedSynchronizer(AQS)框架实现,AQS提供了同步状态管理和线程阻塞/唤醒的基础设施。
类图说明:
- Semaphore:对外提供的API接口
- Sync:内部抽象同步器,继承自AQS
- FairSync/NonfairSync:公平和非公平模式的具体实现
- AQS:提供同步状态管理和线程队列管理
AQS状态表示:
// AQS中的state字段表示可用许可证数量
// state > 0 : 有可用许可证
// state = 0 : 无可用许可证,新线程需要等待
// state < 0 : 理论上不会出现(Semaphore确保state >= 0)protected int getState() { return state; }
protected boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2.3 公平与非公平模式原理
Semaphore支持两种获取许可证的策略:公平模式和非公平模式。
公平模式(FairSync):
protected int tryAcquireShared(int acquires) {for (;;) {// 关键:检查是否有前驱节点在等待if (hasQueuedPredecessors())return -1; // 有线程在排队,当前线程不能插队int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}
非公平模式(NonfairSync):
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;// 注意:没有检查hasQueuedPredecessors(),允许插队}
}
两种模式的对比:
性能特性分析:
- 公平模式:保证先到先得,但性能较低(需要检查队列状态)
- 非公平模式:性能更高,但可能导致线程饥饿
2.4 许可证的获取与释放流程
获取许可证的完整流程:
释放许可证的完整流程:
2.5 源码关键片段深度解析
核心获取逻辑:
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}// AQS中的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}// 非公平模式的tryAcquireShared实现
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}
源码解析要点:
1. 中断检查机制:
if (Thread.interrupted())throw new InterruptedException();
在获取许可证前首先检查线程是否被中断,体现了可中断设计原则。
2. 快速路径优化:
if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
首先尝试快速获取,只有在失败时才进入复杂的队列等待逻辑,这是一种重要的性能优化策略。
3. CAS无锁更新:
if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;
这里的逻辑很巧妙:
- 如果
remaining < 0
,说明许可证不足,直接返回负数表示失败 - 如果许可证足够,尝试CAS更新;如果CAS失败,会在下一次循环中重试
- 这种设计在高并发下既保证了正确性,又提供了较好的性能
4. 释放逻辑的实现:
public void release() {sync.releaseShared(1);
}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}
释放逻辑的关键点:
- 溢出检查:
if (next < current)
检查整数溢出,防止许可证数量超过最大值 - 无条件释放:release操作不检查当前线程是否持有许可证,这是Semaphore的设计特点
- CAS重试:使用无锁的CAS操作更新状态,失败时自动重试
5. 等待队列的管理:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
这段代码展示了AQS队列的精妙设计:
- 共享模式节点:使用
Node.SHARED
标记,与独占模式不同 - 传播机制:
setHeadAndPropagate
确保在有多个许可证时能唤醒多个等待线程 - 中断响应:
parkAndCheckInterrupt()
检查中断状态,实现可中断等待 - 异常安全:finally块确保在异常情况下正确清理节点
三、Semaphore核心API详解
3.1 构造方法详解
Semaphore提供了两个构造方法,允许配置许可证数量和公平性策略:
// 构造方法1:指定许可证数量,默认非公平模式
public Semaphore(int permits)// 构造方法2:指定许可证数量和公平性策略
public Semaphore(int permits, boolean fair)
参数说明:
- permits:初始许可证数量,必须 >= 0
- fair:公平性策略,true为公平模式,false为非公平模式
使用示例:
// 创建一个有5个许可证的非公平信号量
Semaphore semaphore1 = new Semaphore(5);// 创建一个有3个许可证的公平信号量
Semaphore semaphore2 = new Semaphore(3, true);// 创建一个许可证数量为0的信号量(常用于一次性事件)
Semaphore semaphore3 = new Semaphore(0);
构造方法源码解析:
public Semaphore(int permits) {sync = new NonfairSync(permits); // 默认使用非公平模式
}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}// Sync构造方法
Sync(int permits) {setState(permits); // 设置AQS的state为许可证数量
}
构造方法设计要点:
- 默认选择非公平模式是因为性能更好,大多数场景下也是合适的
- 许可证数量直接设置为AQS的state值,利用AQS的原子操作保证线程安全
- 通过不同的Sync实现类来支持公平和非公平两种策略
3.2 acquire()系列方法深度剖析
acquire()系列方法是获取许可证的核心API,提供了多种获取策略:
// 基本获取方法
public void acquire() throws InterruptedException
public void acquire(int permits) throws InterruptedException// 不可中断获取方法
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)
3.2.1 基本acquire()方法:
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);
}
方法特性分析:
- 可中断:线程在等待过程中可以被中断
- 阻塞等待:如果许可证不足,线程会阻塞直到有可用许可证
- 异常安全:被中断时抛出InterruptedException
使用示例:
public class ResourceManager {private final Semaphore semaphore = new Semaphore(3);public void accessResource() throws InterruptedException {// 获取一个许可证semaphore.acquire();try {System.out.println(Thread.currentThread().getName() + " 正在访问资源");Thread.sleep(2000); // 模拟资源使用} finally {semaphore.release(); // 确保释放许可证}}public void batchAccess(int count) throws InterruptedException {// 批量获取多个许可证semaphore.acquire(count);try {System.out.println("批量获取了 " + count + " 个许可证");// 执行需要多个许可证的操作} finally {semaphore.release(count); // 批量释放}}
}
3.2.2 不可中断acquire方法:
public void acquireUninterruptibly() {sync.acquireShared(1);
}public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);
}
不可中断方法的特点:
- 线程在等待过程中不响应中断
- 适用于必须获得资源才能继续的场景
- 即使线程被中断,也会继续等待直到获得许可证
使用场景对比:
// 场景1:可中断获取 - 适用于可以取消的任务
public void cancellableTask() {try {semaphore.acquire(); // 可以被中断取消doWork();} catch (InterruptedException e) {System.out.println("任务被取消");Thread.currentThread().interrupt(); // 恢复中断状态} finally {semaphore.release();}
}// 场景2:不可中断获取 - 适用于关键资源访问
public void criticalTask() {semaphore.acquireUninterruptibly(); // 不可被中断try {doCriticalWork(); // 关键任务必须完成} finally {semaphore.release();}
}
3.3 release()系列方法详解
release()方法用于释放许可证,增加可用许可证的数量:
// 释放一个许可证
public void release()// 释放多个许可证
public void release(int permits)
3.3.1 基本release()方法源码:
public void release() {sync.releaseShared(1);
}public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);
}// AQS中的releaseShared方法
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared(); // 唤醒等待的线程return true;}return false;
}
release方法的重要特性:
1. 无所有权检查:
// Semaphore不检查释放许可证的线程是否曾经获取过许可证
public class PermitDemo {private static final Semaphore semaphore = new Semaphore(1);public static void main(String[] args) throws InterruptedException {// 线程A获取许可证Thread threadA = new Thread(() -> {try {semaphore.acquire();System.out.println("Thread A 获取许可证");Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}// 注意:Thread A 不释放许可证});// 线程B释放许可证(即使它没有获取过)Thread threadB = new Thread(() -> {try {Thread.sleep(1000);semaphore.release(); // 这是允许的!System.out.println("Thread B 释放许可证");} catch (InterruptedException e) {e.printStackTrace();}});threadA.start();threadB.start();threadA.join();threadB.join();}
}
这种设计的优缺点:
- 优点:灵活性高,可以实现复杂的许可证管理策略
- 缺点:容易出现许可证泄漏或错误释放的问题
2. 许可证数量可以超过初始值:
public class PermitOverflow {public static void main(String[] args) {Semaphore semaphore = new Semaphore(2); // 初始2个许可证System.out.println("初始许可证: " + semaphore.availablePermits()); // 输出: 2semaphore.release(3); // 释放3个许可证System.out.println("释放后许可证: " + semaphore.availablePermits()); // 输出: 5// 现在有5个许可证可用,超过了初始值2}
}
3. 批量释放的原子性:
// 批量释放许可证的示例
public class BatchRelease {private final Semaphore semaphore = new Semaphore(10);public void batchOperation() throws InterruptedException {int permits = 5;semaphore.acquire(permits); // 批量获取5个许可证try {// 执行需要5个许可证的操作processWithMultiplePermits();} finally {semaphore.release(permits); // 原子性地释放5个许可证}}private void processWithMultiplePermits() {// 模拟需要多个许可证的操作System.out.println("使用5个许可证执行批量操作");}
}
3.4 tryAcquire()尝试获取许可证
tryAcquire()系列方法提供非阻塞的许可证获取方式:
// 立即尝试获取,不等待
public boolean tryAcquire()
public boolean tryAcquire(int permits)// 带超时的尝试获取
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
3.4.1 立即尝试获取:
public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;
}public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;
}
重要特性:
- 非阻塞:立即返回,不会等待
- 非公平:即使是公平模式的Semaphore,tryAcquire也是非公平的
- 返回值:true表示获取成功,false表示获取失败
使用示例:
public class NonBlockingAccess {private final Semaphore semaphore = new Semaphore(3);private final AtomicInteger successCount = new AtomicInteger(0);private final AtomicInteger failureCount = new AtomicInteger(0);public void attemptAccess() {if (semaphore.tryAcquire()) {successCount.incrementAndGet();try {System.out.println(Thread.currentThread().getName() + " 获取许可证成功");Thread.sleep(1000); // 模拟工作} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {semaphore.release();}} else {failureCount.incrementAndGet();System.out.println(Thread.currentThread().getName() + " 获取许可证失败,执行降级逻辑");fallbackLogic(); // 执行备用逻辑}}private void fallbackLogic() {// 获取许可证失败时的备用处理逻辑System.out.println("执行备用处理方案");}public void printStatistics() {System.out.println("成功获取: " + successCount.get());System.out.println("获取失败: " + failureCount.get());}
}
3.4.2 带超时的尝试获取:
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
超时获取的特性:
- 有限等待:在指定时间内等待,超时则返回false
- 可中断:等待过程中可以被中断
- 精确控制:可以精确控制等待时间
超时获取的使用场景:
public class TimeoutAccess {private final Semaphore semaphore = new Semaphore(2);// 场景1:用户请求处理,不能无限等待public boolean processUserRequest() {try {// 最多等待5秒if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {try {return doProcessing();} finally {semaphore.release();}} else {System.out.println("系统繁忙,请稍后重试");return false;}} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}// 场景2:批处理任务,设置合理超时public boolean processBatch(List<Task> tasks) {int requiredPermits = Math.min(tasks.size(), semaphore.availablePermits());try {// 根据任务数量动态设置超时时间long timeout = tasks.size() * 100; // 每个任务预期100msif (semaphore.tryAcquire(requiredPermits, timeout, TimeUnit.MILLISECONDS)) {try {return processTasks(tasks.subList(0, requiredPermits));} finally {semaphore.release(requiredPermits);}} else {// 部分处理策略return processPartially(tasks);}} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}private boolean doProcessing() {// 模拟处理逻辑return true;}private boolean processTasks(List<Task> tasks) {// 批量处理任务return true;}private boolean processPartially(List<Task> tasks) {// 部分处理逻辑return true;}static class Task {// 任务定义}
}
3.5 其他重要方法介绍
除了核心的获取和释放方法,Semaphore还提供了一些实用的辅助方法:
3.5.1 状态查询方法:
// 获取当前可用许可证数量
public int availablePermits()// 获取正在等待许可证的线程数量(估算值)
public final int getQueueLength()// 检查是否有线程正在等待许可证
public final boolean hasQueuedThreads()// 返回等待线程的集合(用于调试)
protected Collection<Thread> getQueuedThreads()// 检查是否使用公平策略
public boolean isFair()
状态查询方法的使用示例:
public class SemaphoreMonitor {private final Semaphore semaphore;private final int totalPermits;public SemaphoreMonitor(int permits, boolean fair) {this.semaphore = new Semaphore(permits, fair);this.totalPermits = permits;}public void printStatus() {System.out.println("=== Semaphore 状态监控 ===");System.out.println("总许可证数量: " + totalPermits);System.out.println("可用许可证: " + semaphore.availablePermits());System.out.println("使用中许可证: " + (totalPermits - semaphore.availablePermits()));System.out.println("等待队列长度: " + semaphore.getQueueLength());System.out.println("是否有等待线程: " + semaphore.hasQueuedThreads());System.out.println("公平模式: " + semaphore.isFair());System.out.println("使用率: " + String.format("%.1f%%", (totalPermits - semaphore.availablePermits()) * 100.0 / totalPermits));}// 监控线程,定期输出状态public void startMonitoring() {Thread monitor = new Thread(() -> {while (!Thread.currentThread().isInterrupted()) {try {printStatus();Thread.sleep(5000); // 每5秒监控一次} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}});monitor.setDaemon(true);monitor.start();}
}
3.5.2 批量操作方法:
// 排空所有可用许可证
public int drainPermits()// 减少可用许可证数量
protected void reducePermits(int reduction)
批量操作的源码实现:
public int drainPermits() {return sync.drainPermits();
}final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}
}
drainPermits的使用场景:
public class SystemMaintenance {private final Semaphore semaphore = new Semaphore(10);// 系统维护时,停止新的请求处理public void startMaintenance() {System.out.println("开始系统维护...");// 排空所有许可证,阻止新请求int drained = semaphore.drainPermits();System.out.println("已排空 " + drained + " 个许可证");// 等待当前正在处理的请求完成waitForActiveRequestsComplete();// 执行维护操作performMaintenance();}public void endMaintenance() {System.out.println("维护完成,恢复服务...");// 恢复许可证,重新允许请求处理semaphore.release(10);}private void waitForActiveRequestsComplete() {// 等待活跃请求完成的逻辑}private void performMaintenance() {// 维护操作逻辑}
}
方法使用注意事项:
- availablePermits():返回的是瞬时值,在高并发环境下可能立即过期
- getQueueLength():返回的是估算值,主要用于监控和调试
- drainPermits():会影响所有等待的线程,使用时需要谨慎
- reducePermits():受保护的方法,主要用于子类扩展
四、Semaphore典型使用场景
4.1 连接池资源管理
连接池是Semaphore最经典的应用场景之一。在数据库连接池、HTTP连接池等场景中,需要限制同时建立的连接数量以避免资源耗尽。
连接池的基本实现框架:
public class DatabaseConnectionPool {private final Semaphore connectionSemaphore;private final Queue<Connection> availableConnections;private final Set<Connection> allConnections;private final int maxConnections;public DatabaseConnectionPool(int maxConnections) {this.maxConnections = maxConnections;this.connectionSemaphore = new Semaphore(maxConnections);this.availableConnections = new ConcurrentLinkedQueue<>();this.allConnections = ConcurrentHashMap.newKeySet();// 初始化连接池initializeConnections();}public Connection getConnection() throws InterruptedException {// 获取许可证connectionSemaphore.acquire();try {Connection connection = availableConnections.poll();if (connection == null || !isValidConnection(connection)) {connection = createNewConnection();}return connection;} catch (Exception e) {// 如果获取连接失败,释放许可证connectionSemaphore.release();throw e;}}public void returnConnection(Connection connection) {if (connection != null && allConnections.contains(connection)) {if (isValidConnection(connection)) {availableConnections.offer(connection);} else {// 连接已失效,创建新连接补充try {Connection newConnection = createNewConnection();availableConnections.offer(newConnection);} catch (Exception e) {// 创建失败时记录日志,但仍要释放许可证System.err.println("Failed to create replacement connection: " + e.getMessage());}}// 释放许可证,允许其他线程获取连接connectionSemaphore.release();}}private void initializeConnections() {for (int i = 0; i < maxConnections; i++) {try {Connection connection = createNewConnection();availableConnections.offer(connection);} catch (Exception e) {System.err.println("Failed to initialize connection: " + e.getMessage());}}}private Connection createNewConnection() {// 模拟创建数据库连接Connection connection = new MockConnection();allConnections.add(connection);return connection;}private boolean isValidConnection(Connection connection) {// 检查连接是否有效try {return connection != null && !connection.isClosed();} catch (Exception e) {return false;}}// 模拟Connection类private static class MockConnection implements Connection {private boolean closed = false;@Overridepublic boolean isClosed() { return closed; }@Overridepublic void close() { closed = true; }// 其他Connection方法的模拟实现...}
}
连接池使用示例:
public class DatabaseService {private final DatabaseConnectionPool connectionPool;public DatabaseService() {this.connectionPool = new DatabaseConnectionPool(10); // 最大10个连接}public void executeQuery(String sql) {Connection connection = null;try {// 获取连接(可能需要等待)connection = connectionPool.getConnection();// 执行数据库操作executeSQL(connection, sql);} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("获取数据库连接被中断");} catch (Exception e) {System.err.println("数据库操作失败: " + e.getMessage());} finally {// 确保连接被归还if (connection != null) {connectionPool.returnConnection(connection);}}}private void executeSQL(Connection connection, String sql) {// 模拟SQL执行System.out.println("执行SQL: " + sql);try {Thread.sleep(1000); // 模拟数据库操作耗时} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
4.2 限流与流量控制
在微服务架构中,限流是保护系统稳定性的重要手段。Semaphore可以有效控制接口的并发访问量。
接口限流器的实现:
public class RateLimiter {private final Semaphore semaphore;private final int maxConcurrency;private final long timeoutMs;// 监控统计private final AtomicLong totalRequests = new AtomicLong(0);private final AtomicLong rejectedRequests = new AtomicLong(0);private final AtomicLong timeoutRequests = new AtomicLong(0);public RateLimiter(int maxConcurrency, long timeoutMs) {this.maxConcurrency = maxConcurrency;this.timeoutMs = timeoutMs;this.semaphore = new Semaphore(maxConcurrency);}/*** 执行限流保护的操作*/public <T> T execute(Supplier<T> operation) throws RateLimitException {totalRequests.incrementAndGet();try {// 尝试在指定时间内获取许可证if (semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {try {return operation.get();} finally {semaphore.release();}} else {// 获取许可证超时timeoutRequests.incrementAndGet();throw new RateLimitException("Request timeout: too many concurrent requests");}} catch (InterruptedException e) {Thread.currentThread().interrupt();rejectedRequests.incrementAndGet();throw new RateLimitException("Request interrupted", e);}}/*** 立即执行,不等待*/public <T> Optional<T> tryExecute(Supplier<T> operation) {totalRequests.incrementAndGet();if (semaphore.tryAcquire()) {try {return Optional.of(operation.get());} finally {semaphore.release();}} else {rejectedRequests.incrementAndGet();return Optional.empty();}}/*** 获取限流统计信息*/public RateLimitStats getStats() {return new RateLimitStats(maxConcurrency,semaphore.availablePermits(),totalRequests.get(),rejectedRequests.get(),timeoutRequests.get(),semaphore.getQueueLength());}/*** 动态调整并发限制*/public void adjustLimit(int newLimit) {int currentLimit = maxConcurrency;int difference = newLimit - currentLimit;if (difference > 0) {// 增加许可证semaphore.release(difference);} else if (difference < 0) {// 减少许可证try {semaphore.acquire(-difference);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}// 自定义异常public static class RateLimitException extends Exception {public RateLimitException(String message) {super(message);}public RateLimitException(String message, Throwable cause) {super(message, cause);}}// 统计信息类public static class RateLimitStats {private final int maxConcurrency;private final int availablePermits;private final long totalRequests;private final long rejectedRequests;private final long timeoutRequests;private final int waitingThreads;public RateLimitStats(int maxConcurrency, int availablePermits, long totalRequests, long rejectedRequests, long timeoutRequests, int waitingThreads) {this.maxConcurrency = maxConcurrency;this.availablePermits = availablePermits;this.totalRequests = totalRequests;this.rejectedRequests = rejectedRequests;this.timeoutRequests = timeoutRequests;this.waitingThreads = waitingThreads;}public double getSuccessRate() {return totalRequests == 0 ? 1.0 : (totalRequests - rejectedRequests - timeoutRequests) * 1.0 / totalRequests;}public double getCurrentUtilization() {return (maxConcurrency - availablePermits) * 1.0 / maxConcurrency;}@Overridepublic String toString() {return String.format("RateLimitStats{maxConcurrency=%d, available=%d, " +"total=%d, rejected=%d, timeout=%d, waiting=%d, " +"successRate=%.2f%%, utilization=%.2f%%}",maxConcurrency, availablePermits, totalRequests, rejectedRequests, timeoutRequests, waitingThreads,getSuccessRate() * 100, getCurrentUtilization() * 100);}}
}
限流器的使用示例:
public class APIController {private final RateLimiter rateLimiter = new RateLimiter(10, 5000); // 最大10并发,超时5秒// 使用限流保护的APIpublic ResponseEntity<String> protectedAPI() {try {String result = rateLimiter.execute(() -> {// 执行实际的业务逻辑return performBusinessLogic();});return ResponseEntity.ok(result);} catch (RateLimiter.RateLimitException e) {return ResponseEntity.status(429) // Too Many Requests.body("Service is busy, please try again later");}}// 立即响应的API(不等待)public ResponseEntity<String> immediateAPI() {Optional<String> result = rateLimiter.tryExecute(() -> {return performBusinessLogic();});if (result.isPresent()) {return ResponseEntity.ok(result.get());} else {return ResponseEntity.status(503) // Service Unavailable.body("Service is temporarily unavailable");}}// 监控接口public ResponseEntity<RateLimiter.RateLimitStats> getStats() {return ResponseEntity.ok(rateLimiter.getStats());}private String performBusinessLogic() {// 模拟业务处理try {Thread.sleep(1000); // 模拟1秒处理时间} catch (InterruptedException e) {Thread.currentThread().interrupt();}return "Business logic executed successfully";}
}
4.3 生产者-消费者模式
在生产者-消费者模式中,Semaphore可以用来控制缓冲区的容量,确保生产者不会生产过多数据,消费者也不会消费空缓冲区。
基于Semaphore的生产者-消费者实现:
public class ProducerConsumerBuffer<T> {private final Queue<T> buffer;private final Semaphore emptySlots; // 空槽位信号量private final Semaphore fullSlots; // 满槽位信号量private final Semaphore mutex; // 互斥访问信号量private final int capacity;// 统计信息private final AtomicLong producedCount = new AtomicLong(0);private final AtomicLong consumedCount = new AtomicLong(0);public ProducerConsumerBuffer(int capacity) {this.capacity = capacity;this.buffer = new LinkedList<>();this.emptySlots = new Semaphore(capacity); // 初始时所有槽位都为空this.fullSlots = new Semaphore(0); // 初始时没有满槽位this.mutex = new Semaphore(1); // 互斥访问}/*** 生产者放入数据*/public void put(T item) throws InterruptedException {emptySlots.acquire(); // 等待空槽位mutex.acquire(); // 获取互斥锁try {buffer.offer(item);producedCount.incrementAndGet();System.out.println("Produced: " + item + ", Buffer size: " + buffer.size());} finally {mutex.release(); // 释放互斥锁fullSlots.release(); // 增加满槽位计数}}/*** 生产者尝试放入数据(非阻塞)*/public boolean tryPut(T item, long timeout, TimeUnit unit) throws InterruptedException {if (emptySlots.tryAcquire(timeout, unit)) {if (mutex.tryAcquire(timeout, unit)) {try {buffer.offer(item);producedCount.incrementAndGet();System.out.println("Produced (timeout): " + item + ", Buffer size: " + buffer.size());return true;} finally {mutex.release();fullSlots.release();}} else {emptySlots.release(); // 获取mutex失败,释放emptySlotsreturn false;}}return false;}/*** 消费者取出数据*/public T take() throws InterruptedException {fullSlots.acquire(); // 等待满槽位mutex.acquire(); // 获取互斥锁try {T item = buffer.poll();consumedCount.incrementAndGet();System.out.println("Consumed: " + item + ", Buffer size: " + buffer.size());return item;} finally {mutex.release(); // 释放互斥锁emptySlots.release(); // 增加空槽位计数}}/*** 消费者尝试取出数据(非阻塞)*/public T tryTake(long timeout, TimeUnit unit) throws InterruptedException {if (fullSlots.tryAcquire(timeout, unit)) {if (mutex.tryAcquire(timeout, unit)) {try {T item = buffer.poll();consumedCount.incrementAndGet();System.out.println("Consumed (timeout): " + item + ", Buffer size: " + buffer.size());return item;} finally {mutex.release();emptySlots.release();}} else {fullSlots.release(); // 获取mutex失败,释放fullSlotsreturn null;}}return null;}/*** 获取缓冲区状态*/public BufferStats getStats() {return new BufferStats(capacity,buffer.size(),emptySlots.availablePermits(),fullSlots.availablePermits(),producedCount.get(),consumedCount.get());}/*** 清空缓冲区*/public void clear() throws InterruptedException {mutex.acquire();try {int size = buffer.size();buffer.clear();// 重置信号量状态fullSlots.drainPermits();emptySlots.drainPermits();emptySlots.release(capacity);System.out.println("Buffer cleared, removed " + size + " items");} finally {mutex.release();}}// 缓冲区统计信息public static class BufferStats {private final int capacity;private final int currentSize;private final int emptySlots;private final int fullSlots;private final long producedCount;private final long consumedCount;public BufferStats(int capacity, int currentSize, int emptySlots, int fullSlots, long producedCount, long consumedCount) {this.capacity = capacity;this.currentSize = currentSize;this.emptySlots = emptySlots;this.fullSlots = fullSlots;this.producedCount = producedCount;this.consumedCount = consumedCount;}public double getUtilization() {return currentSize * 1.0 / capacity;}@Overridepublic String toString() {return String.format("BufferStats{capacity=%d, size=%d, empty=%d, full=%d, " +"produced=%d, consumed=%d, utilization=%.1f%%}",capacity, currentSize, emptySlots, fullSlots,producedCount, consumedCount, getUtilization() * 100);}}
}
生产者-消费者的使用示例:
public class ProducerConsumerDemo {private static final ProducerConsumerBuffer<String> buffer = new ProducerConsumerBuffer<>(5);public static void main(String[] args) throws InterruptedException {// 启动多个生产者for (int i = 0; i < 3; i++) {final int producerId = i;new Thread(() -> {try {for (int j = 0; j < 10; j++) {String item = "Producer-" + producerId + "-Item-" + j;buffer.put(item);Thread.sleep(1000); // 生产间隔}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}, "Producer-" + i).start();}// 启动多个消费者for (int i = 0; i < 2; i++) {final int consumerId = i;new Thread(() -> {try {for (int j = 0; j < 15; j++) {String item = buffer.take();// 模拟消费处理时间Thread.sleep(1500);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}, "Consumer-" + i).start();}// 监控线程new Thread(() -> {try {while (!Thread.currentThread().isInterrupted()) {Thread.sleep(3000);System.out.println("=== " + buffer.getStats() + " ===");}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}, "Monitor").start();// 主线程等待Thread.sleep(30000);}
}