點選上方“芋道原始碼”,選擇“置頂公眾號”
技術文章第一時間送達!
原始碼精品專欄
摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-config/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
本文基於 Elastic-Job V2.1.5 版本分享
-
1. 概述
-
2. 作業配置
-
3. 作業配置服務
-
666. 彩蛋
1. 概述
本文主要分享 Elastic-Job-Lite 作業配置。
涉及到主要類的類圖如下( 開啟大圖 ):
-
黃色的類在
elastic-job-common-core
專案裡,為 Elastic-Job-Lite、Elastic-Job-Cloud 公用作業配置類。
另外建議你已經( 非必須 ):
-
閱讀過《官方檔案 —— 配置手冊》
-
執行過 JavaMain.java
你行好事會因為得到贊賞而愉悅
同理,開源專案貢獻者會因為 Star 而更加有動力
為 Elastic-Job 點贊!傳送門
2. 作業配置
一個作業( ElasticJob )的排程,需要配置獨有的一個作業排程器( JobScheduler ),兩者是 1 : 1
的關係。這點大家要註意下,當然下文看程式碼也會看到。
作業排程器的建立可以配置四個引數:
-
註冊中心( CoordinatorRegistryCenter ):用於協調分散式服務。必填。
-
Lite作業配置( LiteJobConfiguration ):必填。
-
作業事件匯流排( JobEventBus ):對作業事件非同步監聽。選填。
-
作業監聽器( ElasticJobListener ):對作業執行前,執行後進行同步監聽。選填。
2.1 註冊中心配置
Elastic-Job 抽象了註冊中心介面( RegistryCenter ),並提供了預設基於 Zookeeper 的註冊中心實現( ZookeeperRegistryCenter )。
ZookeeperRegistryCenter 對應配置類為 ZookeeperConfiguration。該類註釋很完整,可以點選連結直接檢視原始碼,這裡我們重點說下 namespace
屬性。如果你有多個不同 Elastic-Job叢集 時,使用相同 Zookeeper,可以配置不同的 namespace
進行隔離。
註冊中心的初始化,我們會在《Elastic-Job-Lite 原始碼解析 —— 註冊中心》詳細分享。
2.2 Lite作業配置
LiteJobConfiguration 繼承自介面 JobRootConfiguration,作為 Elastic-Job-Lite 裡的作業( LiteJob )配置。Elastic-Job-Cloud 的作業( CloudJob )對應另外的配置類,也實現了該介面。
public final class LiteJobConfiguration implements JobRootConfiguration {
private final JobTypeConfiguration typeConfig;
private final boolean monitorExecution;
private final int maxTimeDiffSeconds;
private final int monitorPort;
private final String jobShardingStrategyClass;
private final int reconcileIntervalMinutes;
private final boolean disabled;
private final boolean overwrite;
// .... 省略部分get方法
public static class Builder {
// .... 省略部分屬性
public final LiteJobConfiguration build() {
return new LiteJobConfiguration(jobConfig, monitorExecution, maxTimeDiffSeconds, monitorPort, jobShardingStrategyClass, reconcileIntervalMinutes, disabled, overwrite);
}
}
}
-
typeConfig
:作業型別配置。必填。 -
monitorExecution
:監控作業執行時狀態。預設為false
。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業執行》詳細分享。每次作業執行時間和間隔時間均非常短的情況,建議不監控作業執行時狀態以提升效率。因為是瞬時狀態,所以無必要監控。請使用者自行增加資料堆積監控。並且不能保證資料重覆選取,應在作業中實現冪等性。
每次作業執行時間和間隔時間均較長的情況,建議監控作業執行時狀態,可保證資料不會重覆選取。 -
monitorPort
:作業監控埠。預設為-1
,不開啟作業監控埠。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業監控服務》詳細分享。建議配置作業監控埠, 方便開發者dump作業資訊。
使用方法: echo “dump” | nc 127.0.0.1 9888 -
maxTimeDiffSeconds
:設定最大容忍的本機與註冊中心的時間誤差秒數。預設為-1
,不檢查時間誤差。選填。 -
jobShardingStrategyClass
:作業分片策略實現類全路徑。預設為使用分配側路。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業分片策略》詳細分享。 -
reconcileIntervalMinutes
:修複作業伺服器不一致狀態服務排程間隔時間,配置為小於1的任意值表示不執行修複。預設為10
。在《Elastic-Job-Lite 原始碼解析 —— 自診斷修複 》詳細分享。 -
disabled
:作業是否禁用執行。預設為false
。選填。 -
overwrite
:設定使用本地作業配置改寫註冊中心的作業配置。預設為false
。選填。建議使用運維平臺( console )配置作業配置,統一管理。 -
Builder 類:使用該類配置 LiteJobConfiguration 屬性,呼叫
#build()
方法最終生成作業配置。參見:《JAVA設計樣式 — 生成器樣式(Builder)》。
2.2.1 作業型別配置
作業型別配置介面( JobTypeConfiguration ) 有三種配置實現,針對三種作業型別:
配置實現 | 作業 | 說明 |
---|---|---|
SimpleJobConfiguration | SimpleJob | 簡單作業。例如:訂單過期作業 |
DataflowJobConfiguration | DataflowJob | 資料流作業。TODO:筆者暫時未瞭解流式處理資料,不誤人子弟 |
ScriptJobConfiguration | ScriptJob | 指令碼作業。例如:呼叫 shell 指令碼備份資料庫作業 |
三種配置類屬性對比如:
屬性 | SimpleJob | DataflowJob | ScriptJob | 說明 |
---|---|---|---|---|
coreConfig |
√ | √ | √ | 作業核心配置 |
jobType |
JobType.SIMPLE | JobType.DATAFLOW | JobType.SCRIPT | 作業型別 |
jobClass |
√ | √ | √ (預設:ScriptJob.class) | 作業實現類全路徑 |
streamingProcess |
√ | 是否流式處理資料 | ||
scriptCommandLine |
√ | 指令碼型作業執行命令列 |
作業型別配置不僅僅適用於 Elastic-Job-Lite,也適用於 Elastic-Job-Cloud。
2.2.2 作業核心配置
作業核心配置( JobCoreConfiguration ),我們可以看到在每種作業型別配置都有該屬性( coreConfig
)。
public final class JobCoreConfiguration {
private final String jobName;
private final String cron;
private final int shardingTotalCount;
private final String shardingItemParameters;
private final String jobParameter;
private final boolean failover;
private final boolean misfire;
private final String description;
private final JobProperties jobProperties;
public static class Builder {
// .... 省略部分屬性
public final JobCoreConfiguration build() {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(cron), "cron can not be empty.");
Preconditions.checkArgument(shardingTotalCount > 0, "shardingTotalCount should larger than zero.");
return new JobCoreConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters, jobParameter, failover, misfire, description, jobProperties);
}
}
}
-
jobName
:作業名稱。必填。 -
cron
:cron運算式,用於控製作業觸發時間。必填。 -
shardingTotalCount
:作業分片總數。如果一個作業啟動超過作業分片總數的節點,只有shardingTotalCount
會執行作業。必填。在《Elastic-Job-Lite 原始碼解析 —— 作業分片策略 》詳細分享。 -
shardingItemParameters
:分片序列號和引數。選填。分片序列號和引數用等號分隔,多個鍵值對用逗號分隔
分片序列號從0開始,不可大於或等於作業分片總數
如:
0=a,1=b,2=c -
jobParameter
:作業自定義引數。選填。作業自定義引數,可透過傳遞該引數為作業排程的業務方法傳參,用於實現帶引數的作業
例:每次獲取的資料量、作業實體從資料庫讀取的主鍵等 -
failover
:是否開啟作業執行失效轉移。開啟表示如果作業在一次作業執行中途宕機,允許將該次未完成的作業在另一作業節點上補償執行。預設為false
。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業失效轉移 》詳細分享。 -
misfire
:是否開啟錯過作業重新執行。預設為true
。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業執行 》詳細分享。 -
description
:作業描述。選填。 -
jobProperties
:作業屬性配置。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業執行 》詳細分享。public final class JobProperties {
private EnumMap
map = new EnumMap<>(JobPropertiesEnum.class); public enum JobPropertiesEnum {
/**
* 作業異常處理器.
*/
JOB_EXCEPTION_HANDLER("job_exception_handler", JobExceptionHandler.class, DefaultJobExceptionHandler.class.getCanonicalName()),
/**
* 執行緒池服務處理器.
*/
EXECUTOR_SERVICE_HANDLER("executor_service_handler", ExecutorServiceHandler.class, DefaultExecutorServiceHandler.class.getCanonicalName());
private final String key;
private final Class> classType;
private final String defaultValue;}
} -
JOB_EXCEPTION_HANDLER
:用於擴充套件異常處理類。 -
EXECUTOR_SERVICE_HANDLER
:用於擴充套件作業處理執行緒池類。 -
透過這個屬性,我們可以自定義每個作業的異常處理和執行緒池服務。
2.3 作業事件配置
透過作業事件配置( JobEventConfiguration ),實現對作業事件的非同步監聽、處理。在《Elastic-Job-Lite 原始碼解析 —— 作業事件追蹤》詳細分享。
2.4 作業監聽器
透過配置作業監聽器( ElasticJobListener ),實現對作業執行的同步監聽、處理。在《Elastic-Job-Lite 原始碼解析 —— 作業監聽器》詳細分享。
3. 作業配置服務
多個 Elastic-Job-Lite 使用相同註冊中心和相同 namespace
組成叢集,實現高可用。叢集中,使用作業配置服務( ConfigurationService ) 共享作業配置。
public final class ConfigurationService {
/**
* 時間服務
*/
private final TimeService timeService;
/**
* 作業節點資料訪問類
*/
private final JobNodeStorage jobNodeStorage;
public ConfigurationService(final CoordinatorRegistryCenter regCenter, final String jobName) {
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
timeService = new TimeService();
}
}
-
JobNodeStorage,封裝註冊中心,提供儲存服務。在《Elastic-Job-Lite 原始碼解析 —— 作業資料儲存》詳細分享。
-
TimeService,時間服務,提供當前時間查詢。
public final class TimeService {
/**
* 獲取當前時間的毫秒數.
*
* @return 當前時間的毫秒數
*/
public long getCurrentMillis() {
return System.currentTimeMillis();
}}
3.1 讀取作業配置
/**
* 讀取作業配置.
*
* @param fromCache 是否從快取中讀取
* @return 作業配置
*/
public LiteJobConfiguration load(final boolean fromCache) {
String result;
if (fromCache) { // 快取
result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
if (null == result) {
result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
}
} else {
result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
}
return LiteJobConfigurationGsonFactory.fromJson(result);
}
3.2 持久化作業配置
/**
* 持久化分散式作業配置資訊.
*
* @param liteJobConfig 作業配置
*/
public void persist(final LiteJobConfiguration liteJobConfig) {
checkConflictJob(liteJobConfig);
if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
}
}
-
呼叫
#checkConflictJob(...)
方法校驗註冊中心儲存的作業配置的作業實現類全路徑(jobClass
)和當前的是否相同,如果不同,則認為是衝突,不允許儲存:private void checkConflictJob(final LiteJobConfiguration liteJobConfig) {
OptionalliteJobConfigFromZk = find();
if (liteJobConfigFromZk.isPresent()
&& !liteJobConfigFromZk.get().getTypeConfig().getJobClass().equals(liteJobConfig.getTypeConfig().getJobClass())) { // jobClass 是否相同
throw new JobConfigurationException("Job conflict with register center. The job '%s' in register center's class is '%s', your job class is '%s'",
liteJobConfig.getJobName(), liteJobConfigFromZk.get().getTypeConfig().getJobClass(), liteJobConfig.getTypeConfig().getJobClass());
}
} -
當註冊中心未儲存該作業配置 或者 當前作業配置允許替換註冊中心作業配置(
overwrite = true
)時,持久化作業配置。
3.3 校驗本機時間是否合法
/**
* 檢查本機與註冊中心的時間誤差秒數是否在允許範圍.
*
* @throws JobExecutionEnvironmentException 本機與註冊中心的時間誤差秒數不在允許範圍所丟擲的異常
*/
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
if (-1 == maxTimeDiffSeconds) {
return;
}
long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
if (timeDiff > maxTimeDiffSeconds * 1000L) {
throw new JobExecutionEnvironmentException(
"Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
}
}
-
Elastic-Job-Lite 作業觸發是依賴本機時間,相同叢集使用註冊中心時間為基準,校驗本機與註冊中心的時間誤差是否在允許範圍內(
LiteJobConfiguration.maxTimeDiffSeconds
)。
666. 彩蛋
Elastic-Job-Lite 原始碼解析系列第一篇文章,希望大家多多支援,預計全部更新完會有 15+ 篇。Elastic-Job-Cloud 原始碼系列後續也會更新。
道友,分享一波微信朋友圈支援支援支援,可好?
如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。
目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:
01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽
05. 拓展機制 SPI
06. 執行緒池
07. 服務暴露 Export
08. 服務取用 Refer
09. 註冊中心 Registry
10. 動態編譯 Compile
11. 動態代理 Proxy
12. 服務呼叫 Invoke
13. 呼叫特性
14. 過濾器 Filter
15. NIO 伺服器
16. P2P 伺服器
17. HTTP 伺服器
18. 序列化 Serialization
19. 叢集容錯 Cluster
20. 優雅停機
21. 日誌適配
22. 狀態檢查
23. 監控中心 Monitor
24. 管理中心 Admin
25. 運維命令 QOS
26. 鏈路追蹤 Tracing
…
一共 60 篇++
原始碼不易↓↓↓↓↓
點贊支援老艿艿↓↓