Source: Valleylord, valleylord.github.io/post/201607-mq-rocketmq/
RocketMQ master/slave mode
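As noted earlier, multi-master multi-slave asynchronous replication is the only RocketMQ deployment mode fit for production use, so the tests cover only that scenario. Messages are consumed in Pull ordered mode.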
Assume a cluster of 2 masters and 2 slaves, which means starting 4 Brokers. Their configuration files are as follows:
# broker-a, master
brokerName=broker-a
brokerId=0
listenPort=10911
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlog
brokerRole=ASYNC_MASTER

# broker-a, slave
brokerName=broker-a
brokerId=1
listenPort=10921
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slave
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlog
brokerRole=SLAVE

# broker-b, master
brokerName=broker-b
brokerId=0
listenPort=20911
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlog
brokerRole=ASYNC_MASTER

# broker-b, slave
brokerName=broker-b
brokerId=1
listenPort=20921
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slave
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlog
brokerRole=SLAVE
In addition, the following settings are common to all four Brokers:
brokerClusterName=DefaultCluster
brokerIP1=192.168.232.23
namesrvAddr=192.168.232.23:9876
deleteWhen=04
fileReservedTime=120
flushDiskType=ASYNC_FLUSH
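Since the four files differ in only a handful of keys, they could be generated from one shared block rather than maintained by hand. The following is a minimal sketch and not part of the original setup: the names `COMMON`, `BROKERS` and `render_broker_config` are illustrative, and the values are copied from the listings above.

```python
# Sketch: build the four broker property files from the shared settings
# plus the per-broker differences. All names here are illustrative.

COMMON = {
    "brokerClusterName": "DefaultCluster",
    "brokerIP1": "192.168.232.23",
    "namesrvAddr": "192.168.232.23:9876",
    "deleteWhen": "04",
    "fileReservedTime": "120",
    "flushDiskType": "ASYNC_FLUSH",
}

DATA_ROOT = "/home/arnes/alibaba-rocketmq/data"

# (brokerName, brokerId, listenPort, store directory name, brokerRole)
BROKERS = [
    ("broker-a", 0, 10911, "store-a-async", "ASYNC_MASTER"),
    ("broker-a", 1, 10921, "store-a-async-slave", "SLAVE"),
    ("broker-b", 0, 20911, "store-b-async", "ASYNC_MASTER"),
    ("broker-b", 1, 20921, "store-b-async-slave", "SLAVE"),
]


def render_broker_config(name, broker_id, port, store_dir, role):
    """Return the text of one broker properties file."""
    root = f"{DATA_ROOT}/{store_dir}"
    settings = {
        "brokerName": name,
        "brokerId": str(broker_id),
        "listenPort": str(port),
        "storePathRootDir": root,
        "storePathCommitLog": f"{root}/commitlog",
        "brokerRole": role,
    }
    settings.update(COMMON)  # append the settings shared by every broker
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"


if __name__ == "__main__":
    for broker in BROKERS:
        print(f"# {broker[0]} (brokerId={broker[1]})")
        print(render_broker_config(*broker))
```

Keeping the per-broker differences in one table makes it easy to see that a master and its slave share a `brokerName` and differ only in `brokerId`, port, store path and role.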
All other settings are left at their defaults. Start the NameServer and all Brokers, run the Producer briefly, then check the current state of TopicTest1:
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
    "brokerDatas":[
        {
            "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
            },
            "brokerName":"broker-b"
        },
        {
            "brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"
            },
            "brokerName":"broker-a"
        }
    ],
    "filterServerTable":{},
    "queueDatas":[
        {
            "brokerName":"broker-a",
            "perm":6,
            "readQueueNums":4,
            "topicSynFlag":0,
            "writeQueueNums":4
        },
        {
            "brokerName":"broker-b",
            "perm":6,
            "readQueueNums":4,
            "topicSynFlag":0,
            "writeQueueNums":4
        }
    ]
}
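As shown, TopicTest1 lives on both Brokers, and each Broker's slave is running as well. Now for the failover experiment: start the Consumer and Producer processes, with messages consumed in Pull ordered mode. While messages are being sent and received, use kill -9 to stop the broker-a master process, simulating a sudden crash. At this point the state of TopicTest1 is: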
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
    "brokerDatas":[
        {
            "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
            },
            "brokerName":"broker-b"
        },
        {
            "brokerAddrs":{1:"192.168.232.23:10921"
            },
            "brokerName":"broker-a"
        }
    ],
    "filterServerTable":{},
    "queueDatas":[
        {
            "brokerName":"broker-a",
            "perm":6,
            "readQueueNums":4,
            "topicSynFlag":0,
            "writeQueueNums":4
        },
        {
            "brokerName":"broker-b",
            "perm":6,
            "readQueueNums":4,
            "topicSynFlag":0,
            "writeQueueNums":4
        }
    ]
}
broker-a is now down to a single slave node. Next, start the broker-a master again to simulate recovery, and check the state of TopicTest1 once more:
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
    "brokerDatas":[
        {
            "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
            },
            "brokerName":"broker-b"
        },
        {
            "brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"
            },
            "brokerName":"broker-a"
        }
    ],
    "filterServerTable":{},
    "queueDatas":[
        {
            "brokerName":"broker-a",
            "perm":6,
            "readQueueNums":4,
            "topicSynFlag":0,
            "writeQueueNums":4
        },
        {
            "brokerName":"broker-b",
            "perm":6,
            "readQueueNums":4,
            "topicSynFlag":0,
            "writeQueueNums":4
        }
    ]
}
At this point, RocketMQ has recovered.
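The comparison between the three topicRoute outputs can also be done by a script instead of by eye. The sketch below is an illustration only, not something used in the experiment. It assumes the output uses plain ASCII quotes and that brokerId 0 always denotes the master, as in the configuration above; the function name `brokers_without_master` is made up for this example. The brokerId keys are printed unquoted, so the text is not strict JSON and has to be patched before parsing.

```python
import json
import re


def brokers_without_master(route_text):
    """Return names of brokers whose brokerAddrs lack brokerId 0 (the master)."""
    # The brokerId keys appear unquoted (0:"..."), which strict JSON rejects,
    # so wrap them in quotes before parsing.
    strict = re.sub(r'([{,])\s*(\d+)\s*:', r'\1"\2":', route_text)
    route = json.loads(strict)
    return [
        data["brokerName"]
        for data in route["brokerDatas"]
        if "0" not in data["brokerAddrs"]
    ]


# Trimmed copy of the output captured after the broker-a master was killed.
AFTER_KILL = '''
{
    "brokerDatas":[
        {
            "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
            },
            "brokerName":"broker-b"
        },
        {
            "brokerAddrs":{1:"192.168.232.23:10921"
            },
            "brokerName":"broker-a"
        }
    ],
    "filterServerTable":{}
}
'''

if __name__ == "__main__":
    print(brokers_without_master(AFTER_KILL))
```

Run against the first and third outputs, the same function returns an empty list, since both brokers list an address under brokerId 0.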
Now look at the Producer and Consumer logs, starting with the Producer's:
……
00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]
00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]
00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]
00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]
00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]
00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]
00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]
00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]
00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]
00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]
00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]
00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]
00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]
00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]
00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]
00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]
……
01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]
01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]
01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]
01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]
01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]
01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]
01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]
01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]
01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]
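The log shows that after message 00583 was sent, the exception connect to <192.168.232.23:10911> failed began to appear, presumably because the broker-a master had been killed. From message 00596 onward RocketMQ was back to normal, because broker-b had started serving and taken over all of the work. The broker-a master was then restarted, and once it rejoined, broker-a resumed work starting at message 01392. The experiment confirms that in RocketMQ's so-called multi-master multi-slave mode the slave's role is weakened to an extreme: when a master goes down, its slave cannot take over the master's job; it only hands out the data that has not yet been delivered, and the remaining masters pick up the work. In other words, an N-master N-slave RocketMQ cluster has 2N machines, of which only N do real work, and if one fails only N-1 are working. Machine utilization is very low.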
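The failure window can also be read off the Producer log mechanically. This is a minimal sketch under one assumption that the log suggests but the article does not state: the Producer numbers every send attempt, so a sequence number with no SendResult line means that send failed. The names `LINE`, `SAMPLE` and `summarize` are illustrative; the sample lines are copied from the log above.

```python
import re

# Matches lines such as "00583SendResult [sendStatus=SEND_OK, ... brokerName=broker-b, queueId=3], ..."
LINE = re.compile(
    r"^(\d{5})SendResult \[sendStatus=(\w+), .*?brokerName=([\w-]+), queueId=(\d+)\]"
)


def summarize(log_lines):
    """Return (missing sequence numbers, {sequence number: broker name})."""
    sent = {}
    for line in log_lines:
        match = LINE.match(line)
        if match and match.group(2) == "SEND_OK":
            sent[int(match.group(1))] = match.group(3)
    if not sent:
        return [], sent
    # Any number between the first and last success that never logged a
    # SendResult is treated as a failed send (see the assumption above).
    missing = [n for n in range(min(sent), max(sent) + 1) if n not in sent]
    return missing, sent


SAMPLE = """\
00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]
00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]
00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]
00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]
""".splitlines()

if __name__ == "__main__":
    missing, sent = summarize(SAMPLE)
    print("failed sends:", missing)
    print("brokers used:", sorted(set(sent.values())))
```

On this excerpt the gaps come in two runs of four, which is consistent with the four queues of broker-a being unreachable on two consecutive rounds before the Producer stopped routing to them.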
Now the Consumer's log:
RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0,
…
RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
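See the original article for the full log.
As can be seen, around the time broker-a went down the Consumer also hit the exception connect to <192.168.232.23:10911> failed. Ordering within each partition was still preserved, but some disorder did appear. For example, data from before the experiment (the Hello MetaQ messages) was fetched, even though I had explicitly deleted this Topic before the experiment. It seems that RocketMQ's delete does not actually remove the Topic's data. Once the broker-a master was restarted, everything returned to normal.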
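Pull-mode consumption in RocketMQ requires managing offsets by hand and specifying the partition explicitly. This does not feel like much when writing the calls, but at run time it turns out that the consumer always works through one partition at a time and only moves on to the next after finishing it, by which point the next partition may already have a large backlog. Distributing messages by hand is also tedious. Push ordered mode may well be the better choice.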
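One way to soften the "one partition at a time" behaviour is to pull small batches from every partition in turn while keeping a separate offset for each. The sketch below only simulates the idea with in-memory lists. It does not use the RocketMQ client API, and `pull` and `drain_round_robin` are hypothetical names standing in for whatever pull call and offset store an application actually uses.

```python
def pull(queue, offset, max_msgs):
    """Stand-in for a broker pull: return (messages, next offset)."""
    batch = queue[offset:offset + max_msgs]
    return batch, offset + len(batch)


def drain_round_robin(queues, offsets, batch_size=2):
    """Consume every queue in small interleaved batches.

    queues:  dict mapping queue id -> list of messages (in-memory stand-in)
    offsets: dict mapping queue id -> next offset to read; updated in place
    Returns the consumed messages as (queue id, message) pairs.
    """
    consumed = []
    progressed = True
    while progressed:
        progressed = False
        for queue_id, messages in queues.items():
            batch, next_offset = pull(messages, offsets.get(queue_id, 0), batch_size)
            if batch:
                consumed.extend((queue_id, msg) for msg in batch)
                offsets[queue_id] = next_offset  # manual offset bookkeeping
                progressed = True
    return consumed


if __name__ == "__main__":
    queues = {0: ["a0", "a1", "a2"], 1: ["b0"], 2: ["c0", "c1"]}
    offsets = {}
    for queue_id, msg in drain_round_robin(queues, offsets):
        print(queue_id, msg)
    print(offsets)
```

Order within each queue is preserved because each queue is read strictly by increasing offset; only the interleaving between queues changes. The offsets dictionary is what would have to be persisted between runs in a real Pull consumer.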
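There were also a few odd cases. Several times during the experiment the error CODE: 17 DESC: topic[TopicTest1] not exist, apply first please! appeared, when all I had done was shut down the Producer. Also, the documentation clearly says that sh mqadmin updateTopic -n 192.168.232.23:9876 -c DefaultCluster -t TopicTest1 can be used to add a Topic, but in practice it did not work.
A follow-up: I later repeated the experiment using Push ordered mode, and the conclusions were much the same. Because multiple threads are involved, the log lines are occasionally interleaved, which is a minor problem and can be fixed. Around each Broker shutdown and restart there also tend to be several message redeliveries; then again, RocketMQ does not guarantee that a message is received only once. Message duplication appears somewhat less severe in Kafka than in RocketMQ. Push ordered mode needs neither an offset nor a partition to be specified, and on a second start it automatically resumes after the previous offset. Functionally this is closer to Kafka's Consumer, although RocketMQ uses an asynchronous mode.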
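RocketMQ best practices
RocketMQ in fact has its own document titled "RocketMQ Best Practices", which covers a number of system design points, such as making consumers idempotent and using one Topic per application. These lessons are useful not only for RocketMQ; they are well worth borrowing for Kafka too.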
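To make the idempotent-consumer advice concrete, here is a minimal sketch. It is not taken from the RocketMQ document: the class name `IdempotentHandler` is made up, the set of seen ids lives only in memory, and whether to deduplicate on the msgId or on a business key (such as the KEYS property visible in the Consumer log) is a choice each application has to make.

```python
class IdempotentHandler:
    """Wrap a handler so that a redelivered message is processed only once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # in-memory only; a real system would persist this

    def on_message(self, dedup_key, body):
        """Return True if the message was processed, False if it was a duplicate."""
        if dedup_key in self._seen:
            return False
        self._handler(body)
        self._seen.add(dedup_key)  # recorded only after the handler succeeded
        return True


if __name__ == "__main__":
    processed = []
    consumer = IdempotentHandler(processed.append)
    # The same message delivered twice, as can happen around a Broker restart.
    consumer.on_message("C0A8E817000051AF000000000007859C", "order 188")
    consumer.on_message("C0A8E817000051AF000000000007859C", "order 188")
    print(processed)
```

Recording the key only after the handler returns means a handler that raises will be retried on redelivery, at the cost of a possible repeat if the process dies between the two steps.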
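Afterword
Here is my personal advice on choosing between RocketMQ and Kafka. Several comparisons of the two have been made above. My own view is that Kafka is a system still under active development, with an open-source community that is larger and more active than RocketMQ's. The latest Kafka release also has synchronous replication, which gives stronger guarantees of message reliability. And Kafka's partition mechanism achieves nearly automatic load balancing, which is a real killer feature. RocketMQ offers many convenient features, far more than Kafka, but not all of them will necessarily be used, and most can be worked around. By comparison, Kafka's core functionality appeals to me more, and in the details of failure recovery it beats RocketMQ. Of course, inside company A, or at a company that uses company A's cloud products, RocketMQ has more enterprise-grade features, and there I might choose RocketMQ.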