Spark任務調度|Spark,從入門到精通

歡迎閱讀美圖數據技術團隊的「Spark,從入門到精通」系列文章,本系列文章將由淺入深為大家介紹 Spark,從框架入門到底層架構的實現,相信總有一種姿勢適合你,歡迎大家持續關注:)

Spark任務調度|Spark,從入門到精通

圖 1

如圖 1 所示是 Spark 的執行過程,那麼具體 Drvier 是如何把 Task 提交給 Executor 的呢?本文將通過 DAGScheduler 、TaskScheduler、調度池和 Executor 四部分介紹 Spark 的任務調度原理及過程。

DAGScheduler

Spark 任務調度中各個 RDD 之間存在著依賴關係,這些依賴關係就形成有向無環圖 DAG,DAGScheduler 負責對這些依賴關係形成的 DAG 並進行 Stage 劃分,而 DAGScheduler 分為創建、Job 提交、Stage 劃分、Task 生成四個部分。

DAGScheduler 創建

private[spark]
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
this(
sc,
taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env)
}
.....
}

DAGScheduler 在 SparkContext 中創建,並且需要提供 TaskScheduler 的實例。在構造函數中的 MapOutputTrackerMaster 是運行在 Driver 端用來管理 ShuffleMapTask 的輸出,下游的 Task 可以通過 MapOutputTrackerMaster 來獲取 Shuffle 輸出的位置信息。

private[spark]
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
.....
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
.....
}

DAGScheduler 是基於 Akka Actor 的消息傳遞機制來構建事件循環處理邏輯,如上段代碼所示,在 DAGScheduler 初始化時創建了 eventProcessLoop 以處理各種 DAGSchedulerEvent,這些事件包括作業的提交、任務狀態的變化、監控等。

Job 提交

Spark任務調度|Spark,從入門到精通

圖 2

如圖 2 所示是 RDD 的 count 執行調用過程。其中,在 DAGScheduelr 的 submitJob 方法中會生成 JobId,並創建一個 JobWaiter 監聽 Job 是否執行成功。一個 Job 內包含多個 Task,只有所有 Task 都執行成功該 Job 才會被 JobWaiter 標記為 Succes。

Stage 劃分

用戶提交的計算任務是由多個 RDD 構成的 DAG, 當 RDD 在轉換時需要進行 Shuffle,Shuffle 的過程中就將這個 DAG 劃分成了多個 Stage。

由於後面的 Stage 需要前面的 Stage 提供 Shuffle 的結果,因此不同的 Stage 不能並行計算。那麼 RDD 在哪些操作時需要進行 Shuffle 呢?這裡涉及到 RDD 的兩種依賴關係:寬依賴與窄依賴。

Spark任務調度|Spark,從入門到精通

圖 3

如圖 3 左側所示為窄依賴,由於 RDD 每個 partition 依賴固定數量的 parent RDD 的 partition,所以可以通過 Task 來處理這些 partition。而且這些 partition 相互獨立,所以 Task 可以並行計算。寬依賴反之。

Spark任務調度|Spark,從入門到精通

圖 4

讓我們舉例說明 Stage 的劃分過程,如圖 4 所示從觸發 Action 的 RDD G 開始劃分,G 依賴 B 和 F,處理 B 和 F 的順序是隨機的,假設先處理 B。由於 G 和 B 是窄依賴關係,可以劃分到同一個 Stage 。接著處理 F,此時 F 和 G 是寬依賴關係,所以將 F 劃分到一個新的 Stage,以此類推劃分其它 Stage。

接著以 Stage 1 為例看它的計算方式,如圖 4 所示 RDD A 有三個 Partition,因此會生成三個 ShuffleMapTask,這三個 Task 會把結果輸出到三個 Partition 中。

Task 生成

任務生成首先要獲取需要計算的 Partition,如果是最後的 Stage 所對應的 Task 是 ResultTask,那麼先判斷 ResultTask 是否結束,若結束則無需計算;對於其它 Stage 對應的都是 ShuffleMapTask,因此只需要判斷 Stage 中是否有緩存結果。判斷出哪些 Partition 需要計算後生成對應的 Task,然後封裝到相應的 TaskSet 中,並提交給 TaskScheduler。TaskSet 中包含了一組處理邏輯完全相同的 Task,但它們的處理數據不同,這裡的每個 Task 負責一個 partition。

TaskScheduler

TaskScheduler 是在 SparkContext 中通過 createTaskScheduler 把引用傳給 DAGScheduler 的構造函數。每個 TaskScheduler 都會對應一個 SchedulerBackend,TaskScheduler 負責 Application 中不同 job 之間的調度,在 Task 執行失敗時啟動重試機制,並且為執行速度慢的 Task 啟動備份的任務;而 SchdulerBackend 負責與 Cluster Manager 交互,獲取該 Application 分配到的資源,然後傳給 TaskScheduler。

TaskScheduler 執行流程主要分成兩個部分:Driver 端執行和 Executor 執行,他們的執行步驟分別如下:

Driver 端執行TaskSchedulerImpl#submitTasks 將Task加入到TaskSetManager當中ScheduleBuilder#addTaskSetManager 根據調度優先級確定調度順序CoarseGrainedSchdulerBackend#reviveOffersDriverActor#makeOffersTaskSchedulerImpl#resourceOffers 響應資源調度請求,為每個Task分配資源DriverActor#launchTasks 將tasks發送到Executor

Executor上執行ReceiveWithLogging#launchTasksExecutor#launchTask


調度池


調度池顧名思義就是存放了一堆待執行的任務,它決定 TaskSetManager 的調度順序,然後由 TaskSetManager 根據就近原則來確定 Task 運行在哪個 Executor。

那麼它是如何決定 TaskSetManager 的調度順序的呢? 調度池主要有兩個決策策略:FIFO 和 FAIR。

Spark任務調度|Spark,從入門到精通

圖 5

首先以整體看 FIFO 和 FAIR 的執行對比圖圖 5,可以看出在左側 FIFO 只有一個調度池,即 rootPool,裡面包含了待調度的 TaskSetManager;而右側 FAIR 在 rootPool 調度池中包含了多個子調度池,比如圖中的 production 和 test 調度池。

在 FIFO 算法中需要保證 JobId 比較小的優先執行,如果是同一個 Job 則 StageId 比較小的先被調度。FAIR 算法則提供參數配置,如圖 6 所示是一份配置文件:

Spark任務調度|Spark,從入門到精通

圖 6

接著看看我們的 Spark 集群是如何配置的。

private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
res < 0
}
}
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare = 0
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare < 0) {
true
} else if (compare > 0) {
false
} else {
s1.name < s2.name
}
}
}


首先獲取 S1 和 S2 兩個調度池中的運行狀態 Task 個數,若 S1 的運行狀態 Task 數小於該調度池的最小資源數,而 S2 相反,那麼此時優先調度 S1 中的 Task;如果 S1 和 S2 中的運行狀態 Task 數都小於該調度池的最小資源數,那麼就依據資源佔用率決定調度優先級;如果 S1、S2 的運行狀態 Task 數都大於所屬調度池的最小資源數,那麼就對比它們的已運行 task 個數與分配權重的比例,得出來比例較小的優先調度。

Executor

Spark任務調度|Spark,從入門到精通

圖 7


Spark任務調度|Spark,從入門到精通

圖 8

如圖 8 所示,Executor 是在 worker 收到 master 的 LaunchExecutorde 消息後創建的。在 TaskScheduler 階段提交 Task 之後 Driver 會序列化封裝 Task 的依賴文件和自身信息,然後在 Executor 上反序列化得到 Task。在準備好了 Task 的執行環境之後就通過 TaskRunner 去執行計算,得到執行狀態。值得注意的是,在得到計算結果發回 Driver 的過程中,如果文件太大會被直接丟棄(可以通過 spark.driver.maxResultSize 來設定大小)。


分享到:


相關文章: