(點選上方公眾號,可快速關註)
來源:Valleylord ,
valleylord.github.io/post/201607-mq-rocketmq/
RocketMQ 的 Java API
RocketMQ 是用 Java 語言開發的,因此,其 Java API 相對是比較豐富的,當然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,
-
廣播消費,這個在之前已經提到過;
-
訊息過濾,支援簡單的 Message Tag 過濾,也支援按 Message Header、body 過濾;
-
順序消費和亂序消費,之前也提到過,這裡的順序消費應該指的是普通順序性,這一點與 Kafka 相同;
-
Pull 樣式消費,這個是相對 Push 樣式來說的,Kafka 就是 Pull 樣式消費;
-
事務訊息,這個好像沒有開源,但是 example 程式碼中有示例,總之,不推薦用;
-
Tag,RocketMQ 在 Topic 下麵又分了一層 Tag,用於表示訊息類別,可以用來過濾,但是順序性還是以 Topic 來看;
單看功能的話,即使不算事務訊息,也不算 Tag,RocketMQ 也遠超 Kafka,Kafka 應該只實現了 Pull 樣式消費 + 順序消費這2個功能。RocketMQ 的程式碼示例在 rocketmq-example 中,註意,程式碼是不能直接執行的,因為所有的程式碼都少了設定 name server 的部分,需要自己手動加上,例如,producer.setNamesrvAddr(“192.168.232.23:9876”);。
先來看一下生產者的 API,比較簡單,只有一種,如下,
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import java.util.List;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”);
producer.setNamesrvAddr(“192.168.232.23:9876”);
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message(“TopicTest1”,// topic
“TagA”,// tag
“OrderID188”,// key
(“RocketMQ “+String.format(“%05d”, i)).getBytes());// body
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List
mqs, Message msg, Object arg) { Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, i));
System.out.println(String.format(“%05d”, i)+sendResult);
}
}
catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
可以發現,相比 Kafka 的 API,只多了 Tag,但實際上行為有很大不同。Kafka 的生產者客戶端,有同步和非同步兩種樣式,但都是阻塞樣式,send 方法傳回傳送狀態的 Future,可以透過 Future 的 get 方法阻塞獲得傳送狀態。而 RocketMQ 採用的是同步非阻塞樣式,傳送之後立刻傳回傳送狀態(而不是 Future)。正常情況下,兩者使用上差別不大,但是在高可用場景中發生主備切換的時候,Kafka 的同步可以等待切換完成並重連,最後傳回;而 RocketMQ 只能立刻報錯,由生產者選擇是否重發。所以,在生產者的 API 上,其實 Kafka 是要強一些的。
另外,RocketMQ 可以透過指定 MessageQueueSelector 類的實現來指定將訊息傳送到哪個分割槽去,Kafka 是透過指定生產者的 partitioner.class 引數來實現的,靈活性上 RocketMQ 略勝一籌。
再來看消費者的API,由於 RocketMQ 的功能比較多,我們先看 Pull 樣式消費的API,如下,
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
public class PullConsumer {
private static final Map
offseTable = new HashMap (); public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(“please_rename_unique_group_name_5”);
consumer.setNamesrvAddr(“192.168.232.23:9876”);
consumer.start();
Set
mqs = consumer.fetchSubscribeMessageQueues(“TopicTest1”); for (MessageQueue mq : mqs) {
System.out.println(“Consume from the queue: ” + mq);
SINGLE_MQ: while (true) {
try {
long offset = consumer.fetchConsumeOffset(mq, true);
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
if (null != pullResult.getMsgFoundList()) {
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
System.out.print(new String(messageExt.getBody()));
System.out.print(pullResult);
System.out.println(messageExt);
}
}
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
// TODO
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
return offset;
return 0;
}
}
這部分的 API 其實是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分割槽,而 Kafka 可以自動管理(當然也可以手動管理),並且不需要指定分割槽(分割槽是在 Kafka 訂閱的時候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用 OffsetStore 介面,提供了兩種管理方式,本地檔案和遠端 Broker。這部分感覺兩者差不多。
下麵再看看 Push 樣式順序消費,程式碼如下,
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“please_rename_unique_group_name_3”);
consumer.setNamesrvAddr(“192.168.232.23:9876”);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(“TopicTest1”, “TagA || TagC || TagD”);
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List
msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false);
System.out.println(Thread.currentThread().getName() + ” Receive New Messages: ” + msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
}
else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
}
else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
}
else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println(“Consumer Started.”);
}
}
雖然提供了 Push 樣式,RocketMQ 內部實際上還是 Pull 樣式的 MQ,Push 樣式的實現應該採用的是長輪詢,這點與 Kafka 一樣。使用該方式有幾個註意的地方,
-
接收訊息的監聽類要使用 MessageListenerOrderly;
-
ConsumeFromWhere 有幾個引數,表示從頭開始消費,從尾開始消費,還是從某個 TimeStamp 開始消費;
-
可以控制 offset 的提交,應該就是 context.setAutoCommit(false); 的作用;
控制 offset 提交這個特性非常有用,某種程度上擴充套件一下,就可以當做事務來用了,看程式碼 ConsumeMessageOrderlyService 的實現,其實並沒有那麼複雜,在不啟用 AutoCommit 的時候,只有傳回 COMMIT 才 commit offset;啟用 AutoCommit 的時候,傳回 COMMIT、ROLLBACK(這個比較扯)、SUCCESS 的時候,都 commit offset。
後來發現,commit offset 功能在 Kafka 裡面也有提供,使用新的 API,呼叫 consumer.commitSync。
再看一個 Push 樣式亂序消費 + 訊息過濾的例子,消費者的程式碼如下,
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“ConsumerGroupNamecc4”);
consumer.setNamesrvAddr(“192.168.232.23:9876”);
consumer.subscribe(“TopicTest1”, MessageFilterImpl.class.getCanonicalName());
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + ” Receive New Messages: ” + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println(“Consumer Started.”);
}
}
這個例子與之前順序消費不同的地方在於,
-
接收訊息的監聽類使用的是 MessageListenerConcurrently;
-
回呼方法中,使用的是自動 offset commit;
-
訂閱的時候增加了訊息過濾類 MessageFilterImpl;
訊息過濾類 MessageFilterImpl 的程式碼如下,
import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
@Override
public boolean match(MessageExt msg) {
String property = msg.getUserProperty(“SequenceId”);
if (property != null) {
int id = Integer.parseInt(property);
if ((id % 3) == 0 && (id > 10)) {
return true;
}
}
return false;
}
}
RocketMQ 執行過濾是在 Broker 端,Broker 所在的機器會啟動多個 FilterServer 過濾行程;Consumer 啟動後,會向 FilterServer 上傳一個過濾的 Java 類;Consumer 從 FilterServer 拉訊息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到訊息後,按照 Consumer 上傳的 Java 過濾程式做過濾,過濾完成後傳回給 Consumer。這種過濾方法可以節省網路流量,但是增加了 Broker 的負擔。可惜我沒有實驗出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個很方便的功能。
還有一種廣播消費樣式,比較簡單,可以去看程式碼,不再列出。
總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。
【關於投稿】
如果大家有原創好文投稿,請直接給公號傳送留言。
① 留言格式:
【投稿】+《 文章標題》+ 文章連結
② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/
③ 最後請附上您的個人簡介哈~
看完本文有收穫?請轉發分享給更多人
關註「ImportNew」,提升Java技能