文章整理:加米穀大數據
作為現代企業的重要工具,流處理和實時分析這類工具逐漸興起,越來越多的企業以 Apache Flink 為核心構建平臺,並將其作為服務在內部提供。在最新舉辦的 Flink Forward 會議中, Uber 、 Netflix 和 阿里巴巴等公司的許多相關主題演講進一步說明了這一趨勢。
這些平臺旨在通過減輕最終用戶的所有運營負擔來簡化內部的 Application (應用)提交。為了提交 Flink 應用程序,這些平臺通常只公開一個集中式或低並行度端點(例如 Web 前端)用於應用提交,我們將調用 Deployer(部署器)。
平臺開發人員和維護人員經常提到的障礙之一是,Deployer 可能是一個很難配置的大量資源消耗者。如果按照平均負載進行配置,可能會導致 Deployer 服務被部署請求淹沒(在最壞的情況下,短時間內對所有生產應用程序都是如此),而按照最高負載進行規劃的話,又會帶來不必要的成本。根據這一觀察結果,Flink 1.11 引入了 Application 模式(應用模式)作為部署選項,它允許一個輕量級、更可伸縮性的應用提交過程,從而使應用程序部署負載更均勻地分佈在集群的各個節點上。
為了理解這個問題以及瞭解 Application 模式如何解決該問題,我們首先簡要概述 Flink 中應用程序執行的當前狀態,然後再闡述部署模式引入的架構變化以及如何利用它們。
Flink 中的應用程序執行
在 Flink 中執行應用程序主要涉及三個實體:Client(客戶端)、JobManager(作業管理器)和 TaskManager(任務管理器)。Client 負責將應用提交給集群,JobManager 負責執行期間必要的 bookkeeping,而 TaskManager 則負責實際的計算。更多細節請參考 Flink 的架構文檔。
當前部署模式
在 1.11 版本中引入 Application 模式之前,Flink 允許用戶在 Session(會話)或 Per-Job 集群上執行應用程序。兩者之間的差異與集群生命週期和它們提供的資源隔離保證有關。
Session 模式
Session 模式(會話模式)假定集群已經運行,並使用該集群的資源來執行任何提交的應用程序。在同一(Session)集群中執行的應用程序使用相同的資源,並因此相互競爭。這樣做的好處是,你無需為每個提交的作業分配整個集群的資源開銷。但是,如果其中一個作業行為不正常或者關閉了 TaskManager,那麼在該 TaskManager 上運行的所有作業都將受到故障的影響。除了對導致故障的作業產生負面影響之外,這還意味著潛在的大規模恢復過程,即所有重新啟動的作業同時訪問文件系統,並使其不可用於其他服務。此外,單個集群運行多個作業意味著 JobManager 的負載更大,它負責集群中所有作業的 bookkeeping。這種模式非常適合啟動短作業,例如交互式查詢。
Per-Job 模式
在 Per-Job 模式中,可用的集群管理器框架(如 YARN 或 Kubernetes)用於為每個提交的作業啟動 Flink 集群,該集群僅對該作業可用。當作業完成後,集群將關閉,並清理所有延遲的資源(例如文件)。這種模式允許更好的資源隔離,因為行為不正常的作業不會影響任何其他作業。另外,由於每個應用程序都有自己的 JobManager,因此它將 bookkeeping 負載分散到多個實體。考慮到前面提到的 Session 模式中的資源隔離問題,對於長時間運行的作業,用戶經常選擇 Per-Job 模式,因為這些作業願意接受一定程度的啟動延遲的增加,以支持彈性。
總之,在 Session 模式中,集群生命週期獨立於集群中運行的任何作業,並且集群中運行的所有作業共享其資源。Per-Job 模式選擇為每個提交的作業分配一個集群,已提供更好的資源隔離保證,因為資源不會在作業之間共享。在這種情況下,集群的生命週期與作業的生命週期相關。
Application 提交
Flink 應用程序的執行包括兩個階段: pre-flight :即當用戶的 main() 方法被調用時; runtime :即用戶代碼調用 execute() 時立即觸發。 main() 方法使用 Flink 的 API(DataStream API、Table API、DataSet API)之一構造用戶程序。當 main() 方法調用 env.execute() 時,用戶定義的管道將被轉換成一種 Flink 運行時可以理解的形式,稱為 Job Graph(作業圖),並將其傳遞給集群。
儘管它們方法有所不同,Session 模式和 Per-Job 模式都會在 Client 執行應用程序的 main() 方法,即 pre-flight 階段。
對於已經在本地擁有作業的所有依賴項,然後通過在其機器上運行的 Client 提交其應用程序的單個用戶來說,這通常不是問題。但是,對於通過遠程實體(如 Deployer)提交的情況下,這個過程包括:
本地下載應用程序的依賴項;執行 main() 方法提取 Job Graph;將 Job Graph 及其依賴項發送到集群以便執行;等待結果。這使得 Client 消耗了大量的資源,因為它可能需要大量的網絡帶寬來下載依賴項或將二進制文件發送到集群,並且需要 CPU 週期來執行 main() 方法。隨著越來越多的用戶共享同一個 Client,這個問題甚至會變得更加突出。
上圖展示了使用紅色、藍色和綠色表示的三個應用程序的兩種部署模式。每個矩形都有三個並行項。黑色矩形表示不同的進程,分別是 TaskManager、JobManager 和 Deployer。我們假設在所有情況下只有一個 Deployer 進程。彩色三角形表示提交進程的負載,而彩色矩形表示 TaskManager 和 JobManager 進程的負載。如圖所示,Per-Job 模式和 Session 模式下的 Deployer 共享相同的負載。它們的不同之處在於任務的分配和 JobManager 負載。在 Session 模式下,集群中的所有作業都有一個 JobManager,而在 Per-Job 模式下,每個作業都有一個 JobManager。此外,Session 模式下的任務會隨機分配給 TaskManager,而在 Per-Job 模式下,每個 TaskManager 只能有單個作業任務。
Application 模式
Application 模式建立在上述觀察結果的基礎上,並嘗試將 Per-Job 模式的資源隔離與輕量級且可伸縮的應用提交過程結合起來。為實現這一點,它為每個提交的應用程序創建一個集群,但是這一次,應用程序的 main() 方法在 JobManager 上執行。
為每個應用程序創建一個集群可以看作是創建一個只在特定應用程序的作業之間共享的 Session 集群,並在應用程序結束時關閉。使用這種架構,Application 模式提供與 Per-Job 模式相同的資源隔離和負載平衡保證,但在整個應用程序的粒度上。這是有道理的,因為屬於同一應用程序的工作應該相互關聯,並被視為一個單元。
在 JobManager 上執行 main() 方法不僅可以節省提取 Job Graph 所需的 CPU 週期,也可以節省 Client 本地下載依賴項並將 Job Graph 及其依賴項發送到集群所需的帶寬。此外,由於每個應用程序只有一個 JobManager,因此,它可以更均勻地分散網絡負載。上圖對此進行了說明,其中我們具有與 “Session 和 Per-Job 部署模式” 部分中相同的場景,但是這一次 Client 負載已經轉移到了每個應用程序的 JobManager。
注:在 Application 模式下, main() 方法是在集群上執行的,而不是像在其他模式中那樣在 Client 上執行。和可能對代碼產生影響,例如,使用 regsiterCachedFile() 在環境中註冊的任何路徑都必須由應用程序的 JobManager 進行訪問。
與 Per-Job 模式相比,Application 模式允許提交由多個作業組成的應用程序。作業執行的順序不受部署模式的影響,而是受用於啟動作業的調用的影響。使用阻塞 execute() 方法建立一個順序,並將導致下一個作業的執行被延遲到“這個”作業完成為止。相反,一旦提交了當前作業,非阻塞 executeAsync() 方法將立即繼續提交“下一個”作業。
降低網絡需求
如上所述,通過在 JobManager 上執行應用程序的 main() 方法,Application 模式可以節省以前在提交作業時所需的大量資源。但仍有改進的餘地。
重點關注 YARN,它已經支持所有提到的 here 2,即使 Application 模式已經就緒,Client 仍然需要發送用戶 jar 到 JobManager。此外,對於每個應用程序,Client 必須將 “flink-dist” 目錄發送到集群,該目錄包含框架本身的二進制文件,包括 flink-dist.jar 、 lib/ 和 plugin/ 目錄。這兩者可能會在 Client 佔用大量帶寬。此外,在每次提交時發送相同的 flink-dist 二進制文件既是對帶寬的浪費,也是對存儲空間的浪費。只需允許應用程序共享相同的二進制文件即可減少存儲空間的浪費。
在 Flink 1.11 中,我們引入了醫學選項,允許用戶進行如下操作:
指定一個目錄的遠程路徑,在該目錄中,YARN 可以找到 Flink 分發二進制文件,以及指定一個遠程路徑,YARN 可以在其中找到用戶 jar。對於第一步,我們利用了 YARN 的分佈式緩存,並允許應用程序共享這些二進制文件。因此,如果一個應用程序碰巧在它的 TaskManager 的本地存儲中找到了 Flink 的副本,由於之前的一個應用程序在同一個 TaskManager 上執行,它甚至都不需要在內部下載它。
注:這兩種優化都可以用於 YARN 上的所有部署模式,而不僅僅是 Application 模式。
示例:YARN 上的 Application 模式
有關完整說明,請參閱 Flink 的官方文檔,更具體地說,請參閱引用集群管理框架的頁面,例如 YARN 或 Kubernetes 。接下來我將給出一些關於 YARN 的例子,其中上述所有功能都是可用的。
要以 Application 模式啟動用用程序,可以使用:
./bin/flink run-application -t yarn-application ./MyApplication.jar
使用這條命令,所有的配置參數,例如用於引導應用程序狀態的保存點的路徑,或者所需的 JobManager/TaskManager 內存大小,都可以通過它們的配置選項(以 -d 作為前綴)來指定。有關可用配置選項的目錄,請參閱 Flink 的 配置頁面。
例如,指定 JobManager 和 TaskManager 內存大小的命令如下所示:
./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
./MyApplication.jar
最後,為了進一步節省提交應用程序 jar 所需的帶寬,可以預先將其上傳到 HDFS,並指定指向 ./MyApplication.jar 的遠程路徑,如下所示:
./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/remote-flink-dist-dir" \
hdfs://myhdfs/jars/MyApplication.jar
這將使作業提交變得更加輕量級,因為所需的 Flink jar 和應用程序 jar 將從指定的遠程位置提取,而不是由 Client 發送到集群。Client 將唯一提供給集群的是應用程序的配置,其中包括上述提到的所有路徑。
總結
我們希望本文的討論能夠幫助你理解 Flink 提供的各種部署模式之間的差異,並且能夠幫助你作出明智的決定,究竟哪一種模式適合你自己的設置。