Flink中parallelism並行度和slot槽位的理解

Flink應用程序在一個分佈式環境中並行執行。當一個數據流程序提交到作業管理器(JobManager)執行時,系統將會創建一個數據流圖,然後準備執行需要的操作符。每一個操作符將會並行化到一個或者多個任務中去。每個算子的並行任務都會處理這個算子的輸入流中的一份子集。一個算子並行任務的個數叫做算子的並行度。它決定了算子執行的並行化程度,以及這個算子能處理多少數據量。


算子的並行度可以在執行環境這個層級來控制,也可以針對每個不同的算子設置不同的並行度。默認情況下,應用程序中所有算子的並行度都將設置為執行環境的並行度。執行環境的並行度(也就是所有算子的默認並行度)將在程序開始運行時自動初始化。如果應用程序在本地執行環境中運行,並行度將被設置為CPU的核數。當我們把應用程序提交到一個處於運行中的Flink集群時,執行環境的並行度將被設置為集群默認的並行度,除非我們在客戶端提交應用程序時顯式的設置好並行度。


parallelism指的是並行度的意思。在 Flink 框架中代表每個任務的並行度,適當的提高並行度可以大大提高 job 的執行效率。

slot指的是任務槽位的意思,flink中任務的並行性由每個 Task Manager 上可用的 slot 決定。

並行度是動態概念,任務槽數量是靜態概念。並行度<=任務槽數量。一個任務槽最多運行一個並行度。


一、如何設置flink job的parallelism

(1)在flink的配置文件中flink-conf.yaml,默認的並行度為1;

parallelism.default: 1


(2)在通過shell方式提交flink job的時候,可以使用-p指定程序的並行度;

./bin/flink run -p 10 ../word-count.jar


(3)在flink job程序內設置並行度;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(10);


(4)每個算子指定並行度;

Flink中parallelism並行度和slot槽位的理解

data.keyBy(new xxxKey())

.flatMap(new xxxFlatMapFunction()).setParallelism(2)

.map(new xxxMapFunction).setParallelism(2)

.addSink(new xxxSink()).setParallelism(1)

上面每個算子設置的並行度優先級要高於前面 env設置的並行度,然後才是配置文件中默認並行度。


二、如何理解flink中的slot?

slot 是指 taskmanager 的併發執行能力。

flink-conf.yaml中默認taskmanager.numberOfTaskSlots=1


下面,我們設置taskmanager.numberOfTaskSlots=3;即每一個 taskmanager 中分配 3 個 TaskSlot, 3 個 taskmanager 一共有 9 個 TaskSlot。

Flink中parallelism並行度和slot槽位的理解


Flink中parallelism並行度和slot槽位的理解


三、調整parallelism並行度設置

parallelism 是指 taskmanager 實際使用的併發能力。

parallelism.default=1;即運行程序默認的並行度為 1,9 個 TaskSlot 只用了 1 個,有 8 個空閒。設置合適的並行度才能提高效率。

Flink中parallelism並行度和slot槽位的理解

parallelism 是可配置;下面我們調整parallelism 並行度。

圖中 example2 每個算子設置的並行度是 2, example3 每個算子設置的並行度是 9,所有slot槽位都佔用了。

Flink中parallelism並行度和slot槽位的理解

example4 除了 sink 是設置的並行度為 1,其他算子設置的並行度都是 9。數據從其他TaskManagers上的所有槽位流到這個Sink接收器上。

Flink中parallelism並行度和slot槽位的理解


Flink中parallelism並行度和slot槽位的理解


分享到:


相關文章: