一文搞懂 FlinkX,基于flink的分布式数据同步工具

1 什么是FlinkX

  • FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

一文搞懂 FlinkX,基于flink的分布式数据同步工具

2 工作原理

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行,工作原理如下图:

一文搞懂 FlinkX,基于flink的分布式数据同步工具

3 快速起步

3.1 运行模式

  • 单机模式:对应Flink集群的单机模式
  • standalone模式:对应Flink集群的分布式模式
  • yarn模式:对应Flink集群的yarn模式

3.2 执行环境

  • Java: JDK8及以上
  • Flink集群: 1.4及以上(单机模式不需要安装Flink集群)
  • 操作系统:理论上不限,但是目前只编写了shell启动脚本,用户可以可以参考shell脚本编写适合特定操作系统的启动脚本。

3.3 打包

进入项目根目录,使用maven打包:

<code>mvn clean package -Dmaven.test.skip/<code>

打包结束后,项目根目录下会产生bin目录和plugins目录,其中bin目录包含FlinkX的启动脚本,plugins目录下存放编译好的数据同步插件包

3.4 启动

3.4.1 命令行参数选项

  • model描述:执行模式,也就是flink集群的工作模式local: 本地模式standalone: 独立部署模式的flink集群yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"必选:否默认值:local
  • job描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。必选:是默认值:无
  • pluginRoot描述:插件根目录地址,也就是打包后产生的pluginRoot目录。必选:是默认值:无
  • flinkconf描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf必选:否默认值:无
  • yarnconf描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop必选:否默认值:无

3.4.2 启动数据同步任务

  • 以本地模式启动数据同步任务
<code>bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*/<code>
  • 以standalone模式启动数据同步任务
<code>bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json  -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*/<code>
  • 以yarn模式启动数据同步任务
<code>bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json  -pluginRoot /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*/<code>

4 数据同步任务模版

从最高空俯视,一个数据同步的构成很简单,如下:

<code>{
    "job": {
        "setting": {...},
        "content": [...]
    }
}/<code>

数据同步任务包括一个job元素,而这个元素包括setting和content两部分。

  • setting: 用于配置限速、错误控制和脏数据管理
  • content: 用于配置具体任务信息,包括从哪里来(Reader插件信息),到哪里去(Writer插件信息)

4.1 setting

<code>    "setting": {
        "speed": {...},
        "errorLimit": {...},
        "dirty": {...}
    }/<code>

setting包括speed、errorLimit和dirty三部分,分别描述限速、错误控制和脏数据管理的配置信息

4.1.1 speed

<code>            "speed": {
                 "channel": 3,
                 "bytes": 0
            }/<code>
  • channel: 任务并发数
  • bytes: 每秒字节数,默认为 Long.MAX_VALUE

4.1.2 errorLimit

<code>            "errorLimit": {
                "record": 10000,
                "percentage": 100
            }/<code>
  • record: 出错记录数超过record设置的条数时,任务标记为失败
  • percentage: 当出错记录数超过percentage百分数时,任务标记为失败

4.1.3 dirty

<code>        "dirty": {
                "path": "/tmp",
                "hadoopConfig": {
                    "fs.default.name": "hdfs://ns1",
                    "dfs.nameservices": "ns1",
                    "dfs.ha.namenodes.ns1": "nn1,nn2",
                    "dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
                    "dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
                    "dfs.ha.automatic-failover.enabled": "true",
                    "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                    "fs.hdfs.impl.disable.cache": "true"
                }
            }/<code>
  • path: 脏数据存放路径
  • hadoopConfig: 脏数据存放路径对应hdfs的配置信息(hdfs高可用配置)

4.1.4 restore

<code>"restore": {

        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      }/<code>

restore配置请参考断点续传

4.2 content

<code>        "content": [
            {
               "reader": {
                    "name": "...",
                    "parameter": {
                        ...
                    }
                },
               "writer": {
                    "name": "...",
                    "parameter": {
                         ...
                     }
                }
            }
        ]/<code>
  • reader: 用于读取数据的插件的信息
  • writer: 用于写入数据的插件的信息

reader和writer包括name和parameter,分别表示插件名称和插件参数

4.3 数据同步任务例子

详见flinkx-examples子工程


代码地址:https://github.com/DTStack/flinkx

我们已经开始应用了,小伙伴们赶紧来尝鲜吧!


分享到:


相關文章: