Spark Datasets 介绍

背景

开发者一直很喜欢Spark提供的简简洁而强大的API接口, 最小的编码工作实现复杂的分析过程。

Databricks 通过引入DataFrames 和 Spark SQL 来改善Spark的可用性和性能。

DataFrames 和 Spark SQL 都是基于结构化的数据(比如数据库表,JSON文件等)的上层API, 通过他们Spark可以自动优化存储(Storage)和计算(computation)。

这些优化得益于Catalyst optimizer 和 Tungsten。 但是Spark的RDD API做不到这些,比如操作原始二进制形式的数据。

Datasets,他是DataFrame API的扩展, 提供了类型安全的,面向对象的变成接口。

Spark1.6 包含了Datasets的API预览版本, 也是下几个版本开发重点。

Datasets 利用Spark 的 Catalyst optimizer解析表达式和数据field 来生成执行计划。

Datasets 还支持Tungsten的快速内存编码(fast in-memory encoding).

Dataset继承了这些优点并且是类型安全的,这意味着在编译时能够检查错误。同时还拥有面向对象风格的接口。

Datasets 使用

Datasets 是强类型的、不可变的对象集合,集合中的对象映射为关系型数据的schema。

Datasets 的核心是一个新的概念Encoder,它负责JVM对象和tabular representation之间的转换。

tabular representation 使用Spark的Tungsten 二进制格式,允许直接操作序列化的数据,提升内存使用。

Spark1.6 支持自动为一下类型生成encoder, 基本类型(比如:String, Integer ,Long), Scala case class, JavaBeans.

Datasets的API和RDD非常相似,提供了很多相同转换函数(比如:map, flatMap, filter)。

下面的代码,实现了从文本中读取行,然后分割成单词。

RDD:

Spark Datasets 介绍

Datasets:

Spark Datasets 介绍

RDD 和 Datasets 都可以很轻松通过lambda函数实现这些转换。编译器和IDE知道用那种类型,并且可以在构造数据管道时提供帮助提示和错误信息。

虽然这些高层次的代码在句法上很相似,但是使用Datasets可以获得关系型执行引擎的能力。

比如,执行一个聚合操作

RDD:

Spark Datasets 介绍

Datasets:

Spark Datasets 介绍

Datasets

Datasets 版本不止在代码上更简洁,执行速度上也更快。

从下面的对比图,可以看出Datasets比RDD快。

相比之下,使用RDD要获得同样的性能,需要用户手动考虑并行的执行计算。

Spark Datasets 介绍

Datasets API 的另一个优点就是降低内存使用。

因为Spark 知道Datasets中数据的结构,所以在缓存Dataset时能够进行更多的优化。

下图比较了RDD和Datasets缓存几百万字符串使用的内存。缓存,对于双方来说都能够导致性能的提升。但是,由于Dataset Encoder给Spark提供更多关于数据存储的信息,所以缓存占用空间更小,大概比RDD小4.5倍。

Spark Datasets 介绍

Encoders 光速序列化

Encoder 是被优化过的,在序列化和反序列化时,使用运行时代码生成器构建bytecode。

所以速度明显快于java或kryo序列化器。

除了速度,Encoder序列化占用的空间更小(2倍),降低了网络传输成本。此外,序列化的数据是Tungsten二进制格式,这意味着许多时候都可以直接操作数据,而不需要实例化整个对象。Spark支持的Encoder有原始类型(如String, Integer, Long),Scala case class和Java bean。未来会加入可自定义类型的Encoder。

Spark Datasets 介绍

无缝支持半结构化数据

Encoder还可以作为一个强大的桥梁,连接半结构化的格式(例如JSON)和类型安全的语言如Java和Scala。

例如,以下关于大学的数据集:

{"name": "UC Berkeley", "yearFounded": 1868, numStudents: 37581}

{"name": "MIT", "yearFounded": 1860, numStudents: 11318}

你可以简单地定义一个类,并将输入数据映射到类上,而不是人工提取字段,再把它们转换成期望的类型。Spark会自动识别名称和类型。

Spark Datasets 介绍

Encoder在映射的过程中,会先检查定义的类的类型是否与数据相符,如果不相符,则能够提供有用的错误信息,防止以不正确的方式处理TB级的数据。例如,如果我们使用的数据类型太小,转换到一个对象时会导致截断,这时Analyzer会抛出AnalysisException,如:如果numStudents是byte类型的,当有数据超过255时就会报错。

case class University(numStudents: Byte)

val schools = sqlContext.read.json("/schools.json").as[University]

org.apache.spark.sql.AnalysisException: Cannot upcast yearFounded from bigint to smallint as it may truncate

执行映射时,Encoder将自动处理复杂类型,包括嵌套类、array和map。

Java和Scala的简单API

Dataset的另一个目标是为Scala和java提供统一的接口。这种统一对应Java用户来说是个好消息,因为它确保了Java的接口不会落后于Scala。代码实例也会变得更通用,也不同处理输入类型的少许区别了。对于Java用户唯一不同的是需要指定Encoder,因为编译器不提供类型信息。例如,如果想要处理JSON数据使用Java可以这样做:

Spark Datasets 介绍

下一步展望

Dataset是一个新的API,它很容易与RDD和现有Spark项目相融合。只需要通过Dataset的rdd方法,就能将Dataset转换成RDD。

从长远来看,我们希望,Dataset可以成为编程时的首选。

Spark 2.0 版本的Dataset,我们计划做以下改进:

  • 性能优化:在很多场景中,现在的Datasets实现不能利用额外信息增加性能,可能比RDD还慢 。在接下来的几个版本中,我们将致力于改善这些新API的性能。
  • 自定义Encoder:开放自定义Encoder的API。
  • Python支持。
  • 统一DataFrame和Dataset:为了保证兼容性,DataFrame和Dataset目前不是继承自共同的父类。Spark 2.0 版本,我们将会统一这些抽象,并尽可能不改变现有的 API,使得开发库时能够更容易的兼容DataFrame和Dataset。

  • 如果你想尝试一下Dataset。我们提供的以下例子:Working with Classes, Word Count。https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Classes.html


分享到:


相關文章: