我希望自己刚开始就了解关于Spark的事情

Jeremy Krinsley,Pam Wu,Daniel Melemed,Jarrod Parker,Linan Zheng

大约12个月前,我们决定将实体解决方案管道移至Scala / Spark Universe。 这并非没有痛点。 这是我们作为公司的第一笔主要推动力,目的是生产已经存在很长时间的实体解析原型。 这也是我们团队第一次与Scala或Spark合作。

回顾一年,我想通过虫洞将数十个"学习时刻"传递给我以前的自我。

如果有机会,这里是传递:

知道你洗了什么

随机播放是指通过Spark集群的网络在Worker之间传输数据。 它是需要重组数据的操作的中心,这被称为广泛依赖关系(请参阅广泛与狭窄依赖关系)。 这种操作可能很快成为Spark应用程序的瓶颈。 要正确使用Spark,您需要了解随机播放的内容,为此,必须了解您的数据。

数据倾斜导致随机混排

偏斜是数据分配中的不平衡。 如果您无法说明数据的分布方式,则可能会发现Spark天真地将绝大多数行放在一个执行程序上,而将其余部分放在所有执行程序上。 这是歪斜的,无论是由于引起内存不足错误,网络超时还是永无休止地以指数级方式运行的进程,都会杀死您的应用程序。

分布均匀的列上的分区

控制Spark随机播放的一种有效方法是智能地对数据进行分区。 在右列(或一组列)上进行分区有助于平衡为了执行操作而必须跨集群网络映射的数据量。 通常,对唯一ID进行分区是个不错的策略,但不要对稀疏填充的列或过度代表特定值的列进行分区。

当心默认分区

围绕要解决的事情建模分区的数量绝对至关重要。 在我们的应用程序阶段中,我们一次在许多异构大小的数据集上并行运行转换,200个分区工作正常。

当我们处理数十亿个成对比较时,我们发现4-10k范围内的分区工作效率最高。

此外,如果您在单个服务器(或本地)上运行测试,则可以通过将数据重新划分为大小1来看到速度的显着提高。 本地8或16核计算机,但是在运行CI的2核服务器上将无法完成。 将数据组合到1个分区解决了我们的问题。

使用.par推动您的工作进入超速驾驶状态

虽然您可以依靠Spark来完成许多并行的繁重工作,但可以通过深思熟虑地使用Scala内置的.par功能(可在可迭代对象上使用)来进一步推动工作。 我们的ER管道的初始步骤包括读取数十个异构数据集,并对每个数据集应用共享的转换管道。 一个简单的datasets.par.foreach将我们的运行时间减少了一半。

当然,您只能在完全确定性的管道方面依靠它的用法,并且不存在竞争状况的风险。 过度使用.par可能很快导致神秘地消失或覆盖数据。

连接高度易燃

到目前为止,联接是最大的改组违规者,Spark启用的规模扩大了sql联接的危险。 如果联接双方的联接值重复,即使联接中等大小的数据也可能导致爆炸。 这是我们Enigma必须特别警惕的地方,"唯一的"公共数据密钥可能会导致数百万行的联接成指数爆炸成十亿行的联接!

如果您的联接列有可能具有空值,则可能会出现严重的偏差。 解决此问题的一个好方法是"盐化"您的null。 这实质上意味着在运行联接之前将任意值(如uuid)预填充到空单元格中。

您的数据真实吗?

Spark中的操作分为转换和操作。 转换是惰性操作,可让Spark在后台优化您的查询。 他们将设置一个DataFrame进行更改(例如添加一列或将其连接到另一个列),但不会在这些计划上执行。 这可能会导致令人惊讶的结果。 例如,重要的是要记住,在执行操作之前,UDF的行为要没有具体值。 例如,想象一下,使用Spark内置的


monotonically_increasing_id创建一个id列,然后尝试加入该列。 如果您没有在生成这些ID之间进行任何操作(例如检查点检查),则您的值尚未实现。 结果将是不确定的!

检查点Checkpoint是你的朋友

检查点基本上是将数据保存到磁盘并重新加载回磁盘的过程,这在Spark之外的其他任何地方都是多余的。 这不仅会触发任何等待转换的动作,还会截断该对象的Spark查询计划。 该动作不仅会显示在您的Spark UI中(从而指示您的工作确切位置),而且还有助于避免重新触发DAG中的潜在udf动作并节省资源,因为它有可能允许您释放 否则将被缓存以供下游访问的内存。 根据我们的经验,检查点数据也是数据调试取证和重新定位的重要来源。 例如,我们的管道的培训数据是从应用程序中途生成的5亿行表中过滤掉的。

健全性通过监视检查您的运行时

Spark UI是您的朋友,Ganglia等监控工具也是您的朋友,它可以让您实时了解运行情况。 Yarn对Spark查询计划的描述可以立即传达您的意图是否与您的执行相吻合。 是应该作为一个联接的东西实际上是许多小的联接的级联吗?

SparkUI还包含有关作业级别,阶段级别和执行者级别的信息。 这意味着您可以快速查看去往每个分区或每个执行器的数据数量/卷是否有意义,并且可以查看工作的任何部分是否应该占数据的10%,但占用了90%的数据。 时间。 监视工具使您可以查看执行程序之间的总内存和CPU使用率,这对于资源规划和对失败作业的尸体剖析至关重要。

刚开始使用Spark时,我们在Yarn和Amazon的EMRFS上使用了独立集群。 我们了解了收集Spark日志是一项艰巨的任务的艰辛方法。 现在,我们很高兴使用Databricks,它可以为我们处理日志聚合的基本问题,但是,如果您要自己开发解决方案,那么像Kibana这样的日志聚合工具可能对于自省性至关重要。

错误消息不代表他们说什么

当问题真的出在其他地方时,Spark抱怨了一件事情就花了一段时间。

· "由对等方重置连接"通常意味着您偏斜了数据,并且一个特定的工作线程内存不足。

· "
java.net.SocketTimeoutException:写入超时"可能意味着您将分区数设置得太高,并且文件系统在处理Spark试图执行的同时写数时太慢。

· "序列化结果的总大小…大于
spark.driver.maxResultSize"可能意味着您将分区数设置得过高,并且结果不适用于特定工作人员。

· "列x不是表y的成员":您运行了一半的管道,只是为了发现此sql连接错误。 将运行时执行与验证一起放在前端,以避免对这些错误进行逆向工程。

· 有时您会收到一个真正的内存不足错误,但取证工作将是为了了解原因:是的,您可以增加单个工作人员的人数以使该问题消失,但是在执行此操作之前,您应该始终问自己: "数据分布合理吗?"

Scala / Spark CSV读取很脆弱

来自Python,得知在Scala / Spark中天真地读取CSV常常会导致无声的转义字符错误,这真是令人惊讶。 场景:您有一个CSV并将它天真地读入spark:

val df = spark.read.option("header", "true").csv("quote-happy.csv")

您的DataFrame看起来很高兴-没有运行时异常,您可以在DataFrame上执行操作。 但是,在对列进行了仔细的调试之后,您意识到在数据的某个点上,实际上所有内容都移到了一个或几个列上。 事实证明,为了安全起见,您需要在读取中包含.option(" escape"," "")。

更好的建议:使用实木复合地板!

Parquet是你的朋友

开源文件格式旨在为读/写操作提供比未压缩CSV更高效率的数量级。

Parquet是" columnar",因为它被设计为仅从Spark sql查询中指定的那些列中选择数据,并跳过不需要的那些列。 此外,它在类似于sql的过滤操作上实现"谓词下推"操作,该操作仅对给定列中值的相关子集有效地运行查询。 从未压缩的表格文件格式转换为镶木地板是提高Spark性能的最基本的操作之一。

如果您负责从另一种格式生成Parquet(例如,您正在使用PyArrow和Pandas进行某些大规模迁移),请注意,仅创建一个Parquet文件将带来该格式的主要优势。

结论

在使用Spark一年后,您就收到了一些零散的建议。 希望我未来的自己已经发现了虫洞,并希望在您阅读本文时寄给我第二版。

最初于2018年11月8日发布在www.enigma.com。

(本文翻译自Enigma的文章《Things I Wish I'd Known About Spark When I Started (One Year Later Edition)》,参考:
https://medium.com/enigma-engineering/things-i-wish-id-known-about-spark-when-i-started-one-year-later-edition-d767430181ed)