(五)四种线程池底层详解
Java中提供了四种线程池创建方法,分别是:
线程池名称 | 描述 |
---|---|
newSingleThreadExecutor | 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 |
newFixedThreadPool | 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 |
newScheduledThreadPool | 创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。 |
newCachedThreadPool | 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 |
# 1. newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
特点
- 创建只有一个线程的线程池,且线程的存活时间是无限的;当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)
应用场景
- 适用单线程的场景。
- 适用于对提交任务的处理有顺序性要求的场景。
可以看到corePoolSize、maximumPoolSize都是 1 ,keepAliveTime 是 0 ,使用的是LinkedBlockingQueue队列。
demo:
public class FourThreadPool {
public static void main(String[] args) {
ScheduledThreadPoolExecutorTest().ScheduledThreadPoolExecutorTest();
ExecutorService executorService = new newSingleThreadExecutor().newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executorService.execute(new TestRunnable());
}
executorService.shutdown();
}
}
class TestRunnable implements Runnable {
static int i = 1;
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 线程被调用了。第" + getCount() + "次");
}
public static int getCount() {
return i++;
}
}
class newSingleThreadExecutor {
ExecutorService newSingleThreadExecutor() {
/**
* 1
* 此线程池 Executor 只有一个线程。它用于以顺序方式的形式执行任务。
* 如果此线程在执行任务时因异常而挂掉,则会创建一个新线程来替换此线程,后续任务将在新线程中执行。
*/
ExecutorService executorService = Executors.newSingleThreadExecutor();
return executorService;
}
}
输出:
pool-1-thread-1 线程被调用了。第1次
pool-1-thread-1 线程被调用了。第2次
pool-1-thread-1 线程被调用了。第3次
pool-1-thread-1 线程被调用了。第4次
pool-1-thread-1 线程被调用了。第5次
pool-1-thread-1 线程被调用了。第6次
pool-1-thread-1 线程被调用了。第7次
pool-1-thread-1 线程被调用了。第8次
pool-1-thread-1 线程被调用了。第9次
pool-1-thread-1 线程被调用了。第10次
# 2. newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特点:
- 核心线程数与最大线程数相等,因此不会创建空闲线程。keepAliveTime 设置与否无关紧要。
- 采用无界队列,任务会被无限添加,直至内存溢出(OOM)。
- 由于无界队列不可能被占满,任务在执行前不可能被拒绝(前提是线程池一直处于运行状态)。
应用场景:
- 适用于线程数固定的场景
- 适用负载比较重的服务器
可以看到corePoolSize和maximumPoolSize一样, 由调用者自己设定传入 ,keepAliveTime 是 0 ,使用的是LinkedBlockingQueue队列。
可以看到上面两种线程池(SingleThreadExecutor、newFixedThreadPool)都是使用了 LinkedBlockingQueue ,而且默认使用的是 new LinkedBlockingQueue<Runnable>());
继续扒一下这个LinkedBlockingQueue
的默认大小
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
竟然是 int 类型的最大值
@Native public static final int MAX_VALUE = 0x7fffffff;
就是说这个队列里面可以放 2^31-1 = 2147483647
个 任务。
demo:
public class FourThreadPool {
public static void main(String[] args) {
ScheduledThreadPoolExecutorTest().ScheduledThreadPoolExecutorTest();
ExecutorService executorService = new newFixedThreadPool().newFixedThreadPool();
for (int i = 0; i < 10; i++) {
executorService.execute(new TestRunnable());
}
executorService.shutdown();
}
}
class TestRunnable implements Runnable {
static int i = 1;
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 线程被调用了。第" + getCount() + "次");
}
public static int getCount() {
return i++;
}
}
class newFixedThreadPool {
ExecutorService newFixedThreadPool() {
/**
* 2
* 它是一个拥有固定数量线程的线程池。提交给 Executor 的任务由固定的 n 个线程执行,
* 如果有更多的任务,它们存储在 LinkedBlockingQueue 里。这个数字 n 通常跟底层处理器CPU支持的线程总数有关。
*/
ExecutorService executorService = Executors.newFixedThreadPool(3);
return executorService;
}
}
输出:
pool-1-thread-1 线程被调用了。第1次
pool-1-thread-2 线程被调用了。第2次
pool-1-thread-2 线程被调用了。第4次
pool-1-thread-2 线程被调用了。第5次
pool-1-thread-2 线程被调用了。第6次
pool-1-thread-2 线程被调用了。第7次
pool-1-thread-1 线程被调用了。第3次
pool-1-thread-1 线程被调用了。第10次
pool-1-thread-2 线程被调用了。第9次
pool-1-thread-3 线程被调用了。第8次
可以看到最多只有3个线程,哪个线程空闲了就执行,超过corePoolSize=3
后,任务会被丢到 LinkedBlockingQueue 队列,因为LinkedBlockingQueue很大,所以执行完前面的任务,空闲后会执行队列LinkedBlockingQueue里面的任务。
# 3. newScheduledThreadPool
顾名思义可以延迟某个任务的执行。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
这里的super还是会调用ThreadPoolExecutor的方法。
可以看到corePoolSize由调用者自己设定传入 ,maximumPoolSize 是int的最大值,keepAliveTime 是 0 ,使用的是DelayedWorkQueue队列。
new DelayedWorkQueue()
: 一个按超时时间升序排序的队列,顾名思义,可以延迟执行。
我们可以自定义设置队列的延迟策略:
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param period the period between successive executions
* @param unit the time unit of the initialDelay and period parameters
* @return a ScheduledFuture representing pending completion of
* the task, and whose {@code get()} method will throw an
* exception upon cancellation
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if command is null
* @throws IllegalArgumentException if period less than or equal to zero
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
demo:
class ScheduledThreadPoolExecutorTest {
ScheduledExecutorService ScheduledThreadPoolExecutorTest() {
/**
* 3
* 当我们有一个需要定期运行的任务或者我们希望延迟某个任务时,就会使用此类型的 executor。
*/
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
//创建并执行在给定延迟后启用的单次操作。延迟2秒后启动,但是只执行一次。
// executorService.schedule(new TestRunnable(), 2, TimeUnit.SECONDS);
//延迟2秒后启动,之后每间隔1秒执行一次
executorService.scheduleAtFixedRate(new TestRunnable(), 2, 1, TimeUnit.SECONDS);
return executorService;
}
public static void main(String[] args) {
new ScheduledThreadPoolExecutorTest().ScheduledThreadPoolExecutorTest();
}
}
class TestRunnable implements Runnable {
static int i = 1;
@Override
public void run() {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
System.out.println(Thread.currentThread().getName() + " 线程被调用了。第" + getCount() + "次");
}
public static int getCount() {
return i++;
}
}
输出:
2020-07-15 18:03:47
2020-07-15 18:03:49
pool-1-thread-1 线程被调用了。第1次
2020-07-15 18:03:50
pool-1-thread-1 线程被调用了。第2次
2020-07-15 18:03:51
pool-1-thread-2 线程被调用了。第3次
2020-07-15 18:03:52
pool-1-thread-2 线程被调用了。第4次
2020-07-15 18:03:53
pool-1-thread-2 线程被调用了。第5次
2020-07-15 18:03:54
pool-1-thread-2 线程被调用了。第6次
2020-07-15 18:03:55
pool-1-thread-2 线程被调用了。第7次
# 4. newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点:
- 核心线程数corePoolSize为0,则初始就创建空闲线程,并且空闲线程的只能等待任务60s,60s内没有提交任务,空闲线程将被销毁。
- 最大线程数为无穷大,这样会造成巨量线程同时运行,CPU负载过高,导致应用崩溃。
- 采用同步阻塞队列,即队列不存储任务。提交一个消费一个。由于最大线程数为无穷大,因此,只要提交任务就一定会被消费(应用未崩溃前)。
应用场景:
- 适用于耗时短、异步的小程序。
- 适用于负载较轻的服务器。
解释:SynchronousQueue是同步队列,即队列不存储任务。因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。
demo:
public class FourThreadPool {
public static void main(String[] args) {
ScheduledThreadPoolExecutorTest().ScheduledThreadPoolExecutorTest();
ExecutorService executorService = new newCachedThreadPoolTest().newCachedThreadPoolTest();
for (int i = 0; i < 10; i++) {
executorService.execute(new TestRunnable());
}
executorService.shutdown();
}
}
class TestRunnable implements Runnable {
static int i = 1;
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 线程被调用了。第" + getCount() + "次");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static int getCount() {
return i++;
}
}
class newCachedThreadPoolTest {
ExecutorService newCachedThreadPoolTest() {
/**
* 4
* 此线程池的线程数不受限制。如果所有的线程都在忙于执行任务并且又有新的任务到来了,这个线程池将创建一个新的线程并将其提交到 Executor。
* 只要其中一个线程变为空闲,它就会执行新的任务。 如果一个线程有 60 秒的时间都是空闲的,它们将被结束生命周期并从缓存中删除。
*/
ExecutorService executorService = Executors.newCachedThreadPool();
return executorService;
}
}
执行结果:
pool-1-thread-2 线程被调用了。第1次
pool-1-thread-1 线程被调用了。第2次
pool-1-thread-3 线程被调用了。第3次
pool-1-thread-4 线程被调用了。第4次
pool-1-thread-5 线程被调用了。第5次
pool-1-thread-6 线程被调用了。第6次
pool-1-thread-7 线程被调用了。第7次
pool-1-thread-8 线程被调用了。第8次
pool-1-thread-9 线程被调用了。第9次
pool-1-thread-10 线程被调用了。第10次
# 线程池任务执行流程 :
- 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
- 当线程池达到corePoolSize时,新提交任务将被放入workQueue 中,等待线程池中任务调度执行(一旦核心线程释放后,将会从任务队列中pull task继续执行)
- 当workQueue(不考虑无界)已满,且 maximumPoolSize > corePoolSize 时,新提交任务会创建新线程执行任务,但空闲线程池中的线程是有**存活时间(keepAliveTime)**的,当线程执行完任务后,只能存活 keepAliveTime 时长,时间一过,线程就得被销毁。
- 当提交任务数超过 maximumPoolSize 时,新提交任务由RejectedExecutionHandler处理
- 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
- 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
来一张图总结一下线程池:
ThreadPoolExecutor 状态转换的途径,也可以理解为生命周期:
参考:
- java四种线程池的使用:https://www.cnblogs.com/zincredible/p/10984459.html (opens new window)
- 推荐看一下vivo技术团队的这个:https://mp.weixin.qq.com/s/Z9g-2JihdTpQ5Zsf3xzKCw