Spark 常规性能调优

1 常规性能调优一:最优资源配置

Spark 性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后(其实就是没钱提升硬件后),在此基础上再考虑进行后面论述的性能调优策略。

资源的分配在使用脚本提交 Spark 任务时进行指定,标准的 Spark 任务提交脚本如代码清单2-1所示:

代码清单 2-1 标准 Spark 提交脚本

/usr/opt/modules/spark/bin/spark-submit \\
--class com.test.spark.Analysis \\
--num-executors 80 \\
--driver-memory 6g \\
--executor-memory 6g \\
--executor-cores 3 \\
/usr/opt/modules/spark/jar/spark.jar \\

可以进行分配的资源如表 2-1 所示:

表 2-1 可分配资源表


Spark 常规性能调优

调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。

对于具体资源的分配,我们分别讨论 Spark 的两种 Cluster 运行模式:

第一种是 Spark Standalone 模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写 submit 脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有 15 台机器,每台机器为 8G 内存,2 个 CPU core,那么就指定 15 个 Executor,每个 Executor 分配 8G 内存,2 个 CPU core。

第二种是 Spark Yarn 模式,由于 Yarn 使用资源队列进行资源的分配和调度,在写 submit 脚本的时候,就根据 Spark 作业要提交到的资源队列,进行资源的分配,比如资源队列有 400G 内存,100 个 CPU core,那么指定 50 个 Executor,每个 Executor 分配 8G 内存,2 个 CPU core。

对表 2-1 中的各项资源进行了调节后,得到的性能提升如表 2-2 所示:

表2-2 资源调节后的性能提升

Spark 常规性能调优

补充:生产环境 Spark submit 脚本配置

/usr/local/spark/bin/spark-submit \\
--class com.test.spark.WordCount \\
--num-executors 80 \\
--driver-memory 6g \\
--executor-memory 6g \\
--executor-cores 3 \\
--master yarn-cluster \\
--queue root.default \\
--conf spark.yarn.executor.memoryOverhead=2048 \\
--conf spark.core.connection.ack.wait.timeout=300 \\
/usr/local/spark/spark.jar

参数配置参考值:

--num-executors:50~100

--driver-memory:1G~5G

--executor-memory:6G~10G

--executor-cores:3

--master:实际生产环境一定使用yarn-cluster

实际场景举例:

上面说的这些 submit 提交,相信大家都需要做,但看完这小结后你在提交任务时会进行最优配置了吗?没有实际工作过的同学肯定还是不会,因为这些说白了还是理论,对是对,但实际生产中不能生搬硬套。

比如工作中一般会用到 CDH 的公司,比如小编的,一般会在 oozie 上通过建立 workflow 进行任务的调度,那么对于跑 Spark 的任务,其参数配置就要视业务而定了。上面说到 --driver-memory 这个配置不必太大,一般都会比 --executor-memory 小,因为后者才是进行计算的地方。但小编的公司 --driver-memory 居然会设置成 --executor-memory 的两倍。为啥?因为我们有的业务需要拉到 driver 端的缓存中,小就爆了,别问我为什么拉到 driver 端,这多傻啊,业务场景不一样,需要的计算方式也不一样。当然我不否定会有更优的方式处理,这里只是说理论不能偏离实际。

配置额外补充:动态资源分配(小编公司中使用的配置方式)

--conf spark.shuffle.service.enabled=true //启用External shuffle Service服务
--conf spark.dynamicAllocation.enabled=true //开启动态资源分配
--conf spark.dynamicAllocation.minExecutors=1 //每个Application最小分配的executor数
--conf spark.dynamicAllocation.maxExecutors=40 //每个Application最大并发分配的executor数

开启动态分配策略后,application 会在 task 因没有足够资源被挂起的时候去动态申请资源,这种情况意味着该 application 现有的 executor 无法满足所有 task 并行运行。spark 一轮一轮的申请资源,当有 task 挂起或等待 spark.dynamicAllocation.schedulerBacklogTimeout (默认1s)时间的时候,会开始动态资源分配;之后会每隔 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (默认1s)时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。

之所以采用指数增长,出于两方面考虑:其一,开始申请的少是考虑到可能 application 会马上得到满足;其次要成倍增加,是为了防止 application 需要很多资源,而该方式可以在很少次数的申请之后得到满足。

2 常规性能调优二:RDD 优化

1.RDD 复用

在对 RDD 进行算子时,要避免相同的算子和计算逻辑之下对 RDD 进行重复的计算,如图 2-1 所示:

Spark 常规性能调优

对图 2-1 中的 RDD 计算架构进行修改,得到如图 2-2 所示的优化结果:

Spark 常规性能调优

2.RDD 持久化

在 Spark 中,当多次对同一个 RDD 执行算子操作时,每一次都会对这个 RDD 以之前的父 RDD 重新计算一次,这种情况是必须要避免的,对同一个 RDD 的重复计算是对资源的极大浪费,因此,必须对多次使用的 RDD 进行持久化,通过持久化将公共 RDD 的数据缓存到内存/磁盘中,之后对于公共 RDD 的计算都会从内存/磁盘中直接获取 RDD 数据。

对于 RDD 的持久化,有两点需要说明:

第一,RDD 的持久化是可以进行序列化的,当内存无法将 RDD 的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中。

第二,如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对 RDD 数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。

  1. RDD 尽可能早的 filter 操作

获取到初始 RDD 后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升 Spark 作业的运行效率。

3 常规性能调优三:并行度调节

Spark 作业中的并行度指各个 stage 的 task 的数量。

如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20 个 Executor,每个 Executor 分配 3 个 CPU core,而 Spark 作业有 40 个 task,这样每个 Executor 分配到的 task 个数是 2 个,这就使得每个 Executor 有一个 CPU core 空闲,导致资源的浪费。

理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个 Spark 作业的性能和运行速度。

Spark 官方推荐,task 数量应该设置为 Spark 作业总 CPU core 数量的2~3倍。之所以没有推荐 task 数量与 CPU core 总数相等,是因为 task 的执行时间不同,有的 task 执行速度快而有的 task 执行速度慢,如果 task 数量与 CPU core 总数相等,那么执行快的 task 执行完成后,会出现 CPU core 空闲的情况。如果 task 数量设置为 CPU core 总数的2~3倍,那么一个 task 执行完毕后,CPU core 会立刻执行下一个 task,降低了资源的浪费,同时提升了 Spark 作业运行的效率。

Spark作业并行度的设置如代码清单2-2所示:

代码清单2-2 Spark 作业并行度设置

val conf = new SparkConf().set("spark.default.parallelism", "500")

4 常规性能调优四:广播大变量

默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD 数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。

假设当前任务配置了 20 个 Executor,指定 500 个 task,有一个 20M 的变量被所有 task 共用,此时会在500 个 task 中产生 500 个副本,耗费集群 10G 的内存,如果使用了广播变量, 那么每个 Executor 保存一个副本,一共消耗 400M 内存,内存消耗减少了 5 倍。

广播变量在每个 Executor 保存一个副本,此 Executor 的所有 task 共用此广播变量,这让变量产生的副本数量大大减少。

在初始阶段,广播变量只在 Driver 中有一份副本。task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的 Executor 对应的 BlockManager 中尝试获取变量,如果本地没有,BlockManager 就会从 Driver 或者其他节点的 BlockManager 上远程拉取变量的复本,并由本地的 BlockManager 进行管理;之后此 Executor 的所有 task 都会直接从本地的 BlockManager 中获取变量。

注意:这里说的大变量也是有限制的,如果变量太大,超过了 executor 内存,那么这个变量时放不进去的,就会 OOM。

5 常规性能调优五:Kryo 序列化

默认情况下,Spark 使用 Java 的序列化机制。Java 的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现 Serializable 接口即可,但是,Java 序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。

Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用 Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0 版本开始,简单类型、简单类型数组、字符串类型的 Shuffling RDDs 已经默认使用 Kryo 序列化方式了。

Kryo 序列化注册方式的实例代码如代码清单 2-3 所示:

代码清单 2-3 Kryo 序列化机制配置代码

public class MyKryoRegistrator implements KryoRegistrator
{
@Override
public void registerClasses(Kryo kryo)
{
kryo.register(StartupReportLogs.class);
}
}

配置 Kryo 序列化方式的实例代码如代码清单2-4所示:

代码清单 2-4 Kryo 序列化机制配置代码

//创建SparkConf对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化库,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//在Kryo序列化库中注册自定义的类集合,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "test.com.MyKryoRegistrator")

6 常规性能调优六:调节本地化等待时长

Spark 作业运行过程中,Driver 会对每一个 stage 的 task 进行分配。根据 Spark 的 task 分配算法,Spark 希望 task 能够运行在它要计算的数据所在的节点(数据本地化思想),这样就可以避免数据的网络传输。通常来说,task 可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时,Spark 会等待一段时间,默认 3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将 task 分配到比较差的本地化级别所对应的节点上,比如将 task 分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。

当 task 要处理的数据不在 task 所在节点上时,会发生数据的传输。task 会通过所在节点的 BlockManager 获取数据,BlockManager 发现数据不在本地时,会通过网络传输组件从数据所在节点的 BlockManager 处获取数据。

网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分task,那么当前的task将有机会得到执行,这样就能够改善Spark作业的整体性能。

Spark的本地化等级如表2-3所示:

表2-3 Spark本地化等级


Spark 常规性能调优

在Spark项目开发阶段,可以使用client模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的task数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看task的本地化级别有没有提升,并观察Spark作业的运行时间有没有缩短。

注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark作业的运行时间反而增加了。

Spark本地化等待时长的设置如代码清单2-5所示:

代码清单2-5 Spark本地化等待时长设置示例

val conf = new SparkConf().set("spark.locality.wait", "6") 



分享到:


相關文章: