Apache Flink 零基礎入門(三):DataStream API 編程

整理:高贇

前面已經為大家介紹了 Flink 的基本概念以及安裝部署的過程,從而希望能夠幫助讀者建立起對 Flink 的初步印象。本次課程開始,我們將進入第二部分,即 Flink 實際開發的相關內容。本次課程將首先介紹 Flink 開發中比較核心的 DataStream API 。我們首先將回顧分佈式流處理的一些基本概念,這些概念對於理解實際的 DataStream API 有非常大的作用。然後,我們將詳細介紹 DataStream API 的設計,最後我們將通過一個例子來演示 DataStream API 的使用。

1. 流處理基本概念

對於什麼是流處理,從不同的角度有不同的定義。其實流處理與批處理這兩個概念是對立統一的,它們的關係有點類似於對於 Java 中的 ArrayList 中的元素,是直接看作一個有限數據集並用下標去訪問,還是用迭代器去訪問。

Apache Flink 零基礎入門(三):DataStream API 編程

圖1. 左圖硬幣分類器。硬幣分類器也可以看作一個流處理系統,用於硬幣分類的各部分組件提前串聯在一起,硬幣不斷進入系統,並最終被輸出到不同的隊列中供後續使用。右圖同理。

流處理系統本身有很多自己的特點。一般來說,由於需要支持無限數據集的處理,流處理系統一般採用一種數據驅動的處理方式。它會提前設置一些算子,然後等到數據到達後對數據進行處理。為了表達複雜的計算邏輯,包括 Flink 在內的分佈式流處理引擎一般採用 DAG 圖來表示整個計算邏輯,其中 DAG 圖中的每一個點就代表一個基本的邏輯單元,也就是前面說的算子。由於計算邏輯被組織成有向圖,數據會按照邊的方向,從一些特殊的 Source 節點流入系統,然後通過網絡傳輸、本地傳輸等不同的數據傳輸方式在算子之間進行發送和處理,最後會通過另外一些特殊的 Sink 節點將計算結果發送到某個外部系統或數據庫中。

Apache Flink 零基礎入門(三):DataStream API 編程

圖2. 一個 DAG 計算邏輯圖與實際的物理時模型。邏輯圖中的每個算子在物理圖中可能有多個併發。

對於實際的分佈式流處理引擎,它們的實際運行時物理模型要更復雜一些,這是由於每個算子都可能有多個實例。如圖 2 所示,作為 Source 的 A 算子有兩個實例,中間算子 C 也有兩個實例。在邏輯模型中,A 和 B 是 C 的上游節點,而在對應的物理邏輯中,C 的所有實例和 A、B 的所有實例之間可能都存在數據交換。在物理模型中,我們會根據計算邏輯,採用系統自動優化或人為指定的方式將計算工作分佈到不同的實例中。只有當算子實例分佈到不同進程上時,才需要通過網絡進行數據傳輸,而同一進程中的多個實例之間的數據傳輸通常是不需要通過網絡的。

表1. Apache Storm 構造 DAG 計算圖。Apache Storm 的接口定義更加“面向操作”,因此更加底層。

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

表2. Apache Flink 構造 DAG 計算圖。Apache Flink 的接口定義更加“面向數據”,因此更加高層。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

DataStream<string> text = env.readTextFile ("input");
DataStream<tuple2>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
counts.writeAsText("output");
/<tuple2>/<string>

由於流處理的計算邏輯是通過 DAG 圖來表示的,因此它們的大部分 API 都是圍繞構建這種計算邏輯圖來設計的。例如,對於幾年前非常流行的 Apache Storm,它的 Word Count 的示例如表 1 所示。基於 Apache Storm 用戶需要在圖中添加 Spout 或 Bolt 這種算子,並指定算子之前的連接方式。這樣,在完成整個圖的構建之後,就可以將圖提交到遠程或本地集群運行。

與之對比,Apache Flink 的接口雖然也是在構建計算邏輯圖,但是 Flink 的 API 定義更加面向數據本身的處理邏輯,它把數據流抽象成為一個無限集,然後定義了一組集合上的操作,然後在底層自動構建相應的 DAG 圖。可以看出,Flink 的 API 要更“上層”一些。許多研究者在進行實驗時,可能會更喜歡自由度高的 Storm,因為它更容易保證實現預想的圖結構;而在工業界則更喜歡 Flink 這類高級 API,因為它使用更加簡單。

2. Flink DataStream API 概覽

基於前面對流處理的基本概念,本節將詳細介紹 Flink DataStream API 的使用方式。我們首先還是從一個簡單的例子開始看起。表3是一個流式 Word Count 的示例,雖然它只有 5 行代碼,但是它給出了基於 Flink DataStream API 開發程序的基本結構。

表3. 基於 Flink DataStream API 的 Word Count 示例.

//1、設置運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、配置數據源讀取數據
DataStream<string> text = env.readTextFile ("input");
//3、進行一系列轉換
DataStream<tuple2>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
//4、配置數據匯寫出數據
counts.writeAsText("output");
//5、提交執行
env.execute("Streaming WordCount");
/<tuple2>/<string>

為了實現流式 Word Count,我們首先要先獲得一個 StreamExecutionEnvironment 對象。它是我們構建圖過程中的上下文對象。基於這個對象,我們可以添加一些算子。對於流處理程度,我們一般需要首先創建一個數據源去接入數據。在這個例子中,我們使用了 Environment 對象中內置的讀取文件的數據源。這一步之後,我們拿到的是一個 DataStream 對象,它可以看作一個無限的數據集,可以在該集合上進行一序列的操作。例如,在 Word Count 例子中,我們首先將每一條記錄(即文件中的一行)分隔為單詞,這是通過 FlatMap 操作來實現的。調用 FlatMap 將會在底層的 DAG 圖中添加一個 FlatMap 算子。然後,我們得到了一個記錄是單詞的流。我們將流中的單詞進行分組(keyBy),然後累積計算每一個單詞的數據(sum(1))。計算出的單詞的數據組成了一個新的流,我們將它寫入到輸出文件中。

最後,我們需要調用 env#execute 方法來開始程序的執行。需要強調的是,前面我們調用的所有方法,都不是在實際處理數據,而是在構通表達計算邏輯的 DAG 圖。只有當我們將整個圖構建完成並顯式的調用 Execute 方法後,框架才會把計算圖提供到集群中,接入數據並執行實際的邏輯。

基於流式 Word Count 的例子可以看出,基於 Flink 的 DataStream API 來編寫流處理程序一般需要三步:通過 Source 接入數據、進行一系統列的處理以及將數據寫出。最後,不要忘記顯式調用 Execute 方式,否則前面編寫的邏輯並不會真正執行。

Apache Flink 零基礎入門(三):DataStream API 編程

圖3. Flink DataStream 操作概覽

從上面的例子中還可以看出,Flink DataStream API 的核心,就是代表流數據的 DataStream 對象。整個計算邏輯圖的構建就是圍繞調用 DataStream 對象上的不同操作產生新的 DataStream 對象展開的。整體來說,DataStream 上的操作可以分為四類。第一類是對於單條記錄的操作,比如篩除掉不符合要求的記錄(Filter 操作),或者將每條記錄都做一個轉換(Map 操作)。第二類是對多條記錄的操作。比如說統計一個小時內的訂單總成交量,就需要將一個小時內的所有訂單記錄的成交量加到一起。為了支持這種類型的操作,就得通過 Window 將需要的記錄關聯到一起進行處理。第三類是對多個流進行操作並轉換為單個流。例如,多個流可以通過 Union、Join 或 Connect 等操作合到一起。這些操作合併的邏輯不同,但是它們最終都會產生了一個新的統一的流,從而可以進行一些跨流的操作。最後, DataStream 還支持與合併對稱的操作,即把一個流按一定規則拆分為多個流(Split 操作),每個流是之前流的一個子集,這樣我們就可以對不同的流作不同的處理。

Apache Flink 零基礎入門(三):DataStream API 編程

圖4. 不同類型的 DataStream 子類型。不同的子類型支持不同的操作集合。

為了支持這些不同的流操作,Flink 引入了一組不同的流類型,用來表示某些操作的中間流數據集類型。完整的類型轉換關係如圖4所示。首先,對於一些針對單條記錄的操作,如 Map 等,操作的結果仍然是是基本的 DataStream 類型。然後,對於 Split 操作,它會首先產生一個 SplitStream,基於 SplitStream 可以使用 Select 方法來篩選出符合要求的記錄並再將得到一個基本的流。

類似的,對於 Connect 操作,在調用 streamA.connect(streamB)後可以得到一個專門的 ConnectedStream。ConnectedStream 支持的操作與普通的 DataStream 有所區別,由於它代表兩個不同的流混合的結果,因此它允許用戶對兩個流中的記錄分別指定不同的處理邏輯,然後它們的處理結果形成一個新的 DataStream 流。由於不同記錄的處理是在同一個算子中進行的,因此它們在處理時可以方便的共享一些狀態信息。上層的一些 Join 操作,在底層也是需要依賴於 Connect 操作來實現的。

另外,如前所述,我們可以通過 Window 操作對流可以按時間或者個數進行一些切分,從而將流切分成一個個較小的分組。具體的切分邏輯可以由用戶進行選擇。當一個分組中所有記錄都到達後,用戶可以拿到該分組中的所有記錄,從而可以進行一些遍歷或者累加操作。這樣,對每個分組的處理都可以得到一組輸出數據,這些輸出數據形成了一個新的基本流。

對於普通的 DataStream,我們必須使用 allWindow 操作,它代表對整個流進行統一的 Window 處理,因此是不能使用多個算子實例進行同時計算的。針對這一問題,就需要我們首先使用 KeyBy 方法對記錄按 Key 進行分組,然後才可以並行的對不同 Key 對應的記錄進行單獨的 Window 操作。KeyBy 操作是我們日常編程中最重要的操作之一,下面我們會更詳細的介紹。

Apache Flink 零基礎入門(三):DataStream API 編程

圖5. 基本流上的 Window 操作與 KeyedStream 上的 Window 操對比。KeyedStream 上的 Window 操作使採用多個實例併發處理成為了可能。

基本 DataStream 對象上的 allWindow 與 KeyedStream 上的 Window 操作的對比如圖5所示。為了能夠在多個併發實例上並行的對數據進行處理,我們需要通過 KeyBy 將數據進行分組。KeyBy 和 Window 操作都是對數據進行分組,但是 KeyBy 是在水平分向對流進行切分,而 Window 是在垂直方式對流進行切分。

使用 KeyBy 進行數據切分之後,後續算子的每一個實例可以只處理特定 Key 集合對應的數據。除了處理本身外,Flink 中允許算子維護一部分狀態(State),在KeyedStream 算子的狀態也是可以分佈式存儲的。由於 KeyBy 是一種確定的數據分配方式(下文將介紹其它分配方式),因此即使發生 Failover 作業重啟,甚至發生了併發度的改變,Flink 都可以重新分配 Key 分組並保證處理某個 Key 的分組一定包含該 Key 的狀態,從而保證一致性。

最後需要強調的是,KeyBy 操作只有當 Key 的數量超過算子的併發實例數才可以較好的工作。由於同一個 Key 對應的所有數據都會發送到同一個實例上,因此如果Key 的數量比實例數量少時,就會導致部分實例收不到數據,從而導致計算能力不能充分發揮。

3. 其它問題

除 KeyBy 之外,Flink 在算子之前交換數據時還支持其它的物理分組方式。如圖 1 所示,Flink DataStream 中物理分組方式包括:

  • Global: 上游算子將所有記錄發送給下游算子的第一個實例。
  • Broadcast: 上游算子將每一條記錄發送給下游算子的所有實例。
  • Forward:只適用於上游算子實例數與下游算子相同時,每個上游算子實例將記錄發送給下游算子對應的實例。
  • Shuffle:上游算子對每條記錄隨機選擇一個下游算子進行發送。
  • Rebalance:上游算子通過輪詢的方式發送數據。
  • Rescale:當上遊和下游算子的實例數為 n 或 m 時,如果 n < m,則每個上游實例向ceil(m/n)或floor(m/n)個下游實例輪詢發送數據;如果 n > m,則 floor(n/m) 或 ceil(n/m) 個上游實例向下遊實例輪詢發送數據。
  • PartitionCustomer:當上述內置分配方式不滿足需求時,用戶還可以選擇自定義分組方式。
Apache Flink 零基礎入門(三):DataStream API 編程

圖6. 除keyBy外其它的物理分組方式。

除分組方式外,Flink DataStream API 中另一個重要概念就是類型系統。圖 7 所示,Flink DataStream 對像都是強類型的,每一個 DataStream 對象都需要指定元素的類型,Flink 自己底層的序列化機制正是依賴於這些信息對序列化等進行優化。具體來說,在 Flink 底層,它是使用 TypeInformation 對象對類型進行描述的,TypeInformation 對象定義了一組類型相關的信息供序列化框架使用。

Apache Flink 零基礎入門(三):DataStream API 編程

圖7. Flink DataStream API 中的類型系統

Flink 內置了一部分常用的基本類型,對於這些類型,Flink 也內置了它們的TypeInformation,用戶一般可以直接使用而不需要額外的聲明,Flink 自己可以通過類型推斷機制識別出相應的類型。但是也會有一些例外的情況,比如,Flink DataStream API 同時支持 Java 和 Scala,Scala API 許多接口是通過隱式的參數來傳遞類型信息的,所以如果需要通過 Java 調用 Scala 的 API,則需要把這些類型信息通過隱式參數傳遞過去。另一個例子是 Java 中對泛型存在類型擦除,如果流的類型本身是一個泛型的話,則可能在擦除之後無法推斷出類型信息,這時候也需要顯式的指定。

在 Flink 中,一般 Java 接口採用 Tuple 類型來組合多個字段,而 Scala 則更經常使用 Row 類型或 Case Class。相對於 Row,Tuple 類型存在兩個問題,一個是字段個數不能超過 25 個,此外,所有字段不允許有 null 值。最後,Flink 也支持用戶自定義新的類型和 TypeInformation,並通過 Kryo 來實現序列化,但是這種方式可帶來一些遷移等方面的問題,所以儘量不要使用自定義的類型。

4.示例

然後,我們再看一個更復雜的例子。假設我們有一個數據源,它監控系統中訂單的情況,當有新訂單時,它使用 Tuple2 輸出訂單中商品的類型和交易額。然後,我們希望實時統計每個類別的交易額,以及實時統計全部類別的交易額。

表4. 實時訂單統計示例。

public class GroupedProcessingTimeWindowSample {
private static class DataSource extends RichParallelSourceFunction<tuple2>> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<tuple2>> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);
String key = "類別" + (char) ('A' + random.nextInt(3));
int value = random.nextInt(10) + 1;
System.out.println(String.format("Emits\\t(%s, %d)", key, value));
ctx.collect(new Tuple2<>(key, value));
}
}
@Override
public void cancel() {
isRunning = false;
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<tuple2>> ds = env.addSource(new DataSource());
KeyedStream<tuple2>, Tuple> keyedStream = ds.keyBy(0);
keyedStream.sum(1).keyBy(new KeySelector<tuple2>, Object>() {
@Override
public Object getKey(Tuple2<string> stringIntegerTuple2) throws Exception {
return "";
}
}).fold(new HashMap<string>(), new FoldFunction<tuple2>, HashMap<string>>() {
@Override
public HashMap<string> fold(HashMap<string> accumulator, Tuple2<string> value) throws Exception {
accumulator.put(value.f0, value.f1);
return accumulator;
}
}).addSink(new SinkFunction<hashmap>>() {
@Override
public void invoke(HashMap<string> value, Context context) throws Exception {
// 每個類型的商品成交量
System.out.println(value);
// 商品成交總量
System.out.println(value.values().stream().mapToInt(v -> v).sum());
}
});
env.execute();

}
}
/<string>/<hashmap>/<string>/<string>/<string>/<string>/<tuple2>/<string>/<string>/<tuple2>/<tuple2>/<tuple2>/<tuple2>/<tuple2>

示例的實現如表4所示。首先,在該實現中,我們首先實現了一個模擬的數據源,它繼承自 RichParallelSourceFunction,它是可以有多個實例的 SourceFunction 的接口。它有兩個方法需要實現,一個是 Run 方法,Flink 在運行時對 Source 會直接調用該方法,該方法需要不斷的輸出數據,從而形成初始的流。在 Run 方法的實現中,我們隨機的產生商品類別和交易量的記錄,然後通過 ctx#collect 方法進行發送。另一個方法是 Cancel 方法,當 Flink 需要 Cancel Source Task 的時候會調用該方法,我們使用一個 Volatile 類型的變量來標記和控制執行的狀態。

然後,我們在 Main 方法中就可以開始圖的構建。我們首先創建了一個 StreamExecutioniEnviroment 對象。創建對象調用的 getExecutionEnvironment 方法會自動判斷所處的環境,從而創建合適的對象。例如,如果我們在 IDE 中直接右鍵運行,則會創建 LocalStreamExecutionEnvironment 對象;如果是在一個實際的環境中,則會創建 RemoteStreamExecutionEnvironment 對象。

基於 Environment 對象,我們首先創建了一個 Source,從而得到初始的流。然後,為了統計每種類別的成交量,我們使用 KeyBy 按 Tuple 的第 1 個字段(即商品類型)對輸入流進行分組,並對每一個 Key 對應的記錄的第 2 個字段(即成交量)進行求合。在底層,Sum 算子內部會使用 State 來維護每個Key(即商品類型)對應的成交量之和。當有新記錄到達時,Sum 算子內部會更新所維護的成交量之和,並輸出一條記錄。

如果只統計各個類型的成交量,則程序可以到此為止,我們可以直接在 Sum 後添加一個 Sink 算子對不斷更新的各類型成交量進行輸出。但是,我們還需要統計所有類型的總成交量。為了做到這一點,我們需要將所有記錄輸出到同一個計算節點的實例上。我們可以通過 KeyBy 並且對所有記錄返回同一個 Key,將所有記錄分到同一個組中,從而可以全部發送到同一個實例上。

然後,我們使用 Fold 方法來在算子中維護每種類型商品的成交量。注意雖然目前 Fold 方法已經被標記為 Deprecated,但是在 DataStream API 中暫時還沒有能替代它的其它操作,所以我們仍然使用 Fold 方法。這一方法接收一個初始值,然後當後續流中每條記錄到達的時候,算子會調用所傳遞的 FoldFunction 對初始值進行更新,併發送更新後的值。我們使用一個 HashMap 來對各個類別的當前成交量進行維護,當有一條新的到達時,我們就更新該 HashMap。這樣在 Sink 中,我們收到的是最新的商品類別和成交量的 HashMap,我們可以依賴這個值來輸出各個商品的成交量和總的成交量。

需要指出的是,這個例子主要是用來演示 DataStream API 的用法,實際上還會有更高效的寫法,此外,更上層的 Table / SQL 還支持 Retraction 機制,可以更好的處理這種情況。

Apache Flink 零基礎入門(三):DataStream API 編程

圖8. API 原理圖

最後,我們對 DataStream API 的原理進行簡要的介紹。當我們調用 DataStream#map 算法時,Flink 在底層會創建一個 Transformation 對象,這一對象就代表我們計算邏輯圖中的節點。它其中就記錄了我們傳入的 MapFunction,也就是 UDF(User Define Function)。隨著我們調用更多的方法,我們創建了更多的 DataStream 對象,每個對象在內部都有一個 Transformation 對象,這些對象根據計算依賴關係組成一個圖結構,就是我們的計算圖。後續 Flink 將對這個圖結構進行進一步的轉換,從而最終生成提交作業所需要的 JobGraph。

5. 總結

本文主要介紹了 Flink DataStream API,它是當前 Flink 中比較底層的一套 API。在實際的開發中,基於該 API 需要用戶自己處理 State 與 Time 等一些概念,因此需要較大的工作量。後續課程還會介紹更上層的 Table / SQL 層的 API,未來 Table / SQL 可能會成為 Flink 主流的 API,但是對於接口來說,越底層的接口表達能力越強,在一些需要精細操作的情況下,仍然需要依賴於 DataStream API。


分享到:


相關文章: