JAVA线程池ThreadPoolExecutor说明
1. 核心参数与线程池状态
-
核心参数:
-
corePoolSize
:核心线程数,即使空闲也不会回收(除非设置allowCoreThreadTimeOut
)。 -
maximumPoolSize
:最大线程数,队列满时允许创建的最大临时线程数,包含核心线程数。 -
workQueue
:任务队列(如LinkedBlockingQueue
),用于缓存未执行的任务。 -
keepAliveTime
:非核心线程空闲存活时间。
-
-
线程池状态(通过
ctl
变量高3位表示):-
RUNNING
(111):接收新任务并处理队列任务。 -
SHUTDOWN
(000):不接收新任务,但处理队列中剩余任务。 -
STOP
(001):中断所有线程,丢弃队列任务。 -
TIDYING
(010):所有任务终止,即将执行terminated()
钩子方法。 -
TERMINATED
(011):线程池完全终止。
-
2. 任务提交:execute()
方法
源码核心逻辑如下:
public void execute(Runnable command) {if (command == null) throw new NullPointerException();int c = ctl.get();// 步骤1:当前线程数 < corePoolSize,创建核心线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // true表示创建核心线程return;c = ctl.get(); // 创建失败后重新获取ctl}// 步骤2:尝试将任务加入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 二次检查线程池状态(防止在此期间被关闭)if (!isRunning(recheck) && remove(command))reject(command); // 如果线程池已关闭,回滚任务并拒绝else if (workerCountOf(recheck) == 0)addWorker(null, false); // 确保至少有一个线程处理队列任务}// 步骤3:队列已满,尝试创建非核心线程else if (!addWorker(command, false)) // false表示非核心线程reject(command); // 创建失败(线程数已达maximumPoolSize),触发拒绝策略
}
3. 线程执行逻辑:Worker
与 runWorker()
Worker 类(内部类)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread; // 实际执行任务的线程Runnable firstTask; // 初始任务(可能为null)Worker(Runnable firstTask) {this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); // 通过线程工厂创建线程}public void run() {runWorker(this); // 线程启动后调用runWorker}
}
核心方法:runWorker(Worker w)
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许中断boolean completedAbruptly = true;try {// 循环获取任务并执行while (task != null || (task = getTask()) != null) {w.lock(); // 加锁,确保任务执行期间不被中断(除非线程池关闭)// 检查线程池状态(如果已STOP,中断线程)if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task); // 钩子方法(可自定义)try {task.run(); // 实际执行任务} catch (Throwable ex) {afterExecute(task, ex); // 钩子方法(异常处理)throw ex;}afterExecute(task, null); // 钩子方法(正常完成)} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly); // 线程退出处理}
}
4. 获取任务:getTask()
private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();// 检查线程池状态if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null; // 线程需要终止}int wc = workerCountOf(c);// 判断是否允许超时(非核心线程或允许核心线程超时)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 检查线程数是否超过maximumPoolSize或超时if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null; // 减少线程数并返回nullcontinue;}try {// 从队列中取任务(阻塞或超时)Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true; // 超时标记} catch (InterruptedException retry) {timedOut = false;}}
}
5. 线程回收:processWorkerExit()
当 getTask()
返回 null
时,线程会调用此方法清理资源:
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 若任务执行异常,减少线程数decrementWorkerCount();// 统计已完成的任务数final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w); // 移除Worker} finally {mainLock.unlock();}// 尝试终止线程池(如果状态为SHUTDOWN且队列为空)tryTerminate();// 补充新线程(如果队列中仍有任务)int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && !workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return;}addWorker(null, false); // 补充新线程处理队列任务}
}
6. 拒绝策略实现
默认实现为 AbortPolicy
:
public static class AbortPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());}
}
其他策略如 CallerRunsPolicy
(由调用者线程执行任务)、DiscardPolicy
(静默丢弃)等均可自定义。
总结:源码核心流程
-
任务提交:通过
execute()
方法触发线程创建、入队或拒绝。 -
线程管理:通过
Worker
封装线程和任务,runWorker()
循环执行任务。 -
任务获取:
getTask()
从队列中获取任务,支持阻塞或超时等待。 -
线程回收:超时或异常时,调用
processWorkerExit()
清理资源并补充线程。 -
状态控制:通过
ctl
变量的原子操作管理线程池状态和线程数。