摘要: 原創出處 http://www.iocoder.cn/RocketMQ/high-availability/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
本文主要基於 RocketMQ 4.0.x 正式版
-
1. 概述
-
2. Namesrv 高可用
-
2.1 Broker 註冊到 Namesrv
-
2.2 Producer、Consumer 訪問 Namesrv
-
3. Broker 高可用
-
3.2 Broker 主從
-
3.1.1 配置
-
3.1.2 元件
-
3.1.3 通訊協議
-
3.1.4 Slave
-
3.1.5 Master
-
3.1.6 Master_SYNC
-
3.2 Producer 傳送訊息
-
3.3 Consumer 消費訊息
-
4. 總結
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
1. 概述
本文主要解析 Namesrv
、 Broker
如何實現高可用, Producer
、 Consumer
怎麼與它們通訊保證高可用。
2. Namesrv 高可用
啟動多個 Namesrv
實現高可用。
相較於 Zookeeper
、 Consul
、 Etcd
等, Namesrv
是一個超輕量級的註冊中心,提供命名服務。
2.1 Broker 註冊到 Namesrv
-
? 多個
Namesrv
之間,沒有任何關係(不存在類似Zookeeper
的Leader
/Follower
等角色),不進行通訊與資料同步。透過Broker
迴圈註冊多個Namesrv
。
1: // ⬇️⬇️⬇️【BrokerOuterAPI.java】
2: public RegisterBrokerResult registerBrokerAll(
3: final String clusterName,
4: final String brokerAddr,
5: final String brokerName,
6: final long brokerId,
7: final String haServerAddr,
8: final TopicConfigSerializeWrapper topicConfigWrapper,
9: final List<String> filterServerList,
10: final boolean oneway,
11: final int timeoutMills) {
12: RegisterBrokerResult registerBrokerResult = null;
13:
14: List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
15: if (nameServerAddressList != null) {
16: for (String namesrvAddr : nameServerAddressList) { // 迴圈多個 Namesrv
17: try {
18: RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
19: haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
20: if (result != null) {
21: registerBrokerResult = result;
22: }
23:
24: log.info("register broker to name server {} OK", namesrvAddr);
25: } catch (Exception e) {
26: log.warn("registerBroker Exception, {}", namesrvAddr, e);
27: }
28: }
29: }
30:
31: return registerBrokerResult;
32: }
2.2 Producer、Consumer 訪問 Namesrv
-
?
Producer
、Consumer
從Namesrv
串列選擇一個可連線的進行通訊。
1: // ⬇️⬇️⬇️【NettyRemotingClient.java】
2: private Channel getAndCreateNameserverChannel() throws InterruptedException {
3: // 傳回已選擇、可連線Namesrv
4: String addr = this.namesrvAddrChoosed.get();
5: if (addr != null) {
6: ChannelWrapper cw = this.channelTables.get(addr);
7: if (cw != null && cw.isOK()) {
8: return cw.getChannel();
9: }
10: }
11: //
12: final List<String> addrList = this.namesrvAddrList.get();
13: if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
14: try {
15: // 傳回已選擇、可連線的Namesrv
16: addr = this.namesrvAddrChoosed.get();
17: if (addr != null) {
18: ChannelWrapper cw = this.channelTables.get(addr);
19: if (cw != null && cw.isOK()) {
20: return cw.getChannel();
21: }
22: }
23: // 從【Namesrv串列】中選擇一個連線的傳回
24: if (addrList != null && !addrList.isEmpty()) {
25: for (int i = 0; i < addrList.size(); i++) {
26: int index = this.namesrvIndex.incrementAndGet();
27: index = Math.abs(index);
28: index = index % addrList.size();
29: String newAddr = addrList.get(index);
30:
31: this.namesrvAddrChoosed.set(newAddr);
32: Channel channelNew = this.createChannel(newAddr);
33: if (channelNew != null)
34: return channelNew;
35: }
36: }
37: } catch (Exception e) {
38: log.error("getAndCreateNameserverChannel: create name server channel exception", e);
39: } finally {
40: this.lockNamesrvChannel.unlock();
41: }
42: } else {
43: log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
44: }
45:
46: return null;
47: }
3. Broker 高可用
啟動多個 Broker分組
形成 叢集
實現高可用。Broker分組
= Master節點
x1 + Slave節點
xN。
類似 MySQL
, Master節點
提供讀寫服務, Slave節點
只提供讀服務。
3.2 Broker 主從
-
每個分組,
Master
節點 不斷傳送新的CommitLog
給Slave
節點。Slave
節點 不斷上報本地的CommitLog
已經同步到的位置給Master
節點。 -
Broker分組
與Broker分組
之間沒有任何關係,不進行通訊與資料同步。 -
消費進度 目前不支援
Master
/Slave
同步。
叢集內, Master
節點 有兩種型別: Master_SYNC
、 Master_ASYNC
:前者在 Producer
傳送訊息時,等待 Slave
節點 儲存完畢後再傳回傳送結果,而後者不需要等待。
3.1.1 配置
目前官方提供三套配置:
-
2m-2s-async
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | ASYNC_MASTER | 0 |
DefaultCluster | broker-a | SLAVE | 1 |
DefaultCluster | broker-b | ASYNC_MASTER | 0 |
DefaultCluster | broker-b | SLAVE | 1 |
|
|||
|
|||
brokerClusterName | brokerName | brokerRole | brokerId |
— | — | — | — |
DefaultCluster | broker-a | SYNC_MASTER | 0 |
DefaultCluster | broker-a | SLAVE | 1 |
DefaultCluster | broker-b | SYNC_MASTER | 0 |
DefaultCluster | broker-b | SLAVE | 1 |
|
|||
|
|||
brokerClusterName | brokerName | brokerRole | brokerId |
— | — | — | — |
DefaultCluster | broker-a | ASYNC_MASTER | 0 |
DefaultCluster | broker-b | ASYNC_MASTER | 0 |
3.1.2 元件
再看具體實現程式碼之前,我們來看看 Master
/ Slave
節點 包含的元件:
-
Master
節點 -
ReadSocketService
:讀來自Slave
節點 的資料。 -
WriteSocketService
:寫到往Slave
節點 的資料。 -
AcceptSocketService
:接收Slave
節點 連線。 -
HAConnection
-
Slave
節點 -
HAClient
:對Master
節點 連線、讀寫資料。
3.1.3 通訊協議
Master
節點 與 Slave
節點 通訊協議很簡單,只有如下兩條。
物件 | 用途 | 第幾位 | 欄位 | 資料型別 | 位元組數 | 說明 |
---|---|---|---|---|---|---|
Slave=>Master | 上報CommitLog已經同步到的物理位置 | |||||
0 | maxPhyOffset | Long | 8 | CommitLog最大物理位置 | ||
Master=>Slave | 傳輸新的 CommitLog 資料 |
|||||
0 | fromPhyOffset | Long | 8 | CommitLog開始物理位置 | ||
1 | size | Int | 4 | 傳輸CommitLog資料長度 | ||
2 | body | Bytes | size | 傳輸CommitLog資料 |
3.1.4 Slave
-
Slave
主迴圈,實現了不斷不斷不斷從Master
傳輸CommitLog
資料,上傳Master
自己本地的CommitLog
已經同步物理位置。
1: // ⬇️⬇️⬇️【HAClient.java】
2: public void run() {
3: log.info(this.getServiceName() + " service started");
4:
5: while (!this.isStopped()) {
6: try {
7: if (this.connectMaster()) {
8: // 若到滿足上報間隔,上報到Master進度
9: if (this.isTimeToReportOffset()) {
10: boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
11: if (!result) {
12: this.closeMaster();
13: }
14: }
15:
16: this.selector.select(1000);
17:
18: // 處理讀取事件
19: boolean ok = this.processReadEvent();
20: if (!ok) {
21: this.closeMaster();
22: }
23:
24: // 若進度有變化,上報到Master進度
25: if (!reportSlaveMaxOffsetPlus()) {
26: continue;
27: }
28:
29: // Master過久未傳回資料,關閉連線
30: long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
31: if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
32: .getHaHousekeepingInterval()) {
33: log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
34: + "] expired, " + interval);
35: this.closeMaster();
36: log.warn("HAClient, master not response some time, so close connection");
37: }
38: } else {
39: this.waitForRunning(1000 * 5);
40: }
41: } catch (Exception e) {
42: log.warn(this.getServiceName() + " service has exception. ", e);
43: this.waitForRunning(1000 * 5);
44: }
45: }
46:
47: log.info(this.getServiceName() + " service end");
48: }
-
第 8 至 14 行 :固定間隔(預設5s)向
Master
上報Slave
本地CommitLog
已經同步到的物理位置。該操作還有心跳的作用。 -
第 16 至 22 行 :處理
Master
傳輸Slave
的CommitLog
資料。
-
我們來看看
#dispatchReadRequest(...)
與#reportSlaveMaxOffset(...)
是怎麼實現的。
1: // 【HAClient.java】
2: /**
3: * 讀取Master傳輸的CommitLog資料,並傳回是異常
4: * 如果讀取到資料,寫入CommitLog
5: * 異常原因:
6: * 1. Master傳輸來的資料offset 不等於 Slave的CommitLog資料最大offset
7: * 2. 上報到Master進度失敗
8: *
9: * @return 是否異常
10: */
11: private boolean dispatchReadRequest() {
12: final int msgHeaderSize = 8 + 4; // phyoffset + size
13: int readSocketPos = this.byteBufferRead.position();
14:
15: while (true) {
16: // 讀取到請求
17: int diff = this.byteBufferRead.position() - this.dispatchPostion;
18: if (diff >= msgHeaderSize) {
19: // 讀取masterPhyOffset、bodySize。使用dispatchPostion的原因是:處理資料“粘包”導致資料讀取不完整。
20: long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
21: int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
22: // 校驗 Master傳輸來的資料offset 是否和 Slave的CommitLog資料最大offset 是否相同。
23: long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
24: if (slavePhyOffset != 0) {
25: if (slavePhyOffset != masterPhyOffset) {
26: log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
27: + slavePhyOffset + " MASTER: " + masterPhyOffset);
28: return false;
29: }
30: }
31: // 讀取到訊息
32: if (diff >= (msgHeaderSize + bodySize)) {
33: // 寫入CommitLog
34: byte[] bodyData = new byte[bodySize];
35: this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
36: this.byteBufferRead.get(bodyData);
37: HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
38: // 設定處理到的位置
39: this.byteBufferRead.position(readSocketPos);
40: this.dispatchPostion += msgHeaderSize + bodySize;
41: // 上報到Master進度
42: if (!reportSlaveMaxOffsetPlus()) {
43: return false;
44: }
45: // 繼續迴圈
46: continue;
47: }
48: }
49:
50: // 空間寫滿,重新分配空間
51: if (!this.byteBufferRead.hasRemaining()) {
52: this.reallocateByteBuffer();
53: }
54:
55: break;
56: }
57:
58: return true;
59: }
60:
61: /**
62: * 上報進度
63: *
64: * @param maxOffset 進度
65: * @return 是否上報成功
66: */
67: private boolean reportSlaveMaxOffset(final long maxOffset) {
68: this.reportOffset.position(0);
69: this.reportOffset.limit(8);
70: this.reportOffset.putLong(maxOffset);
71: this.reportOffset.position(0);
72: this.reportOffset.limit(8);
73:
74: for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
75: try {
76: this.socketChannel.write(this.reportOffset);
77: } catch (IOException e) {
78: log.error(this.getServiceName()
79: + "reportSlaveMaxOffset this.socketChannel.write exception", e);
80: return false;
81: }
82: }
83:
84: return !this.reportOffset.hasRemaining();
85: }
3.1.5 Master
-
ReadSocketService
邏輯同HAClient#processReadEvent(...)
基本相同,我們直接看程式碼。
1: // ⬇️⬇️⬇️【ReadSocketService.java】
2: private boolean processReadEvent() {
3: int readSizeZeroTimes = 0;
4:
5: // 清空byteBufferRead
6: if (!this.byteBufferRead.hasRemaining()) {
7: this.byteBufferRead.flip();
8: this.processPostion = 0;
9: }
10:
11: while (this.byteBufferRead.hasRemaining()) {
12: try {
13: int readSize = this.socketChannel.read(this.byteBufferRead);
14: if (readSize > 0) {
15: readSizeZeroTimes = 0;
16:
17: // 設定最後讀取時間
18: this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
19:
20: if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
21: // 讀取Slave 請求來的CommitLog的最大位置
22: int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
23: long readOffset = this.byteBufferRead.getLong(pos - 8);
24: this.processPostion = pos;
25:
26: // 設定Slave CommitLog的最大位置
27: HAConnection.this.slaveAckOffset = readOffset;
28:
29: // 設定Slave 第一次請求的位置
30: if (HAConnection.this.slaveRequestOffset < 0) {
31: HAConnection.this.slaveRequestOffset = readOffset;
32: log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
33: }
34:
35: // 通知目前Slave進度。主要用於Master節點為同步型別的。
36: HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
37: }
38: } else if (readSize == 0) {
39: if (++readSizeZeroTimes >= 3) {
40: break;
41: }
42: } else {
43: log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
44: return false;
45: }
46: } catch (IOException e) {
47: log.error("processReadEvent exception", e);
48: return false;
49: }
50: }
51:
52: return true;
53: }
-
WriteSocketService
計算Slave
開始同步的位置後,不斷向Slave
傳輸新的CommitLog
資料。
1: // ⬇️⬇️⬇️【WriteSocketService.java】
2: @Override
3: public void run() {
4: HAConnection.log.info(this.getServiceName() + " service started");
5:
6: while (!this.isStopped()) {
7: try {
8: this.selector.select(1000);
9:
10: // 未獲得Slave讀取進度請求,sleep等待。
11: if (-1 == HAConnection.this.slaveRequestOffset) {
12: Thread.sleep(10);
13: continue;
14: }
15:
16: // 計算初始化nextTransferFromWhere
17: if (-1 == this.nextTransferFromWhere) {
18: if (0 == HAConnection.this.slaveRequestOffset) {
19: long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
20: masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog());
21: if (masterOffset < 0) {
22: masterOffset = 0;
23: }
24:
25: this.nextTransferFromWhere = masterOffset;
26: } else {
27: this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
28: }
29:
30: log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
31: + "], and slave request " + HAConnection.this.slaveRequestOffset);
32: }
33:
34: if (this.lastWriteOver) {
35: long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
36: if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // 心跳
37:
38: // Build Header
39: this.byteBufferHeader.position(0);
40: this.byteBufferHeader.limit(essay-headerSize);
41: this.byteBufferHeader.putLong(this.nextTransferFromWhere);
42: this.byteBufferHeader.putInt(0);
43: this.byteBufferHeader.flip();
44:
45: this.lastWriteOver = this.transferData();
46: if (!this.lastWriteOver)
47: continue;
48: }
49: } else { // 未傳輸完成,繼續傳輸
50: this.lastWriteOver = this.transferData();
51: if (!this.lastWriteOver)
52: continue;
53: }
54:
55: // 選擇新的CommitLog資料進行傳輸
56: SelectMappedBufferResult selectResult =
57: HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
58: if (selectResult != null) {
59: int size = selectResult.getSize();
60: if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
61: size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
62: }
63:
64: long thisOffset = this.nextTransferFromWhere;
65: this.nextTransferFromWhere += size;
66:
67: selectResult.getByteBuffer().limit(size);
68: this.selectMappedBufferResult = selectResult;
69:
70: // Build Header
71: this.byteBufferHeader.position(0);
72: this.byteBufferHeader.limit(essay-headerSize);
73: this.byteBufferHeader.putLong(thisOffset);
74: this.byteBufferHeader.putInt(size);
75: this.byteBufferHeader.flip();
76:
77: this.lastWriteOver = this.transferData();
78: } else { // 沒新的訊息,掛起等待
79: HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
80: }
81: } catch (Exception e) {
82:
83: HAConnection.log.error(this.getServiceName() + " service has exception.", e);
84: break;
85: }
86: }
87:
88: // 斷開連線 & 暫停寫執行緒 & 暫停讀執行緒 & 釋放CommitLog
89: if (this.selectMappedBufferResult != null) {
90: this.selectMappedBufferResult.release();
91: }
92:
93: this.makeStop();
94:
95: readSocketService.makeStop();
96:
97: haService.removeConnection(HAConnection.this);
98:
99: SelectionKey sk = this.socketChannel.keyFor(this.selector);
100: if (sk != null) {
101: sk.cancel();
102: }
103:
104: try {
105: this.selector.close();
106: this.socketChannel.close();
107: } catch (IOException e) {
108: HAConnection.log.error("", e);
109: }
110:
111: HAConnection.log.info(this.getServiceName() + " service end");
112: }
113:
114: /**
115: * 傳輸資料
116: */
117: private boolean transferData() throws Exception {
118: int writeSizeZeroTimes = 0;
119: // Write Header
120: while (this.byteBufferHeader.hasRemaining()) {
121: int writeSize = this.socketChannel.write(this.byteBufferHeader);
122: if (writeSize > 0) {
123: writeSizeZeroTimes = 0;
124: this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
125: } else if (writeSize == 0) {
126: if (++writeSizeZeroTimes >= 3) {
127: break;
128: }
129: } else {
130: throw new Exception("ha master write essay-header error < 0");
131: }
132: }
133:
134: if (null == this.selectMappedBufferResult) {
135: return !this.byteBufferHeader.hasRemaining();
136: }
137:
138: writeSizeZeroTimes = 0;
139:
140: // Write Body
141: if (!this.byteBufferHeader.hasRemaining()) {
142: while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
143: int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
144: if (writeSize > 0) {
145: writeSizeZeroTimes = 0;
146: this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
147: } else if (writeSize == 0) {
148: if (++writeSizeZeroTimes >= 3) {
149: break;
150: }
151: } else {
152: throw new Exception("ha master write body error < 0");
153: }
154: }
155: }
156:
157: boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
158:
159: if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
160: this.selectMappedBufferResult.release();
161: this.selectMappedBufferResult = null;
162: }
163:
164: return result;
165: }
3.1.6 Master_SYNC
-
Producer
傳送訊息時,Master_SYNC
節點 會等待Slave
節點 儲存完畢後再傳回傳送結果。
核心程式碼如下:
1: // ⬇️⬇️⬇️【CommitLog.java】
2: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
3: // ....省略處理髮送程式碼
4: // Synchronous write double 如果是同步Master,同步到從節點
5: if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
6: HAService service = this.defaultMessageStore.getHaService();
7: if (msg.isWaitStoreMsgOK()) {
8: // Determine whether to wait
9: if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
10: if (null == request) {
11: request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
12: }
13: service.putRequest(request);
14:
15: // 喚醒WriteSocketService
16: service.getWaitNotifyObject().wakeupAll();
17:
18: boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
19: if (!flushOK) {
20: log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
21: + msg.getTags() + " client address: " + msg.getBornHostString());
22: putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
23: }
24: }
25: // Slave problem
26: else {
27: // Tell the producer, slave not available
28: putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
29: }
30: }
31: }
32:
33: return putMessageResult;
34: }
-
第 16 行 :喚醒
WriteSocketService
。 -
喚醒後,
WriteSocketService
掛起等待新訊息結束,Master
傳輸Slave
新的CommitLog
資料。 -
Slave
收到資料後,立即上報最新的CommitLog
同步進度到Master
。ReadSocketService
喚醒第 18 行:request#waitForFlush(...)
。
我們來看下 GroupTransferService
的核心邏輯程式碼:
1: // ⬇️⬇️⬇️【GroupTransferService.java】
2: private void doWaitTransfer() {
3: synchronized (this.requestsRead) {
4: if (!this.requestsRead.isEmpty()) {
5: for (CommitLog.GroupCommitRequest req : this.requestsRead) {
6: // 等待Slave上傳進度
7: boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
8: for (int i = 0; !transferOK && i < 5; i++) {
9: this.notifyTransferObject.waitForRunning(1000); // 喚醒
10: transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
11: }
12:
13: if (!transferOK) {
14: log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
15: }
16:
17: // 喚醒請求,並設定是否Slave同步成功
18: req.wakeupCustomer(transferOK);
19: }
20:
21: this.requestsRead.clear();
22: }
23: }
24: }
3.2 Producer 傳送訊息
-
Producer
傳送訊息時,會對Broker
叢集 的所有佇列進行選擇。
核心程式碼如下:
1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】
2: private SendResult sendDefaultImpl(//
3: Message msg, //
4: final CommunicationMode communicationMode, //
5: final SendCallback sendCallback, //
6: final long timeout//
7: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
8: // .... 省略:處理【校驗邏輯】
9: // 獲取 Topic路由資訊
10: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
11: if (topicPublishInfo != null && topicPublishInfo.ok()) {
12: MessageQueue mq = null; // 最後選擇訊息要傳送到的佇列
13: Exception exception = null;
14: SendResult sendResult = null; // 最後一次傳送結果
15: int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次呼叫
16: int times = 0; // 第幾次傳送
17: String[] brokersSent = new String[timesTotal]; // 儲存每次傳送訊息選擇的broker名
18: // 迴圈呼叫傳送訊息,直到成功
19: for (; times < timesTotal; times++) {
20: String lastBrokerName = null == mq ? null : mq.getBrokerName();
21: MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 選擇訊息要傳送到的佇列
22: if (tmpmq != null) {
23: mq = tmpmq;
24: brokersSent[times] = mq.getBrokerName();
25: try {
26: beginTimestampPrev = System.currentTimeMillis();
27: // 呼叫傳送訊息核心方法
28: sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
29: endTimestamp = System.currentTimeMillis();
30: // 更新Broker可用性資訊
31: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
32: // .... 省略:處理【傳送傳回結果】
33: }
34: } catch (e) { // .... 省略:處理【異常】
35:
36: }
37: } else {
38: break;
39: }
40: }
41: // .... 省略:處理【傳送傳回結果】
42: }
43: // .... 省略:處理【找不到訊息路由】
44: }
如下是除錯 #sendDefaultImpl(...)
時 TopicPublishInfo
的結果, Producer
獲得到了 broker-a
, broker-b
兩個 Broker
分組 的訊息佇列:
3.3 Consumer 消費訊息
-
Consumer
消費訊息時,會對Broker
叢集 的所有佇列進行選擇。