Linux 基金會的 Delta Lake(Delta.io)是一個給數據湖提供可靠性的開源存儲層軟件。在 QCon 全球軟件開發大會(上海站)2019 的演講中,Databricks 公司的 Engineering Manager 李瀟帶我們瞭解了 Delta Lake 在實際生產中的應用與實踐以及未來項目規劃,本文便整理自此次演講。 今天我跟大家分享 一下 Data Lake 和 Delta 架構。
2019 年 10 月 16 日,在荷蘭阿姆斯特丹舉行的 Spark+AI 歐洲峰會上,Databricks 正式宣佈將 Delta Lake 捐贈給了 Linux 基金會,其成為了該基金會中的一個正式項目。我們期待在今年(2019 年)或者是未來,很快, Delta Lake 將會成為數據湖的主流或者說是事實標準。
在 9 月份公佈的 2019 年最佳開源軟件獎名單中,Delta Lake 也榜上有名。正如官方對 Delta Lake 的頒獎評語描述,大家都很驚訝,Databricks 公司竟然把 Delta Lake 這個核心的拳頭產品開源了。Delta Lake 的推出實際上是為了解決 Spark 作為大數據分析平臺的諸多痛點,也相信它將會普惠整個 Spark 社區以及其他大數據社區,真正解決數據湖管理的各種關鍵問題。
很有幸,我參與了 Delta Lake 早期的開發,尤其是 merge、update、delete 這種關鍵 DML 的設計和實現。這個項目最早啟動於 2017 年 6 月。當時,多位用戶向我們抱怨 Spark 的不足和使用的不便,我們公司的工程師們討論後發現,是時候去提出我們自己的存儲架構。Spark 作為一種存儲和計算分離的一種計算引擎,之前我們主要依賴於其他開源或非開源的項目去解決各種數據存儲的問題,但實際上我們發現在用戶的生產環境中,現有的存儲方案都沒辦法真正的解決數據湖。於是乎,我們就和客戶一起嘗試去開發,去解決實際生產環境中的問題。經過四個月的快速開發,我們在 2017 年 10 月正式宣佈了 Delta Lake 產品的誕生。在第二年 6 月份的 Spark+AI 峰會中,Apple 的工程師和我們的工程師 Michael 一起做了主題演講,分享了 Apple 在使用 Delta Lake 的一些寶貴經驗,比如說他們當時用 Delta Lake 解決了 trillion 級別數據的大表的度寫。
Summit 之後,我們得到了多方的好評,目前已有超過 3000 個客戶正在將 Delta Lake 用於他們的生產環境中。在這個背景中,我們認為我們應該把它推廣到整個 Spark 社區,幫助整個大數據社區解決他們大數據管理的痛點。於是,2019 年 4 月,我們決定開源了。
但在開源之後,Spark 社區有很多反饋說 Delta Lake 是你們公司的一個的 Github repository,你們之後隨時可能會改開源的 License,你們的開源管理模式都不是很透明。於是乎,為了解決這樣的疑惑,我們決定把它捐贈給 Linux 基金會,讓它成為一個標準開放的平臺,讓更多的人可以參與到 Delta Lake 的開發和使用中來。
今天我們將分享一些典型的場景,為什麼 Delta Lake 可以解決大家的各種痛點,然後也分享一下 Delta Lake 的基本原理和 Delta 架構,以及它如何取代大家正在普遍使用的 Lambda 架構。
數據工程師的糾結與運維的凌亂
項目經理總會跟工程師說,我們有一個很簡單的需求。可是,事實往往卻是,這些簡單的需求相當之難以實現。
在大數據的生產系統,往往,作為工程師的你,會面對這樣的一個項目經理:“我要有這麼一個 Data Pipeline,持續地處理數據,並且是增量處理,只要有新的數據來了就處理,不應該每次把所有的歷史數據都重新處理,而是隻應該處理增量數據,並且要保證高效快速。記住,我們不能讓用戶在使用中意識到這是批處理還是流處理。總之,就是快速得到正確結果。”
那麼作為數據工程師的你,要建設一個基本的 Data Pipeline [數據處理流水線],按照項目經理的說法,那就很簡單。我們把 Kafka、Kinesis、各種各樣數據湖的格式用 Spark 度出來,再用 Spark 做一些數據清理和數據轉換,然後再把結果存到一個數據湖,再用另一個 Spark job 把數據湖的內容分析一下,做訓練或者做各種各樣的分析,最後產生一個報告給終端用戶。這是一個非常簡單的 Pipeline。但是這個 Pipeline 有個頭痛的問題。如果僅僅用 Spark 的批處理,那麼延遲可能不達標,而且也不是在做增量處理。
那麼第二個方案就出來了,用 Spark Structured Streaming。Structured Streaming 有 Trigger Once,可以幫你記錄上次處理到什麼地方,這樣的話可以把延遲降低,只處理增量,你也不需要去記錄和管理上次處理到哪裡了。可是我們又遇到了一個新的問題,就是你如果用 Structured Streaming,每個小的 Batch 都會產生多個小的 Spark 的結果文件。小文件越來越多,整個 Pipeline 就越來越慢,延遲往往到了最後就無法接受了。
為此,那我們就得選擇下一個方案。我們既然有小文件,就得定期去做壓縮。但是在做壓縮的過程中整個作業線會下線。為什麼?由於缺乏原子性讀寫的能力,沒辦法在寫你的壓縮的時候同時讀數據。壓縮的週期太長也會影響到你的生產最後報表的時效性。比如說,業務是不能接受半小時或者一個小時這種延遲的。那麼,這個時候,大家自然而然會選擇最經典的架構,Lambda 架構。就是說,你同時可以部署一個批處理的和一個流處理的,批可以慢一點,但是結果全面準確,而流處理就是用最快的時間對最新增量產生結果。然後將批和流的結果彙總,產生一個全局的結果。
但是這種 Lambda 架構需要同時運營兩個不同的 pipeline,並且額外資源消耗也大幅增多,運營的人力和資源成本都大幅提高。
並且我們對這兩個 pipeline 都需要做驗證。尤其是當數據來源於非結構數據的數據源,數據不是特別乾淨和一致。
對於驗證發現的錯誤,我們又不希望將 Pipeline 給宕下來,而是希望它自動去修復。那麼,一種解決方案就是避免對全表做修正,而是對某些分區重新處理。數據的重新處理一般都會影響你整個 pipeline 的延遲,而且還進一步增加硬件資源的負荷和 pipeline 的複雜度。
之後也許會有一些業務上的調整,或者是諸多原因,你可能想把數據湖做一些 update 和 merge。由於當前數據湖不支持 update 和 delete,那麼你可能需要自己實現 update 和 merge。我們發現不同用戶的實現方法都不太一樣,簡直就是各顯神通,這些方案不但容易出錯,複雜度和延遲也很高,而且大多數情況還不通用。
複雜歸複雜,但是經過了半年的研發,方案終於可以上線了,應該是一件開心的事情。可是這個 Lambda 架構上線之後你會收到無數的抱怨,比如說你這個數據加載太慢了,我們做一些元數據操作的時候,其他並行的命令和查詢都沒辦法用,都被 block 了。不得不等這些大的數據加載,或者是元數據處理做完了才能再做別的事情。或者用戶做 update 改數據湖的時候會得到大量的報告說 FileNotFound。也許是你的文件地址被更新了,但是元數據緩衝沒有更新,找不到文件還需要 Refresh 緩存,但有時候客戶會抱怨說 Refresh 好像不管用,可是什麼時候管用呢?如果你用的 Object Store,分析到最後,可能發現是 Eventual Consistency 的問題,也許你不得不要過半小時之後才會見到這個文件……總之就是各種各樣的錯。
運維已經很不容易了,相煎何太急。這個 Lambda 架構費錢又費力,將大好的時光浪費到了解決系統的各種不足和侷限,而不是花時間去從數據抽取價值,真是得不償失。
但是我們再反過來看,最開始第一個方案實際上是很簡單,很優美。那它到底哪裡錯了?是什麼原因導致它最後變得這麼複雜?我們缺了什麼?如何可以簡化來產生一個簡單易維護的架構?
這裡我們列出了五點原因:
1)第一,要支持同時讀寫,就意味著你寫的時候還可以讀,不應該讀到一個錯誤的結果。同時還可以支持多個寫,且能保證數據的一致性;
2)第二,可以高吞吐地從大表讀取數據。大數據方案不能有諸多限制,比如,我聽說有些方案裡最多隻可以支持幾個併發讀,或者讀的文件太多了就不讓你提交作業了。如果這樣,對業務方來說,你的整個設計是不滿足他的需求的;
3)第三,錯誤是無可避免,你要可以支持回滾,可以重做,或者可以刪改這個結果,不能為了支持刪改而要求業務方去做業務邏輯的調整;
4)第四,在重新改變業務邏輯的時候要對數據做重新處理,這個時候,業務是不能下線的。在數據被重新處理完成之前,數據湖的數據是要一直可被訪問的;
5)第五,因為有諸多原因,數據可能會有晚到的情況,你要能處理遲到數據而不推遲下階段的數據處理。
基於以上五點,我們基於 Delta Lake 和 Structured Streaming 產生了一個新的架構,叫 Delta 架構,它是對 Lambda 架構的一種顛覆,或者稱為一種提升。
在 Delta 架構下,批流是合併的,並且要持續的進行數據處理,按需來重新處理歷史數據,並且利用公有或私有云的特性來對計算或者存儲資源按需分別做彈性擴展。
Delta Lake 的基本原理
Delta Lake 的基本原理其實很簡單,簡單得令人髮指。作為一個普通的 Partquet 一般就是 Partition Directories 再加一些 Data Files。Delta Lake 也是基於這個結構的,唯一的區別就是它有一個 Transaction Log 記錄你的 Table Version 和變更歷史。
現在,讓我們來重新看待什麼構成了一張表。表實際上是一堆操作的結果,比如說改變元數據,改變名字,改變 Schema,增加或刪除一些 Partitioning,還有另外一種操作是添加或者移除文件。所有表的當前狀態或者是結果,都是這一系列 Action 產生的結果。這個結果包含了當前的 元數據,文件列表,transaction 的歷史,還有版本信息。
那怎麼去實現這個原子性?也很簡單,只要保證 Commit File 的順序和原子性就可以了。
比如說表的第一個版本,它是增加兩個文件,第二個版本就是把這兩個文件刪掉,增加一個新的文件,作為 Reader 來說,每次只能看到當前已經 Commit 的結果。
怎麼實現多個寫入的併發?Spark 的 Pipeline 一般都是高併發讀,低併發寫。在這種情況下,樂觀併發就更加合適了。它實際上很簡單,就說你多個用戶讀的時候,先記錄一下當前讀用的 data 版本是什麼,如果同時有兩個人都在 commit,只有一方可以成功,而另一方就需要去看一下成功方之前的 commit 裡有沒有碰他讀的文件。如果沒有改,他就改一下文件名就行了,如果改了,那就得重做。這個可以是 Delta Lake 自動去重試,也可以是事務提交方 / 業務方,去重做。
Delta Lake 需要解決的另一個經典問題就是大規模元數據的處理。你發現你有大量的 commit log file,因為每次 commit 都會產生一個文件,這其實也是一個經典的小文件處理。如何解決這種元數據處理?標準答案就是使用 Spark。Delta Lake 便是使用 Spark 去處理它的元數據。比如剛才說了一個裡子,加了兩個文件,減了兩個文件,之後加了一個 parquet,之後 Spark 會把這些 commit 全部讀下來,產生一個新的,我們稱之為叫 Checkpoint。
這就是 Delta Lake,就是這麼簡單。
Delta 架構
Delta 架構簡介
我們看一下 Delta 架構 ,怎麼用 Delta 架構代替經典的 Lambda 架構。
1)第一,同時讀寫,並且要保證數據的一致性
就是剛才我們提出的第一個需求,就是要支持 transcation,就是說你只要能實現讀寫之間的 Snapshot isolation 就行了,這樣你可以集中在你的 data flow,而不用擔心會不會讀到部分結果,不用擔心 FileNotFound 的這類錯誤,這些事情 Delta Lake 都可以幫你處理。
Delta Lake 提供了流,就是 streaming 和 batch 的讀入和寫入,標準 API,很容易實現,很容易去用。你可以在文檔裡面找到具體的 API。
2)可以高吞吐從大表讀取數據
可能處理過大數據的同學們就遇到過這個經典痛點,我也處理過客戶的這種問題好多次,在 沒有 Delta Lake 的時候,簡直痛不欲生。
如果沒有 Delta Lake,讀取百萬級的 patition 的 location path 是需要用 Hive metastore 一行行地讀的,要取一百萬行簡直是奇慢無比。然後,在每個 patition 的 地址裡還需要通過文件系統 列裡面包含的所有文件。這在對象存儲的系統裡,這種操作也是又貴又慢。
其實這個問題不又是一個典型的大數據問題嗎?大數據系統都解決不了大數據問題,那不是貽笑大方?
當然,這裡的解決方案很簡單,就是標準的 Spark,用 parquet 去存 file path,並且用 Spark 的分佈式的向量化的讀入去讀,這就是 Delta Lake 怎麼去解決之前的痛點。我們客戶因為這個性能輕鬆地提高了幾百倍甚至幾千倍。其實也就是因為 Hive metastore 和文件系統的 list file 操作實在太慢了。
3)支持回滾和刪改
數據這麼髒,回滾和刪改需求難以避免。Delta Lake 提供了 Time travel,因為 transaction log 實際能看到整個歷史變化的結果,所以 Delta Lake 實現這個很方便。我們提供了兩條 API,你可以基於 Timestamp 去做, 也可以基於 version number。Time travel 是一個特別好的功能,它可以做很多事情,不單單是糾錯,你還可以 Debug,重建過往報告,查賬,審計,複雜的 temporal query,對快速更新數據的表做版本查詢……
Delta Lake 還支持刪改(update/delete/merge),不過目前 Delta 還沒有自己的 SQL 語法,當然我們可以把 Spark 的語法完全複製過來,但是維護成本也很高。但 Spark 3.0 來了之後這個問題就迎刃而解了。當然,如果要支持 Spark 2.4 的話,Delta 需要加上自己的 SQL parser,我們還在討論要不要這樣幹。
4)在線業務不下線的同時可以重新處理歷史數據
你只要對 Delta Lake 做相關結果的刪除,重新改一下業務邏輯,歷史數據再做批處理,你就可以得到你的最新結果了。與此同時,因為 Delta Lake 支持 ACID,數據的下游適用方還可以同時訪問之前版本的數據。
5)處理遲到數據而無需推遲下階段的數據處理
處理遲到數據也不是什麼問題,只要你能支持 merge,如果存在就 update,不存在就 insert,不影響你現有的 Delta Lake 重寫。
如上所述, Delta Lake 完美解決了我們的需求,讓大家的 Data pipeline 重新變得簡單而優雅,而不需要用那麼複雜的 Lambda 架構了。
怎麼最好地使用 Delta 架構 ?基於跟客戶的各種的討論經驗,我們總結出了下面幾點。
你需要有多個 stage 的 Delta Lake。我們的基本 idea 是這樣的:第一個 stage 就是你要保證沒有原始數據損失。它保存在 Delta Lake 裡,萬一哪天發現之前的一些數據清理導致丟失了很重要的信息,你還可以輕鬆恢復。第二個 stage 就是做數據清理,做一些清理、轉換、filter。然後才真正達到一個可以被數據分析的第三個 stage。這是基於數據質量分成多個級別,多個狀態。至於實際生產線上需要多少個 stage,這個取決於業務的複雜度,SLA,和對延遲的要求。
Delta 架構的特性
來看一下 Delta 架構的特性。
1)持續數據流
這聽起來好像很高大上,但實際上稍微解釋多一點就很容易明白。
批流合併。Streaming 和 batch 用同一個 engine,不用維護多個;同一套 API,甚至都不用 batch 的 API,就用 streaming 的 API 就能解決問題;同樣的 user code,無需用到 Lambda 架構,純粹就是一條 pipeline 解決所有問題。高效增量數據載入。如果不斷有新數據進來就直接用 Structured Streaming 的 Trigger.Once 去記錄上一次你處理到哪,你只需要重啟這個 Trigger.Once,就處理了上次之後的新數據, 特別方便。快速無延遲的流處理,你可以選擇不同的 Trigger 的模式,當然 Trigger.Once 最省錢,當然你也可以低延遲,比如多長時間 Trigger 一次,也可以低延遲用持續 Trigger。你可以把批處理變成一個持續流處理,簡單易用。而且 Delta Lake 因為支持原子性,所以它能保證 exactly once,這一點很重要,其他的數據源基本沒辦法保證。
2)物化中間結果
這一點就有點顛覆傳統模式了。我們建議多次物化你的中間結果,也就是之前說的多個 stage。每個 stage 就是把中間結果落地存在文件裡,它有以下好處。
容錯恢復,出問題後可以回到某一個版本,從那個時候再開始,你不需要從最原始的數據開始,這點在 pipeline 裡是很重要的事情。方便故障排查,你知道哪一步出錯了,要是不存,業務方報告出錯的時候你也不知道問題出在哪兒,連 debug 都沒法 debug,回溯都沒辦法回溯。一寫多讀,當你的 pipeline 很多很複雜的時候,可能重用中間的一些結果,這真的很方便。這裡面比如說圖裡的兩個 pipeline ,其實到 T3 之前,都是一樣的。我們就可以複用。
如果你的轉換很複雜的時候,可以物化多次。到底物化多少次,取決於你對 Reliability/SLA 和 end-2-end latency 的取捨,你要是 Reliability/SLA 好,你就必須要物化多幾次,但是寫肯定有代價,所以 end-2-end latency 就慢,具體就要看你的需求了。
3)費用和延遲的取捨
流處理,持續的數據流入和處理,無需作業調度管理,需要永遠在線的 cluster。頻繁的批處理,分鐘級數據流入和處理,不需要低延遲,比如半小時就可以了,需要 warm pool of machine,無事關機,按需啟動。可使用 Spark structured streaming 的 Trigger.Once 模式。非頻繁批處理,若干小時或若干天的數據批流入和處理,無事關機,按需啟動,也可使用 structured streaming 的 Trigger.Once 模式。這樣一來,就可以節省很多資源了。
4)優化數據的物理存儲
根據常用查詢的 predicate,為改善讀取速度,可優化數據的物理存儲。比如,用 partitioning 和 z-ordering。Partitioning 大家都應該很清楚了,low cardinality 的 column 比較合適,就是每個 partition 不要超過 1 GB,一般比如說用 date 這是一種經常被使用的 partition column,每個 date 裡面要給予不同的 eventType。這樣,每個 partition 不會太大,也不會產生太多 partition。反之如果用 timestamp 做 partition column,產生的 partition value 就是無數個,簡直奇葩無比,可以輕鬆把 Hive metastore 給撐爆。在 Delta Lake 裡面我們也不建議,即使我們不用 metastore。第二就是 Z-Ordering,這個還沒到開源的版本,但是這個是可以解決什麼問題呢,就是是針對那種 high cardinality,就是 column 裡有大量的不一樣的 value,這種就適合做 z-ordering index。
5)重新處理歷史數據
每次 keep 住上一個 stage 的好處是什麼?你把結果一刪,重新用 Tigger.Once 再做一次就好了,結果就出來了。如果你係統部署在雲上,那對你來說也很簡單,你如果要快速回填,你就再多加幾臺機器,結果就更快地出來了。比如,從原來的十臺機器擴張到一百臺。
6) 數據質量的調整
這是也是一個需要改變大家思維方式的地方。
在最開始的時候,我們最好是保證數據完整性。schema 可以選擇自動合併,就可以避免數據的丟失。到了最後階段,我們就需要去強制 schema 不能變,data type 不能變,data expectation 也不能。比如,不能有 NULL。數據質量對於數據分析的準確度是至關重要的。
以上特性也不是很難理解,但是需要改變思維方式。
Delta 架構的優點
1)減少端到端的 pipeline SLA 多個使用單位(客戶)把 data pipeline 的 SLA 從幾小時減少到幾分鐘。
2)減少 pipeline 的維護成本原來的 Lambda 架構簡直就是費時費力。要同樣達到分鐘級的用例延遲,Delta Lake 架構並不需要這麼複雜。
3)更容易的處理數據更新和刪除簡化了 Change data capture,GDPR,Sessionization,數據去冗。這些都可以用 Delta Lake 去實現,方便很多。
4)通過計算和存儲的分離和可彈縮而降低了 infrastructure 的費用多個使用單位將 infrastructure 的費用降低了超過十倍。
Delta 架構的經典案例
這裡分享 3 個 Delta 架構的經典方案。
第一個是 COMCAST,一個像中國移動的通訊類公司,它收集了美國海量的用戶數據。它的 Petabyte-scale jobs 使用 Delta Lake ,從原來需要 640 個服務器降到 64 個,原來是 84 個 job 降低到 34 個 job,延遲還降了一半。
第二個是 Sam‘s Club,它們也是使用 Delta Lake ,原來根本達不到數據的一致性,現在可以達到。延遲從一個小時降到六秒。
第三個就是澳洲的 healthdirect,數據更乾淨\更一致了,做數據分析匹配的準確度從 80% 升到 95%,數據加載的時耗從一天降到了 20 分鐘。
這都是來自於 Delta Lake 用戶在 Spark Summit 上分享的案例。
使用 Delta Lake 特別簡單,就把 parquet 的 keywords 一換。
怎麼加這個 Delta Lake 呢,把這個 package 加上就好了。具體方法見 demo 演示:Delta Lake Primer( https://dbricks.co/dlw-01)。
Delta 社區
Delta Lake 迭代挺快的,我們內部實際上還有大量的 feature,只是還沒有開源,我們會逐步開源並且加強研發。
阿里巴巴的團隊在幫 Delta Lake 做讓 Hive 可以讀 Delta Lake 裡面的數據。
Delta Lake 的社區發展也真的很快,從 2019 年 4 月份開源,目前已有 3700 個客戶,maven 的下載量快兩萬,我們自己客戶使用量已經達到超過 2 exabyte 的讀和寫。