Java并发编程-线程池(四)
文章目录
- 线程池实现原理
- Worker
- Worker 核心设计总结
- runWorker(Worker w)
- 总结
线程池实现原理
上一篇我们看了 addWork 方法,那接下来就让我们详细看看内部类Worker。
Worker
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{final Thread thread; //worker自己的线程Runnable firstTask;Worker(Runnable firstTask) {setState(-1); // 默认禁止中断this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this); // 调用ThreadPoolExecutor的runWorker方法(后面会讲)}protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) { //仅当状态为0时获取锁,不可重入setExclusiveOwnerThread(Thread.currentThread());//课后习题return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {//调用链:shutdownNow()->interruptWorkers()->interruptIfStarted()Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {// 中断条件:运行中/运行结束 && 线程初始化过 && 没有被中断过try {t.interrupt();} catch (SecurityException ignore) {}}}
}
Worker
类也继承了AQS,主要目的是利用state实现线程执行任务时的中断控制,确保线程池的优雅关闭机制。利用AQS的state
字段来区分线程的初始状态、运行状态及中断权限。
-
初始化锁状态:在
Worker
的构造函数中调用setState(-1)
,将状态设置为非0值(-1),表示**默认禁止中断** 。这样可以避免新创建的Worker
线程在未执行任务前(即未调用runWorker
方法时)被中断干扰。- 实际任务执行时:在
runWorker
方法中,通过调用unlock()
方法将状态从-1
重置为0
,表示允许中断。
- 实际任务执行时:在
-
shutdownNow()的中断条件:运行中/运行结束 && 线程初始化过 && 没有被中断过
-
shutdown()的中断条件:
final void runWorker(Worker w) {...w.unlock(); // 允许中断...w.lock();...w.unlock();...
}
//调用链:shutdown()->interruptIdleWorkers()
private void interruptIdleWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {// 中断条件:没有被中断 && 没有任务在执行try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}}} finally {mainLock.unlock();}
}
Worker 核心设计总结
-
同步机制:通过AQS的自定义锁管理,实现任务的原子执行和中断控制。
-
状态标记:利用AQS的
state
字段区分线程的初始状态、运行状态及中断权限。 -
线程池管理:支持线程池在关闭时区分处理空闲线程和运行线程,确保任务完整性。
Worker会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点:
runWorker(Worker w)
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许中断//标识线程是否因异常(如任务抛出未捕获错误)而非正常退出boolean completedAbruptly = true; try {// 循环获取工作队列里的任务来执行while (task != null || (task = getTask()) != null) {// 当前有任务 或者 能从队列中拿到任务w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||//若线程池正在关闭(STOP 状态)(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&// Thread.interrupted() 会清除当前线程的中断状态(返回历史值),// 若为 true 且线程池已停止,需重新标记中断防止代码忽略此请求。!wt.isInterrupted())//在强制关闭(如 shutdownNow())时,快速中断正在处理的任务wt.interrupt();try {//空方法(钩子),可被子类覆写以添加日志、监控或自定义逻辑(如任务执行时间统计)beforeExecute(wt, task);Throwable thrown = null;try {task.run(); // 执行任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;//避免任务对象被错误复用或内存泄漏w.completedTasks++;//统计Worker完成的任务数(用于线程池监控或扩容策略)w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly)//补偿操作:若线程异常终止(未正常调整计数),//需主动减少工作线程计数(workerCount)以保持准确性decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;//累计完成数workers.remove(w);//移除失效线程} finally {mainLock.unlock();}//当前线程退出后可能满足线程池终止条件(如所有线程退出且任务队列为空)//尝试将线程池状态过渡到TERMINATED(需符合SHUTDOWN/STOP状态且无活动线程和待处理任务)tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {//线程池仍允许处理任务(如RUNNING/SHUTDOWN)if (!completedAbruptly) {//若允许核心线程超时(allowCoreThreadTimeOut),最小值为 0;//否则为 corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;// 队列非空时至少保留一个线程处理任务if (workerCountOf(c) >= min)return; //当前线程数足够,无需补充}addWorker(null, false);//补充新Worker}
}
总结
ThreadPoolExecutor中线程执行任务的示意图
线程池中的线程执行任务分两种情况。
-
在execute()方法中创建一个线程时,会让这个线程执行当前任务。
-
这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行。