欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > ScheduledThreadPoolExecutor源码分析

ScheduledThreadPoolExecutor源码分析

2025/4/21 12:18:58 来源:https://blog.csdn.net/qq_32099833/article/details/147167004  浏览:    关键词:ScheduledThreadPoolExecutor源码分析

前言

Java Timer 提供了延时任务以及周期性任务调度执行的功能,它是基于单线程+小根堆的数据结构来实现的,存在一些局限性,例如:

  1. 单线程模式,任务只能串行化执行,前一个任务的执行耗时,会影响下一个任务的调度执行时间
  2. 任务执行异常,线程会退出,后续任务无法执行
  3. 系统时钟敏感
  4. 数组实现的小根堆不适合维护太大数据量的任务

于是,JDK5 开始提供 ScheduledThreadPoolExecutor 来代替 Timer,它提供了更强大的功能和更高的灵活性,例如:

  1. 多线程模式,任务可以并发执行,减少单个任务执行耗时对整体调度的影响
  2. 对异常的处理更友好,避免单个任务执行异常导致调度器停止
  3. 线程池可以更好的利用系统资源

大多数情况下,ScheduledThreadPoolExecutor 都是更好的选择。

源码分析

ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的子类,很多逻辑是共用父类的那一套。相较于父类,子类的构造函数要简单的多,最多只能配置三个属性:

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);
}
  • corePoolSize 核心线程数大小
  • threadFactory 线程工厂,用于创建线程
  • handler 拒绝策略

默认最大线程数 Integer.MAX_VALUE,这个参数没有意义,因为是无界队列,达到核心线程数后不会再创建新的线程。

keepAliveTime 默认是0,线程空闲不会销毁,因为要保证有线程来等待任务到期执行。

任务队列强制是 DelayedWorkQueue,不可更改。这是 ScheduledThreadPoolExecutor 内部的一个基于小根堆实现的优先队列,按照任务执行时间升序排列。和 Timer#TaskQueue 的区别是,因为涉及到多线程等待任务到期,为了避免唤醒不必要的线程,多了个 leader 模式。

DelayedWorkQueue

DelayedWorkQueue 是 ScheduledThreadPoolExecutor 的内部类,它是用来维护延时任务的队列,采用的是小根堆的数据结构,底层用数组存储,和 Timer#TaskQueue 差别不大,同样不适合维护太大数据量的任务,否则内存压力会很大。

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private final ReentrantLock lock = new ReentrantLock();private int size = 0;private Thread leader = null;private final Condition available = lock.newCondition();
}
  • INITIAL_CAPACITY 数组初始容量16
  • queue 队列,数组实现的小根堆,延时任务按照执行时间升序排列
  • lock 排它锁
  • size 任务数
  • leader 在队头等待任务到期的线程,因为是多线程模式,避免线程被无意义的唤醒,只有leader线程才会等待任务到期,其它线程会永久等待,直到接收到唤醒信号
  • available 当队列头部有新任务可用或新线程可能需要成为领导者时,发出条件信号

任务入队的过程:

  1. 先抢锁
  2. 如果数组满了,调用 grow 数组会完成1.5倍的扩容
  3. 如果队列是空的,直接放到队头位置,即数组下标0的位置
  4. 如果队列不是空,则放到队尾,然后调用 siftUp 完成堆节点的上滤操作,将节点放到合适的位置。上滤操作其实就是不断和父节点比较,如果父节点比自己小就交换彼此的位置
  5. 任务入队后,如果队头就是自己,则要发出信号,唤醒线程来调度任务
public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)grow();size = i + 1;if (i == 0) {queue[0] = e;setIndex(e, 0);} else {siftUp(i, e);}if (queue[0] == e) {leader = null;available.signal();}} finally {lock.unlock();}return true;
}

Worker 线程会从队列中取出到期的任务并执行,因为采用的是小根堆结构,所以队头永远是最早要执行的任务,任务出队的过程:

  1. 先抢锁
  2. 判断队头是否为空,为空说明没有任务,线程无期限wait,等待任务入队时将自己唤醒
  3. 队头不为空,则判断执行时间是否已到,到了则直接出队。
  4. 如果任务执行时间还没到,则判断自己是不是leader线程,是的话就await任务到期的时间,线程唤醒后会重新执行循环判断
  5. 如果自己不是leader线程,就无限期await
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first);first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}
}

任务实际从队列中删除是 finishPoll,因为队头元素出队了,需要补齐。先取出队尾元素 x,然后将x放到队头,再调用 siftDown 完成节点的下滤操作,调整堆结构。下滤操作其实就是不断和子节点比较,如果子节点比自己小,就互换位置,直到自己到一个合适的位置。

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {int s = --size;RunnableScheduledFuture<?> x = queue[s];queue[s] = null;if (s != 0)siftDown(0, x);setIndex(f, -1);return f;
}

ScheduledFutureTask

延时任务会被包装成 ScheduledFutureTask 入队,相较于 FutureTask,额外记录了延时任务的序号、执行时间、执行周期等数据:

private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {private final long sequenceNumber;private long time;private final long period;RunnableScheduledFuture<V> outerTask = this;int heapIndex;
}
  • sequenceNumber 任务序号,全局递增
  • time 任务执行时间
  • period 执行周期,正值表示固定速率执行,负值表示固定延迟执行
  • heapIndex 队列索引,提高任务取消的效率

延时任务的执行,先判断是不是周期性任务,如果不是就直接调用父类方法执行。是周期性任务,任务执行后会重新计算下一次执行的时间,然后任务重新入队。

public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}
}

schedule

提交延时任务是 schedule,任务会被包装成 ScheduledFutureTask,同时 triggerTime 会计算任务执行的时间,delayedExecute 会把任务加入到队列,然后确保启动线程调度任务。

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));delayedExecute(t);return t;
}

delayedExecute 先判断线程池是否关闭,关闭则执行拒绝策略。否则任务先入队,然后调用父类的 ensurePrestart 确保开启Worker线程调度任务执行。Worker 线程是个死循环,不断从队列取出要执行的任务执行。

private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart();}
}

Worker

Worker 线程会调用 runWorker,流程:

  1. while循环监听队列,getTask 从队列中取出要执行的任务
  2. 触发 beforeExecute,以及后续的 afterExecute 钩子函数
  3. 执行延时任务,期间会捕获异常
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

尾巴

ScheduledThreadPoolExecutor 克服了传统 Timer 类的诸多弊端,其多线程执行模式极大地提升了任务调度的效率与灵活性,有效避免了单线程模式下任务串行执行所导致的相互干扰问题。
在其内部实现机制中,DelayedWorkQueue 基于小根堆的数据结构,确保任务按照执行时间的先后顺序有序排列,这种设计使得任务的调度更加精准高效。Worker 线程通过不断从队列中获取到期任务并执行,配合对任务执行异常的妥善处理,保障了任务调度的稳定性与持续性。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词