flink是當下火熱流行的大數據框架,支持高吞吐、低延遲、高性能,自帶時間事件,支持有狀態計算和窗口操作,支持任務重啟從最終計算點開始不必重新運行整個任務。作者:悲喜世界;來源:開源中國
說完flink特點,開始搭建windows下開發的環境,運行環境如下:
系統:windows10
jdk版本:oracle jdk1.8,最低1.8
scala:2.11.12,2.11是大部分軟件的開發版本,適應性強,且比較新。
maven:3.3.3,最低要求3.1
flink:1.9.1
開發工具:idea 2019.3.1
jdk、scala、maven安裝方式,自行百度,不在贅述,win+r,可查看到版本接口。
準備好基礎環境,進入正題。
1、安裝flink
下載地址:https://flink.apache.org/downloads.html#apache-flink-191
選擇對應版本,下載即可,下載後的安裝包是tgz壓縮文件,直接解壓到安裝路徑。
雙擊運行bin目錄下start-cluster.bat文件,flink啟動,可以http://localhost:8081看到flink運行頁面。
2、創建項目
打開idea,選擇File --> New --> Project ,選擇mave直接next。
輸入項目名和項目路徑,點擊finish即可。
修改pom.xml指定flink相關配置,內容如下:
<code> <properties>
<maven.compiler.source>1.8/<maven.compiler.source>
<maven.compiler.target>1.8/<maven.compiler.target>
<encoding>UTF-8/<encoding>
<flink.version>1.9.1/<flink.version>
<scala.version>2.11/<scala.version>
/<properties>
<dependencies>
<dependency>
<groupid>org.apache.flink/<groupid>
<artifactid>flink-scala_${scala.version}/<artifactid>
<version>${flink.version}/<version>
<scope>provided/<scope>
/<dependency>
<dependency>
<groupid>org.apache.flink/<groupid>
<artifactid>flink-streaming-scala_${scala.version}/<artifactid>
<version>${flink.version}/<version>
<scope>provided/<scope>
/<dependency>
/<dependencies>/<code>
項目運行發生類加載異常,詳情如下:
需要導入flink安裝目錄下jar包,選擇File --> Project Structure --> Modules,選擇Dependencies --> 右邊加號 --> JARs or directorie
找到剛剛flink安裝目錄,指定到lib目錄下:
點擊apply和ok退出配置。
3、編寫代碼
在src.main目錄下增加scala源目錄,最終目錄結構如下:
創建項目讀取的txt文件, 在src目錄下創建demo.txt。內容如下,空格分隔。
<code>a b d 的
ada a 的
ada a/<code>
java目錄下創建WordCountJava類,代碼如下
<code>import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* java版本flink模型
*/
public class WordCountJava {
public static void main(String[] args) throws Exception {
//第一步:獲取運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//第二步:從文件中讀取
DataStreamSource text = env.readTextFile("./src/demo.txt");
//第三步:計算數據
DataStream windowCount = text.flatMap(new FlatMapFunction<string>() {
@Override
public void flatMap(String value, Collector out) throws Exception {
String[] splits = value.split("\\\\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1L));
}
}
}).keyBy("word").sum("count"); //.timeWindow(Time.seconds(2), Time.seconds(1))
//把數據打印到控制檯
windowCount.print().setParallelism(1);//使用一個並行度
//注意:因為flink是懶加載的,所以必須調用execute方法,上面的代碼才會執行
env.execute("streaming word count");
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\\'' +
", count=" + count +
'}';
}
}
}/<string>/<code>
右鍵run項目,執行結果如下:
3、總結
首次嘗試flink開發,相比hadoop,環境搭建簡單了很多,不需要裝虛擬機,後續更新更多代碼,豐富技能儲備。
閱讀更多 加米穀大數據 的文章