DAGScheduler: Stage Partitioning


Stage partitioning in Spark is implemented by DAGScheduler. Starting from the final RDD, DAGScheduler traverses the entire dependency graph backwards (using an explicit stack rather than recursion) and splits it into scheduling stages. The split criterion is whether a dependency is a wide dependency (ShuffleDependency): whenever an RDD is produced by a shuffle, that shuffle operation becomes the boundary between the stage before it and the stage after it.
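
The rule above can be illustrated with a tiny self-contained sketch. This is a hypothetical model, not Spark's API: ToyRDD stands in for an RDD lineage, and each parent edge simply records whether it is a shuffle (wide) dependency. Every distinct shuffle edge yields one ShuffleMapStage, and the final RDD gets the single ResultStage.

```scala
import scala.collection.mutable

// Hypothetical lineage model (not Spark's API): each parent edge records
// whether it is a shuffle (wide) dependency.
final case class ToyRDD(name: String, deps: List[(ToyRDD, Boolean)])

// Each distinct shuffle edge becomes one ShuffleMapStage; the final RDD
// gets one ResultStage, so: number of stages = shuffle edges + 1.
def numStages(finalRdd: ToyRDD): Int = {
  val visited = mutable.HashSet[String]()
  var shuffles = 0
  def visit(r: ToyRDD): Unit =
    if (visited.add(r.name)) r.deps.foreach { case (p, isShuffle) =>
      if (isShuffle) shuffles += 1
      visit(p)
    }
  visit(finalRdd)
  shuffles + 1
}

// textFile -> map (narrow) -> reduceByKey (shuffle) -> filter (narrow)
val rdd1 = ToyRDD("rdd1", Nil)
val rdd2 = ToyRDD("rdd2", List((rdd1, false)))
val rdd3 = ToyRDD("rdd3", List((rdd2, true)))
val rdd4 = ToyRDD("rdd4", List((rdd3, false)))
assert(numStages(rdd4) == 2) // one ShuffleMapStage + one ResultStage
```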

createResultStage

In DAGScheduler's handleJobSubmitted method, a ResultStage is generated from the last RDD: finalStage is created through createResultStage, whose parameters include the final RDD, the function func to apply, the partitions, the jobId, the callSite, and so on.

createResultStage source, from org.apache.spark.scheduler.DAGScheduler#createResultStage:

 private def createResultStage(
     rdd: RDD[_],
     func: (TaskContext, Iterator[_]) => _,
     partitions: Array[Int],
     jobId: Int,
     callSite: CallSite): ResultStage = {
   val parents = getOrCreateParentStages(rdd, jobId)
   val id = nextStageId.getAndIncrement()
   val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
   stageIdToStage(id) = stage
   updateJobIdStageIdMaps(jobId, stage)
   stage
 }

In createResultStage, the ResultStage is created using the job ID (jobId), which is passed in as the fourth parameter. Inside createResultStage, getOrCreateParentStages obtains or creates the list of parent stages for the given RDD.

getOrCreateParentStages source, from org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages:

 private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
   getShuffleDependencies(rdd).map { shuffleDep =>
     getOrCreateShuffleMapStage(shuffleDep, firstJobId)
   }.toList
 }

getShuffleDependencies

The getOrCreateParentStages method calls getShuffleDependencies(rdd), which returns only the shuffle dependencies that are immediate parents of the given RDD; it does not return the dependencies of more distant ancestors.

Each shuffle dependency gives rise to a new stage. The traversal works as follows: waitingForVisit is a Stack, and the starting RDD is pushed onto it. The loop then pops waitingForVisit.pop() into toVisit; if toVisit has not been visited yet, each entry of toVisit.dependencies is examined: a shuffle dependency is added to the parents set, while for a narrow dependency the parent RDD is pushed onto waitingForVisit via waitingForVisit.push(dependency.rdd) so the walk continues.

getShuffleDependencies source, from org.apache.spark.scheduler.DAGScheduler#getShuffleDependencies:

 private[scheduler] def getShuffleDependencies(
     rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
   val parents = new HashSet[ShuffleDependency[_, _, _]]
   val visited = new HashSet[RDD[_]]
   val waitingForVisit = new Stack[RDD[_]]
   waitingForVisit.push(rdd)
   while (waitingForVisit.nonEmpty) {
     val toVisit = waitingForVisit.pop()
     if (!visited(toVisit)) {
       visited += toVisit
       toVisit.dependencies.foreach {
         case shuffleDep: ShuffleDependency[_, _, _] =>
           parents += shuffleDep
         case dependency =>
           waitingForVisit.push(dependency.rdd)
       }
     }
   }
   parents
 }

Taking the figure above as an example:

RDD G depends on both RDD B and RDD F. According to these dependencies, RDD B and RDD G are in the same stage, while RDD G and RDD F are not.

  • First, RDD G is pushed onto waitingForVisit; its dependencies on RDD B and RDD F are then examined.
  • RDD G and RDD F form a wide dependency, so that ShuffleDependency is added to parents: it marks a new parent stage.
  • RDD G and RDD B form a narrow dependency, so RDD B is pushed onto the waitingForVisit stack; RDD G and RDD B stay in the same stage.
  • RDD B is popped from waitingForVisit, exposing its wide dependency on RDD A, which will form another new stage.
  • getShuffleDependencies for RDD G finally returns HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)).
  • getOrCreateParentStages then maps over this set with getShuffleDependencies(rdd).map, calling getOrCreateShuffleMapStage to create each parent stage directly.
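
The walk just described can be reproduced with a small self-contained sketch that mirrors the stack-based loop of getShuffleDependencies (ToyRDD and directShuffleDeps below are hypothetical stand-ins, not Spark types):

```scala
import scala.collection.mutable.{HashSet, Stack}

// Hypothetical model: each RDD lists (parent, isShuffle) edges.
final case class ToyRDD(name: String, deps: List[(ToyRDD, Boolean)])

// Names of the map-side RDDs behind the *direct* shuffle dependencies of
// `rdd`, mirroring DAGScheduler.getShuffleDependencies.
def directShuffleDeps(rdd: ToyRDD): Set[String] = {
  val parents = HashSet[String]()
  val visited = HashSet[String]()
  val waitingForVisit = Stack[ToyRDD](rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (visited.add(toVisit.name)) {
      toVisit.deps.foreach {
        case (p, true)  => parents += p.name       // shuffle dep: record, stop here
        case (p, false) => waitingForVisit.push(p) // narrow dep: keep walking back
      }
    }
  }
  parents.toSet
}

// The figure's DAG: A =shuffle=> B -narrow-> G, and F =shuffle=> G
val rddA = ToyRDD("A", Nil)
val rddB = ToyRDD("B", List((rddA, true)))
val rddF = ToyRDD("F", Nil)
val rddG = ToyRDD("G", List((rddB, false), (rddF, true)))

assert(directShuffleDeps(rddG) == Set("A", "F")) // matches the walkthrough above
```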

getOrCreateShuffleMapStage

getOrCreateParentStages maps over getShuffleDependencies(rdd) by calling getOrCreateShuffleMapStage. If the shuffleId already exists in the shuffleIdToMapStage structure, the existing shuffle map stage is returned; otherwise the method first creates shuffle map stages for any missing ancestor shuffle dependencies still to be computed, and then creates a shuffle map stage for this dependency itself.

getOrCreateShuffleMapStage source, from org.apache.spark.scheduler.DAGScheduler#getOrCreateShuffleMapStage:

 private def getOrCreateShuffleMapStage(
     shuffleDep: ShuffleDependency[_, _, _],
     firstJobId: Int): ShuffleMapStage = {
   shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
     case Some(stage) =>
       stage

     case None =>
       // Create stages for all missing ancestor shuffle dependencies.
       getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
         // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
         // that were not already in shuffleIdToMapStage, it's possible that by the time we
         // get to a particular dependency in the foreach loop, it's been added to
         // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
         // SPARK-13902 for more information.
         if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
           createShuffleMapStage(dep, firstJobId)
         }
       }
       // Finally, create a stage for the given shuffle dependency.
       createShuffleMapStage(shuffleDep, firstJobId)
   }
 }

In getOrCreateShuffleMapStage:

  • If a stage matching the shuffleId is found, it is returned. The lookup first goes to shuffleIdToMapStage, a HashMap from ShuffleDependency IDs to ShuffleMapStages. shuffleIdToMapStage only contains mappings for jobs that are currently running; once a shuffle stage's job completes, the mapping is removed and the shuffle's output data is tracked in MapOutputTracker instead.
  • If no stage is found for the shuffleId, getMissingAncestorShuffleDependencies is called and createShuffleMapStage creates a stage for every ancestor shuffle dependency that still needs to be computed, followed by a stage for the given shuffle dependency itself.

getMissingAncestorShuffleDependencies finds the ancestor shuffle dependencies that have not yet been registered in shuffleIdToMapStage. getMissingAncestorShuffleDependencies source, from org.apache.spark.scheduler.DAGScheduler#getMissingAncestorShuffleDependencies:

 private def getMissingAncestorShuffleDependencies(
     rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
   val ancestors = new Stack[ShuffleDependency[_, _, _]]
   val visited = new HashSet[RDD[_]]
   // We are manually maintaining a stack here to prevent StackOverflowError
   // caused by recursively visiting
   val waitingForVisit = new Stack[RDD[_]]
   waitingForVisit.push(rdd)
   while (waitingForVisit.nonEmpty) {
     val toVisit = waitingForVisit.pop()
     if (!visited(toVisit)) {
       visited += toVisit
       getShuffleDependencies(toVisit).foreach { shuffleDep =>
         if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
           ancestors.push(shuffleDep)
           waitingForVisit.push(shuffleDep.rdd)
         } // Otherwise, the dependency and its ancestors have already been registered.
       }
     }
   }
   ancestors
 }
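
The same search can be sketched standalone (hypothetical toy types; shuffleIdToMapStage is modeled as a plain Set of registered shuffle ids). An unregistered shuffle dependency is recorded and the walk continues past it; a registered one stops the walk, since its ancestors must already be registered as well:

```scala
import scala.collection.mutable.{HashSet, Stack}

// Hypothetical model: a parent edge is either narrow (None) or a shuffle
// edge carrying its shuffleId (Some(id)).
final case class ToyRDD(name: String, deps: List[(ToyRDD, Option[Int])])

def missingAncestorShuffles(rdd: ToyRDD, registered: Set[Int]): List[Int] = {
  val ancestors = Stack[Int]()
  val visited = HashSet[String]()
  val waitingForVisit = Stack[ToyRDD](rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (visited.add(toVisit.name)) {
      toVisit.deps.foreach {
        case (p, Some(id)) if !registered.contains(id) =>
          ancestors.push(id)       // unregistered shuffle: record it...
          waitingForVisit.push(p)  // ...and keep searching beyond it
        case (_, Some(_)) => ()    // registered: its ancestors are registered too
        case (p, None) =>
          waitingForVisit.push(p)  // narrow: keep walking back
      }
    }
  }
  ancestors.toList // top of stack first: ancestor-most shuffles come first
}

// A =shuffle1=> B =shuffle2=> C
val rddA = ToyRDD("A", Nil)
val rddB = ToyRDD("B", List((rddA, Some(1))))
val rddC = ToyRDD("C", List((rddB, Some(2))))

assert(missingAncestorShuffles(rddC, Set.empty) == List(1, 2))
assert(missingAncestorShuffles(rddC, Set(1)) == List(2))
assert(missingAncestorShuffles(rddC, Set(2)) == Nil) // walk stops at shuffle 2
```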

createShuffleMapStage creates a ShuffleMapStage over the partitions of the given shuffle dependency. If a previously run stage already generated output for the same shuffle, and that shuffle data is still available, createShuffleMapStage copies the output-location information into the new stage so the data can be fetched from there rather than regenerated.

createShuffleMapStage source, from org.apache.spark.scheduler.DAGScheduler#createShuffleMapStage:

 def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
   val rdd = shuffleDep.rdd
   val numTasks = rdd.partitions.length
   val parents = getOrCreateParentStages(rdd, jobId)
   val id = nextStageId.getAndIncrement()
   val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

   stageIdToStage(id) = stage
   shuffleIdToMapStage(shuffleDep.shuffleId) = stage
   updateJobIdStageIdMaps(jobId, stage)

   if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
     // A previously run stage generated partitions for this shuffle, so for each output
     // that's still available, copy information about that output location to the new stage
     // (so we don't unnecessarily re-compute that data).
     val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
     val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
     (0 until locs.length).foreach { i =>
       if (locs(i) ne null) {
         // locs(i) will be null if missing
         stage.addOutputLoc(i, locs(i))
       }
     }
   } else {
     // Kind of ugly: need to register RDDs with the cache and map output tracker here
     // since we can't do it in the RDD constructor because # of partitions is unknown
     logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
     mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
   }
   stage
 }
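
The reuse branch above can be reduced to a minimal sketch (hypothetical types; the real code copies deserialized MapStatus entries via stage.addOutputLoc): outputs that are still available are carried over, so only the missing partitions need recomputation.

```scala
// Hypothetical sketch of output-location reuse in createShuffleMapStage:
// previously computed map outputs (None = lost or missing) are copied into
// the new stage so that only missing partitions need recomputation.
final case class Loc(host: String)

// What the map output tracker remembers for this shuffle (partition 1 lost).
val previousOutputs: Array[Option[Loc]] =
  Array(Some(Loc("host1")), None, Some(Loc("host3")))

// The new stage starts with no locations and copies each available one.
val outputLocs = Array.fill[Option[Loc]](previousOutputs.length)(None)
previousOutputs.zipWithIndex.foreach { case (loc, i) =>
  if (loc.isDefined) outputLocs(i) = loc // mirrors: if (locs(i) ne null) addOutputLoc
}

val missingPartitions = outputLocs.indices.filter(outputLocs(_).isEmpty)
assert(missingPartitions == Seq(1)) // only partition 1 must be recomputed
```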

Summarizing against the stage-partitioning figure: createResultStage is called on RDD G; getOrCreateParentStages obtains all parent stages List[Stage] (Stage 1 and Stage 2), and then RDD G's own Stage 3 is created.

  • getOrCreateParentStages: gets or creates the list of parent stages for a given RDD. In the figure, for RDD G it calls getShuffleDependencies, which returns RDD G's direct wide dependencies HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)); the set is then traversed, calling getOrCreateShuffleMapStage on ShuffleDependency(RDD F) and ShuffleDependency(RDD A) in turn.
  • For ShuffleDependency(RDD A): the pattern match on shuffleDep.shuffleId in getOrCreateShuffleMapStage finds no existing stage, so getMissingAncestorShuffleDependencies searches for ancestor shuffle dependencies of ShuffleDependency(RDD A) and returns none. createShuffleMapStage is then called for ShuffleDependency(RDD A); RDD A has no parent stage, and Stage 1 is created.
  • For ShuffleDependency(RDD F): likewise, getMissingAncestorShuffleDependencies returns none, and createShuffleMapStage is called for ShuffleDependency(RDD F). Upstream of RDD F there is no shuffle between RDD C, RDD D and RDD F, or between RDD E and RDD F; without a wide dependency no stage boundary arises, so RDD F has no parent stage and Stage 2 is created.
  • Finally, List(Stage 1, Stage 2) becomes the parent list of Stage 3, which is created as the ResultStage.
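
The whole walkthrough can be condensed into one self-contained sketch (toy types, not Spark's; recursion stands in for Spark's explicit ancestor stack, and shuffleIdToMapStage is a plain HashMap):

```scala
import scala.collection.mutable

// Hypothetical model: a parent edge is narrow (None) or a shuffle edge
// carrying its shuffleId (Some(id)).
final case class ToyRDD(name: String, deps: List[(ToyRDD, Option[Int])])
final case class Stage(id: Int, rdd: String, parents: List[Stage])

val shuffleIdToMapStage = mutable.HashMap[Int, Stage]()
var nextStageId = 0
def newStageId(): Int = { val id = nextStageId; nextStageId += 1; id }

// Direct shuffle deps of an RDD as (shuffleId, map-side RDD) pairs.
def directShuffleDeps(rdd: ToyRDD): List[(Int, ToyRDD)] = {
  val out = mutable.ListBuffer[(Int, ToyRDD)]()
  val visited = mutable.HashSet[String]()
  def walk(r: ToyRDD): Unit = if (visited.add(r.name))
    r.deps.foreach {
      case (p, Some(id)) => out += ((id, p))
      case (p, None)     => walk(p)
    }
  walk(rdd)
  out.toList
}

def getOrCreateShuffleMapStage(id: Int, mapRdd: ToyRDD): Stage =
  shuffleIdToMapStage.get(id) match {
    case Some(stage) => stage // already registered for a running job
    case None =>
      val parents =
        directShuffleDeps(mapRdd).map { case (i, r) => getOrCreateShuffleMapStage(i, r) }
      val stage = Stage(newStageId(), mapRdd.name, parents)
      shuffleIdToMapStage(id) = stage
      stage
  }

def createResultStage(finalRdd: ToyRDD): Stage = {
  val parents =
    directShuffleDeps(finalRdd).map { case (i, r) => getOrCreateShuffleMapStage(i, r) }
  Stage(newStageId(), finalRdd.name, parents)
}

// The figure's DAG: A =shuffle1=> B -> G;  C -> D -> F, E -> F;  F =shuffle2=> G
val rddA = ToyRDD("A", Nil)
val rddB = ToyRDD("B", List((rddA, Some(1))))
val rddC = ToyRDD("C", Nil)
val rddD = ToyRDD("D", List((rddC, None)))
val rddE = ToyRDD("E", Nil)
val rddF = ToyRDD("F", List((rddD, None), (rddE, None)))
val rddG = ToyRDD("G", List((rddB, None), (rddF, Some(2))))

val stage3 = createResultStage(rddG)
assert(stage3.parents.map(_.rdd).toSet == Set("A", "F")) // Stage 1 and Stage 2
assert(stage3.parents.forall(_.parents.isEmpty))         // neither has a parent stage
```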

