flink任務編程-java版-流-數據庫to數據庫

本文就flink流計算的任務工程模板做一介紹,初始數據源和計算結果數據存儲都是通過JDBC連接到數據庫。本文以mysql為數據庫進行演示。

依賴包

<code><dependency>

<groupid>mysql/<groupid>

<artifactid>mysql-connector-java/<artifactid>

<version>8.0.15/<version>

/<dependency>/<code>


工程結構說明


flink任務編程-java版-流-數據庫to數據庫

工程結構

本工程包含以下幾個部分:

1. 結構化數據定義(DTO),本文以常見的學生基本信息為示例。

<code>package com.crazyice.lee.data;

import lombok.*;

@Data

@AllArgsConstructor

@NoArgsConstructor

@ToString(exclude = {"password","age"})

@RequiredArgsConstructor()

public class Student {

private int studentId;

private String name;

private String password;

private boolean sex;

private int age;

}/<code>

2. 任務編排,這裡演示了一個完整流任務的各個關鍵環節,包括:環境配置、源數據讀取(DataStreamSource)、處理過程(數據過濾篩選)、處理結果輸出入庫(DataSink)、啟動任務執行等。

<code>package com.crazyice.lee.jobs;

import com.crazyice.lee.data.Student;

import com.crazyice.lee.reader.JdbcReader;

import com.crazyice.lee.writer.JdbcWriter;

import lombok.extern.slf4j.Slf4j;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@Slf4j

public class Mysql2MysqlJob {

public static void main(String[] args) throws Exception {

//設置環境

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//獲取數據-數據源

DataStreamSource<student> students = env.addSource(new JdbcReader());

students.name("從mysql讀取數據");

//處理數據過程

DataStream<student> subStudents = students

.filter(e -> e.isSex()).name("過濾男生")

.filter(e -> e.getAge() > 12).name("過濾12歲以上");

//寫入數據-處理結果

subStudents.addSink(new JdbcWriter()).name("寫入備份學生庫");

env.execute("讀取Mysql,數據過濾,寫入Mysql");

}


}/<student>/<student>/<code>

3. 初始數據源、結果寫入數據源配置,這部分通過實現RichSourceFunction、RichSinkFunction接口並配合配置文件實現,是相對固定的模式,如果要提高編程效率,可以考慮使用模板及配置的方式動態實現,因為代碼結構固定,所以不在此處粘貼,如果感興趣可以通過文章下面的開源鏈接查看。

測試驗證

flink支持本地測試模式,可以在編碼過程中排查錯誤,只需要在環境設置部分使用本地環境對象即可(將原先的getExecutionEnvironment()替換為createLocalEnvironment())也可以使用profile來進行編譯配置。

<code>//設置環境
final ExecutionEnvironment environment = properties.getProperty("local.running").equalsIgnoreCase("true") ? StreamExecutionEnvironment.createLocalEnvironment() : StreamExecutionEnvironment.getExecutionEnvironment();/<code>

源代碼:(https://gitee.com/crazyicelee/flink)


分享到:


相關文章: