ThreadPoolExecutor
- 前言
- 核心参数
- JDK默认的线程池创建方式
- 1. 定长线程池:Executors.newFixedThreadPool(100);
- 2. 简单线程池: Executors.newSingleThreadExecutor();
- 3. 缓存线程池:Executors.newCachedThreadPool();
- 4. 定时线程池:Executors.newScheduledThreadPool(10);
- 自定义线程池
- 核心线程数 (corePoolSize):
- 最大线程数 (maximumPoolSize):
- 线程池提交方式
前言
多线程是我们开发中永远绕不开的,但要合理的使用它就需要用到线程池,线程池是一种通过复用现有线程来管理和优化多线程任务的一种机制,在面试中也是必问的知识点,本篇深入理解线程池的各个属性和使用。
核心参数
深入源码可知线程池有七个参数:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}
- corePoolSize(核心线程数) :
当没有其他特殊设置的情况下,就算没有请求需要处理,这些线程也不会被回收会永远存在。
(理解:比如指定的核心线程数为2,服务启动时并不会被创建,而是有请求过来时线程才会被创建,如果请求只有一个,处理完成后线程池中就会保留一个线程,如果请求超过了2个,那么最后全部处理完成后线程池中会保留两个线程) - maximumPoolSize(最大线程数=核心线程数+非核心线程数):
就是线程池中最大能同时存在的线程数量
(理解:当队列中的容量装不下任务时,就会创建非核心线程 (即超过corePoolSize
但小于maximumPoolSize
的线程) 来处理任务,当非核心线程处理完任务后,如果队列中还有任务等待处理,则非核心线程继续去处理队列中的任务,当没有任务需要非核心线程处理时,那么非核心线程空闲的时间如果超过了设置的超时时间非核心线程就会被回收) - keepAliveTime(空闲时间):
非核心线程空闲没有处理任务超过这个时间就会被摧毁或者回收。
- unit(空闲时间单位) :时分秒…
- workQueue(任务队列) :当同一时间段需要处理的任务数量超过了核心线程数量后,超过的部分会进入队列中等待。
任务队列常用的有四种:
- java.util.concurrent.SynchronousQueue: 同步队列,
该队列没有容量为0
,所以当需要护理的任务超过核心线程数时,任务不会进入队列,而是直接创建非核心线程来处理,不用等待。该队列起到了一个中转站的作用。- java.util.concurrent. LinkedBlockingQueue:
无(有)界队列,该队列默认为无限制容量大小,可无限的向队列中放入任务,但非常消耗内存资源,极端情况下可能处理OOM异常
,因此非核心线程也不会被创建,空闲时间不会生效、淘汰策略永远不会被触发。(注:该队列也不是严格意义上的无界,它也可指定容量大小, 插入和删除使用独立的锁,适合高并发场景,但内存开销较大 )- java.util.concurrent. ArrayBlockingQueue:
有界队列,创建时必须指定容量大小,当容量不足时创建非核心线程
, 插入和删除使用单一锁,内存利用更好,适合固定容量的队列,性能较快,但高并发下可能存在锁竞争。- java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue :
为每个进入队列的任务设置一个定时时间,当超过定时时间时才去处理该任务,查看源码可知默认初始化容量为16
.
- threadFactory(线程工厂) :描述如何创建一个线程,一般使用默认值(Executors.defaultThreadFactory()),也可自定义继承ThreadFactory类,实现newTread()方法
- handler(任务淘汰策略) :当线程池的数量达到了最大线程数,那么就会触发淘汰策略,将多出的任务进行处理,官方默认为new AbortPolicy()
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
根据源码可知淘汰策略有四种:
- java.util.concurrent. AbortPolicy:当满足拒绝条件时,
直接抛出异常RejectedExecutionException
- java.util.concurrent. CallerRunsPolicy:当满足拒绝条件时,
将拒绝的任务交由提交任务的线程进行处理
- java.util.concurrent.DiscardOldestPolicy: 当满足拒绝条件时,
將队列中旧的任务替换为新提交的任务。
- java.util.concurrent.DiscardPolicy: 当满足拒绝条件时,
悄无声息的将被拒绝的任务丢弃掉,不抛出异常
JDK默认的线程池创建方式
一共提供了4种
1. 定长线程池:Executors.newFixedThreadPool(100);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
由源码可知,他的 核心线程数与最大线程数相同 空闲时间为0 队列为无界队列, 说明线程数量永远是固定的,并且任务数量可以无限制的加入到队列中,极端情况下可能发生OOM
2. 简单线程池: Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
由源码可知,他的 核心线程数与最大线程数都被指定为1个,它和定长线程池特性基本一样
,只不过是官方指定数量为1不能更改,也会发生OOM
3. 缓存线程池:Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
由源码可知,他的 核心线程数为1 而最大线程数都 Integer.MAX_VALUE
,相当于无限大,那么由任务需要处理则会里面进入队列,但是队列使用的是SynchronousQueue
,他的容量为0,所以线程池会立马创建一个新的线程进行处理,同一时间段存在多少任务就创建多少线程进行处理,如果任务处理完60s后,全部线程会被回收,空闲时间线程池不消耗任务资源,但极端情况下可能会耗尽系统资源
。
4. 定时线程池:Executors.newScheduledThreadPool(10);
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());
}
由源码可知,他的 核心线程数为开发者给定 而最大线程数固定为 Integer.MAX_VALUE
,相当于无限大,因为我们是定时线程池,所以用的是 DelayedWorkQueue
定时队列.
定时线程池有3种使用方式:
- 指定一个延时时间然后只执行一次
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.schedule(() -> System.out.println("定时任务执行"), 1, TimeUnit.SECONDS);
- 给定一个固定时间频率执行,下一次执行时间是按照上一个任务执行完结束时间加上间隔时间
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.scheduleWithFixedDelay(() -> System.out.println("扫描数据库邮件表,并发送邮件"), 10, 5, TimeUnit.SECONDS);
- 给定一个固定时间频率执行,下一次执行时间是按照上一个任务执行开始时间加上间隔时间, 所以不管上一个任务是否执行完成,只要时间到了,下一个任务依然要开始处理
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.scheduleAtFixedRate(() -> System.out.println("定时任务执行"), 1, 1, TimeUnit.SECONDS);
自定义线程池
前面学习了线程池的各个属性,在真实开发中我们该如何使用线程池呢,各个参数该怎么设置,或者直接使用JDK默认的4中创建方式?
默认的方式一般都是用于特殊的场景,对于通用场景存在缺陷,所以一般需要根据项目实际情况自己设置相关参数。
核心线程数 (corePoolSize):
通常,核心线程数的配置可以依据服务器的 CPU 核心数 和 任务特性 来调整:
- 对于 CPU 密集型(如计算、加密等)任务,核心线程数通常设置为 CPU 核心数,即
Runtime.getRuntime().availableProcessors()
。因为 CPU 密集型任务主要消耗 CPU 资源,线程数接近 CPU 核心数可以充分利用计算资源。 - 对于 I/O 密集型(如网络、数据库、文件系统等)任务,由于线程经常处于等待 I/O 的状态,核心线程数可以设置为 CPU 核心数的 2 倍或更多,以便有更多的线程处理其他请求。
int corePoolSize = Runtime.getRuntime().availableProcessors(); // CPU 密集型
int corePoolSize = 2 * Runtime.getRuntime().availableProcessors(); // I/O 密集型
最大线程数 (maximumPoolSize):
最大线程数的配置要根据业务压力和允许的最大并发度来设置:
- 对于 CPU 密集型任务,最大线程数可以保持和核心线程数一致或略大一些,因为过多的线程反而会增加上下文切换的开销。
- 对于 I/O 密集型任务,最大线程数可以大于核心线程数,通常为核心线程数的 2 到 3 倍,因为这些任务更多是等待 I/O 完成而非消耗 CPU 资源。
int maxPoolSize = Runtime.getRuntime().availableProcessors(); // CPU 密集型
int maxPoolSize = 3 * Runtime.getRuntime().availableProcessors(); // I/O 密集型
注1:其余的参数配置根据项目实际的使用环境等多方面考虑决定,比如高并发低延迟或者任务量大以及服务器资源限制等。
注2:对于拒绝策略,如果想对拒绝的任务特殊处理,也可以自定义拒绝策略,继承实现
RejectedExecutionHandler
接口方法即可
线程池提交方式
两种提交方式
execute()
执行后不返回结果
public class ThreadPoolNotResultSubmitTest {/*** 使用默认的线程工厂*/private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {THREAD_POOL_EXECUTOR.execute(() ->{System.out.println("线程池执行任务,线程名为: " + Thread.currentThread().getName());});}
}
submit()
执行完后可以获取结果
public class ThreadPoolResultSubmitTest {/*** 使用默认的线程工厂*/private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {Future<String> future = THREAD_POOL_EXECUTOR.submit(() -> {System.out.println("我执行了");return String.format("我是执行结果,我被线程【%s】执行", Thread.currentThread().getName());});System.out.println("线程执行结果: "+future.get());}
}
submit()
提交任务时不会阻塞,它是异步的,立即返回一个 Future
,但是使用future.get()
后如果还未得到结果,就会阻塞直到获取结果,也可使用方法 isDone()
判断是否已经返回结果,然后在进行获取操作,也可对future.get()
设置超时时间等。