國外炫酷網(wǎng)站外貿(mào)平臺app
Spring Boot定時(shí)任務(wù)原理
在現(xiàn)代應(yīng)用中,定時(shí)任務(wù)的調(diào)度是實(shí)現(xiàn)周期性操作的關(guān)鍵機(jī)制。Spring Boot 提供了強(qiáng)大的定時(shí)任務(wù)支持,通過注解驅(qū)動的方式,開發(fā)者可以輕松地為方法添加定時(shí)任務(wù)功能。本文將深入探討 Spring Boot 中定時(shí)任務(wù)的實(shí)現(xiàn)原理,重點(diǎn)分析 @EnableScheduling
和 ScheduledAnnotationBeanPostProcessor
的作用,以及任務(wù)如何被注冊和執(zhí)行。我們還將詳細(xì)介紹底層使用的線程池調(diào)度器 ThreadPoolTaskScheduler
和 Java 內(nèi)置的 ScheduledThreadPoolExecutor
,它們?nèi)绾螀f(xié)同工作,保證定時(shí)任務(wù)的準(zhǔn)確執(zhí)行。此外,我們還將探討任務(wù)調(diào)度的線程阻塞與喚醒機(jī)制,深入剖析延遲隊(duì)列(DelayedWorkQueue
)如何有效管理任務(wù)的執(zhí)行順序。通過本文的學(xué)習(xí),你將能夠更好地理解和應(yīng)用 Spring Boot 定時(shí)任務(wù),提升應(yīng)用的調(diào)度能力和性能。
1.注解驅(qū)動
Spring Boot通過@EnableScheduling
激活定時(shí)任務(wù)支持,而EnableScheduling
注解導(dǎo)入了SchedulingConfiguration
,這個(gè)類創(chuàng)建了一個(gè)名為ScheduledAnnotationBeanPostProcessor
的bean
,而這個(gè)bean
就是定時(shí)任務(wù)的關(guān)鍵
/*** {@code @Configuration} class that registers a {@link ScheduledAnnotationBeanPostProcessor}* bean capable of processing Spring's @{@link Scheduled} annotation.** <p>This configuration class is automatically imported when using the* {@link EnableScheduling @EnableScheduling} annotation. See* {@code @EnableScheduling}'s javadoc for complete usage details.** @author Chris Beams* @since 3.1* @see EnableScheduling* @see ScheduledAnnotationBeanPostProcessor*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}}
2.對ScheduledAnnotationBeanPostProcessor
的分析
1. 類職責(zé)
- 核心作用:掃描 Spring Bean 中的
@Scheduled
注解方法,將其轉(zhuǎn)換為定時(shí)任務(wù),并注冊到任務(wù)調(diào)度器。
2. 定時(shí)任務(wù)注冊的關(guān)鍵流程
代碼都是經(jīng)過簡化的代碼,實(shí)際上我去看Spring的源碼,發(fā)現(xiàn)代碼都很長,但是整體意思是差不多的
Bean 初始化后掃描注解(關(guān)鍵方法:postProcessAfterInitialization
)
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {// 1. 跳過 AOP 基礎(chǔ)設(shè)施類if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||bean instanceof ScheduledExecutorService) {// Ignore AOP infrastructure such as scoped proxies.return bean;}// 2. 檢查類是否包含 @Scheduled 注解Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);if (!nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, List.of(Scheduled.class, Schedules.class))) {// 3. 反射查找所有帶 @Scheduled 的方法Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, method -> AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class));// 4. 處理每個(gè)帶注解的方法annotatedMethods.forEach((method, scheduledAnnotations) -> scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));}return bean;
}
- 跳過無關(guān) Bean:如 AOP 代理類、
TaskScheduler
本身。 - 反射掃描方法:通過
MethodIntrospector
查找所有帶有@Scheduled
的方法。 - 注解聚合:支持
@Schedules
多注解合并。
解析任務(wù)參數(shù)并注冊(關(guān)鍵方法:processScheduled
)
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {// 1. 創(chuàng)建 Runnable 任務(wù)Runnable runnable = createRunnable(bean, method);// 2. 解析時(shí)間參數(shù)(cron/fixedDelay/fixedRate)if (StringUtils.hasText(cron)) {// 處理 cron 表達(dá)式CronTask task = new CronTask(runnable, new CronTrigger(cron, timeZone));tasks.add(registrar.scheduleCronTask(task));} else if (fixedDelay > 0) {// 處理 fixedDelayFixedDelayTask task = new FixedDelayTask(runnable, fixedDelay, initialDelay);tasks.add(registrar.scheduleFixedDelayTask(task));} else if (fixedRate > 0) {// 處理 fixedRateFixedRateTask task = new FixedRateTask(runnable, fixedRate, initialDelay);tasks.add(registrar.scheduleFixedRateTask(task));}// 3. 注冊任務(wù)到 ScheduledTaskRegistrarsynchronized (scheduledTasks) {scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>()).addAll(tasks);}
}
- 任務(wù)封裝:將方法封裝為
ScheduledMethodRunnable
。 - 時(shí)間參數(shù)解析:
- 支持
cron
、fixedDelay
、fixedRate
三種模式。 - 處理
initialDelay
初始延遲。 - 使用
embeddedValueResolver
解析占位符(如${task.interval}
)。
- 支持
- 任務(wù)注冊:最終任務(wù)被添加到
ScheduledTaskRegistrar
。
啟動任務(wù)調(diào)度(關(guān)鍵方法:finishRegistration
)
private void finishRegistration() {// 1. 配置 TaskScheduler(優(yōu)先級:顯式設(shè)置 > 查找 Bean > 默認(rèn)單線程)if (registrar.getScheduler() == null) {TaskScheduler scheduler = resolveSchedulerBean(beanFactory, TaskScheduler.class, false);registrar.setTaskScheduler(scheduler);}// 2. 調(diào)用 SchedulingConfigurer 自定義配置(擴(kuò)展點(diǎn))List<SchedulingConfigurer> configurers = beanFactory.getBeansOfType(SchedulingConfigurer.class);configurers.forEach(configurer -> configurer.configureTasks(registrar));// 3. 啟動所有注冊的任務(wù)registrar.afterPropertiesSet();
}
- 調(diào)度器解析:
- 默認(rèn)查找名為
taskScheduler
的 Bean。 - 若無則創(chuàng)建單線程調(diào)度器(
Executors.newSingleThreadScheduledExecutor()
)。
- 默認(rèn)查找名為
- 擴(kuò)展點(diǎn):允許通過
SchedulingConfigurer
自定義任務(wù)注冊邏輯。 - 最終啟動:調(diào)用
afterPropertiesSet()
觸發(fā)任務(wù)調(diào)度。
3.ThreadPoolTaskScheduler的剖析
ThreadPoolTaskScheduler
是 Spring 對 Java ScheduledThreadPoolExecutor
的封裝,是 @Scheduled
定時(shí)任務(wù)的底層執(zhí)行引擎。
- 繼承關(guān)系:繼承
ExecutorConfigurationSupport
,實(shí)現(xiàn)TaskScheduler
接口,整合了線程池管理與定時(shí)任務(wù)調(diào)度。 - 底層依賴:基于
ScheduledThreadPoolExecutor
,支持 周期性任務(wù)(fixedRate/fixedDelay)和 動態(tài)觸發(fā)任務(wù)(如 cron 表達(dá)式)。
線程池初始化(關(guān)鍵方法:initializeExecutor
)
同樣,這里和以后的部分也都是偽代碼
@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {// 創(chuàng)建 ScheduledThreadPoolExecutorthis.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);// 配置線程池策略(如取消后立即移除任務(wù))if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor scheduledPoolExecutor) {scheduledPoolExecutor.setRemoveOnCancelPolicy(this.removeOnCancelPolicy);// 其他策略設(shè)置...}return this.scheduledExecutor;
}
這部分是我復(fù)制源碼的,可以清晰的看到,底層就是new了ScheduledThreadPoolExecutor
protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);}
4.ScheduledThreadPoolExecutor的原理分析
核心成員:
- 任務(wù)隊(duì)列:使用
DelayedWorkQueue
(內(nèi)部實(shí)現(xiàn)的小頂堆),按任務(wù)執(zhí)行時(shí)間排序。 - 線程池:復(fù)用
ThreadPoolExecutor
的線程管理機(jī)制,支持核心線程數(shù)和最大線程數(shù)配置。
2. 定時(shí)任務(wù)調(diào)度機(jī)制
所有定時(shí)任務(wù)被封裝為 ScheduledFutureTask
對象,其核心邏輯如下:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {private long time; // 下一次執(zhí)行時(shí)間(納秒)private final long period; // 周期(正數(shù):fixedRate;負(fù)數(shù):fixedDelay)private int heapIndex; // 在 DelayedWorkQueue 中的索引public void run() {if (isPeriodic()) {// 周期性任務(wù):重新計(jì)算下一次執(zhí)行時(shí)間,并重新加入隊(duì)列setNextRunTime();reExecutePeriodic(outerTask);} else {// 一次性任務(wù):直接執(zhí)行super.run();}}
}
- 任務(wù)提交:通過
schedule
、scheduleAtFixedRate
等方法提交任務(wù)。 - 隊(duì)列管理:任務(wù)被封裝為
ScheduledFutureTask
并加入DelayedWorkQueue
。 - 線程喚醒:工作線程 (
Worker
) 從隊(duì)列獲取任務(wù),若任務(wù)未到執(zhí)行時(shí)間,線程進(jìn)入限時(shí)等待(available.awaitNanos(delay)
)。 - 任務(wù)執(zhí)行:到達(dá)執(zhí)行時(shí)間后,線程執(zhí)行任務(wù):
- 固定速率(fixedRate):執(zhí)行完成后,根據(jù)
period
計(jì)算下一次執(zhí)行時(shí)間(time += period
)。 - 固定延遲(fixedDelay):執(zhí)行完成后,根據(jù)當(dāng)前時(shí)間計(jì)算下一次執(zhí)行時(shí)間(
time = now() + (-period)
)。
- 固定速率(fixedRate):執(zhí)行完成后,根據(jù)
- 重新入隊(duì):周期性任務(wù)執(zhí)行后,重新加入隊(duì)列等待下次調(diào)度。
3.DelayedWorkQueue
的簡單剖析
DelayQueue隊(duì)列是一個(gè)延遲隊(duì)列,DelayQueue中存放的元素必須實(shí)現(xiàn)Delayed接口的元素,實(shí)現(xiàn)接口后相當(dāng)于是每個(gè)元素都有個(gè)過期時(shí)間,當(dāng)隊(duì)列進(jìn)行take獲取元素時(shí),先要判斷元素有沒有過期,只有過期的元素才能出隊(duì)操作,沒有過期的隊(duì)列需要等待剩余過期時(shí)間才能進(jìn)行出隊(duì)操作。
DelayQueue隊(duì)列內(nèi)部使用了PriorityQueue優(yōu)先隊(duì)列來進(jìn)行存放數(shù)據(jù),它采用的是二叉堆進(jìn)行的優(yōu)先隊(duì)列,使用ReentrantLock鎖來控制線程同步,由于內(nèi)部元素是采用的PriorityQueue來進(jìn)行存放數(shù)據(jù),所以Delayed接口實(shí)現(xiàn)了Comparable接口,用于比較來控制優(yōu)先級
線程阻塞與喚醒邏輯
(1) 取任務(wù)時(shí)的阻塞(take() 方法)
當(dāng)線程調(diào)用 take()
方法從隊(duì)列中獲取任務(wù)時(shí),若隊(duì)列為空或隊(duì)頭任務(wù)未到期,線程會進(jìn)入阻塞狀態(tài):
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {available.await(); // 隊(duì)列為空時(shí)無限等待} else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) return q.poll(); // 任務(wù)已到期,取出執(zhí)行if (leader != null) {available.await(); // 其他線程已為隊(duì)頭任務(wù)等待,本線程無限等待} else {Thread thisThread = Thread.currentThread();leader = thisThread; // 標(biāo)記當(dāng)前線程為“領(lǐng)導(dǎo)者”try {available.awaitNanos(delay); // 限時(shí)等待到期時(shí)間} finally {if (leader == thisThread) leader = null;}}}}} finally {if (leader == null && q.peek() != null) available.signal();lock.unlock();}
}
- 關(guān)鍵邏輯:
leader
線程優(yōu)化:避免多個(gè)線程同時(shí)等待同一任務(wù)到期,僅一個(gè)線程(leader)限時(shí)等待,其他線程無限等待- 限時(shí)等待:通過
available.awaitNanos(delay)
阻塞到任務(wù)到期時(shí)間。
(2) 插入新任務(wù)時(shí)的喚醒(offer() 方法)
當(dāng)新任務(wù)被插入隊(duì)列時(shí),若新任務(wù)成為隊(duì)頭(即最早到期),會觸發(fā)喚醒邏輯:
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e); // 插入任務(wù)并調(diào)整堆結(jié)構(gòu)if (q.peek() == e) { // 新任務(wù)成為隊(duì)頭leader = null;available.signal(); // 喚醒等待線程}return true;} finally {lock.unlock();}
}
- 喚醒條件:
- 插入的任務(wù)成為新的隊(duì)頭(即其到期時(shí)間最早)。
- 調(diào)用
available.signal()
喚醒等待的線程(leader)
或其他線程
(3) 喚醒機(jī)制總結(jié)
- 何時(shí)喚醒:
- 超時(shí)喚醒:等待線程因任務(wù)到期而被 JVM 自動喚醒。
- 插入新任務(wù)喚醒:新任務(wù)的到期時(shí)間早于當(dāng)前隊(duì)頭任務(wù)時(shí),插入線程會觸發(fā)喚醒。
- 喚醒對象:
- 若存在
leader
線程(正在限時(shí)等待隊(duì)頭任務(wù)),優(yōu)先喚醒它。 - 若無
leader
,喚醒任意一個(gè)等待線程
- 若存在