歡迎光臨
每天分享高質量文章

RocketMQ 原始碼學習 2 : Namesrv

(點選上方公眾號,可快速關註)


來源:謝晞鳴 ,

fdx321.github.io/2017/08/17/【RocketMQ原始碼學習】2-Namesrv/

1. Namesrv 簡介

Namesrv 可以理解為一個註冊中心, 整個Namesrv的程式碼非常簡單,主要包含兩塊功能:

  • 管理一些 KV 的配置

  • 管理一些 Topic、Broker的註冊資訊

2. Namesrv 啟動過程

啟動過程主要涉及 NamesrvStartup/NamesrvController 兩個類, NamesrvStartup 負責解析命令列的一些引數到各種 Config 物件中(NamesrvConfig/NettyServerConfig等),如果命令列引數中帶有配置檔案的路徑,也會從配置檔案中讀取配置到各種 Config 物件中,然後初始化 NamesrvController,配置shutdownHook, 啟動 NamesrvController。 NamesrvController 會去初始化和啟動各個元件,主要是:

  • 建立NettyServer,註冊 requestProcessor,用於處理不同的網路請求

  • 啟動 NettyServer

  • 啟動各種 scheduled task.

不僅僅 Namesrv 是這樣,其他模組在啟動過程中也都是 startup/controller/config 一起完成這樣的套路。

3. Namesrv 主要元件

Processor 執行緒池,nettyServer 接收到請求後,封裝成任務提交到該執行緒池。

remoting 模組維護了這樣一個 processorTable:

HashMap> processorTable

一個 processor 可以處理多個 request code, 多個 processor 也可以共用一個執行緒池。對於 Namesrv, 只有一個 processor 執行緒池,給兩個 Processor 共享。

DefaultRequestProcessor(Namesrv 還有一個 ClusterTestRequestProcessor 繼承了該 Processor,在 clusterTest enable的情* 況下使用它來 getRouteInfoByTopic),用來處理 namesrv 接收到的所有 RequestCode, Processor 內部會根據不同的RequestCode 呼叫不同的方法。

KVConfigManager, 維護了一些KV方式的配置資料,可以根據請求,執行新增、刪除、查詢等操作

RouteInfoManager, 維護了topic/broker/cluster/filter這些東西的路由資訊,同樣支援增刪改查的操作

schedued 執行緒,按一定的頻率做兩個事情,掃描不活躍的broker;列印所有KV配置資訊

4. 以broker註冊為例看下Namesrv的工作過程

1. DefaultRequestProcessor 處理來自 NettyServer的 [RemotingCommand] request, 如果 request.getCode 是 RequestCode.REGISTER_BROKER, 就去註冊。這裡會根據request.version來判斷,從V3_0_11 開始支援了FilterServer。

2. 從 request 解碼得到 RegisterBrokerRequestHeader, 包含以下欄位:

  • brokerName, // 預設是BrokerConfig裡的獲得的locakHostName

  • brokerAddr, //brokerConfig.getBrokerIP1() + “:” + nettyServerConfig.getListenPort()

  • clusterName, //預設是BrokerConfig的”DefaultCluster”

  • haServerAddr, //brokerConfig.getBrokerIP2() + “:” + messageStoreConfig.getHaListenPort()

  • brokerId, //如果是MASTER,就是MixAll.MASTER_ID(也就0),否則就是其他

3. 從 request.body 解碼得到 RegisterBrokerBody, RegisterBrokerBody 包含以下內容,用JSON的方式來描述吧:

{

  “topicConfigSerializeWrapper”: {

      “topicConfigTable”:{

         “topic_xxx”:{

           “defaultReadQueueNums”:”16″,

          “defaultWriteQueueNums”:”16″,

          “topicName”:”xxx”,

          “readQueueNums”:””,

          “writeQueueNums”:””,

          “perm”:””,

          “topicFilterType”:””,

          “topicSysFlag”:””,

          “order”:””

         },

      },

      “dataVersion”:{

         “timestamp”:”xxxx”,

         “counter”:”xxxx”

      }

   },

  “filterServerList”:[

     “”,//filterServerAddr

  ]

}

4. 在 clusterAddrTable 中新增一條記錄

5. 在 brokerAddrTable 中新增一條記錄,這裡會構建一個BrokerData

{

  “cluster”:”xxx”,

  “brokerName”:”xxx”,

  “brokerAddrs”:{

     “brokerId_xx”:”broker address xxx”

   }

}

6. 如果是第一次註冊或者topicConfig發生了變更,會去更新topicQueueTable

7. 在brokerLiveTable新增該broker

8. 在filterServerTable新增這些filterServer的地址串列

5.其他

以上內容看下來,namesrv 是一個無狀態的應用,可以水平任意擴充套件。每一個 broker 都會和所有的 namesrv 保持長連線(有個scheduled task會按一定頻率給所有namesrv做register broker的操作),所以 namesrv 之間沒有主從關係,也不需要複製資料。client(producer/consumer) 隨機選一個 namesrv 連線。client 中的 namesrv 地址串列是怎麼來的呢,有兩種方式:

  1. 透過命令列或配置檔案在啟動的時候獲得的

  2. 透過 Scheduled task,按一定的頻率從一個 web 服務 fetch的(web服務可以自建),如果有變更,就更新這個 namesrv 地址串列。

client 選擇 namesrv的過程如下, index遞增取模,然並不是每次都這麼乾,取到後會快取起來。

if (addrList != null && !addrList.isEmpty()) {

    for (int i = 0; i < addrList.size(); i++) {

        int index = this.namesrvIndex.incrementAndGet();

        index = Math.abs(index);

        index = index % addrList.size();

        String newAddr = addrList.get(index);

        this.namesrvAddrChoosed.set(newAddr);

        Channel channelNew = this.createChannel(newAddr);

        if (channelNew != null)

            return channelNew;

    }

}

看到這裡我產生了疑問,那豈不是每個 client 啟動的時候都取的是第一個 namesrv,它不會壓力很大嗎,後來發現 namesrvIndex 的初始值是隨機的。

以上所有扯淡都是基於原始碼 https://github.com/apache/incubator-rocketmq (tag:rocketmq-all-4.1.0-incubating)所貼程式碼有所刪減。

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂