歡迎光臨
每天分享高質量文章

訊息佇列中介軟體 RocketMQ 原始碼分析 —— Message 傳送與接收

原文地址:http://www.yunai.me/RocketMQ/message-send-and-receive/?mp 

(建議使用原文地址閱讀:1、閱讀體驗;2、程式碼排版混亂因而省略。)
RocketMQ 帶註釋原始碼地址 :https://github.com/YunaiV/incubator-rocketmq
?本系列每 1-2 周更新一篇,歡迎訂閱、關註、收藏 公眾號


  • 1、概述

  • 2、Producer 傳送訊息

    • DefaultMQProducerImpl#tryToFindTopicPublishInfo()

    • MQFaultStrategy

    • DefaultMQProducerImpl#sendKernelImpl()

    • MQFaultStrategy

    • LatencyFaultTolerance

    • LatencyFaultToleranceImpl

    • FaultItem

    • DefaultMQProducer#send(Message)

    • DefaultMQProducerImpl#sendDefaultImpl()

  • 3、Broker 接收訊息

    • AbstractSendMessageProcessor#msgCheck

    • SendMessageProcessor#sendMessage

    • DefaultMessageStore#putMessage

  • 4、某種結尾


1、概述

  1. Producer 傳送訊息。主要是同步傳送訊息原始碼,涉及到 非同步/Oneway傳送訊息,事務訊息會跳過。

  2. Broker 接收訊息。(儲存訊息在《RocketMQ 原始碼分析 —— Message 儲存》解析)

2、Producer 傳送訊息

DefaultMQProducer#send(Message)

  // .... 省略程式碼

  • 說明:傳送同步訊息,DefaultMQProducer#send(Message) 對 DefaultMQProducerImpl#send(Message)進行封裝。

DefaultMQProducerImpl#sendDefaultImpl()

 // .... 省略程式碼

  • 說明 :傳送訊息。步驟:獲取訊息路由資訊,選擇要傳送到的訊息佇列,執行訊息傳送核心方法,並對傳送結果進行封裝傳回。

  • 第 1 至 7 行:對sendsendDefaultImpl(...)進行封裝。

  • 第 20 行 :invokeID僅僅用於列印日誌,無實際的業務用途。

  • 第 25 行 :獲取 Topic路由資訊, 詳細解析見:DefaultMQProducerImpl#tryToFindTopicPublishInfo()

  • 第 30 & 34 行 :計算呼叫傳送訊息到成功為止的最大次數,併進行迴圈。同步或非同步傳送訊息會呼叫多次,預設配置為3次。

  • 第 36 行 :選擇訊息要傳送到的佇列,詳細解析見:MQFaultStrategy

  • 第 43 行 :呼叫傳送訊息核心方法,詳細解析見:DefaultMQProducerImpl#sendKernelImpl()

  • 第 46 行 :更新Broker可用性資訊。在選擇傳送到的訊息佇列時,會參考Broker傳送訊息的延遲,詳細解析見:MQFaultStrategy

  • 第 62 至 68 行:當丟擲RemotingException時,如果進行訊息傳送失敗重試,則可能導致訊息傳送重覆。例如,傳送訊息超時(RemotingTimeoutException),實際Broker接收到該訊息並處理成功。因此,Consumer在消費時,需要保證冪等性。

DefaultMQProducerImpl#tryToFindTopicPublishInfo()

 // .... 省略程式碼

  • 說明 :獲得 Topic釋出資訊。優先從快取topicPublishInfoTable,其次從Namesrv中獲得。

  • 第 3 行 :從快取topicPublishInfoTable中獲得 Topic釋出資訊。

  • 第 5 至 9 行 :從 Namesrv 中獲得 Topic釋出資訊。

  • 第 13 至 17 行 :當從 Namesrv 無法獲取時,使用 {@link DefaultMQProducer#createTopicKey} 對應的 Topic釋出資訊。目的是當 Broker 開啟自動建立 Topic開關時,Broker 接收到訊息後自動建立Topic,詳細解析見《RocketMQ 原始碼分析 —— Topic》。

MQFaultStrategy

MQFaultStrategy

 // .... 省略程式碼

  • 說明 :Producer訊息傳送容錯策略。預設情況下容錯策略關閉,即sendLatencyFaultEnable=false

  • 第 30 至 62 行 :容錯策略選擇訊息佇列邏輯。優先獲取可用佇列,其次選擇一個broker獲取佇列,最差傳回任意broker的一個佇列。

  • 第 64 行 :未開啟容錯策略選擇訊息佇列邏輯。

  • 第 74 至 79 行 :更新延遲容錯資訊。當 Producer 傳送訊息時間過長,則邏輯認為N秒內不可用。按照latencyMaxnotAvailableDuration的配置,對應如下:

    Producer傳送訊息消耗時長 Broker不可用時長
    >= 15000 ms 600 * 1000 ms
    >= 3000 ms 180 * 1000 ms
    >= 2000 ms 120 * 1000 ms
    >= 1000 ms 60 * 1000 ms
    >= 550 ms 30 * 1000 ms
    >= 100 ms 0 ms
    >= 50 ms 0 ms

LatencyFaultTolerance

  // .... 省略程式碼

  • 說明 :延遲故障容錯介面

LatencyFaultToleranceImpl

  // .... 省略程式碼

  • 說明 :延遲故障容錯實現。維護每個物件的資訊。

FaultItem

 // .... 省略程式碼

  • 說明 :物件故障資訊。維護物件的名字、延遲、開始可用的時間。

DefaultMQProducerImpl#sendKernelImpl()

 // .... 省略程式碼

  • 說明 :傳送訊息核心方法。該方法真正發起網路請求,傳送訊息給 Broker

  • 第 21 行 :生產訊息編號,詳細解析見《RocketMQ 原始碼分析 —— Message 基礎》。

  • 第 64 至 121 行 :構建傳送訊息請求SendMessageRequestHeader

  • 第 107 至 117 行 :執行 MQClientInstance#sendMessage(...) 發起網路請求。

3、Broker 接收訊息

SendMessageProcessor#sendMessage

 // .... 省略程式碼

  • #processRequest() 說明 :處理訊息請求。

  • #sendMessage() 說明 :傳送訊息,並傳回傳送訊息結果。

  • 第 51 至 55 行 :訊息配置(Topic配置)校驗,詳細解析見:AbstractSendMessageProcessor#msgCheck()。

  • 第 60 至 64 行 :訊息佇列編號小於0時,Broker 可以設定隨機選擇一個訊息佇列。

  • 第 72 至 103 行 :對RETRY型別的訊息處理。如果超過最大消費次數,則topic修改成”%DLQ%” + 分組名, 即加 死信隊 (Dead Letter Queue),詳細解析見:《RocketMQ 原始碼分析 —— Topic》。

  • 第 105 至 118 行 :建立MessageExtBrokerInner

  • 第 132 :儲存訊息,詳細解析見:DefaultMessageStore#putMessage()。

  • 第 133 至 183 行 :處理訊息傳送結果,設定響應結果和提示。

  • 第 186 至 214 行 :傳送成功,響應。這裡doResponse(ctx, request, response)進行響應,最後return null,原因是:響應給 Producer 可能發生異常,#doResponse(ctx, request, response)捕捉了該異常並輸出日誌。這樣做的話,我們進行排查 Broker 接收訊息成功後響應是否存在異常會方便很多。

AbstractSendMessageProcessor#msgCheck

 // .... 省略程式碼

  • 說明:校驗訊息是否正確,主要是Topic配置方面,例如:Broker 是否有寫入許可權,topic配置是否存在,佇列編號是否正確。

  • 第 11 至 18 行 :檢查Topic是否可以被髮送。目前是 {@link MixAll.DEFAULT_TOPIC} 不被允許傳送。

  • 第 20 至 51 行 :當找不到Topic配置,則進行建立。當然,建立會存在不成功的情況,例如說:defaultTopic 的Topic配置不存在,又或者是 存在但是不允許繼承,詳細解析見《RocketMQ 原始碼分析 —— Topic》。

DefaultMessageStore#putMessage

  // .... 省略程式碼
  • 說明:儲存訊息封裝,最終儲存需要 CommitLog 實現。

  • 第 7 至 27 行 :校驗 Broker 是否可以寫入。

  • 第 29 至 39 行 :訊息格式與大小校驗。

  • 第 47 行 :呼叫 CommitLong 進行儲存,詳細邏輯見:《RocketMQ 原始碼分析 —— Message 儲存》

4、某種結尾

感謝閱讀、收藏、點贊本文的工程師同學。

閱讀原始碼是件令自己很愉悅的事情,編寫原始碼解析是讓自己腦細胞死傷無數的過程,痛並快樂著。

如果有內容寫的存在錯誤,或是不清晰的地方,見笑了,?。歡迎加 QQ:7685413 我們一起探討,共進步。

再次感謝閱讀、收藏、點贊本文的工程師同學。




贊(0)

分享創造快樂