1.RDD简介
RDD弹性分布式数据集,一个RDD代表一个被分区的只读数据集。
RDD是spark的核心数据结构。通过RDD的依赖关系形成spark的调度顺序,通过对RDD的操作形成整个spark程序。
RDD的本质:在并行计算的各个阶段进行有效的数据共享(减少网络之间的文件传输代价)。
1.1.RDD的容错处理
由于RDD的接口只支持粗粒度操作(即一个操作会被应用在RDD的所有数据上),所有只要通过记录下这些作用在RDD之上的转换操作,来构建rdd的继承关系(lineage),就可以有效的进行容错处理。
1.2.RDD的生成
RDD的生成只有三种途径:
1.从内存集合
2.外部存储系统,hdfs(hive ,Cassandra, hbase)
默认 spark为每个hdfs的数据块创建一个分区,也可以设置较大分区数, 但是不能设置分区数小于数据块的数量。
使用sparkContext.wholeTextFiles读取一个包含很多小文件的目录,返回(文件名,内容)对,相对于textFile 返回文件中每一行作为一条记录。
为了SequenceFiles文件,使用sc.sequenceFile[K,V],k v是文件中k,v的数据类型。
为了hadoop其他的输入格式,可以使用sc.hadoopRDD。
一个partition 对应一个task,一个task 必定存在于一个Executor,一个Executor 对应一个JVM.
3.通过转换操作来自于其他的RDD ,如map,filter,join等。
2.spark RDD接口
partition 分区,一个RDD会有一个或者多个分区
preferredLocations(p) 对于分区p,返回数据本地化计算的节点
dependencies:RDD依赖关系
compute(p,context): 对于分区p,进行迭代计算
partitioner:RDD分区函数
2.1.RDD分区(partitions)
一个RDD包含一个或多个分区,每个分区都有分区属性,分区的多少决定了对RDD进行并行计算的并行度。
分区决定并行计算的粒度。每个rdd分区的计算操作都在一个单独的任务中被执行。
一个partition 对应一个task,一个task 必定存在于一个Executor,一个Executor 对应一个JVM.
利用rdd的成员变量partitions返回的数组大小查询RDD分区数。
#数组转换成分区,第二个参数是指定分区数
scala> val rdd =sc.parallelize(1 to 100,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at
#返回分区数
scala> rdd.partitions.size
res2: Int = 2
#查看默认分区数 ()
scala> val rdd2 = sc.parallelize(1 to 100)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at
scala> rdd.partitions.size
res5: Int = 2
2.2.RDD优先位置(preferredLocations)
返回此RDD的每个分区所存储的位置。(用于在spark进行任务调度的时候,尽可能将任务分配到数据块所存储的位置)
依从hadoop读取数据生成RDD为例:preferredLocations返回每个数据块所在的机器名或IP地址,如果每一块数据是多份存储,则返回多个机器地址。
#读取hdfs生成一个类型为MappedRDD的RDD (一个文件生成一个RDD).
scala> val rdd = sc.textFile("hdfs://yarn1:8020/hmbbs_logs/access_2013_05_31.log")
16/04/27 21:45:41 INFO MemoryStore: ensureFreeSpace(219256) called with curMem=0, maxMem=311387750
16/04/27 21:45:41 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 214.1 KB, free 296.8 MB)
rdd: org.apache.spark.rdd.RDD[String] = MappedRDD[4] at textFile at
#通过依赖关系找到原始的rdd
scala> val hadoopRDD = rdd.dependencies(0).rdd
hadoopRDD: org.apache.spark.rdd.RDD[_] = HadoopRDD[3] at textFile at
#hadoopRDD分区个数
scala> hadoopRDD.partitions.size
16/04/27 21:46:35 INFO FileInputFormat: Total input paths to process : 1
16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.64:50010
16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.63:50010
16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.62:50010
res7: Int = 2
返回第一分区所在服务器
scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))
res9: Seq[String] = WrappedArray(yarn4, yarn3, yarn2)
2.3.RDD依赖关系
由于RDD是粗粒度的操作数据集,每一个转换操作都会生成一个新的RDD,所以RDD之间会形成类似于流水线一样的前后依赖关系,
有两种类型依赖: 窄依赖和宽依赖。
窄依赖:一个父RDD分区最多只被子RDD的一个分区所使用
宽依赖:多个子RDD的分区依赖于同一个父RDD的分区。
宽依赖通常是Spark拆分Stage的边界,在同一个Stage内均为窄依赖。
窄依赖:
每一个父RDD的分区最多只被子RDD的一个分区所使用,如图所示:
宽依赖
多个子RDD的分区会依赖同一个父RDD的分区,如图所示:
使用 RDD.toDebugString
可以看到整个 logical plan (RDD 的数据依赖关系).
区分两种依赖关系原因:
org.apache.spark.OneToOneDependency(窄依赖)
org.apache.spark.ShuffleDependency(宽依赖)
scala> val rdd = sc.makeRDD(1 to 10 )
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at
#map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
#将数据转成K-V格式.
scala> val mapRDD = rdd.map(x => (x,x))
scala> mapRDD.dependencies
res14: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@22c3119a)
scala> val shuffleRDD = mapRDD.partitionBy(new org.apache.spark.HashPartitioner(3))
shuffleRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at partitionBy at
scala> shuffleRDD.dependencies
res16: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@5a6bded6)
2.4.RDD分区计算(compute)
spark中每个RDD计算都是以分区为单位,而RDD中的compute函数都是在对迭代器进行复合,不需要保存每次的计算结果。
compute函数只返回相应分区数据的迭代器.
基于RDD的每一个分区,执行compute操作。
对于HadoopRDD来说,compute中就是从HDFS读取分区中数据块信息。
对于JdbcRDD来说,就是连接数据库,执行查询,读取每一条数据。
scala> val rdd = sc.parallelize(1 to 10 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at
scala> val map_rdd = rdd.map(a => a+ 1)
map_rdd: org.apache.spark.rdd.RDD[Int] = MappedRDD[9] at map at
scala> val filter_rdd = map_rdd.filter(a => (a >3))
filter_rdd: org.apache.spark.rdd.RDD[Int] = FilteredRDD[10] at filter at
scala> val context = new org.apache.spark.TaskContext(0,0,0)
context: org.apache.spark.TaskContext = org.apache.spark.TaskContext@7fc24284
#返回分区1的迭代器
scala> val iter0 = filter_rdd.compute(filter_rdd.partitions(0),context)
iter0: Iterator[Int] = non-empty iterator
scala> iter0.toList
res17: List[Int] = List(4, 5, 6)
#返回分区2的迭代器
scala> val iter1 = filter_rdd.compute(filter_rdd.partitions(1),context)
iter1: Iterator[Int] = non-empty iterator
scala> iter1.toList
res18: List[Int] = List(7, 8, 9, 10, 11)
2.5.RDD分区函数(partitioner)
partitioner就是spark的分区函数,spark实现两种分区函数: HashPatition 和RangePatitoner,且partitioner属性只存在于K-V类型的RDD中,对于非K-V类型的partitioner的值是None。
分区函数决定了rdd本身的分区数量,也可作为其父rddshuffle输出中每个分区进行数据分割的依据。
#HashPartitioner案例:
scala> val rdd = sc.makeRDD(1 to 10, 2).map(x => (x,x))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[1] at map at
scala> rdd.partitioner
res0: Option[org.apache.spark.Partitioner] = None
#最终被分为3个分区,根据key % 3 分到不同分区
scala> val group_rdd = rdd.groupByKey(new org.apache.spark.HashPartitioner(3))
scala> group_rdd.partitioner
res1: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@796d413c)
#行动操作,查看每个分区内的值
scala> group_rdd.collectPartitions()
3.RDD属性:
通过RDD的内部属性,可以获取相应的元数据信息,通过这些元数据信息可以支持更负责的算法或优化。
分区列表: 通过分区列表可以找到一个RDD中包含的所有分区及其所在地址。
计算每个分片的函数:
对父RDD的依赖列表:
对key-value对数据类型RDD的分区器:控制分区策略和分区数。
每个数据分区的地址列表: 如果数据有副本,通过地址列表可以获取单个数据块的所有副本地址,为负载均衡和容错提供支持。
4.spark计算工作流
spark工作流: 输入,运行转换,输出。
在运行转换中通过算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
4.1.输入:
在spark程序运行中,数据从外部数据空间输入到spark,数据就进入了spark运行时数据空间,会转化为spark中的数据块,通过blockManager进行管理。
4.2.运行转换:
在spark数据输入形成RDD后,变可以通过转换算子等,对数据操作并将RDD转化成新的RDD,通过运行算子,触发spark提交作业。如果数据需要复用,可以通过cache算子,将数据缓存到内存。
4.3.输出:
程序运行结束数据会输出spark运行时空间,存储到分布式存储中(saveAsTextFile输出到HDFS)或scala数据或集合中(collect 输出到scala集合,count返回scala int型数据)。
5.RDD分区说明
RDD作为一个分布式的数据集,是分布在多个worker节点上的。如下图所示,RDD1有五个分区(partition),他们分布在了四个worker nodes 上面,RDD2有三个分区,分布在了三个worker nodes上面。
想要重新给rdd分区,直接调用rdd.repartition方法就可以了,如果想具体控制哪些数据分布在哪些分区上,可以传一个Ordering进去。比如说,我想要数据随机地分布成10个分区,可以:
class MyOrdering[T] extends Ordering[T]{ def compare(x:T,y:T) = math.random compare math.random}// 假设数据是Int类型的rdd.repartition(10)(new MyOrdering[Int])
6.RDD操作
rdd有两种类型操作: transformations 和actions
transformations :转换RDD
action 操作:运行计算返回结果给驱动程序。
transformation 是懒操作,只有当action需要给driver 程序返回结果时才执行。
閱讀更多 從大數據說起 的文章