windows flink開發環境搭建java實例

flink是當下火熱流行的大數據框架,支持高吞吐、低延遲、高性能,自帶時間事件,支持有狀態計算和窗口操作,支持任務重啟從最終計算點開始不必重新運行整個任務。作者:悲喜世界;來源:開源中國

windows flink開發環境搭建java實例

說完flink特點,開始搭建windows下開發的環境,運行環境如下:

系統:windows10

jdk版本:oracle jdk1.8,最低1.8

scala:2.11.12,2.11是大部分軟件的開發版本,適應性強,且比較新。

maven:3.3.3,最低要求3.1

flink:1.9.1

開發工具:idea 2019.3.1

jdk、scala、maven安裝方式,自行百度,不在贅述,win+r,可查看到版本接口。

windows flink開發環境搭建java實例

準備好基礎環境,進入正題。

1、安裝flink

下載地址:https://flink.apache.org/downloads.html#apache-flink-191

windows flink開發環境搭建java實例

選擇對應版本,下載即可,下載後的安裝包是tgz壓縮文件,直接解壓到安裝路徑。

雙擊運行bin目錄下start-cluster.bat文件,flink啟動,可以http://localhost:8081看到flink運行頁面。

windows flink開發環境搭建java實例

2、創建項目

打開idea,選擇File --> New --> Project ,選擇mave直接next。

windows flink開發環境搭建java實例

輸入項目名和項目路徑,點擊finish即可。

windows flink開發環境搭建java實例

修改pom.xml指定flink相關配置,內容如下:

<code>    <properties>
<maven.compiler.source>1.8/<maven.compiler.source>
<maven.compiler.target>1.8/<maven.compiler.target>
<encoding>UTF-8/<encoding>
<flink.version>1.9.1/<flink.version>
<scala.version>2.11/<scala.version>
/<properties>

<dependencies>




<dependency>
<groupid>org.apache.flink/<groupid>
<artifactid>flink-scala_${scala.version}/<artifactid>
<version>${flink.version}/<version>
<scope>provided/<scope>
/<dependency>

<dependency>
<groupid>org.apache.flink/<groupid>
<artifactid>flink-streaming-scala_${scala.version}/<artifactid>
<version>${flink.version}/<version>
<scope>provided/<scope>
/<dependency>

/<dependencies>/<code>

項目運行發生類加載異常,詳情如下:

windows flink開發環境搭建java實例

需要導入flink安裝目錄下jar包,選擇File --> Project Structure --> Modules,選擇Dependencies --> 右邊加號 --> JARs or directorie

windows flink開發環境搭建java實例

找到剛剛flink安裝目錄,指定到lib目錄下:

windows flink開發環境搭建java實例

點擊apply和ok退出配置。

3、編寫代碼

在src.main目錄下增加scala源目錄,最終目錄結構如下:

windows flink開發環境搭建java實例

創建項目讀取的txt文件, 在src目錄下創建demo.txt。內容如下,空格分隔。

<code>a b d 的
ada a 的
ada a/<code>

java目錄下創建WordCountJava類,代碼如下

<code>import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
* java版本flink模型
*/
public class WordCountJava {
public static void main(String[] args) throws Exception {
//第一步:獲取運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//第二步:從文件中讀取
DataStreamSource text = env.readTextFile("./src/demo.txt");
//第三步:計算數據
DataStream windowCount = text.flatMap(new FlatMapFunction<string>() {
@Override
public void flatMap(String value, Collector out) throws Exception {
String[] splits = value.split("\\\\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1L));
}
}
}).keyBy("word").sum("count"); //.timeWindow(Time.seconds(2), Time.seconds(1))

//把數據打印到控制檯
windowCount.print().setParallelism(1);//使用一個並行度
//注意:因為flink是懶加載的,所以必須調用execute方法,上面的代碼才會執行
env.execute("streaming word count");
}

public static class WordWithCount {
public String word;
public long count;

public WordWithCount() {
}


public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}

@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\\'' +
", count=" + count +
'}';
}
}

}/<string>/<code>

右鍵run項目,執行結果如下:

windows flink開發環境搭建java實例

3、總結

首次嘗試flink開發,相比hadoop,環境搭建簡單了很多,不需要裝虛擬機,後續更新更多代碼,豐富技能儲備。


分享到:


相關文章: