Flink應用程序在一個分佈式環境中並行執行。當一個數據流程序提交到作業管理器(JobManager)執行時,系統將會創建一個數據流圖,然後準備執行需要的操作符。每一個操作符將會並行化到一個或者多個任務中去。每個算子的並行任務都會處理這個算子的輸入流中的一份子集。一個算子並行任務的個數叫做算子的並行度。它決定了算子執行的並行化程度,以及這個算子能處理多少數據量。
算子的並行度可以在執行環境這個層級來控制,也可以針對每個不同的算子設置不同的並行度。默認情況下,應用程序中所有算子的並行度都將設置為執行環境的並行度。執行環境的並行度(也就是所有算子的默認並行度)將在程序開始運行時自動初始化。如果應用程序在本地執行環境中運行,並行度將被設置為CPU的核數。當我們把應用程序提交到一個處於運行中的Flink集群時,執行環境的並行度將被設置為集群默認的並行度,除非我們在客戶端提交應用程序時顯式的設置好並行度。
parallelism指的是並行度的意思。在 Flink 框架中代表每個任務的並行度,適當的提高並行度可以大大提高 job 的執行效率。
slot指的是任務槽位的意思,flink中任務的並行性由每個 Task Manager 上可用的 slot 決定。
並行度是動態概念,任務槽數量是靜態概念。並行度<=任務槽數量。一個任務槽最多運行一個並行度。
一、如何設置flink job的parallelism
(1)在flink的配置文件中flink-conf.yaml,默認的並行度為1;
parallelism.default: 1
(2)在通過shell方式提交flink job的時候,可以使用-p指定程序的並行度;
./bin/flink run -p 10 ../word-count.jar
(3)在flink job程序內設置並行度;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
(4)每個算子指定並行度;
data.keyBy(new xxxKey())
.flatMap(new xxxFlatMapFunction()).setParallelism(2)
.map(new xxxMapFunction).setParallelism(2)
.addSink(new xxxSink()).setParallelism(1)
上面每個算子設置的並行度優先級要高於前面 env設置的並行度,然後才是配置文件中默認並行度。
二、如何理解flink中的slot?
slot 是指 taskmanager 的併發執行能力。
flink-conf.yaml中默認taskmanager.numberOfTaskSlots=1
下面,我們設置taskmanager.numberOfTaskSlots=3;即每一個 taskmanager 中分配 3 個 TaskSlot, 3 個 taskmanager 一共有 9 個 TaskSlot。
三、調整parallelism並行度設置
parallelism 是指 taskmanager 實際使用的併發能力。
parallelism.default=1;即運行程序默認的並行度為 1,9 個 TaskSlot 只用了 1 個,有 8 個空閒。設置合適的並行度才能提高效率。
parallelism 是可配置;下面我們調整parallelism 並行度。
圖中 example2 每個算子設置的並行度是 2, example3 每個算子設置的並行度是 9,所有slot槽位都佔用了。
example4 除了 sink 是設置的並行度為 1,其他算子設置的並行度都是 9。數據從其他TaskManagers上的所有槽位流到這個Sink接收器上。