kafka壓測:3臺廉價服務器支撐200萬TPS

作者:阿飛的博客
來源:https://www.jianshu.com/p/ba0642c2c328
kafka壓測:3臺廉價服務器支撐200萬TPS

這篇文章是關於LinkedIn如何用kafka作為一箇中央發佈-訂閱日誌,在應用程序,流處理,hadoop數據提取之間集成數據。無論如何,kafka日誌一個好處就是廉價。百萬級別的TPS都不是很大的事情。因為日誌比起數據庫或者K-V存儲是更簡單的東西。我們的生產環境kafka集群每天每秒處理上千萬讀寫請求,並且只是構建在一個非常普通的硬件上。

接下來讓我們做一些壓測,看看kafka究竟多麼牛逼。

Kafka in 30 seconds

為了幫助理解接下來的壓測,首先讓我們大概瞭解一下kafka是什麼,以及一些kafka工作的細節。kafka是LinkedIn開發一個分佈式消息系統,現在是 Apache Software Foundation的成員之一,並且非常多的公司在使用kafka。

生產者將記錄發送到kafka集群,集群保留這些記錄並將其交給消費者;

kafka壓測:3臺廉價服務器支撐200萬TPS

01-producer_consumer.png

kafka一個最核心的概念就是topic(筆者在這裡並不打算翻譯它,無論翻譯成什麼都覺得變味了)。生產者發佈記錄到topic,消費者訂閱一個或多個topic。kafka的topic實際上就是一個分區後的write-ahead log。生產者把需要發佈的記錄追加到這些日誌後面。消費者訂閱它們。每一個記錄都是一個K-V對,key主要用於分配記錄到日誌分區。

下圖是一個簡單的示例圖,生產者如何寫記錄到一個擁有兩個分區的topic,以及消費者如何讀這個topic:

kafka壓測:3臺廉價服務器支撐200萬TPS

02-partitioned_log_0.png

上圖展示了生產者如何追加日誌到兩個分區,以及消費者讀取日誌。日誌中每條記錄都有一個相關的條目編號,我們把它稱為offset。消費者使用offset來描述其在每個日誌中的位置。

這些分區分區在集群的各個服務器上。

需要注意kafka與很多消息系統不一樣,它的日誌總是持久化,當接收到消息後,會立即寫到文件系統。消費者讀消息時消息並不會被刪除。它的保留策略通過配置來決定。這就允許在數據使用者可能需要重新加載數據的情況下使用。並且也能節省空間,無論多少消費者,日誌共享一份。

傳統的消息系統,常常一個消費者一個隊列,因此增加消費者,數據空間就會成倍增加。這使得Kafka非常適合普通消息傳遞系統之外的事物,例如充當離線數據系統(如Hadoop)的管道。 這些離線系統可能僅作為週期性ETL週期的一部分在一定時間間隔加載,或者可能會停機幾個小時進行維護,在此期間,如果需要,Kafka能夠緩衝甚至TB量級的未消耗數據。

kafka也複製日誌到多臺服務器上,為了容錯。複製實現是kafka一個非常重要的架構特性。和其他消息系統相比,複製不是一種需要複雜配置的異乎尋常的插件,只能在非常特殊的情況下使用。 相反,kafka的架構複製被假定為默認值:我們將未複製的數據視為複製因子恰好為1的特殊情況。

生產者在發佈包含記錄偏移量的消息時會收到確認。發送到同一個分區的第一條記錄分配的offset為0,第二條是1,以此類推。消費者通過offset指定的位置消費數據,並且消費者通過週期性的提交topic(名為__consumer_offsets)從而保存代表消息位置的offset到日誌中,達到持久化的目標。保存這個offset的目的是為了消費者崩潰後,其他消費者能從保存的位置繼續消費消息。

kafka簡單介紹到此為止,系統這一切都有意義。

This Benchmark

對於此次基準測試,我喜歡遵循我稱之為“懶惰基準測試(lazy benchmarking)”的風格。當您使用系統時,您通常擁有將其調整到任何特定用例的完美的專有技術。這導致了一種基準測試,您可以將配置大幅調整到基準測試,或者更糟糕的是針對您測試的每個場景進行不同的調整。我認為系統的真正測試不是它在完美調整時的表現,而是它如何“現成”執行。對於在具有數十個或數百個用例的多租戶設置中運行的系統尤其如此,其中針對每個用例的調優不僅不切實際而且不可能。因此,我幾乎堅持使用服務器和客戶端的默認設置。我將指出我懷疑通過一點調整可以改善結果的區域,但我試圖抵制任何擺弄自己以改善結果的誘惑。

配置和壓測命令文末會貼出來,所以如果你感興趣的話,在你們的服務器上也能重現本文的壓測結果。

The Setup

本次測試,總計6臺服務器,配置如下:

  • Intel Xeon 2.5 GHz processor with six cores
  • Six 7200 RPM SATA drives
  • 32GB of RAM
  • 1Gb Ethernet

kafka集群安裝在其中的3臺服務器上,6塊硬盤直接掛載,沒有RAID。另外三臺服務器用於Zookeeper和壓力測試。

3臺服務器的集群不是很大,但是因為我們只測試複製因子為3,所以三臺服務器集群足夠。顯而易見的是,我們能通過增加更多的分區,傳播數據到更多的服務器上來水平擴展我們的集群。

這些硬件不是LinkedIn平常使用的kafka硬件。我們的kafka服務器有針對性的調優,能更好的運行的運行kafka。這次測試,我從Hadoop集群中借用了這幾臺服務器,這些服務器都是我們持久化系統中最便宜的設備。 Hadoop的使用模式與Kafka非常相似,所以這是一件合理的事情。

Producer Throughput

接下來的測試是壓測生產者的吞吐量,測試過程中沒有消費者運行,因此所有消息被持久化(稍後會測試生產者和消費者都存在的場景),但是沒有被讀取。

Single producer thread, no replication

  • 821,557 records/sec
  • 78.3 MB/sec

這第一個測試基於的topic:6個分區,沒有副本。然後單線程儘可能快的產生5千萬個小記錄(100byte)。在這些測試中關注小記錄的原因是它對於消息系統來說是更難的情況。如果消息很大,很容易以MB/秒獲得良好的吞吐量,但是當消息很小時反而很難獲得良好的吞吐量,因為處理每個消息的開銷占主導地位。

一個直接的觀察是,這裡的壓測數據遠高於人們的預期,特別是對於持久存儲系統。 如果您習慣於隨機訪問數據系統(如數據庫或鍵值存儲),通常會產生大約5,000到50,000次查詢的最大吞吐量,這接近於良好的RPC層可以執行的速度遠程請求。 由於兩個關鍵設計原則,我們超過了這一點:

  1. 我們努力確保我們進行線性磁盤I/O。這些服務器提供的六塊廉價磁盤的線性總吞吐量為822 MB /秒。許多消息系統將持久性視為昂貴的附加組件,認為其會降低性能並且應該謹慎使用,但這是因為它們沒有進行線性I/O.
  2. 在每個階段,我們都致力於將少量數據批量合併到更大的網絡和磁盤I/O操作中。 例如,在新生產者中,我們使用“group commit”類似的機制來確保在另一個I/O正在進行中時發起的任何記錄被組合在一起。 有關了解批處理重要性的更多信息,請參閱David Patterson寫的"Latency Lags Bandwidth"。

Single producer thread, 3x async replication

  • 786,980 records/sec
  • 75.1 MB/sec

這次測試和前一次的測試幾乎一樣,除了每個分區有三個副本(因此寫到網絡或者磁盤的數據是前一次的三倍)。每個服務器都從生產者那裡為它作為leader分區執行寫操作,以及為其作為follower分區獲取和寫入數據。

本次測試的複製是異步的,即acks=0。消息只要寫到本地日誌即可,不需要等待這個分區的其他副本收到消息。這就意味著,如果leader崩潰,可能會丟失最新的一些還未同步到副本的消息。

我希望人們能從中得到的關鍵是複製可以更快。對應3x複製,集群總寫入能力有3倍的退化,因為每個寫操作要做3次。但是每個客戶端的吞吐量依然表現不錯。 高性能複製在很大程度上取決於我們的消費者的效率,後面會在消費者部分討論。

Single producer thread, 3x sync replication

  • 421,823 records/sec
  • 40.2 MB/sec

此次測試和前面的測試一樣,除了leader需要等待所有in-sync replicas確認收到消息才會返回結果給生產者。即acks=all或者acks=-1。這種模式下,只要有一個in-sync replica存在,消息就不會丟失。

Kafka中的同步複製與異步複製沒有根本的不同。分區leader總是跟蹤follower副本進度,監控它們是否存在。在所有in-sync replicas確認收到消息之前,我們永遠不會向消費者發出消息。使用同步複製,我們要等待響應給生產者的請求,直到follower副本都已經複製。

這種額外的延遲似乎會影響我們的吞吐量。由於服務器上的代碼路徑非常相似,我們可以通過調整批處理來更好地改善這種影響,並允許客戶端緩衝更多未完成的請求。 但是,本著避免特殊情況調整的原則,我沒有這麼做。

Three producers, 3x async replication

  • 2,024,032 records/sec
  • 193.0 MB/sec

我們的單一生產者處理顯然不能壓出三節點集群的能力上限。為了增加負載,重複前面的異步複製模式測試流程,但是在三臺不同服務器上運行三個不同的生產者(在同一臺機器上運行更多進程將無助於我們使NIC飽和)。然後,我們可以查看這三個生產者的總吞吐量,以更好地瞭解群集的總容量。

Producer Throughput VS. Stored Data

許多消息系統一個隱藏的危險是,只有在他們保存的數據在內存中才會工作的很好。當數據備份不能被消費時(數據就需要存儲到磁盤上),吞吐量會下降幾個等級,甚至更多。這就意味著只有在消費者速度能跟上生產者,並且隊列是空的情況下系統才會運行良好。一旦消費者落後,沒有消費的消息需要備份,備份可能會使數據持久化到磁盤上,這就會引起性能大幅下降。這意味著消息傳遞系統無法跟上傳入的數據。這種情況非常嚴重,消息系統在大部分情況下,應該能做到平和的處理隊列中的消息。

kafka總是採用追加的方式持久化消息,並且對於沒有消費的數據,持久化的的時間複雜度是 O(1)。

這次實驗測試,讓我們在一段延長的時間內運行吞吐量測試,並在存儲的數據集增長時繪製結果圖:

kafka壓測:3臺廉價服務器支撐200萬TPS

03-throughput_vs_size_0.png

如圖所示,性能並沒有明顯的變化。但是由於數據大小所以沒有影響:我們在寫入TB數據之後也表現得同樣好,就像前幾百MB一樣。

圖中的性能波動主要是因為Linux系統I/O管理批量處理數據,週期性的把數據flush到磁盤。LinkedIn的kafka生產環境上針對這個有一些調優。可以參考kafka Hardware and OS。

Consumer Throughput

OK,現在讓我們把注意力轉移到消費者吞吐量上來。

請注意,複製因子不會影響此測試的結果。因為不管複製因子如何,消費者只能從一個副本讀取。 同樣,生產者的確認級別(acks參數)也無關緊要,因為消費者只讀取完全確認的消息(所有In-Sync Replicas都已經同步的消息才能被消費)。 這是為了確保消費者看到的任何消息在leader切換後始終存在(如果當前leader發生異常需要重新選舉新的leader的話)。

Single Consumer

  • 940,521 records/sec
  • 89.7 MB/sec

第一次測試:將在有6個分區,3個副本的topic中單線程消費5千萬條消息。

kafka消費者效率很高,它直接從linux文件系統中抓取日誌塊。它通過sendfile這個API,直接通過操作系統傳輸數據,所以沒有通過應用程序複製此數據的開銷。

本次測試實際上從日誌初始位置開始,因此它在做真正的讀I/O。但是在生產環境中,消費者幾乎完全從OS頁面緩存中讀取,因為它正在讀取剛剛由某個生產者產生的數據(這些數據仍然在緩存中)。事實上,如果您在生產服務器上運行相關命令查看I/O stat,會看到消耗大量數據被消費,也根本沒有物理讀取。

讓消費者儘可能cheap,是我們希望kafka做的一件非常重要的事情。首先,副本也是消費者。所以,讓消費者cheap,副本也會cheap。其次,這樣會是處理數據不是非常昂貴的操作。因此出於可伸縮性的原因,我們不需要嚴格控制。

cheap字面含義是便宜,但是在這裡的含義,我覺得是業務邏輯不要太複雜。

Three Consumers

  • 2,615,968 records/sec
  • 249.5 MB/sec

重複上面相同的測試,不同的是有三個消費者並行處理。三個消費者分佈在三臺不同服務器上。這三個消費者屬於同一個消費者組中的成員,即它們消費同樣的topic。

和我們預期一樣,我們看到消費能力線性擴展,幾乎就是單個消費者吞吐量的3倍,這一點都不令人驚訝。

Producer and Consumer

  • 795,064 records/sec
  • 75.8 MB/sec

上面的測試僅限於生產者和消費者運行在不同服務器。現在,讓我們把生產者和消費者運行在同一臺服務器上。實際上,我們也是這樣做的,因為這樣的話,複製工作就是讓服務器本身充當消費者。

對於此次測試,我們將基於6個分區,3個副本的topic,分別運行1個生產者和1個消費者,並且topic初始為空。 生產者再次使用異步複製。 報告的吞吐量是消費者吞吐量(顯然,是生產者吞吐量的上限)。

和我們預期一樣,得到的結果和只有生產者時基本相同,前提是消費者相當cheap。

Effect of Message Size

前面的測試已經展示了100字節大小消息kafka的性能。對於消費系統來說,更小的消息是更大的問題。因為它們放大了系統記賬的開銷。 我們可以通過在記錄/秒和MB/秒兩者中繪製吞吐量來顯示這一點:

kafka壓測:3臺廉價服務器支撐200萬TPS

04-size_vs_record_throughput.png

這張圖和我們預期一樣,隨著消息體越來越大,每秒我們能發送的消息數量也會減少。但是,如果我們看MB/秒性能報告,我們會看到實際用戶數據的總字節吞吐量隨著消息變大而增加:

kafka壓測:3臺廉價服務器支撐200萬TPS

05-size_vs_mb_throughput.png

總結:消息體越大,每秒能處理的消息數量越少,但是每秒能處理的消息體積越大;消息體越小,每秒能處理的消息數量越多,但是每秒能處理的消息體積越小;

另外我們可以看到,對於10字節的消息,我們實際上只是通過獲取鎖並將消息排入發送來限制CPU - 我們無法實際最大化網絡。 但是,從100字節開始,我們實際上看到網絡飽和。

End-to-end Latency

  • 2 ms (median)
  • 3 ms (99th percentile)
  • 14 ms (99.9th percentile)

到現在為止,我們討論的都是吞吐量。但是消息傳遞的延遲情況呢?也就是說,消息傳遞到消費者,需要多長的時間。此次測試,我們將創建生產者和消費者,並重復計算生產者將消息發送到kafka集群然後由我們的消費者接收所需的時間。

請注意,Kafka僅在所有in-sync replicas確認消息後才向消費者發出消息。因此,無論我們使用同步還是異步複製,此測試都會給出相同的結果,因為該設置僅影響對生產者的確認,而本次測試是生產者發送的消息傳遞到消費者的時間。

Replicating this test

如果你想要在你自己的服務器上,運行這些壓力測試,當然沒有問題。正如我所說的,我大部分情況下只是使用我們預裝的性能測試工具,這些工具隨Kafka發佈包一起提供,並且服務器和客戶端大部分都是默認配置。

attachment

下面給出本次壓測一些命令,以及kafka服務器配置。

benchmark commands

 ###############################################################
壓測腳本(zk集群地址後的/afei是配置的chroot):
--zookeeper:10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei
--broker: 10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092
################################################################
創建需要的TOPIC:
bin/kafka-topics.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --create --topic TPC-P6-R1 --partitions 6 --replication-factor 1
bin/kafka-topics.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --create --topic TPC-P6-R3 --partitions 6 --replication-factor 3
1個生產者-單線程&無副本:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R1 --num-records 50000000 --record-size 128 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
執行腳本說明:
--num-records表示發送消息的數量,即5kw條;
--record-size表示每條消息的大小,即128字節;
--throughput表示吞吐量限制,-1沒有限制;

--producer-props後面的都是生產者配置
1個生產者-單線程&3個副本異步寫入:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
1個生產者-單線程&3個副本同步寫入:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=-1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
3個生產者-單線程&3個副本異步寫入:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
- 發送50億條100個字節大小的消息
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 5000000000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
消費尺寸的影響--分別嘗試各種不同字節大小消息
for i in 10 100 1000 10000 100000;
do
echo ""
echo $i
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records $((1000*1024*1024/$i)) --record-size $i --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
done;
單個消費者消息能力:
bin/kafka-consumer-perf-test.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --messages 50000000 --topic TPC-P6-R3 --threads 1
3個消費者消費能力--在3臺服務器上運行3個消費者:
bin/kafka-consumer-perf-test.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --messages 50000000 --topic TPC-P6-R3 --threads 1
生產者&消費者:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic TPC-P6-R3 --num-records 50000000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092 buffer.memory=67108864 batch.size=8196
bin/kafka-consumer-perf-test.sh --zookeeper 10.0.1.1:2181,10.0.1.2:2181,10.0.1.2:2181/afei --messages 50000000 --topic TPC-P6-R3 --threads 1

server config

broker.id=0
port=9092
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs
num.partitions=8

log.retention.hours=168
log.segment.bytes=536870912
log.cleanup.interval.mins=1
zookeeper.connect=10.0.0.1:2181
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
replica.lag.max.messages=10000000

英文原文地址:https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines


分享到:


相關文章: