这里直接拿 OKHttp 源码中的线程池举例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(
          									0, // 核心线程
                            Integer.MAX_VALUE,  // 最大线程
                            60,					// 空闲线程闲置时间
                            TimeUnit.SECONDS,	// 闲置时间单位
                            new SynchronousQueue<Runnable>(), // 线程等待队列
                            Util.threadFactory("OkHttp Dispatcher", false) // 线程创建工厂
      );
    }
    return executorService;
}

每个线程的参数都在上面注释了,然后再看下这几个参数都分别起着什么样的作用,他们的工作流程先看下这个图,大致是这样的:

当一个任务通过execute(Runnable)方法添加到线程池时:

  • 线程数量小于核心线程数时,新建线程(核心)来处理被添加的任务
  • 线程数量大于等于 corePoolSize,存在空闲线程,使用空闲线程执行新任务
  • 线程数量大于等于 corePoolSize,不存在空闲线程,新任务将被添加到等待队列,添加成功则等待空闲线程执行任务,添加失败:
    • 线程数量小于最大线程数,新建线程执行新任务
    • 线程数量大于最大线程数,拒绝此任务

现在用代码的例子来解释一下上述的情况:

情况一

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
fun main() {
    val threadPoolExecutor = ThreadPoolExecutor(
        1, Int.MAX_VALUE,
        60, TimeUnit.SECONDS,
        LinkedBlockingDeque<Runnable>()
    )
    threadPoolExecutor.execute {
        println("任务一")
        while (true){}
    }
    threadPoolExecutor.execute {
        println("任务二")
    }
}

执行结果

1
任务一

只有任务一会被执行,其余的任务不会被执行,这是因为核心线程数是 1 ,最大线程数量为 Int 的最大数,当执行任务一的时候,任务一执行成功,但是任务并没有结束,然后执行任务二的时候,任务会被成功添加到等待队列中,但是由于核心线程数只有一个的关系,根据上图的关系,只有等待空闲线程才会执行任务二。

情况二

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
fun main() {
    val threadPoolExecutor = ThreadPoolExecutor(
        1, Int.MAX_VALUE,
        60, TimeUnit.SECONDS,
        ArrayBlockingQueue<Runnable>(1)
    )
    threadPoolExecutor.execute {
        println("任务一")
        while (true){}
    }
    threadPoolExecutor.execute {
        println("任务二")
    }
}

这个打印的结果和情况一是一样的,不一样的情况就是使用的队列是 ArrayBlockingQueue 大小为一个,如果按照正常情况的解释是如果执行任务情况一将添加到了队列中,然后启动核心线程执行任务,那么执行任务二的时候就添加到队列中失败了,因为满了,那么又因为线程数量小于最大线程数,所以会再创建一个线程去执行。按理说任务一和任务二都会执行的。理论上是这样的,其实不是,在执行任务一的时候,并没有添加到这个队列中去,而是直接执行任务了。看如下 execute 源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 public void execute(Runnable var1) {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            int var2 = this.ctl.get();
            if (workerCountOf(var2) < this.corePoolSize) {
                if (this.addWorker(var1, true)) {
                    return; // 执行任务一的时候没有添加到 workQueue 中,而是直接执行任务了
                }

                var2 = this.ctl.get();
            }
						// 执行任务二的时候才添加到这个队列中了,是添加成功的
            if (isRunning(var2) && this.workQueue.offer(var1)) {
                int var3 = this.ctl.get();
                if (!isRunning(var3) && this.remove(var1)) {
                    this.reject(var1);
                } else if (workerCountOf(var3) == 0) {
                    this.addWorker((Runnable)null, false);
                }
            } else if (!this.addWorker(var1, false)) {
                this.reject(var1);
            }

        }
}

执行任务一的时候,会执行第 7 行直接执行这个任务然后就被 return 掉了,并没有添加到 workQueue 中,然后执行任务二的时候则会添加成功,但是在经过第 16 行 isRunning 当前线程是正在运行的,所以条件不成立,会执行 workerCountOf,而当前运行的线程并不为 0 ,所以这里没有调用 addWorker 方法,所以任务二没有被执行。

情况三

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
fun main() {
    val threadPoolExecutor = ThreadPoolExecutor(
        1, Int.MAX_VALUE,
        60, TimeUnit.SECONDS,
        ArrayBlockingQueue<Runnable>(1)
    )
    threadPoolExecutor.execute {
        println("任务一")
//        while (true){}
    }
    threadPoolExecutor.execute {
        println("任务二")
    }

}

执行结果

1
2
任务一
任务二

任务一和任务二都能够被执行,这是因为在执行任务二的时候,任务一执行完毕了,已经有空闲线程了,那么就会去执行任务二了。

情况四

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
fun main() {
    val threadPoolExecutor = ThreadPoolExecutor(
        1, Int.MAX_VALUE,
        60, TimeUnit.SECONDS,
        ArrayBlockingQueue<Runnable>(1)
    )
    threadPoolExecutor.execute {
        println("任务一")
        while (true){}
    }
    threadPoolExecutor.execute {
        println("任务二")
    }

    threadPoolExecutor.execute {
        println("任务三")
    }

}

执行结果

1
2
3
任务一
任务三
任务二

任务都能够被执行,这里的执行循序是 1 3 2 ,这是因为任务一直接会被执行,然后将任务二添加到队列中,等待空闲线程才会去执行。紧接着任务三会尝试添加到队列中,但是这个时候队列满了,添加失败,由于线程数量小于最大线程数,然后会创建一个新的线程去执行任务三,任务三执行完之后就有空闲线程了,就会去执行任务二了。

情况五

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
fun main() {
    val threadPoolExecutor = ThreadPoolExecutor(
        0, Int.MAX_VALUE,
        60, TimeUnit.SECONDS,
        SynchronousQueue<Runnable>()
    )
    threadPoolExecutor.execute {
        println("任务一")
        while (true){}
    }
    threadPoolExecutor.execute {
        println("任务二")
    }

    threadPoolExecutor.execute {
        println("任务三")
    }
}

执行结果

1
2
3
任务一
任务二
任务三

任务都能够被执行,其实这里的参数配置和 OKHttp 的线程池配置是一样的了,注意这里的核心线程数虽然是 0,使用的是 SynchronousQueue 队列,至于为什么都能够被执行之前,先简单了解一下 3 个队列的区别

  • ArrayBlockingQueue
    • 基于数组的阻塞队列,初始化需要指定固定大小
  • LinkedBlockingDeque
    • 基于链表实现的阻塞队列,初始化可以指定大小,也可以不指定
  • SynchronousQueue
    • 无容量的队列

这 3 个队列的区别大概就是这些了,因为 SynchronousQueue 是无容量的队列,即使核心线程数是 0 的情况下,每次添加任务的时候都会失败,由于线程数量小于最大线程数,所以每次都会创建新线程去执行任务(如果没有空闲线程的情况下),这就是为什么都能够执行任务, 也是为什么 OKHttp 会这么配置的原因,这样可以保证最大的并发量,一有任务就能够立即被执行。

源码地址:https://github.com/midFang/blogSource/tree/main/Thread