Executor框架
- Executor框架构成
- 异步计算的结果涉及的接口或类
- 多线程实现涉及的接口或类
- 多线程的管理(线程池涉及的接口或类)
- Executor接口
- ExecutorService接口
- AbstractExecutorService抽象类
- ScheduledExecutorService接口
- ThreadPoolExecutor类(线程池核心类)
- ScheduledThreadPoolExecutor类
Executor框架简介:多线程管理所用到的线程池所对应的类 接口组合在一起的所形成的这么一个所谓框架, Executor框架实现的就是线程池的功能
Executor框架构成
Executor框架包括3大部分:
①任务。也就是工作单元,包括被执行任务需要实现的接口:Runnable接口或者Callable接口;
②任务的执行。也就是把任务分派给多个线程的执行机制,包括Executor接口及继承自Executor接口的ExecutorService接口
③异步计算的结果。包括Future接口及实现了Future接口的FutureTask类
Executor框架成员关系图简示:
异步计算的结果涉及的接口或类
Future接口 FutureTask类和CompletableFuture<T>详情介绍详情链接
多线程实现涉及的接口或类
Callable接口详情介绍点击链接
Thread类使用详情介绍点击链接
多线程的管理(线程池涉及的接口或类)
Executor框架的使用
Executor接口
Executor接口是线程池的顶级接口,只有一个方法execute,用于执行Runnable任务
public interface Executor {void execute(Runnable command);
}
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)execute()方法中,由Executor框架完成线程的调配和任务的执行部分。
如ExecutorService接口就增加了一些能力:(
1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
2)提供了管控线程池的方法,比如停止线程池的运行
ExecutorService接口
由于ExecutorService接口是继承的Executor接口,所以自动拥有了execute方法
并扩展 Executor接口,添加任务生命周期管理(如 shutdown())、异步任务提交(submit())等功能方法
public interface ExecutorService extends Executor {//关闭线程池:停止接收新的任务,但是允许已经提交的任务继续执行直到它们完成void shutdown();//关闭线程池:会尝试停止所有正在执行的任务,并返回一个包含未执行任务的列表List<Runnable> shutdownNow();/**检查线程池是否已经开始关闭过程 也就是是否执行了 shutdown 或 shutdownNow 方法,*需要注意的是,如果调用 isShutdown() 方法的返回结果为 true,不代表线程池此时已经彻底关闭了,*这仅仅代表线程池开始了关闭的流程。也就是说,此时可能线程中依然有线程在执行任务,*队列里也可能有等待被执行的任务*/boolean isShutdown();/*** isTerminated()方法用于判断ExecutorService是否已经终止。*isTerminated()方法的返回值是一个布尔值,如ExecutorService已经终止,则返回true*需要注意的是,除非先调用shutdown或shutdownNow,否则isTerminated()方法永远不返回true*/boolean isTerminated();/**用于判断 等待所有任务在工作队列中是否执行完毕。它接收两个参数:一个是等待时间(long 类型),另一个是时间单位(TimeUnit 枚举类型)。如果在指定时间内所有任务都完成了执行,该方法将返回 true;如果在指定时间内还有任务未完成,该方法将返回 false*/boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;/*** 提交一个Callable任务,该任务有返回值。* 返回的Future对象的get()方法将返回Callable任务的返回值*/<T> Future<T> submit(Callable<T> task);/*** 提交一个Runnable任务,并指定一个结果T result。* 即使Runnable任务没有返回值,也可以通过Future对象的get()方法获取到这个结果* 注意Runnable任务是没有返回值,想要获取结果是通过submit方法,你可以传递一个结果引用T result* 因为这个引用可以在任务执行过程中被修改,从而在主线程中获取到任务的执行结果。* 这是因为submit方法实际上返回了一个Future对象,该对象可以用来获取任务的执行结果*/<T> Future<T> submit(Runnable task, T result);//提交一个Runnable任务,该任务没有返回值。返回的Future对象的get()方法将返回nullFuture<?> submit(Runnable task);//invokeAll方法是一个用于 批量提交多个任务 并等待所有任务完成,//并返回一个包含每个任务对应 Future 的列表<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;//是一个用于批量提交多个任务并控制超时时间,返回一个包含所有任务结果的 Future 列表<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;//取得第一个完成任务的结果值,当第一个任务执行完成后,会调用interrupt方法将其他的任务中断取消<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;//invokeAny() 方法用于并发执行多个任务并返回第一个成功完成的任务结果(或在超时前返回)<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
总结
1.关闭线程池方法(shutdownNow(),shutdown()方法)
2.关闭线程池方法配合使用的方法
a.isTerminated()(用于判断ExecutorService是否已经终止)
b.awaitTermination()(用于判断 等待所有任务在工作队列中是否执行完毕)
3.submit() 和 自动继承的execute(Runnable a)方法区别
相同点是都可以启动线程,
不同点
①submit()可以接收Runnable或Callable类型的参数 ,execute(Runnable a)只能接收Runnable类型的参数
②submit()可以返回Future对象,从而可以获取执行结果或者需要捕获执行过程中的异常,而execute()方法无返回值
AbstractExecutorService抽象类
AbstractExecutorService抽象类提供了ExecutorService接口部分方法的默认实现,并新增了部分方法,这是一个抽象类,但是该类中没有一个抽象方法,为何还要定义成一个抽象类
抽象类只能被继承,不可以实现,并且抽象类不可以实例化,这些都是抽象类的特性
AbstractExecutorService 被定义为抽象类的原因可以从以下几个方面来理解:
1.防止实例化:抽象类的一个重要特性是不能被实例化。AbstractExecutorService 作为一个抽象类,确保了它不能被直接实例化,而是必须通过其子类来实现具体的功能。这有助于保证 ExecutorService 接口的正确实现,避免直接使用不完整的实现。
2.提供模板方法:AbstractExecutorService 提供了一些默认的实现方法,这些方法可以被子类直接使用或重写。这种设计模式被称为模板方法模式,它允许在抽象类中定义算法的骨架,而将一些步骤的实现延迟到子类中。这种方式提高了代码的复用性和可维护性。
3.强制子类实现特定方法:虽然 AbstractExecutorService 中没有显式的抽象方法,但它可能包含了一些需要子类实现的方法。通过将这些方法定义为抽象方法,可以强制子类提供具体的实现,从而确保子类的行为符合预期。
4.设计意图:从设计角度来看,AbstractExecutorService 作为一个抽象类,表明它是一个不完整的实现,需要通过子类来完成。这种设计意图可以通过将类定义为抽象类来明确传达给开发者
public abstract class AbstractExecutorService implements ExecutorService {//将Runnable任务包装为RunnableFuture<T> //参数runnable:原始任务 value:任务完成后返回的结果protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}// 将 Callable 任务包装为 RunnableFuture<T>//参数callable:原始任务 返回值:RunnableFuture<T> 实例protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}//提交 Runnable 任务并指定返回结果。public Future<?> submit(Runnable task) {..................................................}//提交 Callable 任务,返回 Future 对象public <T> Future<T> submit(Runnable task, T result) {...................................................}//提交 Runnable 任务,返回 Future<?>(结果为 null)public <T> Future<T> submit(Callable<T> task) {....................................................}private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {........................................................}//返回第一个成功完成的任务结果public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {......................................................................}//带超时的 invokeAnypublic <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {return doInvokeAny(tasks, true, unit.toNanos(timeout));}//提交所有任务并阻塞等待全部完成public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {..................................................................}//提交所有任务并带超时等待public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {...........................................................}}
总结
1.除了以上方法继承ExecutorService接口提供了默认实现方法,其余的还是继承ExecutorService接口接口方法的特征,并且新增了newTaskFor方法
2.顶级接口Executor接口的execute方法也通过继承-实现等操作到子类这里,具体实现还是由继承 AbstractExecutorService的子类去进行重写实现
// 继承 AbstractExecutorService类重写该方法就这样的@Overridepublic void execute(Runnable command) {}
ScheduledExecutorService接口
ScheduledExecutorService接口继承了ExecutorService 接口,并且新增了关于任务的延迟执行、固定频率执行和固定延迟执行等方法
public interface ScheduledExecutorService extends ExecutorService {//单次延迟执行任务:提交一个 Runnable 任务,在指定延迟后 单次执行//ScheduledFuture<?>,用于跟踪任务状态或取消任务public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);//提交一个 Callable 任务,在指定延迟后 单次执行,并返回计算结果//参数command:待执行的任务 delay:延迟时间 unit:时间单位(如 TimeUnit.SECONDSpublic <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);//提交一个 Runnable 任务,按固定频率 周期性执行//参数initialDelay:首次执行的延迟时间 period:连续两次任务开始时间的间隔public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);//提交一个Runnable任务,按固定延迟周期性执行//initialDelay:首次执行的延迟时间。delay:上一次任务结束 到下一次任务开始的时间间隔public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);}
代码示例
public class ScheduledTaskExample {public static void main(String[] args) throws InterruptedException {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);// 单次延迟任务scheduler.schedule(() -> System.out.println("单次延迟任务"),2, TimeUnit.SECONDS);// 固定频率任务ScheduledFuture<?> fixedRateFuture = scheduler.scheduleAtFixedRate(() -> System.out.println("固定频率任务 - 时间: " + System.currentTimeMillis()),1, 3, TimeUnit.SECONDS);//如果任务抛出未捕获的异常,周期性任务会终止,后续执行不再继续,需在任务内部处理异常scheduler.scheduleAtFixedRate(() -> {try {// 业务逻辑} catch (Exception e) {// 记录日志或恢复状态}}, 1, 5, TimeUnit.SECONDS);// 固定延迟任务ScheduledFuture<?> fixedDelayFuture = scheduler.scheduleWithFixedDelay(() -> System.out.println("固定延迟任务 - 时间: " + System.currentTimeMillis()),1, 3, TimeUnit.SECONDS);// 运行 10 秒后关闭Thread.sleep(10_000);fixedRateFuture.cancel(true);//Future的cancel取消方法fixedDelayFuture.cancel(true);scheduler.shutdown();//调用shutdown()或shutdownNow()关闭线程池,否则JVM不会退出}
}
ThreadPoolExecutor类(线程池核心类)
可以说ThreadPoolExecutor 就是线程池,实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务
ThreadPoolExecutor类(线程池)介绍使用点击链接
主要就三点
1.ThreadPoolExecutor类线程池的生命周期维护管理
2.ThreadPoolExecutor类线程池对于任务的分配管理
3.ThreadPoolExecutor类线程池对于线程池中的线程生命周期管理
ScheduledThreadPoolExecutor类
1. 核心继承与接口
ScheduledThreadPoolExecutor是Java并发包中的一个线程池实现,用于执行定时或周期性任务。它继承自ThreadPoolExecutor,所以应该具备基本的线程池功能,同时扩展了定时或延迟任务的能力
public class ScheduledThreadPoolExecutor//继承自ThreadPoolExecutor:复用线程池的基础功能(如线程管理、任务队列)。extends ThreadPoolExecutor//实现ScheduledExecutorService:提供定时任务的调度接口(如 schedule, scheduleAtFixedRate) implements ScheduledExecutorService {
}
虽然继承自ThreadPoolExecutor:复用线程池的基础功能,但是有区别
ScheduledThreadPoolExecutor的构造函数最多只有三个参数
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);}
少了maximumPoolSize,keepAliveTime,TimeUnit,BlockingQueue workQueue四个参数,原因在于ScheduledThreadPoolExecutor的内部类DelayedWorkQueue队列
2. 核心数据结构
任务队列:DelayedWorkQueue
DelayedWorkQueue专为 ScheduledThreadPoolExecutor 优化使用的类
DelayedWorkQueued介绍点击链接
①DelayedWorkQueue是初始化为16且动态扩容的,是无界的
所以线程池非核心线程不会触发,只会在核心线程满了进入队列然后队列动态扩容,所以关于非核心线程的三个参数都没了,因为无界队列,因此需要主要OOM内存溢出问题
②DelayedWorkQueue是一个基于堆的优先级队列:内部使用数组实现小顶堆,按任务的触发时间(time 字段)排序。我们定时周期或延迟执行的任务会封装进入DelayedWorkQueue,通过delayedExecute(sft)方法提交到队列,所以参数BlockingQueue workQueue也没有了
任务封装:ScheduledFutureTask
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {private long time; // 任务触发时间(纳秒)private final long period; // 周期(正数:固定速率;负数:固定延迟)//...
}
触发时间排序:通过 compareTo 方法实现堆排序。
周期模式:
- 固定速率(Fixed Rate):基于初始调度时间计算下一次触发时间。
- 固定延迟(Fixed Delay):基于任务实际完成时间计算下一次触发时间。
3.调度逻辑
①任务提交schedule 方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {// 包装任务为 ScheduledFutureTaskScheduledFutureTask<Void> sft = new ScheduledFutureTask<>(command, null, triggerTime(delay, unit));// 提交到队列delayedExecute(sft);return sft;
}
②delayedExecute 方法
private void delayedExecute(RunnableScheduledFuture<?> task) {//如果线程池已关闭,执行拒绝策略。if (isShutdown())reject(task);else {//将任务添加到 DelayedWorkQueue。super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart();}}
4.周期性任务处理
run 方法重写(在 ScheduledFutureTask 中):
public void run() {if (!isPeriodic()) {super.run(); // 一次性任务直接执行} else if (runAndReset()) { // 周期性任务执行后重置状态// 计算下一次触发时间setNextRunTime();// 重新加入队列reExecutePeriodic(outerTask);}
}
固定速率:time += period(基于初始时间)。
固定延迟:time = now() + period(基于当前时间)。
5. 线程池配置
- 核心线程数固定:默认不回收核心线程(allowCoreThreadTimeOut 默认为 false),确保及时处理定时任务。
- 最大线程数无效:由于使用无界队列,线程数不会超过核心线程数(除非设置 allowCoreThreadTimeOut == true)
总结关键问题与注意事项
任务堆积与 OOM
风险:DelayedWorkQueue 是无界的,大量提交任务会导致队列膨胀。
解决方案:监控任务提交速率,或自定义有界队列(需处理拒绝策略)。
任务取消与删除
remove 方法:直接从队列删除任务,但时间复杂度为 O(n)(堆结构导致低效)。
purge 方法:清理所有已取消的任务,减少内存占用。(父类ThreadPoolExecutor里面的方法)
异常处理
静默吞噬异常:任务执行抛异常后,后续周期任务不再执行(需在任务内捕获异常)