用身份證備案網站外貿營銷渠道
?
我們在前面文章中說到過Quartz涉及到的線程,但是散落在幾篇文章中,不好找。而Quartz涉及到的線程對于理解Quartz也比較重要,所以今天專門提取出來單獨說一下。
Quartz中的主要線程:
-
任務調度線程QuartzSchedulerThread
-
任務執(zhí)行線程
-
Misfire處理線程
-
ClusterManager線程
任務調度線程 QuartzSchedulerThread
QuartzSchedulerThread是任務調度線程,他的職責是對滿足觸發(fā)條件(nextFireTimer到了)的注冊到JobStore的Trigger分配給可用的任務執(zhí)行線程去執(zhí)行。
QuartzSchedulerThread啟動
任務調度線程QuartzSchedulerThread是在調度器Scheduler創(chuàng)建的時候啟動的。
應用層通過以下調用創(chuàng)建Scheduler:
Scheduler sche = new StdSchedulerFactory().getScheduler();
一般情況下通過StdSchedulerFactory構建Scheduler,getScheduler首先嘗試從SchedulerRepository獲取Schedule,首次運行獲取不到,則通過instantiate()方法獲取。
public Scheduler getScheduler() throws SchedulerException {if (cfg == null) {initialize();}SchedulerRepository schedRep = SchedulerRepository.getInstance();Scheduler sched = schedRep.lookup(getSchedulerName());if (sched != null) {if (sched.isShutdown()) {schedRep.remove(getSchedulerName());} else {return sched;}} sched = instantiate();return sched;
}
instantiate()方法特別特別特別長,幾乎就是在這里完成Quartz所有相關組件的初始化的。
其中會創(chuàng)建QuartzScheduler,之后會將QuartzScheduler包裝到stdScheduler中存入SchedulerRepository中。
instantiate()創(chuàng)建QuartzScheduler是調用的構造方法:
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)throws SchedulerException {this.resources = resources;if (resources.getJobStore() instanceof JobListener) {addInternalJobListener((JobListener)resources.getJobStore());}this.schedThread = new QuartzSchedulerThread(this, resources); ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();schedThreadExecutor.execute(this.schedThread);if (idleWaitTime > 0) { this.schedThread.setIdleWaitTime(idleWaitTime);}jobMgr = new ExecutingJobsManager();addInternalJobListener(jobMgr);errLogger = new ErrorLogger();addInternalSchedulerListener(errLogger);signaler = new SchedulerSignalerImpl(this, this.schedThread);getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
可以看到構造方法中創(chuàng)建了QuartzSchedulerThread對象,之后獲取ThreadExecutor(一般情況下為DefaultThreadExecutor)并通過調用其execute方法啟動QuartzSchedulerThread線程:
public class DefaultThreadExecutor implements ThreadExecutor {public void initialize() {}public void execute(Thread thread) {thread.start();}}
QuartzSchedulerThread的運行
其實就是他的run方法,我們也大概分析過,其主要邏輯是:
-
從作業(yè)執(zhí)行線程池獲取availThreadCount,也就是當前可用的線程數
-
調用JobStore的acquireNextTriggers方法,獲取特定短時間(idleWaitTime,默認30秒)內可能需要被觸發(fā)的,數量不超過availThreadCount的觸發(fā)器
-
調用JobStore的triggersFired方法對獲取到的可能需要被觸發(fā)的觸發(fā)器進行二次加工,再次獲取到最終的待觸發(fā)器結果集
-
循環(huán)處理最終的待處理觸發(fā)器結果集中的每一個需要被觸發(fā)的觸發(fā)器
-
用JobRunShell包裝該觸發(fā)器,送給線程池執(zhí)行該觸發(fā)器關聯的作業(yè)
好了,作業(yè)調度線程QuartzSchedulerThread我們就基本搞清楚了。
作業(yè)執(zhí)行線程
Quartz的作業(yè)執(zhí)行線程是放在線程池中進行管理的,默認是SimpleTreadPool,有關SimpleThreadPool我們前面專門有一篇文章介紹過,這里就不再贅述了。
作業(yè)執(zhí)行線程和作業(yè)調度線程一樣,也是在作業(yè)調度器Scheduler創(chuàng)建后立即啟動,這個過程同樣也是在StdSchedulerFactory的instantiate()方法中完成的:
instantiate()創(chuàng)建SimpleThreadPool之后會調用SimpleThreadPool的initialize方法,根據配置文件指定的任務執(zhí)行線程數完成工作線程的初始化和啟動。比如配置文件設置為10則初始化10個工作線程并逐個啟動。作業(yè)執(zhí)行線程的初始化及啟動的詳細過程請參考Quartz - SimpleThreadPool。
MisfireHandler線程
如果你的項目使用RAMJobStore,而不是JDBC-based JobStore(指需要持久化到數據庫的JobStore),那么就不存在Misfire處理線程。
因為RAMJobStore在處理正常觸發(fā)的過程中順便就處理了Misfire,所以就不再需要其他處理機制了,這部分我們在前面的文章中也分析過Quartz - Misfire (for RAMJobstore)。
JDBC-based JobStore在處理正常觸發(fā)的時候只獲取未錯過觸發(fā)時間的觸發(fā)器,對于錯過觸發(fā)時間的、也就是Misfire的觸發(fā)器就需要另外的機制來處理。
Misfire處理線程就是Quartz采用JDBC-based JobStore的情況下用來處理錯過觸發(fā)時機的觸發(fā)器的線程。
MisfireHandler啟動
MisfireHandler定義在JobStoreSupport類中,JobStoreSupport是JobStore的JDBC-based JobStore的虛擬類,Quartz主要提供了兩個基于JDBC的JobStore的實現:JobStoreTX、JobStoreCMT。JDBC-based JobStore詳細內容請參考Quartz - JDBC-Based JobStore,今天主要分析MisfireHandler。
我們在應用中創(chuàng)建任務調度器Scheduler后需要調用他的start方法:
Scheduler sche = new StdSchedulerFactory().getScheduler();sche.scheduleJob(jobDetail,trigger);sche.start();
這個start方法會調用到JobStore的schedulerStarted()方法,如果我們應用中采用的是JDBC-based JobStore的話,會調用到JobStoreSupport的schedulerStarted(),其中會創(chuàng)建MisfireHandler之后調用MisfireHandler的initialize():
misfireHandler = new MisfireHandler();if(initializersLoader != null)misfireHandler.setContextClassLoader(initializersLoader);misfireHandler.initialize();
Misfire的initialize將創(chuàng)建好的MisFireHandle線程交給ThreadExecutor啟動。
public void initialize() {ThreadExecutor executor = getThreadExecutor();executor.execute(MisfireHandler.this);}
MisfireHandle的運行
也就是MisfireHandle的run方法。處理邏輯和RAMJobStore處理misfired trigger的邏輯類似,只不過MisfireHandle的所有處理邏輯都是通過數據庫操作完成的。
JDBC-Based JobStore對應的表結構我們會找機會專門分析,這里就不詳細展開了。
MisfireHandle的run方法的主要邏輯為:
-
從數據庫中獲取在WAITING(等待執(zhí)行)狀態(tài)、下次執(zhí)行時間小于msifiredtime(當前時間 - MisfireThreshold)的觸發(fā)器,也就是錯過觸發(fā)時間的觸發(fā)器
-
為了避免存在大量misfired trigger的情況下,一次處理太多數據影響其他正常觸發(fā)器的執(zhí)行,MisfireHandle線程每次僅獲取部分而不是全部misfired trigger(參數maxToRecoverAtATime指定,默認為20)
-
對獲取到的每一個錯過執(zhí)行時間的觸發(fā)器(misfired trigger),調用觸發(fā)器的updateAfterMisfire方法獲取下次執(zhí)行時間,updateAfterMisfire方法我們在上一篇講Misfire處理策略的文章中說過,就是根據觸發(fā)器的處理策略獲取下次執(zhí)行時間
-
updateAfterMisfire方法執(zhí)行后,獲取到的觸發(fā)器的下次執(zhí)行時間如果不為空的話,更新到數據庫中,等待正常的任務執(zhí)行線程調度執(zhí)行
ClusterManager線程
與MisfiredHandle一樣,ClusterManager線程也是JDBC-Based JobStore特有的。
顧名思義,ClusterManager線程與集群有關系,JDBC-Based JobStore是可以支持Quartz的集群部署的,在集群環(huán)境下,Quartz服務節(jié)點可能會down機、掉線,從而影響任務的執(zhí)行,ClusterManager線程就是負責檢查Quartz服務節(jié)點的在線狀態(tài)的,如果發(fā)生掉線后,將該服務節(jié)點負責的觸發(fā)器交給其他服務節(jié)點來處理。
具體邏輯等到我們分析完Quartz Cluster之后補充。