Spark性能調優知識點分享

Spark性能調優知識點分享

1、大數據性能調優的本質

編程的時候發現一個驚人的規律,軟件是不存在的!所有編程高手級別的人無論做什麼類型的編程,最終思考的都是硬件方面的問題!最終思考都是在一秒、一毫秒、甚至一納秒到底是如何運行的,並且基於此進行算法實現和性能調優,最後都是回到了硬件!

在大數據性能的調優,它的本質是硬件的調優!即基於 CPU(計算)、Memory(存儲)、IO-Disk/ Network(數據交互) 基礎上構建算法和性能調優!我們在計算的時候,數據肯定是存儲在內存中的。磁盤IO怎麼去處理和網絡IO怎麼去優化。

2、Spark性能調優要點分析

在大數據性能本質的思路上,我們應該需要在那些方面進行調優呢?比如:

  • 並行度

  • 壓縮

  • 序例化

  • 數據傾斜

  • JVM調優 (例如 JVM 數據結構化優化)

  • 內存調優

  • Task性能調優 (例如包含 Mapper 和 Reducer 兩種類型的 Task)

  • Shuffle 網絡調優 (例如小文件合併)

  • RDD 算子調優 (例如 RDD 複用、自定義 RDD)

  • 數據本地性

  • 容錯調優

  • 參數調優

大數據最怕的就是數據本地性(內存中)和數據傾斜或者叫數據分佈不均衡、數據轉輸,這個是所有分佈式系統的問題!數據傾斜其實是跟你的業務緊密相關的。所以調優 Spark 的重點一定是在數據本地性和數據傾斜入手。

  • 資源分配和使用:你能夠申請多少資源以及如何最優化的使用計算資源

  • 關發調優:如何基於 Spark 框架內核原理和運行機制最優化的實現代碼功能

  • Shuffle調優:分佈式系統必然面臨的殺手級別的問題

  • 數據傾斜:分佈式系統業務本身有數據傾斜

基於以上調優的本質和思路,我們根據經驗總結了以下一些Spark性能調優的知識點。

3、Spark性能調優經驗知識點

1)如果 Spark 中 CPU 的使用率不夠高,可以考慮為當前的程序分配更多的 Executor, 或者增加更多的 Worker 實例來充分的使用多核的潛能。

2.)適當設置 Partition 分片數是非常重要的,過少的 Partition 分片數可能會因為每個 Partition 數據量太大而導致 OOM 以及頻繁的 GC,而過多的 Parition 分片數據可能會因為每個 Partition 數據量太小而導致執行效率低下。

3)提升 Spark 硬件尤其是 CPU 使用率的一個方式 就是增加 Executor 的並行度,但是如果 Executor 過多的話,直接分配 在每個 Executor的內存就大大減少,在內存的操作就減少,基於磁盤的操作就越來越多,導致性能越來越差。

4)處理 Spark Job 的時候如果發現比較容易內存溢出,一個比較有效的辦法是減少並行的 Executor 的數量,這樣每個 Executor 就可以分配到更多的內存,進而增加每個 Task 使用的內存數量,降低 OOM 的風險。

5)處理Spark Job 的時候如果發現比較容易內存溢出,一個比較有效的辦法就是增加 Task 的並行度,這樣每個 Task 處理的 Partition 的數量就變少了,減少了 OOM的可能性。

6)處理Spark Job 的時候如果發現某些 Task 運行得特別慢,一個處理辦法是增加並行的 Executor 的個數,這樣每個 Executor 分配 的計算資源就變少了,可以提升硬件的整體使用效率。另一個辦法是增加 Task 的並行度,減少每個 Partition 的數據量來提高執行效率。

7)處理Spark Job 的時候如果出現特別多的小文件,這時候就可以通過 coalesce 來減少 Partition 的數量,進而減少並行運算的 Task 的數量來減少過多任務的開闢,從而提升硬件的使用效率。

8)默認情況下 Spark 的 Executor 會盡可能佔用當前機器上儘量多的 Core,這樣帶來一個好處就是可以最大化的提高計算的並行度,減少一個 Job 中任務 運行的批次,但帶來一個風險就是如果每個 Task 佔用內存比較大,就需要頻繁的 spill over 或者有更多的 OOM 的風險。

9)Spark 集群在默認情況每臺 host 上只有一個 Worker, 而每個 Worker 默認只會為當前應用程序分配一個 Executor來執行 Task,但實際上通過配置 Spark-env.sh 可以讓每臺 host 上有若干的 Worker, 而每個 Worker 下面又可以有若干個 Executor。

10)Spark Stage 內部是一組計算邏輯完全相同但處理數據不同的分佈式並行運行的 Task 構成, Stage 內部的計算都以 Pipeline 的方式進行,不同的 Stage之間是產生 Shuffler 的唯一方式。

11)在Spark 中可以考慮在 Worker 節點上使用固態硬盤以及把 Worker 的 Shuffle 結構保存到 RAMDisk 的方式來極大的提高性能。

12)當經常發現機器頻繁的 OOM 的時候,可以考慮的一種方式就是減少並行度,這樣同樣的內存空間並行運算的任務 少了,那麼對內存的佔用就更少了,也就減少了 OOM 的可能性。

4、Spark 性能優化核心基石

Spark 採用的是 Master-Slaves 的模式進行資源管理和任務執行的管理:

  • 資源管理: Master-Workers, 在一臺機器上可以有多個 Workers;

  • 任務執行: Driver-Executors,當在一臺機器上分配多個 Workers 的時候那麼默認情況下每個 Worker 都會為當前運行的應用程序分配一個 Executor,但是我們可以修改配置來讓每個 Worker 為我們當前的應用 程序分配若干個 Executors; 程序運行的時候會被劃分成為若干個 Stages(Stages內部沒有 Shuffle,遇到 Shuffle 的時候會劃分 Stage),每個 Stage裡面包含若干個處理邏輯完全一樣只是處理數據不一樣的 Task, 這些 Task 會被分配到 Executor 上去並行執行。

5、Spark性能優化招式

1)Broadcast 優化

如果 Task 在運行的過程中使用超過 20KB 大小的靜態大對象,這個時候一般都要考慮使用 Broadcast。例如一個大表 Join 一個小表,此時如果使用 Broadcast 把小表廣播出去,這時候大表就只需在自己的節點等待小表數據的到來。

2)Task 性能優化

  • 慢任務的性能優化:可以考慮減少每個 Partition 處理的數據量。同時建議開啟 spark.speculation。

  • 儘量減少 Shuffle, 例如我們要儘量減少 groupByKey 操作,因為 groupByKey 會要求通過網絡拷貝(shuffle) 所有的數據,優先考慮使用 reduceByKey。因為 reduceByKey 會首先 reduce locally,然後再拷貝。

3)數據傾斜

  • 定義更加合理的 Key(或者說自定義 Partitioner)

  • 可以考慮使用 ByteBuffer 來存儲 Block

4)網絡

  • 可以考慮 Shuffle 的數據放在 alluxio (前身 Tackyon) 中帶來更好的數據本地性,減少網絡的 Shuffler

  • 優先採用 Netty (Spark 2.X 的默認方式)的方式進行網絡通信

  • mapPartitions 中的函數在一個 Partition 裡作用一次

5)數據結構

  • Java的對象。對象頭是16個字節(例如指向對象的指針等元數據),如果對象中只有一個 int 的 property,則此時會佔據 20 個字節,也就是說對象的元數據佔用了大部分的空間,所有在封裝數據的時候儘量不要使用對象。例如說使用 Json 格式來狀封裝數據

  • Java 的 String 在實際佔用內存方面要額外使用 40 個字節(內部使用 char 數組來保存字符),另外需要注意的是 String 中每個字符是2個字節(UTF-16),如果內部有5個字符的話,實際上會佔用50個字節。

  • Java中的集合List、Map 等等,其內部一般使用鏈表來實現。具體的每個數據使用 Entry 等,這些也非常消耗內存

  • Java 中的基本數據類型會自動封箱操作,這會額外增加對象頭的空間佔用。

  • 優先使用原生數據,儘可能不要直接使用 ArrayList、HashMap、LinkedList 等數據結構

  • 優先使用 String (推薦使用 JSON),而不是採用 HashMap、List 等來封裝數據

6)內存消耗診斷

  • JVM 自帶的診斷工具。例如: JMap、JConsole等

  • 在開發、測試、生產環境下用的最多的是日誌。 Driver 端產生的日誌,最簡單也是最有效的方式就是調用 RDD.cache,當進行 cache 操作的時候, Driver 上的 BlockManangerMaster 會記錄該信息並寫進日誌中。

7)persist 和 checkpoint

  • 當反覆使用某個(些)RDD的時候,建議使用 persist 來對數據進行緩存

  • 如果某個步驟的 RDD 計算特別耗時或者經歷了很多步的計算,數據丟失的話則重新計算的代價比較大,此時考慮使用 checkpoint,因為 checkpoint 是把數據寫入 HDFS 的,天然具有高可靠性

8)序列化和反序列化

發送磁盤IO 和網絡通信的時候會序列化和反序列化,更為重要的考慮序列化和反序列化的時候有另外兩種情況:Persist 的時候和編程的時候,使用算子的函數操作如果傳入了外部數據就必須序列化和反序列化。

  • Spark 的序列化機制默認使用 Java 自帶的序列化機制(其實現類是 ObjectInputStream 和 ObjectOutputStream)。效率較低,強烈建議使用 Kryo 序列化機制 ,它比 Java 的序列化節省近 10 倍的空間。

  • Spark 中如果我們自定義了 RDD 中的數據元素的類型,則必須實現 Serializable 接口,也可以實現自己的序列化接口(Externalizable)來實現更高效的 Java序列化算法。如果使用 Kryo,則需要把自定義的類註冊給 Kryo。

  • Spark 中 Scala 常用的類型自動的能過 AllScalaRegistry 註冊給了 Kryo 進行序列化管理。

  • Kryo 在序列化的時候緩存空間默認大小是 2MB,可以根據具體的業務模型調整該大小,通過設置 spark.kryoserializer.buffer

  • 在使用 Kryo 的時候,強烈建議註冊時寫完整的包名和類名。

9)數據本地性

  • 如果數據是 PROCESS_LOCAL, 但是此時並沒有空閒的 Core 來運行我們的 Task,此時 Task 就要等待。例如等待3000ms, 3000ms 內如果 Task 不能運行,則退而求其次採用 NODE_LOCAL。同樣的道理 NODE_LOCAL也會有等待時間。

  • 如何配置 Locality呢? 可以統一採用 spark.locality.wait 來設置。也可以分別設置如: spark.locality.wait.node、spark.locality.wait.process 。

10)RDD 的自定義(以 Spark on HBase 為例)

  • 第一步是定義 RDD.getParitions 的實現

    createRelation 具體確定 HBase 的鏈接方式和具體訪問的表

    然後通過 HBase 的API 來獲取 Region 的 List

    可以過濾出有效的數據

    最後返回 Region 的 Array[Partition],也就是說一個 Partition處理一個 Region 的數據,為更佳的數據本地性打下基礎

  • 第二步是 RDD.getPreferredLocations

    根據 Split 包含的 Region 信息來確定 Region 具體在什麼節點上。這樣 Task 在調度的時候就可以優先被分配到 Region 所在的機器上,最大化地提高數據本地性

  • 第三步是 RDD.compute

    根據 Split 中的 Region 等信息調用 HBase 的 API 來進行操作(主要是查詢)

11)Shuffle 性能調優

  • 問題: Shuffle output file lost? 真正的原因一般由 GC 導致的。GC 尤其是Full GC 時通常會導致線程停止工作,這個時候下一個 Stage 的 Task 在默認情況下就會嘗試重試來獲取數據,一般重試3 次,每次重試時間間隔為5S,也就是說默認情況下 15S 內如果還是無法抓到數據的話,就會出現 Shuffle output file lost 等 情況 ,進而導致 Task重試,甚至會導致 Stage 重試,最嚴重的是會導致 App 失敗。在這個時候首先就要採用高效的內存數據結構和序列化機制,JVM 的調優來減少 Full GC 的產生。

  • 在 Shuffle 的時候, Reducer 端獲取數據會有一個指定大小的緩存空間,如果內存不夠,可以適當的增大該緩存空間(通過調整 spark.reducer.maxSizeInFlight),否則會 Spill 到磁盤上,影響效率。

  • 在 Shuffle MapTask 端通常也會增大Map 任務的寫磁盤的緩存。默認值是32K。

  • 調整獲取 Shuffle 數據的重試次數,默認是3次,通常建議增大重試次數。

  • 調整獲取 Shuffle 數據的重試時間間隔,默認是5秒。強烈建議提高該時間。

  • 覺得以上兩點可以看出,默認情況下會有 15 秒的時間,如果GC需要這麼長的時間的話,應該是GC的問題,首先應該是優化GC。

  • 在 Reducer 端做 Aggregation 的時候,默認是 20% 的內存用來做 Aggegation。如果超出了這個大小就會溢出到磁盤上,建議調在百分比來提高性能。

12)鎢絲計劃

  • Tungsten 的內存管理 機制獨立於 JVM, 所以Spark 操作數據的進候具體操作的是 Binary Data,而不是 JVM Object。而且還免去了序列化和反序列化的過程。

  • 內存管理方面: Spark 使用了 sum.misc.Unsafe 來進行 Off-heap 級別的內存分配、指針使用及內存釋放。Spark 為了統一管理 Off-heap 和 On-heap 而提出了 Page

  • 如果想讓程序使用 Tungsten 功能,可以配置 spark.shuffle.manager=tungsten-sort

  • DataFrame 中自動開啟了 Tungsen 功能

  • 寫數據在內存足夠大的情況下是寫到 Page 裡面,在 Page 中有一條條的 Record,如果內存不夠的話會 Spill 到磁盤上。

13)如何看內存是否足夠?兩方面:

  • 系統默認情況下給 ShuffleMapTask 最大準備了多少內存空間。默認情況下是 ExecutorHeapMemory * 0.8 * 0.2(spark.shuffle.memoryFraction=0.2, spark.shuffle.safetyFraction=0.8)。

  • 和 Task 處理的 Partition 大小緊密相關。


分享到:


相關文章: