当前位置: 首页 > ai >正文

手写一个简单的线程池

手写一个简单的线程池

项目仓库:https://gitee.com/bossDuy/hand-tearing-thread-pool
基于一个b站up的课程:https://www.bilibili.com/video/BV1cJf2YXEw3/?spm_id_from=333.788.videopod.sections&vd_source=4cda4baec795c32b16ddd661bb9ce865

理解线程池的原理

线程池就是为了减少频繁的创建和销毁线程带来的性能损耗,工作原理:

在这里插入图片描述

简单的说:线程池就是有一个存放线程的集合和一个存放任务的阻塞队列。当提交一个任务的时候,判断核心线程是否满了,没满就会创建一个核心线程加入线程池并且执行任务,核心线程是不会被销毁的即使没有任务执行;满了就会放入任务队列等待;如果队列满了的话就会创建非核心线程进行执行任务,这些非核心线程在不执行任务的时候就会等一段时间销毁(配置的过期时间),如果创建的线程达到了最大线程数,那么就会执行拒绝策略。

可以简要整理如下:

提交任务 -> 核心任务是否已满为满,创建核心线程并执行任务已满,则加入任务队列队列未满 -> 等待执行队列已满 -> 创建非核心线程达到线程最大数量 -> 拒绝策略未达到最大数量 -> 执行任务

自己实现简单的线程池

第一步:实现了一个线程复用的线程池

package com.yb0os1;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class MyThreadPool {//1、线程什么时候创建?/**核心线程中我们要保证线程是可以复用的,那么就不可以直接new Thread(task).start(); 这样执行完task线程就会被销毁了我们将接收到的任务对象放到队列中,然后线程从队列中取出任务,通过任务的run方法进行调用,这样就是在该线程上调用任务,并且调用完后不会销毁线程*///2、我们一开始使用 while (true) if(!tasks.isEmpty()) Runnable task = tasks.remove(0);/**这样如果任务队列一直为空就会一循环,消耗cpu资源。此时就是阻塞队列出现了,当为空阻塞等待 非空执行*/BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(1024);Thread thread = new Thread(()->{while (true){if(!taskQueue.isEmpty()){try {Runnable task = taskQueue.take();task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}},"唯一线程");{thread.start();//启动线程}public void execute(Runnable task){taskQueue.offer(task);//向队列添加元素 尽量是否offer 满则返回false  add满则排除异常}
}
package com.yb0os1;public class Main {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool();for (int i = 0; i < 5; i++) {myThreadPool.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {//InterruptedException这个是线程中断异常,// 这个异常一般都是线程在等待或者阻塞中被中断了就会抛出的,// sleep wait等等都是有,除了LockSupport.park 这个会记录中断位 不会抛出这个异常e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"执行完毕");});}System.out.println("主线程没有被阻塞");}
}

测试结果:

在这里插入图片描述

第二步:实现多个线程复用的线程池

package com.yb0os1;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class MyThreadPool {//任务队列private final BlockingQueue<Runnable> taskQueue;//核心线程的数量private final int corePoolSize;//最大线程的数量private final int maxPoolSize;private final int keepAliveTime;private final TimeUnit unit;public MyThreadPool(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> taskQueue) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.taskQueue = taskQueue;}//核心线程List<Thread> coreList = new ArrayList<>();//非核心线程List<Thread> supportList = new ArrayList<>();//添加元素和判断长度不是原子的,所以存在线程安全问题 可以加锁 CAS等解决public void execute(Runnable command) {//目前线程列表中线程数量小于核心线程的数量,则创建线程if (coreList.size() < corePoolSize) {Thread thread = new CoreThread();coreList.add(thread);thread.start();
//            return;}//成功添加到阻塞队列if (taskQueue.offer(command)) {return;}//任务队列也满了 需要创建非核心线程//核心线程满 任务队列满 但是非核心线程没有满才可以添加if (coreList.size() + supportList.size() < maxPoolSize) {Thread thread = new SupportThread();supportList.add(thread);thread.start();return;}//我们创建完线程之后 并没有处理刚才的command 不能确定是否队列真的满了if (!taskQueue.offer(command)) {//真的满了 抛出异常throw new RuntimeException("线程池已满");}}class CoreThread extends Thread {@Overridepublic void run() {while (true) {try {Runnable task = taskQueue.take();task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread {@Overridepublic void run() {while (true) {try {Runnable command  = taskQueue.poll(keepAliveTime, unit);//等待一秒没有获取就会返回nullif (command  == null) {//线程结束break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println(Thread.currentThread().getName()+"非核心线程结束");supportList.remove(Thread.currentThread());System.out.println("当前非核心线程数量为:" + supportList.size());}}
}
package com.yb0os1;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;public class Main {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool(2,4,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2));for (int i = 0; i < 4; i++) {myThreadPool.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {//InterruptedException这个是线程中断异常,// 这个异常一般都是线程在等待或者阻塞中被中断了就会抛出的,// sleep wait等等都是有,除了LockSupport.park 这个会记录中断位 不会抛出这个异常e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"执行完毕");});}System.out.println("主线程没有被阻塞");}
}

存在问题,任务没有被正确的执行:

在这里插入图片描述

b站评论区指出的:if (blockingQueue.offer(command)) { return; } 这里如果任务成功放入队列,方法就直接 return 了。 但在 创建 SupportThread 的逻辑中,没有保证这个任务会被执行,因为 offer() 失败后你才创建新线程。 但 command 并没有交给这个新线程,而是再次尝试 offer(),如果失败就直接走拒绝策略了。 这样的话,可能 SupportThread 已经启动,但任务却没被执行。

理解:如果队列满了,我们创建非核心线程,但是并没有将这任务直接交给我们创建的新线程,而是再次尝试加入队列中,这就导致了一个不确定的状态:

  1. 如果此时队列还是满的(offer 返回 false),就会直接抛出异常,任务未被执行
  2. 如果队列此时恰好有空间(可能因为其他线程刚刚完成了任务,从而腾出了队列空间),那么任务会被放入队列,后续由某个线程(可能是核心线程,也可能是其他非核心线程)从队列中取出并执行。但新创建的非核心线程可能并没有真正处理这个任务。

解决方案:如果队列满了,我们要创建非核心线程并且由这个线程执行任务

也可以说 让线程执行当前 command 之后,再从 queue 中拿任务

第三步:修复bug 设计拒绝策略

package com.yb0os1;import com.yb0os1.reject.DiscardRejectHandle;
import com.yb0os1.reject.ThrowRejectHandle;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;public class Main {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool(2,4,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2),new DiscardRejectHandle());for (int i = 0; i < 8; i++) {int finalI = i;myThreadPool.execute(()->{try {Thread.sleep(100);} catch (InterruptedException e) {//InterruptedException这个是线程中断异常,// 这个异常一般都是线程在等待或者阻塞中被中断了就会抛出的,// sleep wait等等都是有,除了LockSupport.park 这个会记录中断位 不会抛出这个异常e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"执行完毕---"+ finalI);});}System.out.println("主线程没有被阻塞");}
}
package com.yb0os1;import com.yb0os1.reject.RejectHandle;import java.sql.SQLOutput;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class MyThreadPool {public BlockingQueue<Runnable> getTaskQueue() {return taskQueue;}//任务队列private final BlockingQueue<Runnable> taskQueue;//核心线程的数量private final int corePoolSize;//最大线程的数量private final int maxPoolSize;private final int keepAliveTime;private final TimeUnit unit;//拒绝策略private final RejectHandle rejectHandle;public MyThreadPool(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> taskQueue,  RejectHandle rejectHandle) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.taskQueue = taskQueue;this.rejectHandle = rejectHandle;}//核心线程List<Thread> coreList = new ArrayList<>();//非核心线程List<Thread> supportList = new ArrayList<>();//添加元素和判断长度不是原子的,所以存在线程安全问题 可以加锁 CAS等解决public void execute(Runnable command) {//目前线程列表中线程数量小于核心线程的数量,则创建线程if (coreList.size() < corePoolSize) {Thread thread = new CoreThread(command);coreList.add(thread);thread.start();return;}//成功添加到阻塞队列if (taskQueue.offer(command)) {return;}//任务队列也满了 需要创建非核心线程//核心线程满 任务队列满 但是非核心线程没有满才可以添加if (coreList.size() + supportList.size() < maxPoolSize) {Thread thread = new SupportThread(command);supportList.add(thread);thread.start();return;}//我们创建完线程之后 并没有处理刚才的command 不能确定是否队列真的满了if (!taskQueue.offer(command)) {//真的满 使用拒绝策略rejectHandle.reject(command,this);}}//优先处理传过来的 然后再去阻塞队列中获取class CoreThread extends Thread {private final Runnable command;CoreThread(Runnable command) {this.command = command;}@Overridepublic void run() {command.run();while (true) {try {Runnable task = taskQueue.take();System.out.println("核心线程"+Thread.currentThread().getName()+"正在执行任务");task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread {private final Runnable command;SupportThread(Runnable command) {this.command = command;}@Overridepublic void run() {command.run();while (true) {try {Runnable command  = taskQueue.poll(keepAliveTime, unit);//等待一秒没有获取就会返回nullif (command  == null) {//线程结束break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println(Thread.currentThread().getName()+"非核心线程结束");supportList.remove(Thread.currentThread());
//            System.out.println("当前非核心线程数量为:" + supportList.size());}}
}
package com.yb0os1.reject;import com.yb0os1.MyThreadPool;public interface RejectHandle {void reject(Runnable command, MyThreadPool myThreadPool);
}
package com.yb0os1.reject;import com.yb0os1.MyThreadPool;public class DiscardRejectHandle implements RejectHandle{@Overridepublic void reject(Runnable command, MyThreadPool myThreadPool) {myThreadPool.getTaskQueue().poll();System.out.println("任务被丢弃");}
}
package com.yb0os1.reject;import com.yb0os1.MyThreadPool;public class ThrowRejectHandle implements RejectHandle{@Overridepublic void reject(Runnable command, MyThreadPool myThreadPool) {throw new RuntimeException("线程池已满");}
}

思考

在这里插入图片描述

1.你能给线程池增加一个shutdown功能吗

答:关闭线程池分两种情况, 一个是清空任务队列、线程全部完成任务后关闭; 二是等线程完成后直接关,不管队列中的任务。

2、怎么理解拒绝策略

答:首先它是一个策略模式,在线程池的代码中,当任务队列满时就会触发该接口的方法,所以我们只要实现这个接口方法,再把实现类传入线程池即可,并且方法里还可以拿到被拒绝的任务、线程池对象来实现自己的拒绝逻辑。

3、ThreadFactory参数

答:这个参数是线程池用来创建核心、辅助线程的方法,我们可以自定义线程名称等参数。

http://www.xdnf.cn/news/8339.html

相关文章:

  • SQL实战之索引失效案例详解
  • Python在自动驾驶中的多传感器融合——让智能汽车“看得更清楚”
  • “Agent上车”浪潮来临,谁在引领新一轮的AI座舱交互变革?
  • JMeter 教程:监控性能指标 - 第三方插件安装(PerfMon)
  • SQL SERVER中实现类似LEAST函数的功能,返回多列数据中的最小值
  • 6个月Python学习计划 Day 2
  • python 实现一个完整的基于Python的多视角三维重建系统,包含特征提取与匹配、相机位姿估计、三维重建、优化和可视化等功能
  • Javase易混点专项复习02_static关键字
  • Day125 | 灵神 | 二叉树 | 二叉树中的第K大层和
  • 003-类和对象(二)
  • Ubuntu Linux系统的基本命令详情
  • 李宏毅《机器学习2025》笔记 —— 更新中
  • 使用 uv 工具从 pyproject.toml 和 uv.lock 快速安装 Python 依赖
  • 10G SFP+ 双纤光模块选购避坑指南:从SFP-10G-LRM到SFP-10G-ZR的兼容性与应用
  • C语言中的文件I/O
  • 用算法实现 用统计的方式实现 用自然语言处理的方法实现 用大模型实现 专利精益化统计分析
  • Attu下载 Mac版与Win版
  • 电磁兼容(EMC)仿真(精编版)
  • pytorch LSTM 结构详解
  • PR-2014《The MinMax K-Means clustering algorithm》
  • HTML5的新语义化标签
  • 腾讯地图WebServiceAPI提供基于HTTPS/HTTP协议的数据接口
  • JAVA:Kafka 存储接口详解与实践样例
  • 练习小项目7:天气状态切换器②
  • 机器学习中的维度、过拟合、降维
  • 从制造到智造:猎板PCB的技术实践与产业价值重构
  • 攻防世界 - MISCall
  • JMeter-SSE响应数据自动化
  • SVN被锁定解决svn is already locked
  • 青少年编程与数学 02-020 C#程序设计基础 02课题、开发环境