(六)ThreadPoolExecutor自定义线程池
上一篇中提到四种线程池的创建方式,最后还是会 new ThreadPoolExecutor()
,所以 我们可以使用 new ThreadPoolExecutor()
的方法创建自定义的线程。
可以看到 ThreadPoolExecutor 是 ExecutorService的实现类:
public class ThreadPoolExecutor extends AbstractExecutorService {
}
public abstract class AbstractExecutorService implements ExecutorService {
}
阿里开发手册写到建议使用 ThreadPoolExecutor
去创建线程池,而不是使用默认的4中线程池策略:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下: 1) FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 2) CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
# ThreadPoolExecutor
线程池的核心:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数解释:
序号 | 参数名称 | 参数解释 |
---|---|---|
1 | corePoolSize | 表示常驻核心线程数(包括空闲线程),如果大于0,即使本地任务执行完也不会被销毁 |
2 | maximumPoolSize | 表示线程池能够容纳可同时执行的最大线程数 |
3 | keepAliveTime | 线程池中线程空闲的时间,当空闲时间达到该值时,线程会被销毁,只剩下 corePoolSize 个线程 |
4 | unit | keepAliveTime 的时间单位,最终都会转换成【纳秒】,因为CPU的执行速度杠杠滴 |
5 | workQueue | 当请求的线程数大于 corePoolSize 时,线程进入该阻塞队列 |
6 | threadFactory | 顾名思义,线程工厂,用来生产一组相同任务的线程,同时也可以通过它增加前缀名,虚拟机栈分析时更清晰 |
7 | handler | 执行拒绝策略,当 workQueue 达到上限,就要通过这个来处理,比如拒绝,丢弃等,这是一种限流的保护措施 |
# corePoolSize、maximumPoolSize、workQueue 三者关系:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler 所指定的策略来处理此任务。
也就是: 核心线程corePoolSize满,丢到任务队列workQueue,当workQueue没满,且corePoolSize+workQueue<maximumPoolSize 时,建新线程执行(最多可以开maximumPoolSize 个线程);当workQueue已满,且`corePoolSize+workQueue>=maximumPoolSize 时,这个多出来的任务就会使用handler处理被拒绝。
# 关于workQueue
workQueue的类型为BlockingQueue<Runnable>
,通常可以取下面三种类型:
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
workQueue也是有长度的,例如:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 9, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(2));
如果队列不指定对列长度的话,那么默认使用的就是Integer.MAX_VALUE
# 关于hander
Java提供了四种拒绝策略。
AbortPolicy
:默认的拒绝策略
,会直接 throw RejectedExecutionException
拒绝
CallerRunsPolicy
:提交任务的主线程自己去执行该任务
DiscardOldestPolicy
:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列
DiscardPolicy
:相当大胆的策略,直接丢弃任务,没有任何异常抛出
# 关于线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
# 线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize()
- setCorePoolSize:设置核心池大小
- setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
demo:
public class RejectedExecutionExceptionTest {
public static void main(String[] args) {
int corePoolSize = 3; //线程池维护线程的最少数量
int maxPoolSize = 5; //线程池维护线程的最大数量
long keepAliveTime = 1; //线程池维护线程所允许的空闲时间(解释:当线程池的数量超过corePoolSize时,多余的空闲线程的存活时间。)
//缓存队列可以放多少个线程
BlockingDeque<Runnable> queue = new LinkedBlockingDeque<>(2);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
ThreadFactory factory = (Runnable r) -> {
//创建一个线程
Thread t = new Thread(r);
//给创建的线程设置UncaughtExceptionHandler对象 里面实现异常的默认逻辑
t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
System.out.println("factory的exceptionHandler捕捉到异常--->>> \n" + e.getMessage());
});
return t;
};
//创建一个线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue, factory, rejectedExecutionHandler);
runnable(executor);
}
static void runnable(ThreadPoolExecutor executor) {
TestRunnable testRunnable = new TestRunnable(executor);
for (int i = 0; i < 10; i++) {
executor.execute(testRunnable);
}
executor.shutdown();
}
static class TestRunnable implements Runnable {
static int i = 1;
ThreadPoolExecutor executor;
public TestRunnable(ThreadPoolExecutor executor) {
this.executor = executor;
}
public static int getCount() {
return i++;
}
@Override
public void run() {
synchronized (this) {
System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "---》》》ActiveCount:"
+ executor.getActiveCount() + ",CompletedTaskCount:" + executor.getCompletedTaskCount()
+ ",Queue:" + executor.getQueue().remainingCapacity() + ",taskCount:" + executor.getTaskCount()
);
System.out.println(" " + Thread.currentThread().getName() + " 线程被调用了。第" + getCount() + "次");
try {
Thread.sleep(1 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出:
factory的exceptionHandler捕捉到异常--->>>
Task com.yudianxx.basic.线程.Executor.RejectedExecutionExceptionTest$TestRunnable@424c0bc4 rejected from java.util.concurrent.ThreadPoolExecutor@3c679bde[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
2020-07-17 16:03:34---》》》ActiveCount:5,CompletedTaskCount:0,Queue:0,taskCount:7 Thread-0 线程被调用了。第1次
2020-07-17 16:03:35---》》》ActiveCount:5,CompletedTaskCount:1,Queue:1,taskCount:7 Thread-4 线程被调用了。第2次
2020-07-17 16:03:36---》》》ActiveCount:5,CompletedTaskCount:2,Queue:2,taskCount:7 Thread-3 线程被调用了。第3次
2020-07-17 16:03:37---》》》ActiveCount:4,CompletedTaskCount:3,Queue:2,taskCount:7 Thread-2 线程被调用了。第4次
2020-07-17 16:03:38---》》》ActiveCount:3,CompletedTaskCount:4,Queue:2,taskCount:7 Thread-1 线程被调用了。第5次
2020-07-17 16:03:39---》》》ActiveCount:2,CompletedTaskCount:5,Queue:2,taskCount:7 Thread-4 线程被调用了。第6次
2020-07-17 16:03:40---》》》ActiveCount:1,CompletedTaskCount:6,Queue:2,taskCount:7 Thread-0 线程被调用了。第7次
可以看到,maxPoolSize+workQuene = 7, 而一共有10个任务,第8个就会抛出错误。
换成:
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
输出:
2020-07-17 16:04:43---》》》ActiveCount:5,CompletedTaskCount:0,Queue:0,taskCount:7 Thread-0 线程被调用了。第1次
2020-07-17 16:04:44---》》》ActiveCount:5,CompletedTaskCount:1,Queue:1,taskCount:7 Thread-4 线程被调用了。第2次
2020-07-17 16:04:45---》》》ActiveCount:5,CompletedTaskCount:2,Queue:2,taskCount:7 Thread-3 线程被调用了。第3次
2020-07-17 16:04:46---》》》ActiveCount:4,CompletedTaskCount:3,Queue:2,taskCount:7 Thread-2 线程被调用了。第4次
2020-07-17 16:04:47---》》》ActiveCount:3,CompletedTaskCount:4,Queue:2,taskCount:7 Thread-1 线程被调用了。第5次
2020-07-17 16:04:48---》》》ActiveCount:2,CompletedTaskCount:5,Queue:2,taskCount:7 Thread-4 线程被调用了。第6次
2020-07-17 16:04:49---》》》ActiveCount:1,CompletedTaskCount:6,Queue:2,taskCount:7 Thread-0 线程被调用了。第7次
后3个任务之间丢弃,而且不抛错误。
换成:
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
输出:
2020-07-17 16:05:34---》》》ActiveCount:5,CompletedTaskCount:0,Queue:0,taskCount:7 Thread-0 线程被调用了。第1次
2020-07-17 16:05:35---》》》ActiveCount:5,CompletedTaskCount:1,Queue:1,taskCount:7 Thread-4 线程被调用了。第2次
2020-07-17 16:05:36---》》》ActiveCount:5,CompletedTaskCount:2,Queue:2,taskCount:7 Thread-3 线程被调用了。第3次
2020-07-17 16:05:37---》》》ActiveCount:4,CompletedTaskCount:3,Queue:2,taskCount:7 main 线程被调用了。第4次
2020-07-17 16:05:38---》》》ActiveCount:5,CompletedTaskCount:3,Queue:1,taskCount:9 Thread-2 线程被调用了。第5次
2020-07-17 16:05:39---》》》ActiveCount:5,CompletedTaskCount:4,Queue:2,taskCount:9 Thread-1 线程被调用了。第6次
2020-07-17 16:05:40---》》》ActiveCount:4,CompletedTaskCount:5,Queue:2,taskCount:9 Thread-2 线程被调用了。第7次
2020-07-17 16:05:41---》》》ActiveCount:3,CompletedTaskCount:6,Queue:2,taskCount:9 Thread-3 线程被调用了。第8次
2020-07-17 16:05:42---》》》ActiveCount:2,CompletedTaskCount:7,Queue:2,taskCount:9 Thread-4 线程被调用了。第9次
2020-07-17 16:05:43---》》》ActiveCount:1,CompletedTaskCount:8,Queue:2,taskCount:9 Thread-0 线程被调用了。第10次
如果线程表示空闲的,就丢给调用这个线程的主线程去执行。
# 一些api
此外,还有一些方法:
getCorePoolSize():返回线程池的核心线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小; getMaximumPoolSize():返回线程池的最大线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小; getLargestPoolSize():记录了曾经出现的最大线程个数(水位线); getPoolSize():线程池中当前线程的数量; getActiveCount():Returns the approximate(近似) number of threads that are actively executing tasks; prestartAllCoreThreads():会启动所有核心线程,无论是否有待执行的任务,线程池都会创建新的线程,直到池中线程数量达到 corePoolSize; prestartCoreThread():会启动一个核心线程(同上); allowCoreThreadTimeOut(true):允许核心线程在KeepAliveTime时间后,退出;
# 总结
总的来说,使用 Executors 创建的线程池太过于理想化,并不能满足很多现实中的业务场景,所以要求我们通过 ThreadPoolExecutor来创建,并传入合适的参数。
推荐阅读:
- ThreadPoolExecutor的常见的队列到底能够存放多少任务:https://zhuanlan.zhihu.com/p/97485729 (opens new window)
- 浅谈Java并发编程系列(六) —— 线程池的使用:https://segmentfault.com/a/1190000006998440 (opens new window)
- Java并发编程:线程池的使用:https://www.cnblogs.com/dolphin0520/p/3932921.html (opens new window)