Source Code Analysis: Comparing the Data Synchronization Tools Sqoop and DataX

Preface

In data synchronization work I have come across four tools: Sqoop, DataX, HData, and FileSync. Analyzing their internals, each has its own clever ideas, and I learned a lot from all of them. Sqoop1 and Sqoop2 share the same underlying synchronization mechanism, so I chose the Sqoop1 source code as the entry point for this analysis. DataX and HData are architecturally very similar; HData's data transport borrows some internals from Disruptor, and the Disruptor documentation makes those ideas look fascinating. I will save that good stuff for later study, so here I cover DataX's more conventional implementation. FileSync is a tool developed in-house at my company for synchronizing files between remote hosts and HDFS.

Sqoop

Import

Sqoop can synchronize data from a database to HDFS in two ways: 1) over a JDBC connection; 2) using the database's own client tools. Taking MySQL as the example, the synchronization process has three steps: 1) split the data; 2) read the data; 3) write the data.

JDBC

Data splitting takes a generic approach: a COUNT aggregate obtains the number of rows to synchronize (nums), the configured number of map tasks is m, and nums / m is the number of rows each map task handles, as the following code shows:

```java
// 1. Split the data into chunks
@Override
public List...

  statement = connection.createStatement();
  // obtain nums, the total row count
  results = statement.executeQuery(getCountQuery());
  long count = results.getLong(1);
  int chunks = ConfigurationHelper.getJobNumMaps(job);
  long chunkSize = (count / chunks);
  ...
  for (int i = 0; i < chunks; i++) {
    DBInputSplit split;

    if ((i + 1) == chunks) {
      // the last split absorbs the remainder rows, up to count
      split = new DBInputSplit(i * chunkSize, count);
    } else {
      split = new DBInputSplit(i * chunkSize, (i * chunkSize)
          + chunkSize);
    }

    splits.add(split);
  }

  ...
  }
}

protected String getCountQuery() {
  if (dbConf.getInputCountQuery() != null) {
    return dbConf.getInputCountQuery();
  }

  StringBuilder query = new StringBuilder();
  query.append("SELECT COUNT(*) FROM " + tableName);

  if (conditions != null && conditions.length() > 0) {
    query.append(" WHERE " + conditions);
  }
  return query.toString();
}

// 2. Read the data from the database.
// The SELECT statement is built per dialect; Sqoop distinguishes
// Oracle, DB2, and a generic codepath.
protected String getSelectQuery() {
  StringBuilder query = new StringBuilder();

  // Default codepath for MySQL, HSQLDB, etc.
  // Relies on LIMIT/OFFSET for splits.
  if (dbConf.getInputQuery() == null) {
    query.append("SELECT ");

    for (int i = 0; i < fieldNames.length; i++) {
      query.append(fieldNames[i]);
      if (i != fieldNames.length - 1) {
        query.append(", ");
      }
    }

    query.append(" FROM ").append(tableName);
    query.append(" AS ").append(tableName); // in hsqldb this is necessary
    if (conditions != null && conditions.length() > 0) {
      query.append(" WHERE (").append(conditions).append(")");
    }

    String orderBy = dbConf.getInputOrderBy();
    if (orderBy != null && orderBy.length() > 0) {
      query.append(" ORDER BY ").append(orderBy);
    }
  } else {
    // PREBUILT QUERY
    query.append(dbConf.getInputQuery());
  }

  try {
    query.append(" LIMIT ").append(split.getLength());
    query.append(" OFFSET ").append(split.getStart());
  } catch (IOException ex) {
    // Ignore, will not throw.
  }

  return query.toString();
}

// 3. Write to HDFS with the usual context.write(...)
```

The statements Sqoop constructs for reading clearly got some thought from the author, but I am not entirely satisfied with their execution performance. That may be a deliberate trade-off for generality, or the author may simply not have dug deeply into the databases themselves. For example, in Oracle a hint can force a direct-path read, which can improve throughput by an order of magnitude.
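As a rough illustration of that point, the sketch below shows one way the generated SELECT could be decorated with an optimizer hint. The `withHint` helper and the `FULL`/`PARALLEL` hint text are my own illustrative assumptions, not something Sqoop actually does:

```java
// Hypothetical sketch: injecting an Oracle optimizer hint into the
// generated SELECT. Sqoop does not do this; the helper and hint text
// here are assumptions for illustration only.
public class HintedQueryBuilder {

    // Insert a hint comment right after the SELECT keyword.
    static String withHint(String selectQuery, String hint) {
        if (!selectQuery.regionMatches(true, 0, "SELECT ", 0, 7)) {
            return selectQuery; // leave non-SELECT statements untouched
        }
        return "SELECT /*+ " + hint + " */ " + selectQuery.substring(7);
    }

    public static void main(String[] args) {
        String q = "SELECT id, name FROM emp WHERE (1=1)";
        // prints: SELECT /*+ FULL(emp) PARALLEL(emp, 4) */ id, name FROM emp WHERE (1=1)
        System.out.println(withHint(q, "FULL(emp) PARALLEL(emp, 4)"));
    }
}
```

Whether such a hint actually helps depends on table size and the optimizer's plan, so this is a tuning knob, not a guaranteed win.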

Database client tools

This path uses MySQL's own client tool, mysqldump; the prerequisite is that the MySQL client is installed on the cluster machines.

```java
// 1. Obtain the lower and upper bounds used for splitting:
protected String getBoundingValsQuery() {
  // If the user has provided a query, use that instead.
  String userQuery = getDBConf().getInputBoundingQuery();
  if (null != userQuery) {
    return userQuery;
  }

  // Auto-generate one based on the table name we've been provided with.
  StringBuilder query = new StringBuilder();

  String splitCol = getDBConf().getInputOrderBy();
  query.append("SELECT MIN(").append(splitCol).append("), ");
  query.append("MAX(").append(splitCol).append(") FROM ");
  query.append(getDBConf().getInputTableName());

  String conditions = getDBConf().getInputConditions();
  if (null != conditions) {
    query.append(" WHERE ( " + conditions + " )");
  }

  return query.toString();
}

// 2. Read the data -- the most distinctive part of the implementation:
// the export is done by constructing a mysqldump command line.
public void map(String splitConditions, NullWritable val, Context context)
    throws IOException, InterruptedException {

  LOG.info("Beginning mysqldump fast path import");

  ArrayList<String> args = new ArrayList<String>();
  String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);

  // We need to parse the connect string URI to determine the database name.
  // Using java.net.URL directly on the connect string will fail because
  // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
  // scheme (everything before '://') and replace it with 'http', which we
  // know will work.
  String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
  String databaseName = JdbcUrl.getDatabaseName(connectString);
  String hostname = JdbcUrl.getHostName(connectString);
  int port = JdbcUrl.getPort(connectString);

  if (null == databaseName) {
    throw new IOException("Could not determine database name");
  }

  LOG.info("Performing import of table " + tableName + " from database "
      + databaseName);

  args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.

  String password = DBConfiguration.getPassword((JobConf) conf);
  String passwordFile = null;

  Process p = null;
  AsyncSink sink = null;
  AsyncSink errSink = null;
  PerfCounters counters = new PerfCounters();
  try {
    // --defaults-file must be the first argument.
    if (null != password && password.length() > 0) {
      passwordFile = MySQLUtils.writePasswordFile(conf);
      args.add("--defaults-file=" + passwordFile);
    }

    // Don't use the --where="<whereclause>" version because spaces in it can
    // confuse Java, and adding in surrounding quotes confuses Java as well.
    String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
        + " AND (" + splitConditions + ")";
    args.add("-w");
    args.add(whereClause);

    args.add("--host=" + hostname);
    if (-1 != port) {
      args.add("--port=" + Integer.toString(port));
    }
    args.add("--skip-opt");
    args.add("--compact");
    args.add("--no-create-db");
    args.add("--no-create-info");
    args.add("--quick"); // no buffering
    args.add("--single-transaction");

    String username = conf.get(MySQLUtils.USERNAME_KEY);
    if (null != username) {
      args.add("--user=" + username);
    }

    // If the user supplied extra args, add them here.
    String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
    if (null != extra) {
      for (String arg : extra) {
        args.add(arg);
      }
    }

    args.add(databaseName);
    args.add(tableName);
    ...

    // Actually start the mysqldump.
    p = Runtime.getRuntime().exec(args.toArray(new String[0]));
    ...
}

// 3. Write to HDFS -- also worth a mention: the exported rows are split
// into delimiter-separated fields such as A,B,C (the delimiter can be set
// via a parameter) and then pushed to HDFS in one pass.
private static class CopyingStreamThread extends ErrorableThread {
  public static final Log LOG = LogFactory.getLog(
      CopyingStreamThread.class.getName());
  ...

  public void run() {
    BufferedReader r = null;
    ...
    r = new BufferedReader(new InputStreamReader(this.stream));

    // Actually do the read/write transfer loop here.
    int preambleLen = -1; // set to this for "undefined"
    while (true) {
      String inLine = r.readLine();
      if (null == inLine) {
        break; // EOF.
      }

      if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
        continue; // comments and empty lines are ignored
      }

      // this line is of the form "INSERT .. VALUES ( actual value text
      // );" strip the leading preamble up to the '(' and the trailing
      // ');'.
      if (preambleLen == -1) {
        // we haven't determined how long the preamble is. It's constant
        // across all lines, so just figure this out once.
        String recordStartMark = "VALUES (";
        preambleLen = inLine.indexOf(recordStartMark)
            + recordStartMark.length();
      }

      // chop off the leading and trailing text as we write the
      // output to HDFS.
      int len = inLine.length() - 2 - preambleLen;
      context.write(inLine.substring(preambleLen, inLine.length() - 2)
          + "\n", null);
      counters.addBytes(1 + len);
    }
    ...
  }
}
```

Export

For export, Sqoop again offers two ways: 1) JDBC; 2) client-tool export.

JDBC

The JDBC export path uses ordinary client-side INSERT statements, executed in batches. It is fairly conventional; see any reference on JDBC batch inserts for details.
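For reference, here is a minimal sketch of what that batch-insert path looks like with plain JDBC. The table name, columns, and batch size are illustrative assumptions; `insertAll` requires a live `Connection` and is not Sqoop's actual code:

```java
// A minimal sketch of the JDBC batch-insert pattern the text refers to.
// Table/column names and batch size are illustrative assumptions.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BatchInsertSketch {

    // Build "INSERT INTO t (a, b) VALUES (?, ?)" from column names.
    static String buildInsertSql(String table, String[] cols) {
        StringBuilder sb = new StringBuilder("INSERT INTO ").append(table).append(" (");
        StringBuilder marks = new StringBuilder();
        for (int i = 0; i < cols.length; i++) {
            if (i > 0) { sb.append(", "); marks.append(", "); }
            sb.append(cols[i]);
            marks.append("?");
        }
        return sb.append(") VALUES (").append(marks).append(")").toString();
    }

    // Accumulate rows with addBatch() and flush every batchSize rows,
    // so one round trip to the server carries many rows instead of one.
    static void insertAll(Connection conn, String table, String[] cols,
                          Object[][] rows, int batchSize) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(buildInsertSql(table, cols))) {
            int pending = 0;
            for (Object[] row : rows) {
                for (int i = 0; i < row.length; i++) {
                    ps.setObject(i + 1, row[i]);
                }
                ps.addBatch();
                if (++pending == batchSize) {
                    ps.executeBatch();
                    pending = 0;
                }
            }
            if (pending > 0) {
                ps.executeBatch(); // flush the tail
            }
        }
    }

    public static void main(String[] args) {
        // prints: INSERT INTO emp (id, name) VALUES (?, ?)
        System.out.println(buildInsertSql("emp", new String[]{"id", "name"}));
    }
}
```

For MySQL specifically, enabling `rewriteBatchedStatements=true` on the JDBC URL is commonly used to turn such batches into multi-row inserts, though that is a driver option, not shown here.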

Using client tools

The MapReduce task first writes the HDFS data to a local path, into a file named after the table (in fact a named FIFO, as the code below shows), and mysqlimport then reads that local file and loads the data into MySQL.

```java
private void initMySQLImportProcess() throws IOException {
  File taskAttemptDir = TaskId.getLocalWorkPath(conf);

  this.fifoFile = new File(taskAttemptDir,
      conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
  String filename = fifoFile.toString();

  // Create the FIFO itself.
  try {
    new NamedFifo(this.fifoFile).create();
  } catch (IOException ioe) {
    // Command failed.
    LOG.error("Could not mknod " + filename);
    this.fifoFile = null;
    throw new IOException(
        "Could not create FIFO to interface with mysqlimport", ioe);
  }

  // Now open the connection to mysqlimport.
  ArrayList<String> args = new ArrayList<String>();

  String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
  String databaseName = JdbcUrl.getDatabaseName(connectString);
  String hostname = JdbcUrl.getHostName(connectString);
  int port = JdbcUrl.getPort(connectString);

  if (null == databaseName) {
    throw new IOException("Could not determine database name");
  }

  args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
  String password = DBConfiguration.getPassword((JobConf) conf);

  if (null != password && password.length() > 0) {
    passwordFile = new File(MySQLUtils.writePasswordFile(conf));
    args.add("--defaults-file=" + passwordFile);
  }

  String username = conf.get(MySQLUtils.USERNAME_KEY);

  if (null != username) {
    args.add("--user=" + username);
  }

  args.add("--host=" + hostname);
  if (-1 != port) {
    args.add("--port=" + Integer.toString(port));
  }

  args.add("--compress");
  args.add("--local");
  args.add("--silent");

  // Specify the subset of columns we're importing.
  DBConfiguration dbConf = new DBConfiguration(conf);
  String [] cols = dbConf.getInputFieldNames();
  if (null != cols) {
    StringBuilder sb = new StringBuilder();
    boolean first = true;
    for (String col : cols) {
      if (!first) {
        sb.append(",");
      }
      sb.append(col);
      first = false;
    }

    args.add("--columns=" + sb.toString());
  }

  // Specify the delimiters to use.
  int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
      (int) ',');
  int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
      (int) '\n');
  int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
  int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
  boolean encloseRequired = conf.getBoolean(
      MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);

  args.add("--fields-terminated-by=0x"
      + Integer.toString(outputFieldDelim, 16));
  args.add("--lines-terminated-by=0x"
      + Integer.toString(outputRecordDelim, 16));
  if (0 != enclosedBy) {
    if (encloseRequired) {
      args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
    } else {
      args.add("--fields-optionally-enclosed-by=0x"
          + Integer.toString(enclosedBy, 16));
    }
  }

  if (0 != escapedBy) {
    args.add("--escaped-by=0x" + Integer.toString(escapedBy, 16));
  }

  // These two arguments are positional and must be last.
  args.add(databaseName);
  args.add(filename);

  ...

  // Actually start mysqlimport.
  mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
  ...
}
```

DataX

DataX is a standalone tool open-sourced by Alibaba. The goal is that all data exchange should need only the one generic interface DataX provides: simple to use, with no need for developers to learn to write MySQL-, Oracle-, or MapReduce-specific code. A great idea. The structure is as follows:

(DataX architecture diagram)

All data transfer goes through memory; the basic mechanism resembles Flume's MemoryChannel. Both can lose data under abnormal conditions; interested readers can look at the Flume source. Back to import and export: for both directions, DataX provides a single unified model that connects over JDBC.
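A toy sketch of that in-memory channel pattern, using a bounded `ArrayBlockingQueue` (these are not DataX's actual classes): the producer blocks when the queue is full, the consumer blocks when it is empty, and whatever is still queued is lost if the process crashes.

```java
// Toy illustration of a bounded in-memory channel between a reader
// thread and a writer thread, similar in spirit to Flume's
// MemoryChannel and DataX's in-memory transport. Not DataX code.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MemoryChannelDemo {

    // Push n records through the channel and return how many came out.
    static int pump(int n) throws InterruptedException {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(1024);
        final String EOF = "__END__"; // poison pill marking end of stream

        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    channel.put("record-" + i); // blocks when the queue is full
                }
                channel.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        int count = 0;
        while (!channel.take().equals(EOF)) { // blocks when the queue is empty
            count++;
        }
        reader.join();
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints: consumed 5 records
        System.out.println("consumed " + pump(5) + " records");
    }
}
```

The bounded capacity is what gives back-pressure: a slow writer naturally throttles a fast reader, but a crash before the writer drains the queue loses those in-flight records, which is the data-loss risk noted above.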

```java
public void startRead(Configuration readerSliceConfig,
    RecordSender recordSender,
    TaskPluginCollector taskPluginCollector, int fetchSize) {
  String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
  String table = readerSliceConfig.getString(Key.TABLE);

  PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);

  LOG.info("Begin to read record by Sql: [{}\n] {}.",
      querySql, basicMsg);
  PerfRecord queryPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY);
  queryPerfRecord.start();

  Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
      username, password);

  // session config .etc related
  DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
      this.dataBaseType, basicMsg);

  int columnNumber = 0;
  ResultSet rs = null;
  try {
    rs = DBUtil.query(conn, querySql, fetchSize);
    queryPerfRecord.end();

    ResultSetMetaData metaData = rs.getMetaData();
    columnNumber = metaData.getColumnCount();

    // this measures the pure ResultSet.next() time
    PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
    allResultPerfRecord.start();

    long rsNextUsedTime = 0;
    long lastTime = System.nanoTime();
    while (rs.next()) {
      rsNextUsedTime += (System.nanoTime() - lastTime);

      this.transportOneRecord(recordSender, rs,
          metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
      lastTime = System.nanoTime();
    }

    allResultPerfRecord.end(rsNextUsedTime);
    // the monitoring dashboard currently relies on this log line; the
    // earlier "Finish read record" message covered both the SQL query
    // and all of the result-next time
    LOG.info("Finished read record by Sql: [{}\n] {}.",
        querySql, basicMsg);

  } catch (Exception e) {
    throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
  } finally {
    DBUtil.closeDBResources(null, conn);
  }
}
```

Summary

Both Sqoop and DataX use a plugin model, making it easy for users to develop new reader and writer plugins. But judging from the current code, Sqoop's direct mode performs better than DataX, and since Sqoop stands on the shoulders of the Hadoop elephant, its stability is also higher than that of the standalone DataX.

Further thoughts

  • An idea for building DataX into a distributed cluster. (design sketch)

  • A sudden thought: if DataX were not a standalone tool, would I consider using it? I sketched a rough design to open the discussion. Basic flow: every node runs a DataX service (DataX wrapped as an online service that can launch jobs) and periodically reports node status (liveness, memory, CPU load, and so on) to a Consul configuration center. The configuration is managed centrally by a resource manager. When a job submits an import/export request, the system first computes the number of splits and the resources each split needs, then requests resources from the resource manager; the job manager uses the granted resources to dispatch a start-job request through the service registry. If a job hits an abnormal condition, the failure is reported back to the job manager and the job re-enters the queue for the next scheduling round.
  • When calling shell commands from Java, two streams need attention: standard output and the error stream. The latter is the easiest to overlook, and it directly affects the robustness of the program; Sqoop's client-tool import/export gives us a textbook example.
  • Splitting data and operating on the pieces in parallel is a common pattern in data processing; here is a classic example worth sharing.
```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class AccumulateNum implements Serializable {

  public final static List<Integer> LST_NUM = new ArrayList<>();

  static {
    for (int i = 1; i <= 100000; i++) {
      LST_NUM.add(i);
    }
  }

  public static class AccumulateTask extends RecursiveTask<Long> {

    // fork granularity
    private final static int THRESHOLD = 1000;
    private int start;
    private int end;

    public AccumulateTask(int start, int end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public Long compute() {
      long sum = 0;
      if (end < start) return sum;

      if (end - start < THRESHOLD) {
        // the range is small enough: sum it directly
        for (int i = start; i <= end; ++i) {
          sum += LST_NUM.get(i);
        }
      } else {
        // otherwise keep splitting the range in half
        int mid = (start + end) / 2;
        AccumulateTask left = new AccumulateTask(start, mid);
        left.fork();
        AccumulateTask right = new AccumulateTask(mid + 1, end);
        right.fork();
        sum = left.join();
        sum += right.join();
      }
      return sum;
    }
  }

  public static void main(String[] args) throws Exception {
    int cpuNum = Runtime.getRuntime().availableProcessors();
    ForkJoinPool pool = new ForkJoinPool(cpuNum);

    long startTime = System.nanoTime();
    Future<Long> future = pool.submit(new AccumulateTask(0, LST_NUM.size() - 1));
    System.out.println(future.get()); // prints 5000050000
    long endTime = System.nanoTime(); // measured after get() so the work is included
    System.out.println(endTime - startTime + "ns");
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.SECONDS);
  }
}
```
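The earlier point about consuming both output streams when shelling out from Java can be sketched as follows. This is a simplified stand-in for Sqoop's `AsyncSink`: each stream is drained on its own thread, because a full, unread stderr pipe buffer can block the child process forever. The `echo` command assumes a Unix-like environment:

```java
// Simplified stand-in for Sqoop's AsyncSink pattern: drain stdout and
// stderr of a child process concurrently so neither pipe fills up.
// Assumes a Unix-like environment where "echo" is available.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class ShellRunner {

    // Drain a stream on its own thread and collect its lines.
    static Thread drain(InputStream in, StringBuilder out) {
        Thread t = new Thread(() -> {
            try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
                String line;
                while ((line = r.readLine()) != null) {
                    synchronized (out) { out.append(line).append('\n'); }
                }
            } catch (IOException ignored) {
                // the child exiting closes the pipe; nothing more to drain
            }
        });
        t.start();
        return t;
    }

    // Run a command, draining stdout and stderr concurrently.
    static String run(String... cmd) throws IOException, InterruptedException {
        Process p = Runtime.getRuntime().exec(cmd);
        StringBuilder stdout = new StringBuilder();
        StringBuilder stderr = new StringBuilder();
        Thread t1 = drain(p.getInputStream(), stdout);
        Thread t2 = drain(p.getErrorStream(), stderr);
        int exit = p.waitFor();
        t1.join();
        t2.join();
        return "exit=" + exit + " out=" + stdout.toString().trim();
    }

    public static void main(String[] args) throws Exception {
        // prints: exit=0 out=hello
        System.out.println(run("echo", "hello"));
    }
}
```

Forgetting the stderr thread often works in testing, then deadlocks in production the first time the child writes more error output than the pipe buffer holds, which is exactly the robustness point above.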





