DAGScheduler: Submitting Tasks

Submitting Tasks

Once a stage has been submitted for execution, DAGScheduler's submitMissingTasks method splits it into as many tasks as the stage has partitions, and these tasks are bundled into a task set that is handed to the TaskScheduler for processing.

For a ResultStage, ResultTasks are generated; for a ShuffleMapStage, ShuffleMapTasks are generated. Each task set contains all the tasks of the corresponding stage; the tasks share exactly the same processing logic and differ only in the data they process, namely their respective partitions. DAGScheduler's submitMissingTasks method is shown below; it is invoked when the Stage has no unavailable parent Stages and submits the Stage's tasks that have not yet completed:

 private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")

// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties

runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\\n${Utils.exceptionString(e)}", Some(e))

runningStages -= stage
return
}

stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}

taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage

// Abort execution
return
case NonFatal(e) =>
abortStage(stage, s"Task serialization failed: $e\\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}

val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}

case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}

if (tasks.size > 0) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
markStageAsFinished(stage, None)

val debugString = stage match {
case stage: ShuffleMapStage =>
s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})"
case stage : ResultStage =>
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)

submitWaitingChildStages(stage)
}
}
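The comment above taskBinary explains the pattern used for shipping the task payload: serialize the (rdd, shuffleDep) or (rdd, func) pair once on the driver, broadcast the resulting bytes, and let every task deserialize its own private copy. The standalone sketch below (not part of the DAGScheduler source; the object name, app name, and payload are made up for illustration) reproduces that pattern with the public JavaSerializer and broadcast APIs:

// A standalone sketch of the taskBinary pattern: serialize once on the driver,
// broadcast the bytes, and have each consumer deserialize its own copy.
import java.nio.ByteBuffer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.JavaSerializer

object TaskBinaryPattern {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("task-binary-pattern").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val serializer = new JavaSerializer(conf).newInstance()

    // stand-in for the (rdd, func) pair that submitMissingTasks serializes
    val payload: (String, Int => Int) = ("demo-func", x => x + 1)

    // serialize once, then broadcast the resulting bytes to all executors
    val bytes: Array[Byte] = serializer.serialize(payload).array()
    val taskBinary = sc.broadcast(bytes)

    // what each task does with taskBinary.value: deserialize a private copy
    val (name, f) = serializer.deserialize[(String, Int => Int)](ByteBuffer.wrap(taskBinary.value))
    println(s"$name applied to 41 = ${f(41)}")

    sc.stop()
  }
}

Deserializing per task gives each task an isolated copy of the objects referenced in its closure, which matters for non-thread-safe objects such as Hadoop's JobConf/Configuration, as the source comment notes.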

The execution of submitMissingTasks can be summarized as follows:

  • Clear the current Stage's pendingPartitions. Since the Stage's tasks are only now starting to run, the set is cleared so that it can track the partitions that still need to be computed.
  • Call the Stage's findMissingPartitions method to find the indices of all partitions of the current Stage that have not yet finished computing.
  • Fetch the properties of the associated ActiveJob. These carry the current Job's scheduling pool, job group, description, and other attributes.
  • Add the current Stage to the runningStages set, marking it as running.
  • Call outputCommitCoordinator's stageStart method to begin coordinating the current Stage's output commits (for example to HDFS).
  • Call DAGScheduler's getPreferredLocs method to obtain the preferred locations of every partition in partitionsToCompute. If any exception is thrown, call the Stage's makeNewStageAttempt method to start a new stage attempt, post a SparkListenerStageSubmitted event to the listenerBus, abort the Stage, and return.
  • Call the Stage's makeNewStageAttempt method to start a new stage attempt and post a SparkListenerStageSubmitted event to the listenerBus.
  • If the current Stage is a ShuffleMapStage, serialize the Stage's rdd together with its ShuffleDependency; if it is a ResultStage, serialize the Stage's rdd together with the func that computes each RDD partition.
  • Call SparkContext's broadcast method to broadcast the serialized bytes produced in the previous step.
  • If the current Stage is a ShuffleMapStage, create a ShuffleMapTask for each of its partitions; if it is a ResultStage, create a ResultTask for each of its partitions (see the sketch after this list for a job that produces both kinds of task).
  • If at least one Task was created in the previous step, record the partition index each Task handles in the Stage's pendingPartitions, wrap the batch of Tasks in a TaskSet, and call TaskScheduler's submitTasks method to submit it.
  • If no Task was created, the current Stage has nothing to submit, so call DAGScheduler's markStageAsFinished method to mark the Stage as finished, and then call submitWaitingChildStages to submit the Stage's waiting child stages.
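To make the ShuffleMapTask/ResultTask split concrete, here is a minimal driver-side sketch (the object name, app name, and data are hypothetical) whose single action produces both task types: reduceByKey introduces a ShuffleDependency, so the DAG splits into a ShuffleMapStage computed by ShuffleMapTasks and a final ResultStage computed by ResultTasks.

// A minimal sketch of a job that exercises both task types described above.
import org.apache.spark.{SparkConf, SparkContext}

object TwoStageJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("two-stage-job").setMaster("local[2]"))

    // 3 input partitions -> the ShuffleMapStage is split into 3 ShuffleMapTasks
    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3)

    // reduceByKey adds a ShuffleDependency; collect() submits the job, and the
    // final ResultStage (2 partitions here) is computed by 2 ResultTasks
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _, 2).collect()

    counts.foreach(println)
    sc.stop()
  }
}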

The markStageAsFinished method:

 private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
val serviceTime = stage.latestInfo.submissionTime match {
case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
}
if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.latestInfo.completionTime = Some(clock.getTimeMillis())

// Clear failure count for this stage, now that it's succeeded.
// We only limit consecutive failures of stage attempts, so that if a stage is
// re-used many times in a long-running job, unrelated failures don't eventually cause the
// stage to be aborted.
stage.clearFailures()
} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}")
}

outputCommitCoordinator.stageEnd(stage.id)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
runningStages -= stage
}

markStageAsFinished proceeds as follows:

1. Compute the Stage's execution time (serviceTime).

2. If no error occurred while the Stage ran, set the completion time in the Stage's latestInfo to the current system time and call the Stage's clearFailures method to clear fetchFailedAttemptIds.

3. If an error did occur while the Stage ran, call StageInfo's stageFailed method to record the failure reason and the Stage's completion time.

4. Call outputCommitCoordinator's stageEnd method to stop coordinating the current Stage's output commits (for example to HDFS).

5. Post a SparkListenerStageCompleted message to the listenerBus.

6. Remove the current Stage from the set of running stages.
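The SparkListenerStageSubmitted and SparkListenerStageCompleted events posted to the listenerBus by submitMissingTasks and markStageAsFinished can be observed from user code. The sketch below (listener and object names are made up; it only assumes the public SparkListener API) registers a listener and prints the stage lifecycle, including the failureReason recorded by StageInfo.stageFailed:

// A minimal sketch of observing the stage lifecycle events described above.
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}
import org.apache.spark.{SparkConf, SparkContext}

class StageLoggingListener extends SparkListener {
  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    val info = event.stageInfo
    println(s"Stage ${info.stageId} submitted with ${info.numTasks} tasks")
  }

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // failureReason is set by StageInfo.stageFailed when markStageAsFinished gets an error
    val status = info.failureReason.map(r => s"failed: $r").getOrElse("succeeded")
    println(s"Stage ${info.stageId} completed ($status)")
  }
}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-listener").setMaster("local[2]"))
    sc.addSparkListener(new StageLoggingListener)
    sc.parallelize(1 to 100, 4).map(_ * 2).count()
    sc.stop()
  }
}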

The submitWaitingChildStages method submits all waiting child stages of the given Stage:

 private def submitWaitingChildStages(parent: Stage) {
logTrace(s"Checking if any dependencies of $parent are now runnable")
logTrace("running: " + runningStages)
logTrace("waiting: " + waitingStages)
logTrace("failed: " + failedStages)
val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
waitingStages --= childStages
for (stage <- childStages.sortBy(_.firstJobId)) {
submitStage(stage)
}
}

The Algorithm for Computing a Task's Best Locations

The lookup depends on the kind of the current Stage: for a ShuffleMapStage the task's locality information comes from getPreferredLocs(stage.rdd, id), where id is the partition index itself; for a ResultStage it comes from getPreferredLocs(stage.rdd, p), where p = s.partitions(id) maps the task index to the underlying RDD partition.

The relevant code inside the submitMissingTasks method:

 private def submitMissingTasks(stage: Stage, jobId: Int) {
………………
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
………………
}

Here partitionsToCompute holds the ids of the partitions that still need to be computed:

 val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

The algorithm that determines data locality for an individual partition is implemented in the getPreferredLocs method:

 private[spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
getPreferredLocsInternal(rdd, partition, new HashSet)
}

getPreferredLocsInternal is the recursive implementation behind getPreferredLocs. It is thread-safe because it only accesses DAGScheduler state through thread-safe methods such as getCacheLocs().

The source of getPreferredLocsInternal:

 private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// If the partition has already been visited, no need to re-visit.
// This avoids exponential path exploration. SPARK-695
if (!visited.add((rdd, partition))) {
// Nil has already been returned for previously visited partitions.
return Nil
}
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}

// If the RDD has some placement preferences (as is the case for input RDDs), get those
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}

// If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}

case _ =>
}

Nil
}

In the getPreferredLocsInternal code:

  • First check whether visited already contains the current (RDD, partition) pair; if it does, return Nil.
  • If the partition is cached, getCacheLocs(rdd)(partition) is consulted with the rdd and partition to look up the cached locations. If locations are found, they are returned.

The getCacheLocs source:

 private[scheduler] def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times

if (!cacheLocs.contains(rdd.id)) {
// Note: if the storage level is NONE, we don't need to get locations from block manager.
val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
IndexedSeq.fill(rdd.partitions.length)(Nil)
} else {
val blockIds =
rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
blockManagerMaster.getLocations(blockIds).map { bms =>
bms.map(bm => TaskLocation(bm.host, bm.executorId))
}
}
cacheLocs(rdd.id) = locs
}
cacheLocs(rdd.id)
}

The cacheLocs used by getCacheLocs is a HashMap holding the cache locations of every RDD's partitions. The map is keyed by RDD id, and each value is a sequence indexed by partition number; each element of that sequence is the set of locations where that RDD partition is cached.

 private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
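Purely as an illustration of this structure, the sketch below uses plain Scala with hypothetical values (an RDD id of 7, made-up host and executor names; the real values are Seq[TaskLocation], represented here by their string form):

// Hypothetical illustration of the shape of cacheLocs (strings stand in for TaskLocation).
import scala.collection.mutable

object CacheLocsShape {
  def main(args: Array[String]): Unit = {
    val cacheLocs = new mutable.HashMap[Int, IndexedSeq[Seq[String]]]

    // RDD with id 7 and three partitions: partitions 0 and 2 are cached, partition 1 is not
    cacheLocs(7) = IndexedSeq(
      Seq("executor_host1_1"), // partition 0: cached on executor 1 of host1
      Nil,                     // partition 1: not cached anywhere
      Seq("executor_host2_3")  // partition 2: cached on executor 3 of host2
    )

    println(cacheLocs(7)(2)) // locations of RDD 7, partition 2
  }
}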

When getPreferredLocsInternal computes locality, it first checks whether DAGScheduler's in-memory data structures already hold locality information for the current Partition (the cache locations above); if so, that information is returned directly. If not, it calls rdd.preferredLocations, which delegates to the RDD's getPreferredLocations.

If you implement a custom RDD, you must implement getPreferredLocations; it is one of an RDD's five core properties. Implementing getPreferredLocations is the key to guaranteeing data locality for Task computation: move the code to the data, not the data to the code. (A sketch of such a custom RDD follows the snippet below.)

 final def preferredLocations(split: Partition): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
getPreferredLocations(split)
}
}
 protected def getPreferredLocations(split: Partition): Seq[String] = Nil
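Here is a minimal sketch of such a custom RDD (the class, partition type, and shard hosts are all hypothetical): it reports where each shard lives through getPreferredLocations, so the DAGScheduler can schedule the corresponding task close to the data.

// A sketch of a custom RDD that overrides getPreferredLocations for data locality.
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

case class ShardPartition(index: Int, host: String) extends Partition

class ShardedSourceRDD(sc: SparkContext, shards: Seq[(Int, String)])
  extends RDD[Int](sc, Nil) {

  // one partition per shard, remembering the host that holds the shard
  override protected def getPartitions: Array[Partition] =
    shards.map { case (id, host) => ShardPartition(id, host): Partition }.toArray

  // report where the shard physically lives; the scheduler uses this as the preferred location
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[ShardPartition].host)

  // placeholder computation: a real RDD would read the shard's data here
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)
}

A hypothetical driver could then call new ShardedSourceRDD(sc, Seq((0, "host1"), (1, "host2"))).collect(), and getPreferredLocsInternal would return these hosts for each partition, unless a cached copy of the partition takes precedence.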

Data locality is determined before the job runs, because an RDD already carries the relevant metadata when it is constructed.

When computing data locality, DAGScheduler cleverly leverages the information in the RDD's own getPreferredLocations to maximize efficiency, since getPreferredLocations describes the data locality of every Partition. Even though the current Partition may have been persisted or checkpointed, by default the persisted or checkpointed copies are consistent with the locality that getPreferredLocations reports.

