歡迎光臨
每天分享高質量文章

訊息佇列中介軟體 RocketMQ 原始碼分析 —— Message 儲存

原文地址:http://www.yunai.me/RocketMQ/message-store/ 

(建議使用原文地址閱讀:1、閱讀體驗;2、程式碼排版混亂因而省略;)

RocketMQ 帶註釋原始碼地址 :https://github.com/YunaiV/incubator-rocketmq 
?本系列每 1-2 周更新一篇,歡迎訂閱、關註、收藏 公眾號


  • 1、概述

  • 2、CommitLog 結構

  • 3、CommitLog 儲存訊息

    • MappedFile#落盤

    • FlushRealTimeService

    • CommitRealTimeService

    • GroupCommitService

    • CommitLog#putMessage(…)

    • MappedFileQueue#getLastMappedFile(…)

    • MappedFile#appendMessage(…)

    • DefaultAppendMessageCallback#doAppend(…)

    • FlushCommitLogService

  • 結尾

1、概述

本文接《RocketMQ 原始碼分析 —— Message 傳送與接收》;
主要解析 CommitLog 儲存訊息部分。

2、CommitLog 結構

CommitLogMappedFileQueueMappedFile 的關係如下:

CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N。

反應到系統檔案如下:

Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:27 00000000000000000000
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:29 00000000001073741824
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000002147483648
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:33 00000000003221225472
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000004294967296

CommitLogMappedFileQueueMappedFile 的定義如下:

  • MappedFile :00000000000000000000、00000000001073741824、00000000002147483648等檔案。

  • MappedFileQueue :MappedFile 所在的檔案夾,對 MappedFile 進行封裝成檔案佇列,對上層提供可無限使用的檔案容量。

    • 每個 MappedFile 統一檔案大小。

    • 檔案命名方式:fileName[n] = fileName[n – 1] + mappedFileSize。在 CommitLog 裡預設為 1GB。

  • CommitLog :針對 MappedFileQueue 的封裝使用。

CommitLog 目前儲存在 MappedFile 有兩種內容型別:

  1. MESSAGE :訊息。

  2. BLANK :檔案不足以儲存訊息時的空白佔位。

CommitLog 儲存在 MappedFile的結構:

MESSAGE[1] MESSAGE[2] MESSAGE[n – 1] MESSAGE[n] BLANK

MESSAGE 在 CommitLog 儲存結構:

第幾位 欄位 說明 資料型別 位元組數
1 MsgLen 訊息總長度 Int 4
2 MagicCode MESSAGE_MAGIC_CODE Int 4
3 BodyCRC 訊息內容CRC Int 4
4 QueueId 訊息佇列編號 Int 4
5 Flag flag Int 4
6 QueueOffset 訊息佇列位置 Long 8
7 PhysicalOffset 物理位置。在 CommitLog 的順序儲存位置。 Long 8
8 SysFlag MessageSysFlag Int 4
9 BornTimestamp 生成訊息時間戳 Long 8
10 BornHost 生效訊息的地址+埠 Long 8
11 StoreTimestamp 儲存訊息時間戳 Long 8
12 StoreHost 儲存訊息的地址+埠 Long 8
13 ReconsumeTimes 重新消費訊息次數 Int 4
14 PreparedTransationOffset Long 8
15 BodyLength + Body 內容長度 + 內容 Int + Bytes 4 + bodyLength
16 TopicLength + Topic Topic長度 + Topic Byte + Bytes 1 + topicLength
17 PropertiesLength + Properties 拓展欄位長度 + 拓展欄位 Short + Bytes 2 + PropertiesLength

BLANK 在 CommitLog 儲存結構:

第幾位 欄位 說明 資料型別 位元組數
1 maxBlank 空白長度 Int 4
2 MagicCode BLANK_MAGIC_CODE Int 4

3、CommitLog 儲存訊息

CommitLog#putMessage(…)

// 省略程式碼
  • 說明 :儲存訊息,並傳回儲存結果。

  • 第 2 行 :設定儲存時間等。

  • 第 16 至 36 行 :事務訊息相關,暫未瞭解。

  • 第 45 & 97 行 :獲取鎖與釋放鎖。

  • 第 52 行 :再次設定儲存時間。目前會有多處地方設定儲存時間。

  • 第 55 至 62 行 :獲取 MappedFile,若不存在或已滿,則進行建立。詳細解析見:MappedFileQueue#getLastMappedFile(…)。

  • 第 65 行 :插入訊息到 MappedFile,解析解析見:MappedFile#appendMessage(…)。

  • 第 69 至 80 行 :MappedFile 已滿,建立新的,再次插入訊息

  • 第 116 至 140 行 :訊息刷盤,即持久化到檔案。上面插入訊息實際未儲存到硬碟。此處,根據不同的刷盤策略,執行會有不同。詳細解析見:FlushCommitLogService。

  • 第 143 至 173 行 :Broker 主從同步。後面的文章會詳細解析?。

MappedFileQueue#getLastMappedFile(…)

// 省略程式碼
  • 說明 :獲取最後一個 MappedFile,若不存在或檔案已滿,則進行建立。

  • 第 5 至 11 行 :計算當檔案不存在或已滿時,新建立檔案的 createOffset

  • 第 14 行 :計算檔案名。從此處我們可
    以得知,MappedFile的檔案命名規則:

> fileName[n] = fileName[n - 1] + n * mappedFileSize
> fileName[0] = startOffset - (startOffset % this.mappedFileSize)

目前 `CommitLog` 的 `startOffset` 為 0。
此處有個**疑問**,為什麼需要 `(startOffset % this.mappedFileSize)`。例如:

| startOffset  | mappedFileSize | createOffset |
| --- | :-- | :-- |
| 5 | 1 | 5 |
| 5 | 2 | 4 |
| 5 | 3 | 3  |
| 5 | 4 | 4 |
| 5 | > 5 | 0 |

_如果有知道的同學,麻煩提示下。?_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 計算出來的是,以 `this.mappedFileSize` 為每個檔案大小時,`startOffset` 所在檔案的開始`offset`*
  • 第 30 至 35 行 :設定 MappedFile是否是第一個建立的檔案。該標識用於 ConsumeQueue 對應的 MappedFile ,詳見 ConsumeQueue#fillPreBlank

MappedFile#appendMessage(…)

 // 省略程式碼
  • 說明 :插入訊息到 MappedFile,並傳回插入結果。

  • 第 8 行 :獲取需要寫入的位元組緩衝區。為什麼會有 writeBuffer != null 的判斷後,使用不同的位元組緩衝區,見:FlushCommitLogService。

  • 第 9 至 11 行 :設定寫入 position,執行寫入,更新 wrotePosition(當前寫入位置,下次開始寫入開始位置)。

DefaultAppendMessageCallback#doAppend(…)

// 省略程式碼
  • 說明 :插入訊息到位元組緩衝區。

  • 第 45 行 :計算物理位置。在 CommitLog 的順序儲存位置。

  • 第 47 至 49 行 :計算 CommitLog 裡的 offsetMsgId。這裡一定要和 msgId 區分開。

計算方式 長度
offsetMsgId Broker儲存時生成 Hex(storeHostBytes, wroteOffset) 32
msgId Client傳送訊息時生成 Hex(行程編號, IP, ClassLoader, startTime, currentTime, 自增序列) 32 《RocketMQ 原始碼分析 —— Message 基礎》
  • 第 51 至 61 行 :獲取佇列位置(offset)。

  • 第 78 至 95 行 :計算訊息總長度。

  • 第 98 至 112 行 :當檔案剩餘空間不足時,寫入 BLANK 佔位,傳回結果。

  • 第 114 至 161 行 :寫入 MESSAGE 。

  • 第 173 行 :更新佇列位置(offset)。

FlushCommitLogService

執行緒服務 場景 插入訊息效能
CommitRealTimeService 非同步刷盤 && 開啟記憶體位元組緩衝區 第一
FlushRealTimeService 非同步刷盤 && 關閉記憶體位元組緩衝區 第二
GroupCommitService 同步刷盤 第三

MappedFile#落盤

方式
方式一 寫入記憶體位元組緩衝區(writeBuffer) 從記憶體位元組緩衝區(write buffer)提交(commit)到檔案通道(fileChannel) 檔案通道(fileChannel)flush
方式二 寫入對映檔案位元組緩衝區(mappedByteBuffer) 對映檔案位元組緩衝區(mappedByteBuffer)flush

flush相關程式碼

考慮到寫入效能,滿足 flushLeastPages * OS_PAGE_SIZE 才進行 flush

// 省略程式碼

commit相關程式碼:

考慮到寫入效能,滿足 commitLeastPages * OS_PAGE_SIZE 才進行 commit

// 省略程式碼

FlushRealTimeService

訊息插入成功時,非同步刷盤時使用。

// 省略程式碼 
  • 說明:實時 flush 執行緒服務,呼叫 MappedFile#flush 相關邏輯。

  • 第 23 至 29 行 :每 flushPhysicQueueThoroughInterval 週期,執行一次 flush 。因為不是每次迴圈到都能滿足 flushCommitLogLeastPages 大小,因此,需要一定週期進行一次強制 flush 。當然,不能每次迴圈都去執行強制 flush,這樣效能較差。

  • 第 33 行 至 37 行 :根據 flushCommitLogTimed 引數,可以選擇每次迴圈是固定週期還是等待喚醒。預設配置是後者,所以,每次插入訊息完成,會去呼叫 commitLogService.wakeup() 。

  • 第 45 行 :呼叫 MappedFile 進行 flush

  • 第 61 至 65 行 :Broker 關閉時,強制 flush,避免有未刷盤的資料。

CommitRealTimeService

訊息插入成功時,非同步刷盤時使用。
和 FlushRealTimeService 類似,效能更好。

// 省略程式碼

GroupCommitService

訊息插入成功時,同步刷盤時使用。

// 省略程式碼
  • 說明:批次寫入執行緒服務。

  • 第 16 至 25 行 :新增寫入請求。方法設定了 sync 的原因:this.requestsWrite 會和 this.requestsRead 不斷交換,無法保證穩定的同步。

  • 第 27 至 34 行 :讀寫佇列交換。

  • 第 38 至 60 行 :迴圈寫入佇列,進行 flush

    • 第 43 行 :考慮到有可能每次迴圈的訊息寫入的訊息,可能分佈在兩個 MappedFile(寫第N個訊息時,MappedFile 已滿,建立了一個新的),所以需要有迴圈2次。

    • 第 51 行 :喚醒等待寫入請求執行緒,透過 CountDownLatch 實現

  • 第 61 至 66 行 :直接刷盤。此處是由於傳送的訊息的 isWaitStoreMsgOK 未設定成 TRUE ,導致未走批次提交。

  • 第 73 至 80 行 :每 10ms 執行一次批次提交。當然,如果 wakeup() 時,則會立即進行一次批次提交。當 Broker 設定成同步落盤 && 訊息 isWaitStoreMsgOK=true,訊息需要略大於 10ms 才能傳送成功。當然,效能相對非同步落盤較差,可靠性更高,需要我們在實際使用時去取捨。

結尾

寫的第二篇與RocketMQ原始碼相關的博文,看到有閱讀、點贊、收藏甚至訂閱,很受鼓舞。

《Message儲存》比起《Message傳送&接收》從難度上說是更大的,當然也是更有趣的,如果存在理解錯誤或者表達不清晰,還請大家多多包含。如果可以的話,還請麻煩新增 QQ:7685413 進行指出,避免自己的理解錯誤,給大家造成困擾。

推薦《Kafka設計解析(六)- Kafka高效能架構之道》,作者站在的高度比我高的多的多,嗯,按照李小璐的說法:高一個喜馬拉雅山。?認真啃讀《Linux核心設計與實現(原書第3版)》,day day up。

再次感謝大家的閱讀、點贊、收藏。

下一篇:《RocketMQ 原始碼分析 —— Message 拉取與消費》 起航!

贊(0)

分享創造快樂