这里直接拿 OKHttp 源码中的线程池举例:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(
0, // 核心线程
Integer.MAX_VALUE, // 最大线程
60, // 空闲线程闲置时间
TimeUnit.SECONDS, // 闲置时间单位
new SynchronousQueue<Runnable>(), // 线程等待队列
Util.threadFactory("OkHttp Dispatcher", false) // 线程创建工厂
);
}
return executorService;
}
|
每个线程的参数都在上面注释了,然后再看下这几个参数都分别起着什么样的作用,他们的工作流程先看下这个图,大致是这样的:
当一个任务通过execute(Runnable)方法添加到线程池时:
- 线程数量小于核心线程数时,新建线程(核心)来处理被添加的任务
- 线程数量大于等于 corePoolSize,存在空闲线程,使用空闲线程执行新任务
- 线程数量大于等于 corePoolSize,不存在空闲线程,新任务将被添加到等待队列,添加成功则等待空闲线程执行任务,添加失败:
- 线程数量小于最大线程数,新建线程执行新任务
- 线程数量大于最大线程数,拒绝此任务
现在用代码的例子来解释一下上述的情况:
情况一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
fun main() {
val threadPoolExecutor = ThreadPoolExecutor(
1, Int.MAX_VALUE,
60, TimeUnit.SECONDS,
LinkedBlockingDeque<Runnable>()
)
threadPoolExecutor.execute {
println("任务一")
while (true){}
}
threadPoolExecutor.execute {
println("任务二")
}
}
|
执行结果
只有任务一会被执行,其余的任务不会被执行,这是因为核心线程数是 1 ,最大线程数量为 Int 的最大数,当执行任务一的时候,任务一执行成功,但是任务并没有结束,然后执行任务二的时候,任务会被成功添加到等待队列中,但是由于核心线程数只有一个的关系,根据上图的关系,只有等待空闲线程才会执行任务二。
情况二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
fun main() {
val threadPoolExecutor = ThreadPoolExecutor(
1, Int.MAX_VALUE,
60, TimeUnit.SECONDS,
ArrayBlockingQueue<Runnable>(1)
)
threadPoolExecutor.execute {
println("任务一")
while (true){}
}
threadPoolExecutor.execute {
println("任务二")
}
}
|
这个打印的结果和情况一是一样的,不一样的情况就是使用的队列是 ArrayBlockingQueue 大小为一个,如果按照正常情况的解释是如果执行任务情况一将添加到了队列中,然后启动核心线程执行任务,那么执行任务二的时候就添加到队列中失败了,因为满了,那么又因为线程数量小于最大线程数,所以会再创建一个线程去执行。按理说任务一和任务二都会执行的。理论上是这样的,其实不是,在执行任务一的时候,并没有添加到这个队列中去,而是直接执行任务了。看如下 execute 源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public void execute(Runnable var1) {
if (var1 == null) {
throw new NullPointerException();
} else {
int var2 = this.ctl.get();
if (workerCountOf(var2) < this.corePoolSize) {
if (this.addWorker(var1, true)) {
return; // 执行任务一的时候没有添加到 workQueue 中,而是直接执行任务了
}
var2 = this.ctl.get();
}
// 执行任务二的时候才添加到这个队列中了,是添加成功的
if (isRunning(var2) && this.workQueue.offer(var1)) {
int var3 = this.ctl.get();
if (!isRunning(var3) && this.remove(var1)) {
this.reject(var1);
} else if (workerCountOf(var3) == 0) {
this.addWorker((Runnable)null, false);
}
} else if (!this.addWorker(var1, false)) {
this.reject(var1);
}
}
}
|
执行任务一的时候,会执行第 7 行直接执行这个任务然后就被 return 掉了,并没有添加到 workQueue 中,然后执行任务二的时候则会添加成功,但是在经过第 16 行 isRunning 当前线程是正在运行的,所以条件不成立,会执行 workerCountOf,而当前运行的线程并不为 0 ,所以这里没有调用 addWorker 方法,所以任务二没有被执行。
情况三
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
fun main() {
val threadPoolExecutor = ThreadPoolExecutor(
1, Int.MAX_VALUE,
60, TimeUnit.SECONDS,
ArrayBlockingQueue<Runnable>(1)
)
threadPoolExecutor.execute {
println("任务一")
// while (true){}
}
threadPoolExecutor.execute {
println("任务二")
}
}
|
执行结果
任务一和任务二都能够被执行,这是因为在执行任务二的时候,任务一执行完毕了,已经有空闲线程了,那么就会去执行任务二了。
情况四
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
fun main() {
val threadPoolExecutor = ThreadPoolExecutor(
1, Int.MAX_VALUE,
60, TimeUnit.SECONDS,
ArrayBlockingQueue<Runnable>(1)
)
threadPoolExecutor.execute {
println("任务一")
while (true){}
}
threadPoolExecutor.execute {
println("任务二")
}
threadPoolExecutor.execute {
println("任务三")
}
}
|
执行结果
任务都能够被执行,这里的执行循序是 1 3 2 ,这是因为任务一直接会被执行,然后将任务二添加到队列中,等待空闲线程才会去执行。紧接着任务三会尝试添加到队列中,但是这个时候队列满了,添加失败,由于线程数量小于最大线程数,然后会创建一个新的线程去执行任务三,任务三执行完之后就有空闲线程了,就会去执行任务二了。
情况五
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
fun main() {
val threadPoolExecutor = ThreadPoolExecutor(
0, Int.MAX_VALUE,
60, TimeUnit.SECONDS,
SynchronousQueue<Runnable>()
)
threadPoolExecutor.execute {
println("任务一")
while (true){}
}
threadPoolExecutor.execute {
println("任务二")
}
threadPoolExecutor.execute {
println("任务三")
}
}
|
执行结果
任务都能够被执行,其实这里的参数配置和 OKHttp 的线程池配置是一样的了,注意这里的核心线程数虽然是 0,使用的是 SynchronousQueue 队列,至于为什么都能够被执行之前,先简单了解一下 3 个队列的区别
- ArrayBlockingQueue
- LinkedBlockingDeque
- 基于链表实现的阻塞队列,初始化可以指定大小,也可以不指定
- SynchronousQueue
这 3 个队列的区别大概就是这些了,因为 SynchronousQueue 是无容量的队列,即使核心线程数是 0 的情况下,每次添加任务的时候都会失败,由于线程数量小于最大线程数,所以每次都会创建新线程去执行任务(如果没有空闲线程的情况下),这就是为什么都能够执行任务, 也是为什么 OKHttp 会这么配置的原因,这样可以保证最大的并发量,一有任务就能够立即被执行。
源码地址:https://github.com/midFang/blogSource/tree/main/Thread