(點選上方公眾號,可快速關註)
來源:謝晞鳴 ,
fdx321.github.io/2017/08/17/【RocketMQ原始碼學習】2-Namesrv/
1. Namesrv 簡介
Namesrv 可以理解為一個註冊中心, 整個Namesrv的程式碼非常簡單,主要包含兩塊功能:
-
管理一些 KV 的配置
-
管理一些 Topic、Broker的註冊資訊
2. Namesrv 啟動過程
啟動過程主要涉及 NamesrvStartup/NamesrvController 兩個類, NamesrvStartup 負責解析命令列的一些引數到各種 Config 物件中(NamesrvConfig/NettyServerConfig等),如果命令列引數中帶有配置檔案的路徑,也會從配置檔案中讀取配置到各種 Config 物件中,然後初始化 NamesrvController,配置shutdownHook, 啟動 NamesrvController。 NamesrvController 會去初始化和啟動各個元件,主要是:
-
建立NettyServer,註冊 requestProcessor,用於處理不同的網路請求
-
啟動 NettyServer
-
啟動各種 scheduled task.
不僅僅 Namesrv 是這樣,其他模組在啟動過程中也都是 startup/controller/config 一起完成這樣的套路。
3. Namesrv 主要元件
Processor 執行緒池,nettyServer 接收到請求後,封裝成任務提交到該執行緒池。
remoting 模組維護了這樣一個 processorTable:
HashMap
> processorTable
一個 processor 可以處理多個 request code, 多個 processor 也可以共用一個執行緒池。對於 Namesrv, 只有一個 processor 執行緒池,給兩個 Processor 共享。
DefaultRequestProcessor(Namesrv 還有一個 ClusterTestRequestProcessor 繼承了該 Processor,在 clusterTest enable的情* 況下使用它來 getRouteInfoByTopic),用來處理 namesrv 接收到的所有 RequestCode, Processor 內部會根據不同的RequestCode 呼叫不同的方法。
KVConfigManager, 維護了一些KV方式的配置資料,可以根據請求,執行新增、刪除、查詢等操作
RouteInfoManager, 維護了topic/broker/cluster/filter這些東西的路由資訊,同樣支援增刪改查的操作
schedued 執行緒,按一定的頻率做兩個事情,掃描不活躍的broker;列印所有KV配置資訊
4. 以broker註冊為例看下Namesrv的工作過程
1. DefaultRequestProcessor 處理來自 NettyServer的 [RemotingCommand] request, 如果 request.getCode 是 RequestCode.REGISTER_BROKER, 就去註冊。這裡會根據request.version來判斷,從V3_0_11 開始支援了FilterServer。
2. 從 request 解碼得到 RegisterBrokerRequestHeader, 包含以下欄位:
-
brokerName, // 預設是BrokerConfig裡的獲得的locakHostName
-
brokerAddr, //brokerConfig.getBrokerIP1() + “:” + nettyServerConfig.getListenPort()
-
clusterName, //預設是BrokerConfig的”DefaultCluster”
-
haServerAddr, //brokerConfig.getBrokerIP2() + “:” + messageStoreConfig.getHaListenPort()
-
brokerId, //如果是MASTER,就是MixAll.MASTER_ID(也就0),否則就是其他
3. 從 request.body 解碼得到 RegisterBrokerBody, RegisterBrokerBody 包含以下內容,用JSON的方式來描述吧:
{
“topicConfigSerializeWrapper”: {
“topicConfigTable”:{
“topic_xxx”:{
“defaultReadQueueNums”:”16″,
“defaultWriteQueueNums”:”16″,
“topicName”:”xxx”,
“readQueueNums”:””,
“writeQueueNums”:””,
“perm”:””,
“topicFilterType”:””,
“topicSysFlag”:””,
“order”:””
},
},
“dataVersion”:{
“timestamp”:”xxxx”,
“counter”:”xxxx”
}
},
“filterServerList”:[
“”,//filterServerAddr
]
}
4. 在 clusterAddrTable 中新增一條記錄
5. 在 brokerAddrTable 中新增一條記錄,這裡會構建一個BrokerData
{
“cluster”:”xxx”,
“brokerName”:”xxx”,
“brokerAddrs”:{
“brokerId_xx”:”broker address xxx”
}
}
6. 如果是第一次註冊或者topicConfig發生了變更,會去更新topicQueueTable
7. 在brokerLiveTable新增該broker
8. 在filterServerTable新增這些filterServer的地址串列
5.其他
以上內容看下來,namesrv 是一個無狀態的應用,可以水平任意擴充套件。每一個 broker 都會和所有的 namesrv 保持長連線(有個scheduled task會按一定頻率給所有namesrv做register broker的操作),所以 namesrv 之間沒有主從關係,也不需要複製資料。client(producer/consumer) 隨機選一個 namesrv 連線。client 中的 namesrv 地址串列是怎麼來的呢,有兩種方式:
-
透過命令列或配置檔案在啟動的時候獲得的
-
透過 Scheduled task,按一定的頻率從一個 web 服務 fetch的(web服務可以自建),如果有變更,就更新這個 namesrv 地址串列。
client 選擇 namesrv的過程如下, index遞增取模,然並不是每次都這麼乾,取到後會快取起來。
if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);
this.namesrvAddrChoosed.set(newAddr);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null)
return channelNew;
}
}
看到這裡我產生了疑問,那豈不是每個 client 啟動的時候都取的是第一個 namesrv,它不會壓力很大嗎,後來發現 namesrvIndex 的初始值是隨機的。
以上所有扯淡都是基於原始碼 https://github.com/apache/incubator-rocketmq (tag:rocketmq-all-4.1.0-incubating)所貼程式碼有所刪減。
看完本文有收穫?請轉發分享給更多人
關註「ImportNew」,提升Java技能