DAGScheduler-Stage提交

在DAGScheduler的handleJobSubmitted方法中,

 private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],

func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
// 如果作業運行在HadoopRDD上,而底層HDFS的文件已被刪除,那麼在創建新的Stage是將會拋出異常。
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))

val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}

其中createResultStage方法創建ResultStage,在submitStage方法中提交finalStage。handleJobSubmitted方法中的ActiveJob是一個普通的數據結構,保存了當前的Job的一些信息:

 private[spark] class ActiveJob(
val jobId: Int,
val finalStage: Stage,
val callSite: CallSite,

val listener: JobListener,
val properties: Properties) {

/**
* Number of partitions we need to compute for this job. Note that result stages may not need
* to compute all partitions in their target RDD, for actions like first() and lookup().
*/
val numPartitions = finalStage match {
case r: ResultStage => r.partitions.length
case m: ShuffleMapStage => m.rdd.partitions.length
}

/** Which partitions of the stage have finished */
val finished = Array.fill[Boolean](numPartitions)(false)

var numFinished = 0
}

getMissingParentStages方法根據finalStage找父Stage,如果有父Stage,就直接返回;如果沒有父Stage,就進行創建。

submitStage源碼, 位於org.apache.spark.scheduler.DAGScheduler#submitStage

 private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}

submitStage方法首先從activeJobForStage中獲取JobID;如果JobID已經定義為isDefined,那麼獲取即將計算的Stage(getMissingParentStages),然後進行升序排序。

如果父Stage為空,那麼提交submitMissingTasks,DAGScheduler把處理的過程交給具體的TaskScheduler去處理。

如果父Stage不為空,將循環遞歸調用submitStage(parent),從後向前回溯。後面的Stage依賴於前面的Stage。只有前面依賴的Stage計算完畢後,後面的Stage才會運行。

Stage

DAGScheduler會將Job的RDD劃分到不同的Stage,並構建這些Stage的依賴關係。這樣可以使得沒有依賴關係的Stage並行執行,並保證有順序依賴關係的Stage順序執行。並行執行能夠有效利用集群資源,提升運行效率,而串行執行則適用於在時間、資源上存在強制依賴的場景。

Stage分為需要處理的Shuffle的ShuffleMapStage和最下游的ResultStage。上游Stage優於下游Stage執行,ResultStage是最後執行的Stage。

Stage的屬性如下:

  • id:Stage的身份標識
  • rdd:當前Stage包含的RDD
  • numTasks:當前Stage的Task數量
  • parents:當前Stage的父Stage列表。一個Stage可以有一到多個父Stage
  • firstJobId:第一個提交當前Stage的Job的身份標識(即Job的Id)。當使用FIFO調度時,通過FIrstJobId首先計算來自比較早Job的Stage,或者在發生故障是更快的恢復。
  • callSite:應用程序中與當前Stage相關聯的調用棧信息。
  • numPartitions:當前Stage的分區數量。numPartitions實際為rdd的分區的數量。
  • jobIds:當前Stage所屬的Job的身份標識集合。一個Stage可以屬於一到多個Job。
  • pendingPartitions:存儲待處理分區的索引的集合。
  • nextAttemptId:用於生成Stage下一次嘗試的身份標識。
  • _latestInfo:Stage最近一次嘗試信息,即StageInfo。
  • fetchFailedAttemptIds:發生FetchFailure的Stage嘗試的身份標識集合。此屬性用於避免在發生FetchFailure後無止境的重試。
  • clearFailures:清空fetchFailedAttemptIds
  • failedOnFetchAndShouldAbort:用於將發生FetchFailure的Stage嘗試的身份標識添加到fetchFailedAttemptIds中,並返回發生FetchFailure的次數是否已經超過了允許發生 FetchFailure的次數的狀態。允許發生FetchFailure的次數固定為4.
  • latestInfo:返回最近一次Stage嘗試的StageInfo,即返回_latestInfo
  • findMissingPartitions:找到還未執行完成的分區。需要子類實現。
  • makeNewStageAttempt:用於創建新的Stage嘗試,
 def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
metrics.register(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
nextAttemptId += 1
}

makeNewStageAttempt方法的執行步驟如下:

  • 調用StageInfo的fromStage方法創建新的StageInfo
  • 增加nextAttemptId

抽象類Stage有兩個實現類,分別是ShuffleMapStage、ResultStage

ShuffleMapStage

ShuffleMapStage的DAG調度過程的中間Stage,可以包括一到多個ShuffleMapTask,這些ShuffleMapTask將生成用於SHuffle的數據。ShuffleMapStage一般是ResultStage或者其他ShuffleMapStage的父Stage,ShuffleMapTask則通過Shuffle與下游Stage中的Task串聯起來。

從ShuffleMapStage的命名可以看出,它將Shuffle的數據映射到下游Stage的各個分區中。

ShuffleMapStage處理繼承父類Stage的屬性外,還包括一下屬性:

  • shuffleDep:與ShuffleMapStage相關聯的ShuffleDependency
  • _mapStageJobs:與ShuffleMapStage相對應的ActiveJob的列表。
  • _numAvailableOutputs:ShuffleMapStage可用的map任務的輸出數量。也代表了執行成功的map數量
  • outputLocs:ShuffleMapStage的各個map與其對應的MapStatus列表的映射關係。由於map任務可能會運行多次,因而可能會有多個MapStatus。

ShuffleMapStage提供的方法:

  • mapStageJobs方法:即獲取_mapStageJobs數據
  • addActiveJob方法與removeActiveJob方法:向ShuffleMapStage相關聯的ActiveJob的列表中添加或刪除ActiveJob。源碼如下
 /** Adds the job to the active job list. */
def addActiveJob(job: ActiveJob): Unit = {
_mapStageJobs = job :: _mapStageJobs
}

/** Removes the job from the active job list. */
def removeActiveJob(job: ActiveJob): Unit = {
_mapStageJobs = _mapStageJobs.filter(_ != job)
}
  • numAvailableOutputs方法:讀取_numAvailableOutputs
  • isAvailable方法:當_numAvailableOutputs與numPartitions相等時為true。ShuffleMapStage的所有分區的map任務執行成功後,ShuffleMapStage才是可用的。
  • findMissingPartitions方法:找到所有還未執行成功而需要計算的分區。源碼如下:
 override def findMissingPartitions(): Seq[Int] = {
val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
assert(missing.size == numPartitions - _numAvailableOutputs,
s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
missing
}
  • addOutputLoc方法:當面某一個分區的任務執行完成後,首先將分區與MapStatus的對應關係添加到outputLocs中,然後將可用的輸出數加1.源碼如下:
 def addOutputLoc(partition: Int, status: MapStatus): Unit = {
val prevList = outputLocs(partition)
outputLocs(partition) = status :: prevList
if (prevList == Nil) {
_numAvailableOutputs += 1
}
}

ResultStage

ResultStage可以使用指定的函數對RDD中的分區進行計算並得出最終結果。ResultStage是最後執行的Stage。此階段主要進行作業的收尾工作。ResultStage除了繼承父類Stage的屬性外,還包括一下屬性:

  • func:即對RDD的分區進行計算的函數。func是ResultStage的構造器參數,指定了函數的形式必須滿足(TaskContext, Iterator[_]) => _
  • partitions:由RDD的各個分區的索引組成的數組。
  • _activeJob:ResultStage處理的ActiveJob

ResultStage提供的一些方法

 def activeJob: Option[ActiveJob] = _activeJob

def setActiveJob(job: ActiveJob): Unit = {
_activeJob = Option(job)
}

def removeActiveJob(): Unit = {

_activeJob = None
}

/**
* Returns the sequence of partition ids that are missing (i.e. needs to be computed).
*
* This can only be called when there is an active job.
*/
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
}

其中findMissingPartitions方法用於找出當前Job的所有分區中還沒有完成的分區的索引。ResultStage判斷一個分區是否完成,是通過ActiveJob的Boolean類型數組finished,因為finished記錄了每個分區是否完成。

StageInfo

StageInfo用於描述Stage信息,可以傳遞給SparkListener。StageInfo包括一下屬性:

  • stageId:Stage的id
  • attemptId:當前Stage嘗試的id。
  • name:當前Stage的名稱。
  • numTasks:當前Stage的Task數量
  • rddInfos:RDD信息(即RDDInfo)序列
  • parentIds:當前Stage的父Stage的id序列
  • details:詳細的線程棧信息
  • taskMetrics:Task的度量信息
  • taskLocalityPreferences:類型為Seq[Seq[TaskLocation]],用於存儲任務的本地性偏好
  • submissionTime:DAGScheduler將當前Stage提交給TaskScheduler的時間
  • completionTime:當前Stage中所有Task完成的時間(即Stage完成的時間)或者Stage被取消的時間。
  • failureReason:如果Stage失敗了,用於記錄失敗的原因。
  • accumulables:存儲所有聚合器計算的最終值。

StageInfo提供了一個當Stage失敗是要調用的方法stageFailed,源碼如下:

 def stageFailed(reason: String) {
failureReason = Some(reason)
completionTime = Some(System.currentTimeMillis)
}


分享到:


相關文章: