当前位置: 首页 > news >正文

自定义线程池 4.0

自定义线程池 4.0

1. 简介

上次我们实现了自定义线程池的 3.1 版本,提供了线程工厂创建线程和工具类创建简单线程池的功能,增强了线程池的灵活性,并且用起来更加方便了,本文我们将做如下的优化:

  • 给线程池添加关闭的方法。

这个功能看起来容易,实际做起来却很难

2. 总体设计

首先,我们需要思考什么叫关闭线程池,是执行完堆积的任务再让线程停止?还是让线程立即停止?实际上,这两种关闭线程池都可能用的到。先给这两个关闭的方法起名字吧,然后再说明使用场景:

  • shutdownWhenNoTask():执行完堆积的任务再让线程停止。使用的场景在于任务比较重要,必须得执行,但自己不想执行,想委托线程池执行。
  • shutdownNow():让线程立即停止,返回任务队列中的所有任务。使用的场景在于任务不重要,想要立即停止线程池,或者自己来执行没有执行的任务,不依赖线程池执行。

其次,当线程池关闭后,是无法再提交任务的,所以在线程池关闭后,再调用 submit() 方法直接拒绝。

接着,如何知道线程池处于什么状态呢?这有些困难,我先讲讲我的设计:给线程池添加一个状态变量 state,然后定义 2 个常量——RUNNING, SHUTDOWN,分别表示 运行状态关闭状态state 初始化为运行状态,在调用关闭方法时切换成关闭状态。但是,关闭方法可能会被多个线程调用,所以需要 state 的类型设计成原子类,这样这个状态变量就是线程安全的。

最后,如何让线程停止呢?这个问题思考起来就很复杂了,让我们进入标题 3 吧!

3. “让线程停止”的设计方案

3.1 Thread.interrupt()

Thread 有一个成员方法 interrupt(),这个方法并不是让线程直接停止,而是给线程打上停止的 标记,之后可以通过调用另一个成员方法 isInterrupted() 查看线程是否被打上停止标记。此外,调用这个方法会唤醒阻塞的线程,然后抛出 InterruptedException,这就是很多阻塞方法都需要处理 InterruptedException 异常的原因。

如果使用这个方法来向所有线程传递中断信号,那么可能会出现如下的情况:

线程 A 在执行任务时阻塞,线程 B 调用方法停止线程池,从而调用线程 A 的 interrupt() 方法给线程 A 打上停止标记,并唤醒线程 A。目前看来还正常,但如果线程 A 做了如下的异常处理,通过 Thread.interrupted() 清理了中断标记,那这个中断标记就无法被我们识别,从而无法使用中断标记使线程退出

try {...
} catch (InterruptedException e) {Thread.interrupted();...
}

3.2 POISON_PILL

POISON_PILL(毒丸) 是一种常见的设计模式,用于 安全地终止并发系统或进程间通信,通常通过 通过发送一个特殊的“毒丸”消息来通知接收者停止处理,从而优雅地关闭系统 的方式实现,它的核心思想如下:

  • 信号机制:发送一个特殊的消息(毒丸)作为终止信号。
  • 安全终止:接收者收到毒丸后,完成当前任务后停止处理,避免数据丢失或资源泄漏。
  • 解耦控制:发送者和接收者无需直接通信,通过消息传递实现异步终止。

我们完全可以使用毒丸来终止线程池中的所有线程,当调用停止方法时,向任务队列中投放一个毒丸,只要线程拿到毒丸,就直接退出吗?并不行,如果这时一个线程直接退出了,那其他线程怎么办?难道要在停止方法中投放与当前线程数量相同数量的毒丸吗?这样也不太好,我们可以把线程的退出想象成一个“流”,在这个“流”上,前面的事情处理完之后,需要通知后面的事情开始处理,同时也可以在没有后续事情时选择不通知,也就是说,我们可以在线程退出前向任务队列中投放一个毒丸,这样线程就前赴后继地拿到毒丸、退出了。

于是我们可以写出一个投放毒丸的方法(这个方法下面需要优化,目前只是一个简单的实现):

public void offerPoisonPill() {synchronized (threadPoolMonitor) {taskQueue.offer(POISON_PILL); // POISON_PILL 是一个 Runnable 类型的常量}
}

4. 两个关闭方法的实现

4.1 shutdownWhenNoTask()

这个方法要做的事情很简单,只需要将线程池的状态切换成关闭状态,然后再向队列中投放一个毒丸即可。

4.2 shutdownNow()

这个方法要做的事情稍微有点复杂,除了切换线程池状态和投放毒丸之外,还需要做两件事:

  • 给所有线程发送中断信号,尝试让它们中断。
  • 将队列中的所有任务放到一个集合中并返回。

5. 投放毒丸方法的优化

标题 3.2 中实现的投放毒丸是有问题的,假设任务队列已满,则会投放失败,所以我们需要得到 offer() 方法返回 true 的结果,从而保证毒丸真正放到队列中。

5.1 v1 多次投放

有一种十分容易想到的做法:

private void doOfferPoisonPill() {// 不断投放毒丸,直到成功为止while (!taskQueue.offer(POISON_PILL)) {}
}

但是,这种做法很耗性能,只要投放不成功,就一直重试,连休息的时间都没有(如果让线程休眠一段时间,会稍微好一点,但也属于线程在 忙等待)。

5.2 v2 阻塞投放

于是,想到了更高级的做法,只要投放不成功,就等待队列有空余位置,于是诞生了如下的代码:

/*** 任务队列的锁,用于生成一个 {@link Condition} 对象*/
private final Lock taskQueueLock = new ReentrantLock();/*** 任务队列已满的条件对象*/
private final Condition taskQueueNotFull = taskQueueLock.newCondition();private void doOfferPoisonPill() {// 不断投放毒丸,直到成功为止while (!taskQueue.offer(POISON_PILL)) {taskQueueLock.lock();try {taskQueueNotFull.await();} catch (InterruptedException ignore) {} finally {taskQueueLock.unlock();}}
}

更重要的是,我们不能再让其他类随便调用任务队列取出元素的方法了,因为我们需要 在取出元素时唤醒在这里阻塞等待队列空余位置的线程,所以我们需要将其取出元素的方法包装起来,如下所示:

/*** 使用 {@link BlockingQueue#take()} 方法从队列中取出任务** @return 任务队列中的任务*/
private Runnable takeTaskFromQueue() throws InterruptedException {taskQueueLock.lock();try {Runnable task = taskQueue.take();taskQueueNotFull.signalAll();return task;} finally {taskQueueLock.unlock();}
}/*** 使用 {@link BlockingQueue#take()} 方法从队列中取出任务** @param nanos 等待的时间,单位:ns* @return 任务队列中的任务,如果等待超时,则返回 {@code null}*/
private Runnable pollTaskFromQueue(long nanos) throws InterruptedException {taskQueueLock.lock();try {Runnable task = taskQueue.poll(nanos, TimeUnit.NANOSECONDS);if (task != null) {taskQueueNotFull.signalAll();}return task;} finally {taskQueueLock.unlock();}
}

6. 实现 4.0 版本

听明白上面这些设计后,我们终于能实现 4.0 版本了:

6.1 Worker

public abstract class Worker implements Runnable {/*** 线程执行的初始任务*/private Runnable initialTask;/*** 对 <strong>真正运行的线程</strong> 的引用,用于调用其 {@link Thread#start()} 方法启动线程*/private final Thread actuallyRunningThread;/*** {@link Worker} 存在的线程池*/protected final ThreadPool4_0 threadPool;public Worker(Runnable initialTask, Set<Worker> workerPool, ThreadFactory threadFactory, ThreadPool4_0 threadPool) {this.initialTask = initialTask;this.actuallyRunningThread = threadFactory.newThread(this);workerPool.add(this);this.threadPool = threadPool;}@Overridepublic final void run() {initialTask.run();initialTask = null; // help GCtry {while (true) {Runnable t = getTask();if (t == null) {// 检查是否获取到任务了,如果没有则退出循环,停止运行break;} else if (t == ThreadPool4_0.POISON_PILL) {// 如果任务是毒丸,则先往队列中再放一个毒丸,然后退出循环threadPool.offerPoisonPillIfThreadRest();// 以下这句话只是为了测试,在正式环境中最好注释掉LogUtil.infoWithTimeAndThreadName("发现毒丸,退出循环");break;}t.run();}} finally {onWorkerExit();}}/*** 启动内部保存的线程*/public final void start() {actuallyRunningThread.start();}/*** 给正在运行中的线程打上中断标记*/public final void interrupt() {actuallyRunningThread.interrupt();}/*** 获取任务,当返回 @{code null} 时,这个 {@link Worker} 对象就退出循环* <p>* 使用模板方法模式,交给子类实现** @return 获取到的任务*/protected abstract Runnable getTask();/*** 当 {@link Worker} 准备退出时执行的回调函数* <p>* 用于将 {@link Worker} 对象从线程池中移除*/private void onWorkerExit() {threadPool.removeWorkerFromThreadPool(this);}
}

6.2 ThreadPool4_0

public class ThreadPool4_0 {/*** 线程池中核心线程的最大数量*/private final int corePoolSize;/*** 线程池中线程的最大数量*/private final int maxPoolSize;/*** 临时线程阻塞的最长时间(单位:ns),超过这个时间还没有领取到任务就直接退出*/private final long keepAliveTime;/*** 任务队列*/private final BlockingQueue<Runnable> taskQueue;/*** 拒绝策略,用于在无法执行任务的时候拒绝任务*/private final RejectPolicy rejectPolicy;/*** 线程工厂*/private final ThreadFactory threadFactory;/*** 默认的线程工厂*/private static final ThreadFactory DEFAULT_THREAD_FACTORY = new ThreadFactory() {/*** 计数器,用来记录当前创建的是第几个线程,从 0 开始*/private int counter = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "thread-pool-" + counter++);}};/*** 构造一个线程池,默认参数如下:* <ul>*     <li>拒绝策略默认为抛出异常的拒绝策略</li>*     <li>线程工厂默认为线程添加了简单的名字 thread-pool-?</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列*/public ThreadPool4_0(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, RejectPolicy.THROW_EXCEPTION);}/*** 构造一个线程池,默认参数如下:* <ul>*     <li>拒绝策略默认为抛出异常的拒绝策略</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param threadFactory 线程工厂*/public ThreadPool4_0(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, ThreadFactory threadFactory) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, RejectPolicy.THROW_EXCEPTION, threadFactory);}/*** 构造一个线程池,默认参数如下:* <ul>*     <li>线程工厂默认为线程添加了简单的名字 thread-pool-?</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param rejectPolicy 拒绝策略*/public ThreadPool4_0(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, rejectPolicy, DEFAULT_THREAD_FACTORY);}/*** 构造一个线程池** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param rejectPolicy 拒绝策略* @param threadFactory 线程工厂*/public ThreadPool4_0(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy, ThreadFactory threadFactory) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = unit.toNanos(keepAliveTime);this.taskQueue = taskQueue;this.rejectPolicy = rejectPolicy;this.threadFactory = threadFactory;}/*** 存放线程的集合,使用 {@link Set} 是因为 {@link Set#remove(Object)} 性能更高*/private final Set<Worker> threadPool = new HashSet<>();/*** 线程池的管程* <p>* 用于保证 <strong>将线程放入线程池</strong>、<strong>从线程池中移除线程</strong> 的互斥性* 同时也在保证 {@link #currPoolSize} 相关操作的互斥性*/private final Object threadPoolMonitor = new Object();/*** 线程池中当前线程数量,这个值 <= threadPool.size()* 在创建新线程时增加,在放毒丸时减少,threadPool.size() 减少的时机晚于 currPoolSize*/private int currPoolSize = 0;/*** <h3>核心线程执行的任务</h3>* {@link #getTask()} 方法会一直阻塞,直到有新任务*/public final class CoreWorker extends Worker {public CoreWorker(Runnable initialTask, Set<Worker> workerPool, ThreadFactory threadFactory,ThreadPool4_0 threadPool) {super(initialTask, workerPool, threadFactory, threadPool);}@Overrideprotected Runnable getTask() {try {return takeTaskFromQueue();} catch (InterruptedException e) {throw new RuntimeException(e);}}}/*** <h3>临时线程执行的任务</h3>* {@link #getTask()} 方法会在阻塞一定时间后如果还没有任务,则会返回 {@code null}*/public final class TempWorker extends Worker {public TempWorker(Runnable initialTask, Set<Worker> workerPool, ThreadFactory threadFactory,ThreadPool4_0 threadPool) {super(initialTask, workerPool, threadFactory, threadPool);}@Overrideprotected Runnable getTask() {try {return pollTaskFromQueue(keepAliveTime);} catch (InterruptedException e) {throw new RuntimeException(e);}}}/*** 线程池的状态,状态共有 2 种:* <ul>*     <li>{@link #RUNNING} 运行状态</li>*     <li>{@link #SHUTDOWN} 关闭状态,调用了线程池的关闭方法</li>* </ul>*/private final AtomicInteger state = new AtomicInteger(RUNNING);private static final int RUNNING = 1;private static final int SHUTDOWN = 2;/*** 提交任务** @param task 待执行的任务*/public void submit(Runnable task) {// 如果线程池的状态不是 RUNNING 状态,则直接拒绝任务if (state.get() != RUNNING) {rejectPolicy.reject(this, task);return;}// 如果 线程数量 小于 最大核心线程数量,则新建一个 核心线程 执行任务,然后直接返回synchronized (threadPoolMonitor) {if (currPoolSize < corePoolSize) {CoreWorker coreWorker = new CoreWorker(task, threadPool, threadFactory, this);coreWorker.start();currPoolSize++;return;}}// 如果能够放到任务队列中,则直接返回if (taskQueue.offer(task)) {return;}// 如果 线程数量 小于 最大线程数量,则新建一个 临时线程 执行任务synchronized (threadPoolMonitor) {if (currPoolSize < maxPoolSize) {TempWorker tempWorker = new TempWorker(task, threadPool, threadFactory, this);tempWorker.start();currPoolSize++;return;}}// 线程数量到达最大线程数量,任务队列已满,执行拒绝策略rejectPolicy.reject(this, task);}/*** 在没有任务执行时停止所有线程* 当此方法被调用,线程池会停止接受提交任务,然后等待线程将 它们正在执行的任务 和 任务队列中的任务 都执行完毕,之后让所有线程退出*/public void shutdownWhenNoTask() {// 将状态从 RUNNING 切换到 SHUTDOWNif (state.compareAndSet(RUNNING, SHUTDOWN)) {// 如果切换成功,则向任务队列中投放一个毒丸offerPoisonPill();}}/*** 立刻停止所有线程* 当此方法被调用,线程池会停止接受提交任务,给线程发送中断信号* <p>* 注意:<strong>如果在调用此方法之前调用了 {@link #shutdownWhenNoTask()} 方法,不会立刻停止所有线程</strong>** @return 任务队列中的任务*/public List<Runnable> shutdownNow() {// 将状态从 RUNNING 切换到 SHUTDOWN,如果修改失败,则表示线程池已经调用过关闭相关的方法了,直接返回一个空集合即可if (!state.compareAndSet(RUNNING, SHUTDOWN)) {return new ArrayList<>();}// 给所有线程发送中断信号synchronized (threadPoolMonitor) {threadPool.forEach(Worker::interrupt);}// 将任务队列中的任务放到一个集合中List<Runnable> taskList = new ArrayList<>(taskQueue.size());taskQueue.drainTo(taskList);// 向任务队列中投放一个毒丸offerPoisonPill();// 返回任务集合return taskList;}/*** 获取当前线程池中的线程数量** @return 当前线程池中的线程数量*/public int getCurrPoolSize() {synchronized (threadPoolMonitor) {return currPoolSize;}}/*** 获取当前任务队列中的任务数** @return 当前任务队列中的任务数*/public int getCurrTaskNum() {return taskQueue.size();}/*** 丢弃任务队列 {@link #taskQueue} 中的最旧的任务(队头任务)** @return 任务队列中的最旧的任务(队头任务)*/public Runnable discardOldestTask() {return taskQueue.poll();}/*** 使用 {@link BlockingQueue#take()} 方法从队列中取出任务** @return 任务队列中的任务*/private Runnable takeTaskFromQueue() throws InterruptedException {taskQueueLock.lock();try {Runnable task = taskQueue.take();taskQueueNotFull.signalAll();return task;} finally {taskQueueLock.unlock();}}/*** 使用 {@link BlockingQueue#take()} 方法从队列中取出任务** @param nanos 等待的时间,单位:ns* @return 任务队列中的任务,如果等待超时,则返回 {@code null}*/private Runnable pollTaskFromQueue(long nanos) throws InterruptedException {taskQueueLock.lock();try {Runnable task = taskQueue.poll(nanos, TimeUnit.NANOSECONDS);if (task != null) {taskQueueNotFull.signalAll();}return task;} finally {taskQueueLock.unlock();}}/*** 毒丸,用于让线程池中的所有线程退出* 投放到任务队列中,只要线程获取到这个任务,就退出,并且在 线程池中还有线程 的情况下将毒丸重新放回任务队列*/public static final Runnable POISON_PILL = () -> {};/*** 任务队列的锁,用于生成一个 {@link Condition} 对象*/private final Lock taskQueueLock = new ReentrantLock();/*** 任务队列已满的条件对象*/private final Condition taskQueueNotFull = taskQueueLock.newCondition();// 实际上投放毒丸的操作private void doOfferPoisonPill() {// 不断投放毒丸,直到成功为止while (!taskQueue.offer(POISON_PILL)) {taskQueueLock.lock();try {taskQueueNotFull.await();} catch (InterruptedException ignore) {} finally {taskQueueLock.unlock();}}currPoolSize--;}/*** 向任务队列中投放一个毒丸,等待线程领取后退出*/public void offerPoisonPill() {synchronized (threadPoolMonitor) {doOfferPoisonPill();}}/*** 在线程池中还有其他线程的情况下,向任务队列中投放一个毒丸,等待线程领取后退出,用于*/public void offerPoisonPillIfThreadRest() {synchronized (threadPoolMonitor) {if (currPoolSize > 0) {doOfferPoisonPill();}}}/*** 从 {@link #threadPool} 中移除指定的 {@link Worker} 对象** @param worker 待移除的 {@link Worker} 对象*/public void removeWorkerFromThreadPool(Worker worker) {synchronized (threadPoolMonitor) {threadPool.remove(worker);}}
}

7. 测试程序

public class ThreadPool4_0Test {/*** 测试线程池 4.0 版本的基本功能*/@Testpublic void test() throws InterruptedException {final int taskSize = 3;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool4_0 threadPool = new ThreadPool4_0(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}/*** 测试线程池 4.0 版本 {@link ThreadPool4_0#shutdownWhenNoTask()} 的功能*/@Testpublic void testShutdownWhenNoTask() throws InterruptedException {final int taskSize = 3;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool4_0 threadPool = new ThreadPool4_0(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");LogUtil.infoWithTimeAndThreadName("调用停止方法前,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("调用停止方法前,任务数量是" + threadPool.getCurrTaskNum());threadPool.shutdownWhenNoTask();LogUtil.infoWithTimeAndThreadName("调用停止方法后,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("调用停止方法后,任务数量是" + threadPool.getCurrTaskNum());// 等待任务执行完毕latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");LogUtil.infoWithTimeAndThreadName("执行完任务后,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("执行完任务后,任务数量是" + threadPool.getCurrTaskNum());}/*** 测试线程池 4.0 版本 {@link ThreadPool4_0#shutdownNow()} 的功能*/@Testpublic void testShutdownNow() throws InterruptedException {final int taskSize = 3;ThreadPool4_0 threadPool = new ThreadPool4_0(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {LogUtil.infoWithTimeAndThreadName("收到中断信号,停止任务执行");}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);});}LogUtil.infoWithTimeAndThreadName("提交任务后");LogUtil.infoWithTimeAndThreadName("调用停止方法前,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("调用停止方法前,任务数量是" + threadPool.getCurrTaskNum());List<Runnable> taskList = threadPool.shutdownNow();LogUtil.infoWithTimeAndThreadName("有" + taskList.size() + "个任务没有执行");LogUtil.infoWithTimeAndThreadName("调用停止方法后,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("调用停止方法后,任务数量是" + threadPool.getCurrTaskNum());// 等待任务执行完毕Thread.sleep(taskSize * 1000 + 100);LogUtil.infoWithTimeAndThreadName("任务执行完毕");LogUtil.infoWithTimeAndThreadName("执行完任务后,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("执行完任务后,任务数量是" + threadPool.getCurrTaskNum());}/*** 测试线程池 4.0 版本 调用停止方法后直接拒绝任务 的功能*/@Testpublic void testShutdownRejectTask() {Assertions.assertThrows(RuntimeException.class, () -> {final int taskSize = 3;ThreadPool4_0 threadPool = new ThreadPool4_0(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {LogUtil.infoWithTimeAndThreadName("收到中断信号,停止任务执行");}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);});}LogUtil.infoWithTimeAndThreadName("提交任务后");threadPool.shutdownWhenNoTask();threadPool.submit(() -> {});});}
}

8. 测试结果

8.1 test

21:21:24 [    main] 提交任务前
21:21:24 [    main] 提交任务后
21:21:25 [thread-pool-0] 正在执行任务0
21:21:26 [thread-pool-0] 正在执行任务1
21:21:27 [thread-pool-0] 正在执行任务2
21:21:27 [    main] 任务执行完毕

8.2 testShutdownWhenNoTask

在调用关闭方法后,队列中还剩一个任务,等待其执行完毕后,所有线程退出。

21:22:04 [    main] 提交任务前
21:22:04 [    main] 提交任务后
21:22:04 [    main] 调用停止方法前,线程数量是2
21:22:04 [    main] 调用停止方法前,任务数量是1
21:22:05 [thread-pool-0] 正在执行任务0
21:22:05 [thread-pool-1] 正在执行任务2
21:22:05 [thread-pool-1] 发现毒丸,退出循环
21:22:05 [    main] 调用停止方法后,线程数量是1
21:22:05 [    main] 调用停止方法后,任务数量是1
21:22:06 [thread-pool-0] 正在执行任务1
21:22:06 [thread-pool-0] 发现毒丸,退出循环
21:22:06 [    main] 任务执行完毕
21:22:06 [    main] 执行完任务后,线程数量是0
21:22:06 [    main] 执行完任务后,任务数量是0

8.3 testShutdownNow

在调用关闭方法后,队列中的一个任务被取出来,随后两个线程相继退出。

21:22:22 [    main] 提交任务前
21:22:22 [    main] 提交任务后
21:22:22 [    main] 调用停止方法前,线程数量是2
21:22:22 [    main] 调用停止方法前,任务数量是1
21:22:22 [thread-pool-1] 收到中断信号,停止任务执行
21:22:22 [thread-pool-0] 收到中断信号,停止任务执行
21:22:22 [thread-pool-1] 正在执行任务2
21:22:22 [thread-pool-0] 正在执行任务0
21:22:22 [thread-pool-1] 发现毒丸,退出循环
21:22:22 [    main] 有1个任务没有执行
21:22:22 [thread-pool-0] 发现毒丸,退出循环
21:22:22 [    main] 调用停止方法后,线程数量是0
21:22:22 [    main] 调用停止方法后,任务数量是0
21:22:25 [    main] 任务执行完毕
21:22:25 [    main] 执行完任务后,线程数量是0
21:22:25 [    main] 执行完任务后,任务数量是0

8.4 testShutdownRejectTask

这个测试只要通过就说明抛出异常了,即拒绝了提交的任务。

21:22:53 [    main] 提交任务前
21:22:53 [    main] 提交任务后
21:22:54 [thread-pool-0] 正在执行任务0
21:22:54 [thread-pool-1] 正在执行任务2
21:22:54 [thread-pool-1] 发现毒丸,退出循环

9. 思考

  • 我们在实现让线程池停止的方法时使用了毒丸的设计,你了解 ThreadPoolExecutor 使用的设计是什么吗?
  • 在测试时,我发现 testShutdownWhenNoTask 会有死锁的情况出现,你知道这是为什么吗?

10. 总结

这次我们实现了自定义线程池的 4.0 版本,了解了毒丸的思想,还在一定程度上顾及了多线程环境下的线程安全问题。以上,算是将线程池基本功能都实现了一遍,之后会专门写一篇文章用来回答之前没有讲的思考题。

http://www.xdnf.cn/news/1017649.html

相关文章:

  • 基于51单片机的简易售货机系统
  • 使用 C/C++ 和 OpenCV 构建智能停车场视觉管理系统
  • 在GIS 工作流中实现数据处理(4)
  • 用Java实现常见排序算法详解
  • 玩转Docker | 使用Docker部署vaultwarden密码管理器
  • 让 Deepseek 写电器电费计算器(html版本)
  • 使用docker compose部署netmaker打通内网
  • JDK 8u231安装教程 - Windows 64位下载安装及环境变量配置指南
  • 解决U盘安装Win11无法命令行跳过联网激活的问题
  • java复习 11
  • 使用 C++/OpenCV 和 libevent 构建远程智能停车场管理系统
  • 每天宜搭宜搭小知识—报表组件—柱线混合图
  • 算法第15天:继续二叉树|前序递归+回溯与前序递归的场景总结、最大二叉树、合并二叉树、二叉搜索树中的搜索、验证二叉搜索树
  • Mac电脑 系统监测工具 System Dashboard Pro
  • 【leetcode】543. 二叉树的直径
  • uni-app项目实战笔记4--使用组件具名插槽slot定义公共标题模块
  • 案例:城市“光革命”背后,塔能科技的智能照明进化方程式
  • 欧美简洁时尚风格通用PPT模版分享
  • 麒麟信安支撑2025年电力监控系统安全运维新技能推广应用示范培训班顺利举办
  • Java + easyexcel 新旧数据对比,单元格值标红
  • 优化 Excel 文件可以提升文件性能、减少文件大小并加快计算速度
  • mysql中替换字符串(正则)
  • mapbox进阶,切片网格生成实现
  • 深入理解Python协程:asyncio、异步并发、事件循环
  • 开疆智能ModbusTCP转Devicenet网关连接三菱PLC与ABB机器人配置案例
  • NAS 年中成果汇报:从入门到高阶的影视/音乐/小说/资源下载 等好玩Docker 全集合
  • Python让自动驾驶“看见未来”:环境建模那些事儿
  • AWS知识点和技术面试模拟题
  • 基于python大数据的nba球员可视化分析系统
  • 大模型驱动数据分析革新:美林数据智能问数解决方案破局传统 BI 痛点