【译】Spark 作业调度

原文地址:https://spark.apache.org/docs/latest/job-scheduling.html

这篇不仅仅是简单的翻译,有些比较不容易懂的地方,加了一些备注,另外翻译已经尽可能的准确,但是也难免有疏漏。

刘彬同学准备写一系列spark实战系列,本文是第三篇,翻译的Spark 作业调度!赞!推荐给大家,希望大家喜欢和支持!

系列文章

SparkContext 初始化内部原理

checkpoint的实现

1 概述

Spark有几个资源调度之间的工具,首先,回想一下,如集群部署模式概述中所提及的,每个Spark application (SparkContext实例)都会运行在一个独立的Execute进程中,集群管理器提供了Spark运行跨应用程序的调度功能,第二,每个Spark应用程序,如果它们由不同的线程提交多个“Job”(Spark action)则可能同时运行,如果应用程序通过网络服务请求,这是很正常的,Spark中包含一个公平调度程序用于在每个SparkContext中资源调度。

2 跨应用调度

当一个集群正在运行的时候,每个Spark应用程序都会独立的设置一个Executor JVM只会给该应用程序运行任务和存储数据,如果多个用户需要共享你的集群,通过集群管理器,它们由不同的选项

来管理分配。

最简单的选择,所有集群管理员都可以使用,资源的静态划分,通过这种方法,每个应用程序都获得了它可以使用的最大资源,并在整个过程中都保持这样,这是在Spark的Standalone和Yarn模式

中使⽤用的⽅方法,以及在粗粒度的Mesos模式,基于集群类型,可以配置集群分配类型:

Standalone mode:默认情况下,应用程序提交在Standalone集群模式中将会以FIFO(先入先出)顺序运行,每个应用程序将尝试所有可⽤用的节点,可以通过配置应⽤用程序配置spark.core.max来限制使用的最大节点数,或者不通过这个设置,改变应用默认的设置spark.deploy.defaultCores;最后,除了控制cores,每个应⽤用程序还可以通过设置spark.executor.memory来控制内存使⽤用。

Mesos:Mesos使用静态划分,设置spark.mesos.coarse配置属性为true,在Standalone模式可以选择设置spark.cores.max将每个应⽤用程序的资源共享,也可以设置spark.executor.memory来控制内存使用

Yarn:Spark YARN客户端的 —num-executors选项来控制在集群中分配资源的多少(spark.executor.instances 做为配置属性)而—executor-memory( spark.executor.memory配置属性 )和—

executor-cores( spark.executor.cores.配置属性 )控制每个执行期的资源,更多信息请查看YARN Spark Properties(如果你的spark通过YARN来调度执行,推荐看一下)

Mesos的第二个选项是动态分配CPU内核,在这个模式里,每个应用程序都有一个固定且独立的内存分配(通过设置spark.executor.memory),但是当一个应用程序没有在这个机器上运行,则其它的应用程序可能在这个集群上面分配内核运行任务,当你希望大量不太活跃的应用程序时这种方式是有用的,比如,来自不同用户的shell会话,然后它带来的风险就是可预见的延迟,因为当应⽤用程序有工作要做时,它可能需要在等待⼀一段时间才能获得内核来执行,要使用此模式,简单实用mesos:// 和 设置 spark.mesos.coarse 为false没有一种应用程序可以跨应用程序进行内存共享,如果你想通过这个方法共享数据,建议运行一个单独的服务器程序,该应用程序通过查询多个RDDs来服务于多个请求动态资源分配

Spark提供了一种机制,可以通过工作负载动态的调整应用程序占用的资源,这就意味着如果你的应用程序长时间没有使用和请求那么资源将会被回收返回给集群,这在多个应用程序在你的集群中

共享资源,这个特性很有用默认情况下,这个特性是禁用的,所有粗粒度的集群管理器都可用,⽐比如:Standalone 模式、Yarn模式和Mesos粗粒度模式都可用。

3 配置和设置

使用这个特性有两个要求,第一,应用程序必须将spark.dynamicAlloction.enabled设置为true,第二,必须在同一个集群的每个worker节点设置设置一个外部shuffle以及将spark.shuffle.service.enabled设置为true

外部shuffle程序的目的是是允许删除执行程序而不删除它们所编写的文件(下面描述的更详细),在集群管理器中设置此服务有不同的⽅方式:在Standalone模式下,启动workers并且将spark.shuffle.service.enable设置为true

在Mesos 粗粒度模式中,运行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh 在所有从节点中并且将spark.shuffle.service.enabled设置为true,例如:可以通过Marathon来完成(Marathon:是mesos的⼀一个框架,支持长任务运行,比如Web服务等)

在YARN模式中,请看说明

译者补充:

在你的Yarn集群中每个NodeManager启动Spark Shuffle服务,需要遵循以下说明:

1.编译Spark与Yarn Profile,如果使用的是pre-packaged版本,可以跳过这一步

2.定位 the spark--yarn-shuffle.jar. 如果你用的是自己编译的spark并且使⽤用的是分布式环境的话,这是应该是在 $SPARK_HOME/common/network-yarn/target/scala-

3.在集群所有的NodeManager上面添加这个JAR包到classpath

4.在每个节点上的 yarn-site.xml,添加spark_shuffle到 yarn.nodemanager.aux-services,然后设置yarn.nodemanager.aux-services.spark_shuffle.class

到org.apache.spark.network.yarn.YarnShuffleService.

5.通过在etc/hadoop/yarn-env中设置YARN_HEAPSIZE( 默认为1000 )来增加NodeManager的堆大小,在shuffle过程中避免垃圾回收

6.在集群中重启所有的NodeManager

4 资源分配策略

在较高级别,在需要的时候,当它们不再使用时,Spark应该放弃Executors,当没有一个明确的方法去预测即将移除的Executor是否会在将来继续执行任务,或者将要添加的一组新的执行器是空闲

的,当移除和请求执⾏行器的时候我们需要⼀一组a set of heuristics to determinev

5 请求策略

当一个spark应用程序带有动态策略,当它有等待被调度的任务时,请求附加执行器,这种情况意味着现有的执行器不足以饱和和提交尚未完成的所有任务

Spark 请求执行器in rounds ,当存在spark.dynamicAllocation.schedulerBacklogTimeout 秒等待任务时,将触发实际的请求,如果等待队列依然存在,然后每次触发spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 此外,每次请求的执行器数量都比上一次指数级,比如,一个应用程序在第一轮添加一个执⾏行器,然后在后续继续添加了2,4,8个执⾏行器。

指数增加的动机是双重的,首先,应用程序在开始时应该谨慎的请求执行器,避免发现只有少数几个执行器可⽤用,这反而成为了TCP慢的理由,其次,应用程序应该及时提高资源使用率,避免实际

需要很多的执行器

6 移除策略

删除执行器策略要简单很多,当一个spark应用程序的空闲时间比spark.dynamicAllocation.executorIdleTimeout 要多时,Spark应该删除执行器,注意,在大多数情况下,这个条件和请求条件是互斥的,在此条件下,如果仍然有未完成的任务,则执⾏行器不应处于空闲状态

7 优雅的解除执行器

动态分配之前,一个Spark Executor退出失败或者相关联的应用程序退出,在这两种情况下,与执行器相关的所有状态都不再需要,可以安全的解除,但是,在动态分配情况下,与执行器显式地删除时,应用程序仍在运行,如果应用程序试图访问被执行人存储或写入的状态,则必须执行重新计算状态.

因此,Spark需要一种机制,在删除之前保留它的状态,从而优雅的移除它.这个条件对于shuffles及其重要,在Shuffle中,Spark Executor⾸首先将自己的Map输出写入到磁盘,然后当其它executors试图获取这些⽂文件时,它来充当这个文件的服务器,当任务执行缓慢的情况下,动态策略可能在shuffle完成之前删除一个executor,在这种情况下,由该executor计算的shuffle需要重新计算⼀一遍.

保留shuffle 文件的解决方案是引入外部shuffle服务,在spark1.2引入了,这个服务是一个长时间运行的进程,它运行在集群的每个节点上面,独立于应用程序和它的执行器,如果启用了这个服务,

spark executor将从服务中获取shuffle文件,而不是从彼此间获取,这表示在executors所写的所有shuffle状态都将继续被记录在executor的整个生命周期

除了写shuffle文件之外,executor还可以在磁盘和内存中缓存数据,但是,当删除一个executor的时候,所有缓存数据就无法访问,为了避免这种情况,默认的执行器中包含的缓存数据永远不会删除,可以配置spark.dynamicAllocation.cachedExecutorIdleTimeout,在以后版本中,缓存的数据可以通过一个类似于storage的堆外内存存储,这与通过外部shuffle服务保存文件的⽅方式类似。

8 应用程序内调度

在给定的spark应⽤用程序(SparkContext实例)中,如果从单线程中提交多个并行作业,则可以同时并行运行多个作业,通过“Job”,在本节中我们指的是一个Spark动作(比如:save,collect )

和任务需要运行的任务来评估该操作,Spark的调度器是线程安全的,它支持一个实例服务于多个请求的应用程序(比如多个用户查询)

默认情况下,Spark的调度器以FIFO方式运行作业,每个工作分为“阶段”(比如map和reduce阶段),第一个job获取所有可⽤用的资源,阶段任务启动,二个阶段的任务优先执行等等,如果队列头

部的作业不需要整个集群,则稍后的作业可以立即开始执行,但是如果队列头部的作业很大,则后面的作业可能被延迟

从Spark0.8开始,还可以配置作业间的公平共享,在公平共享下,Spark以“循环型”的方式分配任务之间的任务,这样所有工作都能得到大致相等的集群资源份额,这意味着,在长时间工作时提交

的短作业可以立即开始执行,而且能得到很好的响应时间,而不必等待长时间的任务,这种模式比较适合多用户

要启动公平调度程序,只需在配置SparkContext的时候,设置spark.scheduler.mode 属性为FAIR :

val conf = new SparkConf().setMaster(...).setAppName(...)

conf.set("spark.scheduler.mode", "FAIR")

val sc = new SparkContext(conf)

9 Fair调度器池

公平调度⽀支持将将作业分组到池中,然后为每个池分配不同的调度选项,这对于为一个重要的job创建一个“高优先级”池非常有用,比如:将每个user的job分组在一起然后给用户相同的份额,而不用管他们有多少个并发的job,此方法是在Hadoop Fair Scheduler之后建立的。如果没有任何修改,新提交的作业将进入默认池中,但是可以通过添加来spark.scheduler.pool设置作业池将本地配置提交到它们的线程池的SparkContext中,比如:

// Assuming sc is your SparkContext variable

sc.setLocalProperty("spark.scheduler.pool", "pool1”)

在设置本地属性之后,所有在该线程中提交的作业(通过此线程中的调⽤用者发送给RDD的save,count,collect,etc)将使用这个池的名称,同时这个设置是针对每个线程的,从而使的每个线程能

够代表同一个用户运行多个作业,如果想清除线程与之关联的池,只需要调用:

sc.setLocalProperty("spark.scheduler.pool", null)

池的默认行为

默认情况下,每个池在集群中将获得相同的份额(在默认池中共享每个作业),但是在每个池中,作业以FIFO顺序运行,比如:如果你为用户创建⼀、一个池,这意味着每个用户将获得集群的相同份额,并且每个用户的查询将以顺序运行,而不是从该⽤、用户的早期查询结果中获取。

10 配置池的属性

特定池的属性也可以通过配置文件进行修改,每个池支持三个属性:

调度模式:可以是FIFO或者FAIR,来控制池中的作业是顺序执行(默认情况下)还是共享池中的资源执行

权重:用来控制池相对其它池的共享份额,默认情况下,池的权重为1,比如,如果给一个特定的池权重为2,那么它将获得比其它池多两倍的资源,设置一个⾼高权重(1000)也可能使的池在本质上

获取优先权,权重位1000的池在得到job之后总能够首先执行(译注:如果一个池的权重设置的很高,那么不管在什么情况下,这个池的任务总是排在第一位执行)

minShare:除了整体的权重,每个池都可以得到一个管理员。希望它拥有的最小份额(作为CPU核⼼心数),在重新分配额外资源之前,公平调度程序会尝试满足所有活动的所需最小份额,因此

MinShare属性可以是另一种方法,以确保能够在不使集群的处于高度优先级的情况下,快速地获取一定数量的资源(⽐比如:10核),默认情况下,每个池的minshare为0

可以通过创建⼀一个类似于conf/fairscheduler.xml.template 的xml⽂文件来设置池的属性,并在sparkConf属性中设置spark.scheduler.allocation

conf.set("spark.scheduler.allocation.file", "/path/to/file”)

XML的文件格式只是简单的元素,里面包含各种设置,比如:

FAIR

1

2

FIFO

2

3

一个完整的例子,可以在conf/fairscheduler.xml.template中查看,注意,在XML⽂文件中没有配置的任何池只会得到所有设置的默认值(调度模式FIFO、权重1和minShare 0)。


分享到:


相關文章: