本文就flink流計算的任務工程模板做一介紹,初始數據源和計算結果數據存儲都是通過JDBC連接到數據庫。本文以mysql為數據庫進行演示。
依賴包
<code><dependency>
<groupid>mysql/<groupid>
<artifactid>mysql-connector-java/<artifactid>
<version>8.0.15/<version>
/<dependency>/<code>
工程結構說明
本工程包含以下幾個部分:
1. 結構化數據定義(DTO),本文以常見的學生基本信息為示例。
<code>package com.crazyice.lee.data;
import lombok.*;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString(exclude = {"password","age"})
@RequiredArgsConstructor()
public class Student {
private int studentId;
private String name;
private String password;
private boolean sex;
private int age;
}/<code>
2. 任務編排,這裡演示了一個完整流任務的各個關鍵環節,包括:環境配置、源數據讀取(DataStreamSource)、處理過程(數據過濾篩選)、處理結果輸出入庫(DataSink)、啟動任務執行等。
<code>package com.crazyice.lee.jobs;
import com.crazyice.lee.data.Student;
import com.crazyice.lee.reader.JdbcReader;
import com.crazyice.lee.writer.JdbcWriter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@Slf4j
public class Mysql2MysqlJob {
public static void main(String[] args) throws Exception {
//設置環境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//獲取數據-數據源
DataStreamSource<student> students = env.addSource(new JdbcReader());
students.name("從mysql讀取數據");
//處理數據過程
DataStream<student> subStudents = students
.filter(e -> e.isSex()).name("過濾男生")
.filter(e -> e.getAge() > 12).name("過濾12歲以上");
//寫入數據-處理結果
subStudents.addSink(new JdbcWriter()).name("寫入備份學生庫");
env.execute("讀取Mysql,數據過濾,寫入Mysql");
}
}/<student>/<student>/<code>
3. 初始數據源、結果寫入數據源配置,這部分通過實現RichSourceFunction、RichSinkFunction接口並配合配置文件實現,是相對固定的模式,如果要提高編程效率,可以考慮使用模板及配置的方式動態實現,因為代碼結構固定,所以不在此處粘貼,如果感興趣可以通過文章下面的開源鏈接查看。
測試驗證
flink支持本地測試模式,可以在編碼過程中排查錯誤,只需要在環境設置部分使用本地環境對象即可(將原先的getExecutionEnvironment()替換為createLocalEnvironment())也可以使用profile來進行編譯配置。
<code>//設置環境
final ExecutionEnvironment environment = properties.getProperty("local.running").equalsIgnoreCase("true") ? StreamExecutionEnvironment.createLocalEnvironment() : StreamExecutionEnvironment.getExecutionEnvironment();/<code>
源代碼:(https://gitee.com/crazyicelee/flink)
閱讀更多 瘋冰無極 的文章