Spark-关于Shuffle

一、关于Shuffle

**Shuffle**

,洗牌,。之所以需要Shuffle,还是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上并且由不同节点的计算单元处理。以最简单的WordCount 为例,其中数据保存在Node1、Node2和Node3;经过处理后,这些数据最终会汇聚到Node a、Node b处理,。 这个数据重新打乱,然后汇聚到不同节点的过程就是Shuffle,。

二、关于Shuffle调优

### 关于Spark Shuffle 概述

Spark是分布式计算系统,数据块在不同节点执行,但是一些操作,例如join,需要将不同节点上相同的Key对应的Value聚集到一起,Shuffle便应运而生。

Shuffle 是很耗资源的操作: **网络IO****磁盘IO**(因为spark中Shuffle是一定落磁盘的)。Spark默认使用32KB的memory buffer存储Shuffle的中间结果,如果buffer满了就会写入磁盘,生成一个小文件,每个Partition的多个小文件会在map端处理结束后合并为一个大文件,这些临时文件会在对应的application结束后被删除。

#### MapReduce

MapReduce的过程,大致如下

- read、compute and buffer in memory ,map task读取数据,计算并将数据存储在内存。

- sort、spill to disk and merge 内存不足,数据溢写到磁盘,溢写的数据是已经sort的,最后对溢写的磁盘数据再进行一次merge sort。

- reduce fetch data and merge ,reduce task读取map的输出,并进行归并排序。

- compute and output 对归并后的数据进行计算并输出。

#### 触发Shuffle操作

以下三类:

- **repartition相关** repartition、coalesce

- **ByKey操作** groupByKey、reduceByKey、combineByKey、aggregateByKey等

- **join相关** cogroup、join

#### 实现

Spark中先后实现了三个Shuffle算法,如下

// Let the user specify short names for shuffle managers

// 让用户给shuffle managers指定一个短名称

/** 这里支持了两种类型的shuffle

* 以前还有一种"hash"->"org.apache.spark.shuffle.hash.HashShuffleManager"

* 但是HashShuffleManager被取消使用了

* */

val shortShuffleMgrNames = Map(

"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,

"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)

hash shuffle(被取消了)、sort shuffle、tungsten-sort shuffle的实现细节

### 1、什么是shuffle

每个Spark作业启动运行的时候,首先Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批Task,然后将这些Task分配到各个Executor进程中执行。一个stage的所有Task都执行完毕之后,在各个executor节点上会产生大量的文件,这些文件会通过IO写入磁盘(这些文件存放的时候这个stage计算得到的中间结果),然后Driver就会调度运行下一个stage。下一个stage的Task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到程序执行完毕,最终得到我们想要的结果。Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如groupByKey、countByKey、reduceByKey、join等等)每当遇到这种类型的RDD算子的时候,划分出一个stage界限来。

每个shuffle的前半部分stage的每个task都会创建出后半部分stage对应的task数量的文件,(注意是前半部分的每个task都会创建相同数量的文件)。shuffle的后半部分stage的task拉取前半部分stage中task产生的文件(这里拉取的文件是:属于自己task计算的那部分文件);然后每个task会有一个内存缓冲区,使用HashMap对值进行汇集;比如,task会对我们自己定义的聚合函数,如reduceByKey()算子,把所有的值进行累加,聚合出来得到最终的值,就完成了shuffle操作。

那么默认的这种shuffle操作对性能有什么影响吗?

举个例子;有100个节点,每个节点运行一个executor,每个executor有2个cpu core,总共有1000个task;那么每个executor平均10个task。那么每个节点将会输出map端文件为:10 * 1000 = 10000;整个map端输出的文件数:100 * 10000 = 100万;shuffle中写磁盘操作是最消耗性能的。那么有什么办法可以降低文件个数的产生呢?先来看看下面这个图

为了解决产生大量文件的问题,我们可以**在map端输出的位置,将文件进行合并操作**,即使用

spark.shuffle.consolidateFiles 参数来合并文件,具体的使用方式为

new SparkConf().set("spark.shuffle.consolidateFiles","true")

开启文件合并以后,我们map端输出的文件会变为20万左右,也就是说map端输出的文件是原来默认的五分之一。所以说通过这个参数的设置,可以大大提升我们Spark作业的运行速度

### 关于map端内存缓冲和reduce端内存占比的优化。

**什么是map端内存缓冲区呢?**默认情况下,每个map端的task 输出的一些中间结果在写入磁盘之前,会先被写入到一个临时的内存缓冲区,这个缓冲区的默认大小为32kb,当内存缓冲区满溢之后,才会将产生的中间结果spill到磁盘上。

**reduce端内存占比又是什么呢?**reduce端的task在拉取到数据之后,会用一个hashmap的数据结构对各个key对应的value进行汇聚操作。在进行汇聚操作的时候,其使用的内存是由executor进程给分配的,默认将executor的内存的20%分配给reduce task 进行聚合操作使用。这里会有一个问题,当reduce task拉取的数据很多导致其分配的内存放不下的时候,这个时候会将放不下的数据全部spill到磁盘上去。

为了解决map端数据满溢引发的spill和reduce端数据过大引发的spill操作。我们可以通过两个参数来适当调整,以避免上述情况的出现,这个两个参数分别是:

spark.shuffle.file.buffer #map task的内存缓冲调节参数,默认是32kb

spark.shuffle.memoryFraction #reduce端聚合内存占比,默认0.2

### 怎么判断在什么时候对这两个参数进行调整呢?

通过监控平台查看每个executor的task的shuffle write和shuffle read的运行次数,如果发现这个指标的运行次数比较多,那么就应该考虑这两个参数的调整了;这个参数调整有一个前提,spark.shuffle.file.buffer参数每次扩大一倍的方式进行调整,spark.shuffle.memoryFraction参数每次增加0.1进行调整。


分享到:


相關文章: