原文地址:http://www.yunai.me/RocketMQ/message-store/
(建議使用原文地址閱讀:1、閱讀體驗;2、程式碼排版混亂因而省略;)
RocketMQ
帶註釋原始碼地址 :https://github.com/YunaiV/incubator-rocketmq
?本系列每 1-2 周更新一篇,歡迎訂閱、關註、收藏 公眾號
-
1、概述
-
2、CommitLog 結構
-
3、CommitLog 儲存訊息
-
MappedFile#落盤
-
FlushRealTimeService
-
CommitRealTimeService
-
GroupCommitService
-
CommitLog#putMessage(…)
-
MappedFileQueue#getLastMappedFile(…)
-
MappedFile#appendMessage(…)
-
DefaultAppendMessageCallback#doAppend(…)
-
FlushCommitLogService
-
結尾
1、概述
本文接《RocketMQ 原始碼分析 —— Message 傳送與接收》;
主要解析 CommitLog
儲存訊息部分。
2、CommitLog 結構
CommitLog
、MappedFileQueue
、MappedFile
的關係如下:
CommitLog
: MappedFileQueue
: MappedFile
= 1 : 1 : N。
反應到系統檔案如下:
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296
CommitLog
、MappedFileQueue
、MappedFile
的定義如下:
-
MappedFile
:00000000000000000000、00000000001073741824、00000000002147483648等檔案。 -
MappedFileQueue
:MappedFile
所在的檔案夾,對MappedFile
進行封裝成檔案佇列,對上層提供可無限使用的檔案容量。 -
每個
MappedFile
統一檔案大小。 -
檔案命名方式:fileName[n] = fileName[n – 1] + mappedFileSize。在
CommitLog
裡預設為 1GB。 -
CommitLog
:針對MappedFileQueue
的封裝使用。
CommitLog
目前儲存在 MappedFile
有兩種內容型別:
-
MESSAGE :訊息。
-
BLANK :檔案不足以儲存訊息時的空白佔位。
CommitLog
儲存在 MappedFile
的結構:
MESSAGE[1] MESSAGE[2] … MESSAGE[n – 1] MESSAGE[n] BLANK
MESSAGE
在 CommitLog
儲存結構:
第幾位 | 欄位 | 說明 | 資料型別 | 位元組數 |
---|---|---|---|---|
1 | MsgLen | 訊息總長度 | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | 訊息內容CRC | Int | 4 |
4 | QueueId | 訊息佇列編號 | Int | 4 |
5 | Flag | flag | Int | 4 |
6 | QueueOffset | 訊息佇列位置 | Long | 8 |
7 | PhysicalOffset | 物理位置。在 CommitLog 的順序儲存位置。 |
Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | 生成訊息時間戳 | Long | 8 |
10 | BornHost | 生效訊息的地址+埠 | Long | 8 |
11 | StoreTimestamp | 儲存訊息時間戳 | Long | 8 |
12 | StoreHost | 儲存訊息的地址+埠 | Long | 8 |
13 | ReconsumeTimes | 重新消費訊息次數 | Int | 4 |
14 | PreparedTransationOffset | Long | 8 | |
15 | BodyLength + Body | 內容長度 + 內容 | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic長度 + Topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | 拓展欄位長度 + 拓展欄位 | Short + Bytes | 2 + PropertiesLength |
BLANK
在 CommitLog
儲存結構:
第幾位 | 欄位 | 說明 | 資料型別 | 位元組數 |
---|---|---|---|---|
1 | maxBlank | 空白長度 | Int | 4 |
2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |
3、CommitLog 儲存訊息
CommitLog#putMessage(…)
// 省略程式碼
-
說明 :儲存訊息,並傳回儲存結果。
-
第 2 行 :設定儲存時間等。
-
第 16 至 36 行 :事務訊息相關,暫未瞭解。
-
第 45 & 97 行 :獲取鎖與釋放鎖。
-
第 52 行 :再次設定儲存時間。目前會有多處地方設定儲存時間。
-
第 55 至 62 行 :獲取
MappedFile
,若不存在或已滿,則進行建立。詳細解析見:MappedFileQueue#getLastMappedFile(…)。 -
第 65 行 :插入訊息到
MappedFile
,解析解析見:MappedFile#appendMessage(…)。 -
第 69 至 80 行 :
MappedFile
已滿,建立新的,再次插入訊息。 -
第 116 至 140 行 :訊息刷盤,即持久化到檔案。上面插入訊息實際未儲存到硬碟。此處,根據不同的刷盤策略,執行會有不同。詳細解析見:FlushCommitLogService。
-
第 143 至 173 行 :
Broker
主從同步。後面的文章會詳細解析?。
MappedFileQueue#getLastMappedFile(…)
// 省略程式碼
-
說明 :獲取最後一個
MappedFile
,若不存在或檔案已滿,則進行建立。 -
第 5 至 11 行 :計算當檔案不存在或已滿時,新建立檔案的
createOffset
。 -
第 14 行 :計算檔案名。從此處我們可
以得知,MappedFile
的檔案命名規則:
> fileName[n] = fileName[n - 1] + n * mappedFileSize
> fileName[0] = startOffset - (startOffset % this.mappedFileSize)
目前 `CommitLog` 的 `startOffset` 為 0。
此處有個**疑問**,為什麼需要 `(startOffset % this.mappedFileSize)`。例如:
| startOffset | mappedFileSize | createOffset |
| --- | :-- | :-- |
| 5 | 1 | 5 |
| 5 | 2 | 4 |
| 5 | 3 | 3 |
| 5 | 4 | 4 |
| 5 | > 5 | 0 |
_如果有知道的同學,麻煩提示下。?_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 計算出來的是,以 `this.mappedFileSize` 為每個檔案大小時,`startOffset` 所在檔案的開始`offset`*
-
第 30 至 35 行 :設定
MappedFile
是否是第一個建立的檔案。該標識用於ConsumeQueue
對應的MappedFile
,詳見ConsumeQueue#fillPreBlank
。
MappedFile#appendMessage(…)
// 省略程式碼
-
說明 :插入訊息到
MappedFile
,並傳回插入結果。 -
第 8 行 :獲取需要寫入的位元組緩衝區。為什麼會有
writeBuffer != null
的判斷後,使用不同的位元組緩衝區,見:FlushCommitLogService。 -
第 9 至 11 行 :設定寫入
position
,執行寫入,更新wrotePosition
(當前寫入位置,下次開始寫入開始位置)。
DefaultAppendMessageCallback#doAppend(…)
// 省略程式碼
-
說明 :插入訊息到位元組緩衝區。
-
第 45 行 :計算物理位置。在
CommitLog
的順序儲存位置。 -
第 47 至 49 行 :計算
CommitLog
裡的offsetMsgId
。這裡一定要和msgId
區分開。
計算方式 | 長度 | |||
---|---|---|---|---|
offsetMsgId | Broker儲存時生成 | Hex(storeHostBytes, wroteOffset) | 32 | |
msgId | Client傳送訊息時生成 | Hex(行程編號, IP, ClassLoader, startTime, currentTime, 自增序列) | 32 | 《RocketMQ 原始碼分析 —— Message 基礎》 |
-
第 51 至 61 行 :獲取佇列位置(offset)。
-
第 78 至 95 行 :計算訊息總長度。
-
第 98 至 112 行 :當檔案剩餘空間不足時,寫入
BLANK
佔位,傳回結果。 -
第 114 至 161 行 :寫入
MESSAGE
。 -
第 173 行 :更新佇列位置(offset)。
FlushCommitLogService
執行緒服務 | 場景 | 插入訊息效能 |
---|---|---|
CommitRealTimeService | 非同步刷盤 && 開啟記憶體位元組緩衝區 | 第一 |
FlushRealTimeService | 非同步刷盤 && 關閉記憶體位元組緩衝區 | 第二 |
GroupCommitService | 同步刷盤 | 第三 |
MappedFile#落盤
方式 | |||
---|---|---|---|
方式一 | 寫入記憶體位元組緩衝區(writeBuffer) | 從記憶體位元組緩衝區(write buffer)提交(commit)到檔案通道(fileChannel) | 檔案通道(fileChannel)flush |
方式二 | 寫入對映檔案位元組緩衝區(mappedByteBuffer) | 對映檔案位元組緩衝區(mappedByteBuffer)flush |
flush相關程式碼
考慮到寫入效能,滿足 flushLeastPages * OS_PAGE_SIZE
才進行 flush
。
// 省略程式碼
commit相關程式碼:
考慮到寫入效能,滿足 commitLeastPages * OS_PAGE_SIZE
才進行 commit
。
// 省略程式碼
FlushRealTimeService
訊息插入成功時,非同步刷盤時使用。
// 省略程式碼
-
說明:實時
flush
執行緒服務,呼叫MappedFile#flush
相關邏輯。 -
第 23 至 29 行 :每
flushPhysicQueueThoroughInterval
週期,執行一次flush
。因為不是每次迴圈到都能滿足flushCommitLogLeastPages
大小,因此,需要一定週期進行一次強制flush
。當然,不能每次迴圈都去執行強制flush
,這樣效能較差。 -
第 33 行 至 37 行 :根據
flushCommitLogTimed
引數,可以選擇每次迴圈是固定週期還是等待喚醒。預設配置是後者,所以,每次插入訊息完成,會去呼叫commitLogService.wakeup()
。 -
第 45 行 :呼叫
MappedFile
進行flush
。 -
第 61 至 65 行 :
Broker
關閉時,強制flush
,避免有未刷盤的資料。
CommitRealTimeService
訊息插入成功時,非同步刷盤時使用。
和 FlushRealTimeService
類似,效能更好。
// 省略程式碼
GroupCommitService
訊息插入成功時,同步刷盤時使用。
// 省略程式碼
-
說明:批次寫入執行緒服務。
-
第 16 至 25 行 :新增寫入請求。方法設定了
sync
的原因:this.requestsWrite
會和this.requestsRead
不斷交換,無法保證穩定的同步。 -
第 27 至 34 行 :讀寫佇列交換。
-
第 38 至 60 行 :迴圈寫入佇列,進行
flush
。 -
第 43 行 :考慮到有可能每次迴圈的訊息寫入的訊息,可能分佈在兩個
MappedFile
(寫第N個訊息時,MappedFile
已滿,建立了一個新的),所以需要有迴圈2次。 -
第 51 行 :喚醒等待寫入請求執行緒,透過
CountDownLatch
實現 -
第 61 至 66 行 :直接刷盤。此處是由於傳送的訊息的
isWaitStoreMsgOK
未設定成TRUE
,導致未走批次提交。 -
第 73 至 80 行 :每 10ms 執行一次批次提交。當然,如果
wakeup()
時,則會立即進行一次批次提交。當Broker
設定成同步落盤 && 訊息isWaitStoreMsgOK=true
,訊息需要略大於 10ms 才能傳送成功。當然,效能相對非同步落盤較差,可靠性更高,需要我們在實際使用時去取捨。
結尾
寫的第二篇與RocketMQ原始碼相關的博文,看到有閱讀、點贊、收藏甚至訂閱,很受鼓舞。
《Message儲存》比起《Message傳送&接收》從難度上說是更大的,當然也是更有趣的,如果存在理解錯誤或者表達不清晰,還請大家多多包含。如果可以的話,還請麻煩新增 QQ:7685413 進行指出,避免自己的理解錯誤,給大家造成困擾。
推薦《Kafka設計解析(六)- Kafka高效能架構之道》,作者站在的高度比我高的多的多,嗯,按照李小璐的說法:高一個喜馬拉雅山。?認真啃讀《Linux核心設計與實現(原書第3版)》,day day up。
再次感謝大家的閱讀、點贊、收藏。
下一篇:《RocketMQ 原始碼分析 —— Message 拉取與消費》 起航!