「大數據」(八十)Spark之Shuffle機制

【導讀:數據是二十一世紀的石油,蘊含巨大價值,這是·情報通·大數據技術系列第[80]篇文章,歡迎閱讀和收藏】

1 基本概念

shuffle : Shuffle 是 MapReduce 框架中的一個特定的 phase ,介於 Map phase 和 Reduce phase 之間,當 Map 的輸出結果要被 Reduce 使用時,輸出結果需要按 key 哈希,並且分發到每一個 Reducer 上去,這個過程就是 shuffle 。

由於 shuffle 涉及到了磁盤的讀寫和網絡的傳輸,因此 shuffle 性能的高低直接影響到了整個程序的運行效率。下面這幅圖清晰地描述了 MapReduce 算法的整個流程,其中 shuffle phase 是介於 Map phase 和 Reduce phase 之間作為大數據離線計算平臺中最重要的模塊,其性能直接影響作業的運行時效和平臺資源消耗。 主要工作是從 Map 結束到 Reduce 開始之間的過程。 shuffle 階段又可以分為 Map 端的 shuffle 和 Reduce 端的 shuffle 。

「大數據」(八十)Spark之Shuffle機制

2 術語解釋

2.1 Map 端的 Shuffler

每個 map task 都有一個內存緩衝區(如上圖中的 buffer in memory 默認為 100MB ),存儲著 map 的輸出數據 ( 格式為 Key-Value 鍵值對 ) ,當緩衝區快滿的時候,需要將緩衝區中的數據以一個臨時文件的方式存放到磁盤上 , 溢寫是單線程來完成的,他不影響往該緩存中寫 map 結果的線程。溢寫線程啟動時不應該阻止 map 的結果輸出,所以整個緩衝區有個溢寫的比例 spill.percent, 這個比例默認是 0.8 ,也就是當緩衝區的數據已經達到閾值( buffer size * spill percent = 100MB * 0.8 = 80MB ),溢寫線程啟動,鎖定這 80MB 的內存,執行溢寫過程。 Map task 的輸出結果還可以往剩下的 20MB 內存中寫,互不影響。這裡需要注意在寫入磁盤之前,需要進行 partition,sort , combine 操作。隨著數據不斷地輸出,溢寫的文件越來越多,還要在磁盤上進行合併操作 , 最後合成一個溢出文件。

2.1.1 partition

每個輸出數據要經過 partition 這個程序得到一個分區號。默認情況下, partition 根據每一對 Key-Value 鍵值對的鍵的哈希值對 reduce 的個數取模得到一個分區號( hashcode%reduce 個數)。 partition 的目的是把數據分配到一個 reduce task 中去。

2.1.2 sort

sort 程序的目的是排序,排序的目標是 map 輸出的數據。默認情況下,按照鍵值對的 key 的 ASCII 碼值進行排序(也就是字典排序)。

2.1.3 combine

combine 函數是將具有相同 Key 的多個 Key-Value 鍵值對合併成一個鍵值對,這裡需要注意,若在 map 端使用 combine 操作,則首先需要在 map 中也調用 group 程序(在後面將會被介紹),因為在 combine 中需要用到相同的鍵。這樣做的目的是為了減少網絡傳輸。

2.2 Reduce 端的 Shuffler

2.2.1 fetch

Reduce 端按照各個 Map 端的 partition 程序得出的分區號進行抓取,抓取的數據同樣存在於內存中。

2.2.2 sort

將 Reduce 抓到的數據再次進行排序,排序調用的還是 Map 端使用的 Sort 程序。

2.2.3 group

接下來進行的是分組,默認的分組模式是根據每個數據( Key-Value )的 Key 是否相同進行分組的。鍵相等就在同一個組中。每一組數據傳給 Reduce 程序中(每組數據的特點都是相同的 Key 鍵)。

2.3 MapReduce 的 split 大小

分片大小要趨向與一個 HDFS 的一個數據塊的大小,默認是 64MB ,這裡我們需要敘述一下為何要趨向與一個數據塊的大小:

hadoop 在存儲輸入數據的節點上運行 map 任務,可以獲得最佳性能。這就是所謂的 “ 數據本地化優化 ” ,因為它無需使用寶貴的集群帶寬資源。分片趨向於數據塊大小的目的也是為了節省集群帶寬資源, hdfs 數據塊的大小就等於存儲在單個節點上的最大傳輸輸入塊的大小。若分片大於 HDFS 數據塊(跨越兩個數據塊),對於任何一個節點,基本上都不可能同時存儲這兩個數據塊,因此分片中的部分數據需要通過網絡傳輸到 map 任務節點,這樣就要消耗集群帶寬資源。

split 大小是根據以下算法確定的:

- max.split(100M)
- min.split(10M)
- block(64M)
- max(min.split,min(max.split,block))( 任何一個 split 都不能大於 block )。


分享到:


相關文章: