Spark-Core:调度

基本概念

  • Job(作业):RDD中有行动操作所生成一个或者多个调度阶段。
  • Stage(调取阶段): 一个Job需要拆分成多组任务来完成,每组任务由Stage封装。与一个Job所涉及的PartitionRDD类似,Stage之间也有依赖关系。
  • Task(任务):一个独立的工作单元,有Driver Program发送到Executor上去执行。通常情况下,一个Task处理RDD的一个Partition的数据。根据Task返回类型的不同,Task又分为ShuffleMapTask和ResultTask。是Spark执行的最小单元。
  • TaskSet(任务集): 一组任务就是一个TaskSet,对应一个Stage。其中,一个TaskSet的所有Task之间没有Shuffle依赖,因此互相之间可以并行运行。
  • DAGScheduler(高层调度器):面向调度阶段的任务调度器,负责接受Spark应用程序提交的作业,根据RDD的以来关系划分调度阶段,并提交调度阶段给TaskScheduler。
  • TaskScheduler(任务调度器):面向任务的调度器,接受DAGScheduler提交的调度阶段,然后把任务分发到Worker节点运行,由Worker节点的Executor来运行该任务。

概述

Spark的调度系统用于将用户提价的“任务”调度到集群中的不同节点执行。

Spark资源调度分为两层

  • 第一层是Cluster Manager,将资源分配给Application;
  • 第二层是Application,进一步将资源分配给Application的各个Task。

Spark 首先会对提交的Job进行一系列RDD的转换,并通过RDD的依赖关系构成有向无环图(Direct Acyclic Graph, DAG)。然后根据RDD依赖的不同将RDD划分到不同的阶段(Stage),每个阶段按照分区(Partition)的数量创建多个任务(Task)。最后将这些任务提交到集群的各个节点上运行。

在Spark中最重要的是DAGScheduler和TaskScheduler两个调度器,其中DAGScheduler负责任务的逻辑调度,讲作业拆分成不同阶段的具有依赖关系的任务集,而TaskScheduler则负责具体任务的调度执行。

调度系统的主要工作流程如下:

Spark-Core:调度

Spark调度

  • 1)build operator DAG:用户提交的Job将首先被转换成一系列RDD并通过RDD之间的关系构建DAG,然后将RDD构成的DAG提交到调度系统。
  • 2)split graph into stages of tasks:DAGScheduler负责接受有RDD构成的DAG,酱油系列RDD划分到不同的Stage。根据Stage的不同类型(目前有ResultStage和ShuffleStage两种),给Stage中未完成的Partition创建不同类型的Task(目前有ResultTask和ShuffleMapTask两种)。每个Stage将因为未完成Partition的多少,创建零到多个Tasl。DAGScheduler最后将每个Stage中的Task以任务集合(TaskSet)的形式提交给TaskScheduler继续处理。
  • 3)launch tasks via cluster manager:使用集群管理器(cluster manager)分配资源与任务调度,对于失败的任务还会有一定的重试与容错机制。TaskSchedler负责从DAGScheduler接受TaskSet,创建TaskSetManager对TaskSet进行管理,并将此TaskSetManager添加到调度池中,最后将对Task的调度交给调度后端接口(SchedulerBackend)处理。SchedulerBackend首先申请TaskShceduler,按照Task调度算法(目前有FIFO和FAIR两种)对调度池中的所有TaskSetManager进行排序,然后对TaskSet按照最大本地性原则分配资源,最后在各个分配的节点上运行TaskSet中的Task。
  • 4)execute tasks:执行任务,并将任务中间结果和最终结果存入存储体系。

DAGScheduler

DAGScheduler是面向Stage的高层调度器,DAGScheduler把DAG拆分成很多的Tasks,每组Tasks都是一个Stage,解析时以Shuffle为边界反向解析构建Stage,每当遇到Shuffle,就会产生新的Stage,然后以一个个TaskSet的形式提交给底层调度器TaskScheduler。DAGScheduler需要记录哪些RDD被存入磁盘等物化动作,同时要寻求Task的最优化调度。DAGScheduler还需要监视因为Shuffle跨节点输出可能导致的失败,如果发现这个Stage失败,可能就要重新提交该Stage。

DAG将调度提交给DAGScheduler,DAGScheduler调度时会根据是否需要经过Shuffle过程将Job划分为多个Stage。DAGScheduler的调度过程中,Stage阶段的划分是根据是否有Shuffle过程,也就是当存在ShuffleDependency的宽依赖时,需要进行Shuffle,这是才会将作业(Job)划分成多个Stage。

TaskScheduler

TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。

  • 为TaskSet创建和维护一个TaskSetManager,并跟踪任务的本地性以及错误信息。
  • 遇到Straggle任务时,会放到其他节点进行重试
  • 向DAGScheduler回报执行情况,包括在SHuffle输出丢失的时候,报告fetch failed错误。

TaskSet是一个数据结构,TaskSet包含一系列高层调度器交给底层调度器的任务的集合。


分享到:


相關文章: