1. MAPREDUCE原理篇
Mapreduce是一個分佈式運算程序的編程框架 ,是用戶開發“基於hadoop的數據分析應用”的核心框架;
Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發運行在一個hadoop集群上;
1.1 為什麼要MAPREDUCE
(1)海量數據在單機上處理因為硬件資源限制,無法勝任
(2)而一旦將單機版程序擴展到集群來分佈式運行,將極大增加程序的複雜度和開發難度
(3)引入mapreduce框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分佈式計算中的複雜性交由框架來處理
設想一個海量數據場景下的wordcount需求:
單機版:內存受限,磁盤受限,運算能力受限
分佈式:
- 文件分佈式存儲(HDFS)
- 運算邏輯需要至少分成2個階段(一個階段獨立併發,一個階段匯聚)
- 運算程序如何分發
- 程序如何分配運算任務(切片)
- 兩階段的程序如何啟動?如何協調?
- 整個程序運行過程中的監控?容錯?重試?
可見在程序由單機版擴成分佈式時,會引入大量的複雜工作。為了提高開發效率,可以將分佈式程序中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。
而mapreduce就是這樣一個分佈式程序的通用框架,其應對以上問題的整體結構如下:
- MRAppMaster(mapreduce application master)
- MapTask
- ReduceTask
1.2MapReduce的框架思想
1.2 MAPREDUCE框架結構及核心運行機制
1.2.1結構
一個完整的mapreduce程序在分佈式運行時有三類實例進程:
1、MRAppMaster:負責整個程序的過程調度及狀態協調
2、mapTask:負責map階段的整個數據處理流程
3、ReduceTask:負責reduce階段的整個數據處理流程
1.2.2 MR程序運行流程
1.2.2.1流程示意圖
1.2.2.2流程解析
1.一個mr程序啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動後根據本次job的描述信息,計算出需要的maptask實例數量,然後向集群申請機器啟動相應數量的maptask進程
2.maptask進程啟動之後,根據給定的數據切片範圍進行數據處理,主體流程為:
1.利用客戶指定的inputformat來獲取RecordReader讀取數據,形成輸入KV對
2.將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,並將map()方法輸出的KV對收集到緩存
3.將緩存中的KV對按照K分區排序後不斷溢寫到磁盤文件
3.MRAppMaster監控到所有maptask進程任務完成之後,會根據客戶指定的參數啟動相應數量的reducetask進程,並告知reducetask進程要處理的數據範圍(數據分區)
4.Reducetask進程啟動之後,根據MRAppMaster告知的待處理數據所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結果文件,並在本地進行重新歸併排序,然後按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,然後調用客戶指定的outputformat將結果數據輸出到外部存儲
2.MR核心數據處理全過程(包括shuffle)
1)mapreduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle;
2)shuffle:洗牌、發牌(核心機制:數據分區、排序、緩存);
3)具體來說:就是將maptask輸出的處理結果數據,分發給reducetask,並在分發的過程中,對數據按key進行了分區和排序(默認也會分區,排序)。
上面的流程是整個mapreduce最全工作流程,具體shuffle過程詳解,如下:
1)maptask收集我們的map()方法輸出的kv對,放到內存緩衝區中
2)從內存緩衝區不斷溢出本地磁盤文件(非HDFS,因為是中間結果沒必要),可能會溢出多個文件
3)多個溢出文件會被合併成大的溢出文件
4)在溢出過程中,及合併的過程中,都要調用partitoner進行分組和針對key進行排序
5)reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據
6)reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合併(歸併排序)
7)合併成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
注意
1.Shuffle中的緩衝區大小會影響到mapreduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快。
緩衝區的大小可以通過參數調整,參數:io.sort.mb 默認100M
2.map任務將輸出的結果時寫到本地磁盤,並不是HDFS系統,因為map輸出的是中間結果,等reduce處理完以後,會從本地磁盤刪除,所以沒有必要上傳到hdfs上,還會形成副本。這也是MapReduce比Spark慢的原因之一,因為Spark是將中間結果緩存到內存裡,再次使用的時候直接加載,比mapreduce還有從本地磁盤讀取快了太多。
3.如果運行map任務的節點在將map輸出的結果傳送給reduce之前就失敗了,那麼Hadoop將在另一個節點上重新運行這個map任務以再次構建新的map中間結果。MapReduce框架好就好在程序員不用擔心繫統失效的問題,因為框架可以檢測到失敗的任務,並重新在正常的機器上啟動執行,這個主要是由MRappmaster進行監控的任務情況。如果失敗讓yarn進行調度分配新的container.
閱讀更多 大數據架構師丨 的文章