歡迎光臨
每天分享高質量文章

分散式事務 TCC-Transaction 原始碼分析 —— 事務恢復

摘要: 原創出處 http://www.iocoder.cn/TCC-Transaction/transaction-recovery/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 TCC-Transaction 1.2.3.3 正式版

  • 1. 概述

  • 2. 事務重試配置

  • 3. 事務重試定時任務

  • 4. 異常事務恢復

    • 4.1 載入異常事務集合

    • 4.2 恢復異常事務集合

  • 666. 彩蛋


友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群討論技術和原始碼。
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群討論技術和原始碼。
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群討論技術和原始碼。


1. 概述

本文分享 TCC 恢復。主要涉及如下二個 package 路徑下的類:

  • org.mengyun.tcctransaction.recover

    • RecoverConfig,事務恢復配置介面

    • TransactionRecovery,事務恢復邏輯

  • org.mengyun.tcctransaction.spring.recover :

    • DefaultRecoverConfig,預設事務恢復配置實現

    • RecoverScheduledJob,事務恢復定時任務

本文涉及到的類關係如下圖( 開啟大圖 ):

在《TCC-Transaction 原始碼分析 —— 事務儲存器》中,事務資訊被持久化到外部的儲存器中。事務儲存是事務恢復的基礎。透過讀取外部儲存器中的異常事務,定時任務會按照一定頻率對事務進行重試,直到事務完成或超過最大重試次數。

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 TCC-Transaction 點贊!傳送門

ps:筆者假設你已經閱讀過《tcc-transaction 官方檔案 —— 使用指南1.2.x》。

2. 事務重試配置

org.mengyun.tcctransaction.recover.RecoverConfig,事務恢復配置介面,實現程式碼如下:

public interface RecoverConfig {

   /**
    * @return 最大重試次數
    */

   int getMaxRetryCount();

   /**
    * @return 恢復間隔時間,單位:秒
    */

   int getRecoverDuration();

   /**
    * @return cron 運算式
    */

   String getCronExpression();

   /**
    * @return 延遲取消異常集合
    */

   Set> getDelayCancelExceptions();

   /**
    * 設定延遲取消異常集合
    *
    * @param delayRecoverExceptions 延遲取消異常集合
    */

   void setDelayCancelExceptions(Set> delayRecoverExceptions);

}
  • #getMaxRetryCount(),單個事務恢復最大重試次數。超過最大重試次數後,目前僅打出錯誤日誌,下文會看到實現。

  • #getRecoverDuration(),單個事務恢復重試的間隔時間,單位:秒。

  • #getCronExpression(),定時任務 cron 運算式。

  • #getDelayCancelExceptions(),延遲取消異常集合。


org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig預設事務恢復配置實現,實現程式碼如下:

public class DefaultRecoverConfig implements RecoverConfig {

   public static final RecoverConfig INSTANCE = new DefaultRecoverConfig();

   /**
    * 最大重試次數
    */

   private int maxRetryCount = 30;

   /**
    * 恢復間隔時間,單位:秒
    */

   private int recoverDuration = 120;

   /**
    * cron 運算式
    */

   private String cronExpression = "0 */1 * * * ?";

   /**
    * 延遲取消異常集合
    */

   private Set> delayCancelExceptions = new HashSet>();

   public DefaultRecoverConfig() {
       delayCancelExceptions.add(OptimisticLockException.class);
       delayCancelExceptions.add(SocketTimeoutException.class);
   }

   @Override
   public void setDelayCancelExceptions(Set> delayCancelExceptions) {
       this.delayCancelExceptions.addAll(delayCancelExceptions);
   }

}
  • maxRetryCount,單個事務恢復最大重試次數 為 30。

  • recoverDuration,單個事務恢復重試的間隔時間為 120 秒。

  • cronExpression,定時任務 cron 運算式為 "0 */1 * * * ?",每分鐘執行一次。如果你希望定時任務執行的更頻繁,可以修改 cron 運算式,例如 0/30 * * * * ?,每 30 秒執行一次。

  • delayCancelExceptions,延遲取消異常集合。在 DefaultRecoverConfig 構造方法裡,預先添加了 OptimisticLockException / SocketTimeoutException 。

    • 官方解釋:事務恢復的疑問

    • 這塊筆者還有一些疑問,如果有別的可能性導致這個情況,麻煩告知下筆者。謝謝。

    • 官方解釋:為什麼 tcc 事務切麵中對樂觀鎖與socket超時異常不做回滾處理,只拋異常?

    • 針對 SocketTimeoutException :try 階段,本地參與者呼叫遠端參與者( 遠端服務,例如 Dubbo,Http 服務),遠端參與者 try 階段的方法邏輯執行時間較長,超過 Socket 等待時長,發生 SocketTimeoutException,如果立刻執行事務回滾,遠端參與者 try 的方法未執行完成,可能導致 cancel 的方法實際未執行( try 的方法未執行完成,資料庫事務【非 TCC 事務】未提交,cancel 的方法讀取資料時發現未變更,導致方法實際未執行,最終 try 的方法執行完後,提交資料庫事務【非 TCC 事務】,較為極端 ),最終引起資料不一致。在事務恢復時,會對這種情況的事務進行取消回滾,如果此時遠端參與者的 try 的方法還未結束,還是可能發生資料不一致。

    • 針對 OptimisticLockException :還是 SocketTimeoutException 的情況,事務恢復間隔時間小於 Socket 超時時間,此時事務恢復呼叫遠端參與者取消回滾事務,遠端參與者下次更新事務時,會因為樂觀鎖更新失敗,丟擲 OptimisticLockException。如果 CompensableTransactionInterceptor 此時立刻取消回滾,可能會和定時任務的取消回滾衝突,因此統一交給定時任務處理。

3. 事務重試定時任務

org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob,事務恢復定時任務,基於 Quartz 實現排程,不斷不斷不斷執行事務恢復。實現程式碼如下:

public class RecoverScheduledJob {

   private TransactionRecovery transactionRecovery;

   private TransactionConfigurator transactionConfigurator;

   private Scheduler scheduler;

   public void init() {
       try {
           // Quartz JobDetail
           MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
           jobDetail.setTargetObject(transactionRecovery);
           jobDetail.setTargetMethod("startRecover");
           jobDetail.setName("transactionRecoveryJob");
           jobDetail.setConcurrent(false); // 禁止併發
           jobDetail.afterPropertiesSet();
           // Quartz CronTriggerFactoryBean
           CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
           cronTrigger.setBeanName("transactionRecoveryCronTrigger");
           cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
           cronTrigger.setJobDetail(jobDetail.getObject());
           cronTrigger.afterPropertiesSet();
           // 啟動任務排程
           scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
           // 啟動 Quartz Scheduler
           scheduler.start();
       } catch (Exception e) {
           throw new SystemException(e);
       }
   }
}
  • 呼叫 MethodInvokingJobDetailFactoryBean#setConcurrent(false) 方法,禁用任務併發執行。

  • 呼叫 MethodInvokingJobDetailFactoryBean#setTargetObject(...) + MethodInvokingJobDetailFactoryBean#setTargetMethod(...) 方法,設定任務呼叫 TransactionRecovery#startRecover(...) 方法執行。

如果應用叢集部署,會不會相同事務被多個定時任務同時重試

答案是不會,事務在重試時會樂觀鎖更新,同時只有一個應用節點能更新成功。

官方解釋:多機部署下,所有機器都宕機,從異常中恢復時,所有的機器豈不是都可以查詢到所有的需要恢復的服務?

當然極端情況下,Socket 呼叫超時時間大於事務重試間隔,第一個節點在重試某個事務,一直未執行完成,第二個節點已經可以重試。

ps:建議,Socket 呼叫超時時間小於事務重試間隔。

是否定時任務和應用伺服器解耦

螞蟻金服的分散式事務服務 DTS 採用 client-server 樣式:

  • xts-client :負責事務的建立、提交、回滾、記錄。

  • xts-server :負責異常事務的恢復。

FROM 《螞蟻金融雲 DTS 檔案》 
分散式事務服務 (Distributed Transaction Service, DTS) 是一個分散式事務框架,用來保障在大規模分散式環境下事務的最終一致性。DTS 從架構上分為 xts-client 和 xts-server 兩部分,前者是一個嵌入客戶端應用的 JAR 包,主要負責事務資料的寫入和處理;後者是一個獨立的系統,主要負責異常事務的恢復。

4. 異常事務恢復

org.mengyun.tcctransaction.recover.TransactionRecovery,異常事務恢復,實現主體程式碼如下:

“`Java
public class TransactionRecovery {

/**
* 啟動恢復事務邏輯
*/

public void startRecover() {
   // 載入異常事務集合
   List transactions = loadErrorTransactions();
   // 恢復異常事務集合
   recoverErrorTransactions(transactions);
}

}
“`

4.1 載入異常事務集合

呼叫 #loadErrorTransactions() 方法,載入異常事務集合。實現程式碼如下:

private List loadErrorTransactions() {
  TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
  long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
  RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
  return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
}
  • 異常事務的定義:當前時間超過 – 事務變更時間( 最後執行時間 ) >= 事務恢復間隔( RecoverConfig#getRecoverDuration() )。這裡有一點要註意,已完成的事務會從事務儲存器刪除。

4.2 恢復異常事務集合

呼叫 #recoverErrorTransactions(...) 方法,恢復異常事務集合。實現程式碼如下:

private void recoverErrorTransactions(List transactions) {
  for (Transaction transaction : transactions) {
      // 超過最大重試次數
      if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
          logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));
          continue;
      }
      // 分支事務超過最大可重試時間
      if (transaction.getTransactionType().equals(TransactionType.BRANCH)
              && (transaction.getCreateTime().getTime() +
              transactionConfigurator.getRecoverConfig().getMaxRetryCount() *
                      transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000
              > System.currentTimeMillis())) {
          continue;
      }
      // Confirm / Cancel
      try {
          // 增加重試次數
          transaction.addRetriedCount();
          // Confirm
          if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
              transaction.changeStatus(TransactionStatus.CONFIRMING);
              transactionConfigurator.getTransactionRepository().update(transaction);
              transaction.commit();
              transactionConfigurator.getTransactionRepository().delete(transaction);
          // Cancel
          } else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)
                  || transaction.getTransactionType().equals(TransactionType.ROOT)) { // 處理延遲取消的情況
              transaction.changeStatus(TransactionStatus.CANCELLING);
              transactionConfigurator.getTransactionRepository().update(transaction);
              transaction.rollback();
              transactionConfigurator.getTransactionRepository().delete(transaction);
          }
      } catch (Throwable throwable) {
          if (throwable instanceof OptimisticLockException
                  || ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {
              logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
          } else {
              logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
          }
      }
  }
}
  • 當單個事務超過最大重試次數時,不再重試,只打印異常,此時需要人工介入解決。可以接入 ELK 收集日誌監控報警。

  • 分支事務超過最大可重試時間時,不再重試。可能有同學和我一開始理解的是相同的,實際分支事務對應的應用伺服器也可以重試分支事務,不是必須根事務發起重試,從而一起重試分支事務。這點要註意下。

  • 當事務處於 TransactionStatus.CONFIRMING 狀態時,提交事務,邏輯和 TransactionManager#commit() 類似。

  • 當事務處於 TransactionStatus.CONFIRMING 狀態,或者事務型別為根事務,回滾事務,邏輯和 TransactionManager#rollback() 類似。這裡加判斷的事務型別為根事務,用於處理延遲回滾異常的事務的回滾。

666. 彩蛋

在寫本文的過程中,無意中翻到螞蟻雲的檔案,分享給看到此處的真愛們。

真愛們,請猛擊《AntCloudPayPublic》跳轉。

胖友,分享一個朋友圈可好?

贊(0)

分享創造快樂