1 什麼是FlinkX
- FlinkX是在是袋鼠雲內部廣泛使用的基於flink的分佈式離線數據同步框架,實現了多種異構數據源之間高效的數據遷移。
不同的數據源頭被抽象成不同的Reader插件,不同的數據目標被抽象成不同的Writer插件。理論上,FlinkX框架可以支持任意數據源類型的數據同步工作。作為一套生態系統,每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。
2 工作原理
在底層實現上,FlinkX依賴Flink,數據同步任務會被翻譯成StreamGraph在Flink上執行,工作原理如下圖:
3 快速起步
3.1 運行模式
- 單機模式:對應Flink集群的單機模式
- standalone模式:對應Flink集群的分佈式模式
- yarn模式:對應Flink集群的yarn模式
3.2 執行環境
- Java: JDK8及以上
- Flink集群: 1.4及以上(單機模式不需要安裝Flink集群)
- 操作系統:理論上不限,但是目前只編寫了shell啟動腳本,用戶可以可以參考shell腳本編寫適合特定操作系統的啟動腳本。
3.3 打包
進入項目根目錄,使用maven打包:
<code>mvn clean package -Dmaven.test.skip/<code>
打包結束後,項目根目錄下會產生bin目錄和plugins目錄,其中bin目錄包含FlinkX的啟動腳本,plugins目錄下存放編譯好的數據同步插件包
3.4 啟動
3.4.1 命令行參數選項
- model描述:執行模式,也就是flink集群的工作模式local: 本地模式standalone: 獨立部署模式的flink集群yarn: yarn模式的flink集群,需要提前在yarn上啟動一個flink session,使用默認名稱"Flink session cluster"必選:否默認值:local
- job描述:數據同步任務描述文件的存放路徑;該描述文件中使用json字符串存放任務信息。必選:是默認值:無
- pluginRoot描述:插件根目錄地址,也就是打包後產生的pluginRoot目錄。必選:是默認值:無
- flinkconf描述:flink配置文件所在的目錄(單機模式下不需要),如/hadoop/flink-1.4.0/conf必選:否默認值:無
- yarnconf描述:Hadoop配置文件(包括hdfs和yarn)所在的目錄(單機模式下不需要),如/hadoop/etc/hadoop必選:否默認值:無
3.4.2 啟動數據同步任務
- 以本地模式啟動數據同步任務
<code>bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*/<code>
- 以standalone模式啟動數據同步任務
<code>bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*/<code>
- 以yarn模式啟動數據同步任務
<code>bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json -pluginRoot /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*/<code>
4 數據同步任務模版
從最高空俯視,一個數據同步的構成很簡單,如下:
<code>{ "job": { "setting": {...}, "content": [...] } }/<code>
數據同步任務包括一個job元素,而這個元素包括setting和content兩部分。
- setting: 用於配置限速、錯誤控制和髒數據管理
- content: 用於配置具體任務信息,包括從哪裡來(Reader插件信息),到哪裡去(Writer插件信息)
4.1 setting
<code> "setting": { "speed": {...}, "errorLimit": {...}, "dirty": {...} }/<code>
setting包括speed、errorLimit和dirty三部分,分別描述限速、錯誤控制和髒數據管理的配置信息
4.1.1 speed
<code> "speed": { "channel": 3, "bytes": 0 }/<code>
- channel: 任務併發數
- bytes: 每秒字節數,默認為 Long.MAX_VALUE
4.1.2 errorLimit
<code> "errorLimit": { "record": 10000, "percentage": 100 }/<code>
- record: 出錯記錄數超過record設置的條數時,任務標記為失敗
- percentage: 當出錯記錄數超過percentage百分數時,任務標記為失敗
4.1.3 dirty
<code> "dirty": { "path": "/tmp", "hadoopConfig": { "fs.default.name": "hdfs://ns1", "dfs.nameservices": "ns1", "dfs.ha.namenodes.ns1": "nn1,nn2", "dfs.namenode.rpc-address.ns1.nn1": "node02:9000", "dfs.namenode.rpc-address.ns1.nn2": "node03:9000", "dfs.ha.automatic-failover.enabled": "true", "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", "fs.hdfs.impl.disable.cache": "true" } }/<code>
- path: 髒數據存放路徑
- hadoopConfig: 髒數據存放路徑對應hdfs的配置信息(hdfs高可用配置)
4.1.4 restore
<code>"restore": { "isRestore": false, "restoreColumnName": "", "restoreColumnIndex": 0 }/<code>
restore配置請參考斷點續傳
4.2 content
<code> "content": [ { "reader": { "name": "...", "parameter": { ... } }, "writer": { "name": "...", "parameter": { ... } } } ]/<code>
- reader: 用於讀取數據的插件的信息
- writer: 用於寫入數據的插件的信息
reader和writer包括name和parameter,分別表示插件名稱和插件參數
4.3 數據同步任務例子
詳見flinkx-examples子工程
代碼地址:https://github.com/DTStack/flinkx
我們已經開始應用了,小夥伴們趕緊來嚐鮮吧!