歡迎光臨
每天分享高質量文章

RocketMQ 原始碼學習 4 : 訊息傳送

(點選上方公眾號,可快速關註)


來源:謝晞鳴 ,

fdx321.github.io/2017/08/21/【RocketMQ原始碼學習】4-訊息傳送/

1. Client端,三種傳送方式

RocketMQ 支援常見的三種傳送方式,

SYNC

producer.send(msg)

同步的傳送方式,會等待傳送結果後才傳回。可以用 send(msg, timeout) 的方式指定等待時間,如果不指定,就是預設的 3000ms. 這個timeout 最終會被設定到 ResponseFuture 裡,再傳送完訊息後,用 countDownLatch 去 await timeout的時間,如果過期,就會丟擲異常。

ASYNC

producer.send(msg, new SendCallback() {

    @Override

    public void onSuccess(SendResult sendResult) {

        System.out.printf(“%-10d OK %s %n”, index, sendResult.getMsgId());

    }

    @Override

    public void onException(Throwable e) {

        System.out.printf(“%-10d Exception %s %n”, index, e);

        e.printStackTrace();

    }

});

非同步的傳送方式,傳送完後,立刻傳回。Client 在拿到 Broker 的響應結果後,會回呼指定的 callback. 這個 API 也可以指定 Timeout,不指定也是預設的 3000ms.

ONEWAY

producer.sendOneway(msg);

比較簡單,發出去後,什麼都不管直接傳回。

對於每種方式,Producer 還提供了可以指定 MessageQueue, MessageQueueSelector的API,這屬於稍微高階一點的玩法,一般用它提供的預設的策略選擇 MessageQueue 就可以了。

2. Client端傳送過程

下麵以 SYNC 方式為例,看下整個訊息的傳送過程,其他方式略有差異,總體流程類似。

1. 根據 Topic 找到指定的 TopicPublishInfo

先去本地 map 找,如果沒有,就去 Namesrv fetch, 如果 Namesrv 裡也沒有,則用預設的 Topic 再去 fetch TopicRouteData. 對用用預設 Topic 的這種情況,Client 拿到資料後,會去構建 TopicPublishInfo, 然後用當前的 Topic 作為 key 放到本地 map 裡。Broker 在接收到訊息的時候,會去更新它本地的配置,然後在 registerBroker 的時候會去更新 namesrv 中的 TopicRouteData 資訊,這樣 Namesrv 中就會有這樣一份配置了。當然,也可以事先在 Namesrv 增加該配置,很多公司內部都有這樣定製的平臺來管理MQ的接入配置。

public class TopicPublishInfo {

    private boolean orderTopic = false;

    private boolean haveTopicRouterInfo = false;

    private List messageQueueList = new ArrayList();

    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

    private TopicRouteData topicRouteData;

}

 

public class TopicRouteData {

    private String orderTopicConf;

    private List queueDatas;

    private List brokerDatas;

    private HashMap/* Filter Server */> filterServerTable;

}

QueueData 定義了這個 read 和 write 的 queue的數量,Client 在拿到 TopicRouteData 後,會根據這裡配的數量去構建響應數目的messageQueue,即 messageQueueList. brokerDatas 儲存了各個 broker 的相關資訊。

2. 從 messageQueueList 中選擇一個 MessageQueue

如果沒有 enable latencyFaultTolerance,就用遞增取模的方式選擇。如果 enable 了,在遞增取模的基礎上,再過濾掉 not available 的。這裡所謂的 latencyFaultTolerance, 是指對之前失敗的,按一定的時間做退避:

long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

舉個例子,如果上次請求的 latency 超過 550L ms, 就退避 3000L ms;超過 1000L,就退避 60000L.

以上就是 Producer 到 Broker 的簡單的負載均衡。

3. 傳送訊息

到這一步,我們已經拿到了這些關鍵資料:

  • Message, 要傳送的訊息

  • MessageQueue,這裡麵包括 topic/brokerName/queueId

  • CommunicationMode, 傳送方式, SYNC/ASYNC/ONEWAY

  • TopicPublishInfo

有了這些資料,就可以構建 RequestHeader 了,大部分欄位意思都很明顯(當然,前提是對RocketMQ的原始碼有所熟悉),個別欄位見註釋。

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());

requestHeader.setTopic(msg.getTopic());

requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());

requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());

requestHeader.setQueueId(mq.getQueueId());

//系統Flag, 用於判斷走什麼邏輯。標識是否壓縮,事務的不同TYPE(prepare/rollback/commit/not transaction) 等

requestHeader.setSysFlag(sysFlag); 

requestHeader.setBornTimestamp(System.currentTimeMillis());

//訊息Flag, 最終會落地

requestHeader.setFlag(msg.getFlag());

requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));

requestHeader.setReconsumeTimes(0);

//TODO,暫不知道這個欄位是幹嘛用的

requestHeader.setUnitMode(this.isUnitMode());

requestHeader.setBatch(msg instanceof MessageBatch);

最後用這些 essay-header 欄位,以及 message body 構建 RemotingCommand,透過 remoting 模組發給 broker.

4. 處理結果

  • 傳送成功:直接傳回傳送結果

  • 傳送失敗:如果 enable retryAnotherBrokerWhenNotStoreOK,就會重試,預設重試兩次(retryTimesWhenSendFailed)。否則直接傳回結果

  • 傳送異常:Producer 對異常做了很好的區分,如果是 Remoting 和 Client 模組的異常,就重試,如果是 Broker 模組的異常,根據不同的 response code 做不同的處理,有的重試,有的丟擲異常,有的傳回結果。

3. Broker端,訊息的處理和落地

如圖,Broker 有很多 Processor 用來處理不同型別的請求,有些 Processor 會共用一個 Processor 執行緒池。對於訊息傳送,Broker 的 remoting 模組在接收到請求後,根據request code,最終會交給 SendMessageProcessor 來處理。SendMessageProcessor 會依次做以下處理:

  • 做一些校驗,包括但不限於

  1. broker 是否可寫

  2. topic 配置是否存在,如果不存在就新建一個(createTopicInSendMessageMethod)

  3. 校驗 queueId 是否超過指定大小

  • 構建 MessageExtBrokerInner

  • 將 MessageExtBrokerInner 交給 Store 處理

  • 處理 Store 傳回的結果,BrokerStatsManager 做一些統計更新,設定 Response 中的一些欄位並傳回。

Store 收到訊息後,會先做一些校驗,然後交給 commitLog 去 put,然後做些統計並傳回。Store 儲存訊息的過程比較複雜,後面會單獨分析。

4. 其他

1. 順序訊息

很多應用並不關註訊息順序,而且訊息沒有順序並不代表訊息內容沒有順序,合理的系統設計可以避免順序問題。MQ 要保證訊息順序必然會損失效能、增加系統實現複雜度。具體的分析可以看 分散式開放訊息系統(RocketMQ)的原理與實踐

http://www.jianshu.com/p/453c6e7ff81c

在 RocketMQ 裡, 在傳送訊息的時候可以自己定義 MessageQueueSelector,對於同一個訂單ID(或其他ID)的不同訊息,可以讓它走同一個 MessageQueue,這樣就可以按順序發給同一個 Broker 了。

2. Batch Message

Producer 的 API 還支援一次發多個訊息。 

List messages = new ArrayList<>();

messages.add(new Message(topic, “Tag”, “OrderID001”, “Hello world 0”.getBytes()));

messages.add(new Message(topic, “Tag”, “OrderID002”, “Hello world 1”.getBytes()));

 

producer.send(messages);

Client 模組會將 Message List 封裝成 MessageBatch,且會標記 requestHeader 的 batch 標誌位為 true. Broker 在接收到訊息後就可以根據這個標誌位去做不同的處理。

5. Reference

  • RocketMQ 原理簡介

    http://alibaba.github.io/RocketMQ-docs/document/design/RocketMQ_design.pdf

  • 分散式開放訊息系統(RocketMQ)的原理與實踐

    http://www.jianshu.com/p/453c6e7ff81c

系列

【關於投稿】


如果大家有原創好文投稿,請直接給公號傳送留言。


① 留言格式:
【投稿】+《 文章標題》+ 文章連結

② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~



看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂