數據傾斜的處理

數據傾斜的處理

一、什麼是數據傾斜

分佈式數據處理中一份大的數據會按照key分組分成多個小份的數據放到不同的機器上去並行的處理,這裡的大數據分成多份我們叫做分區

分區最佳的情況就是分出來的數據很均勻,放到各個機器上去,剛好能夠並行的一起跑完。

很差的情況就是分出來不均勻,導致大部分數據跑完了,就留下一兩個還在跑。

這樣導致一下幾個危害:

  1. 最終任務花費的時間很長,失去了分佈式計算的優勢
  2. 不均勻的分配也可能會,讓小的數據一下跑完,資源就空閒了,而大的數據可能會由於資源的不足跑掛掉,最終導致整個任務的失敗。

在spark中的表現:某個stage的有幾個task跑得比大部分task都慢很多,可以在WEB-UI的stage詳情頁觀察到。

首先從Summary Metrics這個塊看到duration(持續時間)的max超出75%百分位很多。

可以看到duration比其他任務要長很多而且還失敗了,這時候我們還觀察到這個失敗的task的shuffle read 比別的task都多,不管從size(佔用空間)還是records(記錄數)上。如下圖


數據傾斜的處理


二、數據傾斜的解決方法

數據傾斜有三種形式得傾斜:

  • 一是分區不均,某幾個分區對應的key太多。多數情況都是這種傾斜。
  • 二是單個key對應的數據量太多
  • 三是單條記錄數據太大(比如數組中的值太多)

2.1 加並行度

這是一種很簡單的處理方案,將分區增多,數據打得更散,充分發揮分佈式的優勢。

但是分區增量task也會增多,帶來的額外的管理成本就更多了,分的太多反而跑得更慢,存儲結果的成本也增加了,不是一個很好的解決方案。

可以在以下幾個地方增加分區。

1.在傾斜的stage之前使用reparation重分區。

2.設置shuffle的並行度,大部分情況都使用這個。

2.2 處理特殊case

這種就比較常見了,經常會發現很多stage跑到剩下一個task死活跑不過或者耗時非常久。傾斜的key我們可以通過groupby key進行count來尋找,一般都是空值、空字符串、還有特別熱點的key。如何處理這就看你的業務需求咯。

2.3 利用小trick打散key

針對第二種傾斜的形式,我們可以在key上加隨機前綴或後綴這樣加鹽的方式來將一個key變成多個key先進行一次shuffle,最後再還原回來。

例如我們需要進行分組統計,但是數據傾斜了,我們可以對key加隨機前綴,把一個key變成多個進行count,最後sum。

def add_random_prefix(key):
return str(randint[0,3])+key
def remove_random_prefix(key):
return key[1:]
df.selectExpr("add_random_prefix(key) key_with_prefix")\
.groupby("key_with_prefix").count()\
.selectExpr("remove_random_prefix(key_with_prefix) key")\
.groupby("key").sum()

這種方式比較麻煩特別是在join的情況下,要考慮的東西比較多。

加鹽的方式也會數據量不是那麼多的key也打的更散了,計算起來有點浪費資源。

2.4 自定義分區方案

這種就更高端了些,需要自己去實現一個partitioner,不多說,還不如構造key來實現自定義分區。

三、總結

我們探討了什麼是數據傾斜,如何判別,如何解決數據傾斜,是不是又收穫了很多呢。

可以關注我其他文章看看spark是如何分區進行計算的


分享到:


相關文章: