Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

很多情況大數據集群需要獲取業務數據,用於分析。通常有兩種方式:

  • 業務直接或間接寫入的方式
  • 業務的關係型數據庫同步到大數據集群的方式

第一種可以是在業務中編寫代碼,將覺得需要發送的數據發送到消息隊列,最終落地到大數據集群。

第二種則是通過數據同步的方式,將關係型數據同步到大數據集群,可以是存儲在 hdfs 上,使用 hive 進行分析,或者是直接存儲到 hbase 中。

其中數據同步又可以大致分為兩種:增量同步、CRUD 同步。

增量同步是隻將關係型數據庫中新增的數據進行同步,對於修改、刪除操作不進行同步,這種同步方式適用於那些一旦生成就不會變動的數據。 CRUD 同步則是數據的增、刪、改都需要進行同步,保證兩個庫中的數據一致性。

本文不講 binlog + Canal + 消息隊列 + JAR 實現數據實時同步的方案,也不講使用 Sqoop 進行離線同步。而是講解如何使用 Streamsets 零代碼完成整個實時同步流程。關於 Streamsets 具體是什麼,以及能做哪些其他的事情,大家可以前往 Streamsets 官網進行了解。從筆者瞭解的信息,在數據同步方面 Streamsets 十分好用。

要實現 mysql 數據的實時同步,首先我們需要打開其 binlog 模式,具體怎麼操作網上有很多教程,這裡就不進行闡述了。

那麼,現在就直接進入正題吧。

安裝

下載

Streamsets 可以直接從官網下載: archives.streamsets.com

這裡安裝的是 Core Tarball 格式,當然你也可以直接選擇下載 Full Tarball、Cloudera Parcel 或者其他格式。下載 Core Tarball 的好處是體積小,後期需要什麼庫的時候可以自行在 Streamsets Web 頁進行下載。相對於 Core Tarball,Full Tarball 默認幫你下載了很多庫,但是文件體積相對較大(>4G),並且可能很多庫我們暫時使用不到。

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

或者你可以直接使用這個鏈接進行下載:archives.streamsets.com/datacollect…

解壓啟動

Streamsets Core Tarball 下載好後,直接解壓就可以使用,非常方便。

tar xvzf streamsets-datacollector-core-3.7.1.tgz

cd streamsets-datacollector-3.7.1/bin/

./streamsets dc

Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS

Configuration of maximum open file limit is too low: 1024 (expected at least 32768). Please consult https://goo.gl/6dmjXd

如果在運行的時候遇到上面的報錯,修改操作系統的 open files 限制數量即可。

#vi /etc/security/limits.conf

添加兩行內容:

  • soft nofile 65536
  • hard nofile 65536

運行 'ulimit -n' 既可以看到 open files 設置值已生效。

Web 頁

Streamsets 擁有一個 Web 頁,默認端口是 18630。瀏覽器中輸入 ip:18630 即可進入 streamsets 的頁面,默認用戶名、密碼都是 admin。

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

Pipeline

準備工作

因為需要將 mysql 的數據實時同步到 hbase 中,但是下載的 Core Tarball 中沒有 MySQL Binary Log 以及 hbase 兩個 stage library,所以在 create new pipeline 之前需要先安裝它們。

安裝 MySQL Binary Log 庫

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

安裝 Hbase 庫,這裡注意一下,hbase 庫位於 CDH 中,所以選擇一個 CDH 版本進行安裝

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

安裝好後在 Installed Stage Libraries 中就能看到已經安裝了 MySQL Binary Log 和 Hbase

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

創建 Pipeline

MySQL Binary Log

創建一個 MySQL Binary Log

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

設置 mysql 的連接參數(Hostname, Port 以及 Server ID),這裡的 Server ID 與 mysql 配置文件(一般是 /etc/my.cnf)中的 server-id 保持一致

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

設置 mysql 的用戶名、密碼

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

其他設置:我們在 Include Tables 欄設置了兩張表(表與表之間用逗號隔開),意思是監控這兩張表的數據變化,其他表不關心。

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

Stream Selector

創建一個 Stream Selector,並將剛剛創建的 MySQL Binary Log 指向這個 Stream Selector。 設置過濾條件, 比如說 ${record:value("/Table")=='cartype'} 就是過濾 cartype 表。

可以看到 Stream Selector 有兩個出口(1 和 2),後面我們將會看到: 1 輸出到 Hbase, 2 數據到 Trash

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

Hbase & Trash

分別創建 Hbase 和 Trash,連接到 Stream Selector 上

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

配置 Hbase

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

Trash 無需進行配置

驗證 & 啟動

驗證

點擊右上角的“眼鏡”,驗證整個流程是否有問題。

這裡報錯:"java.lang.RuntimeException:Unable to get driver instance for jdbcUrl"。這個報錯的原因是缺少 mysql 連接的 jar 包。解決起來也很簡單,下載一個 jar 包然後放到 streamsets 指定的目錄下。我這邊的完整目錄是:/opt/streamsets/streamsets-datacollector-3.7.1/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib/mysql-connector-java-5.1.26-bin.jar, mysql-connector-java-5.1.26-bin.jar 就是我下載的 jar 包。

還有一點就是事先要將 Hbase 中相對應的表創建好,不然驗證會提示錯誤。

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

接著在頁面上重啟 streamsets 即可。

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

重新驗證,發現成功了。

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

點擊右上角播放標籤,啟動流程,這樣整個流程就已經完成了(數據已經在進行實時同步),查看各個 Stage 既可以看到有多少數據流入,多少數據流出。也可以直接進入 hbase 數據庫中查看是否有數據生成。

Mysql到Hbase數據如何實時同步,強大的Streamsets告訴你

以上就是如何使用 Streamsets 實時同步 mysql 數據到 hbase 中的整個操作流程。大家肯定發現了,整個流程沒有編寫任何的代碼,相對於 binlog + Canal + 消息隊列 + JAR 的方案是不是高效一些呢。當然任何方案都會有優缺點,Streamsets 這種方案的更多實際體驗還需要更多的觀察。

關注我:私信回覆“555”獲取往期Java高級架構資料、源碼、筆記、視頻Dubbo、Redis、Netty、zookeeper、Spring cloud、分佈式、高併發等架構技術往期架構視頻截圖


分享到:


相關文章: