(點選上方公眾號,可快速關註)
來源:Valleylord ,
valleylord.github.io/post/201607-mq-kafka/
Kafka 副本和叢集
在生產環境中,Kafka 總是以“叢集+分割槽”方式執行的,以保證可靠性和效能。下麵是一個3副本的 Kafka 叢集實體。
首先,需要啟動3個 Kafka Broker,Broker 的配置檔案分別如下,
broker.id=0
listeners=PLAINTEXT://192.168.232.23:9092
log.dirs=/tmp/kafka-logs
broker.id=1
listeners=PLAINTEXT://192.168.232.23:9093
log.dirs=/tmp/kafka-logs-1
broker.id=1
listeners=PLAINTEXT://192.168.232.23:9094
log.dirs=/tmp/kafka-logs-2
雖然每個 Broker 只配置了一個埠,實際上,Kafka 會多佔用一個,可能是用來 Broker 之間的複製的。另外,3個 Broker 都配置了,
zookeeper.connect=localhost:2181
delete.topic.enable=true
在同一個 Zookeeper 上的 Broker 會被歸類到一個叢集中。註意,這些配置中並沒有指定哪一個 Broker 是主節點,哪些 Broker 是從節點,Kafka 採用的辦法是從可選的 Broker 中,選出每個分割槽的 Leader。也就是說,對某個 Topic 來說,可能0節點是 Leader,另外一些 Topic,可能1節點是 Leader;甚至,如果 topic1 有2個分割槽的話,分割槽1的 Leader 是0節點,分割槽2的 Leader 是1節點。
這種對等的設計,對於故障恢復是十分有用的,在節點崩潰的時候,Kafka 會自動選舉出可用的從節點,將其升級為主節點。在崩潰的節點恢復,加入叢集之後,Kafka 又會將這個節點加入到可用節點,並自動選舉出新的主節點。
實驗如下,先新建一個3副本,2分割槽的 Topic,
bin/kafka-topics.sh –create –zookeeper 192.168.232.23:2181 –replication-factor 3 –partitions 2 –topic topic1
初始狀況下,topic1 的狀態如下,
$ bin/kafka-topics.sh –describe –zookeeper 192.168.232.23:2181 –topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
對於上面的輸出,即使沒有檔案,也可以看懂大概:topic1 有2個分割槽,Partition 0 和 Partition 1,Leader 分別在 Broker 0 和 1。Replicas 表示副本在哪些 Broker 上,Isr(In-Sync Replicas)表示處於同步狀態中的 Broker,如果有 Broker 宕機了,那麼 Replicas 不會變,但是 Isr 會僅顯示沒有宕機的 Broker,詳見下麵的實驗。
然後分2個執行緒,執行之前寫的 Producer 和 Consumer 的示例程式碼,Producer 採用非同步傳送,訊息採用同步複製。在有訊息傳送的情況下,kill -9 停掉其中2個 Broker(Broker 0 和 Broker 1),模擬突然宕機。此時,topic1 狀態如下,
$ bin/kafka-topics.sh –describe –zookeeper 192.168.232.23:2181 –topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: topic1 Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2
可見,Kafka 已經選出了新的 Leader,訊息傳送沒有中斷。接著再啟動被停掉的那兩個 Broker,並檢視 topic1 的狀態,如下,
$ bin/kafka-topics.sh –describe –zookeeper 192.168.232.23:2181 –topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: topic1 Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2,1,0
$ bin/kafka-topics.sh –describe –zookeeper 192.168.232.23:2181 –topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0
可以發現, 有一個短暫的時間,topic1 的兩個分割槽的 Leader 都是 Broker 2,但是在 Kafka 重新選舉之後,分割槽1的 Leader 變為 Broker 1。說明 Kafka 傾向於用不同的 Broker 做分割槽的 Leader,這樣更能達到負載均衡的效果。
再來看看 Producer 和 Consumer 的日誌,下麵這個片段是2個 Broker 宕機前後的日誌,
……
Send message: (00439, Message_00439) at offset 217 to partition(0) in 3 ms
Received message: (00438, Message_00438) at offset 216
Send message: (00440, Message_00440) at offset 218 to partition(0) in 5 ms
Send message: (00441, Message_00441) at offset 221 to partition(1) in 5 ms
Received message: (00441, Message_00441) at offset 221
Received message: (00439, Message_00439) at offset 217
Send message: (00442, Message_00442) at offset 222 to partition(1) in 5 ms
Send message: (00443, Message_00443) at offset 219 to partition(0) in 3 ms
Received message: (00440, Message_00440) at offset 218
Received message: (00443, Message_00443) at offset 219
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
Received message: (00442, Message_00442) at offset 222
Send message: (00452, Message_00452) at offset 223 to partition(1) in 7492 ms
Send message: (00454, Message_00454) at offset 224 to partition(1) in 7485 ms
Send message: (00455, Message_00455) at offset 225 to partition(1) in 7482 ms
Send message: (00458, Message_00458) at offset 226 to partition(1) in 7473 ms
Send message: (00460, Message_00460) at offset 227 to partition(1) in 7467 ms
Send message: (00461, Message_00461) at offset 228 to partition(1) in 7465 ms
Send message: (00462, Message_00462) at offset 229 to partition(1) in 7462 ms
Send message: (00463, Message_00463) at offset 230 to partition(1) in 7459 ms
Send message: (00464, Message_00464) at offset 231 to partition(1) in 7456 ms
Send message: (00465, Message_00465) at offset 232 to partition(1) in 7453 ms
……
Send message: (01103, Message_01103) at offset 543 to partition(1) in 5478 ms
Received message: (00631, Message_00631) at offset 310
Received message: (00633, Message_00633) at offset 311
Send message: (00451, Message_00451) at offset 220 to partition(0) in 7525 ms
Received message: (00634, Message_00634) at offset 312
Send message: (00453, Message_00453) at offset 221 to partition(0) in 7518 ms
Received message: (00639, Message_00639) at offset 313
Send message: (00456, Message_00456) at offset 222 to partition(0) in 7509 ms
Received message: (00641, Message_00641) at offset 314
Send message: (00457, Message_00457) at offset 223 to partition(0) in 7506 ms
Received message: (00643, Message_00643) at offset 315
……
出現錯誤的時候,Producer 丟擲了 NetworkException 異常。其中有3589條 Received 日誌,3583條 Send 日誌,7條 NetworkException 異常日誌,傳送訊息的最大序號是3590,接收訊息的最大序號是3589,有以下幾個值得註意的地方,
-
宕機之前,訊息的接收並不是順序的,這是因為 topic1 有2個分割槽,Kafka 只保證分割槽上的有序;
-
宕機之後,出現了長段的傳送日誌而沒有接收日誌,說明 Kafka 此時正在選舉,選舉的過程會阻塞消費者;
-
從接收訊息的條數和序號來看,所有的訊息都收到了,沒有丟(沒有收到3590的訊息可能是因為強制退出 client 行程的原因),傳送的過程的7個異常應該只是虛警,7條異常對應序號444~450,3583條 Send 訊息再加上這7條,與總訊息3590條一致;
從這個實驗中,可以看到,雖然 Kafka 不保證訊息重覆傳送,但是卻在儘量保證沒有訊息被重覆傳送,可能我的實驗場景還不夠極端,沒有做出訊息重覆的情況。
如之前所說,如果要保持完全順序性,需要使用單分割槽;如果要避免丟擲 NetworkException 異常,就使用 Producer 同步傳送。下麵,我們重做上面的例子,不同之處是使用單分割槽和 Producer 同步傳送,擷取一段 Broker 宕機時的日誌如下,
……
Sent message: (118, Message_00118)
Received message: (00118, Message_00118) at offset 117
Received message: (00119, Message_00119) at offset 118
Sent message: (119, Message_00119)
Sent message: (120, Message_00120)
Received message: (00120, Message_00120) at offset 119
Sent message: (121, Message_00121)
Received message: (00121, Message_00121) at offset 120
Sent message: (122, Message_00122)
Sent message: (123, Message_00123)
Sent message: (124, Message_00124)
Sent message: (125, Message_00125)
Sent message: (126, Message_00126)
Sent message: (127, Message_00127)
……
可見,由於採用同步傳送,Broker 宕機並沒有造成丟擲異常,另外,由於使用單分割槽,順序性也得到了保證,全域性沒有出現亂序的情況。
綜上,是否使用多分割槽更多的是對順序性的要求,而使用 Producer 同步傳送還是非同步傳送,更多是出於重覆訊息的考慮,如果非同步傳送丟擲異常,在保證不丟訊息的前提下,勢必要重發訊息,這就會導致收到重覆訊息。多分割槽和 Producer 非同步傳送,會帶來效能的提升,但是也會引入非順序性,重覆訊息等問題,如何取捨要看應用的需求。
Kafka 最佳實踐
Kafka 在一些應用場景中,有一些前人總結的最佳實踐 8 9。對最佳實踐,我的看法是,對於自己比較熟悉,有把握的部分,可以按自己的步驟進行;對一些自己不清楚的領域,可以借鑒其中的一些內容,至少不會錯的特別厲害。有文章10說,Kafka 在分割槽比較多的時候,相應時間會變長,這個現象值得在實踐中註意。
後記
在 Kafka 與 RocketMQ 的對比中,RocketMQ 的一個核心功能就是可以支援同步刷盤,此時,即使突然斷電,也可以保證訊息不丟;而 Kafka 採用的是非同步刷盤,即使傳回寫入成功,也只是寫入緩衝區成功,並非已經持久化。因此,如果出現斷電或 kill -9 的情況,Kafka 記憶體中的訊息可能丟失。另外,同步刷盤的效率是比較低下的,一般生產中估計也不會使用,可以用優雅關閉的方式來關閉行程。如果不考慮這些極端情況的話,Kafka 基本是一個很可靠的訊息中介軟體。
參考文章
-
http://kafka.apache.org/documentation.html
-
http://www.jianshu.com/p/453c6e7ff81c
-
http://www.infoq.com/cn/author/%E9%83%AD%E4%BF%8A#文章
-
http://developer.51cto.com/art/201501/464491.htm
-
https://segmentfault.com/q/1010000004292925
-
http://www.cnblogs.com/gnivor/p/5318319.html
-
http://www.cnblogs.com/davidwang456/p/4313784.html
-
http://www.jianshu.com/p/8689901720fd
-
http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/
-
How to choose the number of topics/partitions in a Kafka cluster?
系列
【關於投稿】
如果大家有原創好文投稿,請直接給公號傳送留言。
① 留言格式:
【投稿】+《 文章標題》+ 文章連結
② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/
③ 最後請附上您的個人簡介哈~
看完本文有收穫?請轉發分享給更多人
關註「ImportNew」,提升Java技能