Java线程池使用入门
基本的线程池类
Executor
接口:该接口中只有一个execute
方法,执行已提交的Runnable
任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。ExecutorService
接口:继承自Executor
接口,提供了更多管理线程池的方法。AbstractExecutorService
抽象类,实现了ExecutorService
接口,实现了基本操作功能。ThreadPoolExecutor
最基本的线程池实现类,继承自AbstractExecutorService
。它的JDK文档上很详细的描述该类的参数作用和很多的使用细节。这是我们需要重点掌握的类。ScheduledExecutorService
接口,继承自ExecutorService
接口,可安排在给定的延迟后运行或定期执行的命令。ScheduledThreadPoolExecutor
类,继承自ThreadPoolExecutor
类,并实现了ScheduledExecutorService
接口,如果需要延迟后执行命令,或者定期执行命令,则这个类是必须掌握的。Executors
类,继承自Object
,这是一个工厂类,提供了创建ExecutorService
、ScheduledExecutorService
、ThreadFactory
、Callable
等一些与线程池相关的工厂方法。
上面列出了这么多类和接口,总结一下:
- 最基本的是
Executor
接口,它只有一个execute
方法,所以这也是使用线程池的最基本的方法。 - 关键实现类就两个:
ThreadPoolExecutor
、ScheduledThreadPoolExecutor
,它们都以Executor
结尾,所以它们都是Executor
接口的实现类。 - 以
Service
结尾的都是接口或者抽象类,了解一下即可。
所以重点需要掌握的其实就两个:ThreadPoolExecutor
、ScheduledThreadPoolExecutor
,而这两个中ThreadPoolExecutor
又是最重点的,因为ScheduledThreadPoolExecutor
是继承自ThreadPoolExecutor
的。
由于我工作中只用到了ThreadPoolExecutor
,所以我这里就只重点学习一下这个类的使用。
ThreadPoolExecutor
参数介绍
JDK文档中对于 ThreadPoolExecutor
类有很多的文字描述,所以我们只需好好去读一下JDK文档就能掌握到很多基本使用原理。
需要注意的是,在这个文档中,有提到:
强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。
但是阿里巴巴Java开发手册告诉我们不要去用这些工厂方法,因为直接用这些方法是有坑的,这些工厂方法只是使用不同的参数来创建出ThreadPoolExecutor
对象,所以我们最好是自己手动创建ThreadPoolExecutor
,这样我们才会去熟悉创建ThreadPoolExecutor
时使用的各个参数的功能和作用,这样才能避免在使用时产生Bug。
ThreadPoolExecutor
有4个构造函数,属于重载函数,所以可以理解为其实它就一个构造函数,我们只需要学习接收参数最多的那个构造函数即可:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler
)
一共7个参数,功能如下:
-
corePoolSize
核心线程数量。默认情况下核心线程在空闲的时候也不会被回收,所以在需要的时候就可以直接复用,加快响应速度。 -
maximumPoolSize
允许的最大线程数量(包含核心线程与非核心线程),不能小于corePoolSize
-
keepAliveTime
线程的超时时间,默认情况下指非核心线程在空闲多久的时候会被回收。 -
TimeUnit
前一个参数的时间单位,比较常用的单位为秒:TimeUnit.SECONDS
-
BlockingQueue
用于保存线程的队列,这是一个比较重要的参数,容易让人蹲坑!队列大小不能为0,且此列队仅保存由execute
方法提交的Runnable
任务。 -
ThreadFactory
创建线程的工厂,当需要自定义线程属性的时候才需要自己提供自定义的线程工厂(比如想要自定义线程的名称、线程组、优先级、守护进程状态等),否则使用默认的线程工厂即可:Executors.defaultThreadFactory()
-
RejectedExecutionHandler
拒绝异常处理器,当线程队列放满了,而且非核心线程也满了,则此任务应该如何处理交由RejectedExecutionHandler
来决定。它有几个基本的实现类,代码也很简单,一看源码就知道各个子类的功能和区别了。
核心线程什么时候被创建
示例如下:
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnitprivate fun createPool(): Pair<Int, ThreadPoolExecutor> {val maximumPoolSize = 4 // 允许的最大线程数val corePoolSize = 2 // 核心线程数val queueCapacity = 1 // 队列容量val keepAliveTime = 10L // 线程的闲置超时时间(默认情况下只针对非核心线程)val unit = TimeUnit.SECONDSval workQueue = ArrayBlockingQueue<Runnable>(queueCapacity)val threadFactory = Executors.defaultThreadFactory()val handler = ThreadPoolExecutor.AbortPolicy() // 使用中断策略,一旦任务无法处理则抛出异常val pool = ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler)return Pair(queueCapacity, pool)
}fun printPoolState(pool: ThreadPoolExecutor, queueCapacity: Int) {val maxPoolSize = pool.maximumPoolSize // 允许的最大线程数val corePoolSize = pool.corePoolSize // 核心线程数val activeCount = pool.activeCount // 正在主动执行任务的线程的大致数,这是一个估计值,并不一定准确。val poolSize = pool.poolSize // 池中当前线程的数量val idleThreadCount = poolSize - activeCount // 空闲线程数val size = pool.queue.size // 队列中已存线程数val remainingCapacity = pool.queue.remainingCapacity() // 队列可存线程数println("最大线程数:${maxPoolSize},核心线程数:$corePoolSize,当前线程数:$poolSize(活跃${activeCount},空闲${idleThreadCount}),队列大小:${queueCapacity}(已存$size,可存$remainingCapacity)")
}fun main() {val (queueCapacity, pool) = createPool()Thread.sleep(1000)printPoolState(pool, queueCapacity)
}
运行结果如下:
最大线程数:4,核心线程数:2,当前线程数:0(活跃0,空闲0),队列大小:1(已存0,可存1)
可以看到,当线程池创建后,我们等了1秒钟,核心线程也没有启动。说明它是需要有任务来了之后才会启动的,示例如下:
fun execute(pool: ThreadPoolExecutor, message: String) {pool.execute {println("开始执行:$message")Thread.sleep(1000)println("结束执行:$message")}
}fun main() {val (queueCapacity, pool) = createPool()printPoolState(pool, queueCapacity)execute(pool, "任务1")Thread.sleep(1100)printPoolState(pool, queueCapacity)
}
运行结果如下:
最大线程数:4,核心线程数:2,当前线程数:0(活跃0,空闲0),队列大小:1(已存0,可存1)
开始执行:任务1
结束执行:任务1
最大线程数:4,核心线程数:2,当前线程数:1(活跃0,空闲1),队列大小:1(已存0,可存1)
在示例中,我们在子线程中睡了1秒,在主线程中睡了1.1秒,所以在最后一次打印结果时,子线程肯定已经完全结束了,可以看到当前线程数为1了,这说明核心线程也是按需启动的,并不是一下就把所有核心线程全部启动的,而是来几个任务就创建几个核心线程,示例代码如下:
fun main() {val (queueCapacity, pool) = createPool()execute(pool, "任务1")Thread.sleep(2000)printPoolState(pool, queueCapacity)execute(pool, "任务2")printPoolState(pool, queueCapacity)
}
运行结果如下:
开始执行:任务1
结束执行:任务1
最大线程数:4,核心线程数:2,当前线程数:1(活跃0,空闲1),队列大小:1(已存0,可存1)
最大线程数:4,核心线程数:2,当前线程数:2(活跃1,空闲1),队列大小:1(已存0,可存1)
开始执行:任务2
结束执行:任务2
在执行了第1个任务后,我们在主线程上睡了2秒,然后打印状态,此时任务1肯定执行完成了,从打印状态也能看到当前线程数1(活跃0,空闲1),比较奇怪的时,当我们执行任务2的时候,它并不会使用这个空闲线程来执行,而是创建了一个新的线程来执行,从打印结果:当前线程数2(活跃1,空闲1),所以,当线程数未达到核心线程数之前,来新任务的时候即使有空闲线程,也会创建新的线程来执行任务,而不是使用已存在的空闲线程。
核心线程的创建,除了有任务后开始创建外,还可以在来任务之前提前创建,如下:
fun main() {val (queueCapacity, pool) = createPool()pool.prestartCoreThread()printPoolState(pool, queueCapacity)Thread.sleep(10)printPoolState(pool, queueCapacity)
}
运行结果如下:
最大线程数:4,核心线程数:2,当前线程数:1(活跃1,空闲0),队列大小:1(已存0,可存1)
最大线程数:4,核心线程数:2,当前线程数:1(活跃0,空闲1),队列大小:1(已存0,可存1)
从第2条打印结果可知,调用prestartCoreThread()
可预创建1条核心线程,而且它不是一创建后就立马变成空闲状态的,我们等了10毫秒后打印才变成了空闲状态。
如果想要预创建两条核心线程是不是要调用两次prestartCoreThread()
呢?示例如下:
fun main() {val (queueCapacity, pool) = createPool()pool.prestartCoreThread()pool.prestartCoreThread()printPoolState(pool, queueCapacity)
}
运行结果如下:
最大线程数:4,核心线程数:2,当前线程数:2(活跃2,空闲0),队列大小:1(已存0,可存1)
确实如我们所想,每调用一次prestartCoreThread()
就会创建一个新的核心线程。但是它不会超过规定的数量,示例如下:
fun main() {val (queueCapacity, pool) = createPool()pool.prestartCoreThread()pool.prestartCoreThread()pool.prestartCoreThread()printPoolState(pool, queueCapacity)
}
最大线程数:4,核心线程数:2,当前线程数:2(活跃2,空闲0),队列大小:1(已存0,可存1)
虽然我们调用了3次prestartCoreThread()
,但是只创建出了两条线程,这是因为我们设置了corePoolSize
为2。
如果我们希望一下子把所有核心线程全部预先创建出来,还有一个方式,如下:
fun main() {val (queueCapacity, pool) = createPool()pool.prestartAllCoreThreads()printPoolState(pool, queueCapacity)
}
运行结果如下:
最大线程数:4,核心线程数:2,当前线程数:2(活跃2,空闲0),队列大小:1(已存0,可存1)
队列什么时候会被使用
示例如下:
fun main() {val (queueCapacity, pool) = createPool()execute(pool,"任务1")execute(pool,"任务2")printPoolState(pool, queueCapacity)Thread.sleep(3000)printPoolState(pool, queueCapacity)execute(pool, "任务3")printPoolState(pool, queueCapacity)Thread.sleep(2000)printPoolState(pool, queueCapacity)
}
运行结果如下:
最大线程数:4,核心线程数:2,当前线程数:2(活跃2,空闲0),队列大小:1(已存0,可存1)
开始执行:任务1
开始执行:任务2
结束执行:任务2
结束执行:任务1
最大线程数:4,核心线程数:2,当前线程数:2(活跃0,空闲2),队列大小:1(已存0,可存1)
最大线程数:4,核心线程数:2,当前线程数:2(活跃0,空闲2),队列大小:1(已存1,可存0)
开始执行:任务3
结束执行:任务3
最大线程数:4,核心线程数:2,当前线程数:2(活跃0,空闲2),队列大小:1(已存0,可存1)
结果分析:
- 在代码中,我们一开始执行了两个任务,然后立马打印状态,显示
当前线程数:2(活跃2,空闲0),队列大小:1(已存0,可存1)
,这说明创建了两个线程来执行任务,任务并没有保存到队列中。 - 然后主线程睡了3秒后打印状态,显示
当前线程数2(活跃0,空闲2)
,说明两个线程已空闲。 - 执行任务3,然后打印状态为
当前线程数:2(活跃0,空闲2),队列大小:1(已存1,可存0)
,我们看到队列中已存1
,说明任务3被保存到了队列中,而不是直接交给空闲线程处理。这说明:- 当前线程数量 < 核心线程数时,来新任务时会创建新线程并直接交给新线程处理,不会保存到队列中。
- 当前线程数量 >= 核心线程数时,来新任务时先把任务保存到队列,再由空闲线程从队列中取任务来执行。
- 主线程又睡了2秒,此时任务3也执行完了,打印状态为:
当前线程数:2(活跃0,空闲2),队列大小:1(已存0,可存1)
,可以看到队列中的任务已经没了。
非核心线程什么时候创建
示例代码如下:
fun main() {val (queueCapacity, pool) = createPool()pool.prestartAllCoreThreads()printPoolState(pool, queueCapacity)execute(pool, "任务1")printPoolState(pool, queueCapacity)execute(pool, "任务2")printPoolState(pool, queueCapacity)execute(pool, "任务3")printPoolState(pool, queueCapacity)execute(pool, "任务4")printPoolState(pool, queueCapacity)execute(pool, "任务5")printPoolState(pool, queueCapacity)execute(pool, "任务6")printPoolState(pool, queueCapacity)
}
运行结果如下:
最大线程数:4,核心线程数:2,当前线程数:2(活跃0,空闲2),队列大小:1(已存0,可存1)
最大线程数:4,核心线程数:2,当前线程数:2(活跃0,空闲2),队列大小:1(已存1,可存0)
开始执行:任务1
最大线程数:4,核心线程数:2,当前线程数:2(活跃1,空闲1),队列大小:1(已存1,可存0)
开始执行:任务2
最大线程数:4,核心线程数:2,当前线程数:2(活跃2,空闲0),队列大小:1(已存1,可存0)
最大线程数:4,核心线程数:2,当前线程数:3(活跃3,空闲0),队列大小:1(已存1,可存0)
开始执行:任务4
最大线程数:4,核心线程数:2,当前线程数:4(活跃4,空闲0),队列大小:1(已存1,可存0)
开始执行:任务5
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task MainKt$$Lambda/0x000001f388003ac0@6f539caf rejected from java.util.concurrent.ThreadPoolExecutor@27973e9b[Running, pool size = 4, active threads = 4, queued tasks = 1, completed tasks = 0]at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)at MainKt.execute(Main.kt:31)at MainKt.main(Main.kt:52)at MainKt.main(Main.kt)
结束执行:任务5
开始执行:任务3
结束执行:任务4
结束执行:任务2
结束执行:任务1
结束执行:任务3
- 我们通过
prestartAllCoreThreads()
准备好了两个核心线程,从第1行打印结果可知:当前线程数:2(活跃0,空闲2),队列大小:1(已存0,可存1)
- 执行任务1:
execute(pool, "任务1")
,打印状态当前线程数:2(活跃0,空闲2),队列大小:1(已存1,可存0)
,可知任务1被放入队列了 - 打印
开始执行:任务1
,说明空闲线程从列队中把任务1取出来执行了。 - 执行任务2:
execute(pool, "任务2")
,打印状态当前线程数:2(活跃1,空闲1),队列大小:1(已存1,可存0)
,这说明并没有创建新的线程,线程数量还是2,且已存1
说明任务2被存入队列。 - 打印
开始执行:任务2
,说明空闲线程从列队中把任务2取出来执行了。 - 执行任务3:
execute(pool, "任务3")
,打印状态当前线程数:2(活跃2,空闲0),队列大小:1(已存1,可存0)
,活跃2说明已经有两个线程在执行任务了,而已存1说明任务3被存入了队列中。 - 执行任务4:
execute(pool, "任务4")
,打印状态当前线程数:3(活跃3,空闲0),队列大小:1(已存1,可存0)
,活跃3,说明现在有3个线程了,说明新创建了一个线程,已存1
表明还有1个任务在队列中,是任务3还是任务4保存在队列中呢? - 打印
开始执行:任务4
,这说明上一步创建的非核心线程直接执行任务4了,通常我们会以为创建了新线程应该先从队列中把任务3取出来执行,再把任务4保存到队列,事实并不是这样的。 - 执行任务5:
execute(pool, "任务5")
,打印状态当前线程数:4(活跃4,空闲0),队列大小:1(已存1,可存0)
,活跃4说明又创建了一个非核心线程,此时任务3依然保存在队列中。 - 打印
开始执行:任务5
,这是由上一步创建的非核心线程来执行的,此时任务3依然保存在队列中。 - 执行任务6:
execute(pool, "任务6")
,此时抛出了异常,原因也很简单,因为池已经满了,无法再处理新任务了,在执行任务5之后打印的状态显示当前线程数:4(活跃4,空闲0),队列大小:1(已存1,可存0)
,这个状态表示池已经满了,因为我们设置的maximumPoolSize = 4
且queueCapacity = 1
,而此时4个线程都处于活跃状态,也就是正在处理任务,而队列中也已经保存有1个任务了,总结就是线程全忙,队列已满,所以新任务6就没办法处理了,就抛出了异常。比就奇怪的是,虽然抛出了异常,但是进程并没有结束,线程池中的5个任务都能正常执行完成。
验证线程超时被回收
线程池参数为:
val maximumPoolSize = 4
val corePoolSize = 1
val queueCapacity = 1
val keepAliveTime = 10L
基于这些参数,则对应可以同时1个任务交给核心线程处理,1个放到队列,3个交给非核心线程,所以可以同时提交5个任务给线程池是没问题的,示例如下:
fun main() {val (queueCapacity, pool) = createPool()for (i in 1..5) {execute(pool, "任务$i")}printPoolState(pool, queueCapacity)Thread.sleep(1000 * 10)printPoolState(pool, queueCapacity)Thread.sleep(2000)printPoolState(pool, queueCapacity)
}
运行结果如下:
开始执行:任务3
开始执行:任务1
最大线程数:4,核心线程数:1,当前线程数:4(活跃4,空闲0),队列大小:1(已存1,可存0)
开始执行:任务4
开始执行:任务5
结束执行:任务3
结束执行:任务5
结束执行:任务4
结束执行:任务1
开始执行:任务2
结束执行:任务2
最大线程数:4,核心线程数:1,当前线程数:4(活跃0,空闲4),队列大小:1(已存0,可存1)
最大线程数:4,核心线程数:1,当前线程数:1(活跃0,空闲1),队列大小:1(已存0,可存1)
可以看到:
- 在5个任务都提交后立刻打印状态,
显示状态为:4(活跃4,空闲0),队列大小:1(已存1,可存0)
,此时4个线程全部在忙,队列已满,一切正常! - 然后主线程上睡了10秒,再打印状态为:
当前线程数:4(活跃0,空闲4),队列大小:1(已存0,可存1)
,因为子线程任务只需要1秒,所以任务全部完成,线程也变成了空闲状态,队列也空出来了,奇怪的是为什么3个非核心线程没被回收?这是因为子线程执行需要1秒,主线程睡了10秒,则相当于子线程才空闲了9秒,而我们的超时设置为10秒,所以超时时间还没到。 - 接着我们在主线程又睡了2秒,加起来主线程共睡了12秒了,则非核心线程的超时时间肯定够了,打印状态为:
当前线程数:1(活跃0,空闲1),队列大小:1(已存0,可存1)
,可以看到线程只剩1个了,3个非核心线程已被回收。
如果你的应用并不是一个经常需要高并发的应用,其实核心线程也可以设置可以被回收的:
pool.allowCoreThreadTimeOut(true)
再次运行代码,发现最后打印当前线程数为1,说明核心线程没有被回收,这是为什么呢?因为我们最大线程数为4,所以最多同时处理4个任务,而我们提交了5个任务,每个任务的执行时间为1秒,所以5个任务最少需要2秒,而主线程睡了12秒,按理说刚好才对啊,但是我们要知道,线程池处理完4个任务后,此时已经过去了1秒,然后才能处理第5个任务,前面4个任务处理完变成空闲状态,再到从队列取到任务,然后才能开始执行任务,所以这期间也是需要多花一些时间的,所以处理5个任务,总的时间肯定比2秒多一点点的,那我们在主线程上就再多睡1秒就能看到核心线程也被回收了。
在我公司的测速项目中,测速是不经常用的,所以可以把核心线程也设置为可以超时是可以的,但是我并没有这样做,因为我觉得我们的服务器资源还没紧张到那个地步,那就浪费一点吧,反正也没事,以资源换时间,当需要用的时候能快一点点是一点点。
总结
- 线程池对象创建后,核心线程并不是立刻创建的,而是在调用
execute
函数时才创建,而且是调用一次创建一个,且每次调用时即使前面有空闲线程,也会创建新线程来处理新任务,直到当前线程数达到了核心线程数后才停止创建。 - 当线程数量 >= 核心线程数后,再有新任务来的时候,即使池里面有空闲线程,也不会立刻交给空闲线程处理的,而是先保存到队列里面,然后等待空闲线程从列队里面取来执行。
- 当队列满了之后,再有新任务时,会直接创建非核心线程来处理当前任务,而不是优先处理队列里面的任务,所以我们不要希望任务按先后顺序执行。总结就是创建线程时(核心或非核心)都会直接处理当前提交的任务,而保存在队列里的任务只能等空闲的线程取来处理。
- 最大线程数和队列容量最少设置为核心线程数的两倍,原因请看下面的避坑。
避坑
了解了线程池的一些基本使用原理后,就可以来聊一下我工作中遇到的坑了。
在我的工作中,有一个测速项目,每次需要两个线程,一个线程用于测试上传,一个用于测试下载,且这两个线程有做同步处理,即不管哪个线程先结束了,都会等另一个线程结束,然后再统计上行、下行的平均速度。就是这样一个需求,然后遇到了bug,发现队列竟然不够用了,因为我知道一次只需要两个线程,所以我声明最大线程为2,觉得够用了,本想把队列大小设置为0的,但是因为API限定不能设置为0,所以我设置为1,使用前面的例子,修改如下参数:
val maximumPoolSize = 2
val corePoolSize = 2
val queueCapacity = 1
其他代码保持不变,然后模拟我工作的场景:
fun main() {val (queueCapacity, pool) = createPool()execute(pool, "任务1")execute(pool, "任务2")Thread.sleep(2000)printPoolState(pool, queueCapacity)execute(pool, "任务3")printPoolState(pool, queueCapacity)execute(pool, "任务4")printPoolState(pool, queueCapacity)
}
运行结果如下:
开始执行:任务2
开始执行:任务1
结束执行:任务2
结束执行:任务1
最大线程数:2,核心线程数:2,当前线程数:2(活跃0,空闲2),队列大小:1(已存0,可存1)
最大线程数:2,核心线程数:2,当前线程数:2(活跃0,空闲2),队列大小:1(已存1,可存0)
开始执行:任务3
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task MainKt$$Lambda/0x000001aa01003ac0@6f539caf rejected from java.util.concurrent.ThreadPoolExecutor@27973e9b[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 2]at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)at MainKt.execute(Main.kt:31)at MainKt.main(Main.kt:49)at MainKt.main(Main.kt)
结束执行:任务3
结果分析:
- 我们每次提交两个任务,第一次提交了两个任务,且顺序完成。
- 我在主线程特意睡了2秒,以模拟等前面两个任务都完成(前两个任务只需要1秒就能执行完),打印的状态
当前线程数:2(活跃0,空闲2),队列大小:1(已存0,可存1)
,这也表明前面两个任务已完成,且两个核心线程都处于了空闲状态 - 提交任务3后,状态为:
当前线程数:2(活跃0,空闲2),队列大小:1(已存1,可存0)
,这里队列已满,通过前面的知识我们知道,当当前线程数 >= 核心线程数后,再有新任务时,即使当前有空闲线程,也不会立刻把任务交给空闲线程处理的,而是先把任务保存到队列。 - 提交任务4,此时队列已满,此时会创建非核心线程来处理任务,但是由于我核心线程和允许的最大线程设置是一样的,都是2,这种情况就相当于不允许有核心线程,所以不会创建非核心线程,而队列又满了,所以就抛出了拒绝执行的异常:
RejectedExecutionException
。
当时我没搞懂原理的时候我就很奇怪,我每次只需要两个线程,一个用于测试上行,一个用于测试下行,而且当前也确实有两个空闲的线程了,还有队列容量为1,按道理我同时提交3个任务应该都是没问题的呀,为什么每次提交两个任务时都只能处理1个任务,而另一个会被拒绝?能处理1个任务就是因为我们的队列大小设置为1,所以可以把1个任务保存进去,接着队列就满了第二个任务就放不进来了,所以就抛出了异常了。
这个示例代码也并不是每次都会出异常,比如我们在提交第3个任务的时候,任务保存到了队列中,然后就在一瞬间空闲线程就把任务从列队中取走了,队列就空出来了,则此时提交第4个任务的时候就能保存到队列中了。所以要想异常更容易出现,可以把任务3和任务4之间的打印状态的代码删除,因为这个打印也是要消耗时间的,这个消耗的时间很可能就足已让空闲线程从列队中取走任务3了,再提交任务4的时候自然就能正常提交到队列中了。
了解了原理后,解决方案其实也很简单,要想实现每次都能同时处理两个任务,把队列大小设置为2就行了。或者调大允许的最大线程数也可以。总之,合理的设置应该满足:
queueCapacity >= corePoolSize
maximumPoolSize >= corePoolSize
这里,我们把核心线程设置为1也能解决问题:
val maximumPoolSize = 2
val corePoolSize = 1 // 由2改为1
val queueCapacity = 1
见了鬼,为什么改小了反而也没问题了,这也是我没搞懂原理时所迷惑的,不懂原理,光看现象,自己也不敢保证因此就解决了问题,当时只敢这样想:可能只是减少了问题的出现,搞不好在什么条件下它又出现问题了。所以,还是得知根知底才能知道问题是否真的解决了,下面来分析原因:
fun main() {val (queueCapacity, pool) = createPool()execute(pool, "任务1")printPoolState(pool, queueCapacity)execute(pool, "任务2")printPoolState(pool, queueCapacity)Thread.sleep(3000)printPoolState(pool, queueCapacity)execute(pool, "任务3")printPoolState(pool, queueCapacity)execute(pool, "任务4")printPoolState(pool, queueCapacity)
}
运行结果:
最大线程数:2,核心线程数:1,当前线程数:1(活跃1,空闲0),队列大小:1(已存0,可存1)
开始执行:任务1
最大线程数:2,核心线程数:1,当前线程数:1(活跃1,空闲0),队列大小:1(已存1,可存0)
结束执行:任务1
开始执行:任务2
结束执行:任务2
最大线程数:2,核心线程数:1,当前线程数:1(活跃0,空闲1),队列大小:1(已存0,可存1)
最大线程数:2,核心线程数:1,当前线程数:1(活跃0,空闲1),队列大小:1(已存1,可存0)
开始执行:任务3
最大线程数:2,核心线程数:1,当前线程数:1(活跃1,空闲0),队列大小:1(已存1,可存0)
结束执行:任务3
开始执行:任务4
结束执行:任务4
- 提交第1个任务,则创建1个核心线程,任务立刻交给核心线程处理。对应第一次打印的状态:
当前线程数:1(活跃1,空闲0),队列大小:1(已存0,可存1)
- 提交第2个任务时,由于当前线程 >= 核心线程数,所以不管三七二十一,先把任务保存到队列中。对应第二次打印状态:
当前线程数:1(活跃1,空闲0),队列大小:1(已存1,可存0)
- 主线程睡了3秒,前面两个任务顶多花2秒,所以此时前面的两个任务肯定都完成了,对应第三次打印状态为:
当前线程数:1(活跃0,空闲1),队列大小:1(已存0,可存1)
- 提交第3个任务时,对应第四次打印状态为:
当前线程数:1(活跃0,空闲1),队列大小:1(已存1,可存0)
,此时任务3被保存到了队列中。 - 提交第4个任务时,对应第四次打印状态为:
当前线程数:1(活跃1,空闲0),队列大小:1(已存1,可存0)
,此时队列已满,无法再存入到队列中了,但是由于我们设置的最大线程数为2,核心线程为1,所以允许非核心线程为1,所以虽然现在队列满了,但是非核心线程还没满,它会创建非核心线程来处理当前任务。
这就是为什么把核心线程改小了反而也解决了问题。
为了预防万一,队列大小或允许最大线程数至少应该是核心线程的两倍,比如:假设设置的核心线程和允许的最大线程都是2,队列大小也设置为2,假设当前有两条核心线程,刚处理完两个任务,队列里面也有两个任务,你以为此时两条核心线程会立马变成空闲状态立刻取走队列里面的任务吗?其实并不一定的,线程处理完任务并不一定立马就能变成空闲状态的,所以此刻如果你再提交新的任务,就会因为队列满了而被拒绝。所以不要设置得刚好够用的状态,你以为的刚好够用,在某些条件下可能就不够用了。所以允许的最大线程还有队列容易至少都设置为核心线程的两倍,宁多勿少!多了浪费一点资源没关系,少了你出Bug那可是大事!