本文作者:李釗,公眾號「咖啡拿鐵」作者,分散式事務 Seata 社群 Contributor。
1.關於 Seata
在前不久,我寫了一篇關於分散式事務中介軟體 Fescar 的解析,沒過幾天 Fescar 團隊對其進行了品牌升級,取名為 Seata(Simpe Extensible Autonomous Transcaction Architecture),而以前的 Fescar 的英文全稱為 Fast & EaSy Commit And Rollback。可以看見 Fescar 從名字上來看更加侷限於 Commit 和 Rollback,而新的品牌名字 Seata 旨在打造一套一站式分散式事務解決方案。更換名字之後,我對其未來的發展更有信心。
這裡先大概回憶一下 Seata 的整個過程模型:
-
TM:事務的發起者。用來告訴 TC,全域性事務的開始,提交,回滾。
-
RM:具體的事務資源,每一個 RM 都會作為一個分支事務註冊在 TC。
-
TC 事務的協調者。也可以看做是 Fescar-Server,用於接收我們的事務的註冊,提交和回滾。
在之前的文章中對整個角色有個大體的介紹,在這篇文章中我將重點介紹其中的核心角色 TC,也就是事務協調器。
2.Transaction Coordinator
為什麼之前一直強調 TC 是核心呢?那因為 TC 這個角色就好像上帝一樣,管控著芸芸眾生的 RM 和 TM。如果 TC 一旦不好使,那麼 RM 和 TM 一旦出現小問題,那必定會亂的一塌糊塗。所以要想瞭解 Seata,那麼必須要瞭解他的 TC。
那麼一個優秀的事務協調者應該具備哪些能力呢?我覺得應該有以下幾個:
-
正確的協調:能正確的協調 RM 和 TM 接下來應該做什麼,做錯了應該怎麼辦,做對了應該怎麼辦。
-
高可用:事務協調器在分散式事務中很重要,如果不能保證高可用,那麼他也沒有存在的必要了。
-
高效能:事務協調器的效能一定要高,如果事務協調器效能有瓶頸,那麼他所管理的 RM 和 TM 會經常遇到超時,從而引起回滾頻繁。
-
高擴充套件性:這個特點是屬於程式碼層面的,如果是一個優秀的框架,那麼需要給使用方很多自定義擴充套件,比如服務註冊/發現,讀取配置等等。
下麵我也將逐步闡述 Seata 是如何做到上面四點。
2.1 Seata-Server 的設計
Seata-Server 整體的模組圖如上所示:
-
Coordinator Core:最下麵的模組是事務協調器核心程式碼,主要用來處理事務協調的邏輯,如是否 Commit、Rollback 等協調活動。
-
Store:儲存模組,用來將資料持久化,防止重啟或者宕機資料丟失。
-
Discover:服務註冊/發現模組,用於將 Server 地址暴露給 Client。
-
Config:用來儲存和查詢服務端的配置。
-
Lock:鎖模組,用於給 Seata 提供全域性鎖的功能。
-
Rpc:用於和其他端通訊。
-
HA-Cluster:高可用叢集,目前還沒開源,為 Seata 提供可靠的高可用功能。
2.2 Discover
首先來講講比較基礎的 Discover 模組,又稱服務註冊/發現模組。我們將 Seata-Server 啟動之後,需要將自己的地址暴露給其他使用者,那麼就需要這個模組幫忙。
這個模組有個核心介面 RegistryService,如上圖所示:
-
register:服務端使用,進行服務註冊。
-
unregister:服務端使用,一般在 JVM 關閉鉤子,ShutdownHook 中呼叫。
-
subscribe:客戶端使用,註冊監聽事件,用來監聽地址的變化。
-
unsubscribe:客戶端使用,取消註冊監聽事件。
-
lookup:客戶端使用,根據 Key 查詢服務地址串列。
-
close:都可以使用,用於關閉 Register 資源。
如果需要新增自己定義的服務註冊/發現,那麼實現這個介面即可。截止目前在社群的不斷開發推動下,已經有四種服務註冊/發現,分別是 redis、zk、nacos、eruka。下麵簡單介紹下 Nacos 的實現:
2.2.1 register 介面
step1:校驗地址是否合法;
step2:獲取 Nacos 的 Name 實體,然後將地址註冊到當前 Cluster 名稱上面。
unregister 介面類似,這裡不做詳解。
2.2.2 lookup 介面
step1:獲取當前 clusterName 名字;
step2:判斷當前 Cluster 是否已經獲取過了,如果獲取過就從 Map 中取;
step3:從 Nacos 拿到地址資料,將其轉換成所需要的;
step4:將事件變動的 Listener 註冊到 Nacos。
2.2.3 subscribe 介面
這個介面比較簡單,具體分兩步:
step1:將 Clstuer 和 Listener 新增進 Map 中;
step2:向 Nacos 註冊。
2.3 Config
配置模組也是一個比較基礎,比較簡單的模組。我們需要配置一些常用的引數比如:Netty 的 Select 執行緒數量,Work 執行緒數量,Session 允許最大為多少等等,當然這些引數在 Seata 中都有自己的預設設定。
同樣的在 Seata 中也提供了一個介面 Configuration,用來自定義需要的獲取配置的地方:
-
getInt/Long/Boolean/Config():透過 DataId 來獲取對應的值。
-
putConfig:用於新增配置。
-
removeConfig:刪除一個配置。
-
add/remove/get ConfigListener:新增/刪除/獲取 配置監聽器,一般用來監聽配置的變更。
目前為止有四種方式獲取 Config:File(檔案獲取)、Nacos、Apollo、ZK。在 Seata 中首先需要配置 registry.conf,來配置 conf 的型別。實現 conf 比較簡單這裡就不深入分析。
2.4 Store
儲存層的實現對於 Seata 是否高效能,是否可靠非常關鍵。
如果儲存層沒有實現好,那麼如果發生宕機,在 TC 中正在進行分散式事務處理的資料將會被丟失。既然使用了分散式事務,那麼其肯定不能容忍丟失。如果儲存層實現好了,但是其效能有很大問題,RM 可能會發生頻繁回滾那麼其完全無法應對高併發的場景。
在 Seata 中預設提供了檔案方式的儲存,下麵定義儲存的資料為 Session,而 TM 創造的全域性事務資料叫 GlobalSession,RM 創造的分支事務叫 BranchSession,一個 GlobalSession 可以擁有多個 BranchSession。我們的目的就是要將這麼多 Session 儲存下來。
在 FileTransactionStoreManager#writeSession 程式碼中:
上面的程式碼主要分為下麵幾步:
step1:生成一個 TransactionWriteFuture。
step2:將這個 futureRequest 丟進一個 LinkedBlockingQueue 中。為什麼需要將所有資料都丟進佇列中呢?當然這裡其實也可以用鎖來實現,在另外一個阿裡開源的 RocketMQ 中使用的鎖。不論是佇列還是鎖,他們的目的是為了保證單執行緒寫,這又是為什麼呢?有人會解釋說,需要保證順序寫,這樣速度就很快,這個理解是錯誤的,我們的 FileChannel 其實是執行緒安全的,已經能保證順序寫了。保證單執行緒寫其實是為了讓這個寫邏輯都是單執行緒的,因為可能有些檔案寫滿或者記錄寫資料位置等等邏輯,當然這些邏輯都可以主動加鎖去做,但是為了實現簡單方便,直接再整個寫邏輯加鎖是最為合適的。
step3:呼叫 future.get,等待該條資料寫邏輯完成通知。
我們將資料提交到佇列之後,接下來需要對其進行消費,程式碼如下:
這裡將一個 WriteDataFileRunnable() 提交進執行緒池,這個 Runnable 的 run() 方法如下:
分為下麵幾步:
step1:判斷是否停止,如果 stopping 為 true 則傳回 null。
step2:從佇列中獲取資料。
step3:判斷 future 是否已經超時了,如果超時,則設定結果為 false,此時我們生產者 get() 方法會接觸阻塞。
step4:將資料寫進檔案,此時資料還在 pageCache 層並沒有掃清到磁碟,如果寫成功然後根據條件判斷是否進行刷盤操作。
step5:當寫入數量到達一定的時候,或者寫入時間到達一定的時候,需要將當前的檔案儲存為歷史檔案,刪除以前的歷史檔案,然後建立新的檔案。這一步是為了防止檔案無限增長,大量無效資料浪費磁碟資源。
在 writeDataFile 中有如下程式碼:
step1:首先獲取 ByteBuffer,如果超出最大迴圈 BufferSize 就直接建立一個新的,否則就使用快取的 Buffer。這一步可以很大的減少 GC。
step2:然後將資料新增進入 ByteBuffer。
step3:最後將 ByteBuffer 寫入 fileChannel,這裡會重試三次。此時的資料還在 pageCache 層,受兩方面的影響,OS 有自己的掃清策略,但是這個業務程式不能控制,為了防止宕機等事件出現造成大量資料丟失,所以就需要業務自己控制 flush。下麵是 flush 的程式碼:
這裡 flush 的條件寫入一定數量或者寫的時間超過一定時間,這樣也會有個小問題如果是停電,那麼 pageCache 中有可能還有資料並沒有被刷盤,會導致少量的資料丟失。目前還不支援同步樣式,也就是每條資料都需要做刷盤操作,這樣可以保證每條訊息都落盤,但是效能也會受到極大的影響,當然後續會不斷的演進支援。
Store 核心流程主要是上面幾個方法,當然還有一些比如 Session 重建等,這些比較簡單,讀者可以自行閱讀。
2.5 Lock
大家知道資料庫實現隔離級別主要是透過鎖來實現的,同樣的再分散式事務框架 Seata 中要實現隔離級別也需要透過鎖。一般在資料庫中資料庫的隔離級別一共有四種:讀未提交、讀已提交、可重覆讀、序列化。在 Seata 中可以保證寫的互斥,而讀的隔離級別一般是未提交,但是提供了達到讀已提交隔離的手段。
Lock 模組也就是 Seata 實現隔離級別的核心模組。在 Lock 模組中提供了一個介面用於管理鎖:
其中有三個方法:
-
acquireLock:用於對 BranchSession 加鎖,這裡雖然是傳的分支事務 Session,實際上是對分支事務的資源加鎖,成功傳回 true。
-
isLockable:根據事務 ID,資源 ID,鎖住的 Key 來查詢是否已經加鎖。
-
cleanAllLocks:清除所有的鎖。
對於鎖我們可以在本地實現,也可以透過 redis 或者 mysql 來幫助我們實現。官方預設提供了本地全域性鎖的實現:
在本地鎖的實現中有兩個常量需要關註:
-
BUCKET_PER_TABLE:用來定義每個 table 有多少個 bucket,目的是為了後續對同一個表加鎖的時候減少競爭。
-
LOCK_MAP:這個 Map 從定義上來看非常複雜,裡裡外外套了很多層 Map,這裡用個表格具體說明一下:
層數 |
key |
value |
1-LOCK_MAP |
resourceId(jdbcUrl) |
dbLockMap |
2-dbLockMap |
tableName (表名) |
tableLockMap |
3-tableLockMap |
PK.hashcode%Bucket (主鍵值的 hashcode%bucket) |
bucketLockMap |
4-bucketLockMap |
PK |
trascationId |
可以看見實際上的加鎖在 bucketLockMap 這個 Map 中,這裡具體的加鎖方法比較簡單就不作詳細闡述,主要是逐步的找到 bucketLockMap ,然後將當前 TrascationId 塞進去,如果這個主鍵當前有 TranscationId,那麼比較是否是自己,如果不是則加鎖失敗。
2.6 RPC
保證 Seata 高效能的關鍵之一也是使用了 Netty 作為 RPC 框架,採用預設配置的執行緒模型如下圖所示:
如果採用預設的基本配置那麼會有一個 Acceptor 執行緒用於處理客戶端的連結,會有 cpu*2 數量的 NIO-Thread,再這個執行緒中不會做業務太重的事情,只會做一些速度比較快的事情,比如編解碼,心跳事件和TM註冊。一些比較費時間的業務操作將會交給業務執行緒池,預設情況下業務執行緒池配置為最小執行緒為 100,最大為 500。
這裡需要提一下的是 Seata 的心跳機制,這裡是使用 Netty 的 IdleStateHandler 完成的,如下:
在 Server 端對於寫沒有設定最大空閑時間,對於讀設定了最大空閑時間,預設為 15s,如果超過 15s 則會將連結斷開,關閉資源。
step1:判斷是否是讀空閑的檢測事件;
step2:如果是,則斷開連結,關閉資源。
2.7 HA-Cluster
目前官方沒有公佈 HA-Cluster,但是透過一些其他中介軟體和官方的一些透露,可以將 HA-Cluster 用如下方式設計:
具體的流程如下:
step1:客戶端釋出資訊的時候根據 TranscationId 保證同一個 Transcation 是在同一個 Master 上,透過多個 Master 水平擴充套件,提供併發處理效能。
step2:在 Server 端中一個 Master 有多個 Slave,Master 中的資料近實時同步到 Slave上,保證當 Master 宕機的時候,還能有其他 Slave 頂上來可以用。
當然上述一切都是猜測,具體的設計實現還得等 0.5 版本之後。目前有一個 Go 版本的 Seata-Server 也捐贈給了 Seata (還在流程中),其透過 Raft 實現副本一致性,其他細節不是太清楚。
2.8 Metrics & Tracing
這個模組也是一個沒有具體公佈實現的模組,當然有可能會提供外掛口,讓其他第三方 metric 接入進來。另外最近 Apache SkyWalking 正在和 Seata 小組商討如何接入進來。
3.Coordinator Core
上面我們講了很多 Server 基礎模組,想必大家對 Seata 的實現已經有個大概,接下來我會講解事務協調器具體邏輯是如何實現的,讓大家更加瞭解 Seata 的實現內幕。
3.1 啟動流程
啟動方法在 Server 類有個 main 方法,定義了我們啟動流程:
step1:建立一個 RpcServer,再這個裡麵包含了我們網路的操作,用 Netty 實現了服務端。
step2:解析埠號和檔案地址。
step3:初始化 SessionHolder,其中最重要的重要就是重我們 dataDir 這個檔案夾中恢復我們的資料,重建我們的Session。
step4:建立一個CoorDinator,這個也是我們事務協調器的邏輯核心程式碼,然後將其初始化,其內部初始化的邏輯會建立四個定時任務:
-
retryRollbacking:重試 rollback 定時任務,用於將那些失敗的 rollback 進行重試的,每隔 5ms 執行一次。
-
retryCommitting:重試 commit 定時任務,用於將那些失敗的 commit 進行重試的,每隔 5ms 執行一次。
-
asyncCommitting:非同步 commit 定時任務,用於執行非同步的 commit,每隔 10ms 一次。
-
timeoutCheck:超時定時任務檢測,用於檢測超時的任務,然後執行超時的邏輯,每隔 2ms 執行一次。
step5: 初始化 UUIDGenerator 這個也是我們生成各種 ID(transcationId,branchId) 的基本類。
step6:將本地 IP 和監聽埠設定到 XID 中,初始化 RpcServer 等待客戶端的連線。
啟動流程比較簡單,下麵我會介紹分散式事務框架中的常見的一些業務邏輯 Seata 是如何處理的。
3.2 Begin – 開啟全域性事務
一次分散式事務的起始點一定是開啟全域性事務,首先我們看看全域性事務 Seata 是如何實現的:
step1: 根據應用 ID,事務分組,名字,超時時間建立一個 GlobalSession,這個再前面也提到過他和 branchSession 分別是什麼。
step2:對其新增一個 RootSessionManager 用於監聽一些事件,這裡要說一下目前在 Seata 裡面有四種型別的 Listener (這裡要說明的是所有的 sessionManager 都實現了 SessionLifecycleListener):
-
ROOTSESSIONMANAGER:最全,最大的,擁有所有的 Session。
-
ASYNCCOMMITTINGSESSION_MANAGER:用於管理需要做非同步 commit 的 Session。
-
RETRYCOMMITTINGSESSION_MANAGER:用於管理重試 commit 的 Session。
-
RETRYROLLBACKINGSESSION_MANAGER:用於管理重試回滾的 Session。
由於這裡是開啟事務,其他 SessionManager 不需要關註,我們只新增 RootSessionManager 即可。
step3:開啟 GlobalSession:
這一步會把狀態變為 Begin,記錄開始時間,並且呼叫 RootSessionManager的 onBegin 監聽方法,將 Session 儲存到 Map 並寫入到我們的檔案。
step4:最後傳回 XID,這個 XID 是由 ip+port+transactionId 組成的,非常重要,當 TM 申請到之後需要將這個 ID 傳到 RM 中,RM 透過 XID 來決定到底應該訪問哪一臺 Server。
3.3 BranchRegister – 分支事務註冊
當全域性事務在 TM 開啟之後,RM 的分支事務也需要註冊到全域性事務之上,這裡看看是如何處理的:
step1:透過 transactionId 獲取並校驗全域性事務是否是開啟狀態。
step2:建立一個新的分支事務,也就是 BranchSession。
step3:對分支事務進行加全域性鎖,這裡的邏輯就是使用鎖模組的邏輯。
step4:新增 branchSession,主要是將其新增到 GlobalSession 物件中,並寫入到我們的檔案中。
step5:傳回 branchId,這個 ID 也很重要,我們後續需要用它來回滾我們的事務,或者對我們分支事務狀態更新。
分支事務註冊之後,還需要彙報分支事務的後續狀態到底是成功還是失敗,在 Server 目前只是簡單的做一下儲存記錄,彙報的目的是,就算這個分支事務失敗,如果 TM 還是執意要提交全域性事務,那麼再遍歷提交分支事務的時候,這個失敗的分支事務就不需要提交。
3.4 GlobalCommit – 全域性提交
當分支事務執行完成之後,就輪到 TM – 事務管理器來決定是提交還是回滾,如果是提交,那麼就會走到下麵的邏輯:
step1:首先找到 GlobalSession。如果他為 Null 證明已經被 Commit 過了,那麼直接冪等操作,傳回成功。
step2:關閉 GlobalSession 防止再次有新的 branch 進來。
step3:如果 status 是等於 Begin,那麼久證明還沒有提交過,改變其狀態為 Committing 也就是正在提交。
step4:判斷是否是可以非同步提交,目前只有AT樣式可以非同步提交,因為是透過 Undolog 的方式去做的。MT 和 TCC 都需要走同步提交的程式碼。
step5:如果是非同步提交,直接將其放進 ASYNCCOMMITTINGSESSION_MANAGER,讓其再後臺執行緒非同步去做 step6,如果是同步的那麼直接執行 step6。
step6:遍歷 BranchSession 進行提交,如果某個分支事務失敗,根據不同的條件來判斷是否進行重試,非同步不需要重試,因為其本身都在 manager 中,只要沒有成功就不會被刪除會一直重試,如果是同步提交的會放進非同步重試佇列進行重試。
3.5 GlobalRollback – 全域性回滾
如果 TM 決定全域性回滾,那麼會走到下麵的邏輯:
這個邏輯和提交流程基本一致,可以看作是他的反向,這裡就不展開講了。
4.總結
最後再總結一下開始我們提出了分散式事務的關鍵四點,Seata 到底是怎麼解決的:
-
正確的協調:透過後臺定時任務各種正確的重試,並且未來會推出監控平臺有可能可以手動回滾。
-
高可用: 透過 HA-Cluster 保證高可用。
-
高效能:檔案順序寫,RPC 透過 Netty 實現,Seata 未來可以水平擴充套件,提高處理效能。
-
高擴充套件性:提供給使用者可以自由實現的地方,比如配置,服務發現和註冊,全域性鎖等等。
最後希望大家能從這篇文章能瞭解 Seata-Server 的核心設計原理,當然你也可以想象如果你自己去實現一個分散式事務的 Server 應該怎樣去設計?
文中涉及的相關連結
- Seata github 地址:
https://github.com/seata/seata
- 延伸閱讀:螞蟻金服分散式事務開源以及實踐 | SOFA 開源一週年獻禮