Source: Valleylord
Kafka Replicas and Clusters
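In production, Kafka always runs as a cluster with partitioned topics, for both reliability and performance. Below is an example of a Kafka cluster with 3 replicas.
First, start 3 Kafka brokers. Their configuration files are as follows:
Although each broker is configured with only one port, Kafka actually occupies one more, possibly for replication between brokers. In addition, all 3 brokers are configured with: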
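Brokers registered with the same ZooKeeper are grouped into one cluster. Note that none of these configurations specifies which broker is the master and which are slaves. Instead, Kafka elects a leader for each partition from the available brokers. In other words, broker 0 may be the leader for one topic while broker 1 is the leader for another; and if topic1 has 2 partitions, the leader of partition 1 may even be broker 0 while the leader of partition 2 is broker 1.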
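The per-partition leader layout described above can be illustrated with a simplified round-robin placement. This is a sketch under stated assumptions, not Kafka's actual assignment code: it always starts at broker 0 and omits the randomized start broker and extra shifting that Kafka's real assignment applies. The class name `ReplicaAssignmentSketch` is hypothetical. The first replica of each partition is treated as its preferred leader.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified round-robin replica placement (assumption: start at broker 0 and skip
// the randomized start and extra shifting that Kafka's real assignment applies).
class ReplicaAssignmentSketch {

    // Returns, for each partition, the ordered list of broker ids holding a replica.
    static List<List<Integer>> assign(int numBrokers, int numPartitions, int replicationFactor) {
        if (replicationFactor > numBrokers) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // Partition p starts on broker p % numBrokers; the other replicas follow in order.
                replicas.add((p + r) % numBrokers);
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<List<Integer>> assignment = assign(3, 2, 3);
        for (int p = 0; p < assignment.size(); p++) {
            List<Integer> replicas = assignment.get(p);
            // The first replica is the preferred leader, so leadership is spread across brokers.
            System.out.println("Partition " + p + " Leader: " + replicas.get(0) + " Replicas: " + replicas);
        }
    }
}
```

With 3 brokers and 2 partitions this prints `Partition 0 Leader: 0 Replicas: [0, 1, 2]` and `Partition 1 Leader: 1 Replicas: [1, 2, 0]`, the same Replicas lists that appear in the describe output of the experiment.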
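This peer design is very useful for failure recovery. When a node crashes, Kafka automatically elects an available follower and promotes it to leader. When the crashed node recovers and rejoins the cluster, Kafka adds it back to the set of available nodes and automatically elects new leaders.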
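The failover step can be modeled in a few lines. This toy model assumes "clean" election only: dead brokers are dropped from the ISR, and the new leader is the first broker in the Replicas list that is still in the ISR. The class `LeaderFailoverSketch` and the value `-1` for "no eligible leader" are illustrative choices, not Kafka's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of leader failover (assumption: clean election only, i.e. the new
// leader must come from the ISR, picked in the order of the Replicas list).
class LeaderFailoverSketch {

    static final class PartitionState {
        final List<Integer> replicas; // assignment order; unchanged by failures
        final List<Integer> isr;      // in-sync replicas; shrinks when brokers die
        int leader;                   // -1 means no eligible leader (partition offline)

        PartitionState(List<Integer> replicas) {
            this.replicas = new ArrayList<>(replicas);
            this.isr = new ArrayList<>(replicas);
            this.leader = replicas.get(0);
        }
    }

    // Drops dead brokers from the ISR and elects a new leader if the old one died.
    static void onBrokersFailed(PartitionState state, Set<Integer> deadBrokers) {
        state.isr.removeAll(deadBrokers);
        if (deadBrokers.contains(state.leader)) {
            state.leader = -1;
            for (int candidate : state.replicas) {
                if (state.isr.contains(candidate)) {
                    state.leader = candidate;
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        PartitionState p0 = new PartitionState(Arrays.asList(0, 1, 2));
        PartitionState p1 = new PartitionState(Arrays.asList(1, 2, 0));
        Set<Integer> dead = new HashSet<>(Arrays.asList(0, 1)); // kill -9 on brokers 0 and 1
        onBrokersFailed(p0, dead);
        onBrokersFailed(p1, dead);
        System.out.println("Partition 0 Leader: " + p0.leader + " Replicas: " + p0.replicas + " Isr: " + p0.isr);
        System.out.println("Partition 1 Leader: " + p1.leader + " Replicas: " + p1.replicas + " Isr: " + p1.isr);
    }
}
```

Killing brokers 0 and 1 leaves both partitions led by broker 2 with Isr `[2]`, while Replicas stay unchanged, which is the state the experiment records after the crash.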
The experiment: first create a topic with 3 replicas and 2 partitions:
bin/kafka-topics.sh --create --zookeeper --replication-factor 3 --partitions 2 --topic topic1
Initially, topic1 looks like this:
$ bin/kafka-topics.sh --describe --zookeeper --topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
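Even without documentation, this output is easy to read: topic1 has 2 partitions, Partition 0 and Partition 1, whose leaders are on broker 0 and broker 1 respectively. Replicas lists the brokers that hold a replica, and Isr (In-Sync Replicas) lists the brokers whose replicas are in sync. If a broker goes down, Replicas does not change, but Isr shows only the brokers that are still up; see the experiment below.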
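The Replicas/Isr distinction can be sketched as a rule on follower lag. The assumption here, modeled on the idea behind Kafka's `replica.lag.time.max.ms` setting, is that a follower counts as in sync only if it was last fully caught up with the leader within `maxLagMs`, and the leader itself is always in sync. The class `IsrSketch` and the timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of deriving the Isr column. Assumption, modeled on the idea behind Kafka's
// replica.lag.time.max.ms: a follower stays in sync only if it was last fully caught
// up with the leader within maxLagMs; the leader itself is always in sync.
class IsrSketch {

    // lastCaughtUpMs maps broker id -> time (ms) the replica last matched the leader's log end.
    static List<Integer> computeIsr(int leader, Map<Integer, Long> lastCaughtUpMs, long nowMs, long maxLagMs) {
        List<Integer> isr = new ArrayList<>();
        // Iterate in broker-id order so the result is deterministic.
        for (Map.Entry<Integer, Long> e : new TreeMap<>(lastCaughtUpMs).entrySet()) {
            boolean isLeader = e.getKey() == leader;
            if (isLeader || nowMs - e.getValue() <= maxLagMs) {
                isr.add(e.getKey());
            }
        }
        return isr;
    }

    public static void main(String[] args) {
        Map<Integer, Long> lastCaughtUp = new TreeMap<>();
        lastCaughtUp.put(0, 1_000L);  // broker 0 stopped fetching at about t = 1 s
        lastCaughtUp.put(1, 1_000L);  // broker 1 stopped fetching at about t = 1 s
        lastCaughtUp.put(2, 15_000L); // broker 2 is the current leader
        // Replicas is the full assignment; Isr only keeps replicas that are keeping up.
        System.out.println("Replicas: " + lastCaughtUp.keySet()
                + " Isr: " + computeIsr(2, lastCaughtUp, 15_000L, 10_000L));
    }
}
```

With a 10-second lag limit at t = 15 s, this prints `Replicas: [0, 1, 2] Isr: [2]`: the assignment is untouched while the Isr shrinks.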
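Next, run the Producer and Consumer example code written earlier in 2 threads, with the producer sending asynchronously and messages replicated synchronously. While messages are being sent, stop 2 of the brokers (broker 0 and broker 1) with kill -9 to simulate a sudden crash. topic1 now looks like this:
$ bin/kafka-topics.sh --describe --zookeeper --topic topic1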
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: topic1 Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2
Kafka has already elected a new leader, and message sending was not interrupted. Next, restart the two stopped brokers and check the state of topic1 again:
$ bin/kafka-topics.sh --describe --zookeeper --topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: topic1 Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2,1,0
$ bin/kafka-topics.sh --describe --zookeeper --topic topic1
Topic:topic1 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0
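For a short time, broker 2 led both partitions of topic1, but after Kafka re-ran the election, the leader of partition 1 changed to broker 1. This suggests that Kafka prefers to place partition leaders on different brokers, which balances load better.
Now look at the Producer and Consumer logs. The excerpt below covers the period before and after the 2 brokers went down: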
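Moving leadership back after brokers rejoin is usually described as "preferred leader" rebalancing, which Kafka performs automatically when `auto.leader.rebalance.enable` is true. The toy model below assumes the preferred leader is the first broker in Replicas and that leadership moves back only if that broker is in the ISR. Kafka's per-broker imbalance threshold and check interval are not modeled, and the class `PreferredLeaderSketch` is hypothetical. Under this model both partitions return to their preferred leaders. The second snapshot, in which partition 0 is still led by broker 2, may simply have been taken before the rebalance finished. That is an assumption, not something the experiment shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model of preferred-leader rebalancing. Assumption: the preferred leader is the
// first broker in Replicas, and leadership moves back only if that broker is in the
// ISR. Kafka's imbalance threshold and check interval are not modeled.
class PreferredLeaderSketch {

    static final class Partition {
        final int id;
        final List<Integer> replicas;
        final Set<Integer> isr;
        int leader;

        Partition(int id, List<Integer> replicas, Set<Integer> isr, int leader) {
            this.id = id;
            this.replicas = replicas;
            this.isr = isr;
            this.leader = leader;
        }
    }

    // Moves each partition back to its preferred leader where possible; returns moved ids.
    static List<Integer> rebalance(List<Partition> partitions) {
        List<Integer> moved = new ArrayList<>();
        for (Partition p : partitions) {
            int preferred = p.replicas.get(0);
            if (p.leader != preferred && p.isr.contains(preferred)) {
                p.leader = preferred;
                moved.add(p.id);
            }
        }
        return moved;
    }

    // Number of partitions each broker currently leads.
    static Map<Integer, Integer> leaderCounts(List<Partition> partitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (Partition p : partitions) {
            counts.merge(p.leader, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Set<Integer> fullIsr = new HashSet<>(Arrays.asList(0, 1, 2)); // all brokers back in sync
        List<Partition> topic1 = Arrays.asList(
                new Partition(0, Arrays.asList(0, 1, 2), fullIsr, 2),
                new Partition(1, Arrays.asList(1, 2, 0), fullIsr, 2));
        System.out.println("before: " + leaderCounts(topic1)); // {2=2}
        System.out.println("moved:  " + rebalance(topic1));    // [0, 1]
        System.out.println("after:  " + leaderCounts(topic1)); // {0=1, 1=1}
    }
}
```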
Send message: (00439, Message_00439) at offset 217 to partition(0) in 3 ms
Received message: (00438, Message_00438) at offset 216
Send message: (00440, Message_00440) at offset 218 to partition(0) in 5 ms
Send message: (00441, Message_00441) at offset 221 to partition(1) in 5 ms
Received message: (00441, Message_00441) at offset 221
Received message: (00439, Message_00439) at offset 217
Send message: (00442, Message_00442) at offset 222 to partition(1) in 5 ms
Send message: (00443, Message_00443) at offset 219 to partition(0) in 3 ms
Received message: (00440, Message_00440) at offset 218
Received message: (00443, Message_00443) at offset 219
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
Received message: (00442, Message_00442) at offset 222
Send message: (00452, Message_00452) at offset 223 to partition(1) in 7492 ms
Send message: (00454, Message_00454) at offset 224 to partition(1) in 7485 ms
Send message: (00455, Message_00455) at offset 225 to partition(1) in 7482 ms
Send message: (00458, Message_00458) at offset 226 to partition(1) in 7473 ms
Send message: (00460, Message_00460) at offset 227 to partition(1) in 7467 ms
Send message: (00461, Message_00461) at offset 228 to partition(1) in 7465 ms
Send message: (00462, Message_00462) at offset 229 to partition(1) in 7462 ms
Send message: (00463, Message_00463) at offset 230 to partition(1) in 7459 ms
Send message: (00464, Message_00464) at offset 231 to partition(1) in 7456 ms
Send message: (00465, Message_00465) at offset 232 to partition(1) in 7453 ms
Send message: (01103, Message_01103) at offset 543 to partition(1) in 5478 ms
Received message: (00631, Message_00631) at offset 310
Received message: (00633, Message_00633) at offset 311
Send message: (00451, Message_00451) at offset 220 to partition(0) in 7525 ms
Received message: (00634, Message_00634) at offset 312
Send message: (00453, Message_00453) at offset 221 to partition(0) in 7518 ms
Received message: (00639, Message_00639) at offset 313
Send message: (00456, Message_00456) at offset 222 to partition(0) in 7509 ms
Received message: (00641, Message_00641) at offset 314
Send message: (00457, Message_00457) at offset 223 to partition(0) in 7506 ms
Received message: (00643, Message_00643) at offset 315
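When the error occurred, the producer threw NetworkException. In total there were 3589 Received log lines, 3583 Send log lines, and 7 NetworkException log lines; the highest sequence number sent was 3590 and the highest received was 3589. A few points are worth noting: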
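Before the crash, messages were not received in order. This is because topic1 has 2 partitions, and Kafka only guarantees ordering within a partition.
After the crash there is a long run of Send lines with no Received lines, which suggests that Kafka was running a leader election at that point and that the election blocks consumers.
Judging by the number and sequence numbers of received messages, every message arrived and none was lost (message 3590 was probably not received because the client process was forcibly killed). The 7 exceptions during sending were probably false alarms: they correspond to sequence numbers 444 to 450, and the 3583 Send lines plus these 7 add up to the 3590 messages in total.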
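The loss check above is simple bookkeeping over sequence numbers. The sketch below assumes each message carries an integer sequence number (the `00439`-style keys in the logs, already parsed) and reports which numbers in a range were never received and which were received more than once. The class `DeliveryCheckSketch` is hypothetical, and the sample list is only the handful of Received keys from the excerpt above, not the full run.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Bookkeeping sketch for loss/duplicate checks (assumption: sequence numbers are
// already parsed out of the "Received message" log lines).
class DeliveryCheckSketch {

    // Sequence numbers in [firstSeq, lastSeq] that never showed up on the consumer side.
    static SortedSet<Integer> missing(int firstSeq, int lastSeq, List<Integer> received) {
        Set<Integer> seen = new HashSet<>(received);
        SortedSet<Integer> missing = new TreeSet<>();
        for (int seq = firstSeq; seq <= lastSeq; seq++) {
            if (!seen.contains(seq)) {
                missing.add(seq);
            }
        }
        return missing;
    }

    // Sequence numbers received more than once.
    static SortedSet<Integer> duplicates(List<Integer> received) {
        Set<Integer> seen = new HashSet<>();
        SortedSet<Integer> dups = new TreeSet<>();
        for (int seq : received) {
            if (!seen.add(seq)) {
                dups.add(seq);
            }
        }
        return dups;
    }

    public static void main(String[] args) {
        // Counts reported in the text: 3583 successful sends plus 7 NetworkExceptions.
        System.out.println("sends accounted for: " + (3583 + 7)); // 3590
        // Received keys from the log excerpt, in arrival order (a partial sample only).
        List<Integer> sample = Arrays.asList(438, 441, 439, 440, 443, 442);
        System.out.println("missing in 438..443: " + missing(438, 443, sample)); // []
        System.out.println("duplicates: " + duplicates(sample));                 // []
    }
}
```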
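This experiment shows that although Kafka does not guarantee that a message is never sent twice, it does try hard to avoid duplicates; perhaps my test scenario was not extreme enough to produce any.
As mentioned earlier, full ordering requires a single partition, and to avoid NetworkException being thrown you can send synchronously from the producer. Below we repeat the experiment above, this time with a single partition and synchronous producer sends. Here is an excerpt of the log while a broker was down: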
Sent message: (118, Message_00118)
Received message: (00118, Message_00118) at offset 117
Received message: (00119, Message_00119) at offset 118
Sent message: (119, Message_00119)
Sent message: (120, Message_00120)
Received message: (00120, Message_00120) at offset 119
Sent message: (121, Message_00121)
Received message: (00121, Message_00121) at offset 120
Sent message: (122, Message_00122)
Sent message: (123, Message_00123)
Sent message: (124, Message_00124)
Sent message: (125, Message_00125)
Sent message: (126, Message_00126)
Sent message: (127, Message_00127)
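Because sends are synchronous, the broker crash did not cause any exception to be thrown; and because there is a single partition, ordering is preserved as well, with no message arriving out of order anywhere.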
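Why one partition keeps global order and two do not can be shown with a toy model. The assumptions are not Kafka's real partitioner or fetch logic: message `i` goes to partition `i % numPartitions`, and the consumer drains partitions in a fixed order (highest-numbered first), taking at most `batchSize` records per partition per round. Each partition stays ordered because it is an append-only log; only the interleaving across partitions breaks global order. `PartitionOrderSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of per-partition vs global ordering (assumption: round-robin writes and
// a consumer that polls partitions in a fixed order, batchSize records at a time).
class PartitionOrderSketch {

    static List<Integer> consume(int numMessages, int numPartitions, int batchSize) {
        if (numPartitions < 1 || batchSize < 1) {
            throw new IllegalArgumentException("need at least one partition and batchSize >= 1");
        }
        List<Deque<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayDeque<>());
        }
        // Each partition is an append-only log, so it keeps the order of its own writes.
        for (int i = 0; i < numMessages; i++) {
            partitions.get(i % numPartitions).addLast(i);
        }
        List<Integer> consumed = new ArrayList<>();
        while (consumed.size() < numMessages) {
            // Visit the highest-numbered partition first to mimic fetches interleaving partitions.
            for (int p = numPartitions - 1; p >= 0; p--) {
                Deque<Integer> log = partitions.get(p);
                for (int k = 0; k < batchSize && !log.isEmpty(); k++) {
                    consumed.add(log.pollFirst());
                }
            }
        }
        return consumed;
    }

    static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("1 partition:  " + consume(6, 1, 2)); // [0, 1, 2, 3, 4, 5]
        System.out.println("2 partitions: " + consume(6, 2, 2)); // [1, 3, 0, 2, 5, 4]
    }
}
```

In the two-partition output the even numbers (partition 0) and the odd numbers (partition 1) each stay in order, yet the combined sequence does not, which matches the out-of-order Received lines in the first experiment.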
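In summary, whether to use multiple partitions is mainly a question of ordering requirements, while the choice between synchronous and asynchronous producer sends is mainly about duplicate messages: if an asynchronous send throws an exception and messages must not be lost, you have to resend, which can lead to duplicates. Multiple partitions and asynchronous sends improve performance, but they also introduce out-of-order delivery, duplicate messages, and similar issues; the right trade-off depends on the application's requirements.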
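The resend-causes-duplicates argument can be made concrete with a hypothetical in-memory broker. In this model the write always lands in the log, but for selected sequence numbers the acknowledgement is lost, as with a NetworkException that arrives after the broker has already stored the record. The sender cannot tell whether the write happened, so it resends. That avoids loss but stores the record twice, and a consumer can then de-duplicate by sequence number. `RetryDuplicateSketch`, `FlakyBroker` and the single-retry policy are all illustrative assumptions, not the Kafka producer's retry logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model: retrying after a lost acknowledgement prevents loss but can duplicate.
class RetryDuplicateSketch {

    // Hypothetical broker whose writes always succeed but whose acks can be lost.
    static final class FlakyBroker {
        final List<Integer> log = new ArrayList<>();
        final Set<Integer> dropAckFor;

        FlakyBroker(Set<Integer> dropAckFor) {
            this.dropAckFor = new HashSet<>(dropAckFor);
        }

        // Appends the record, then returns false if the acknowledgement was "lost".
        boolean append(int seq) {
            log.add(seq);
            // Only the first attempt loses its ack; remove() makes the retry succeed.
            return !dropAckFor.remove(seq);
        }
    }

    // Sends every sequence number, retrying once whenever no ack arrives.
    static List<Integer> sendWithRetry(int count, Set<Integer> dropAckFor) {
        FlakyBroker broker = new FlakyBroker(dropAckFor);
        for (int seq = 0; seq < count; seq++) {
            if (!broker.append(seq)) {
                broker.append(seq); // resend: nothing lost, but the first write already landed
            }
        }
        return broker.log;
    }

    // Consumer-side de-duplication by sequence number, keeping the first occurrence.
    static List<Integer> dedupe(List<Integer> log) {
        return new ArrayList<>(new LinkedHashSet<>(log));
    }

    public static void main(String[] args) {
        List<Integer> log = sendWithRetry(5, new HashSet<>(Arrays.asList(2)));
        System.out.println("broker log:   " + log);         // [0, 1, 2, 2, 3, 4]
        System.out.println("deduplicated: " + dedupe(log)); // [0, 1, 2, 3, 4]
    }
}
```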
Kafka Best Practices
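For some Kafka use cases, others have already summarized best practices [8][9]. My view is this: in areas you know well and are confident about, follow your own approach; in areas you are unsure of, borrow from these practices, and at least you will not go badly wrong. One article [10] notes that Kafka's response time grows when there are many partitions, which is worth keeping in mind in practice.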
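Comparing Kafka with RocketMQ, one core feature of RocketMQ is support for synchronous flushing to disk, so that messages are not lost even on a sudden power failure. Kafka flushes asynchronously: a successful write response only means the data reached a buffer, not that it has been persisted. Therefore, after a power failure or kill -9, messages held in Kafka's memory may be lost. On the other hand, synchronous flushing is quite slow and is probably rarely used in production; shutting the process down gracefully is a practical alternative. Leaving such extreme cases aside, Kafka is a very reliable messaging middleware.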
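The difference between "written to a buffer" and "persisted" can be shown with plain JDK file I/O. `FileChannel.write` returns once the operating system has accepted the bytes (usually into the page cache), and `FileChannel.force` asks the OS to push them to the storage device, which is what a synchronous flush adds. The class `FlushSketch` and the temporary-file setup are illustrative only. This is not how Kafka or RocketMQ implement their logs, and the timings it prints depend entirely on the machine.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Plain-JDK illustration of buffered writes versus forced (synchronous) flushes.
class FlushSketch {

    // Appends one record; if syncFlush is true, waits until the data reaches the device.
    static void append(Path file, String record, boolean syncFlush) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            ByteBuffer buf = ByteBuffer.wrap((record + "\n").getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                ch.write(buf); // returns once the OS accepts the bytes; a power cut may still lose them
            }
            if (syncFlush) {
                ch.force(false); // flush file content (not metadata) to the device: slower, durable
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flush-sketch", ".log");
        long t0 = System.nanoTime();
        for (int i = 0; i < 100; i++) {
            append(file, "Message_" + i, false);
        }
        long t1 = System.nanoTime();
        for (int i = 100; i < 200; i++) {
            append(file, "Message_" + i, true);
        }
        long t2 = System.nanoTime();
        System.out.printf("buffered writes:       %d us%n", (t1 - t0) / 1000);
        System.out.printf("force after each write: %d us%n", (t2 - t1) / 1000);
        System.out.println("lines written: " + Files.readAllLines(file).size());
        Files.delete(file);
    }
}
```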
How to choose the number of topics/partitions in a Kafka cluster?
