摘要: 原創出處 http://www.iocoder.cn/RocketMQ/message-send-and-consume-orderly/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
本文主要基於 RocketMQ 4.0.x 正式版
-
1. 概述
-
2.
Producer
順序傳送 -
3.
Consumer
嚴格順序消費 -
3.1 獲得(鎖定)訊息佇列
-
3.2 移除訊息佇列
-
3.3 消費訊息佇列
-
3.1.1 消費訊息
-
3.1.2 處理消費結果
-
3.13 訊息處理佇列核心方法
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
1. 概述
建議前置閱讀內容:
-
《RocketMQ 原始碼分析 —— Message 傳送與接收》
-
《RocketMQ 原始碼分析 —— Message 拉取與消費(下)》
當然對 Message
傳送與消費已經有一定瞭解的同學,可以選擇跳過。
RocketMQ
提供了兩種順序級別:
-
普通順序訊息 :
Producer
將相關聯的訊息傳送到相同的訊息佇列。 -
完全嚴格順序 :在
普通順序訊息
的基礎上,Consumer
嚴格順序消費。
絕大部分場景下只需要用到普通順序訊息。
例如說:給使用者傳送簡訊訊息 + 傳送推送訊息,將兩條訊息傳送到不同的訊息佇列,若其中一條訊息佇列消費較慢造成堵塞,使用者可能會收到兩條訊息會存在一定的時間差,帶來的體驗會相對較差。當然類似這種場景,即使有一定的時間差,不會產生系統邏輯上BUG。另外, 普通順序訊息
效能能更加好。
那麼什麼時候使用使用完全嚴格順序?如下是來自官方檔案的說明:
目前已知的應用只有資料庫
binlog
同步強依賴嚴格順序訊息,其他應用絕大部分都可以容忍短暫亂序,推薦使用普通的順序訊息
?上程式碼!!!
2. Producer
順序傳送
官方傳送順序訊息的例子:
1: package org.apache.rocketmq.example.ordermessage;
2:
3: import java.io.UnsupportedEncodingException;
4: import java.util.List;
5: import org.apache.rocketmq.client.exception.MQBrokerException;
6: import org.apache.rocketmq.client.exception.MQClientException;
7: import org.apache.rocketmq.client.producer.DefaultMQProducer;
8: import org.apache.rocketmq.client.producer.MQProducer;
9: import org.apache.rocketmq.client.producer.MessageQueueSelector;
10: import org.apache.rocketmq.client.producer.SendResult;
11: import org.apache.rocketmq.common.message.Message;
12: import org.apache.rocketmq.common.message.MessageQueue;
13: import org.apache.rocketmq.remoting.common.RemotingHelper;
14: import org.apache.rocketmq.remoting.exception.RemotingException;
15:
16: public class Producer {
17: public static void main(String[] args) throws UnsupportedEncodingException {
18: try {
19: MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
20: producer.start();
21:
22: String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
23: for (int i = 0; i < 100; i++) {
24: int orderId = i % 10;
25: Message msg =
26: new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
27: ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
28: SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
29: @Override
30: public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
31: Integer id = (Integer) arg;
32: int index = id % mqs.size();
33: return mqs.get(index);
34: }
35: }, orderId);
36:
37: System.out.printf("%s%n", sendResult);
38: }
39:
40: producer.shutdown();
41: } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
42: e.printStackTrace();
43: }
44: }
45: }
-
第 28 至 35 行 :實現了根據
id%mqs.size()
來進行訊息佇列的選擇。當前例子,我們傳遞orderId
作為引數,那麼相同的orderId
能夠進入相同的訊息佇列。
MessageQueueSelector
介面的原始碼:
1: public interface MessageQueueSelector {
2:
3: /**
4: * 選擇訊息佇列
5: *
6: * @param mqs 訊息佇列
7: * @param msg 訊息
8: * @param arg 引數
9: * @return 訊息佇列
10: */
11: MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
12: }
Producer
選擇佇列傳送訊息方法的原始碼:
16: private SendResult sendSelectImpl(//
17: Message msg, //
18: MessageQueueSelector selector, //
19: Object arg, //
20: final CommunicationMode communicationMode, //
21: final SendCallback sendCallback, final long timeout//
22: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
23: this.makeSureStateOK();
24: Validators.checkMessage(msg, this.defaultMQProducer);
25:
26: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
27: if (topicPublishInfo != null && topicPublishInfo.ok()) {
28: MessageQueue mq = null;
29: try {
30: mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
31: } catch (Throwable e) {
32: throw new MQClientException("select message queue throwed exception.", e);
33: }
34:
35: if (mq != null) {
36: return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
37: } else {
38: throw new MQClientException("select message queue return null.", null);
39: }
40: }
41:
42: throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
43: }
-
第 30 行 :選擇訊息佇列。
-
第 36 行 :傳送訊息。
3. Consumer
嚴格順序消費
Consumer
在嚴格順序消費時,透過 三 把鎖保證嚴格順序消費。
-
Broker
訊息佇列鎖(分散式鎖) : -
叢集樣式下,
Consumer
從Broker
獲得該鎖後,才能進行訊息拉取、消費。 -
廣播樣式下,
Consumer
無需該鎖。 -
Consumer
訊息佇列鎖(本地鎖) :Consumer
獲得該鎖才能操作訊息佇列。 -
Consumer
訊息處理佇列消費鎖(本地鎖) :Consumer
獲得該鎖才能消費訊息佇列。
可能同學有疑問,為什麼有 Consumer
訊息佇列鎖還需要有 Consumer
訊息佇列消費鎖呢??讓我們帶著疑問繼續往下看。
3.1 獲得(鎖定)訊息佇列
叢集樣式下, Consumer
更新屬於自己的訊息佇列時,會向 Broker
鎖定該訊息佇列(廣播樣式下不需要)。如果鎖定失敗,則更新失敗,即該訊息佇列不屬於自己,不能進行消費。核心程式碼如下:
1: // ⬇️⬇️⬇️【RebalanceImpl.java】
2: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
3: // ..... 此處省略部分程式碼
4: // 增加 不在processQueueTable && 存在於mqSet 裡的訊息佇列。
5: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉訊息請求陣列
6: for (MessageQueue mq : mqSet) {
7: if (!this.processQueueTable.containsKey(mq)) {
8: if (isOrder && !this.lock(mq)) { // 順序訊息鎖定訊息佇列
9: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
10: continue;
11: }
12:
13: this.removeDirtyOffset(mq);
14: ProcessQueue pq = new ProcessQueue();
15: long nextOffset = this.computePullFromWhere(mq);
16: if (nextOffset >= 0) {
17: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
18: if (pre != null) {
19: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
20: } else {
21: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
22: PullRequest pullRequest = new PullRequest();
23: pullRequest.setConsumerGroup(consumerGroup);
24: pullRequest.setNextOffset(nextOffset);
25: pullRequest.setMessageQueue(mq);
26: pullRequest.setProcessQueue(pq);
27: pullRequestList.add(pullRequest);
28: changed = true;
29: }
30: } else {
31: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
32: }
33: }
34: }
35:
36: // ..... 此處省略部分程式碼
37: }
38:
39: // ⬇️⬇️⬇️【RebalanceImpl.java】
40: /**
41: * 請求Broker獲得指定訊息佇列的分散式鎖
42: *
43: * @param mq 佇列
44: * @return 是否成功
45: */
46: public boolean lock(final MessageQueue mq) {
47: FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
48: if (findBrokerResult != null) {
49: LockBatchRequestBody requestBody = new LockBatchRequestBody();
50: requestBody.setConsumerGroup(this.consumerGroup);
51: requestBody.setClientId(this.mQClientFactory.getClientId());
52: requestBody.getMqSet().add(mq);
53:
54: try {
55: // 請求Broker獲得指定訊息佇列的分散式鎖
56: Set<MessageQueue> lockedMq =
57: this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
58:
59: // 設定訊息處理佇列鎖定成功。鎖定訊息佇列成功,可能本地沒有訊息處理佇列,設定鎖定成功會在lockAll()方法。
60: for (MessageQueue mmqq : lockedMq) {
61: ProcessQueue processQueue = this.processQueueTable.get(mmqq);
62: if (processQueue != null) {
63: processQueue.setLocked(true);
64: processQueue.setLastLockTimestamp(System.currentTimeMillis());
65: }
66: }
67:
68: boolean lockOK = lockedMq.contains(mq);
69: log.info("the message queue lock {}, {} {}",
70: lockOK ? "OK" : "Failed",
71: this.consumerGroup,
72: mq);
73: return lockOK;
74: } catch (Exception e) {
75: log.error("lockBatchMQ exception, " + mq, e);
76: }
77: }
78:
79: return false;
80: }
-
⬆️⬆️⬆️
-
第 8 至 11 行 :順序消費時,鎖定訊息佇列。如果鎖定失敗,新增訊息處理佇列失敗。
Broker
訊息佇列鎖會過期,預設配置 30s。因此, Consumer
需要不斷向 Broker
掃清該鎖過期時間,預設配置 20s 掃清一次。核心程式碼如下:
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: public void start() {
3: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
4: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
5: @Override
6: public void run() {
7: ConsumeMessageOrderlyService.this.lockMQPeriodically();
8: }
9: }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
10: }
11: }
3.2 移除訊息佇列
叢集樣式下, Consumer
移除自己的訊息佇列時,會向 Broker
解鎖該訊息佇列(廣播樣式下不需要)。核心程式碼如下:
1: // ⬇️⬇️⬇️【RebalancePushImpl.java】
2: /**
3: * 移除不需要的佇列相關的資訊
4: * 1. 持久化消費進度,並移除之
5: * 2. 順序消費&叢集樣式,解鎖對該佇列的鎖定
6: *
7: * @param mq 訊息佇列
8: * @param pq 訊息處理佇列
9: * @return 是否移除成功
10: */
11: @Override
12: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
13: // 同步佇列的消費進度,並移除之。
14: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
15: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
16: // 叢集樣式下,順序消費移除時,解鎖對佇列的鎖定
17: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
18: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
19: try {
20: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
21: try {
22: return this.unlockDelay(mq, pq);
23: } finally {
24: pq.getLockConsume().unlock();
25: }
26: } else {
27: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
28: mq, //
29: pq.getTryUnlockTimes());
30:
31: pq.incTryUnlockTimes();
32: }
33: } catch (Exception e) {
34: log.error("removeUnnecessaryMessageQueue Exception", e);
35: }
36:
37: return false;
38: }
39: return true;
40: }
41:
42: // ⬇️⬇️⬇️【RebalancePushImpl.java】
43: /**
44: * 延遲解鎖 Broker 訊息佇列鎖
45: * 當訊息處理佇列不存在訊息,則直接解鎖
46: *
47: * @param mq 訊息佇列
48: * @param pq 訊息處理佇列
49: * @return 是否解鎖成功
50: */
51: private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
52: if (pq.hasTempMessage()) { // TODO 疑問:為什麼要延遲移除
53: log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
54: this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
55: @Override
56: public void run() {
57: log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
58: RebalancePushImpl.this.unlock(mq, true);
59: }
60: }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
61: } else {
62: this.unlock(mq, true);
63: }
64: return true;
65: }
-
⬆️⬆️⬆️
-
第 20 至 32 行 :獲取訊息佇列消費鎖,避免和訊息佇列消費衝突。如果獲取鎖失敗,則移除訊息佇列失敗,等待下次重新分配消費佇列時,再進行移除。如果未獲得鎖而進行移除,則可能出現另外的
Consumer
和當前Consumer
同時消費該訊息佇列,導致訊息無法嚴格順序消費。 -
第 51 至 64 行 :解鎖
Broker
訊息佇列鎖。如果訊息處理佇列存在剩餘訊息,則延遲解鎖Broker
訊息佇列鎖。❓為什麼訊息處理佇列存在剩餘訊息不能直接解鎖呢??我也不知道,百思不得其解。如果有知道的同學麻煩教育下俺。
3.3 消費訊息佇列
?本節會類比併發消費消費佇列,建議對照 PushConsumer併發消費訊息 一起理解。
3.1.1 消費訊息
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: class ConsumeRequest implements Runnable {
3:
4: /**
5: * 訊息處理佇列
6: */
7: private final ProcessQueue processQueue;
8: /**
9: * 訊息佇列
10: */
11: private final MessageQueue messageQueue;
12:
13: @Override
14: public void run() {
15: if (this.processQueue.isDropped()) {
16: log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
17: return;
18: }
19:
20: // 獲得 Consumer 訊息佇列鎖
21: final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
22: synchronized (objLock) {
23: // (廣播樣式) 或者 (叢集樣式 && Broker訊息佇列鎖有效)
24: if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
25: || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
26: final long beginTime = System.currentTimeMillis();
27: // 迴圈
28: for (boolean continueConsume = true; continueConsume; ) {
29: if (this.processQueue.isDropped()) {
30: log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
31: break;
32: }
33:
34: // 訊息佇列分散式鎖未鎖定,提交延遲獲得鎖並消費請求
35: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
36: && !this.processQueue.isLocked()) {
37: log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
38: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
39: break;
40: }
41: // 訊息佇列分散式鎖已經過期,提交延遲獲得鎖並消費請求
42: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
43: && this.processQueue.isLockExpired()) {
44: log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
45: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
46: break;
47: }
48:
49: // 當前週期消費時間超過連續時長,預設:60s,提交延遲消費請求。預設情況下,每消費1分鐘休息10ms。
50: long interval = System.currentTimeMillis() - beginTime;
51: if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
52: ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
53: break;
54: }
55:
56: // 獲取消費訊息。此處和併發訊息請求不同,併發訊息請求已經帶了消費哪些訊息。
57: final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
58: List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
59: if (!msgs.isEmpty()) {
60: final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
61:
62: ConsumeOrderlyStatus status = null;
63:
64: // ....省略程式碼:Hook:before
65:
66: // 執行消費
67: long beginTimestamp = System.currentTimeMillis();
68: ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
69: boolean hasException = false;
70: try {
71: this.processQueue.getLockConsume().lock(); // 鎖定佇列消費鎖
72:
73: if (this.processQueue.isDropped()) {
74: log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
75: this.messageQueue);
76: break;
77: }
78:
79: status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
80: } catch (Throwable e) {
81: log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
82: RemotingHelper.exceptionSimpleDesc(e), //
83: ConsumeMessageOrderlyService.this.consumerGroup, //
84: msgs, //
85: messageQueue);
86: hasException = true;
87: } finally {
88: this.processQueue.getLockConsume().unlock(); // 鎖定佇列消費鎖
89: }
90:
91: // ....省略程式碼:解析消費結果狀態
92:
93: // ....省略程式碼:Hook:after
94:
95: ConsumeMessageOrderlyService.this.getConsumerStatsManager()
96: .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
97:
98: // 處理消費結果
99: continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
100: } else {
101: continueConsume = false;
102: }
103: }
104: } else {
105: if (this.processQueue.isDropped()) {
106: log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
107: return;
108: }
109:
110: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
111: }
112: }
113: }
114:
115: }
-
⬆️⬆️⬆️
-
第 20 行 :獲得
Consumer
訊息佇列鎖。 -
第 58 行 :從訊息處理佇列順序獲得訊息。和併發消費獲得訊息不同。併發消費請求在請求建立時,已經設定好消費哪些訊息。
-
第 71 行 :獲得
Consumer
訊息處理佇列消費鎖。相比【Consumer
訊息佇列鎖】,其粒度較小。這就是上文提到的❓為什麼有Consumer
訊息佇列鎖還需要有 Consumer 訊息佇列消費鎖呢的原因。 -
第 79 行 :執行消費。
-
第 99 行 :處理消費結果。
3.1.2 處理消費結果
順序消費訊息結果 ( ConsumeOrderlyStatus
) 有四種情況:
-
SUCCESS
:消費成功但不提交。 -
ROLLBACK
:消費失敗,消費回滾。 -
COMMIT
:消費成功提交並且提交。 -
SUSPEND_CURRENT_QUEUE_A_MOMENT
:消費失敗,掛起消費佇列一會會,稍後繼續消費。
考慮到 ROLLBACK
、 COMMIT
暫時只使用在 MySQLbinlog
場景,官方將這兩狀態標記為 @Deprecated
。當然,相應的實現邏輯依然保留。
在併發消費場景時,如果消費失敗, Consumer
會將消費失敗訊息發回到 Broker
重試佇列,跳過當前訊息,等待下次拉取該訊息再進行消費。
但是在完全嚴格順序消費消費時,這樣做顯然不行。也因此,消費失敗的訊息,會掛起佇列一會會,稍後繼續消費。
不過消費失敗的訊息一直失敗,也不可能一直消費。當超過消費重試上限時, Consumer
會將消費失敗超過上限的訊息發回到 Broker
死信佇列。
讓我們來看看程式碼:
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: /**
3: * 處理消費結果,並傳回是否繼續消費
4: *
5: * @param msgs 訊息
6: * @param status 消費結果狀態
7: * @param context 消費Context
8: * @param consumeRequest 消費請求
9: * @return 是否繼續消費
10: */
11: public boolean processConsumeResult(//
12: final List<MessageExt> msgs, //
13: final ConsumeOrderlyStatus status, //
14: final ConsumeOrderlyContext context, //
15: final ConsumeRequest consumeRequest//
16: ) {
17: boolean continueConsume = true;
18: long commitOffset = -1L;
19: if (context.isAutoCommit()) {
20: switch (status) {
21: case COMMIT:
22: case ROLLBACK:
23: log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue());
24: case SUCCESS:
25: // 提交訊息已消費成功到訊息處理佇列
26: commitOffset = consumeRequest.getProcessQueue().commit();
27: // 統計
28: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
29: break;
30: case SUSPEND_CURRENT_QUEUE_A_MOMENT:
31: // 統計
32: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
33: if (checkReconsumeTimes(msgs)) { // 計算是否暫時掛起(暫停)消費N毫秒,預設:10ms
34: // 設定訊息重新消費
35: consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
36: // 提交延遲消費請求
37: this.submitConsumeRequestLater(//
38: consumeRequest.getProcessQueue(), //
39: consumeRequest.getMessageQueue(), //
40: context.getSuspendCurrentQueueTimeMillis());
41: continueConsume = false;
42: } else {
43: commitOffset = consumeRequest.getProcessQueue().commit();
44: }
45: break;
46: default:
47: break;
48: }
49: } else {
50: switch (status) {
51: case SUCCESS:
52: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
53: break;
54: case COMMIT:
55: // 提交訊息已消費成功到訊息處理佇列
56: commitOffset = consumeRequest.getProcessQueue().commit();
57: break;
58: case ROLLBACK:
59: // 設定訊息重新消費
60: consumeRequest.getProcessQueue().rollback();
61: this.submitConsumeRequestLater(//
62: consumeRequest.getProcessQueue(), //
63: consumeRequest.getMessageQueue(), //
64: context.getSuspendCurrentQueueTimeMillis());
65: continueConsume = false;
66: break;
67: case SUSPEND_CURRENT_QUEUE_A_MOMENT: // 計算是否暫時掛起(暫停)消費N毫秒,預設:10ms
68: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
69: if (checkReconsumeTimes(msgs)) {
70: // 設定訊息重新消費
71: consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
72: // 提交延遲消費請求
73: this.submitConsumeRequestLater(//
74: consumeRequest.getProcessQueue(), //
75: consumeRequest.getMessageQueue(), //
76: context.getSuspendCurrentQueueTimeMillis());
77: continueConsume = false;
78: }
79: break;
80: default:
81: break;
82: }
83: }
84:
85: // 訊息處理佇列未dropped,提交有效消費進度
86: if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
87: this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
88: }
89:
90: return continueConsume;
91: }
92:
93: private int getMaxReconsumeTimes() {
94: // default reconsume times: Integer.MAX_VALUE
95: if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
96: return Integer.MAX_VALUE;
97: } else {
98: return this.defaultMQPushConsumer.getMaxReconsumeTimes();
99: }
100: }
101:
102: /**
103: * 計算是否要暫停消費
104: * 不暫停條件:存在訊息都超過最大消費次數並且都發回broker成功
105: *
106: * @param msgs 訊息
107: * @return 是否要暫停
108: */
109: private boolean checkReconsumeTimes(List<MessageExt> msgs) {
110: boolean suspend = false;
111: if (msgs != null && !msgs.isEmpty()) {
112: for (MessageExt msg : msgs) {
113: if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
114: MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
115: if (!sendMessageBack(msg)) { // 發回失敗,中斷
116: suspend = true;
117: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
118: }
119: } else {
120: suspend = true;
121: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
122: }
123: }
124: }
125: return suspend;
126: }
127:
128: /**
129: * 發回訊息。
130: * 訊息發回broker後,對應的訊息佇列是死信佇列。
131: *
132: * @param msg 訊息
133: * @return 是否傳送成功
134: */
135: public boolean sendMessageBack(final MessageExt msg) {
136: try {
137: // max reconsume times exceeded then send to dead letter queue.
138: Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
139: String originMsgId = MessageAccessor.getOriginMessageId(msg);
140: MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
141: newMsg.setFlag(msg.getFlag());
142: MessageAccessor.setProperties(newMsg, msg.getProperties());
143: MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
144: MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
145: MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
146: newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
147:
148: this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
149: return true;
150: } catch (Exception e) {
151: log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
152: }
153:
154: return false;
155: }
-
⬆️⬆️⬆️
-
第 21 至 29 行 :消費成功。在自動提交進度(
AutoCommit
)的情況下,COMMIT
、ROLLBACK
、SUCCESS
邏輯已經統一。 -
第 30 至 45 行 :消費失敗。當訊息重試次數超過上限(預設 :16次)時,將訊息傳送到
Broker
死信佇列,跳過這些訊息。此時,訊息佇列無需掛起,繼續消費後面的訊息。 -
第 85 至 88 行 :提交消費進度。
3.13 訊息處理佇列核心方法
?涉及到的四個核心方法的原始碼:
1: // ⬇️⬇️⬇️【ProcessQueue.java】
2: /**
3: * 訊息對映
4: * key:訊息佇列位置
5: */
6: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>(); /**
7: * 訊息對映臨時儲存(消費中的訊息)
8: */
9: private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<>();
10:
11: /**
12: * 回滾消費中的訊息
13: * 邏輯類似於{@link #makeMessageToCosumeAgain(List)}
14: */
15: public void rollback() {
16: try {
17: this.lockTreeMap.writeLock().lockInterruptibly();
18: try {
19: this.msgTreeMap.putAll(this.msgTreeMapTemp);
20: this.msgTreeMapTemp.clear();
21: } finally {
22: this.lockTreeMap.writeLock().unlock();
23: }
24: } catch (InterruptedException e) {
25: log.error("rollback exception", e);
26: }
27: }
28:
29: /**
30: * 提交消費中的訊息已消費成功,傳回消費進度
31: *
32: * @return 消費進度
33: */
34: public long commit() {
35: try {
36: this.lockTreeMap.writeLock().lockInterruptibly();
37: try {
38: // 消費進度
39: Long offset = this.msgTreeMapTemp.lastKey();
40:
41: //
42: msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
43:
44: //
45: this.msgTreeMapTemp.clear();
46:
47: // 傳回消費進度
48: if (offset != null) {
49: return offset + 1;
50: }
51: } finally {
52: this.lockTreeMap.writeLock().unlock();
53: }
54: } catch (InterruptedException e) {
55: log.error("commit exception", e);
56: }
57:
58: return -1;
59: }
60:
61: /**
62: * 指定訊息重新消費
63: * 邏輯類似於{@link #rollback()}
64: *
65: * @param msgs 訊息
66: */
67: public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
68: try {
69: this.lockTreeMap.writeLock().lockInterruptibly();
70: try {
71: for (MessageExt msg : msgs) {
72: this.msgTreeMapTemp.remove(msg.getQueueOffset());
73: this.msgTreeMap.put(msg.getQueueOffset(), msg);
74: }
75: } finally {
76: this.lockTreeMap.writeLock().unlock();
77: }
78: } catch (InterruptedException e) {
79: log.error("makeMessageToCosumeAgain exception", e);
80: }
81: }
82:
83: /**
84: * 獲得持有訊息前N條
85: *
86: * @param batchSize 條數
87: * @return 訊息
88: */
89: public List<MessageExt> takeMessags(final int batchSize) {
90: List<MessageExt> result = new ArrayList<>(batchSize);
91: final long now = System.currentTimeMillis();
92: try {
93: this.lockTreeMap.writeLock().lockInterruptibly();
94: this.lastConsumeTimestamp = now;
95: try {
96: if (!this.msgTreeMap.isEmpty()) {
97: for (int i = 0; i < batchSize; i++) {
98: Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
99: if (entry != null) {
100: result.add(entry.getValue());
101: msgTreeMapTemp.put(entry.getKey(), entry.getValue());
102: } else {
103: break;
104: }
105: }
106: }
107:
108: if (result.isEmpty()) {
109: consuming = false;
110: }
111: } finally {
112: this.lockTreeMap.writeLock().unlock();
113: }
114: } catch (InterruptedException e) {
115: log.error("take Messages exception", e);
116: }
117:
118: return result;
119: }