Flink 基礎學習(二)搭建一個 "Hello World" 程序

在學習技術時,總會有一個簡單程序 Demo 帶著我們入門,所以參考著官網例子,帶大家快速熟悉 Flink 的 Hello World~

說明一下,項目運行的環境如下:

OS : Mac

Flink Version : 1.9

IDE : IDEA

Java Version : 1.8

下面來講下關於環境準備,如果是 Windows 的用戶,請參照每個步驟,找到適應自己的安裝 or 啟動方法。

1 環境準備

首先我們默認已經安裝了 Jdk 1.8 和編碼工具 IDEA,下面來講如何安裝 Flink 和建立腳手架。下面展示的項目代碼已經放入了 Github,可以下載進行本地運行

1.1 安裝 Flink

<code>$ brew install apache-flink
/<code>

檢查安裝是否成功以及版本號

<code>$ flink --version
Version: 1.9.0, Commit ID: 9c32ed9
/<code>

接著以單機集群模式啟動 Flink

<code>$ sh /usr/local/Cellar/apache-flink/1.9.0/libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host yejingqideMBP-c510.
Starting taskexecutor daemon on host yejingqideMBP-c510.

/<code>

然後訪問 localhost:8081 監控界面(1.9 版本更新了 UI):


Flink 基礎學習(二)搭建一個


1.2 創建項目

這裡推薦的是使用 maven 進行構建,在命令行中輸入如下內容(# 號後面是說明,請不要輸入):

<code>$ mvn archetype:generate \\
-DarchetypeGroupId=org.apache.flink \\ # flink 的 group.id
-DarchetypeArtifactId=flink-quickstart-java \\ # flink 的 artifact.id
-DarchetypeVersion=1.9.0 \\ # flink 的 version,以上三個請不要修改,按照默認即可
-DgroupId=wiki-edits \\ # 項目的 group.id
-DartifactId=wiki-edits \\ # 項目的 artifact.id
-Dversion=0.1 \\ # 項目的 version.id
-Dpackage=wikiedits \\ # 項目的基礎包名
-DinteractiveMode=false # 是否需要和用戶交互以獲得輸入,由於上面已經自己寫了項目的參數,所以禁用了。反之請刪掉 上面項目的配置,將交互模式設為 true
/<code>

如果按照官方的例子填寫,那麼你將得到如下的項目結構:

<code>$ tree wiki-edits
wiki-edits/
├── pom.xml
└── src
└── main
├── java
│ └── wikiedits
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
/<code>

如果是自己自定義的,包結構會不一致,但是通過腳手架創立的,pom 文件中預置的依賴都將一致,引入了 Flink 基礎開發相關的 API,然後通過 IDEA 打開該項目目錄,就可以開始我們的 Hello world。

2 開始項目

首先交代一下待會的流程,編寫程序代碼,啟動 netcat 命令來監聽 9000 端口,啟動或提交 Flink 程序,最後監聽日誌輸出信息。

2.1 項目代碼

Demo 的代碼作用是監聽 netcat 輸入的字符,然後進行聚合操作,最後輸出字符統計


<code>public class SocketTextStreamWordCount {

public static void main(String[] args) throws Exception {
String hostName = "127.0.0.1";
int port = 9000;
// 設置運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 獲取數據源
DataStreamSource<string> stream = env.socketTextStream(hostName, port);
// 計數
SingleOutputStreamOperator<tuple2>> sum = stream
.flatMap((new LineSplitter()))
.keyBy(0)
.sum(1);
// 輸出
sum.print();
// 提交任務
env.execute("Java Word from SocketTextStream Example");

}

public static final class LineSplitter implements FlatMapFunction<string>> {

@Override
public void flatMap(String s, Collector<tuple2>> collector) throws Exception {
String[] tokens = s.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<string>(token, 1));
}
}
}
}
}
/<string>/<tuple2>/<string>/<tuple2>/<string>/<code>


Flink 基礎學習(二)搭建一個


簡單說明一下,上面出現了 SocketTextStream 套接字字符 數據源(Source),接著是 算子(Operator): FlatMap(一個輸入源,可以輸出零個或多個結果)、KeyBy(按照某字段或者 tuple 元組中某個下標進行分類) 和 sum(跟翻譯一樣,就是進行聚合彙總) ,最後輸出

2.2 開啟 tcp 長鏈接

為了模擬流數據,我們造的場景是不斷往 9000 端口輸入字符,Flink 程序添加的數據源是 SocketTextStream (套接字字符流)。

在你的終端中輸入以下命令

<code>$ nc -l 9000
/<code>

有關 netcat 命令的用法,請看參考資料第二條,這裡的作用就是打開 TCP 長鏈接,監聽 9000 端口

2.3 啟動 Flink 程序

剛才第一個步驟中,已經編輯好了程序代碼,第二個步驟也已經啟動了一個 TCP 客戶端,啟動 Flink 程序有兩種方法:

2.3.1 本地調試

使用 IDEA 的好處很多,代碼補全,語法檢查和快捷鍵之類的。我經常使用的調試方法就是添加一個 psvm 的 main 方法,在裡面寫執行代碼,最後點擊綠色的啟動按鈕~


Flink 基礎學習(二)搭建一個


如果不需要調試,想直接看結果,選擇第一個 Run,但有時不確定代碼執行過程和出錯的具體原因,可以通過第二個選項 Debug 進行調試。

這是本地開發經常使用的方法,進行結果的驗證。

2.3.2 提交到 JobManager

前面我們啟動的是單機集群版,啟動了一個 JobManager 和 TaskWorker,打開的 localhost:8081 就是 JobManager 的監控面板,所以我們要通過下面的方式,將 Flink 程序提交到 JobManager。

這裡教一個簡單的方法,我們通過 mvn clean package 進行打包後,可以在 IDEA 集成的終端標籤欄下提交我們的程序:


Flink 基礎學習(二)搭建一個


由於每個人的絕對路徑都不一樣,所以我們通過 IDEA 的終端,它會自動定位到項目的路徑,然後執行時填寫相對路徑的 jar 包名字即可

<code>$ flink run -c cn.sevenyuan.wordcount.SocketTextStreamWordCount target/flink-quick-start-1.0-SNAPSHOT.jar
/<code>

-c 參數是指定運行的主程序入口,接著我們去查看監控面板,可以發現任務狀態已經處於監控中:


Flink 基礎學習(二)搭建一個


頂部信息講的是運行程序名字、時間、時間線、配置參數等信息,底下 Name 一欄,說明該程序邏輯步驟(讀取數據源,進行映射處理,使用 keyBy 和聚合運算,最後輸出到【打印 sink】)

2.4 輸入數據 & 驗證結果

前面驗證了程序正常啟動,接下來我們來驗證輸入和輸出

先來監聽輸出,進入 Flink 的日誌目錄,接著通過 tail 命令監聽任務執行者 TaskWorkder(默認會啟動一個任務執行者,所以編碼為 0) 的日誌輸出

<code>$ usr/local/Cellar/apache-flink/1.9.0/libexec/log
$ tail -400f flink*-taskexecutor-0*.out

/<code>

接著,在 nc -l 9000 對應的終端窗口中輸入如下數據:

<code>$ nc -l 9000
hello world
test world
test hello
hello my world
/<code>

最後就能夠看到以下輸出結果:

<code>(hello,1)
(world,1)

(test,1)
(world,2)
(test,2)
(hello,2)
(hello,3)
(my,1)
(world,3)
/<code>

每行字符以空格進行分割,然後分別進行彙總統計,得到的輸出結果一致。

3 擴展閱讀

如果你在官網閱覽,應該也曾看到過 TimeWindow 時間窗口的例子,下面是 Demo 代碼


<code>public class SocketWindowWordCount {

public static void main(String[] args) throws Exception {

// the port to connect to
String hostName = "127.0.0.1";
int port = 9000;

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data by connecting to the socket
DataStream<string> text = env.socketTextStream("localhost", port, "\\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<wordwithcount> windowCounts = text
.flatMap(new FlatMapFunction<string>() {
@Override
public void flatMap(String value, Collector<wordwithcount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))

.reduce(new ReduceFunction<wordwithcount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.getWord(), a.getCount() + b.getCount());
}
});

// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);

env.execute("Socket Window WordCount");
}
}
/<wordwithcount>/<wordwithcount>/<string>/<wordwithcount>/<string>/<code>

這裡的程序代碼核心點在於,比之前的多了一個算子 timeWindow,並且有兩個參數,分別是時間窗口大小以及滑動窗口大小(Time size, Time slide),下面是簡單的輸入和輸出示意圖:


Flink 基礎學習(二)搭建一個


由於滑動窗口大小是 1s,窗口是有重合的部分,然後每秒統計自己所在窗口的數據(5s 內傳輸過來的數據),可以看到第 6s 時,已經捨棄掉第 0s 輸入的字符串數據。

小夥伴們也可以修改一下時間窗口大小和滑動窗口大小,然後輸入自定義的數據,進行不同參數的設置,看下輸出效果如何,是否有達到自己的預期。

這裡先初步接觸一下 時間(Time)和窗口(Window)概念,之後慢慢接觸逐步加深理解吧。


4 總結

本文基於 Mac 系統、 Apache Flink 1.9 版本進行了項目搭建和 Demo 編寫,介紹了 Suorce -> Transformation -> Sink 的流程。簡單的實現了一個字符計數器,往套接字數據源 SocketTextStream,源源不斷的輸入,然後進行統計出現的次數,如有疑惑或不對之處請與我討論~


分享到:


相關文章: