1 算子調優一:mapPartitions
普通的map算子對RDD中的每一個元素進行操作,而mapPartitions算子對RDD中每一個分區進行操作。如果是普通的map算子,假設一個partition有1萬條數據,那麼map算子中的function要執行1萬次,也就是對每個元素進行操作。
圖2-3 map算子
如果是mapPartition算子,由於一個task處理一個RDD的partition,那麼一個task只會執行一次function,function一次接收所有的partition數據,效率比較高。
圖2-4 mapPartitions算子
比如,當要把RDD中的所有數據通過JDBC寫入數據,如果使用map算子,那麼需要對RDD中的每一個元素都創建一個數據庫連接,這樣對資源的消耗很大,如果使用mapPartitions算子,那麼針對一個分區的數據,只需要建立一個數據庫連接。
mapPartitions算子也存在一些缺點:對於普通的map操作,一次處理一條數據,如果在處理了2000條數據後內存不足,那麼可以將已經處理完的2000條數據從內存中垃圾回收掉;但是如果使用mapPartitions算子,但數據量非常大時,function一次處理一個分區的數據,如果一旦內存不足,此時無法回收內存,就可能會OOM,即內存溢出。
因此,mapPartitions算子適用於數據量不是特別大的時候,此時使用mapPartitions算子對性能的提升效果還是不錯的。(當數據量很大的時候,一旦使用mapPartitions算子,就會直接OOM)
在項目中,應該首先估算一下RDD的數據量、每個partition的數據量,以及分配給每個Executor的內存資源,如果資源允許,可以考慮使用mapPartitions算子代替map。
2 算子調優二:foreachPartition優化數據庫操作
在生產環境中,通常使用foreachPartition算子來完成數據庫的寫入,通過foreachPartition算子的特性,可以優化寫數據庫的性能。
如果使用foreach算子完成數據庫的操作,由於foreach算子是遍歷RDD的每條數據,因此,每條數據都會建立一個數據庫連接,這是對資源的極大浪費,因此,對於寫數據庫操作,我們應當使用foreachPartition算子。
與mapPartitions算子非常相似,foreachPartition是將RDD的每個分區作為遍歷對象,一次處理一個分區的數據,也就是說,如果涉及數據庫的相關操作,一個分區的數據只需要創建一次數據庫連接,如圖2-5所示:
圖2-5 foreachPartition算子
使用了foreachPartition算子後,可以獲得以下的性能提升:
- 對於我們寫的function函數,一次處理一整個分區的數據;
- 對於一個分區內的數據,創建唯一的數據庫連接;
- 只需要向數據庫發送一次SQL語句和多組參數;
在生產環境中,全部都會使用foreachPartition算子完成數據庫操作。foreachPartition算子存在一個問題,與mapPartitions算子類似,如果一個分區的數據量特別大,可能會造成OOM,即內存溢出。
3 算子調優三:filter與coalesce的配合使用
在Spark任務中我們經常會使用filter算子完成RDD中數據的過濾,在任務初始階段,從各個分區中加載到的數據量是相近的,但是一旦進過filter過濾後,每個分區的數據量有可能會存在較大差異,如圖2-6所示:
圖2-6 分區數據過濾結果
根據圖2-6我們可以發現兩個問題:
- 每個partition的數據量變小了,如果還按照之前與partition相等的task個數去處理當前數據,有點浪費task的計算資源;
- 每個partition的數據量不一樣,會導致後面的每個task處理每個partition數據的時候,每個task要處理的數據量不同,這很有可能導致數據傾斜問題。
如圖2-6所示,第二個分區的數據過濾後只剩100條,而第三個分區的數據過濾後剩下800條,在相同的處理邏輯下,第二個分區對應的task處理的數據量與第三個分區對應的task處理的數據量差距達到了8倍,這也會導致運行速度可能存在數倍的差距,這也就是數據傾斜問題。
針對上述的兩個問題,我們分別進行分析:
- 針對第一個問題,既然分區的數據量變小了,我們希望可以對分區數據進行重新分配,比如將原來4個分區的數據轉化到2個分區中,這樣只需要用後面的兩個task進行處理即可,避免了資源的浪費。
- 針對第二個問題,解決方法和第一個問題的解決方法非常相似,對分區數據重新分配,讓每個partition中的數據量差不多,這就避免了數據傾斜問題。
那麼具體應該如何實現上面的解決思路?我們需要coalesce算子。
repartition與coalesce都可以用來進行重分區,其中repartition只是coalesce接口中shuffle為true的簡易實現,coalesce默認情況下不進行shuffle,但是可以通過參數進行設置。
假設我們希望將原本的分區個數A通過重新分區變為B,那麼有以下幾種情況:
- A > B(多數分區合併為少數分區)
① A與B相差值不大(注:這裡的不大指比如100個分區到80個)
此時使用coalesce即可,無需shuffle過程。
② A與B相差值很大(注:這裡比如100個到2個)
此時可以使用coalesce並且不啟用shuffle過程,但是會導致合併過程性能低下,所以推薦設置coalesce的第二個參數為true,即啟動shuffle過程。
小結:若filter後每個分區間數據量差不多,總量減少,那麼用coalesce即可,但注意不能減到太小,否則會數據傾斜。若filter後分區間數據量差異很大,那麼就要重新shuffle了,用reparation,目的是打散數據重新均勻分佈到各個分區中。coalesce默認狀態下即便傳入的分區數值變大,真實分區數是不會增大的。
- A < B(少數分區分解為多數分區)
此時使用repartition即可,如果使用coalesce需要將shuffle設置為true,否則coalesce無效。
我們可以在filter操作之後,使用coalesce算子針對每個partition的數據量各不相同的情況,壓縮partition的數量,而且讓每個partition的數據量儘量均勻緊湊,以便於後面的task進行計算操作,在某種程度上能夠在一定程度上提升性能。
注意:local模式是進程內模擬集群運行,已經對並行度和分區數量有了一定的內部優化,因此不用去設置並行度和分區數量。
4 算子調優四:repartition 解決 SparkSQL 低並行度問題
在第一節的常規性能調優中我們講解了並行度的調節策略,但是,並行度的設置對於Spark SQL是不生效的,用戶設置的並行度只對於Spark SQL以外的所有Spark的stage生效。
Spark SQL的並行度不允許用戶自己指定,Spark SQL自己會默認根據hive表對應的HDFS文件的split個數自動設置Spark SQL所在的那個stage的並行度,用戶自己通spark.default.parallelism參數指定的並行度,只會在沒Spark SQL的stage中生效。
由於Spark SQL所在stage的並行度無法手動設置,如果數據量較大,並且此stage中後續的transformation操作有著複雜的業務邏輯,而Spark SQL自動設置的task數量很少,這就意味著每個task要處理為數不少的數據量,然後還要執行非常複雜的處理邏輯,這就可能表現為第一個有Spark SQL的stage速度很慢,而後續的沒有Spark SQL的stage運行速度非常快。
為了解決Spark SQL無法設置並行度和task數量的問題,我們可以使用repartition算子。
圖2-7 repartition算子使用前後對比圖
Spark SQL這一步的並行度和task數量肯定是沒有辦法去改變了,但是,對於Spark SQL查詢出來的RDD,立即使用repartition算子,去重新進行分區,這樣可以重新分區為多個partition,從repartition之後的RDD操作,由於不再涉及Spark SQL,因此stage的並行度就會等於你手動設置的值,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量數據並執行復雜的算法邏輯。使用repartition算子的前後對比如圖2-7所示。
5 算子調優五:reduceByKey本地聚合
reduceByKey相較於普通的shuffle操作一個顯著的特點就是會進行map端的本地聚合,map端會先對本地的數據進行combine操作,然後將數據寫入給下個stage的每個task創建的文件中,也就是在map端,對每一個key對應的value,執行reduceByKey算子函數。reduceByKey算子的執行過程如圖2-8所示:
圖2-8 reduceByKey算子執行過程
使用reduceByKey對性能的提升如下:
- 本地聚合後,在map端的數據量變少,減少了磁盤IO,也減少了對磁盤空間的佔用;
- 本地聚合後,下一個stage拉取的數據量變少,減少了網絡傳輸的數據量;
- 本地聚合後,在reduce端進行數據緩存的內存佔用減少;
- 本地聚合後,在reduce端進行聚合的數據量減少。
基於reduceByKey的本地聚合特徵,我們應該考慮使用reduceByKey代替其他的shuffle算子,例如groupByKey。reduceByKey與groupByKey的運行原理如圖2-9和圖2-10所示:
圖2-9 groupByKey原理
圖2-10 reduceByKey原理
根據上圖可知,groupByKey不會進行map端的聚合,而是將所有map端的數據shuffle到reduce端,然後在reduce端進行數據的聚合操作。由於reduceByKey有map端聚合的特性,使得網絡傳輸的數據量減小,因此效率要明顯高於groupByKey。
閱讀更多 kane0409 的文章