Java并发编程-线程池(二)
文章目录
- 线程池的实现原理
- execute(Runnable command)
- **1. 阶段一:尝试创建核心线程**
- **2. 阶段二:尝试将任务加入队列**
- **3. 阶段三:尝试创建非核心线程或拒绝任务**
- **关键机制与设计思想**
线程池的实现原理
当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?根据刚刚讲的线程池参数的含义,我们来看一下线程池 的主要处理流程。
从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程是这样的, 这个很关键,面试必问。
- 判断核心线程池是否已满,即线程数是否达到
corePoolSize
如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程已经满了,则进入下个流程。
- 判断工作队列是否已经满。
BlockingQueue
如果工作队列没有满,则将新提交的任务存储在这 个工作队列里。如果工作队列满了,则进入下个流程。
- 判断线程池是否已满,即线程数是否达到
maxPoolSize
如果没有,则创建一个新的工作线程 来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
再来看看 ThreadPoolExecutor执行execute()方法的图:
按照我们上面说的, ThreadPoolExecutor执行execute方法也会分为这几种情况。
-
如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
-
如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
-
如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
-
如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁, 因为很明显这是一个严重的瓶颈。
在ThreadPoolExecutor完成预热之后 , 也就是当前运行的线程数大于等于corePoolSize,几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
通过流程分析,我们很直观地了解了线程池的工作原理,接下来, 我们再通过源代码来看看具体是如何实现的
execute(Runnable command)
//高3位表示状态,低29位表示线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {//如果当前工作线程数小于核心线程数(corePoolSize),//则尝试创建新线程作为核心线程并立即执行任务if (addWorker(command, true)) // 以核心模式创建Workerreturn;c = ctl.get(); // 若创建失败(如线程池已关闭),重新获取ctl值}if (isRunning(c) && workQueue.offer(command)) { // 检查状态并尝试入队int recheck = ctl.get();if (! isRunning(recheck) && remove(command))// 线程池已关闭且任务成功移除reject(command); // 拒绝任务else if (workerCountOf(recheck) == 0) //无存活线程addWorker(null, false); //创建非核心线程防止队列任务积压}else if (!addWorker(command, false))// 以非核心模式创建Workerreject(command); // 抛出RejectedExecutionException异常
}
1. 阶段一:尝试创建核心线程
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // 以核心模式创建Workerreturn;c = ctl.get(); // 若创建失败(如线程池已关闭),重新获取ctl值
}
-
目标:如果当前工作线程数小于核心线程数(
corePoolSize
),则尝试创建新线程作为核心线程并立即执行任务。 -
关键操作:
-
workerCountOf(c)
:解析ctl
变量(高3位表示状态,低29位表示线程数)获取当前工作线程数。 -
addWorker(command, true)
:尝试以核心线程限制(corePoolSize
)创建工作者(Worker
),command
作为首任务。 -
失败处理:若线程池已关闭(状态非
RUNNING
)或并发冲突导致创建失败,则进入阶段二。
-
2. 阶段二:尝试将任务加入队列
if (isRunning(c) && workQueue.offer(command)) { // 检查状态并尝试入队int recheck = ctl.get(); // 重新获取状态以应对并发变化if (!isRunning(recheck) && remove(command)) // 线程池已关闭且任务成功移除reject(command); // 拒绝任务else if (workerCountOf(recheck) == 0) // 无存活线程(例如核心线程超时被回收)addWorker(null, false); // 创建非核心线程(后续从队列取任务)
}
-
目标:当核心线程已满,尝试将任务加入阻塞队列(如
LinkedBlockingQueue
)。 -
关键操作:
-
双重状态检查:
-
初始入队前校验线程池是否处于运行状态(
isRunning(c)
)。 -
入队后再次校验(
recheck
),避免在入队期间线程池被关闭。
-
-
处理极端情况:
-
若线程池已关闭且成功从队列移除任务,则触发拒绝策略。
-
若所有工作线程意外终止(例如异常退出),则新建非核心线程(参数
firstTask = null
),强制处理队列中的积压任务。此处的null
表示新线程不立即执行提交的command
,而是直接从队列中获取任务(通过Worker.run()
中的循环逻辑)。此时创建的非核心线程虽然无初始任务,但会主动消费队列中积累的任务,确保队列不积压。
while (running) {Job job = null;synchronized (jobs) {if (jobs.isEmpty()) jobs.wait(); // 从队列取任务job = jobs.removeFirst();}if (job != null) job.run(); }
-
-
3. 阶段三:尝试创建非核心线程或拒绝任务
else if (!addWorker(command, false)) // 以非核心模式创建Workerreject(command); // 队列已满且线程数达到maximumPoolSize,拒绝任务
-
目标:当队列已满时,尝试创建非核心线程(线程数上限为
maximumPoolSize
)。 -
关键操作:
-
addWorker(command, false)
:以非核心模式创建工作者,直接执行当前任务。 -
失败条件:
-
线程数已达
maximumPoolSize
。 -
线程池已关闭(非
RUNNING
状态)。
-
-
拒绝策略:调用
RejectedExecutionHandler.rejectedExecution()
,默认抛出RejectedExecutionException
。
-
关键机制与设计思想
-
全局锁(Global Lock)的应用:
addWorker()
方法内部通过锁(如ReentrantLock
)同步线程池状态修改操作(例如增减线程、更新ctl
),确保原子性。
-
减少锁竞争优化:
- 在核心线程已预热的情况下,多数任务直接通过队列处理(阶段二),避免了频繁获取锁的性能损耗。
-
Worker执行逻辑(补充):
- 每个
Worker
启动后执行runWorker()
方法,循环从队列中获取任务。
- 每个