大數據系列:使用Apache Sqoop進行增量數據的加載

大數據系列:使用Apache Sqoop進行增量數據的加載

介紹:

Apache Sqoop(發音:skup)是一款開源的工具,主要用於在Hadoop(Hive)與傳統的數據庫(mysql、postgresql...)間進行數據的傳遞,可以將一個關係型數據庫(例如 : MySQL ,Oracle ,Postgres等)中的數據導進到Hadoop的HDFS中,也可以將HDFS的數據導進到關係型數據庫中。

數據加載

數據可以一次性全部加載到HDFS中,也可以增量加載。
在本文中,我們將探討兩種將關係數據庫中的數據增量加載到HDFS的技術
(1)增量追加
(2)增量最後修改

注意: 本文假定您具有RDBMS,Sql,Hadoop,Sqoop和HDFS的基本知識。
我們將從默認安裝在Hadoop上的MySQL加載數據

為了逐步加載數據,我們創建了 sqoop作業 ,而不是運行一次sqoop腳本。
Sqoop作業存儲元數據信息,例如最後更新值,增量模式,文件格式,輸出目錄等,這些信息作為增量加載數據時的參考。

增量追加

當在源數據庫表中以增加的id(鍵)值連續添加新行時,使用此技術。

1.創建一個示例表併為其填充值

CREATE TABLE stocks (
id INT NOT NULL AUTO_INCREMENT,
symbol varchar(10),
name varchar(40),
trade_date DATE,
close_price DOUBLE,
volume INT,
updated_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ,
PRIMARY KEY ( id )
);


INSERT INTO stocks
(symbol, name, trade_date, close_price, volume)
VALUES
('AAL', 'American Airlines', '2015-11-12', 42.4, 4404500);

INSERT INTO stocks
(symbol, name, trade_date, close_price, volume)
VALUES
('AAPL', 'Apple', '2015-11-12', 115.23, 40217300);

INSERT INTO stocks
(symbol, name, trade_date, close_price, volume)
VALUES
('AMGN', 'Amgen', '2015-11-12', 157.0, 1964900);

我們可以在下面的屏幕截圖中看到股票表有三行。

大數據系列:使用Apache Sqoop進行增量數據的加載

MySQL中的庫存表

2.在該表上授予特權

在stocks表上授予localhost上所有用戶的特權

將test.stocks上的所有特權授予``@'localhost';

大數據系列:使用Apache Sqoop進行增量數據的加載

將test.stocks上的所有特權授予``@'localhost';

3.使用增量追加選項創建並執行Sqoop作業

sqoop job — create incrementalappendImportJob — import — connect jdbc:mysql://localhost/test — username root — password hortonworks1 — table stocks — target-dir /user/hirw/sqoop/stocks_append — incremental append — check-column id -m 1
大數據系列:使用Apache Sqoop進行增量數據的加載

列出Sqoop Job ,以檢查是否已成功創建

sqoop job — list

大數據系列:使用Apache Sqoop進行增量數據的加載

sqoop job — list

使用show job選項 可以檢查Sqoop作業的重要元數據信息,例如表名,增量列,增量模式等。

sqoop job — show incrementalappendImportJob

大數據系列:使用Apache Sqoop進行增量數據的加載

sqoop job — show incrementalappendImportJob

默認情況下, 增量列是表的主鍵列, 在這種情況下是id列。

執行Sqoop作業 並觀察寫入HDFS的記錄

sqoop job — exec incrementalappendImportJob

大數據系列:使用Apache Sqoop進行增量數據的加載

sqoop job — exec incrementalappendImportJob

4.觀察作業中的元數據信息

Incremental.last.value字段存儲在HDFS中導入的id字段的最後一個值。
此值現在更新為 3。

大數據系列:使用Apache Sqoop進行增量數據的加載

sqoop job — show incrementalappendImportJob

下次運行此作業時,它將僅加載id值大於3的那些行。

檢查HDFS文件系統
我們可以觀察到所有三個記錄都已寫入HDFS文件系統

大數據系列:使用Apache Sqoop進行增量數據的加載

HDFS Filesystem /user/hirw/sqoop/stocks_append/part-m-00000

4.在源表中插入值

INSERT INTO stocks 
(symbol, name, trade_date, close_price, volume)
VALUES
('GARS', 'Garrison', '2015-11-12', 12.4, 23500);

INSERT INTO stocks
(symbol, name, trade_date, close_price, volume)
VALUES
('SBUX', 'Starbucks', '2015-11-12', 62.90, 4545300);

INSERT INTO stocks
(symbol, name, trade_date, close_price, volume)
VALUES
('SGI', 'Silicon Graphics', '2015-11-12', 4.12, 123200);
大數據系列:使用Apache Sqoop進行增量數據的加載

5.再次執行Sqoop作業,並觀察HDFS中的輸出

sqoop job — exec incrementalappendImportJob

大數據系列:使用Apache Sqoop進行增量數據的加載

在HDFS中觀察數據已增量加載

作業已加載ID值大於3的所有行

大數據系列:使用Apache Sqoop進行增量數據的加載

HDFS Filesystem /user/hirw/sqoop/stocks_append/part-m-00001

6.觀察作業中的元數據信息

Incremental.last.value已從3更新到6!

大數據系列:使用Apache Sqoop進行增量數據的加載

大數據系列:使用Apache Sqoop進行增量數據的加載

增量最後修改

當更新源數據庫表中的行時使用此技術。
它類似於增量追加,但它也跟蹤對錶進行的更新,我們需要在源表中使用updated_on列來跟蹤正在進行的更新。

1.創建增量上次修改作業

我們使用- incremental lastmodified選項而不是追加

sqoop job — create incrementalImportModifiedJob — import — connect jdbc:mysql://localhost/test — username root — password hortonworks1 — table stocks — target-dir /user/hirw/sqoop/stocks_modified — incremental lastmodified — check-column updated_time -m 1 — append
大數據系列:使用Apache Sqoop進行增量數據的加載

2.觀察作業中的元數據信息

觀察增量。模式設置為DateLastModified和增量.col設置為updated_time

sqoop job — show incrementalImportModifiedJob

3.執行Sqoop作業並觀察元數據信息

sqoop job — exec incrementalImportModifiedJob

執行後,我們可以觀察到所有六行都已加載到HDFS文件系統中。

大數據系列:使用Apache Sqoop進行增量數據的加載

HDFS Filesystem /user/hirw/sqoop/stocks_modified/part-m-00000

sqoop job — show incrementalImportModifiedJob

我們可以看到,incremental.last.value設置為stocks表的updated_time列的最大時間戳。
在下一次運行中,作業將加載具有updated_time值大於以下屏幕快照中突出顯示的時間戳記的行。

大數據系列:使用Apache Sqoop進行增量數據的加載

4.修改源表中的數據

我們更新兩行並插入三行,總共有五處更改。

--

UPDATE stocks SET volume = volume+100, updated_time=now() WHERE id in (2,3);

----

INSERT INTO stocks
(symbol, name, trade_date, close_price, volume)
VALUES
('GARS', 'Garrison', '2015-11-12', 12.4, 23500);

INSERT INTO stocks
(symbol, name, trade_date, close_price, volume)
VALUES
('SBUX', 'Starbucks', '2015-11-12', 62.90, 4545300);

INSERT INTO stocks
(symbol, name, trade_date, close_price, volume)
VALUES
('SGI', 'Silicon Graphics', '2015-11-12', 4.12, 123200);

--

5.執行Sqoop作業並觀察元數據信息

sqoop job — exec incrementalImportModifiedJob
大數據系列:使用Apache Sqoop進行增量數據的加載

我們插入並更新了五個記錄。

大數據系列:使用Apache Sqoop進行增量數據的加載

HDFS Filesystem /user/hirw/sqoop/stocks_modified/part-m-00001

我們已經檢索了已更改的記錄,但這不是預期的結果,因為我們需要合併更改。

6.合併變更

首先,我們使用Sqoop codegen命令為stocks表生成一個JAR文件

sqoop codegen — connect jdbc:mysql://localhost/test — username root — password hortonworks1 — table stocks — outdir /home/hirw/sqoop/sqoop-codegen-stocks

然後將stocks.jar複製到我們的hadoop集群的本地文件系統中

使用Sqoop Merge命令合併更改,並提及要合併更改的主鍵(在本例中為id)。

sqoop merge — new-data /user/hirw/sqoop/stocks_modified/part-m-00001 — onto /user/hirw/sqoop/stocks_modified/part-m-00000 — target-dir /user/hirw/sqoop/stocks_modified/merged — jar-file stocks.jar — class-name stocks — merge-key id

此命令將使用合併密鑰ID將文件part-m-00001中的數據合併到文件part-m-00000中。

觀察合併目錄中的輸出

大數據系列:使用Apache Sqoop進行增量數據的加載

HDFS Filesystem /user/hirw/sqoop/stocks_modified/merged/part-r-00000

至此,我們已經成功合併了兩個文件中的更改!

總結:

本文探討了兩種將關係數據庫中的數據增量加載到HDFS的技術,希望對大家有所幫助。

譯自:medium

如果發現任何不正確的地方,或者想分享有關上述主題的更多信息,歡迎反饋。


分享到:


相關文章: