網(wǎng)站開發(fā)費(fèi)用稅常德網(wǎng)站設(shè)計(jì)
一 概述
??Elastic-Job 最開始只有一個(gè) elastic-job-core 的項(xiàng)目,定位輕量級、無中心化,最核心的服務(wù)就是支持彈性擴(kuò)容和數(shù)據(jù)分片!從 2.X 版本以后,主要分為 Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個(gè)子項(xiàng)目。esjbo官網(wǎng)地址
- Elastic-Job-Lite 定位為輕量級 無 中 心 化 解 決 方 案 , 使 用jar 包 的 形 式 提 供 分 布 式 任 務(wù) 的 協(xié) 調(diào) 服 務(wù) 。
- Elastic-Job-Cloud 使用 Mesos + Docker 的解決方案,額外提供資源治理、應(yīng)用分發(fā)以及進(jìn)程隔離等服務(wù)(跟 Lite 的區(qū)別只是部署方式不同,他們使用相同的 API,只要開發(fā)一次)。
今天我們主要介紹的是Elastic-Job-Lite,最主要的功能特性如下:
- 分布式調(diào)度協(xié)調(diào)
- 彈性擴(kuò)容縮容
- 失效轉(zhuǎn)移
- 錯(cuò)過執(zhí)行作業(yè)重觸發(fā)
- 作業(yè)分片一致性,保證同一分片在分布式環(huán)境中僅一個(gè)執(zhí)行實(shí)例
- 自診斷并修復(fù)分布式不穩(wěn)定造成的問
- 支持并行調(diào)度
- 支持作業(yè)生命周期操作
- 豐富的作業(yè)類型
- Spring整合以及命名空間提供
- 運(yùn)維平臺
??應(yīng)用在各自的節(jié)點(diǎn)執(zhí)行任務(wù),通過 zookeeper 注冊中心協(xié)調(diào)。節(jié)點(diǎn)注冊、節(jié)點(diǎn)選舉、任務(wù)分片、監(jiān)聽都在 E-Job 的代碼中完成。下圖是官網(wǎng)提供得架構(gòu)圖。
作業(yè)啟動流程圖:
作業(yè)執(zhí)行流程圖:
1.1 基本概念
1.1.1 分片概念
? 任務(wù)的分布式執(zhí)行,需要將一個(gè)任務(wù)拆分為多個(gè)獨(dú)立的任務(wù)項(xiàng),然后由分布式的服務(wù)器分別執(zhí)行某一個(gè)或幾個(gè)分片項(xiàng)。
例如:有一個(gè)遍歷數(shù)據(jù)庫某張表的作業(yè),現(xiàn)有2臺服務(wù)器。為了快速的執(zhí)行作業(yè),那么每臺服務(wù)器應(yīng)執(zhí)行作業(yè)的50%。 為滿足此需求,可將作業(yè)分成2片,每臺服務(wù)器執(zhí)行1片。作業(yè)遍歷數(shù)據(jù)的邏輯應(yīng)為:服務(wù)器A遍歷ID以奇數(shù)結(jié)尾的數(shù)據(jù);服務(wù)器B遍歷ID以偶數(shù)結(jié)尾的數(shù)據(jù)。 如果分成10片,則作業(yè)遍歷數(shù)據(jù)的邏輯應(yīng)為:每片分到的分片項(xiàng)應(yīng)為ID%10,而服務(wù)器A被分配到分片項(xiàng)0,1,2,3,4;服務(wù)器B被分配到分片項(xiàng)5,6,7,8,9,直接的結(jié)果就是服務(wù)器A遍歷ID以0-4結(jié)尾的數(shù)據(jù);服務(wù)器B遍歷ID以5-9結(jié)尾的數(shù)據(jù)。
1.1.2 分片策略
shardingTotalCount:作業(yè)分片總數(shù)。jobShardingStrategyClass:作業(yè)分片策略實(shí)現(xiàn)類全路徑。shardingItemParameters:分片序列號和個(gè)性化參數(shù)對照表。分片序列號和參數(shù)用等號分隔, 多個(gè)鍵值對用逗號分隔。分片序列號從0開始, 不可大于或等于作業(yè)分片總數(shù)。分片的維度通常有狀態(tài)(state)、類型(accountType)、id分區(qū)等,需要按照業(yè)務(wù)合適選取。
elasticJob提供了如下三種分片策略,
- AverageAllocationJobShardingStrategy :(默認(rèn)的分片策略) 基于平均分配算法的分片策略。
如果有3臺服務(wù)器, 分成9片, 則每臺服務(wù)器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
如果有3臺服務(wù)器, 分成8片, 則每臺服務(wù)器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
如果有3臺服務(wù)器, 分成10片, 則每臺服務(wù)器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
- OdevitySortByNameJobShardingStrategy:根據(jù)作業(yè)名的哈希值奇偶數(shù)決定IP升降序算法的分片策略。
作業(yè)名的哈希值為奇數(shù)則IP升序.
作業(yè)名的哈希值為偶數(shù)則IP降序.
用于不同的作業(yè)平均分配負(fù)載至不同的服務(wù)器.
eg:
如果有3臺服務(wù)器, 分成2片, 作業(yè)名稱的哈希值為奇數(shù), 則每臺服務(wù)器分到的分片是: 1=[0], 2=[1], 3=[].
如果有3臺服務(wù)器, 分成2片, 作業(yè)名稱的哈希值為偶數(shù), 則每臺服務(wù)器分到的分片是: 3=[0], 2=[1], 1=[].
- RotateServerByNameJobShardingStrategy:根據(jù)作業(yè)名的哈希值對服務(wù)器列表進(jìn)行輪轉(zhuǎn)的分片策略。
1.1.3 分片項(xiàng)與業(yè)務(wù)處理解耦
Elastic-Job并不直接提供數(shù)據(jù)處理的功能,框架只會將分片項(xiàng)分配至各個(gè)運(yùn)行中的作業(yè)服務(wù)器,開發(fā)者需要自行處理分片項(xiàng)與真實(shí)數(shù)據(jù)的對應(yīng)關(guān)系。
1.1.4 個(gè)性化參數(shù)
個(gè)性化參數(shù)即shardingItemParameter,可以和分片項(xiàng)匹配對應(yīng)關(guān)系,用于將分片項(xiàng)的數(shù)字轉(zhuǎn)換為更加可讀的業(yè)務(wù)代碼。
? 例如:按照地區(qū)水平拆分?jǐn)?shù)據(jù)庫,數(shù)據(jù)庫A是北京的數(shù)據(jù);數(shù)據(jù)庫B是上海的數(shù)據(jù);數(shù)據(jù)庫C是廣州的數(shù)據(jù)。 如果僅按照分片項(xiàng)配置,開發(fā)者需要了解0表示北京;1表示上海;2表示廣州。 合理使用個(gè)性化參數(shù)可以讓代碼更可讀,如果配置為0=北京,1=上海,2=廣州,那么代碼中直接使用北京,上海,廣州的枚舉值即可完成分片項(xiàng)和業(yè)務(wù)邏輯的對應(yīng)關(guān)系。
1.1.5 任務(wù)類型
- Simple類型作業(yè):意為簡單實(shí)現(xiàn),未經(jīng)任何封裝的類型。需實(shí)現(xiàn)SimpleJob接口。該接口僅提供單一方法用于覆蓋,此方法將定時(shí)執(zhí)行。與Quartz原生接口相似,但提供了彈性擴(kuò)縮容和分片等功能。
- Dataflow類型:用于處理數(shù)據(jù)流,需實(shí)現(xiàn)DataflowJob接口。該接口提供2個(gè)方法可供覆蓋,分別用于抓取(fetchData)和處理(processData)數(shù)據(jù)
- Script類型:作業(yè)意為腳本類型作業(yè),支持shell,python,perl等所有類型腳本。只需通過控制臺或代碼配置scriptCommandLine即可,無需編碼。執(zhí)行腳本路徑可包含參數(shù),參數(shù)傳遞完畢后,作業(yè)框架會自動追加最后一個(gè)參數(shù)為作業(yè)運(yùn)行時(shí)信息。
1.2 核心概念
1.2.1 分布式調(diào)度
Elastic-Job-Lite并無作業(yè)調(diào)度中心節(jié)點(diǎn),而是基于部署作業(yè)框架的程序在到達(dá)相應(yīng)時(shí)間點(diǎn)時(shí)各自觸發(fā)調(diào)度。注冊中心僅用于作業(yè)注冊和監(jiān)控信息存儲。而主作業(yè)節(jié)點(diǎn)僅用于處理分片和清理等功能。
1.2.2 作業(yè)高可用
Elastic-Job-Lite提供最安全的方式執(zhí)行作業(yè)。將分片總數(shù)設(shè)置為1,并使用多于1臺的服務(wù)器執(zhí)行作業(yè),作業(yè)將會以1主n從的方式執(zhí)行。
? 一旦執(zhí)行作業(yè)的服務(wù)器崩潰,等待執(zhí)行的服務(wù)器將會在下次作業(yè)啟動時(shí)替補(bǔ)執(zhí)行。開啟失效轉(zhuǎn)移功能效果更好,可以保證在本次作業(yè)執(zhí)行時(shí)崩潰,備機(jī)立即啟動替補(bǔ)執(zhí)行。
1.2.3 最大限度利用資源
Elastic-Job-Lite也提供最靈活的方式,最大限度的提高執(zhí)行作業(yè)的吞吐量。將分片項(xiàng)設(shè)置為大于服務(wù)器的數(shù)量,最好是大于服務(wù)器倍數(shù)的數(shù)量,作業(yè)將會合理的利用分布式資源,動態(tài)的分配分片項(xiàng)。
二 應(yīng)用安裝
2.1 zookeeper 安裝
elastic-job-lite,是直接依賴 zookeeper 的,因此在開發(fā)之前我們需要先準(zhǔn)備好對應(yīng)的 zookeeper 環(huán)境,關(guān)于 zookeeper 的安裝過程,就不多說了,非常簡單,網(wǎng)上都有教程!
2.2 elastic-job-lite-console 安裝
elastic-job-lite-console,主要是一個(gè)任務(wù)作業(yè)可視化界面管理系統(tǒng)。
可以單獨(dú)部署,與平臺不關(guān),主要是通過配置注冊中心和數(shù)據(jù)源來抓取數(shù)據(jù)。
獲取的方式也很簡單,直接訪問https://github.com/apache/shardingsphere-elasticjob地址,然后切換到2.1.5的版本號,然后執(zhí)行mvn clean install進(jìn)行打包,獲取對應(yīng)的安裝包將其解壓,進(jìn)行bin文件夾啟動服務(wù)即可!
如果你的網(wǎng)速像蝸牛一樣的慢,還有一個(gè)辦法就是從這個(gè)地址https://gitee.com/elasticjob/elastic-job獲取對應(yīng)的源碼!
啟動服務(wù)后,在瀏覽器訪問http://127.0.0.1:8899,輸入賬戶、密碼(都是root)即可進(jìn)入控制臺頁面,類似如下界面!
進(jìn)入之后,將上文所在的 zookeeper 注冊中心進(jìn)行配置,包括數(shù)據(jù)庫 mysql 的數(shù)據(jù)源也可以配置一下!
2.3 創(chuàng)建工程(自集成方式)
2.3.1 引入依賴和配置
本文采用springboot來搭建工程為例,創(chuàng)建工程并添加elastic-job-lite依賴!
<!-- 使用springframework自定義命名空間時(shí)引入 --><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency><!-- elastic-job-lite 默認(rèn)依賴如下的模塊,否則ZookeeperRegistryCenter無法注冊 --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.10.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.10.0</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>18.0</version></dependency>
在配置文件application.properties中提前配置好 zookeeper 注冊中心相關(guān)信息!
#zookeeper config
zookeeper.serverList=127.0.0.1:2181
zookeeper.namespace=example-elastic-job-test
2.3.2 新建 ZookeeperConfig 配置類
@Configuration
@ConditionalOnExpression("'${zookeeper.serverList}'.length() > 0")
public class ZookeeperConfig {/*** zookeeper 配置* @return*/@Bean(initMethod = "init")public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${zookeeper.serverList}") String serverList, @Value("${zookeeper.namespace}") String namespace){return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace));}}
2.3.4 新建Simple任務(wù)處理類
- 編寫一個(gè)SimpleJob接口的實(shí)現(xiàn)類MySimpleJob,當(dāng)前工作主要是打印一條日志。
@Slf4j
public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {log.info(String.format("Thread ID: %s, 作業(yè)分片總數(shù): %s, " +"當(dāng)前分片項(xiàng): %s.當(dāng)前參數(shù): %s," +"作業(yè)名稱: %s.作業(yè)自定義參數(shù): %s",Thread.currentThread().getId(),shardingContext.getShardingTotalCount(),shardingContext.getShardingItem(),shardingContext.getShardingParameter(),shardingContext.getJobName(),shardingContext.getJobParameter()));}
}
- 創(chuàng)建一個(gè)MyElasticJobListener任務(wù)監(jiān)聽器,用于監(jiān)聽MySimpleJob的任務(wù)執(zhí)行情況。
@Slf4j
public class MyElasticJobListener implements ElasticJobListener {private long beginTime = 0;@Overridepublic void beforeJobExecuted(ShardingContexts shardingContexts) {beginTime = System.currentTimeMillis();log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));}@Overridepublic void afterJobExecuted(ShardingContexts shardingContexts) {long endTime = System.currentTimeMillis();log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime);}
}
- 創(chuàng)建一個(gè)MySimpleJobConfig類,將MySimpleJob其注入到zookeeper。
@Configuration
public class MySimpleJobConfig {/*** 任務(wù)名稱*/@Value("${simpleJob.mySimpleJob.name}")private String mySimpleJobName;/*** cron表達(dá)式*/@Value("${simpleJob.mySimpleJob.cron}")private String mySimpleJobCron;/*** 作業(yè)分片總數(shù)*/@Value("${simpleJob.mySimpleJob.shardingTotalCount}")private int mySimpleJobShardingTotalCount;/*** 作業(yè)分片參數(shù)*/@Value("${simpleJob.mySimpleJob.shardingItemParameters}")private String mySimpleJobShardingItemParameters;/*** 自定義參數(shù)*/@Value("${simpleJob.mySimpleJob.jobParameters}")private String mySimpleJobParameters;@Autowiredprivate ZookeeperRegistryCenter registryCenter;@Beanpublic MySimpleJob mySimpleJob() {return new MySimpleJob();}@Bean(initMethod = "init")public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) {//配置任務(wù)監(jiān)聽器MyElasticJobListener elasticJobListener = new MyElasticJobListener();return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), elasticJobListener);}private LiteJobConfiguration getLiteJobConfiguration() {// 定義作業(yè)核心配置JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount).shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build();// 定義SIMPLE類型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName());// 定義Lite作業(yè)根配置LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();return simpleJobRootConfig;}
}
- 在配置文件application.properties中配置好對應(yīng)的mySimpleJob參數(shù)!
#elastic job
#simpleJob類型的job
simpleJob.mySimpleJob.name=mySimpleJob
simpleJob.mySimpleJob.cron=0/15 * * * * ?
simpleJob.mySimpleJob.shardingTotalCount=3
simpleJob.mySimpleJob.shardingItemParameters=0=a,1=b,2=c
simpleJob.mySimpleJob.jobParameters=helloWorld
5.運(yùn)行程序,查看效果
在上圖demo中,配置的分片數(shù)為3,這個(gè)時(shí)候會有3個(gè)線程進(jìn)行同時(shí)執(zhí)行任務(wù),因?yàn)槎际窃谝慌_機(jī)器上執(zhí)行的,這個(gè)任務(wù)被執(zhí)行來3次,下面修改一下端口配置,創(chuàng)建三個(gè)相同的服務(wù)實(shí)例,在看看效果如下:
2.3.5 新建DataFlowJob類型作業(yè)
- 創(chuàng)建一個(gè)DataflowJob類型的實(shí)現(xiàn)類MyDataFlowJob。
@Slf4j
public class MyDataFlowJob implements DataflowJob<String> {private boolean flag = false;@Overridepublic List<String> fetchData(ShardingContext shardingContext) {log.info("開始獲取數(shù)據(jù)");if (flag) {return null;}return Arrays.asList("qingshan", "jack", "seven");}@Overridepublic void processData(ShardingContext shardingContext, List<String> data) {for (String val : data) {// 處理完數(shù)據(jù)要移除掉,不然就會一直跑,處理可以在上面的方法里執(zhí)行。這里采用 flaglog.info("開始處理數(shù)據(jù):" + val);}flag = true;}
}
- 接著創(chuàng)建MyDataFlowJob的配置類,將其注入到zookeeper注冊中心。
Configuration
public class MyDataFlowJobConfig {/*** 任務(wù)名稱*/@Value("${dataflowJob.myDataflowJob.name}")private String jobName;/*** cron表達(dá)式*/@Value("${dataflowJob.myDataflowJob.cron}")private String jobCron;/*** 作業(yè)分片總數(shù)*/@Value("${dataflowJob.myDataflowJob.shardingTotalCount}")private int jobShardingTotalCount;/*** 作業(yè)分片參數(shù)*/@Value("${dataflowJob.myDataflowJob.shardingItemParameters}")private String jobShardingItemParameters;/*** 自定義參數(shù)*/@Value("${dataflowJob.myDataflowJob.jobParameters}")private String jobParameters;@Autowiredprivate ZookeeperRegistryCenter registryCenter;@Beanpublic MyDataFlowJob myDataFlowJob() {return new MyDataFlowJob();}@Bean(initMethod = "init")public JobScheduler dataFlowJobScheduler(final MyDataFlowJob myDataFlowJob) {MyElasticJobListener elasticJobListener = new MyElasticJobListener();return new SpringJobScheduler(myDataFlowJob, registryCenter, getLiteJobConfiguration(), elasticJobListener);}private LiteJobConfiguration getLiteJobConfiguration() {// 定義作業(yè)核心配置JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount).shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build();// 定義DATAFLOW類型配置DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), false);// 定義Lite作業(yè)根配置LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).overwrite(true).build();return dataflowJobRootConfig;}
}
- 最后,在配置文件application.properties中配置好對應(yīng)的myDataflowJob參數(shù)!
#dataflow類型的job
dataflowJob.myDataflowJob.name=myDataflowJob
dataflowJob.myDataflowJob.cron=0/15 * * * * ?
dataflowJob.myDataflowJob.shardingTotalCount=1
dataflowJob.myDataflowJob.shardingItemParameters=0=a,1=b,2=c
dataflowJob.myDataflowJob.jobParameters=myDataflowJobParamter
- 運(yùn)行程序,查看效果
需要注意的地方是,如果配置的是流式處理類型,它會不停的拉取數(shù)據(jù)、處理數(shù)據(jù),在拉取的時(shí)候,如果返回為空,就不會處理數(shù)據(jù)!如果配置的是非流式處理類型,和上面介紹的simpleJob類型,處理一樣!
2.3.6 新建ScriptJob類型作業(yè)
- ScriptJob 類型的任務(wù)配置:主要是用于定時(shí)執(zhí)行某個(gè)腳本,一般用的比較少!因?yàn)槟繕?biāo)是腳本,沒有執(zhí)行的任務(wù),所以無需編寫任務(wù)作業(yè)類型!只需要編寫一個(gè)ScriptJob類型的配置類即可,命令是echo 'Hello World !內(nèi)容!
@Configuration
public class MyScriptJobConfig {/*** 任務(wù)名稱*/@Value("${scriptJob.myScriptJob.name}")private String jobName;/*** cron表達(dá)式*/@Value("${scriptJob.myScriptJob.cron}")private String jobCron;/*** 作業(yè)分片總數(shù)*/@Value("${scriptJob.myScriptJob.shardingTotalCount}")private int jobShardingTotalCount;/*** 作業(yè)分片參數(shù)*/@Value("${scriptJob.myScriptJob.shardingItemParameters}")private String jobShardingItemParameters;/*** 自定義參數(shù)*/@Value("${scriptJob.myScriptJob.jobParameters}")private String jobParameters;@Autowiredprivate ZookeeperRegistryCenter registryCenter;@Bean(initMethod = "init")public JobScheduler scriptJobScheduler() {MyElasticJobListener elasticJobListener = new MyElasticJobListener();return new JobScheduler(registryCenter, getLiteJobConfiguration(), elasticJobListener);}private LiteJobConfiguration getLiteJobConfiguration() {// 定義作業(yè)核心配置JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount).shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build();// 定義SCRIPT類型配置ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "echo 'Hello World !'");// 定義Lite作業(yè)根配置LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).overwrite(true).build();return scriptJobRootConfig;}
}
- 在配置文件application.properties中配置好對應(yīng)的myScriptJob參數(shù)!
#script類型的job
scriptJob.myScriptJob.name=myScriptJob
scriptJob.myScriptJob.cron=0/15 * * * * ?
scriptJob.myScriptJob.shardingTotalCount=3
scriptJob.myScriptJob.shardingItemParameters=0=a,1=b,2=c
scriptJob.myScriptJob.jobParameters=myScriptJobParamter
- 運(yùn)行程序,看看效果
2.3.7 任務(wù)狀態(tài)持久化
可能有的人會發(fā)出疑問,elastic-job是如何存儲數(shù)據(jù)的,用ZooInspector客戶端鏈接zookeeper注冊中心,你發(fā)現(xiàn)對應(yīng)的任務(wù)配置被存儲到相應(yīng)的樹根上!而具體作業(yè)任務(wù)執(zhí)行軌跡和狀態(tài)結(jié)果是不會存儲到zookeeper,需要我們在項(xiàng)目中通過數(shù)據(jù)源方式進(jìn)行持久化!
將任務(wù)狀態(tài)持久化到數(shù)據(jù)庫配置過程也很簡單,只需要在對應(yīng)的配置類上注入數(shù)據(jù)源即可,以MySimpleJobConfig為例,代碼如下:
@Configuration
public class MySimpleJobConfig {/*** 任務(wù)名稱*/@Value("${simpleJob.mySimpleJob.name}")private String mySimpleJobName;/*** cron表達(dá)式*/@Value("${simpleJob.mySimpleJob.cron}")private String mySimpleJobCron;/*** 作業(yè)分片總數(shù)*/@Value("${simpleJob.mySimpleJob.shardingTotalCount}")private int mySimpleJobShardingTotalCount;/*** 作業(yè)分片參數(shù)*/@Value("${simpleJob.mySimpleJob.shardingItemParameters}")private String mySimpleJobShardingItemParameters;/*** 自定義參數(shù)*/@Value("${simpleJob.mySimpleJob.jobParameters}")private String mySimpleJobParameters;@Autowiredprivate ZookeeperRegistryCenter registryCenter;@Autowiredprivate DataSource dataSource;;@Beanpublic MySimpleJob stockJob() {return new MySimpleJob();}@Bean(initMethod = "init")public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) {//添加事件數(shù)據(jù)源配置JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);MyElasticJobListener elasticJobListener = new MyElasticJobListener();return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), jobEventConfig, elasticJobListener);}private LiteJobConfiguration getLiteJobConfiguration() {// 定義作業(yè)核心配置JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount).shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build();// 定義SIMPLE類型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName());// 定義Lite作業(yè)根配置LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();return simpleJobRootConfig;}
}
同時(shí),需要在配置文件application.properties中配置好對應(yīng)的datasource參數(shù)!
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/example-elastic-job-test
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
運(yùn)行程序,然后在elastic-job-lite-console控制臺配置對應(yīng)的數(shù)據(jù)源!
注意:該功能Mysql數(shù)據(jù)庫的話只支持5.7,其他版本則無法連接
最后,點(diǎn)擊【作業(yè)軌跡】即可查看對應(yīng)作業(yè)執(zhí)行情況!
2.4 創(chuàng)建工程(start集成方式)
具體 參考(推薦):https://blog.csdn.net/baidu_21349635/article/details/106317774
三 常見問題集錦
參考文獻(xiàn):
https://blog.csdn.net/qq_34350584/article/details/119754657
https://blog.csdn.net/lvlei19911108/article/details/118633115
https://blog.csdn.net/cicada_smile/article/details/104810958