MapReduce的shuffle過程詳解(分片、分區、合併、歸併,)

shuffle過程

shuffle概念


MapReduce的shuffle過程詳解(分片、分區、合併、歸併,)

shuffle的本意是洗牌、混洗的意思,把一組有規則的數據儘量打亂成無規則的數據。而在MapReduce中,shuffle更像是洗牌的逆過程,指的是將map端的無規則輸出按指定的規則“打亂”成具有一定規則的數據,以便reduce端接收處理。其在MapReduce中所處的工作階段是map輸出後到reduce接收前,具體可以分為map端和reduce端前後兩個部分。在shuffle之前,也就是在map階段,MapReduce會對要處理的數據進行分片(split)操作,為每一個分片分配一個MapTask任務。接下來map()函數會對每一個分片中的每一行數據進行處理得到鍵值對(key,value),其中key為偏移量,value為一行的內容。此時得到的鍵值對又叫做“中間結果”。此後便進入shuffle階段,由此可以看出shuffle階段的作用是處理“中間結果”。

此處應該想一下,為什麼需要shuffle,它的作用是什麼?

在瞭解shuffle的具體流程之前,應先對以下兩個概念有所瞭解:

block塊(物理劃分)

block是HDFS中的基本存儲單位,hadoop1.x默認大小為64M而hadoop2.x默認塊大小為128M。文件上傳到HDFS,就要劃分數據成塊,這裡的劃分屬於物理的劃分(實現機制也就是設置一個read方法,每次限制最多讀128M的數據後調用write進行寫入到hdfs),塊的大小可通過 dfs.block.size配置。block採用冗餘機制保證數據的安全:默認為3份,可通過dfs.replication配置。注意:當更改塊大小的配置後,新上傳的文件的塊大小為新配置的值,以前上傳的文件的塊大小為以前的配置值。

split分片(邏輯劃分)

Hadoop中split劃分屬於邏輯上的劃分,目的只是為了讓map task更好地獲取數據。split是通過hadoop中的InputFormat接口中的getSplit()方法得到的。那麼,split的大小具體怎麼得到呢?

首先介紹幾個數據量:

totalSize:整個mapreduce job輸入文件的總大小。

numSplits:來自job.getNumMapTasks(),即在job啟動時用戶利用 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設置的值,從方法的名稱上看,是用於設置map的個數。但是,最終map的個數也就是split的個數並不一定取用戶設置的這個值,用戶設置的map個數值只是給最終的map個數一個提示,只是一個影響因素,而不是決定因素。

goalSize:totalSize/numSplits,即期望的split的大小,也就是每個mapper處理多少的數據。但也僅僅是期望。

minSize:split的最小值,該值可由兩個途徑設置:

  • 1.通過子類重寫方法protected void setMinSplitSize(long minSplitSize)進行設置。一般情況為1,特殊情況除外
  • 2.通過配置文件中的mapred.min.split.size進行設置

    最終取兩者中的最大值!

split計算公式finalSplitSize=max(minSize,min(goalSize,blockSize))

shuffle流程概括

因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結果”不會立馬寫入磁盤,而是優先存儲到map節點的“環形內存緩衝區”,在寫入的過程中進行分區(partition),也就是對於每個鍵值對來說,都增加了一個partition屬性值,然後連同鍵值對一起序列化成字節數組寫入到緩衝區(緩衝區採用的就是字節數組,默認大小為100M)。當寫入的數據量達到預先設置的闕值後(mapreduce.map.io.sort.spill.percent,默認0.80,或者80%)便會啟動溢寫出線程將緩衝區中的那部分數據溢出寫(spill)到磁盤的臨時文件中,並在寫入前根據key進行排序(sort)和合並(combine,可選操作)。溢出寫過程按輪詢方式將緩衝區中的內容寫到mapreduce.cluster.local.dir屬性指定的目錄中。當整個map任務完成溢出寫後,會對磁盤中這個map任務產生的所有臨時文件(spill文件)進行歸併(merge)操作生成最終的正式輸出文件,此時的歸併是將所有spill文件中的相同partition合併到一起,並對各個partition中的數據再進行一次排序(sort),生成key和對應的value-list,文件歸併時,如果溢寫文件數量超過參數min.num.spills.for.combine的值(默認為3)時,可以再次進行合併

。至此,map端shuffle過程結束,接下來等待reduce task來拉取數據。對於reduce端的shuffle過程來說,reduce task在執行之前的工作就是不斷地拉取當前job裡每個map task的最終結果,然後對從不同地方拉取過來的數據不斷地做merge最後合併成一個分區相同的大文件,然後對這個文件中的鍵值對按照key進行sort排序,排好序之後緊接著進行分組,分組完成後才將整個文件交給reduce task處理。

糾正:分區好像是發生在溢出寫過程之前,也就是當滿足溢出寫條件時,首先進行分區,然後分區內排序,並且選擇性的combine,最後寫出到磁盤。

下圖是shuffle的官方流程圖:

MapReduce的shuffle過程詳解(分片、分區、合併、歸併,)

結合下面三張圖可以清楚地理解shuffle過程

MapReduce的shuffle過程詳解(分片、分區、合併、歸併,)

MapReduce的shuffle過程詳解(分片、分區、合併、歸併,)

shuffle詳細流程

Map端shuffle

①分區partition

②寫入環形內存緩衝區

③執行溢出寫

排序sort--->合併combiner--->生成溢出寫文件

④歸併merge




① 分區Partition

在將map()函數處理後得到的(key,value)對寫入到緩衝區之前,需要先進行分區操作,這樣就能把map任務處理的結果發送給指定的reducer去執行,從而達到負載均衡,避免數據傾斜。MapReduce提供默認的分區類(HashPartitioner),其核心代碼如下:

MapReduce的shuffle過程詳解(分片、分區、合併、歸併,)

getPartition()方法有三個參數,前兩個指的是mapper任務輸出的鍵值對,而第三個參數指的是設置的reduce任務的數量,默認值為1。因為任何整數與1相除的餘數肯定是0。也就是說默認的getPartition()方法的返回值總是0,也就是Mapper任務的輸出默認總是送給同一個Reducer任務,最終只能輸出到一個文件中。如果想要讓mapper輸出的結果給多個reducer處理,那麼只需要寫一個類,讓其繼承Partitioner類,並重寫getPartition()方法,讓其針對不同情況返回不同數值即可。並在最後通過job設置指定分區類和reducer任務數量即可。

②寫入環形內存緩衝區

因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結果”不會立馬寫入磁盤,而是優先存儲到map節點的“環形內存緩衝區”,並做一些預排序以提高效率,當寫入的數據量達到預先設置的闕值後便會執行一次I/O操作將數據寫入到磁盤。每個map任務都會分配一個環形內存緩衝區,用於存儲map任務輸出的鍵值對(默認大小100MB,mapreduce.task.io.sort.mb調整)以及對應的partition,被緩衝的(key,value)對已經被序列化(為了寫入磁盤)。

③執行溢寫出

一旦緩衝區內容達到閾值(mapreduce.map.io.sort.spill.percent,默認0.80,或者80%),就會會鎖定這80%的內存,並在每個分區中對其中的鍵值對按鍵進行sort排序,具體是將數據按照partition和key兩個關鍵字進行排序,排序結果為緩衝區內的數據按照partition為單位聚集在一起,同一個partition內的數據按照key有序。排序完成後會創建一個溢出寫文件(臨時文件),然後開啟一個後臺線程把這部分數據以一個臨時文件的方式溢出寫(spill)到本地磁盤中(如果客戶端自定義了Combiner(相當於map階段的reduce),則會在分區排序後到溢寫出前自動調用combiner,將相同的key的value相加,這樣的好處就是減少一些到磁盤的數據量。這個過程叫“合併”)。剩餘的20%的內存在此期間可以繼續寫入map輸出的鍵值對。溢出寫過程按輪詢方式將緩衝區中的內容寫到mapreduce.cluster.local.dir屬性指定的目錄中。

合併Combiner

如果指定了Combiner,可能在兩個地方被調用:

1.當為作業設置Combiner類後,緩存溢出線程將緩存存放到磁盤時,就會調用;

2.緩存溢出的數量超過mapreduce.map.combine.minspills(默認3)時,在緩存溢出文件合併的時候會調用

合併(Combine)和歸併(Merge)的區別:

兩個鍵值對和,如果合併,會得到,如果歸併,會得到>

特殊情況:當數據量很小,達不到緩衝區闕值時,怎麼處理?

對於這種情況,目前看到有兩種不一樣的說法:

①不會有寫臨時文件到磁盤的操作,也不會有後面的合併。

②最終也會以臨時文件的形式存儲到本地磁盤

至於真實情況是怎麼樣的,我還不清楚。。。

④歸併merge

當一個map task處理的數據很大,以至於超過緩衝區內存時,就會生成多個spill文件。此時就需要對同一個map任務產生的多個spill文件進行歸併生成最終的一個已分區且已排序的大文件。配置屬性mapreduce.task.io.sort.factor控制著一次最多能合併多少流,默認值是10。這個過程包括排序和合並(可選),歸併得到的文件內鍵值對有可能擁有相同的key,這個過程如果client設置過Combiner,也會合並相同的key值的鍵值對(根據上面提到的combine的調用時機可知)。

溢出寫文件歸併完畢後,Map將刪除所有的臨時溢出寫文件,並告知NodeManager任務已完成,只要其中一個MapTask完成,ReduceTask就開始複製它的輸出(Copy階段分區輸出文件通過http的方式提供給reducer)

壓縮

寫磁盤時壓縮map端的輸出,因為這樣會讓寫磁盤的速度更快,節約磁盤空間,並減少傳給reducer的數據量。默認情況下,輸出是不壓縮的(將mapreduce.map.output.compress設置為true即可啟動)



Reduce端shuffle

①複製copy

②歸併merge

③reduce

結合下面這張圖可以直觀感受reduce端的shuffle過程


MapReduce的shuffle過程詳解(分片、分區、合併、歸併,)

①複製copy

Reduce進程啟動一些數據copy線程,通過HTTP方式請求MapTask所在的NodeManager以獲取輸出文件。

NodeManager需要為分區文件運行reduce任務。並且reduce任務需要集群上若干個map任務的map輸出作為其特殊的分區文件。而每個map任務的完成時間可能不同,因此只要有一個任務完成,reduce任務就開始複製其輸出。

reduce任務有少量複製線程,因此能夠並行取得map輸出。默認線程數為5,但這個默認值可以通過mapreduce.reduce.shuffle.parallelcopies屬性進行設置。

【Reducer如何知道自己應該處理哪些數據呢?】

因為Map端進行partition的時候,實際上就相當於指定了每個Reducer要處理的數據(partition就對應了Reducer),所以Reducer在拷貝數據的時候只需拷貝與自己對應的partition中的數據即可。每個Reducer會處理一個或者多個partition。

【reducer如何知道要從哪臺機器上去的map輸出呢?】

map任務完成後,它們會使用心跳機制通知它們的application master、因此對於指定作業,application master知道map輸出和主機位置之間的映射關係。reducer中的一個線程定期詢問master以便獲取map輸出主機的位置。知道獲得所有輸出位置。

②歸併merge

Copy 過來的數據會先放入內存緩衝區中,這裡的緩衝區大小要比 map 端的更為靈活,它基於 JVM 的 heap size 設置,因為 Shuffle 階段 Reducer 不運行,所以應該把絕大部分的內存都給 Shuffle 用。

Copy過來的數據會先放入內存緩衝區中,如果內存緩衝區中能放得下這次數據的話就直接把數據寫到內存中,即內存到內存merge。Reduce要向每個Map去拖取數據,在內存中每個Map對應一塊數據,當內存緩存區中存儲的Map數據佔用空間達到一定程度的時候,開始啟動內存中merge,把內存中的數據merge輸出到磁盤上一個文件中,即內存到磁盤merge。與map端的一些類似,在將buffer中多個map輸出合併寫入磁盤之前,如果設置了Combiner,則會化簡壓縮合並的map輸出。Reduce的內存緩衝區可通過mapred.job.shuffle.input.buffer.percent配置,默認是JVM的heap size的70%。內存到磁盤merge的啟動門限可以通過mapred.job.shuffle.merge.percent配置,默認是66%。

當屬於該reducer的map輸出全部拷貝完成,則會在reducer上生成多個文件(如果拖取的所有map數據總量都沒有內存緩衝區,則數據就只存在於內存中),這時開始執行合併操作,即磁盤到磁盤merge,Map的輸出數據已經是有序的,Merge進行一次合併排序,所謂Reduce端的sort過程就是這個合併的過程,採取的排序方法跟map階段不同,因為每個map端傳過來的數據是排好序的,因此眾多排好序的map輸出文件在reduce端進行合併時採用的是歸併排序,針對鍵進行歸併排序。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。最終Reduce shuffle過程會輸出一個整體有序的數據塊。

③reduce

當一個reduce任務完成全部的複製和排序後,就會針對已根據鍵排好序的Key構造對應的Value迭代器。這時就要用到分組,默認的根據鍵分組,自定義的可是使用 job.setGroupingComparatorClass()方法設置分組函數類。對於默認分組來說,只要這個比較器比較的兩個Key相同,它們就屬於同一組,它們的 Value就會放在一個Value迭代器,而這個迭代器的Key使用屬於同一個組的所有Key的第一個Key。

在reduce階段,reduce()方法的輸入是所有的Key和它的Value迭代器。此階段的輸出直接寫到輸出文件系統,一般為HDFS。如果採用HDFS,由於NodeManager也運行數據節點,所以第一個塊副本將被寫到本地磁盤。

1、當reduce將所有的map上對應自己partition的數據下載完成後,reducetask真正進入reduce函數的計算階段。由於reduce計算時同樣是需要內存作為buffer,可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代碼MergeManagerImpl.java:674行)來設置reduce的緩存。

這個參數默認情況下為0,也就是說,reduce是全部從磁盤開始讀處理數據。如果這個參數大於0,那麼就會有一定量的數據被緩存在內存並輸送給reduce,當reduce計算邏輯消耗內存很小時,可以分一部分內存用來緩存數據,可以提升計算的速度。所以默認情況下都是從磁盤讀取數據,如果內存足夠大的話,務必設置該參數讓reduce直接從緩存讀數據,這樣做就有點Spark Cache的感覺。

2、Reduce在這個階段,框架為已分組的輸入數據中的每個鍵值對對調用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任務的輸出通常是通過調用 OutputCollector.collect(WritableComparable,Writable)寫入文件系統的。


分享到:


相關文章: