DAGScheduler: Stage Division

Stage division in Spark is implemented by DAGScheduler. Starting from the last RDD, DAGScheduler traverses the entire dependency graph backward (depth-first, using an explicit stack) and splits it into scheduling stages. The dividing criterion is whether an operation introduces a wide dependency (ShuffleDependency): whenever an RDD is produced by a shuffle, that shuffle operation becomes the boundary between the stage before it and the stage after it.
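As a minimal illustration (a plain Python sketch, not Spark code; the string-based lineage encoding is made up), the number of stages along a linear lineage is simply the number of shuffle boundaries plus one:

```python
# Toy model of a linear lineage: each dependency is either "narrow" or
# "wide" (a shuffle). A stage boundary is drawn at every wide dependency.

def num_stages(lineage):
    """Stages along a linear lineage = shuffle boundaries + 1."""
    return lineage.count("wide") + 1

# e.g. map -> reduceByKey -> map gives narrow, wide, narrow => 2 stages
print(num_stages(["narrow", "wide", "narrow"]))  # 2
```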

createResultStage

In DAGScheduler's handleJobSubmitted method, a ResultStage is generated from the last RDD: createResultStage creates the finalStage, and the arguments passed in include the final RDD (finalRDD), the function func to apply, the partitions, the jobId, the callSite, and so on.

createResultStage source, in org.apache.spark.scheduler.DAGScheduler#createResultStage:

  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

In createResultStage, the ResultStage is created with the job ID (jobId) that was passed in from handleJobSubmitted. createResultStage's call to getOrCreateParentStages gets or creates the list of parent stages for the given RDD.

getOrCreateParentStages source, in org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages:

  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

getShuffleDependencies

The getOrCreateParentStages method calls getShuffleDependencies(rdd). getShuffleDependencies returns only the shuffle dependencies that are direct parents of the given RDD; it does not return the dependencies of more distant ancestors.

A new stage is generated for each shuffle dependency. The traversal works as follows: waitingForVisit is a Stack, and the current RDD is pushed onto it. The loop then pops waitingForVisit.pop() into toVisit; if toVisit has not been visited yet, toVisit.dependencies is examined: a shuffle dependency is added to the parents set, while for a narrow dependency the parent RDD is pushed onto the stack via waitingForVisit.push(dependency.rdd).

getShuffleDependencies source, in org.apache.spark.scheduler.DAGScheduler#getShuffleDependencies:

  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }

Taking the stage-division diagram as an example:

RDD G depends on both RDD B and RDD F. By these dependencies, RDD B and RDD G are in the same stage, while RDD G and RDD F are not in the same stage.

  • First, RDD G is pushed onto waitingForVisit, and its dependencies on RDD B and RDD F are examined.
  • RDD G and RDD F form a wide dependency, so ShuffleDependency(RDD F) is added to parents and will become a new parent stage.
  • RDD G and RDD B form a narrow dependency, so RDD B is pushed onto the waitingForVisit stack; RDD G and RDD B belong to the same stage.
  • RDD B is then popped from waitingForVisit; its wide dependency on RDD A is found, which will form another new stage.
  • getShuffleDependencies on RDD G finally returns HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)).
  • getShuffleDependencies(rdd).map then iterates over this set, calling getOrCreateShuffleMapStage to create the parent stages directly.
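The walkthrough above can be reproduced with a small stand-alone sketch (a Python simulation of the Scala logic; the graph encoding is an illustrative assumption, and the real method returns ShuffleDependency objects rather than RDD names):

```python
# Toy RDD graph: each node maps to a list of (parent, is_shuffle) edges.
# Mirrors the logic of getShuffleDependencies with an explicit stack.

def direct_shuffle_deps(graph, rdd):
    """Return the map-side RDD names of the direct shuffle dependencies
    reachable from `rdd` through narrow edges only."""
    parents = set()
    visited = set()
    waiting_for_visit = [rdd]            # list used as a stack
    while waiting_for_visit:
        to_visit = waiting_for_visit.pop()
        if to_visit not in visited:
            visited.add(to_visit)
            for parent, is_shuffle in graph.get(to_visit, []):
                if is_shuffle:
                    parents.add(parent)               # shuffle edge: stop here
                else:
                    waiting_for_visit.append(parent)  # narrow edge: keep walking
    return parents

# The graph from the example: G <-narrow- B <-shuffle- A, and G <-shuffle- F
graph = {
    "G": [("B", False), ("F", True)],
    "B": [("A", True)],
}
print(sorted(direct_shuffle_deps(graph, "G")))  # ['A', 'F']
```

The result matches the HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)) described above.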

getOrCreateShuffleMapStage

The map over getShuffleDependencies(rdd) in getOrCreateParentStages calls the getOrCreateShuffleMapStage method. If the shuffleId already exists in the shuffleIdToMapStage structure, the existing shuffle map stage is returned; otherwise, the method first creates shuffle map stages for any missing ancestor shuffle dependencies and then creates a shuffle map stage for this dependency itself.

getOrCreateShuffleMapStage source, in org.apache.spark.scheduler.DAGScheduler#getOrCreateShuffleMapStage:

  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage

      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

In the getOrCreateShuffleMapStage method:

  • If a stage is found for the shuffleId, it is returned. The lookup goes through shuffleIdToMapStage, a HashMap that maps shuffle dependency IDs to ShuffleMapStages. shuffleIdToMapStage holds mappings only for jobs that are currently running; once a shuffle map stage's job completes, the mapping is removed, and the shuffle's output data is tracked in MapOutputTracker instead.
  • If no stage is found for the shuffleId, getMissingAncestorShuffleDependencies is called and createShuffleMapStage creates stages for all the ancestor shuffle dependencies that are about to be computed, and then one for this dependency itself.

The getMissingAncestorShuffleDependencies method finds the ancestor shuffle dependencies that have not yet been registered in shuffleIdToMapStage. getMissingAncestorShuffleDependencies source, in org.apache.spark.scheduler.DAGScheduler#getMissingAncestorShuffleDependencies:

  private def getMissingAncestorShuffleDependencies(
      rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    val ancestors = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        getShuffleDependencies(toVisit).foreach { shuffleDep =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
            ancestors.push(shuffleDep)
            waitingForVisit.push(shuffleDep.rdd)
          } // Otherwise, the dependency and its ancestors have already been registered.
        }
      }
    }
    ancestors
  }
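This ancestor search can be sketched stand-alone as follows (a simplified Python simulation: the graph encoding and the `registered` set, which stands in for the keys of shuffleIdToMapStage, are illustrative assumptions; the real method works on ShuffleDependency objects and calls getShuffleDependencies per node rather than walking direct edges):

```python
# Walk backward from `rdd` and collect every ancestor shuffle whose map
# stage is not yet registered; registered shuffles prune the walk, since
# their ancestors must have been registered already.

def missing_ancestor_shuffles(graph, rdd, registered):
    ancestors = set()
    visited = set()
    waiting_for_visit = [rdd]            # explicit stack, as in the source
    while waiting_for_visit:
        to_visit = waiting_for_visit.pop()
        if to_visit not in visited:
            visited.add(to_visit)
            for parent, is_shuffle in graph.get(to_visit, []):
                if is_shuffle:
                    if parent not in registered:
                        ancestors.add(parent)
                        waiting_for_visit.append(parent)
                    # otherwise: the dependency and its ancestors are
                    # already registered, so this branch is pruned
                else:
                    waiting_for_visit.append(parent)
    return ancestors

graph = {"B": [("A", True)]}
print(missing_ancestor_shuffles(graph, "A", set()))   # set()
print(missing_ancestor_shuffles(graph, "B", set()))   # {'A'}
print(missing_ancestor_shuffles(graph, "B", {"A"}))   # set()
```

For an RDD with no ancestors at all (like RDD A in the running example), the result is empty, which is why the walkthrough finds no missing ancestors for ShuffleDependency(RDD A).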

The createShuffleMapStage method creates a ShuffleMapStage for the partitions of the given shuffle dependency. If a previously run stage has already generated the same shuffle data and that data is still available, createShuffleMapStage copies the output locations of the existing shuffle data into the new stage, so the data does not have to be regenerated.

createShuffleMapStage source, in org.apache.spark.scheduler.DAGScheduler#createShuffleMapStage:

  def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // A previously run stage generated partitions for this shuffle, so for each output
      // that's still available, copy information about that output location to the new stage
      // (so we don't unnecessarily re-compute that data).
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

In the DAG stage-division diagram, createResultStage is called for RDD G: getOrCreateParentStages obtains all the parent stages, List[Stage] containing Stage 1 and Stage 2, and then RDD G's own Stage 3 is created.

  • getOrCreateParentStages: gets or creates the list of parent stages for the given RDD. In the diagram, RDD G's call to getOrCreateParentStages invokes getShuffleDependencies, which returns all of RDD G's direct wide dependencies, HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)). The set is then traversed, and getOrCreateShuffleMapStage is called for ShuffleDependency(RDD F) and ShuffleDependency(RDD A) in turn.
  • For ShuffleDependency(RDD A), getOrCreateShuffleMapStage pattern-matches on shuffleDep.shuffleId and calls getMissingAncestorShuffleDependencies to look for ancestor shuffle dependencies of ShuffleDependency(RDD A); none are found. createShuffleMapStage is then called for ShuffleDependency(RDD A); RDD A has no parent stage, so Stage 1 is created.
  • For ShuffleDependency(RDD F), getOrCreateShuffleMapStage follows the same path: getMissingAncestorShuffleDependencies finds no ancestor shuffle dependencies. createShuffleMapStage is called for ShuffleDependency(RDD F); between RDD C and RDD D, RDD D and RDD F, and RDD E and RDD F there is no shuffle, and without wide dependencies no stage boundary arises. RDD F therefore has no parent stage, and Stage 2 is created.
  • Finally, List(Stage 1, Stage 2) becomes the parent list of Stage 3, and Stage 3, the ResultStage, is created.
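Putting the pieces together, the walkthrough above can be simulated end to end with a short Python sketch (toy dictionaries instead of Stage objects; all names and helpers here are illustrative, not Spark APIs, and real stage ids depend on HashSet iteration order):

```python
# End-to-end simulation of stage creation for the A..G graph.
# `shuffle_to_stage` stands in for shuffleIdToMapStage, caching one
# shuffle map stage per shuffle dependency.

def direct_shuffle_deps(graph, rdd):
    """Direct shuffle parents of `rdd`, as in getShuffleDependencies."""
    parents, visited, waiting = [], set(), [rdd]
    while waiting:
        node = waiting.pop()
        if node not in visited:
            visited.add(node)
            for parent, is_shuffle in graph.get(node, []):
                if is_shuffle:
                    parents.append(parent)
                else:
                    waiting.append(parent)
    return parents

def get_or_create_shuffle_map_stage(graph, map_side_rdd, shuffle_to_stage, counter):
    # Reuse a cached stage; otherwise build the parent stages first
    # (recursion replaces the explicit ancestor stack), then register.
    if map_side_rdd in shuffle_to_stage:
        return shuffle_to_stage[map_side_rdd]
    parents = [get_or_create_shuffle_map_stage(graph, p, shuffle_to_stage, counter)
               for p in direct_shuffle_deps(graph, map_side_rdd)]
    stage = {"id": counter[0], "rdd": map_side_rdd, "parents": parents}
    counter[0] += 1
    shuffle_to_stage[map_side_rdd] = stage
    return stage

def create_result_stage(graph, final_rdd):
    shuffle_to_stage, counter = {}, [1]
    parents = [get_or_create_shuffle_map_stage(graph, p, shuffle_to_stage, counter)
               for p in direct_shuffle_deps(graph, final_rdd)]
    return {"id": counter[0], "rdd": final_rdd, "parents": parents}

# The A..G graph: A -shuffle-> B -narrow-> G; C -narrow-> D and E -narrow-> F;
# F -shuffle-> G.
graph = {
    "G": [("B", False), ("F", True)],
    "B": [("A", True)],
    "F": [("D", False), ("E", False)],
    "D": [("C", False)],
}
result = create_result_stage(graph, "G")
print(sorted(p["rdd"] for p in result["parents"]))      # ['A', 'F']
print(all(not p["parents"] for p in result["parents"])) # True
```

The final stage for RDD G has exactly two parent stages, one rooted at RDD A and one at RDD F, and neither parent has parents of its own, matching Stage 1, Stage 2, and Stage 3 above.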
