Spark 弹性分布式数据集(RDD)

1.RDD简介

RDD弹性分布式数据集,一个RDD代表一个被分区的只读数据集。

RDD是spark的核心数据结构。通过RDD的依赖关系形成spark的调度顺序,通过对RDD的操作形成整个spark程序。

RDD的本质:在并行计算的各个阶段进行有效的数据共享(减少网络之间的文件传输代价)。

1.1.RDD的容错处理

由于RDD的接口只支持粗粒度操作(即一个操作会被应用在RDD的所有数据上),所有只要通过记录下这些作用在RDD之上的转换操作,来构建rdd的继承关系(lineage),就可以有效的进行容错处理。

1.2.RDD的生成

RDD的生成只有三种途径:

1.从内存集合

2.外部存储系统,hdfs(hive ,Cassandra, hbase)

默认 spark为每个hdfs的数据块创建一个分区,也可以设置较大分区数, 但是不能设置分区数小于数据块的数量。

使用sparkContext.wholeTextFiles读取一个包含很多小文件的目录,返回(文件名,内容)对,相对于textFile 返回文件中每一行作为一条记录。

为了SequenceFiles文件,使用sc.sequenceFile[K,V],k v是文件中k,v的数据类型。

为了hadoop其他的输入格式,可以使用sc.hadoopRDD。

一个partition 对应一个task,一个task 必定存在于一个Executor,一个Executor 对应一个JVM.

3.通过转换操作来自于其他的RDD ,如map,filter,join等。

2.spark RDD接口

partition 分区,一个RDD会有一个或者多个分区

preferredLocations(p) 对于分区p,返回数据本地化计算的节点

dependencies:RDD依赖关系

compute(p,context): 对于分区p,进行迭代计算

partitioner:RDD分区函数

2.1.RDD分区(partitions)

一个RDD包含一个或多个分区,每个分区都有分区属性,分区的多少决定了对RDD进行并行计算的并行度。

分区决定并行计算的粒度。每个rdd分区的计算操作都在一个单独的任务中被执行。

一个partition 对应一个task,一个task 必定存在于一个Executor,一个Executor 对应一个JVM.

利用rdd的成员变量partitions返回的数组大小查询RDD分区数。

#数组转换成分区,第二个参数是指定分区数

scala> val rdd =sc.parallelize(1 to 100,2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :12

#返回分区数

scala> rdd.partitions.size

res2: Int = 2

#查看默认分区数 ()

scala> val rdd2 = sc.parallelize(1 to 100)

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :12

scala> rdd.partitions.size

res5: Int = 2

2.2.RDD优先位置(preferredLocations)

返回此RDD的每个分区所存储的位置。(用于在spark进行任务调度的时候,尽可能将任务分配到数据块所存储的位置)

依从hadoop读取数据生成RDD为例:preferredLocations返回每个数据块所在的机器名或IP地址,如果每一块数据是多份存储,则返回多个机器地址。

#读取hdfs生成一个类型为MappedRDD的RDD (一个文件生成一个RDD).

scala> val rdd = sc.textFile("hdfs://yarn1:8020/hmbbs_logs/access_2013_05_31.log")

16/04/27 21:45:41 INFO MemoryStore: ensureFreeSpace(219256) called with curMem=0, maxMem=311387750

16/04/27 21:45:41 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 214.1 KB, free 296.8 MB)

rdd: org.apache.spark.rdd.RDD[String] = MappedRDD[4] at textFile at :12

#通过依赖关系找到原始的rdd

scala> val hadoopRDD = rdd.dependencies(0).rdd

hadoopRDD: org.apache.spark.rdd.RDD[_] = HadoopRDD[3] at textFile at :12

#hadoopRDD分区个数

scala> hadoopRDD.partitions.size

16/04/27 21:46:35 INFO FileInputFormat: Total input paths to process : 1

16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.64:50010

16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.63:50010

16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.62:50010

res7: Int = 2

返回第一分区所在服务器

scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))

res9: Seq[String] = WrappedArray(yarn4, yarn3, yarn2)

2.3.RDD依赖关系

由于RDD是粗粒度的操作数据集,每一个转换操作都会生成一个新的RDD,所以RDD之间会形成类似于流水线一样的前后依赖关系,

有两种类型依赖: 窄依赖和宽依赖。

窄依赖:一个父RDD分区最多只被子RDD的一个分区所使用

宽依赖:多个子RDD的分区依赖于同一个父RDD的分区。

宽依赖通常是Spark拆分Stage的边界,在同一个Stage内均为窄依赖。

  • 窄依赖:

每一个父RDD的分区最多只被子RDD的一个分区所使用,如图所示:

Spark 弹性分布式数据集(RDD)

  • 宽依赖

多个子RDD的分区会依赖同一个父RDD的分区,如图所示:

Spark 弹性分布式数据集(RDD)

使用 RDD.toDebugString 可以看到整个 logical plan (RDD 的数据依赖关系).

区分两种依赖关系原因:

org.apache.spark.OneToOneDependency(窄依赖)

org.apache.spark.ShuffleDependency(宽依赖)

scala> val rdd = sc.makeRDD(1 to 10 )

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :12

#map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

#将数据转成K-V格式.

scala> val mapRDD = rdd.map(x => (x,x))

scala> mapRDD.dependencies

res14: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@22c3119a)

scala> val shuffleRDD = mapRDD.partitionBy(new org.apache.spark.HashPartitioner(3))

shuffleRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at partitionBy at :16

scala> shuffleRDD.dependencies

res16: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@5a6bded6)

2.4.RDD分区计算(compute)

spark中每个RDD计算都是以分区为单位,而RDD中的compute函数都是在对迭代器进行复合,不需要保存每次的计算结果。

compute函数只返回相应分区数据的迭代器.

基于RDD的每一个分区,执行compute操作。

对于HadoopRDD来说,compute中就是从HDFS读取分区中数据块信息。

对于JdbcRDD来说,就是连接数据库,执行查询,读取每一条数据。

scala> val rdd = sc.parallelize(1 to 10 ,2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :12

scala> val map_rdd = rdd.map(a => a+ 1)

map_rdd: org.apache.spark.rdd.RDD[Int] = MappedRDD[9] at map at :14

scala> val filter_rdd = map_rdd.filter(a => (a >3))

filter_rdd: org.apache.spark.rdd.RDD[Int] = FilteredRDD[10] at filter at :16

scala> val context = new org.apache.spark.TaskContext(0,0,0)

context: org.apache.spark.TaskContext = org.apache.spark.TaskContext@7fc24284

#返回分区1的迭代器

scala> val iter0 = filter_rdd.compute(filter_rdd.partitions(0),context)

iter0: Iterator[Int] = non-empty iterator

scala> iter0.toList

res17: List[Int] = List(4, 5, 6)

#返回分区2的迭代器

scala> val iter1 = filter_rdd.compute(filter_rdd.partitions(1),context)

iter1: Iterator[Int] = non-empty iterator

scala> iter1.toList

res18: List[Int] = List(7, 8, 9, 10, 11)

2.5.RDD分区函数(partitioner)

partitioner就是spark的分区函数,spark实现两种分区函数: HashPatition 和RangePatitoner,且partitioner属性只存在于K-V类型的RDD中,对于非K-V类型的partitioner的值是None。

分区函数决定了rdd本身的分区数量,也可作为其父rddshuffle输出中每个分区进行数据分割的依据。

#HashPartitioner案例:

scala> val rdd = sc.makeRDD(1 to 10, 2).map(x => (x,x))

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[1] at map at :12

scala> rdd.partitioner

res0: Option[org.apache.spark.Partitioner] = None

#最终被分为3个分区,根据key % 3 分到不同分区

scala> val group_rdd = rdd.groupByKey(new org.apache.spark.HashPartitioner(3))

scala> group_rdd.partitioner

res1: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@796d413c)

#行动操作,查看每个分区内的值

scala> group_rdd.collectPartitions()

Spark 弹性分布式数据集(RDD)

3.RDD属性:

通过RDD的内部属性,可以获取相应的元数据信息,通过这些元数据信息可以支持更负责的算法或优化。

分区列表: 通过分区列表可以找到一个RDD中包含的所有分区及其所在地址。

计算每个分片的函数:

对父RDD的依赖列表:

对key-value对数据类型RDD的分区器:控制分区策略和分区数。

每个数据分区的地址列表: 如果数据有副本,通过地址列表可以获取单个数据块的所有副本地址,为负载均衡和容错提供支持。

4.spark计算工作流

spark工作流: 输入,运行转换,输出。

在运行转换中通过算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。

4.1.输入:

在spark程序运行中,数据从外部数据空间输入到spark,数据就进入了spark运行时数据空间,会转化为spark中的数据块,通过blockManager进行管理。

4.2.运行转换:

在spark数据输入形成RDD后,变可以通过转换算子等,对数据操作并将RDD转化成新的RDD,通过运行算子,触发spark提交作业。如果数据需要复用,可以通过cache算子,将数据缓存到内存。

4.3.输出:

程序运行结束数据会输出spark运行时空间,存储到分布式存储中(saveAsTextFile输出到HDFS)或scala数据或集合中(collect 输出到scala集合,count返回scala int型数据)。

5.RDD分区说明

RDD作为一个分布式的数据集,是分布在多个worker节点上的。如下图所示,RDD1有五个分区(partition),他们分布在了四个worker nodes 上面,RDD2有三个分区,分布在了三个worker nodes上面。

Spark 弹性分布式数据集(RDD)

想要重新给rdd分区,直接调用rdd.repartition方法就可以了,如果想具体控制哪些数据分布在哪些分区上,可以传一个Ordering进去。比如说,我想要数据随机地分布成10个分区,可以:

class MyOrdering[T] extends Ordering[T]{ def compare(x:T,y:T) = math.random compare math.random}// 假设数据是Int类型的rdd.repartition(10)(new MyOrdering[Int]) 

6.RDD操作

rdd有两种类型操作: transformations 和actions

transformations :转换RDD

action 操作:运行计算返回结果给驱动程序。

transformation 是懒操作,只有当action需要给driver 程序返回结果时才执行。


分享到:


相關文章: