文章目录
- 1. 概要
- 2. 线程的核心参数
- 2.1.核心线程和最大线程
- 2.2.任务队列
- 2.2.1.ArrayBlockingQueue
- 2.2.2.LinkedBlockingQueue
- 2.2.3.SynchronousQueue
- 2.2.4.PriorityBlockingQueue
- 2.2.5.DelayQueue
- 2.2.7.LinkedBlockingDeque
- 2.3 keepAliveTime
- 2.4 ThreadFactory
- 2.5 拒绝策略
- 3. JDK 提供的几种默认的线程池实现
- 3.1 newFixedThreadPool
- 3.2 newCachedThreadPool
- 3.3 newSingleThreadExecutor
- 3.4 newScheduledThreadPool
- 4. 为什么不推荐使用上面的线程池
- 5. 小结
1. 概要
线程池(Thread Pool)是一种并发编程的技术,用于管理一组线程,以便复用这些线程来执行多个任务。使用线程池的核心目的就是用来减少线程的创建和销毁的开销,从而能提高系统的响应性能,同时线程池对线程的管理也能避免线程创建过多导致内存溢出,这篇文章主要是介绍线程池的用法,线程池的线程执行原理将会在下一篇文章中介绍。
2. 线程的核心参数
要想知道线程的核心参数和组件是什么,我们直接去看线程的创建就知道了
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}
2.1.核心线程和最大线程
- 核心线程
corePoolSize
:在线程池中有有一些线程是创建出来之后就不会被回收,会在线程池中一直保持活跃的状态。核心线程的目的就是:确保有任务到来能够快速响应请求 - 最大线程数
maximumPoolSize
:线程池中允许的最大线程数。当任务数量超过核心线程数且工作队列已满时,线程池可以创建更多的线程,但总数不能超过 maximumPoolSize
下面是核心线程数和最大线程数的工作原理:
- 初始化:
- 当线程被创建出来的时候,核心线程数会被初始化为 0,所以并不是一开始就创建这么多线程存在线程池中
- 添加任务
- 随着任务被添加,核心线程数也会逐渐增加到
corePoolSize
,这时候再添加任务就会把这些任务加入到任务队列里面等待处理 - 当队列也放满任务的时候,新加入的任务会由线程池创建
非核心线程
去处理,当核心线程 + 非核心线程的数量
达到maximumPoolSize
的时候,再加入任务就会执行拒绝策略了
- 随着任务被添加,核心线程数也会逐渐增加到
- 线程回收
- 非核心线程在空闲一段时间之后(
keepAliveTime
)就会被回收 - 核心线程是不会被回收的,除非线程池关闭了
- 非核心线程在空闲一段时间之后(
2.2.任务队列
从 2.1 可以得知,任务队列就是存储那些没有被核心线程处理的任务,JDK 中有下面几种常见任务队列
2.2.1.ArrayBlockingQueue
- 类型: 有界阻塞队列(Bounded Blocking Queue)。
- 特点: 基于数组实现,容量固定不可修改
- 适用场景: 比较适合用于需要控制任务队列大小的场景,避免任务队列无限增长
2.2.2.LinkedBlockingQueue
- 类型: 可选的有界阻塞队列<.font>
- 特点: 基于链表实现,可以设置容量大小,如果不设置就默认创建一个无界队列
- 适用场景: 适用于任务数量波动较大,但希望避免任务队列无限增长的场景
2.2.3.SynchronousQueue
- 类型: 同步阻塞队列
- 特点:
没有容量
,也就是说每一次插入任务的时候必须要等到这个任务被执行了,才能继续往里面添加,有点像队列长度为 1 的生产者消费者模式 - 适用场景: 适用于任务提交和执行非常频繁的场景,任务提交后立即执行,不会在队列中等待
2.2.4.PriorityBlockingQueue
- 类型: 无界优先级阻塞队列(Unbounded Priority Blocking Queue)
- 特点: 基于优先级堆(数组)实现,可以按照设定的方式进行排序
- 适用场景: 适用于任务需要按照优先级顺序执行的场景
2.2.5.DelayQueue
- 类型: 无界延迟阻塞队列(Unbounded Delay Blocking Queue)
- 特点: 基于 PriorityQueue 实现的延时队列,任务必须在指定的延迟时间后才能被执行,队列中的任务按照延迟时间排序
- 适用场景: 适用于需要延迟执行任务的场景,例如定时任务
2.2.7.LinkedBlockingDeque
- 类型: 可选有界双端阻塞队列(Optional Bounded Blocking Deque)
- 特点: 基于链表实现,支持从队列的两端插入和移除任务,也可以设置初始容量和不设置初始容量,如果不设置默认容量是
Integer.MAX_VALUE
- 适用场景: 适用于需要从队列两端操作任务的场景
2.3 keepAliveTime
keepAliveTime
就是非核心线程的保活时间,也就说当添加任务的时候如果队列已经塞满了,这时候会创建非核心线程去执行任务,这些非核心线程的保活时间就是keepAliveTime
,超过这个时间,这些非核心线程就会被回收调
2.4 ThreadFactory
线程工厂,这是一个接口,用于创建新的线程
public interface ThreadFactory {Thread newThread(Runnable r);
}
线程池里面的线程就是通过这个接口创建出来的,我们可以自己设置一个线程工厂,然后自定义线程的名称、优先级、是否为后台线程(Daemon Thread)、线程组等…
举个例子,下面这段代码中我们自定义了一个任务队列
public class Pra {public static void main(String[] args) {// 自定义 ThreadFactoryThreadFactory customThreadFactory = new ThreadFactory() {private int count = 0;@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("Thread-Count-" + count++); // 设置线程名称thread.setPriority(Thread.NORM_PRIORITY); // 设置线程优先级thread.setDaemon(false); // 设置是否为后台线程return thread;}};// 创建线程池,并使用自定义的 ThreadFactoryExecutorService executor = Executors.newFixedThreadPool(5, customThreadFactory);// 提交任务for (int i = 0; i < 10; i++) {executor.submit(() -> {System.out.println("Executing task on thread: " + Thread.currentThread().getName());try {Thread.sleep(1000); // 模拟任务执行时间} catch (InterruptedException e) {e.printStackTrace();}});}// 关闭线程池executor.shutdown();}}
结果输出如下:
可以看到,输出的线程名字都是我们自己定义的名字
2.5 拒绝策略
在使用线程池时,当线程池中的任务队列已满且无法再接受新的任务时,就会触发拒绝策略,Java 提供了几种内置的拒绝策略
- AbortPolicy
- 默认的拒绝策略,当任务无法提交的时候就会抛出
RejectedExecutionException
异常
- 默认的拒绝策略,当任务无法提交的时候就会抛出
/*** 拒绝策略: 直接抛异常*/
public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}
}
下面看个例子,
- CallerRunsPolicy
- 当任务无法提交的时候就会把这个任务
交给调用线程执行
- 这样就能避免任务被丢弃,但是有可能会导致调用者线程被阻塞
- 当任务无法提交的时候就会把这个任务
/*** 拒绝策略: 如果线程池没有关闭,当前线程去运行*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
}
- DiscardPolicy
- 当任务无法提交时,直接丢弃该任务,不做任何处理
- 如果任务不重要就可以用这个拒绝策略直接丢弃
/*** 拒绝策略: 什么也不管*/public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
- DiscardOldestPolicy
- 当任务无法提交时,丢弃任务队列中最旧的任务,然后尝试重新提交当前任务
- 适合在需要尽快处理新任务的情况下使用
/*** 拒绝策略: 抛弃最早的一条任务,加入当前任务*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 丢弃最早的一条任务if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
}
下面看一个具体的例子,我们自己定义一个线程池,核心线程数是 1,最大线程数是 1,任务队列长度是 2,然后我们往里面塞 5 个任务,从定义可知这个线程池最多只能处理 3 个任务,所以最后两个任务会执行拒绝策略
public class Pra {public static void main(String[] args) {// 创建线程池,指定拒绝策略ThreadPoolExecutor executor = new ThreadPoolExecutor(1, // 核心线程数1, // 最大线程数60, // 线程空闲时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(2), // 任务队列new ThreadPoolExecutor.AbortPolicy() // 拒绝策略);// 提交任务for (int i = 0; i < 5; i++) {final int index = i;executor.submit(() -> {System.out.println("Thread Name: " + Thread.currentThread().getName() + ", index: " + index);try {Thread.sleep(2000); // 模拟任务执行时间} catch (InterruptedException e) {e.printStackTrace();}});}// 关闭线程池executor.shutdown();}}
上述输出如下:
这就是第一个 AbortPolicy 的输出,抛出异常 RejectedExecutionException,其余三个输出分别如下:
- CallerRunsPolicy:调用线程执行
- DiscardPolicy:丢弃任务
- DiscardOldestPolicy:丢弃队列中最早的任务,也就是
index 1
和index 2
3. JDK 提供的几种默认的线程池实现
JDK 提供了几种默认的线程池实现,这些线程池适合用于各种不同的场景
3.1 newFixedThreadPool
用于创建一个包括无界队列的线程池,特点如下:
- 核心线程数和最大线程数相同,意味者线程池能处理的任务就是
核心线程数 + 队列长度
- 队列使用了
LinkedBlockingQueue
,长度没有设置,意味者里面用了一个无界队列,需要注意任务过多导致的内存溢出的问题
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}
3.2 newCachedThreadPool
用于创建一个没有核心线程的线程池,特点如下:
- 核心线程数为
0
,最大线程数是Integer.MAX_VALUE
,这意味着能处理的任务数没有限制,同时创建出来的线程60S
内没有处理任务就会被回收掉 - 队列使用了
SynchronousQueue
,不存放任务,需要注意线程创建过多导致的内存溢出的问题 - 适合处理大量短期任务
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}
3.3 newSingleThreadExecutor
创建一个单线程的线程池,所有任务将按照提交的顺序依次执行,特点如下:
- 核心线程数为
1
,最大线程数是1
,意味者只有一个线程 - 队列使用了
LinkedBlockingQueue
,容量没有限制,需要注意任务过多导致的内存溢出的问题 - 适合用于对执行顺序有要求的任务
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
3.4 newScheduledThreadPool
创建一个可调度任务的线程池,适用于需要延迟执行或定期执行任务的场景,特点如下:
- 核心线程数可设置,最大线程数是
Integer.MAX_VALUE
,意味者可以接收的任务没有限制 - 队列使用了
DelayedWorkQueue
,支持延迟执行和定期执行任务,需要注意线程过多导致的内存溢出的问题 - 适合用于执行一些定时任务的场景,比如定时提醒…
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
4. 为什么不推荐使用上面的线程池
虽然 JDK 给我们提供了这几个线程池的创建方法,但是或多或少都有一些局限,有可能会导致 性能问题
或者 内存溢出
的问题
- 任务队列无界
- 可以看到上面的这几个线程池中
newFixedThreadPool
和newSingleThreadExecutor
创建的都是无界队列,这就导致当任务比较多的时候都堵塞在任务队列里面有可能导致内存溢出
- 可以看到上面的这几个线程池中
- 线程数没有限制
- 剩下两个线程池
newCachedThreadPool
和newScheduledThreadPool
的线程数都是没有限制的,这就导致当任务比较多的时候创建太多的线程去执行任务有可能导致内存溢出
- 剩下两个线程池
- 参数无法自定义
- 默认线程池没有提供任务优先级的支持和定制化的拒绝策略
- 在高并发场景下,可能需要根据任务的紧急程度、重要性等因素来排队或拒绝某些任务,而默认线程池无法满足这些需求
- 同时线程池里面我们也没办法自己设置线程的名称等属性…
总之还有很多的限制这里就不说了,我们可以通过自定义线程池参数的方式去更好控制线程池的行为,满足具体应用的要求,设计到下面几个方面:
- 合理设置线程数量,根据系统的 CPU 核心数和任务类型(CPU 密集型或 I/O 密集型),合理设置线程池的核心线程数和最大线程数
- 有界任务队列,限制任务队列大小,避免无限制增加
- 自定义拒绝策略,一般我自己在工作中写的话都会用到第二种策略,就是让调用线程去执行
- 线程工厂,通过工厂去自定义线程的命名、优先级等属性,便于调试和监控
/*** 构造函数*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 参数设置if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
可以通过上面的构造函数去自己设定合理的参数,构建线程池
5. 小结
到这里就介绍完核心线程的一些参数和用法了,其实线程池的使用很简单,但是里面的一些原理流程还是值得深究的,就比如下面两个问题:
- 线程池是如何做到核心线程不回收,非核心线程回收的?
- 线程池的任务执行流程是怎么样的?
- 执行任务的过程中发生异常了,会发生什么?
这些问题都需要从源码中才能找到答案了,下一篇文章我将会详细讲解线程池的源码
如有错误,欢迎指出!!!