點選上方“Java技術驛站”,選擇“置頂公眾號”。
有內涵、有價值的文章第一時間送達!
最大努力送達型非同步JOB任務
當最大努力送達型監聽器多次失敗嘗試後,把任務交給最大努力送達型非同步JOB任務處理,非同步多次嘗試處理;核心原始碼在模組 sharding-jdbc-transaction-async-job
中。該模組是一個獨立非同步處理模組,使用者決定是否需要啟用,原始碼比較少,大概看一下原始碼結構:
原始碼結構
resouces目錄下的指令碼和dubbo非常相似(作者應該也看過dubbo原始碼,哈),start.sh&stop.sh;分別是服務啟動指令碼和服務停止指令碼;根據start.sh指令碼可知,該模組的主方法是BestEffortsDeliveryJobMain:
CONTAINER_MAIN=com.dangdang.ddframe.rdb.transaction.soft.bed.BestEffortsDeliveryJobMain
nohup java -classpath $CONF_DIR:$LIB_DIR:. $CONTAINER_MAIN >/dev/null 2>&1 &
Main方法的核心原始碼如下:
public final class BestEffortsDeliveryJobMain {
public static void main(final String[] args) throws Exception {
try (InputStreamReader inputStreamReader = new InputStreamReader(BestEffortsDeliveryJobMain.class.getResourceAsStream("/conf/config.yaml"), "UTF-8")) {
BestEffortsDeliveryConfiguration config = new Yaml(new Constructor(BestEffortsDeliveryConfiguration.class)).loadAs(inputStreamReader, BestEffortsDeliveryConfiguration.class);
new BestEffortsDeliveryJobFactory(config).init();
}
}
}
由原始碼可知,主配置檔案是
config.yaml
;將該檔案解析為BestEffortsDeliveryConfiguration,然後呼叫newBestEffortsDeliveryJobFactory(config).init()
;
config.yaml配置檔案中job相關配置內容如下:
jobConfig:
#作業名稱
name: bestEffortsDeliveryJob
#觸發作業的cron運算式--每5s重試一次
cron: 0/5 * * * * ?
#每次作業獲取的事務日誌最大數量
transactionLogFetchDataCount: 100
#事務送達的最大嘗試次數.
maxDeliveryTryTimes: 3
#執行送達事務的延遲毫秒數,早於此間隔時間的入庫事務才會被作業執行,其SQL為 where *** AND `creation_time`< (now() - maxDeliveryTryDelayMillis),即至少60000ms,即一分鐘前入庫的事務日誌才會被拉取出來;
maxDeliveryTryDelayMillis: 60000
maxDeliveryTryDelayMillis:60000
這個配置也可以理解為60s內的transaction_log不處理;
BestEffortsDeliveryJobFactory核心原始碼:
@RequiredArgsConstructor
public final class BestEffortsDeliveryJobFactory {
// 這個屬性賦值透過有參構造方法進行賦值--new BestEffortsDeliveryJobFactory(config),就是透過`config.yaml`配置的屬性
private final BestEffortsDeliveryConfiguration bedConfig;
/**
* BestEffortsDeliveryJobMain中呼叫該init()方法,初始化最大努力嘗試型非同步JOB,該JOB基於elastic-job;
* Initialize best efforts delivery job.
*/
public void init() {
// 根據config.yaml中配置的zkConfig節點,得到協調排程中心CoordinatorRegistryCenter
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(createZookeeperConfiguration(bedConfig));
// 排程中心初始化
regCenter.init();
// 構造elastic-job排程任務
JobScheduler jobScheduler = new JobScheduler(regCenter, createBedJobConfiguration(bedConfig));
jobScheduler.setField("bedConfig", bedConfig);
jobScheduler.setField("transactionLogStorage", TransactionLogStorageFactory.createTransactionLogStorage(new RdbTransactionLogDataSource(bedConfig.getDefaultTransactionLogDataSource())));
jobScheduler.init();
}
// 根據該方法可知,建立的是BestEffortsDeliveryJob
private JobConfiguration createBedJobConfiguration(final BestEffortsDeliveryConfiguration bedJobConfig) {
// 根據config.yaml中配置的jobConfig節點得到job配置資訊,且指定job型別為BestEffortsDeliveryJob
JobConfiguration result = new JobConfiguration(bedJobConfig.getJobConfig().getName(), BestEffortsDeliveryJob.class, 1, bedJobConfig.getJobConfig().getCron());
result.setFetchDataCount(bedJobConfig.getJobConfig().getTransactionLogFetchDataCount());
result.setOverwrite(true);
return result;
}
BestEffortsDeliveryJob核心原始碼:
@Slf4j
public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> {
@Setter
private BestEffortsDeliveryConfiguration bedConfig;
@Setter
private TransactionLogStorage transactionLogStorage;
@Override
public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) {
// 從transaction_log表中抓取最多100條事務日誌(相關引數都在config.yaml中jobConfig節點下)
return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(),
bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());
}
@Override
public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {
try (
Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) {
// 呼叫事務日誌儲存器的processData()進行處理
transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());
} catch (final SQLException | TransactionCompensationException ex) {
log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1,
bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
return false;
}
return true;
}
}
END