Sqoop在導入導出數據時都會使用代碼生成器來創建對應表的實體類,用於保存從表中抽取的記錄,默認存放於“/tmp/sqoop-root/compile/隨機字符/”目錄下,用戶必須手動將此目錄下的實體類jar包上傳至Sqoop的lib目錄下,才可執行導入導出操作。研究Sqoop生成的實體類有助於我們深入理解Sqoop。
例如,之前用到的temp類生成的源碼如下:
// ORM class for table 'temp'
// WARNING: This class is AUTO-GENERATED. Modify at your own risk.
//
// Debug information:
// Generated date: Sun Dec 16 05:30:10 PST 2018
// For connector: org.apache.sqoop.manager.MySQLManager
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import com.cloudera.sqoop.lib.JdbcWritableBridge;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.FieldFormatter;
import com.cloudera.sqoop.lib.RecordParser;
import com.cloudera.sqoop.lib.BooleanParser;
import com.cloudera.sqoop.lib.BlobRef;
import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.lib.SqoopRecord;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class temp extends SqoopRecord implements DBWritable, Writable {
private final int PROTOCOL_VERSION = 3;
public int getClassFormatVersion() { return PROTOCOL_VERSION; }
protected ResultSet __cur_result_set;
private String year;
public String get_year() {
return year;
}
public void set_year(String year) {
this.year = year;
}
public temp with_year(String year) {
this.year = year;
return this;
}
private Integer temperature;
public Integer get_temperature() {
return temperature;
}
public void set_temperature(Integer temperature) {
this.temperature = temperature;
}
public temp with_temperature(Integer temperature) {
this.temperature = temperature;
return this;
}
private Integer quality;
public Integer get_quality() {
return quality;
}
public void set_quality(Integer quality) {
this.quality = quality;
}
public temp with_quality(Integer quality) {
this.quality = quality;
return this;
}
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof temp)) {
return false;
}
temp that = (temp) o;
boolean equal = true;
equal = equal && (this.year == null ? that.year == null : this.year.equals(that.year));
equal = equal && (this.temperature == null ? that.temperature == null : this.temperature.equals(that.temperature));
equal = equal && (this.quality == null ? that.quality == null : this.quality.equals(that.quality));
return equal;
}
public void readFields(ResultSet __dbResults) throws SQLException {
this.__cur_result_set = __dbResults;
this.year = JdbcWritableBridge.readString(1, __dbResults);
this.temperature = JdbcWritableBridge.readInteger(2, __dbResults);
this.quality = JdbcWritableBridge.readInteger(3, __dbResults);
}
public void loadLargeObjects(LargeObjectLoader __loader)
throws SQLException, IOException, InterruptedException {
}
public void write(PreparedStatement __dbStmt) throws SQLException {
write(__dbStmt, 0);
}
public int write(PreparedStatement __dbStmt, int __off) throws SQLException {
JdbcWritableBridge.writeString(year, 1 + __off, 12, __dbStmt);
JdbcWritableBridge.writeInteger(temperature, 2 + __off, 4, __dbStmt);
JdbcWritableBridge.writeInteger(quality, 3 + __off, 4, __dbStmt);
return 3;
}
public void readFields(DataInput __dataIn) throws IOException {
if (__dataIn.readBoolean()) {
this.year = null;
} else {
this.year = Text.readString(__dataIn);
}
if (__dataIn.readBoolean()) {
this.temperature = null;
} else {
this.temperature = Integer.valueOf(__dataIn.readInt());
}
if (__dataIn.readBoolean()) {
this.quality = null;
} else {
this.quality = Integer.valueOf(__dataIn.readInt());
}
}
public void write(DataOutput __dataOut) throws IOException {
if (null == this.year) {
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
Text.writeString(__dataOut, year);
}
if (null == this.temperature) {
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.temperature);
}
if (null == this.quality) {
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.quality);
}
}
private final DelimiterSet __outputDelimiters = new DelimiterSet((char) 9, (char) 10, (char) 0, (char) 0, false);
public String toString() {
return toString(__outputDelimiters, true);
}
public String toString(DelimiterSet delimiters) {
return toString(delimiters, true);
}
public String toString(boolean useRecordDelim) {
return toString(__outputDelimiters, useRecordDelim);
}
public String toString(DelimiterSet delimiters, boolean useRecordDelim) {
StringBuilder __sb = new StringBuilder();
char fieldDelim = delimiters.getFieldsTerminatedBy();
__sb.append(FieldFormatter.escapeAndEnclose(year==null?"null":year, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(temperature==null?"null":"" + temperature, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(quality==null?"null":"" + quality, delimiters));
if (useRecordDelim) {
__sb.append(delimiters.getLinesTerminatedBy());
}
return __sb.toString();
}
private final DelimiterSet __inputDelimiters = new DelimiterSet((char) 9, (char) 10, (char) 0, (char) 0, false);
private RecordParser __parser;
public void parse(Text __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(CharSequence __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(byte [] __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(char [] __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(ByteBuffer __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(CharBuffer __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
private void __loadFromFields(List fields) {
Iterator __it = fields.listIterator();
String __cur_str;
__cur_str = __it.next();
if (__cur_str.equals("null")) { this.year = null; } else {
this.year = __cur_str;
}
__cur_str = __it.next();
if (__cur_str.equals("null") || __cur_str.length() == 0) { this.temperature = null; } else {
this.temperature = Integer.valueOf(__cur_str);
}
__cur_str = __it.next();
if (__cur_str.equals("null") || __cur_str.length() == 0) { this.quality = null; } else {
this.quality = Integer.valueOf(__cur_str);
}
}
public Object clone() throws CloneNotSupportedException {
temp o = (temp) super.clone();
return o;
}
public Map getFieldMap() {
Map __sqoop$field_map = new TreeMap();
__sqoop$field_map.put("year", this.year);
__sqoop$field_map.put("temperature", this.temperature);
__sqoop$field_map.put("quality", this.quality);
return __sqoop$field_map;
}
public void setField(String __fieldName, Object __fieldVal) {
if ("year".equals(__fieldName)) {
this.year = (String) __fieldVal;
}
else if ("temperature".equals(__fieldName)) {
this.temperature = (Integer) __fieldVal;
}
else if ("quality".equals(__fieldName)) {
this.quality = (Integer) __fieldVal;
}
else {
throw new RuntimeException("No such field: " + __fieldName);
}
}
}
由源碼可見,生成的實體類實現了DBWritable接口的序列化方法,,如圖1所示,這些方法能使temp類和JDBC進行交互:
JDBC的Resultset接口提供了一個用於從查詢結果中檢索記錄的遊標,這裡的readFields()方法將用ResultSet中一行數據的列來填充temp對象的字段。write()方法允許Sqoop將新的temp行插入表,這個過程稱為“導出”。
Sqoop啟動的MapReduce作業用到一個InputFormat,它可以通過JDBC從一個數據庫表中讀取部分內容。Hadoop提供的DataDBInputFormat能夠為幾個Map任務對查詢結果進行劃分。但是,為了獲得更好的導入性能,經常將這樣的查詢劃分到多個節點上執行。查詢是根據一個“劃分列”來進行劃分的。根據表的元數據,Sqoop會選擇一個合適的列作為劃分列(通常是表的主鍵)。主鍵列中的最小值和最大值會被讀出,與目標任務數一起用來確定每個Map任務要執行的查詢。
例如,假設表中有100,000條記錄,其id列的值為0-99,999。
在導入這張表時,Sqoop會判斷出id是表的主鍵列。啟動MapReduce作業時,用來執行導入的DataDrivenDBInputFormat便會發出一條類似於SELECT MIN(id),MAX(id) FROM table的查詢語句。檢索出的數據將用於對整個數據集進行劃分。
假設指定並行運行5個Map任務(使用-m 5),這樣便可以確定每個Map任務要執行的查詢數據條件。劃分列的選擇是影響並行執行效率的重要因素。如果id列的值不是均勻分佈的(也許在id值50,000到75,000的範圍內沒有記錄),那麼有一部分Map任務可能只有很少或沒有工作要做,而其他任務則有很多工作要做。
在運行一個導入作業時,用戶可以指定一個列作為劃分列,從而調整作業的劃分使其符合數據的真實分佈。如果使用-m 1參數來讓一個任務執行導入作業,就不再需要這個劃分過程。
在生成反序列化代碼和配置InputFormat之後,Sqoop將作業發送到MapReduce集群。Map任務執行查詢並且將ResultSet中的數據反序列化到生成類的實例,這些數據要麼被直接保存在SequenceFile文件中,要麼在寫到HDFS之前被轉換成分隔的文本。
為了使用導入記錄的個別字段,必須對字段分隔符(以及轉義/包圍字符)進行解析,抽出字段的值並轉換為相應的數據類型。例如,在文本文件中,氣候質量被表示成字符串“1",但必須被解析為Java的Integer或int類型的變量。Sqoop生成的實體類能夠自動完成這個過程,使開發人員可以將精力集中在真正要運行的MapReduce作業上。每個自動生成的類都有幾個名為parse()的重載方法,這些方法可以對錶示為Text、Charsequence、char[]或其他常見類型的數據進行操作。
本文摘編自《大數據Hadoop 3.X分佈式處理實戰》,經出版方授權發佈。
編輯推薦:
1.版本新。本書採用Hadoop3,版本較新,幫助讀者學習前沿技術。
2.項目大。深度剖析日誌分析、推薦系統、垃圾消息三大企業級項目實戰案例。讀者稍加改造,即可在生產環境中使用。
3.內容全。詳細介紹HDFS、MapReduce、HBase、Hive、Sqoop、Spark 等主流大數據工具。
4.資源多。贈送12小時視頻講解和全書配套範例源碼。
留言說出與hadoop相關的話題,精選留言一名評論送出此書。機會多多,不可錯過!