我希望自己剛開始就瞭解關於Spark的事情

Jeremy Krinsley,Pam Wu,Daniel Melemed,Jarrod Parker,Linan Zheng

我希望自己剛開始就瞭解關於Spark的事情

大約12個月前,我們決定將實體解決方案管道移至Scala / Spark Universe。 這並非沒有痛點。 這是我們作為公司的第一筆主要推動力,目的是生產已經存在很長時間的實體解析原型。 這也是我們團隊第一次與Scala或Spark合作。

回顧一年,我想通過蟲洞將數十個"學習時刻"傳遞給我以前的自我。

如果有機會,這裡是傳遞:

知道你洗了什麼

隨機播放是指通過Spark集群的網絡在Worker之間傳輸數據。 它是需要重組數據的操作的中心,這被稱為廣泛依賴關係(請參閱廣泛與狹窄依賴關係)。 這種操作可能很快成為Spark應用程序的瓶頸。 要正確使用Spark,您需要了解隨機播放的內容,為此,必須瞭解您的數據。

數據傾斜導致隨機混排

偏斜是數據分配中的不平衡。 如果您無法說明數據的分佈方式,則可能會發現Spark天真地將絕大多數行放在一個執行程序上,而將其餘部分放在所有執行程序上。 這是歪斜的,無論是由於引起內存不足錯誤,網絡超時還是永無休止地以指數級方式運行的進程,都會殺死您的應用程序。

分佈均勻的列上的分區

控制Spark隨機播放的一種有效方法是智能地對數據進行分區。 在右列(或一組列)上進行分區有助於平衡為了執行操作而必須跨集群網絡映射的數據量。 通常,對唯一ID進行分區是個不錯的策略,但不要對稀疏填充的列或過度代表特定值的列進行分區。

當心默認分區

圍繞要解決的事情建模分區的數量絕對至關重要。 在我們的應用程序階段中,我們一次在許多異構大小的數據集上並行運行轉換,200個分區工作正常。

當我們處理數十億個成對比較時,我們發現4-10k範圍內的分區工作效率最高。

此外,如果您在單個服務器(或本地)上運行測試,則可以通過將數據重新劃分為大小1來看到速度的顯著提高。 本地8或16核計算機,但是在運行CI的2核服務器上將無法完成。 將數據組合到1個分區解決了我們的問題。

使用.par推動您的工作進入超速駕駛狀態

雖然您可以依靠Spark來完成許多並行的繁重工作,但可以通過深思熟慮地使用Scala內置的.par功能(可在可迭代對象上使用)來進一步推動工作。 我們的ER管道的初始步驟包括讀取數十個異構數據集,並對每個數據集應用共享的轉換管道。 一個簡單的datasets.par.foreach將我們的運行時間減少了一半。

當然,您只能在完全確定性的管道方面依靠它的用法,並且不存在競爭狀況的風險。 過度使用.par可能很快導致神秘地消失或覆蓋數據。

連接高度易燃

到目前為止,聯接是最大的改組違規者,Spark啟用的規模擴大了sql聯接的危險。 如果聯接雙方的聯接值重複,即使聯接中等大小的數據也可能導致爆炸。 這是我們Enigma必須特別警惕的地方,"唯一的"公共數據密鑰可能會導致數百萬行的聯接成指數爆炸成十億行的聯接!

如果您的聯接列有可能具有空值,則可能會出現嚴重的偏差。 解決此問題的一個好方法是"鹽化"您的null。 這實質上意味著在運行聯接之前將任意值(如uuid)預填充到空單元格中。

您的數據真實嗎?

Spark中的操作分為轉換和操作。 轉換是惰性操作,可讓Spark在後臺優化您的查詢。 他們將設置一個DataFrame進行更改(例如添加一列或將其連接到另一個列),但不會在這些計劃上執行。 這可能會導致令人驚訝的結果。 例如,重要的是要記住,在執行操作之前,UDF的行為要沒有具體值。 例如,想象一下,使用Spark內置的


monotonically_increasing_id創建一個id列,然後嘗試加入該列。 如果您沒有在生成這些ID之間進行任何操作(例如檢查點檢查),則您的值尚未實現。 結果將是不確定的!

檢查點Checkpoint是你的朋友

檢查點基本上是將數據保存到磁盤並重新加載回磁盤的過程,這在Spark之外的其他任何地方都是多餘的。 這不僅會觸發任何等待轉換的動作,還會截斷該對象的Spark查詢計劃。 該動作不僅會顯示在您的Spark UI中(從而指示您的工作確切位置),而且還有助於避免重新觸發DAG中的潛在udf動作並節省資源,因為它有可能允許您釋放 否則將被緩存以供下游訪問的內存。 根據我們的經驗,檢查點數據也是數據調試取證和重新定位的重要來源。 例如,我們的管道的培訓數據是從應用程序中途生成的5億行表中過濾掉的。

健全性通過監視檢查您的運行時

Spark UI是您的朋友,Ganglia等監控工具也是您的朋友,它可以讓您實時瞭解運行情況。 Yarn對Spark查詢計劃的描述可以立即傳達您的意圖是否與您的執行相吻合。 是應該作為一個聯接的東西實際上是許多小的聯接的級聯嗎?

SparkUI還包含有關作業級別,階段級別和執行者級別的信息。 這意味著您可以快速查看去往每個分區或每個執行器的數據數量/卷是否有意義,並且可以查看工作的任何部分是否應該佔數據的10%,但佔用了90%的數據。 時間。 監視工具使您可以查看執行程序之間的總內存和CPU使用率,這對於資源規劃和對失敗作業的屍體剖析至關重要。

剛開始使用Spark時,我們在Yarn和Amazon的EMRFS上使用了獨立集群。 我們瞭解了收集Spark日誌是一項艱鉅的任務的艱辛方法。 現在,我們很高興使用Databricks,它可以為我們處理日誌聚合的基本問題,但是,如果您要自己開發解決方案,那麼像Kibana這樣的日誌聚合工具可能對於自省性至關重要。

錯誤消息不代表他們說什麼

當問題真的出在其他地方時,Spark抱怨了一件事情就花了一段時間。

· "由對等方重置連接"通常意味著您偏斜了數據,並且一個特定的工作線程內存不足。

· "
java.net.SocketTimeoutException:寫入超時"可能意味著您將分區數設置得太高,並且文件系統在處理Spark試圖執行的同時寫數時太慢。

· "序列化結果的總大小…大於
spark.driver.maxResultSize"可能意味著您將分區數設置得過高,並且結果不適用於特定工作人員。

· "列x不是表y的成員":您運行了一半的管道,只是為了發現此sql連接錯誤。 將運行時執行與驗證一起放在前端,以避免對這些錯誤進行逆向工程。

· 有時您會收到一個真正的內存不足錯誤,但取證工作將是為了瞭解原因:是的,您可以增加單個工作人員的人數以使該問題消失,但是在執行此操作之前,您應該始終問自己: "數據分佈合理嗎?"

Scala / Spark CSV讀取很脆弱

來自Python,得知在Scala / Spark中天真地讀取CSV常常會導致無聲的轉義字符錯誤,這真是令人驚訝。 場景:您有一個CSV並將它天真地讀入spark:

val df = spark.read.option("header", "true").csv("quote-happy.csv")

您的DataFrame看起來很高興-沒有運行時異常,您可以在DataFrame上執行操作。 但是,在對列進行了仔細的調試之後,您意識到在數據的某個點上,實際上所有內容都移到了一個或幾個列上。 事實證明,為了安全起見,您需要在讀取中包含.option(" escape"," "")。

更好的建議:使用實木複合地板!

Parquet是你的朋友

開源文件格式旨在為讀/寫操作提供比未壓縮CSV更高效率的數量級。

Parquet是" columnar",因為它被設計為僅從Spark sql查詢中指定的那些列中選擇數據,並跳過不需要的那些列。 此外,它在類似於sql的過濾操作上實現"謂詞下推"操作,該操作僅對給定列中值的相關子集有效地運行查詢。 從未壓縮的表格文件格式轉換為鑲木地板是提高Spark性能的最基本的操作之一。

如果您負責從另一種格式生成Parquet(例如,您正在使用PyArrow和Pandas進行某些大規模遷移),請注意,僅創建一個Parquet文件將帶來該格式的主要優勢。

結論

在使用Spark一年後,您就收到了一些零散的建議。 希望我未來的自己已經發現了蟲洞,並希望在您閱讀本文時寄給我第二版。

最初於2018年11月8日發佈在www.enigma.com。

(本文翻譯自Enigma的文章《Things I Wish I'd Known About Spark When I Started (One Year Later Edition)》,參考:
https://medium.com/enigma-engineering/things-i-wish-id-known-about-spark-when-i-started-one-year-later-edition-d767430181ed)


分享到:


相關文章: