摘要: 原創出處 http://www.iocoder.cn/RocketMQ/message-schedule-and-retry/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
排版又崩了,請【閱讀原文】。
本文主要基於 RocketMQ 4.0.x 正式版
-
1. 概述
-
2. 定時訊息
-
2.1 延遲級別
-
2.2 Producer 傳送定時訊息
-
2.3 Broker 儲存定時訊息
-
2.4 Broker 傳送定時訊息
-
2.5 Broker 持久化定時傳送進度
-
3. 訊息重試
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
1. 概述
建議前置閱讀內容:
-
《RocketMQ 原始碼分析 —— Message 傳送與接收》
-
《RocketMQ 原始碼分析 —— Message 拉取與消費(下)》
? 為什麼把定時訊息與訊息重試放在一起?你猜。
? 你猜我猜不猜。
2. 定時訊息
定時訊息是指訊息發到 Broker 後,不能立刻被 Consumer 消費,要到特定的時間點或者等待特定的時間後才能被消費。
下圖是定時訊息的處理邏輯圖:
2.1 延遲級別
RocketMQ
目前只支援固定精度的定時訊息。官方說法如下:
如果要支援任意的時間精度,在 Broker 層面,必須要做訊息排序,如果再涉及到持久化,那麼訊息排序要不可避免的產生巨大效能開銷。
-
延遲級別:
延遲級別 | 時間 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
|
|
|
|
|
|
2: /** | |
3: * 訊息延遲級別字串配置 | |
4: */ | |
5: private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”; | |
6: | |
7: // ⬇️⬇️⬇️【ScheduleMessageService.java】 | |
8: /** | |
9: * 解析延遲級別 | |
10: * | |
11: * @return 是否解析成功 | |
12: */ | |
13: public boolean parseDelayLevel() { | |
14: HashMap |
|
15: timeUnitTable.put(“s”, 1000L); | |
16: timeUnitTable.put(“m”, 1000L * 60); | |
17: timeUnitTable.put(“h”, 1000L * 60 * 60); | |
18: timeUnitTable.put(“d”, 1000L * 60 * 60 * 24); | |
19: | |
20: String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); | |
21: try { | |
22: String[] levelArray = levelString.split(” “); | |
23: for (int i = 0; i < levelArray.length; i++) { | |
24: String value = levelArray[i]; | |
25: String ch = value.substring(value.length() – 1); | |
26: Long tu = timeUnitTable.get(ch); | |
27: | |
28: int level = i + 1; | |
29: if (level > this.maxDelayLevel) { | |
30: this.maxDelayLevel = level; | |
31: } | |
32: long num = Long.parseLong(value.substring(0, value.length() – 1)); | |
33: long delayTimeMillis = tu * num; | |
34: this.delayLevelTable.put(level, delayTimeMillis); | |
35: } | |
36: } catch (Exception e) { | |
37: log.error(“parseDelayLevel exception”, e); | |
38: log.info(“levelString String = {}”, levelString); | |
39: return false; | |
40: } | |
41: | |
42: return true; | |
43: } | |
2.2 Producer 傳送定時訊息 |
|
|
|
|
|
|
2.3 Broker 儲存定時訊息
-
? 儲存訊息時,延遲訊息進入
Topic
為SCHEDULE_TOPIC_XXXX
。 -
? 延遲級別 與 訊息佇列編號 做固定對映:QueueId = DelayLevel – 1。
核心程式碼如下:
1: // ⬇️⬇️⬇️【CommitLog.java】
2: /**
3: * 新增訊息,傳回訊息結果
4: *
5: * @param msg 訊息
6: * @return 結果
7: */
8: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
9: // ....(省略程式碼)
10:
11: // 定時訊息處理
12: final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
13: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
14: || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
15: // Delay Delivery
16: if (msg.getDelayTimeLevel() > 0) {
17: if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
18: msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
19: }
20:
21: // 儲存訊息時,延遲訊息進入 `Topic` 為 `SCHEDULE_TOPIC_XXXX` 。
22: topic = ScheduleMessageService.SCHEDULE_TOPIC;
23:
24: // 延遲級別 與 訊息佇列編號 做固定對映
25: queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
26:
27: // Backup real topic, queueId
28: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
29: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
30: msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
31:
32: msg.setTopic(topic);
33: msg.setQueueId(queueId);
34: }
35: }
36:
37: // ....(省略程式碼)
38: }
39:
40: // ⬇️⬇️⬇️【ScheduleMessageService.java】
41: /**
42: * 根據 延遲級別 計算 訊息佇列編號
43: * QueueId = DelayLevel - 1
44: *
45: * @param delayLevel 延遲級別
46: * @return 訊息佇列編號
47: */
48: public static int delayLevel2QueueId(final int delayLevel) {
49: return delayLevel - 1;
50: }
-
? 生成
ConsumeQueue
時,每條訊息的tagsCode
使用【訊息計劃消費時間】。這樣,ScheduleMessageService
在輪詢ConsumeQueue
時,可以使用tagsCode
進行過濾。
核心程式碼如下:
1: // ⬇️⬇️⬇️【CommitLog.java】
2: /**
3: * check the message and returns the message size
4: *
5: * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
6: */
7: public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
8: try {
9: // // ....(省略程式碼)
10:
11: // 17 properties
12: short propertiesLength = byteBuffer.getShort();
13: if (propertiesLength > 0) {
14: // ....(省略程式碼)
15: String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
16: if (tags != null && tags.length() > 0) {
17: tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
18: }
19:
20: // Timing message processing
21: {
22: String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
23: if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
24: int delayLevel = Integer.parseInt(t);
25:
26: if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
27: delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
28: }
29:
30: if (delayLevel > 0) {
31: tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
32: storeTimestamp);
33: }
34: }
35: }
36: }
37:
38: // ....(省略程式碼)
39:
40: return new DispatchRequest(//
41: topic, // 1
42: queueId, // 2
43: physicOffset, // 3
44: totalSize, // 4
45: tagsCode, // 5
46: storeTimestamp, // 6
47: queueOffset, // 7
48: keys, // 8
49: uniqKey, //9
50: sysFlag, // 9
51: preparedTransactionOffset// 10
52: );
53: } catch (Exception e) {
54: }
55:
56: return new DispatchRequest(-1, false /* success */);
57: }
58:
59: // ⬇️⬇️⬇️【ScheduleMessageService.java】
60: /**
61: * 計算 投遞時間【計劃消費時間】
62: *
63: * @param delayLevel 延遲級別
64: * @param storeTimestamp 儲存時間
65: * @return 投遞時間【計劃消費時間】
66: */
67: public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
68: Long time = this.delayLevelTable.get(delayLevel);
69: if (time != null) {
70: return time + storeTimestamp;
71: }
72:
73: return storeTimestamp + 1000;
74: }
2.4 Broker 傳送定時訊息
-
? 對
SCHEDULE_TOPIC_XXXX
每條消費佇列對應單獨一個定時任務進行輪詢,傳送 到達投遞時間【計劃消費時間】 的訊息。
下圖是傳送定時訊息的處理邏輯圖:
實現程式碼如下:
1: /**
2: * ⬇️⬇️⬇️ 傳送(投遞)延遲訊息定時任務
3: */
4: class DeliverDelayedMessageTimerTask extends TimerTask {
5: /**
6: * 延遲級別
7: */
8: private final int delayLevel;
9: /**
10: * 位置
11: */
12: private final long offset;
13:
14: public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
15: this.delayLevel = delayLevel;
16: this.offset = offset;
17: }
18:
19: @Override
20: public void run() {
21: try {
22: this.executeOnTimeup();
23: } catch (Exception e) {
24: // XXX: warn and notify me
25: log.error("ScheduleMessageService, executeOnTimeup exception", e);
26: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
27: this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
28: }
29: }
30:
31: /**
32: * 糾正可投遞時間。
33: * 因為傳送級別對應的傳送間隔可以調整,如果超過當前間隔,則修正成當前配置,避免後面的訊息無法傳送。
34: *
35: * @param now 當前時間
36: * @param deliverTimestamp 投遞時間
37: * @return 糾正結果
38: */
39: private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
40: long result = deliverTimestamp;
41:
42: long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
43: if (deliverTimestamp > maxTimestamp) {
44: result = now;
45: }
46:
47: return result;
48: }
49:
50: public void executeOnTimeup() {
51: ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
52:
53: long failScheduleOffset = offset;
54:
55: if (cq != null) {
56: SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
57: if (bufferCQ != null) {
58: try {
59: long nextOffset = offset;
60: int i = 0;
61: for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
62: long offsetPy = bufferCQ.getByteBuffer().getLong();
63: int sizePy = bufferCQ.getByteBuffer().getInt();
64: long tagsCode = bufferCQ.getByteBuffer().getLong();
65:
66: long now = System.currentTimeMillis();
67: long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
68:
69: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
70:
71: long countdown = deliverTimestamp - now;
72:
73: if (countdown <= 0) { // 訊息到達可傳送時間
74: MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
75: if (msgExt != null) {
76: try {
77: // 傳送訊息
78: MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
79: PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
80: if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 傳送成功
81: continue;
82: } else { // 傳送失敗
83: // XXX: warn and notify me
84: log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());
85:
86: // 安排下一次任務
87: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
88:
89: // 更新進度
90: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
91: return;
92: }
93: } catch (Exception e) {
94: // XXX: warn and notify me
95: log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
96: + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
97: }
98: }
99: } else {
100: // 安排下一次任務
101: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
102:
103: // 更新進度
104: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
105: return;
106: }
107: } // end of for
108:
109: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
110:
111: // 安排下一次任務
112: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
113:
114: // 更新進度
115: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
116: return;
117: } finally {
118: bufferCQ.release();
119: }
120: } // end of if (bufferCQ != null)
121: else { // 消費佇列已經被刪除部分,跳轉到最小的消費進度
122: long cqMinOffset = cq.getMinOffsetInQueue();
123: if (offset < cqMinOffset) {
124: failScheduleOffset = cqMinOffset;
125: log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
126: + cqMinOffset + ", queueId=" + cq.getQueueId());
127: }
128: }
129: } // end of if (cq != null)
130:
131: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
132: }
133:
134: /**
135: * 設定訊息內容
136: *
137: * @param msgExt 訊息
138: * @return 訊息
139: */
140: private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
141: MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
142: msgInner.setBody(msgExt.getBody());
143: msgInner.setFlag(msgExt.getFlag());
144: MessageAccessor.setProperties(msgInner, msgExt.getProperties());
145:
146: TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
147: long tagsCodeValue =
148: MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
149: msgInner.setTagsCode(tagsCodeValue);
150: msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
151:
152: msgInner.setSysFlag(msgExt.getSysFlag());
153: msgInner.setBornTimestamp(msgExt.getBornTimestamp());
154: msgInner.setBornHost(msgExt.getBornHost());
155: msgInner.setStoreHost(msgExt.getStoreHost());
156: msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
157:
158: msgInner.setWaitStoreMsgOK(false);
159: MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
160:
161: msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
162:
163: String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
164: int queueId = Integer.parseInt(queueIdStr);
165: msgInner.setQueueId(queueId);
166:
167: return msgInner;
168: }
169: }
2.5 Broker 持久化定時傳送進度
-
? 定時訊息傳送進度儲存在檔案(
../config/delayOffset.json
)裡 -
? 每 10s 定時持久化傳送進度。
核心程式碼如下:
1: // ⬇️⬇️⬇️【ScheduleMessageService.java】
2: /**
3: public void start() {
4: // 定時傳送訊息
5: for (Map.Entry
entry : this.delayLevelTable.entrySet()) {
6: Integer level = entry.getKey();
7: Long timeDelay = entry.getValue();
8: Long offset = this.offsetTable.get(level);
9: if (null == offset) {
10: offset = 0L;
11: }
12:
13: if (timeDelay != null) {
14: this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
15: }
16: }
17:
18: // 定時持久化傳送進度
19: this.timer.scheduleAtFixedRate(new TimerTask() {
20:
21: @Override
22: public void run() {
23: try {
24: ScheduleMessageService.this.persist();
25: } catch (Exception e) {
26: log.error("scheduleAtFixedRate flush exception", e);
27: }
28: }
29: }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
30: }
3. 訊息重試
Consumer 消費訊息失敗後,要提供一種重試機制,令訊息再消費一次。
-
?
Consumer
將消費失敗的訊息發回Broker
,進入延遲訊息佇列。即,消費失敗的訊息,不會立即消費。
核心程式碼如下:
1: // ⬇️⬇️⬇️【SendMessageProcessor.java】
2: /**
3: * 消費者發回訊息
4: *
5: * @param ctx ctx
6: * @param request 請求
7: * @return 響應
8: * @throws RemotingCommandException 當遠端呼叫異常
9: */
10: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
11: throws RemotingCommandException {
12: // ....(省略程式碼)
13: // 處理 delayLevel(獨有)。
14: int delayLevel = requestHeader.getDelayLevel();
15: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
16: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
17: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
18: }
19: if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
20: // ....(省略程式碼)
21: } else {
22: if (0 == delayLevel) {
23: delayLevel = 3 + msgExt.getReconsumeTimes();
24: }
25: msgExt.setDelayTimeLevel(delayLevel);
26: }
27:
28: // ....(省略程式碼)
29: return response;
30: }