Spring Boot定时任务原理
在现代应用中,定时任务的调度是实现周期性操作的关键机制。Spring Boot 提供了强大的定时任务支持,通过注解驱动的方式,开发者可以轻松地为方法添加定时任务功能。本文将深入探讨 Spring Boot 中定时任务的实现原理,重点分析 @EnableScheduling
和 ScheduledAnnotationBeanPostProcessor
的作用,以及任务如何被注册和执行。我们还将详细介绍底层使用的线程池调度器 ThreadPoolTaskScheduler
和 Java 内置的 ScheduledThreadPoolExecutor
,它们如何协同工作,保证定时任务的准确执行。此外,我们还将探讨任务调度的线程阻塞与唤醒机制,深入剖析延迟队列(DelayedWorkQueue
)如何有效管理任务的执行顺序。通过本文的学习,你将能够更好地理解和应用 Spring Boot 定时任务,提升应用的调度能力和性能。
1.注解驱动
Spring Boot通过@EnableScheduling
激活定时任务支持,而EnableScheduling
注解导入了SchedulingConfiguration
,这个类创建了一个名为ScheduledAnnotationBeanPostProcessor
的bean
,而这个bean
就是定时任务的关键
/*** {@code @Configuration} class that registers a {@link ScheduledAnnotationBeanPostProcessor}* bean capable of processing Spring's @{@link Scheduled} annotation.** <p>This configuration class is automatically imported when using the* {@link EnableScheduling @EnableScheduling} annotation. See* {@code @EnableScheduling}'s javadoc for complete usage details.** @author Chris Beams* @since 3.1* @see EnableScheduling* @see ScheduledAnnotationBeanPostProcessor*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}}
2.对ScheduledAnnotationBeanPostProcessor
的分析
1. 类职责
- 核心作用:扫描 Spring Bean 中的
@Scheduled
注解方法,将其转换为定时任务,并注册到任务调度器。
2. 定时任务注册的关键流程
代码都是经过简化的代码,实际上我去看Spring的源码,发现代码都很长,但是整体意思是差不多的
Bean 初始化后扫描注解(关键方法:postProcessAfterInitialization
)
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {// 1. 跳过 AOP 基础设施类if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||bean instanceof ScheduledExecutorService) {// Ignore AOP infrastructure such as scoped proxies.return bean;}// 2. 检查类是否包含 @Scheduled 注解Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);if (!nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, List.of(Scheduled.class, Schedules.class))) {// 3. 反射查找所有带 @Scheduled 的方法Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, method -> AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class));// 4. 处理每个带注解的方法annotatedMethods.forEach((method, scheduledAnnotations) -> scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));}return bean;
}
- 跳过无关 Bean:如 AOP 代理类、
TaskScheduler
本身。 - 反射扫描方法:通过
MethodIntrospector
查找所有带有@Scheduled
的方法。 - 注解聚合:支持
@Schedules
多注解合并。
解析任务参数并注册(关键方法:processScheduled
)
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {// 1. 创建 Runnable 任务Runnable runnable = createRunnable(bean, method);// 2. 解析时间参数(cron/fixedDelay/fixedRate)if (StringUtils.hasText(cron)) {// 处理 cron 表达式CronTask task = new CronTask(runnable, new CronTrigger(cron, timeZone));tasks.add(registrar.scheduleCronTask(task));} else if (fixedDelay > 0) {// 处理 fixedDelayFixedDelayTask task = new FixedDelayTask(runnable, fixedDelay, initialDelay);tasks.add(registrar.scheduleFixedDelayTask(task));} else if (fixedRate > 0) {// 处理 fixedRateFixedRateTask task = new FixedRateTask(runnable, fixedRate, initialDelay);tasks.add(registrar.scheduleFixedRateTask(task));}// 3. 注册任务到 ScheduledTaskRegistrarsynchronized (scheduledTasks) {scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>()).addAll(tasks);}
}
- 任务封装:将方法封装为
ScheduledMethodRunnable
。 - 时间参数解析:
- 支持
cron
、fixedDelay
、fixedRate
三种模式。 - 处理
initialDelay
初始延迟。 - 使用
embeddedValueResolver
解析占位符(如${task.interval}
)。
- 支持
- 任务注册:最终任务被添加到
ScheduledTaskRegistrar
。
启动任务调度(关键方法:finishRegistration
)
private void finishRegistration() {// 1. 配置 TaskScheduler(优先级:显式设置 > 查找 Bean > 默认单线程)if (registrar.getScheduler() == null) {TaskScheduler scheduler = resolveSchedulerBean(beanFactory, TaskScheduler.class, false);registrar.setTaskScheduler(scheduler);}// 2. 调用 SchedulingConfigurer 自定义配置(扩展点)List<SchedulingConfigurer> configurers = beanFactory.getBeansOfType(SchedulingConfigurer.class);configurers.forEach(configurer -> configurer.configureTasks(registrar));// 3. 启动所有注册的任务registrar.afterPropertiesSet();
}
- 调度器解析:
- 默认查找名为
taskScheduler
的 Bean。 - 若无则创建单线程调度器(
Executors.newSingleThreadScheduledExecutor()
)。
- 默认查找名为
- 扩展点:允许通过
SchedulingConfigurer
自定义任务注册逻辑。 - 最终启动:调用
afterPropertiesSet()
触发任务调度。
3.ThreadPoolTaskScheduler的剖析
ThreadPoolTaskScheduler
是 Spring 对 Java ScheduledThreadPoolExecutor
的封装,是 @Scheduled
定时任务的底层执行引擎。
- 继承关系:继承
ExecutorConfigurationSupport
,实现TaskScheduler
接口,整合了线程池管理与定时任务调度。 - 底层依赖:基于
ScheduledThreadPoolExecutor
,支持 周期性任务(fixedRate/fixedDelay)和 动态触发任务(如 cron 表达式)。
线程池初始化(关键方法:initializeExecutor
)
同样,这里和以后的部分也都是伪代码
@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {// 创建 ScheduledThreadPoolExecutorthis.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);// 配置线程池策略(如取消后立即移除任务)if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor scheduledPoolExecutor) {scheduledPoolExecutor.setRemoveOnCancelPolicy(this.removeOnCancelPolicy);// 其他策略设置...}return this.scheduledExecutor;
}
这部分是我复制源码的,可以清晰的看到,底层就是new了ScheduledThreadPoolExecutor
protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);}
4.ScheduledThreadPoolExecutor的原理分析
核心成员:
- 任务队列:使用
DelayedWorkQueue
(内部实现的小顶堆),按任务执行时间排序。 - 线程池:复用
ThreadPoolExecutor
的线程管理机制,支持核心线程数和最大线程数配置。
2. 定时任务调度机制
所有定时任务被封装为 ScheduledFutureTask
对象,其核心逻辑如下:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {private long time; // 下一次执行时间(纳秒)private final long period; // 周期(正数:fixedRate;负数:fixedDelay)private int heapIndex; // 在 DelayedWorkQueue 中的索引public void run() {if (isPeriodic()) {// 周期性任务:重新计算下一次执行时间,并重新加入队列setNextRunTime();reExecutePeriodic(outerTask);} else {// 一次性任务:直接执行super.run();}}
}
- 任务提交:通过
schedule
、scheduleAtFixedRate
等方法提交任务。 - 队列管理:任务被封装为
ScheduledFutureTask
并加入DelayedWorkQueue
。 - 线程唤醒:工作线程 (
Worker
) 从队列获取任务,若任务未到执行时间,线程进入限时等待(available.awaitNanos(delay)
)。 - 任务执行:到达执行时间后,线程执行任务:
- 固定速率(fixedRate):执行完成后,根据
period
计算下一次执行时间(time += period
)。 - 固定延迟(fixedDelay):执行完成后,根据当前时间计算下一次执行时间(
time = now() + (-period)
)。
- 固定速率(fixedRate):执行完成后,根据
- 重新入队:周期性任务执行后,重新加入队列等待下次调度。
3.DelayedWorkQueue
的简单剖析
DelayQueue队列是一个延迟队列,DelayQueue中存放的元素必须实现Delayed接口的元素,实现接口后相当于是每个元素都有个过期时间,当队列进行take获取元素时,先要判断元素有没有过期,只有过期的元素才能出队操作,没有过期的队列需要等待剩余过期时间才能进行出队操作。
DelayQueue队列内部使用了PriorityQueue优先队列来进行存放数据,它采用的是二叉堆进行的优先队列,使用ReentrantLock锁来控制线程同步,由于内部元素是采用的PriorityQueue来进行存放数据,所以Delayed接口实现了Comparable接口,用于比较来控制优先级
线程阻塞与唤醒逻辑
(1) 取任务时的阻塞(take() 方法)
当线程调用 take()
方法从队列中获取任务时,若队列为空或队头任务未到期,线程会进入阻塞状态:
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {available.await(); // 队列为空时无限等待} else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) return q.poll(); // 任务已到期,取出执行if (leader != null) {available.await(); // 其他线程已为队头任务等待,本线程无限等待} else {Thread thisThread = Thread.currentThread();leader = thisThread; // 标记当前线程为“领导者”try {available.awaitNanos(delay); // 限时等待到期时间} finally {if (leader == thisThread) leader = null;}}}}} finally {if (leader == null && q.peek() != null) available.signal();lock.unlock();}
}
- 关键逻辑:
leader
线程优化:避免多个线程同时等待同一任务到期,仅一个线程(leader)限时等待,其他线程无限等待- 限时等待:通过
available.awaitNanos(delay)
阻塞到任务到期时间。
(2) 插入新任务时的唤醒(offer() 方法)
当新任务被插入队列时,若新任务成为队头(即最早到期),会触发唤醒逻辑:
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e); // 插入任务并调整堆结构if (q.peek() == e) { // 新任务成为队头leader = null;available.signal(); // 唤醒等待线程}return true;} finally {lock.unlock();}
}
- 唤醒条件:
- 插入的任务成为新的队头(即其到期时间最早)。
- 调用
available.signal()
唤醒等待的线程(leader)
或其他线程
(3) 唤醒机制总结
- 何时唤醒:
- 超时唤醒:等待线程因任务到期而被 JVM 自动唤醒。
- 插入新任务唤醒:新任务的到期时间早于当前队头任务时,插入线程会触发唤醒。
- 唤醒对象:
- 若存在
leader
线程(正在限时等待队头任务),优先唤醒它。 - 若无
leader
,唤醒任意一个等待线程
- 若存在