- Kafka in 30 seconds
- This Benchmark
- The Setup
- Producer Throughput
- Consumer Throughput
這篇文章是關於LinkedIn如何用kafka作為一個中央釋出-訂閱日誌,在應用程式,流處理,hadoop資料提取之間整合資料。無論如何,kafka日誌一個好處就是廉價。百萬級別的TPS都不是很大的事情。因為日誌比起資料庫或者K-V儲存是更簡單的東西。我們的生產環境kafka叢集每天每秒處理上千萬讀寫請求,並且只是構建在一個非常普通的硬體上。
接下來讓我們做一些壓測,看看kafka究竟多麼牛逼。
Kafka in 30 seconds
為了幫助理解接下來的壓測,首先讓我們大概瞭解一下kafka是什麼,以及一些kafka工作的細節。kafka是LinkedIn開發一個分散式訊息系統,現在是 Apache Software Foundation的成員之一,並且非常多的公司在使用kafka。
生產者將記錄傳送到kafka叢集,叢集保留這些記錄並將其交給消費者;
01-producer_consumer.png
kafka一個最核心的概念就是topic(筆者在這裡並不打算翻譯它,無論翻譯成什麼都覺得變味了)。生產者釋出記錄到topic,消費者訂閱一個或多個topic。kafka的topic實際上就是一個分割槽後的write-ahead log。生產者把需要釋出的記錄追加到這些日誌後面。消費者訂閱它們。每一個記錄都是一個K-V對,key主要用於分配記錄到日誌分割槽。
下圖是一個簡單的示例圖,生產者如何寫記錄到一個擁有兩個分割槽的topic,以及消費者如何讀這個topic:
上圖展示了生產者如何追加日誌到兩個分割槽,以及消費者讀取日誌。日誌中每條記錄都有一個相關的條目編號,我們把它稱為offset。消費者使用offset來描述其在每個日誌中的位置。
這些分割槽分割槽在叢集的各個伺服器上。
需要註意kafka與很多訊息系統不一樣,它的日誌總是持久化,當接收到訊息後,會立即寫到檔案系統。消費者讀訊息時訊息並不會被刪除。它的保留策略透過配置來決定。這就允許在資料使用者可能需要重新載入資料的情況下使用。並且也能節省空間,無論多少消費者,日誌共享一份。
傳統的訊息系統,常常一個消費者一個佇列,因此增加消費者,資料空間就會成倍增加。這使得Kafka非常適合普通訊息傳遞系統之外的事物,例如充當離線資料系統(如Hadoop)的管道。 這些離線系統可能僅作為週期性ETL週期的一部分在一定時間間隔載入,或者可能會停機幾個小時進行維護,在此期間,如果需要,Kafka能夠緩衝甚至TB量級的未消耗資料。
kafka也複製日誌到多臺伺服器上,為了容錯。複製實現是kafka一個非常重要的架構特性。和其他訊息系統相比,複製不是一種需要複雜配置的異乎尋常的外掛,只能在非常特殊的情況下使用。 相反,kafka的架構複製被假定為預設值:我們將未複製的資料視為複製因子恰好為1的特殊情況。
生產者在釋出包含記錄偏移量的訊息時會收到確認。傳送到同一個分割槽的第一條記錄分配的offset為0,第二條是1,以此類推。消費者透過offset指定的位置消費資料,並且消費者透過週期性的提交topic(名為__consumer_offsets
)從而儲存代表訊息位置的offset到日誌中,達到持久化的標的。儲存這個offset的目的是為了消費者崩潰後,其他消費者能從儲存的位置繼續消費訊息。
kafka簡單介紹到此為止,系統這一切都有意義。
This Benchmark
對於此次基準測試,我喜歡遵循我稱之為“懶惰基準測試(lazy benchmarking)”的風格。當您使用系統時,您通常擁有將其調整到任何特定用例的完美的專有技術。這導致了一種基準測試,您可以將配置大幅調整到基準測試,或者更糟糕的是針對您測試的每個場景進行不同的調整。我認為系統的真正測試不是它在完美調整時的表現,而是它如何“現成”執行。對於在具有數十個或數百個用例的多租戶設定中執行的系統尤其如此,其中針對每個用例的調優不僅不切實際而且不可能。因此,我幾乎堅持使用伺服器和客戶端的預設設定。我將指出我懷疑透過一點調整可以改善結果的區域,但我試圖抵制任何擺弄自己以改善結果的誘惑。
配置和壓測命令文末會貼出來,所以如果你感興趣的話,在你們的伺服器上也能重現本文的壓測結果。
The Setup
本次測試,總計6臺伺服器,配置如下:
- Intel Xeon 2.5 GHz processor with six cores
- Six 7200 RPM SATA drives
- 32GB of RAM
- 1Gb Ethernet
kafka叢集安裝在其中的3臺伺服器上,6塊硬碟直接掛載,沒有RAID。另外三臺伺服器用於Zookeeper和壓力測試。
3臺伺服器的叢集不是很大,但是因為我們只測試複製因子為3,所以三臺伺服器叢集足夠。顯而易見的是,我們能透過增加更多的分割槽,傳播資料到更多的伺服器上來水平擴充套件我們的叢集。
這些硬體不是LinkedIn平常使用的kafka硬體。我們的kafka伺服器有針對性的調優,能更好的執行的執行kafka。這次測試,我從Hadoop叢集中借用了這幾臺伺服器,這些伺服器都是我們持久化系統中最便宜的裝置。 Hadoop的使用樣式與Kafka非常相似,所以這是一件合理的事情。
Producer Throughput
接下來的測試是壓測生產者的吞吐量,測試過程中沒有消費者執行,因此所有訊息被持久化(稍後會測試生產者和消費者都存在的場景),但是沒有被讀取。
Single producer thread, no replication
- 821,557 records/sec
- 78.3 MB/sec
這第一個測試基於的topic:6個分割槽,沒有副本。然後單執行緒盡可能快的產生5千萬個小記錄(100byte)。在這些測試中關註小記錄的原因是它對於訊息系統來說是更難的情況。如果訊息很大,很容易以MB/秒獲得良好的吞吐量,但是當訊息很小時反而很難獲得良好的吞吐量,因為處理每個訊息的開銷佔主導地位。
一個直接的觀察是,這裡的壓測資料遠高於人們的預期,特別是對於持久儲存系統。 如果您習慣於隨機訪問資料系統(如資料庫或鍵值儲存),通常會產生大約5,000到50,000次查詢的最大吞吐量,這接近於良好的RPC層可以執行的速度遠端請求。 由於兩個關鍵設計原則,我們超過了這一點:
- 我們努力確保我們進行線性磁碟I/O。這些伺服器提供的六塊廉價磁碟的線性總吞吐量為822 MB /秒。許多訊息系統將永續性視為昂貴的附加元件,認為其會降低效能並且應該謹慎使用,但這是因為它們沒有進行線性I/O.
- 在每個階段,我們都致力於將少量資料批次合併到更大的網路和磁碟I/O操作中。 例如,在新生產者中,我們使用“group commit”類似的機制來確保在另一個I/O正在進行中時發起的任何記錄被組合在一起。 有關瞭解批處理重要性的更多資訊,請參閱David Patterson寫的”Latency Lags Bandwidth”。
Single producer thread, 3x async replication
- 786,980 records/sec
- 75.1 MB/sec
這次測試和前一次的測試幾乎一樣,除了每個分割槽有三個副本(因此寫到網路或者磁碟的資料是前一次的三倍)。每個伺服器都從生產者那裡為它作為leader分割槽執行寫操作,以及為其作為follower分割槽獲取和寫入資料。
本次測試的複製是非同步的,即acks=0。訊息只要寫到本地日誌即可,不需要等待這個分割槽的其他副本收到訊息。這就意味著,如果leader崩潰,可能會丟失最新的一些還未同步到副本的訊息。
我希望人們能從中得到的關鍵是複製可以更快。對應3x複製,叢集總寫入能力有3倍的退化,因為每個寫操作要做3次。但是每個客戶端的吞吐量依然表現不錯。 高效能複製在很大程度上取決於我們的消費者的效率,後面會在消費者部分討論。
Single producer thread, 3x sync replication
- 421,823 records/sec
- 40.2 MB/sec
此次測試和前面的測試一樣,除了leader需要等待所有in-sync replicas確認收到訊息才會傳回結果給生產者。即acks=all或者acks=-1。這種樣式下,只要有一個in-sync replica存在,訊息就不會丟失。
Kafka中的同步複製與非同步複製沒有根本的不同。分割槽leader總是跟蹤follower副本進度,監控它們是否存在。在所有in-sync replicas確認收到訊息之前,我們永遠不會向消費者發出訊息。使用同步複製,我們要等待響應給生產者的請求,直到follower副本都已經複製。
這種額外的延遲似乎會影響我們的吞吐量。由於伺服器上的程式碼路徑非常相似,我們可以透過調整批處理來更好地改善這種影響,並允許客戶端緩衝更多未完成的請求。 但是,本著避免特殊情況調整的原則,我沒有這麼做。
Three producers, 3x async replication
- 2,024,032 records/sec
- 193.0 MB/sec
我們的單一生產者處理顯然不能壓出三節點叢集的能力上限。為了增加負載,重覆前面的非同步複製樣式測試流程,但是在三臺不同伺服器上執行三個不同的生產者(在同一臺機器上執行更多行程將無助於我們使NIC飽和)。然後,我們可以檢視這三個生產者的總吞吐量,以更好地瞭解群集的總容量。
Producer Throughput VS. Stored Data
許多訊息系統一個隱藏的危險是,只有在他們儲存的資料在記憶體中才會工作的很好。當資料備份不能被消費時(資料就需要儲存到磁碟上),吞吐量會下降幾個等級,甚至更多。這就意味著只有在消費者速度能跟上生產者,並且佇列是空的情況下系統才會執行良好。一旦消費者落後,沒有消費的訊息需要備份,備份可能會使資料持久化到磁碟上,這就會引起效能大幅下降。這意味著訊息傳遞系統無法跟上傳入的資料。這種情況非常嚴重,訊息系統在大部分情況下,應該能做到平和的處理佇列中的訊息。
kafka總是採用追加的方式持久化訊息,並且對於沒有消費的資料,持久化的的時間複雜度是 O(1)。
這次實驗測試,讓我們在一段延長的時間內執行吞吐量測試,併在儲存的資料集增長時繪製結果圖:
如圖所示,效能並沒有明顯的變化。但是由於資料大小所以沒有影響:我們在寫入TB資料之後也表現得同樣好,就像前幾百MB一樣。
圖中的效能波動主要是因為Linux系統I/O管理批次處理資料,週期性的把資料flush到磁碟。LinkedIn的kafka生產環境上針對這個有一些調優。可以參考kafka Hardware and OS。
Consumer Throughput
OK,現在讓我們把註意力轉移到消費者吞吐量上來。
請註意,複製因子不會影響此測試的結果。因為不管複製因子如何,消費者只能從一個副本讀取。 同樣,生產者的確認級別(acks引數)也無關緊要,因為消費者只讀取完全確認的訊息(所有In-Sync Replicas都已經同步的訊息才能被消費)。 這是為了確保消費者看到的任何訊息在leader切換後始終存在(如果當前leader發生異常需要重新選舉新的leader的話)。
Single Consumer
- 940,521 records/sec
- 89.7 MB/sec
第一次測試:將在有6個分割槽,3個副本的topic中單執行緒消費5千萬條訊息。
kafka消費者效率很高,它直接從linux檔案系統中抓取日誌塊。它透過sendfile這個API,直接透過作業系統傳輸資料,所以沒有透過應用程式複製此資料的開銷。
本次測試實際上從日誌初始位置開始,因此它在做真正的讀I/O。但是在生產環境中,消費者幾乎完全從OS頁面快取中讀取,因為它正在讀取剛剛由某個生產者產生的資料(這些資料仍然在快取中)。事實上,如果您在生產伺服器上執行相關命令檢視I/O stat,會看到消耗大量資料被消費,也根本沒有物理讀取。
讓消費者盡可能cheap,是我們希望kafka做的一件非常重要的事情。首先,副本也是消費者。所以,讓消費者cheap,副本也會cheap。其次,這樣會是處理資料不是非常昂貴的操作。因此出於可伸縮性的原因,我們不需要嚴格控制。
cheap字面含義是便宜,但是在這裡的含義,我覺得是業務邏輯不要太複雜。
Three Consumers
- 2,615,968 records/sec
- 249.5 MB/sec
重覆上面相同的測試,不同的是有三個消費者並行處理。三個消費者分佈在三臺不同伺服器上。這三個消費者屬於同一個消費者組中的成員,即它們消費同樣的topic。
和我們預期一樣,我們看到消費能力線性擴充套件,幾乎就是單個消費者吞吐量的3倍,這一點都不令人驚訝。
Producer and Consumer
- 795,064 records/sec
- 75.8 MB/sec
上面的測試僅限於生產者和消費者執行在不同伺服器。現在,讓我們把生產者和消費者執行在同一臺伺服器上。實際上,我們也是這樣做的,因為這樣的話,複製工作就是讓伺服器本身充當消費者。
對於此次測試,我們將基於6個分割槽,3個副本的topic,分別執行1個生產者和1個消費者,並且topic初始為空。 生產者再次使用非同步複製。 報告的吞吐量是消費者吞吐量(顯然,是生產者吞吐量的上限)。
和我們預期一樣,得到的結果和只有生產者時基本相同,前提是消費者相當cheap。
Effect of Message Size
前面的測試已經展示了100位元組大小訊息kafka的效能。對於消費系統來說,更小的訊息是更大的問題。因為它們放大了系統記賬的開銷。 我們可以透過在記錄/秒和MB/秒兩者中繪製吞吐量來顯示這一點:
這張圖和我們預期一樣,隨著訊息體越來越大,每秒我們能傳送的訊息數量也會減少。但是,如果我們看MB/秒效能報告,我們會看到實際使用者資料的總位元組吞吐量隨著訊息變大而增加:
總結:訊息體越大,每秒能處理的訊息數量越少,但是每秒能處理的訊息體積越大;訊息體越小,每秒能處理的訊息數量越多,但是每秒能處理的訊息體積越小;
另外我們可以看到,對於10位元組的訊息,我們實際上只是透過獲取鎖並將訊息排入傳送來限制CPU – 我們無法實際最大化網路。 但是,從100位元組開始,我們實際上看到網路飽和。
End-to-end Latency
- 2 ms (median)
- 3 ms (99th percentile)
- 14 ms (99.9th percentile)
到現在為止,我們討論的都是吞吐量。但是訊息傳遞的延遲情況呢?也就是說,訊息傳遞到消費者,需要多長的時間。此次測試,我們將建立生產者和消費者,並重覆計算生產者將訊息傳送到kafka叢集然後由我們的消費者接收所需的時間。
請註意,Kafka僅在所有in-sync replicas確認訊息後才向消費者發出訊息。因此,無論我們使用同步還是非同步複製,此測試都會給出相同的結果,因為該設定僅影響對生產者的確認,而本次測試是生產者傳送的訊息傳遞到消費者的時間。
Replicating this test
如果你想要在你自己的伺服器上,執行這些壓力測試,當然沒有問題。正如我所說的,我大部分情況下只是使用我們預裝的效能測試工具,這些工具隨Kafka釋出包一起提供,並且伺服器和客戶端大部分都是預設配置。
attachment
下麵給出本次壓測一些命令,以及kafka伺服器配置。
benchmark commands
###############################################################
壓測指令碼(zk叢集地址後的/afei是配置的chroot):
--zookeeper:10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei
--broker: 10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092
################################################################
建立需要的TOPIC:
bin/kafka-topics.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --create --topic TPC-P6-R1 --partitions 6 --replication-factor 1
bin/kafka-topics.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --create --topic TPC-P6-R3 --partitions 6 --replication-factor 3
1個生產者-單執行緒&無副本:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R1 --num-records 50000000 --record-size 128 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
執行指令碼說明:
--num-records表示傳送訊息的數量,即5kw條;
--record-size表示每條訊息的大小,即128位元組;
--throughput表示吞吐量限制,-1沒有限制;
--producer-props後面的都是生產者配置
1個生產者-單執行緒&3個副本非同步寫入:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
1個生產者-單執行緒&3個副本同步寫入:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=-1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
3個生產者-單執行緒&3個副本非同步寫入:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
- 傳送50億條100個位元組大小的訊息
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 5000000000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
消費尺寸的影響--分別嘗試各種不同位元組大小訊息
for i in 10 100 1000 10000 100000;
do
echo ""
echo $i
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records $((1000*1024*1024/$i)) --record-size $i --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
done;
單個消費者訊息能力:
bin/kafka-consumer-perf-test.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --messages 50000000 --topic TPC-P6-R3 --threads 1
3個消費者消費能力--在3臺伺服器上執行3個消費者:
bin/kafka-consumer-perf-test.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --messages 50000000 --topic TPC-P6-R3 --threads 1
生產者&消費者:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
bin/kafka-consumer-perf-test.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --messages 50000000 --topic TPC-P6-R3 --threads 1
server config
broker.id=0
port=9092
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs
num.partitions=8
log.retention.hours=168
log.segment.bytes=536870912
log.cleanup.interval.mins=1
zookeeper.connect=10.0.0.1:2181
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
replica.lag.max.messages=10000000
英文原文地址:https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines