點選上方“Java技術驛站”,選擇“置頂公眾號”。
有內涵、有價值的文章第一時間送達!
BASE Transaction
-
Best efforts delivery transaction (已經實現).
-
Try confirm cancel transaction (待定).
Sharding-JDBC由於效能方面的考量,決定不支援強一致性分散式事務。
最大努力送達型事務
在分散式資料庫的場景下,相信對於該資料庫的操作最終一定可以成功,所以透過最大努力反覆嘗試送達操作。
最大努力送達型事務的架構圖
最大努力送達型事務的架構圖
摘自sharding-jdbc使用指南☞事務支援
執行過程有以下幾種情況:
-
執行成功–如圖所示,執行結果事件->監聽執行事件->執行成功->清理事務日誌
-
執行失敗,同步重試成功–如圖所示,執行結果事件->監聽執行事件->執行失敗->重試執行->執行成功->清理事務日誌
-
執行失敗,同步重試失敗,非同步重試成功–如圖所示,執行結果事件->監聽執行事件->執行失敗->重試執行->執行失敗->”非同步送達作業”重試執行->執行成功->清理事務日誌
-
執行失敗,同步重試失敗,非同步重試失敗,事務日誌保留—-如圖所示,執行結果事件->監聽執行事件->執行失敗->重試執行->執行失敗->”非同步送達作業”重試執行->執行失敗->… …
說明:不管執行結果如何,執行前事件都會記錄事務日誌;執行事件型別包括3種:BEFOREEXECUTE,EXECUTEFAILURE和EXECUTESUCCESS;另外,這裡的”同步“不是絕對的同步執行,而是透過google-guava的EventBus釋出事件後,在監聽端判斷是EXECUTEFAILURE事件,最多重試
syncMaxDeliveryTryTimes
次;後面對BestEffortsDeliveryListener
的原始碼分析有介紹;這裡的”非同步“透過外掛實現,在後面的文章10. sharding-jdbc原始碼之非同步送達JOB會有分析;
適用場景
-
根據主鍵刪除資料。
-
更新記錄永久狀態,如更新通知送達狀態。
使用限制
-
使用最大努力送達型柔性事務的SQL需要滿足冪等性。
-
INSERT陳述句要求必須包含主鍵,且不能是自增主鍵。
-
UPDATE陳述句要求冪等,不能是UPDATE xxx SET x=x+1
-
DELETE陳述句無要求。
開發示例
// 1\. 配置SoftTransactionConfiguration
SoftTransactionConfiguration transactionConfig = new SoftTransactionConfiguration(dataSource);
// 配置相關請看後面的備註
transactionConfig.setXXX();
// 2\. 初始化SoftTransactionManager
SoftTransactionManager transactionManager = new SoftTransactionManager(transactionConfig);
transactionManager.init();
// 3\. 獲取BEDSoftTransaction
BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery);
// 4\. 開啟事務
transaction.begin(connection);
// 5\. 執行JDBC
/*
code here
*/
*
// 6.關閉事務
transaction.end();
備註:SoftTransactionConfiguration支援的配置以及含義請參考sharding-jdbc使用指南☞事務支援,這段開發示例的程式碼也摘自這裡;也可參考
sharding-jdbc-transaction
模組中com.dangdang.ddframe.rdb.transaction.soft.integrate.SoftTransactionTest
如何使用柔性事務,但是這裡的程式碼需要稍作修改,否則只是普通的執行邏輯,不是sharding-jdbc的執行邏輯:
@Test
public void bedSoftTransactionTest() throws SQLException {
SoftTransactionManager transactionManagerFactory = new SoftTransactionManager(getSoftTransactionConfiguration(getShardingDataSource()));
// 初始化柔性事務管理器
transactionManagerFactory.init();
BEDSoftTransaction transactionManager = (BEDSoftTransaction) transactionManagerFactory.getTransaction(SoftTransactionType.BestEffortsDelivery);
transactionManager.begin(getShardingDataSource().getConnection());
// 執行INSERT SQL(DML型別),如果執行過程中異常,會在`BestEffortsDeliveryListener`中重試
insert();
transactionManager.end();
}
private void insert() {
String dbSchema = "insert into transaction_test(id, remark) values (2, ?)";
try (
// 將.getConnection("db_trans", SQLType.DML)移除,這樣的話,得到的才是ShardingConnection
Connection conn = getShardingDataSource().getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
preparedStatement.setString(1, "JUST TEST IT .");
preparedStatement.executeUpdate();
} catch (final SQLException e) {
e.printStackTrace();
}
}
核心原始碼分析
透過3. sharding-jdbc原始碼之路由&執行中對ExecutorEngine的分析可知,sharding-jdbc在執行SQL前後,分別呼叫 EventBusInstance.getInstance().post()
提交了事件,那麼呼叫 EventBusInstance.getInstance().register()
的地方,就是柔性事務處理的地方,透過檢視原始碼的呼叫關係可知,只有 SoftTransactionManager.init()
呼叫了 EventBusInstance.getInstance().register()
,所以柔性事務實現的核心在SoftTransactionManager這裡;
柔性事務管理器
柔性事務實現在 SoftTransactionManager
中,核心原始碼如下:
public final class SoftTransactionManager {
// 柔性事務配置物件
@Getter
private final SoftTransactionConfiguration transactionConfig;
/**
* Initialize B.A.S.E transaction manager.
* @throws SQLException SQL exception
*/
public void init() throws SQLException {
// 初始化註冊最大努力送達型柔性事務監聽器;
EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
// 如果事務日誌資料源型別是關係型資料庫,則建立事務日誌表transaction_log
createTable();
}
// 內嵌的最大努力送達型非同步JOB任務,依賴噹噹開源的elastic-job
if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {
new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();
}
}
// 從這裡可知建立的事務日誌表表名是transaction_log(所以需要保證每個庫中使用者沒有自定義建立transaction_log表)
private void createTable() throws SQLException {
String dbSchema = "CREATE TABLE IF NOT EXISTS `transaction_log` ("
+ "`id` VARCHAR(40) NOT NULL, "
+ "`transaction_type` VARCHAR(30) NOT NULL, "
+ "`data_source` VARCHAR(255) NOT NULL, "
+ "`sql` TEXT NOT NULL, "
+ "`parameters` TEXT NOT NULL, "
+ "`creation_time` LONG NOT NULL, "
+ "`async_delivery_try_times` INT NOT NULL DEFAULT 0, "
+ "PRIMARY KEY (`id`));";
try (
Connection conn = transactionConfig.getTransactionLogDataSource().getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
preparedStatement.executeUpdate();
}
}
從這段原始碼可知,柔性事務的幾個重點如下,接下來一一根據原始碼進行分析;
事務日誌儲存器;
最大努力送達型事務監聽器;
非同步送達JOB任務;
1.事務日誌儲存器
柔性事務日誌介面類為 TransactionLogStorage.java
,有兩個實現類:
-
RdbTransactionLogStorage:關係型資料庫儲存柔性事務日誌;
-
MemoryTransactionLogStorage:記憶體儲存柔性事務日誌;
1.1.1事務日誌核心介面
TransactionLogStorage中幾個重要介面在兩個實現類中的實現:
-
void add(TransactionLog):Rdb實現就是把事務日誌TransactionLog 插入到
transaction_log
表中,Memory實現就是把事務日誌儲存到ConcurrentHashMap
中; -
void remove(String id):Rdb實現就是從
transaction_log
表中刪除事務日誌,Memory實現從ConcurrentHashMap
中刪除事務日誌; -
void increaseAsyncDeliveryTryTimes(String id):非同步增加送達重試次數,即TransactionLog中的asyncDeliveryTryTimes+1;Rdb實現就是update
transaction_log
表中async_delivery_try_times
欄位加1;Memory實現就是TransactionLog中重新給asyncDeliveryTryTimes賦值newAtomicInteger(transactionLog.getAsyncDeliveryTryTimes()).incrementAndGet()
; -
findEligibleTransactionLogs(): 查詢需要處理的事務日誌,條件是:①
非同步處理次數async_delivery_try_times小於引數最大處裡次數maxDeliveryTryTimes
,②transaction_type是BestEffortsDelivery
,③系統當前時間與事務日誌的建立時間差要超過引數maxDeliveryTryDelayMillis
,每次最多查詢引數size條;Rdb實現透過sql從transaction_log表中查詢,Memory實現遍歷ConcurrentHashMap匹配符合條件的TransactionLog; -
boolean processData():Rdb實現執行TransactionLog中的sql,如果執行過程中丟擲異常,那麼呼叫increaseAsyncDeliveryTryTimes()增加送達重試次數並丟擲異常,如果執行成功,刪除事務日誌,並傳回true;Memory實現直接傳回false(因為processData()的目的是執行TransactionLog中的sql,而Memory型別無法觸及資料庫,所以傳回false)
1.1.2事務日誌儲存核心原始碼
RdbTransactionLogStorage.java
實現原始碼:
public final class RdbTransactionLogStorage implements TransactionLogStorage {
private final DataSource dataSource;
@Override
public void add(final TransactionLog transactionLog) {
// 儲存事務日誌到rdb中的sql
String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
... ...
preparedStatement.executeUpdate();
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
}
@Override
public void remove(final String id) {
// 根據id刪除事務日誌的sql
String sql = "DELETE FROM `transaction_log` WHERE `id`=?;";
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
preparedStatement.setString(1, id);
preparedStatement.executeUpdate();
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
}
@Override
public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {
List<TransactionLog> result = new ArrayList<>(size);
// 執行該sql查詢需要處理的事務日誌,最多取size條;
String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` FROM `transaction_log` WHERE `async_delivery_try_times` AND `transaction_type`=? AND `creation_time` LIMIT ?;";
try (Connection conn = dataSource.getConnection()) {
try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
... ...
preparedStatement.setLong(3, System.currentTimeMillis() - maxDeliveryTryDelayMillis);
... ...
}
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
return result;
}
@Override
public void increaseAsyncDeliveryTryTimes(final String id) {
// 更新處理次數+1
String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";
try (
... ...
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
}
@Override
public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {
try (
Connection conn = connection;
// 執行TransactionLog中的sql
PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {
for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));
}
preparedStatement.executeUpdate();
} catch (final SQLException ex) {
如果丟擲異常,表示執行sql失敗,那麼把增加處理次數並把異常丟擲去;
increaseAsyncDeliveryTryTimes(transactionLog.getId());
throw new TransactionCompensationException(ex);
}
// 如果沒有丟擲異常,表示執行sql成功,那麼刪除該事務日誌;
remove(transactionLog.getId());
return true;
}
}
1.1.3事務日誌儲存樣例
id | transction_type | data_source | sql | parameters | creation_time | asyncdeliverytry_times |
---|---|---|---|---|---|---|
85c141c4-1b8f-4e54-9010-0cc661bb1864 | BestEffortsDelivery | db_trans | insert into transaction_test(id, remark) values (3, ?) | ["TEST BY AFEI."] | 1517899200989 | 0 |
transaction_log中儲存的事務日誌樣例:
id | transction_type | data_source | sql | parameters | creation_time | asyncdeliverytry_times |
---|---|---|---|---|---|---|
85c141c4-1b8f-4e54-9010-0cc661bb1864 | BestEffortsDelivery | db_trans | insert into transaction_test(id, remark) values (3, ?) | ["TEST BY AFEI."] | 1517899200989 | 0 |
1.2最大努力送達型事務監聽器
核心原始碼如下:
/**
* Best efforts delivery B.A.S.E transaction listener.
*
* @author zhangliang
*/
@Slf4j
public final class BestEffortsDeliveryListener {
@Subscribe
@AllowConcurrentEvents
// 從方法可知,只監聽DML執行事件(DML即資料維護語言,包括INSERT, UPDATE, DELETE)
public void listen(final DMLExecutionEvent event) {
// 判斷是否需要繼續,判斷邏輯為:事務存在,並且是BestEffortsDelivery型別事務
if (!isProcessContinuously()) {
return;
}
// 從柔性事務管理器中得到柔性事務配置
SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
// 得到配置的柔性事務儲存器
TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
// 這裡肯定是最大努力送達型事務(如果不是BEDSoftTransaction,isProcessContinuously()就是false)
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
// 根據事件型別做不同處理
switch (event.getEventExecutionType()) {
case BEFORE_EXECUTE:
// 如果執行前事件,那麼先儲存事務日誌;
//TODO for batch SQL need split to 2-level records
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
return;
case EXECUTE_SUCCESS:
// 如果執行成功事件,那麼刪除事務日誌;
transactionLogStorage.remove(event.getId());
return;
case EXECUTE_FAILURE:
boolean deliverySuccess = false;
// 如果執行成功事件,最大努力送達型最多嘗試3次(可配置,SoftTransactionConfiguration中的引數syncMaxDeliveryTryTimes);
for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
// 如果在該Listener中執行成功,那麼傳回,不需要再嘗試
if (deliverySuccess) {
return;
}
boolean isNewConnection = false;
Connection conn = null;
PreparedStatement preparedStatement = null;
try {
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
// 透過執行"select 1"判斷conn是否是有效的資料庫連線;如果不是有效的資料庫連線,釋放掉並重新獲取一個資料庫連線;
if (!isValidConnection(conn)) {
bedSoftTransaction.getConnection().release(conn);
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
isNewConnection = true;
}
preparedStatement = conn.prepareStatement(event.getSql());
//TODO for batch event need split to 2-level records
for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
}
// 因為只監控DML,所以呼叫executeUpdate()
preparedStatement.executeUpdate();
// executeUpdate()後能執行到這裡,說明執行成功;根據id刪除事務日誌;
deliverySuccess = true;
transactionLogStorage.remove(event.getId());
} catch (final SQLException ex) {
// 如果sql執行有異常,那麼輸出error日誌
log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
} finally {
close(isNewConnection, conn, preparedStatement);
}
}
return;
default:
// 值支援三種事件型別,對於其他值,丟擲異常
throw new UnsupportedOperationException(event.getEventExecutionType().toString());
}
}
}
BestEffortsDeliveryListener原始碼總結:
執行前,插入事務日誌;
執行成功,則刪除事務日誌;
執行失敗,則最大努力嘗試
syncMaxDeliveryTryTimes
次;
1.3 非同步送達JOB任務
-
部署用於儲存事務日誌的資料庫。
-
部署用於非同步作業使用的zookeeper。
-
配置YAML檔案,參照示例檔案config.yaml。
-
下載並解壓檔案sharding-jdbc-transaction-async-job-$VERSION.tar,透過start.sh指令碼啟動非同步作業。
非同步送達JOB任務基於elastic-job,所以需要部署zookeeper;
END