前面一篇文章分析了broker的啟動過程,瀏覽了broker的基本功能。接下來的幾篇文章,準備按照十分鐘入門RocketMQ一文中提到的一系列特性,依次進行學習。這篇文章準備分析RocketMQ作為MQ的最基本的功能:訊息的釋出(publish)和訂閱(subscribe)。首先,我參考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和監控這篇文章完成了一個簡單的例子。
一、RocketMQ訊息模型
在部署RocketMQ的時候,先啟動name server,再啟動broker,這時候broker會將自己註冊到name server。應用程式中的producer啟動的時候,首先連線一臺name server,獲取broker的地址串列;然後再和broker建立連線,接下來就可以傳送訊息了。其中:一個producer只與一個name server連線,一個producer會跟所有broker建立連線,每個連線都會有心跳檢測機制。
producer會輪詢向指定topic的mq集合傳送訊息。
consumer有兩種消費樣式:叢集消費和廣播消費。叢集消費:多個consumer平均消費該topic下所有mq的訊息,即某個訊息在某個message queue中被一個consumer消費後,其他消費者就不會消費到它;廣播消費:所有consumer可以消費到發到這個topic下的所有訊息。
consumer有兩種獲取訊息的樣式:推樣式和拉樣式,在RocketMQ中,從技術實現角度看,推樣式也是在拉樣式上做了一層封裝。
二、訊息傳送
生產者Demo
首先給出程式碼,
package com.javadu.chapter8rocketmq.message;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import javax.annotation.PostConstruct;
/**
* 作用: 同步傳送訊息
* User: duqi
* Date: 2018/3/29
* Time: 13:52
*/
@Component
public class ProducerDemo {
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQProducer() {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
try {
defaultMQProducer.start();
Message message = new Message("TopicTest", "TagA",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
for (int i = 0; i < 100; i++) {
SendResult sendResult = defaultMQProducer.send(message);
System.out.println("傳送訊息結果, msgId:" + sendResult.getMsgId() +
", 傳送狀態:" + sendResult.getSendStatus());
}
} catch (MQClientException | UnsupportedEncodingException | InterruptedException
| RemotingException | MQBrokerException e) {
e.printStackTrace();
} finally {
defaultMQProducer.shutdown();
}
}
}
生產者中有兩個屬性:
-
name server的地址,用於獲得broker的相關資訊
-
生產者集合producerGroup,在同一個producer group中有不同的producer實體,如果最早一個producer奔潰,則broker會通知該組內的其他producer實體進行事務提交或回滾。
RocketMQ中的訊息,使用Message表示,程式碼定義如下:
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
public Message() {
}
//省略了getter和setter方法
}
-
topic:該訊息將要往哪個topic發
-
flag:可以用作訊息過濾
-
properties:暫時沒理解【TODO】
-
body:訊息內容
每個訊息傳送完後,會得到一個SendResult物件,看下該物件的結構:
public class SendResult {
//傳送狀態
private SendStatus sendStatus;
//訊息ID,用於訊息去重、訊息跟蹤
private String msgId;
private MessageQueue messageQueue;
private long queueOffset;
//事務ID
private String transactionId;
private String offsetMsgId;
private String regionId;
//是否需要跟蹤
private boolean traceOn = true;
public SendResult() {
}
//省略了建構式、getter和setter等一系列方法
}
在這個demo中,我們是將訊息內容和訊息狀態一併列印到控制檯。
訊息傳送原始碼分析
在RocketMQ中的client模組的包結構如下,可以看出,作者並沒有將介面的定義和實現放在一個包下(這在我們的業務應用中是常見的做法,不一定合理)。producer和consumer包下分別定義了生產者和消費者的介面,將具體的實現放在impl包中。
首先關註producer包裡的內容,幾個主要的類如下:DefaultMQProducer是生產者的預設實現、MQAdmin用於定義一些管理介面、MQProducer用於定義一些生產者特有的介面。
在ProducerDemo中,透過`defaultMQProducer.start();啟動生產者,接下來看下start()方法的過程:
-
根據服務狀態決定接下來的動作
-
對於CREATE_JUST狀態
-
設定服務狀態
-
檢查配置
-
獲取或建立MQClientInstance實體
-
將生產者註冊到指定的producerGroup,即producerTable這個資料結構中,是一個map
-
填充topicPublishInfoTable資料結構
-
啟動生產者
-
對於RUNNING、STARTFAILED和SHUTDOWNALREADY,丟擲異常
public void start(final boolean startFactory) throws MQClientException {
//根據當前的服務狀態決定接下來的動作
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//建立一個客戶端工廠
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//將生產者註冊到指定producer group
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//填充topicPublishInfoTable
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//給該producer連線的所有broker傳送心跳訊息
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
順著 mQClientFactory.start()
往下跟,可以進一步瞭解生產者的細節,主要步驟有:
-
建立請求響應通道
-
啟動各種定時任務,例如:每隔2分鐘向name server拉取一次broker叢集的地址,這意味著如果某個broker宕機了,生產者在這兩分鐘之內的訊息是投遞失敗的;定期從name server拉取topic等路由資訊;定期清理失效的broker以及向broker傳送心跳訊息等。
-
啟動拉服務、負載均衡服務、推服務等服務,這三個服務跟消費者有關。這裡設計上不太明瞭,將消費者和生產者的啟動邏輯放在一起了。看pullMessageService和rebalanceService和初始化,它們是根據MQClientInstance初始化的,而MQClientInstance又是根據ClientConfig來配置的。
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
生產者啟動後,接下來看下訊息的傳送過程,如下圖所示,DefaultMQProducer提供了很多傳送訊息的方法,可以實現同步發訊息、非同步發訊息、指定訊息佇列、OneWay訊息、事務訊息等。
這裡我們只看最簡單的 send(Messagemessage)
方法,最終在DefaultMQProducerImpl中實現:
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//確認生產者狀態正常
this.makeSureStateOK();
//檢查訊息的合法性
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
//獲取訊息的目的地:Topic資訊
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//計算出訊息的投遞次數,如果是同步投遞,則是1+重試次數,如果不是同步投遞,則只需要投遞一次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
//一個broker叢集有不同的broker節點,lastBrokerName記錄了上次投遞的broker節點,每個broker節點
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//選擇一個要傳送的訊息佇列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
//投遞訊息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
//根據訊息傳送樣式,對訊息傳送結果做不同的處理
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
傳送訊息的主要過程如下:
-
首先檢查生產者和訊息的合法性
-
然後獲取訊息傳送的資訊,該資訊存放在TopicPublishInfo物件中:
public class TopicPublishInfo {
//是否順序訊息
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
//維護該topic下用於的訊息佇列串列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//計算下一次該投遞的佇列,這裡應用ThreadLocal,即使是同一臺機器中,每個producer實體都有自己的佇列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
//省略了getter和setter方法
//選擇指定lastBrokerName上的下一個mq
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
//選擇當前broker節點的下一個mq
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
}
-
選擇要傳送給該topic下的那個MessageQueue,選擇的邏輯分兩種情況:(1)預設情況,在上次投遞的broker節點上,輪詢到下一個message queue來傳送;(2)sendLatencyFaultEnable這個值設定為true的時候,這塊沒太看懂。
-
投遞訊息
-
根據訊息佇列執行樣式,針對投遞結果做不同的處理。
二、訊息消費
消費者Demo
消費者裡有個屬性需要看下:
-
consumerGroup:位於同一個consumerGroup中的consumer實體和producerGroup中的各個produer實體承擔的角色類似;consumerGroup中的實體還可以實現負載均衡和容災。PS:處於同一個consumerGroup裡的consumer實體一定是訂閱了同一個topic。
-
nameServer的地址:name server地址,用於獲取broker、topic資訊
消費者Demo裡做了以下幾個事情:
-
設定配置屬性
-
設定訂閱的topic,可以指定tag
-
設定第一次啟動的時候,從message queue的哪裡開始消費
-
設定訊息處理器
-
啟動消費者
package com.javadu.chapter8rocketmq.message;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 作用:
* User: duqi
* Date: 2018/3/29
* Time: 14:00
*/
@Component
public class ConsumerDemo {
/**
* 消費者的組名
*/
@Value("${apache.rocketmq.consumer.consumerGroup}")
private String consumerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
//消費者的組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//指定NameServer地址,多個地址以 ; 隔開
consumer.setNamesrvAddr(namesrvAddr);
try {
//訂閱PushTopic下Tag為push的訊息
consumer.subscribe("TopicTest", "TagA");
//設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費
//如果非第一次啟動,那麼按照上次消費的位置繼續消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
//輸出訊息內容
System.out.println("messageExt: " + messageExt);
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
//輸出訊息內容
System.out.println("消費響應:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);
}
} catch (Exception e) {
e.printStackTrace();
//稍後再試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消費者原始碼分析
前面分析過了,RocketMQ中的client模組統一提供了生產者和消費者客戶端,這塊我們看下消費者的幾個主要的類。前面提到過,RocketMQ實際上都是拉樣式,這裡的DefaultMQPushConsumer實現了推樣式,也只是對拉訊息服務做了一層封裝,即拉到訊息的時候觸發業務消費者註冊到這裡的callback,而具體拉訊息的服務是由PullMessageService實現的,這個細節後續再研究。
在ConsumerDemo中,設定好配置資訊後,會進行topic訂閱,呼叫了DefaultMQPushConsumer的subscribe方法,原始碼如下:
/**
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
* if null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}
第一個引數是topic資訊,第二個引數用於用於訊息過濾tag欄位。真正的訂閱發生在DefaultMQPushConsumerImpl中,程式碼如下:
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
//構建包含訂閱資訊的物件,並放入負載平衡元件維護的map中,以topic為key
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
//如果已經跟broker叢集建立連線,則給所有的broker節點傳送心跳訊息
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
在ConsumerDemo中,接下里會設定消費者首次啟動時消費訊息的起始位置,這涉及到DefaultMQPushConsumer中的一個屬性——consumeFromWhere,這個值有三個可能的值
-
CONSUMEFROMLAST_OFFSET,預設值,表示從上次停止時的地方開始消費
-
CONSUMEFROMFIRST_OFFSET,從佇列的頭部開始消費
-
CONSUMEFROMTIMESTAMP,從指定的時間點開始消費
ConsumerDemo接下來會註冊一個callback,當訊息到達的時候就處理訊息(最新的訊息監聽者支援併發消費):
/**
* Register a callback to execute on message arrival for concurrent consuming.
*
* @param messageListener message handling callback.
*/
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
最後,我們看下ConsumerDemo的啟動過程,即DefaultMQPushConsumerImpl的start方法,主要做了下麵幾件事:
-
檢查配置
-
將訂閱資訊複製到負載均衡元件(rebalanceImpl)中;
-
負載均衡元件的幾個屬性的設定
-
處理不同訊息樣式(叢集樣式或廣播樣式)的配置
-
處理順序消費和併發消費的不同配置
-
將消費者資訊和consumer group註冊到MQ客戶端實體的consumerTable中
-
啟動消費者客戶端
參考資料
-
分散式開放訊息系統(RocketMQ)的原理與實踐
-
買好車提供的rocketmq-spring-boot-starter
-
Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和監控