线程池中多余的线程是如何回收的
这里涉及到线程池的执行策略,建议先看一下线程池ThreadPoolExecutor框架。
我在网上看到的原话是这样的:线程池中多余的线程是如何回收的?
听说这是一道面试题,我当时就惊了,这不就是相当于问源码吗???好家伙,现在ThreadPoolExecutor框架都问的这么深了吗?
莫慌。
下面一起来简单分析ThreadPoolExecutor回收工作线程的原理。
# Step1
来个例子:
public class ThreadPoolALL {
public static void main(String[] args) {
new ThreadPoolALL().ExecutorThreadPool();
}
static class TestRunnable implements Runnable {
static int i = 0;
@Override
public void run() {
synchronized (this) {
int count = getCount();
System.out.println(Thread.currentThread().getName() + " 线程被调用了。第" + count + "次");
}
}
public static int getCount() {
return ++i;
}
}
/**
* ThreadPoolExecutor是ExecutorSerivce接口的具体实现
* 即ExecutorSerivce最后也是调用ThreadPoolExecutor的
* ThreadPoolExecutor提供了很多参数
* 阿里开发手册建议使用这种方法创建线程池
*/
public void ExecutorThreadPool() {
//自定义线程池
int corePoolSize = 2; //线程池维护线程的最少数量
int maxPoolSize = 3; //线程池维护线程的最大数量
long keepAliveTime = 10; //线程池维护线程所允许的空闲时间(解释:当线程池的数量超过corePoolSize时,多余的空闲线程的存活时间。)
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
BlockingDeque<Runnable> queue = new LinkedBlockingDeque<>(7);
//或者
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
SecurityManager s = System.getSecurityManager();
ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
Thread t = new Thread(group, r);
//设置优先级
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
//设置错误处理
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
//自定义处理错误
System.out.println("factory捕获了错误--->>>" + t.getName() + e);
}
});
return t;
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue, factory, rejectedExecutionHandler);
//这是不使用shutdown回收线程的替代方法。
// 在allowCoreThreadTimeOut设置为true时,ThreadPoolExecutor的keepAliveTime参数必须大于0。
executor.allowCoreThreadTimeOut(false);
//1、Runnable方法起线程
TestRunnable testRunnable = new TestRunnable();
//起多少个线程
for (int i = 0; i < 10; i++) {
executor.execute(testRunnable);
}
// executor.shutdown();
}
}
⚠️注意:这里是注释了
executor.shutdown()
而且executor.allowCoreThreadTimeOut(false)
设置为false
(默认是false)
输出:
Thread-0 线程被调用了。第1次
Thread-0 线程被调用了。第2次
Thread-0 线程被调用了。第3次
Thread-0 线程被调用了。第4次
Thread-1 线程被调用了。第5次
Thread-1 线程被调用了。第6次
Thread-1 线程被调用了。第7次
Thread-1 线程被调用了。第8次
Thread-2 线程被调用了。第9次
Thread-0 线程被调用了。第10次
但是你看到控制台是没有退出的,IDEA 执行完了之后也是没有退出的:
# Step2
我们用java命令工具看一下有哪些进程:
C:\Users\HaC>jps
12192
21992 KotlinCompileDaemon
3176 Launcher
10076 Launcher
20476 Jps
20780 ThreadPoolALL
7084
可以看到 20780 ThreadPoolALL
这个java线程一直存活 ,
再查看一下这个进程有什么线程:(需要马上查看,因为设置了keepAliveTime)
C:\Users\HaC> jstack 20780
2021-03-23 14:38:03
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):
"DestroyJavaVM" #17 prio=5 os_prio=0 tid=0x0000000002331000 nid=0x5188 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Thread-2" #16 prio=5 os_prio=0 tid=0x0000000020054000 nid=0x3ff8 waiting on condition [0x00000000219fe000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522)
at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
"Thread-1" #15 prio=5 os_prio=0 tid=0x0000000020053800 nid=0x4e70 waiting on condition [0x00000000217df000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522)
at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
"Thread-0" #14 prio=5 os_prio=0 tid=0x00000000200c8000 nid=0x5408 waiting on condition [0x000000002162e000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522)
at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
.....................省略
可以看到有三个线程 Thread-1 、Thread-2、Thread-0 在 TIMED_WAITING
状态。
# step3
我在代码中设置了keepAliveTime =10
秒,10秒过完后,再来看看这个线程是否还在:
C:\Users\HaC> jstack 20780
2021-03-23 14:40:17
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):
"DestroyJavaVM" #17 prio=5 os_prio=0 tid=0x0000000002331000 nid=0x5188 waiting on condition [0x000
0000000000000]
java.lang.Thread.State: RUNNABLE
"Thread-2" #16 prio=5 os_prio=0 tid=0x0000000020054000 nid=0x3ff8 waiting on condition [0x00000000
219fe000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSy
nchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQue
uedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
"Thread-0" #14 prio=5 os_prio=0 tid=0x00000000200c8000 nid=0x5408 waiting on condition [0x00000000
2162e000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSy
nchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQue
uedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
可以看到只有两个线程 Thread-2、Thread-0 在 TIMED_WAITING
状态了。
这也验证了:
如果你设置了keepAliveTime
且设置executor.allowCoreThreadTimeOut(false)
,当线程池的数量超过corePoolSize时,超过的部分,即多余的空闲线程的存活时间超过keepAliveTime
就会被回收。
即maxPoolSize-corePoolSize的部分
如果你executor.allowCoreThreadTimeOut(true)
,再执行一下,等待10s后,发现线程自动退出了。
Process finished with exit code 0
说明executor.allowCoreThreadTimeOut(true)
会把corePoolSize的线程也退出了。
下面再来演示一下executor.shutdown();
,我们放开注释,再设置executor.allowCoreThreadTimeOut(false)
可以看到控制台马上退出了,并不会等待10s这么长。
# 总结一下:
线程退出(回收)有两种方法:
1、参数allowCoreThreadTimeOut为true,等待keepAliveTime后,线程就会被回收。
2、加上executor.shutdown();马上将线程池中的空闲线程回收。该方法会使得keepAliveTime参数失效。
这里是拓展阅读:
# 线程池执行过程分析
续上以上的代码,线程池开始执行的入口是这里:
executor.execute(testRunnable);
顺藤摸瓜跳过去看一下这个execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
这里有一大段注释,我在网上找了一张图,意思大概就是:
文字版本:
- 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
- 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
- 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
- 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
- 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
- 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
既然程序已经执行完了,为什么线程还没退出呢?
因为线程池的引用被它的内部类 Worker 持有了。而 Worker 和线程一一对应,是对 Thread 的增强,所以本质上就是因为线程没有被释放。
这里不详细展开线程池的执行原理,需要的可以参考:https://www.jianshu.com/p/edab547f2710
回收线程的核心在runWorker()
方法的getTask()
和processWorkerExit()
final void runWorker(Worker w) {
boolean completedAbruptly = true;
...
try {
while (getTask()...) {
...
处理任务
}
//该线程已经从队列中取不到任务了,改变标记,该标记表示:该线程是否因用户因素导致的异常而终止
completedAbruptly = false;
} finally {
//线程移除
processWorkerExit(w, completedAbruptly);
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
精力有限,简单顺着代码分析,最后线程池在执行 shutdown 方法 或 allowCoreThreadTimeOut方法 时,会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;
详细的可以参考:
https://blog.csdn.net/weixin_39633954/article/details/111579295.
https://www.jianshu.com/p/edab547f2710
参考这里的总结:https://www.cnblogs.com/kingsleylam/p/11241625.html
# 4. 总结
ThreadPoolExecutor回收工作线程,一条线程getTask()返回null,就会被回收。
分两种场景。
4.1 未调用shutdown() ,RUNNING状态下全部任务执行完成的场景
线程数量大于corePoolSize,线程超时阻塞,超时唤醒后CAS减少工作线程数,如果CAS成功,返回null,线程回收。否则进入下一次循环。当工作者线程数量小于等于corePoolSize,就可以一直阻塞了。
4.2 调用shutdown() ,全部任务执行完成的场景
shutdown() 会向所有线程发出中断信号,这时有两种可能。
4.2.1 所有线程都在阻塞
中断唤醒,进入循环,都符合第一个if判断条件,都返回null,所有线程回收。
4.2.2 任务还没有完全执行完
至少会有一条线程被回收。在processWorkerExit(Worker w, boolean completedAbruptly)方法里会调用tryTerminate(),向任意空闲线程发出中断信号。所有被阻塞的线程,最终都会被一个个唤醒,回收。