深入理解Sqoop的導入與導出—文末贈書

Sqoop在導入導出數據時都會使用代碼生成器來創建對應表的實體類,用於保存從表中抽取的記錄,默認存放於“/tmp/sqoop-root/compile/隨機字符/”目錄下,用戶必須手動將此目錄下的實體類jar包上傳至Sqoop的lib目錄下,才可執行導入導出操作。研究Sqoop生成的實體類有助於我們深入理解Sqoop。

例如,之前用到的temp類生成的源碼如下:

// ORM class for table 'temp'

// WARNING: This class is AUTO-GENERATED. Modify at your own risk.

//

// Debug information:

// Generated date: Sun Dec 16 05:30:10 PST 2018

// For connector: org.apache.sqoop.manager.MySQLManager

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapred.lib.db.DBWritable;

import com.cloudera.sqoop.lib.JdbcWritableBridge;

import com.cloudera.sqoop.lib.DelimiterSet;

import com.cloudera.sqoop.lib.FieldFormatter;

import com.cloudera.sqoop.lib.RecordParser;

import com.cloudera.sqoop.lib.BooleanParser;

import com.cloudera.sqoop.lib.BlobRef;

import com.cloudera.sqoop.lib.ClobRef;

import com.cloudera.sqoop.lib.LargeObjectLoader;

import com.cloudera.sqoop.lib.SqoopRecord;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.CharBuffer;

import java.sql.Date;

import java.sql.Time;

import java.sql.Timestamp;

import java.util.Arrays;

import java.util.Iterator;

import java.util.List;

import java.util.Map;

import java.util.TreeMap;


public class temp extends SqoopRecord implements DBWritable, Writable {

private final int PROTOCOL_VERSION = 3;

public int getClassFormatVersion() { return PROTOCOL_VERSION; }

protected ResultSet __cur_result_set;

private String year;

public String get_year() {

return year;

}

public void set_year(String year) {

this.year = year;

}

public temp with_year(String year) {

this.year = year;

return this;

}

private Integer temperature;

public Integer get_temperature() {

return temperature;

}

public void set_temperature(Integer temperature) {

this.temperature = temperature;

}

public temp with_temperature(Integer temperature) {

this.temperature = temperature;

return this;

}

private Integer quality;

public Integer get_quality() {

return quality;

}

public void set_quality(Integer quality) {

this.quality = quality;

}

public temp with_quality(Integer quality) {

this.quality = quality;

return this;

}

public boolean equals(Object o) {

if (this == o) {

return true;

}

if (!(o instanceof temp)) {

return false;

}

temp that = (temp) o;

boolean equal = true;

equal = equal && (this.year == null ? that.year == null : this.year.equals(that.year));

equal = equal && (this.temperature == null ? that.temperature == null : this.temperature.equals(that.temperature));

equal = equal && (this.quality == null ? that.quality == null : this.quality.equals(that.quality));

return equal;

}

public void readFields(ResultSet __dbResults) throws SQLException {

this.__cur_result_set = __dbResults;

this.year = JdbcWritableBridge.readString(1, __dbResults);

this.temperature = JdbcWritableBridge.readInteger(2, __dbResults);

this.quality = JdbcWritableBridge.readInteger(3, __dbResults);

}

public void loadLargeObjects(LargeObjectLoader __loader)

throws SQLException, IOException, InterruptedException {

}

public void write(PreparedStatement __dbStmt) throws SQLException {

write(__dbStmt, 0);

}

public int write(PreparedStatement __dbStmt, int __off) throws SQLException {

JdbcWritableBridge.writeString(year, 1 + __off, 12, __dbStmt);

JdbcWritableBridge.writeInteger(temperature, 2 + __off, 4, __dbStmt);

JdbcWritableBridge.writeInteger(quality, 3 + __off, 4, __dbStmt);

return 3;

}

public void readFields(DataInput __dataIn) throws IOException {

if (__dataIn.readBoolean()) {

this.year = null;

} else {

this.year = Text.readString(__dataIn);

}

if (__dataIn.readBoolean()) {

this.temperature = null;

} else {

this.temperature = Integer.valueOf(__dataIn.readInt());

}

if (__dataIn.readBoolean()) {

this.quality = null;

} else {

this.quality = Integer.valueOf(__dataIn.readInt());

}

}

public void write(DataOutput __dataOut) throws IOException {

if (null == this.year) {

__dataOut.writeBoolean(true);

} else {

__dataOut.writeBoolean(false);

Text.writeString(__dataOut, year);

}

if (null == this.temperature) {

__dataOut.writeBoolean(true);

} else {

__dataOut.writeBoolean(false);

__dataOut.writeInt(this.temperature);

}

if (null == this.quality) {

__dataOut.writeBoolean(true);

} else {

__dataOut.writeBoolean(false);

__dataOut.writeInt(this.quality);

}

}

private final DelimiterSet __outputDelimiters = new DelimiterSet((char) 9, (char) 10, (char) 0, (char) 0, false);

public String toString() {

return toString(__outputDelimiters, true);

}

public String toString(DelimiterSet delimiters) {

return toString(delimiters, true);

}

public String toString(boolean useRecordDelim) {

return toString(__outputDelimiters, useRecordDelim);

}

public String toString(DelimiterSet delimiters, boolean useRecordDelim) {

StringBuilder __sb = new StringBuilder();

char fieldDelim = delimiters.getFieldsTerminatedBy();

__sb.append(FieldFormatter.escapeAndEnclose(year==null?"null":year, delimiters));

__sb.append(fieldDelim);

__sb.append(FieldFormatter.escapeAndEnclose(temperature==null?"null":"" + temperature, delimiters));

__sb.append(fieldDelim);

__sb.append(FieldFormatter.escapeAndEnclose(quality==null?"null":"" + quality, delimiters));

if (useRecordDelim) {

__sb.append(delimiters.getLinesTerminatedBy());

}

return __sb.toString();

}

private final DelimiterSet __inputDelimiters = new DelimiterSet((char) 9, (char) 10, (char) 0, (char) 0, false);

private RecordParser __parser;

public void parse(Text __record) throws RecordParser.ParseError {

if (null == this.__parser) {

this.__parser = new RecordParser(__inputDelimiters);

}

List __fields = this.__parser.parseRecord(__record);

__loadFromFields(__fields);

}

public void parse(CharSequence __record) throws RecordParser.ParseError {

if (null == this.__parser) {

this.__parser = new RecordParser(__inputDelimiters);

}

List __fields = this.__parser.parseRecord(__record);

__loadFromFields(__fields);

}

public void parse(byte [] __record) throws RecordParser.ParseError {

if (null == this.__parser) {

this.__parser = new RecordParser(__inputDelimiters);

}

List __fields = this.__parser.parseRecord(__record);

__loadFromFields(__fields);

}

public void parse(char [] __record) throws RecordParser.ParseError {

if (null == this.__parser) {

this.__parser = new RecordParser(__inputDelimiters);

}

List __fields = this.__parser.parseRecord(__record);

__loadFromFields(__fields);

}

public void parse(ByteBuffer __record) throws RecordParser.ParseError {

if (null == this.__parser) {

this.__parser = new RecordParser(__inputDelimiters);

}

List __fields = this.__parser.parseRecord(__record);

__loadFromFields(__fields);

}

public void parse(CharBuffer __record) throws RecordParser.ParseError {

if (null == this.__parser) {

this.__parser = new RecordParser(__inputDelimiters);

}

List __fields = this.__parser.parseRecord(__record);

__loadFromFields(__fields);

}

private void __loadFromFields(List fields) {

Iterator __it = fields.listIterator();

String __cur_str;

__cur_str = __it.next();

if (__cur_str.equals("null")) { this.year = null; } else {

this.year = __cur_str;

}

__cur_str = __it.next();

if (__cur_str.equals("null") || __cur_str.length() == 0) { this.temperature = null; } else {

this.temperature = Integer.valueOf(__cur_str);

}

__cur_str = __it.next();

if (__cur_str.equals("null") || __cur_str.length() == 0) { this.quality = null; } else {

this.quality = Integer.valueOf(__cur_str);

}

}

public Object clone() throws CloneNotSupportedException {

temp o = (temp) super.clone();

return o;

}

public Map getFieldMap() {

Map __sqoop$field_map = new TreeMap();

__sqoop$field_map.put("year", this.year);

__sqoop$field_map.put("temperature", this.temperature);

__sqoop$field_map.put("quality", this.quality);

return __sqoop$field_map;

}

public void setField(String __fieldName, Object __fieldVal) {

if ("year".equals(__fieldName)) {

this.year = (String) __fieldVal;

}

else if ("temperature".equals(__fieldName)) {

this.temperature = (Integer) __fieldVal;

}

else if ("quality".equals(__fieldName)) {

this.quality = (Integer) __fieldVal;

}

else {

throw new RuntimeException("No such field: " + __fieldName);

}

}

}

由源碼可見,生成的實體類實現了DBWritable接口的序列化方法,,如圖1所示,這些方法能使temp類和JDBC進行交互:

深入理解Sqoop的導入與導出—文末贈書

圖1 DBWritable接口方法

JDBC的Resultset接口提供了一個用於從查詢結果中檢索記錄的遊標,這裡的readFields()方法將用ResultSet中一行數據的列來填充temp對象的字段。write()方法允許Sqoop將新的temp行插入表,這個過程稱為“導出”。

Sqoop啟動的MapReduce作業用到一個InputFormat,它可以通過JDBC從一個數據庫表中讀取部分內容。Hadoop提供的DataDBInputFormat能夠為幾個Map任務對查詢結果進行劃分。但是,為了獲得更好的導入性能,經常將這樣的查詢劃分到多個節點上執行。查詢是根據一個“劃分列”來進行劃分的。根據表的元數據,Sqoop會選擇一個合適的列作為劃分列(通常是表的主鍵)。主鍵列中的最小值和最大值會被讀出,與目標任務數一起用來確定每個Map任務要執行的查詢。

例如,假設表中有100,000條記錄,其id列的值為0-99,999。

在導入這張表時,Sqoop會判斷出id是表的主鍵列。啟動MapReduce作業時,用來執行導入的DataDrivenDBInputFormat便會發出一條類似於SELECT MIN(id),MAX(id) FROM table的查詢語句。檢索出的數據將用於對整個數據集進行劃分。

假設指定並行運行5個Map任務(使用-m 5),這樣便可以確定每個Map任務要執行的查詢數據條件。劃分列的選擇是影響並行執行效率的重要因素。如果id列的值不是均勻分佈的(也許在id值50,000到75,000的範圍內沒有記錄),那麼有一部分Map任務可能只有很少或沒有工作要做,而其他任務則有很多工作要做。

在運行一個導入作業時,用戶可以指定一個列作為劃分列,從而調整作業的劃分使其符合數據的真實分佈。如果使用-m 1參數來讓一個任務執行導入作業,就不再需要這個劃分過程。

在生成反序列化代碼和配置InputFormat之後,Sqoop將作業發送到MapReduce集群。Map任務執行查詢並且將ResultSet中的數據反序列化到生成類的實例,這些數據要麼被直接保存在SequenceFile文件中,要麼在寫到HDFS之前被轉換成分隔的文本。

為了使用導入記錄的個別字段,必須對字段分隔符(以及轉義/包圍字符)進行解析,抽出字段的值並轉換為相應的數據類型。例如,在文本文件中,氣候質量被表示成字符串“1",但必須被解析為Java的Integer或int類型的變量。Sqoop生成的實體類能夠自動完成這個過程,使開發人員可以將精力集中在真正要運行的MapReduce作業上。每個自動生成的類都有幾個名為parse()的重載方法,這些方法可以對錶示為Text、Charsequence、char[]或其他常見類型的數據進行操作。

本文摘編自《大數據Hadoop 3.X分佈式處理實戰》,經出版方授權發佈。

深入理解Sqoop的導入與導出—文末贈書


編輯推薦:

1.版本新。本書採用Hadoop3,版本較新,幫助讀者學習前沿技術。

2.項目大。深度剖析日誌分析、推薦系統、垃圾消息三大企業級項目實戰案例。讀者稍加改造,即可在生產環境中使用。

3.內容全。詳細介紹HDFS、MapReduce、HBase、Hive、Sqoop、Spark 等主流大數據工具。

4.資源多。贈送12小時視頻講解和全書配套範例源碼。

留言說出與hadoop相關的話題,精選留言一名評論送出此書。機會多多,不可錯過!


分享到:


相關文章: