歡迎光臨
每天分享高質量文章

註冊中心 Eureka 原始碼解析 —— 任務批處理

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. 整體流程

  • 3. 任務處理器

  • 4. 建立任務分發器

    • 4.1 批次任務執行分發器

    • 4.2 單任務執行分發器

  • 5. 建立任務接收執行器

  • 6. 建立任務執行器

    • 6.1 建立批次任務執行器

    • 6.2 建立單任務執行器

    • 6.3 工作執行緒抽象類

  • 7. 網路通訊整形器

  • 8. 任務接收執行器【處理任務】

  • 9. 任務接收執行緒【排程任務】

  • 10. 任務執行器【執行任務】

    • 10.1 批次任務工作執行緒

    • 10.2 單任務工作執行緒

  • 666. 彩蛋


1. 概述

本文主要分享 任務批處理。Eureka-Server 叢集透過任務批處理同步應用實體註冊實體,所以本文也是為 Eureka-Server 叢集同步的分享做鋪墊。

本文涉及類在 com.netflix.eureka.util.batcher 包下,涉及到主體類的類圖如下( 開啟大圖 ):

  • 紫色部分 —— 任務分發器

  • 藍色部分 —— 任務接收器

  • 紅色部分 —— 任務執行器

  • 綠色部分 —— 任務處理器

  • 黃色部分 —— 任務持有者( 任務 )

推薦 Spring Cloud 書籍

  • 請支援正版。下載盜版,等於主動編寫低階 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 影片

  • Java 微服務實踐 – Spring Boot

  • Java 微服務實踐 – Spring Cloud

  • Java 微服務實踐 – Spring Boot / Spring Cloud

2. 整體流程

任務執行的整體流程如下( 開啟大圖 ):

  • 細箭頭 —— 任務執行經歷的操作

  • 粗箭頭 —— 任務佇列流轉的方向

  • 不同於一般情況下,任務提交了立即同步或非同步執行,任務的執行拆分了三層佇列

    • 藍線:分發器在收到任務執行請求後,提交到接收佇列,任務實際未執行

    • 黃線:執行器的工作執行緒處理任務失敗,將符合條件( 見 「3. 任務處理器」 )的失敗任務提交到重新執行佇列。

    • 第一層,接收佇列( acceptorQueue ),重新處理佇列( reprocessQueue )。

  • 第二層,待執行佇列( processingOrder )
    * 粉線:接收執行緒( Runner )將重新執行佇列,接收佇列提交到待執行佇列。

  • 第三層,工作佇列( workQueue )
    * 粉線:接收執行緒( Runner )將待執行佇列的任務根據引數( maxBatchingSize )將任務合併成批次任務,排程( 提交 )到工作佇列。
    * 黃線:執行器的工作執行緒,一個工作執行緒可以拉取一個批次任務進行執行。

  • 三層佇列的好處

    • 接收佇列,避免處理任務的阻塞等待。

    • 接收執行緒( Runner )合併任務,將相同任務編號( 是的,任務是帶有編號的 )的任務合併,只執行一次。

    • Eureka-Server 為叢集同步提供批次操作多個應用實體的介面,一個批次任務可以一次排程介面完成,避免多次呼叫的開銷。當然,這樣做的前提是合併任務,這也導致 Eureka-Server 叢集之間對應用實體的註冊和下線帶來更大的延遲。畢竟,Eureka 是在 CAP 之間,選擇了 AP

3. 任務處理器

com.netflix.eureka.util.batcher.TaskProcessor ,任務處理器介面。介面程式碼如下:

// ... 省略程式碼,超過微信文章上限
  • ProcessingResult ,處理任務結果。

    • `Success` ,成功。

    • `Congestion` ,擁擠錯誤,任務將會被重試。例如,請求被限流。

    • `TransientError` ,瞬時錯誤,任務將會被重試。例如,網路請求超時。

    • `PermanentError` ,永久錯誤,任務將會被丟棄。例如,執行時發生程式異常。

  • #process(task) 方法,處理單任務。

  • #process(tasks) 方法,處理批次任務。

4. 建立任務分發器

com.netflix.eureka.util.batcher.TaskDispatcher ,任務分發器介面。介面程式碼如下:

// ... 省略程式碼,超過微信文章上限
  • #process(…) 方法,提交任務編號,任務,任務過期時間給任務分發器處理。

com.netflix.eureka.util.batcher.TaskDispatchers ,任務分發器工廠類,用於建立任務分發器。其內部提供兩種任務分發器的實現:

  • 批次任務執行的分發器,用於 Eureka-Server 叢集註冊資訊的同步任務。

  • 單任務執行的分發器,用於 Eureka-Server 向亞馬遜 AWS 的 ASG ( Autoscaling Group ) 同步狀態。雖然本系列暫時對 AWS 相關的不做解析,從工具類的角度來說,本文會對該分發器進行分享。

com.netflix.eureka.cluster.ReplicationTaskProcessor ,實現 TaskDispatcher ,Eureka-Server 叢集任務處理器。感興趣的同學,可以點選連結自己研究,我們將在 《Eureka 原始碼解析 —— Eureka-Server 叢集同步》 有詳細解析。

4.1 批次任務執行分發器

呼叫 TaskDispatchers#createBatchingTaskDispatcher(...) 方法,建立批次任務執行的分發器,實現程式碼如下:

// TaskDispatchers.java
  1/**
  2:  * 建立批次任務執行的分發器
  3:  *
  4:  * @param id 任務執行器編號
  5:  * @param maxBufferSize 待執行佇列最大數量
  6:  * @param workloadSize 單個批次任務包含任務最大數量
  7:  * @param workerCount 任務執行器工作執行緒數
  8:  * @param maxBatchingDelay 批次任務等待最大延遲時長,單位:毫秒
  9:  * @param congestionRetryDelayMs 請求限流延遲重試時間,單位:毫秒
 10:  * @param networkFailureRetryMs 網路失敗延遲重試時長,單位:毫秒
 11:  * @param taskProcessor 任務處理器
 12:  * @param  任務編號泛型
 13:  * @param  任務泛型
 14:  * @return 批次任務執行的分發器
 15:  */

 // ... 省略程式碼,超過微信文章上限
  • 第 1 至 23 行 :方法引數。比較多哈,請耐心理解。

    • `workloadSize` 引數,單個批次任務包含任務最大數量。

    • `taskProcessor` 引數,自定義任務執行器實現

  • 第 24 至 27 行 :建立任務接收執行器。在 「5. 建立任務接收器」 詳細解析。

  • 第 28 至 29 行 :建立批次任務執行器。在 「6.1 建立批次任務執行器」 詳細解析。

  • 第 30 至 42 行 :建立批次任務分發器。

    • 第 32 至 35 行 :`#process()` 方法的實現,呼叫 `AcceptorExecutor#process(…)` 方法,提交 [ 任務編號 , 任務 , 任務過期時間 ] 給任務分發器處理。

4.2 單任務執行分發器

呼叫 TaskDispatchers#createNonBatchingTaskDispatcher(...) 方法,建立單任務執行的分發器,實現程式碼如下:

  1/**
  2:  * 建立單任務執行的分發器
  3:  *
  4:  * @param id 任務執行器編號
  5:  * @param maxBufferSize 待執行佇列最大數量
  6:  * @param workerCount 任務執行器工作執行緒數
  7:  * @param maxBatchingDelay 批次任務等待最大延遲時長,單位:毫秒
  8:  * @param congestionRetryDelayMs 請求限流延遲重試時間,單位:毫秒
  9:  * @param networkFailureRetryMs 網路失敗延遲重試時長,單位:毫秒
 10:  * @param taskProcessor 任務處理器
 11:  * @param  任務編號泛型
 12:  * @param  任務泛型
 13:  * @return 單任務執行的分發器
 14:  */

 15public static  TaskDispatcher createNonBatchingTaskDispatcher(String id,
 16:                                                                             int maxBufferSize,
 17:                                                                             int workerCount,
 18:                                                                             long maxBatchingDelay,
 19:                                                                             long congestionRetryDelayMs,
 20:                                                                             long networkFailureRetryMs,
 21:                                                                             TaskProcessor taskProcessor)
 
{
 22:     // 建立 任務接收執行器
 23:     final AcceptorExecutor acceptorExecutor = new AcceptorExecutor<>(
 24:             id, maxBufferSize, /* workloadSize = 1 */1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
 25:     );
 26:     final TaskExecutors taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
 27:     return new TaskDispatcher() {
 28:         @Override
 29:         public void process(ID id, T task, long expiryTime) {
 30:             acceptorExecutor.process(id, task, expiryTime);
 31:         }
 32
 33:         @Override
 34:         public void shutdown() {
 35:             acceptorExecutor.shutdown();
 36:             taskExecutor.shutdown();
 37:         }
 38:     };
 39: }
  • 第 1 至 21 行 :方法引數。比較多哈,請耐心理解。

    • `workloadSize` 引數,相比 `#createBatchingTaskDispatcher(…)` 少這個引數。在第 24 行,你會發現該引數傳遞給 AcceptorExecutor 使用 1 噢

    • `taskProcessor` 引數,自定義任務執行器實現

  • 第 21 至 25 行 :建立任務接收執行器。和 #createBatchingTaskDispatcher(…) 只差 workloadSize = 1 引數。在 「5. 建立任務接收器」 詳細解析。

  • 第 28 至 29 行 :建立任務執行器。和 `#createBatchingTaskDispatcher(…)` 差別很大。「6.2 建立單任務執行器」 詳細解析。

  • 第 30 至 42 行 :建立任務分發器。和 #createBatchingTaskDispatcher(…) 一樣。

5. 建立任務接收執行器

com.netflix.eureka.util.batcher.AcceptorExecutor ,任務接收執行器。建立構造方法程式碼如下:

// ... 省略程式碼,超過微信文章上限
  • 第 5 至 61 行 :屬性。比較多哈,請耐心理解。

    • 眼尖如你,會發現 AcceptorExecutor 即存在單任務工作佇列( `singleItemWorkQueue` ),又存在批次任務工作佇列( `batchWorkQueue` ) ,在 「9. 任務接收執行緒【排程任務】」 會解答這個疑惑。

  • 第 78 至 79 行 :建立網路通訊整形器。在 「7. 網路通訊整形器」 詳細解析。

  • 第 81 至 85 行 :建立接收任務執行緒

6. 建立任務執行器

com.netflix.eureka.util.batcher.TaskExecutors ,任務執行器。其內部提供建立單任務和批次任務執行器的兩種方法。TaskExecutors 構造方法如下:

// ... 省略程式碼,超過微信文章上限
  • workerThreads 屬性,工作執行緒工作任務佇列會被工作執行緒池併發拉取,併發執行

  • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnableFactory ,建立工作執行緒工廠介面。單任務和批次任務執行器的工作執行緒實現不同,透過自定義工廠實現類建立。

6.1 建立批次任務執行器

呼叫 TaskExecutors#batchExecutors(...) 方法,建立批次任務執行器。實現程式碼如下:

/**
* 建立批次任務執行器
*
@param name 任務執行器名
@param workerCount 任務執行器工作執行緒數
@param processor 任務處理器
@param acceptorExecutor 接收任務執行器
@param  任務編號泛型
@param  任務泛型
@return 批次任務執行器
*/

// ... 省略程式碼,超過微信文章上限
  • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.BatchWorkerRunnable ,批次任務工作執行緒。

6.2 建立單任務執行器

呼叫 TaskExecutors#singleItemExecutors(...) 方法,建立批次任務執行器。實現程式碼如下:

/**
* 建立單任務執行器
*
@param name 任務執行器名
@param workerCount 任務執行器工作執行緒數
@param processor 任務處理器
@param acceptorExecutor 接收任務執行器
@param  任務編號泛型
@param  任務泛型
@return 單任務執行器
*/

// ... 省略程式碼,超過微信文章上限
  • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.SingleTaskWorkerRunnable ,單任務工作執行緒。

6.3 工作執行緒抽象類

com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable ,任務工作執行緒抽象類。BatchWorkerRunnable 和 SingleTaskWorkerRunnable 都實現該類,差異在 #run() 的自定義實現。WorkerRunnable 實現程式碼如下:

// ... 省略程式碼,超過微信文章上限

7. 網路通訊整形器

com.netflix.eureka.util.batcher.TrafficShaper ,網路通訊整形器。當任務執行發生請求限流,或是請求網路失敗的情況,則延時 AcceptorRunner 將任務提交到工作任務佇列,從而避免任務很快去執行,再次發生上述情況。TrafficShaper 實現程式碼如下:

// ... 省略程式碼,超過微信文章上限
  • #registerFailure(…) ,在任務執行失敗時,提交任務結果給 TrafficShaper ,記錄發生時間。在 「10. 任務執行器【執行任務】」 會看到呼叫該方法。

  • #transmissionDelay(…) ,計算提交延遲,單位:毫秒。「9. 任務接收執行緒【排程任務】」 會看到呼叫該方法。

8. 任務接收執行器【處理任務】

呼叫 AcceptorExecutor#process(...) 方法,新增任務到接收任務佇列。實現程式碼如下:

// AcceptorExecutor.java
// ... 省略程式碼,超過微信文章上限
  • com.netflix.eureka.util.batcher.TaskHolder ,任務持有者,實現程式碼如下:

    // ... 省略程式碼,超過微信文章上限

9. 任務接收執行緒【排程任務】

後臺執行緒執行 AcceptorRunner#run(...) 方法,排程任務。實現程式碼如下:

// ... 省略程式碼,超過微信文章上限
  • 第 4 行 :無限迴圈執行排程,直到關閉。

  • 第 6 至 7 行 :呼叫 #drainInputQueues() 方法,迴圈處理完輸入佇列( 接收佇列 + 重新執行佇列 ),直到有待執行的任務。實現程式碼如下:

    // ... 省略程式碼,超過微信文章上限
    • 第 4 行 :優先從重新執行任務的隊尾拿較新的任務,從而實現保留更新的任務在待執行任務對映( pendingTasks ) 裡。

    • 第 12 行 :新增任務編號到待執行佇列( processingOrder ) 的頭部。效果如下圖:

    • 第 15 至 18 行 :如果待執行佇列( pendingTasks )已滿,清空重新執行佇列( processingOrder ),放棄較早的任務。

    • 重新執行佇列( reprocessQueue ) 和接收佇列( acceptorQueue )為空

    • 待執行任務對映( pendingTasks )不為空

    • 第 2 行 && 第 18 行 :迴圈,直到同時滿足如下全部條件:

    • 第 3 至 4 行 :處理完重新執行佇列( reprocessQueue )。實現程式碼如下:

      // ... 省略程式碼,超過微信文章上限
    • 第 5 至 6 行 :處理完接收佇列( acceptorQueue ),實現程式碼如下:

      // ... 省略程式碼,超過微信文章上限
    • 第 8 至 17 行 :當所有佇列為空,阻塞從接收佇列( acceptorQueue ) 拉取任務 10 ms。若拉取到,新增到待執行佇列( processingOrder )。

  • 第 12 至 16 行 :計算可排程任務的最小時間( scheduleTime )。

    • 當 scheduleTime 小於當前時間,不重新計算,即此時需要延遲等待排程。

    • 當 scheduleTime 大於等於當前時間,配合 TrafficShaper#transmissionDelay(…) 重新計算。

  • 第 19 行 :當 scheduleTime 小於當前時間,執行任務的排程。

  • 第 21 行 :呼叫 #assignBatchWork() 方法,排程批次任務。實現程式碼如下:

    // ... 省略程式碼,超過微信文章上限
    • x

    • 第 2 行 :呼叫 #hasEnoughTasksForNextBatch() 方法,判斷是否有足夠任務進行下一次批次任務排程:1)待執行任務( processingOrder )對映已滿;或者 2)到達批次任務處理最大等待延遲。實現程式碼如下:

      // ... 省略程式碼,超過微信文章上限
    • 第 5 至 17 行 :獲取批次任務( holders )。? 你會發現,本文說了半天的批次任務,實際是 List>

哈。

  • 第 4 行 :獲取批次任務工作請求訊號量( batchWorkRequests ) 。在任務執行器的批次任務執行器,每次執行時,發出 batchWorkRequests 。每一個訊號量需要保證獲取到一個批次任務

  • 第 19 至 20 行 :未排程到批次任務,釋放請求訊號量,代表請求實際未完成,每一個訊號量需要保證獲取到一個批次任務

  • 第 21 至 24 行 :新增批次任務到批次任務工作佇列。

  • 第 23 行 :呼叫 #assignSingleItemWork() 方法,排程單任務。

  • 第 23 行 :呼叫 #assignSingleItemWork() 方法,排程單任務,和 #assignBatchWork() 方法類似。實現程式碼如下:

    // ... 省略程式碼,超過微信文章上限
    • x

  • 第 26 至 31 行 :當排程任務前的待執行任務數( totalItems )等於當前待執行佇列( processingOrder )的任務數,意味著:1)任務執行器無任務請求,正在忙碌處理之前的任務;或者 2)任務延遲排程。睡眠 10 秒,避免資源浪費。

  • 10. 任務執行器【執行任務】

    10.1 批次任務工作執行緒

    批次任務工作後臺執行緒( BatchWorkerRunnable )執行 #run(...) 方法,排程任務。實現程式碼如下:

    // 
    // ... 省略程式碼,超過微信文章上限
    • 第 4 行 :無限迴圈執行排程,直到關閉。

    • 第 6 行 :呼叫 getWork() 方法,獲取一個批次任務直到成功。實現程式碼如下:

      // ... 省略程式碼,超過微信文章上限
      • 註意,批次任務工作佇列( batchWorkQueue ) 和單任務工作佇列( singleItemWorkQueue ) 是不同的佇列

      • 第 3 行 :呼叫 TaskDispatcher#requestWorkItems() 方法,發起請求訊號量,並獲得批次任務的工作佇列。實現程式碼如下:

        // TaskDispatcher.java
        // ... 省略程式碼,超過微信文章上限
      • 第 5 至 8 行 :迴圈獲取一個批次任務,直到成功。

    • 第 12 行 :呼叫 #getTasksOf(...) 方法,獲得實際批次任務。實現程式碼如下:

      // ... 省略程式碼,超過微信文章上限
      • x

    • 第 14 至 24 行 :呼叫處理器( TaskProcessor ) 執行任務。當任務執行結果為 Congestion 或 TransientError ,呼叫 AcceptorExecutor#reprocess(...) 提交整個批次任務重新處理,實現程式碼如下:

      // AcceptorExecutor.java
      // ... 省略程式碼,超過微信文章上限

    10.2 單任務工作執行緒

    單任務工作後臺執行緒( SingleTaskWorkerRunnable )執行 #run(...) 方法,排程任務,和 BatchWorkerRunnable#run(...) 基本類似,就不囉嗦了。實現程式碼如下:

    @Override
    // SingleTaskWorkerRunnable.java
    // ... 省略程式碼,超過微信文章上限

    666. 彩蛋

    ? 又是一篇長文。建議邊看程式碼,邊對照著整體流程圖,理解實際不難。

    當然,歡迎你有任何疑問,在我的公眾號( 芋道原始碼 ) 留言。

    胖友,分享我的公眾號( 芋道原始碼 ) 給你的胖友可好?




    如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

    知識星球

    目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

    01. 除錯環境搭建
    02. 專案結構一覽
    03. 配置 Configuration
    04. 核心流程一覽

    05. 拓展機制 SPI

    06. 執行緒池

    07. 服務暴露 Export

    08. 服務取用 Refer

    09. 註冊中心 Registry

    10. 動態編譯 Compile

    11. 動態代理 Proxy

    12. 服務呼叫 Invoke

    13. 呼叫特性 

    14. 過濾器 Filter

    15. NIO 伺服器

    16. P2P 伺服器

    17. HTTP 伺服器

    18. 序列化 Serialization

    19. 叢集容錯 Cluster

    20. 優雅停機

    21. 日誌適配

    22. 狀態檢查

    23. 監控中心 Monitor

    24. 管理中心 Admin

    25. 運維命令 QOS

    26. 鏈路追蹤 Tracing


    一共 60 篇++

    贊(0)

    分享創造快樂