歡迎光臨
每天分享高質量文章

【死磕Sharding-jdbc】—最大努力型事務

點選上方“Java技術驛站”,選擇“置頂公眾號”。

有內涵、有價值的文章第一時間送達!

BASE Transaction

  • Best efforts delivery transaction (已經實現).

  • Try confirm cancel transaction (待定).

Sharding-JDBC由於效能方面的考量,決定不支援強一致性分散式事務。

最大努力送達型事務

在分散式資料庫的場景下,相信對於該資料庫的操作最終一定可以成功,所以透過最大努力反覆嘗試送達操作。

最大努力送達型事務的架構圖

最大努力送達型事務的架構圖

摘自sharding-jdbc使用指南☞事務支援

執行過程有以下幾種情況:

  1. 執行成功–如圖所示,執行結果事件->監聽執行事件->執行成功->清理事務日誌

  2. 執行失敗,同步重試成功–如圖所示,執行結果事件->監聽執行事件->執行失敗->重試執行->執行成功->清理事務日誌

  3. 執行失敗,同步重試失敗,非同步重試成功–如圖所示,執行結果事件->監聽執行事件->執行失敗->重試執行->執行失敗->”非同步送達作業”重試執行->執行成功->清理事務日誌

  4. 執行失敗,同步重試失敗,非同步重試失敗,事務日誌保留—-如圖所示,執行結果事件->監聽執行事件->執行失敗->重試執行->執行失敗->”非同步送達作業”重試執行->執行失敗->… …

說明:不管執行結果如何,執行前事件都會記錄事務日誌;執行事件型別包括3種:BEFOREEXECUTEEXECUTEFAILUREEXECUTESUCCESS;另外,這裡的”同步“不是絕對的同步執行,而是透過google-guava的EventBus釋出事件後,在監聽端判斷是EXECUTEFAILURE事件,最多重試 syncMaxDeliveryTryTimes次;後面對 BestEffortsDeliveryListener的原始碼分析有介紹;這裡的”非同步“透過外掛實現,在後面的文章10. sharding-jdbc原始碼之非同步送達JOB會有分析;

適用場景

  • 根據主鍵刪除資料。

  • 更新記錄永久狀態,如更新通知送達狀態。

使用限制

  • 使用最大努力送達型柔性事務的SQL需要滿足冪等性。

  • INSERT陳述句要求必須包含主鍵,且不能是自增主鍵。

  • UPDATE陳述句要求冪等,不能是UPDATE xxx SET x=x+1

  • DELETE陳述句無要求。

開發示例

  1. // 1\. 配置SoftTransactionConfiguration

  2. SoftTransactionConfiguration transactionConfig = new SoftTransactionConfiguration(dataSource);

  3. // 配置相關請看後面的備註

  4. transactionConfig.setXXX();

  5. // 2\. 初始化SoftTransactionManager

  6. SoftTransactionManager transactionManager = new SoftTransactionManager(transactionConfig);

  7. transactionManager.init();

  8. // 3\. 獲取BEDSoftTransaction

  9. BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery);

  10. // 4\. 開啟事務

  11. transaction.begin(connection);

  12. // 5\. 執行JDBC

  13. /*

  14.    code here

  15. */

  16. *

  17. // 6.關閉事務

  18. transaction.end();

備註:SoftTransactionConfiguration支援的配置以及含義請參考sharding-jdbc使用指南☞事務支援,這段開發示例的程式碼也摘自這裡;也可參考 sharding-jdbc-transaction模組中 com.dangdang.ddframe.rdb.transaction.soft.integrate.SoftTransactionTest如何使用柔性事務,但是這裡的程式碼需要稍作修改,否則只是普通的執行邏輯,不是sharding-jdbc的執行邏輯

  1. @Test

  2. public void bedSoftTransactionTest() throws SQLException {

  3.    SoftTransactionManager transactionManagerFactory = new SoftTransactionManager(getSoftTransactionConfiguration(getShardingDataSource()));

  4.    // 初始化柔性事務管理器

  5.    transactionManagerFactory.init();

  6.    BEDSoftTransaction transactionManager = (BEDSoftTransaction) transactionManagerFactory.getTransaction(SoftTransactionType.BestEffortsDelivery);

  7.    transactionManager.begin(getShardingDataSource().getConnection());

  8.    // 執行INSERT SQL(DML型別),如果執行過程中異常,會在`BestEffortsDeliveryListener`中重試

  9.    insert();

  10.    transactionManager.end();

  11. }

  12. private void insert() {

  13.    String dbSchema = "insert into transaction_test(id, remark) values (2, ?)";

  14.    try (

  15.            // 將.getConnection("db_trans", SQLType.DML)移除,這樣的話,得到的才是ShardingConnection

  16.            Connection conn = getShardingDataSource().getConnection();

  17.            PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {

  18.        preparedStatement.setString(1, "JUST TEST IT .");

  19.        preparedStatement.executeUpdate();

  20.    } catch (final SQLException e) {

  21.        e.printStackTrace();

  22.    }

  23. }

核心原始碼分析

透過3. sharding-jdbc原始碼之路由&執行中對ExecutorEngine的分析可知,sharding-jdbc在執行SQL前後,分別呼叫 EventBusInstance.getInstance().post()提交了事件,那麼呼叫 EventBusInstance.getInstance().register()的地方,就是柔性事務處理的地方,透過檢視原始碼的呼叫關係可知,只有 SoftTransactionManager.init()呼叫了 EventBusInstance.getInstance().register(),所以柔性事務實現的核心在SoftTransactionManager這裡;

柔性事務管理器

柔性事務實現在 SoftTransactionManager中,核心原始碼如下:

  1. public final class SoftTransactionManager {

  2.    // 柔性事務配置物件

  3.    @Getter

  4.    private final SoftTransactionConfiguration transactionConfig;

  5.    /**

  6.     * Initialize B.A.S.E transaction manager.

  7.     * @throws SQLException SQL exception

  8.     */

  9.    public void init() throws SQLException {

  10.        // 初始化註冊最大努力送達型柔性事務監聽器;

  11.        EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());

  12.        if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {

  13.            // 如果事務日誌資料源型別是關係型資料庫,則建立事務日誌表transaction_log

  14.            createTable();

  15.        }

  16.        // 內嵌的最大努力送達型非同步JOB任務,依賴噹噹開源的elastic-job

  17.        if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {

  18.            new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();

  19.        }

  20.    }

  21.    // 從這裡可知建立的事務日誌表表名是transaction_log(所以需要保證每個庫中使用者沒有自定義建立transaction_log表)

  22.    private void createTable() throws SQLException {

  23.        String dbSchema = "CREATE TABLE IF NOT EXISTS `transaction_log` ("

  24.                + "`id` VARCHAR(40) NOT NULL, "

  25.                + "`transaction_type` VARCHAR(30) NOT NULL, "

  26.                + "`data_source` VARCHAR(255) NOT NULL, "

  27.                + "`sql` TEXT NOT NULL, "

  28.                + "`parameters` TEXT NOT NULL, "

  29.                + "`creation_time` LONG NOT NULL, "

  30.                + "`async_delivery_try_times` INT NOT NULL DEFAULT 0, "

  31.                + "PRIMARY KEY (`id`));";

  32.        try (

  33.                Connection conn = transactionConfig.getTransactionLogDataSource().getConnection();

  34.                PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {

  35.            preparedStatement.executeUpdate();

  36.        }

  37.    }

從這段原始碼可知,柔性事務的幾個重點如下,接下來一一根據原始碼進行分析;

  • 事務日誌儲存器;

  • 最大努力送達型事務監聽器;

  • 非同步送達JOB任務;

1.事務日誌儲存器

柔性事務日誌介面類為 TransactionLogStorage.java,有兩個實現類:

  1. RdbTransactionLogStorage:關係型資料庫儲存柔性事務日誌;

  2. MemoryTransactionLogStorage:記憶體儲存柔性事務日誌;

1.1.1事務日誌核心介面

TransactionLogStorage中幾個重要介面在兩個實現類中的實現:

  • void add(TransactionLog):Rdb實現就是把事務日誌TransactionLog 插入到 transaction_log表中,Memory實現就是把事務日誌儲存到 ConcurrentHashMap中;

  • void remove(String id):Rdb實現就是從 transaction_log表中刪除事務日誌,Memory實現從 ConcurrentHashMap中刪除事務日誌;

  • void increaseAsyncDeliveryTryTimes(String id):非同步增加送達重試次數,即TransactionLog中的asyncDeliveryTryTimes+1;Rdb實現就是update transaction_log表中 async_delivery_try_times欄位加1;Memory實現就是TransactionLog中重新給asyncDeliveryTryTimes賦值 newAtomicInteger(transactionLog.getAsyncDeliveryTryTimes()).incrementAndGet()

  • findEligibleTransactionLogs(): 查詢需要處理的事務日誌,條件是:① 非同步處理次數async_delivery_try_times小於引數最大處裡次數maxDeliveryTryTimes,② transaction_typeBestEffortsDelivery,③ 系統當前時間與事務日誌的建立時間差要超過引數maxDeliveryTryDelayMillis,每次最多查詢引數size條;Rdb實現透過sql從transaction_log表中查詢,Memory實現遍歷ConcurrentHashMap匹配符合條件的TransactionLog;

  • boolean processData():Rdb實現執行TransactionLog中的sql,如果執行過程中丟擲異常,那麼呼叫increaseAsyncDeliveryTryTimes()增加送達重試次數並丟擲異常,如果執行成功,刪除事務日誌,並傳回true;Memory實現直接傳回false(因為processData()的目的是執行TransactionLog中的sql,而Memory型別無法觸及資料庫,所以傳回false)

1.1.2事務日誌儲存核心原始碼

RdbTransactionLogStorage.java實現原始碼:

  1. public final class RdbTransactionLogStorage implements TransactionLogStorage {

  2.    private final DataSource dataSource;

  3.    @Override

  4.    public void add(final TransactionLog transactionLog) {

  5.        // 儲存事務日誌到rdb中的sql

  6.        String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";

  7.        try (

  8.            Connection conn = dataSource.getConnection();

  9.            PreparedStatement preparedStatement = conn.prepareStatement(sql)) {

  10.            ... ...

  11.            preparedStatement.executeUpdate();

  12.        } catch (final SQLException ex) {

  13.            throw new TransactionLogStorageException(ex);

  14.        }

  15.    }

  16.    @Override

  17.    public void remove(final String id) {

  18.        // 根據id刪除事務日誌的sql

  19.        String sql = "DELETE FROM `transaction_log` WHERE `id`=?;";

  20.        try (

  21.            Connection conn = dataSource.getConnection();

  22.            PreparedStatement preparedStatement = conn.prepareStatement(sql)) {

  23.            preparedStatement.setString(1, id);

  24.            preparedStatement.executeUpdate();

  25.        } catch (final SQLException ex) {

  26.            throw new TransactionLogStorageException(ex);

  27.        }

  28.    }

  29.    @Override

  30.    public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {

  31.        List<TransactionLog> result = new ArrayList<>(size);

  32.        // 執行該sql查詢需要處理的事務日誌,最多取size條;

  33.        String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` FROM `transaction_log` WHERE `async_delivery_try_times` AND `transaction_type`=? AND `creation_time` LIMIT ?;";

  34.        try (Connection conn = dataSource.getConnection()) {

  35.            try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {

  36.                ... ...

  37.                preparedStatement.setLong(3, System.currentTimeMillis() - maxDeliveryTryDelayMillis);

  38.                ... ...

  39.            }

  40.        } catch (final SQLException ex) {

  41.            throw new TransactionLogStorageException(ex);

  42.        }

  43.        return result;

  44.    }

  45.    @Override

  46.    public void increaseAsyncDeliveryTryTimes(final String id) {

  47.        // 更新處理次數+1

  48.        String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";

  49.        try (

  50.            ... ...

  51.        } catch (final SQLException ex) {

  52.            throw new TransactionLogStorageException(ex);

  53.        }

  54.    }

  55.    @Override

  56.    public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {

  57.        try (

  58.            Connection conn = connection;

  59.            // 執行TransactionLog中的sql

  60.            PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {

  61.            for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {

  62.                preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));

  63.            }

  64.            preparedStatement.executeUpdate();

  65.        } catch (final SQLException ex) {

  66.            如果丟擲異常,表示執行sql失敗,那麼把增加處理次數並把異常丟擲去;

  67.            increaseAsyncDeliveryTryTimes(transactionLog.getId());

  68.            throw new TransactionCompensationException(ex);

  69.        }

  70.        // 如果沒有丟擲異常,表示執行sql成功,那麼刪除該事務日誌;

  71.        remove(transactionLog.getId());

  72.        return true;

  73.    }

  74. }

1.1.3事務日誌儲存樣例

id transction_type data_source sql parameters creation_time asyncdeliverytry_times
85c141c4-1b8f-4e54-9010-0cc661bb1864 BestEffortsDelivery db_trans insert into transaction_test(id, remark) values (3, ?) ["TEST BY AFEI."] 1517899200989 0

transaction_log中儲存的事務日誌樣例:

id transction_type data_source sql parameters creation_time asyncdeliverytry_times
85c141c4-1b8f-4e54-9010-0cc661bb1864 BestEffortsDelivery db_trans insert into transaction_test(id, remark) values (3, ?) ["TEST BY AFEI."] 1517899200989 0

1.2最大努力送達型事務監聽器

核心原始碼如下:

  1. /**

  2. * Best efforts delivery B.A.S.E transaction listener.

  3. *

  4. * @author zhangliang

  5. */

  6. @Slf4j

  7. public final class BestEffortsDeliveryListener {

  8.    @Subscribe

  9.    @AllowConcurrentEvents

  10.    // 從方法可知,只監聽DML執行事件(DML即資料維護語言,包括INSERT, UPDATE, DELETE)

  11.    public void listen(final DMLExecutionEvent event) {

  12.        // 判斷是否需要繼續,判斷邏輯為:事務存在,並且是BestEffortsDelivery型別事務

  13.        if (!isProcessContinuously()) {

  14.            return;

  15.        }

  16.        // 從柔性事務管理器中得到柔性事務配置

  17.        SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();

  18.        // 得到配置的柔性事務儲存器

  19.        TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());

  20.        // 這裡肯定是最大努力送達型事務(如果不是BEDSoftTransaction,isProcessContinuously()就是false)

  21.        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();

  22.        // 根據事件型別做不同處理

  23.        switch (event.getEventExecutionType()) {

  24.            case BEFORE_EXECUTE:

  25.                // 如果執行前事件,那麼先儲存事務日誌;

  26.                //TODO for batch SQL need split to 2-level records

  27.                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),

  28.                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));

  29.                return;

  30.            case EXECUTE_SUCCESS:

  31.                // 如果執行成功事件,那麼刪除事務日誌;

  32.                transactionLogStorage.remove(event.getId());

  33.                return;

  34.            case EXECUTE_FAILURE:

  35.                boolean deliverySuccess = false;

  36.                // 如果執行成功事件,最大努力送達型最多嘗試3次(可配置,SoftTransactionConfiguration中的引數syncMaxDeliveryTryTimes);

  37.                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {

  38.                    // 如果在該Listener中執行成功,那麼傳回,不需要再嘗試

  39.                    if (deliverySuccess) {

  40.                        return;

  41.                    }

  42.                    boolean isNewConnection = false;

  43.                    Connection conn = null;

  44.                    PreparedStatement preparedStatement = null;

  45.                    try {

  46.                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);

  47.                        // 透過執行"select 1"判斷conn是否是有效的資料庫連線;如果不是有效的資料庫連線,釋放掉並重新獲取一個資料庫連線;

  48.                        if (!isValidConnection(conn)) {

  49.                            bedSoftTransaction.getConnection().release(conn);

  50.                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);

  51.                            isNewConnection = true;

  52.                        }

  53.                        preparedStatement = conn.prepareStatement(event.getSql());

  54.                        //TODO for batch event need split to 2-level records

  55.                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {

  56.                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));

  57.                        }

  58.                        // 因為只監控DML,所以呼叫executeUpdate()

  59.                        preparedStatement.executeUpdate();

  60.                        // executeUpdate()後能執行到這裡,說明執行成功;根據id刪除事務日誌;

  61.                        deliverySuccess = true;

  62.                        transactionLogStorage.remove(event.getId());

  63.                    } catch (final SQLException ex) {

  64.                        // 如果sql執行有異常,那麼輸出error日誌

  65.                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);

  66.                    } finally {

  67.                        close(isNewConnection, conn, preparedStatement);

  68.                    }

  69.                }

  70.                return;

  71.            default:

  72.                // 值支援三種事件型別,對於其他值,丟擲異常

  73.                throw new UnsupportedOperationException(event.getEventExecutionType().toString());

  74.        }

  75.    }

  76. }

BestEffortsDeliveryListener原始碼總結:

  • 執行前,插入事務日誌;

  • 執行成功,則刪除事務日誌;

  • 執行失敗,則最大努力嘗試 syncMaxDeliveryTryTimes次;

1.3 非同步送達JOB任務

  • 部署用於儲存事務日誌的資料庫。

  • 部署用於非同步作業使用的zookeeper。

  • 配置YAML檔案,參照示例檔案config.yaml。

  • 下載並解壓檔案sharding-jdbc-transaction-async-job-$VERSION.tar,透過start.sh指令碼啟動非同步作業。

非同步送達JOB任務基於elastic-job,所以需要部署zookeeper;

END

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖