Source Code Analysis: Comparing the Data Synchronization Tools Sqoop and DataX

Preface

Among data synchronization tools I have worked with Sqoop, DataX, HData, and FileSync. Reading their internals, each has its own clever ideas, and I learned a lot from them. Sqoop1 and Sqoop2 share the same underlying synchronization principle, so I chose the Sqoop1 source code as the entry point for this analysis. DataX and HData are architecturally very similar; HData borrows some of Disruptor's internals for its data transfer, and the introduction on the Disruptor website makes its design very intriguing, but that is something to study slowly later, so here I go with DataX's more conventional implementation. FileSync is a file synchronization tool developed in-house at our company for syncing remote files with HDFS.

Sqoop

Import

Sqoop supports two ways of synchronizing data from a database to HDFS: 1. over a JDBC connection; 2. using the tools shipped with the database itself. Taking MySQL as the example, the synchronization process consists of three steps: 1. split the data; 2. read the data; 3. write the data.

JDBC

Splitting is done in a generic way: a COUNT aggregate query returns the number of rows to be synchronized, nums; given the configured number of map tasks m, each map task handles nums / m rows. See the code below:

<code>// 1. Split the data to be imported into chunks
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
...

statement = connection.createStatement();
// run the COUNT query to obtain the total number of rows (nums)
results = statement.executeQuery(getCountQuery());
results.next();
long count = results.getLong(1);
int chunks = ConfigurationHelper.getJobNumMaps(job);
long chunkSize = (count / chunks);
...
for (int i = 0; i < chunks; i++) {
DBInputSplit split;

if ((i + 1) == chunks) {
split = new DBInputSplit(i * chunkSize, count);
} else {
split = new DBInputSplit(i * chunkSize, (i * chunkSize)
+ chunkSize);
}

splits.add(split);
}

...
}
}

protected String getCountQuery() {

if (dbConf.getInputCountQuery() != null) {
return dbConf.getInputCountQuery();
}

StringBuilder query = new StringBuilder();
query.append("SELECT COUNT(*) FROM " + tableName);

if (conditions != null && conditions.length() > 0) {
query.append(" WHERE " + conditions);
}
return query.toString();
}
// 2. Read the data from the database.
// The SELECT statement is built from the constructor arguments; Sqoop has three variants: Oracle, DB2, and a generic one.
protected String getSelectQuery() {
StringBuilder query = new StringBuilder();

// Default codepath for MySQL, HSQLDB, etc.
// Relies on LIMIT/OFFSET for splits.
if (dbConf.getInputQuery() == null) {
query.append("SELECT ");

for (int i = 0; i < fieldNames.length; i++) {
query.append(fieldNames[i]);
if (i != fieldNames.length -1) {
query.append(", ");
}
}

query.append(" FROM ").append(tableName);
query.append(" AS ").append(tableName); //in hsqldb this is necessary
if (conditions != null && conditions.length() > 0) {
query.append(" WHERE (").append(conditions).append(")");
}

String orderBy = dbConf.getInputOrderBy();
if (orderBy != null && orderBy.length() > 0) {
query.append(" ORDER BY ").append(orderBy);
}
} else {
//PREBUILT QUERY
query.append(dbConf.getInputQuery());

}

try {
query.append(" LIMIT ").append(split.getLength());
query.append(" OFFSET ").append(split.getStart());
} catch (IOException ex) {
// Ignore, will not throw.
}

return query.toString();
}
// 3. Write to HDFS: the records are emitted with the ordinary context.write() of the map task.
</code>

Looking at the generated queries, the author clearly gave them some thought, but personally I am not fully satisfied with their execution performance. This may be a deliberately generic approach, or the author may not have known the data well; for example, in Oracle a hint can force a direct-path read, which can improve throughput by an order of magnitude.
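
A hypothetical sketch of that idea, not part of Sqoop: the builder below produces the same SELECT as getSelectQuery() but prefixes an Oracle FULL/PARALLEL hint. The class name, method, and degree of parallelism are made up for illustration, and whether the optimizer actually chooses a direct-path read still depends on table size and session settings.

<code>// Illustrative only: prepend an Oracle hint to the generated SELECT.
public class OracleHintedSelectBuilder {

  public static String buildHintedQuery(String tableName, String[] fieldNames, String conditions) {
    StringBuilder query = new StringBuilder();
    // produces e.g. SELECT /*+ FULL(t) PARALLEL(t, 4) ... with the hint closed before the column list
    query.append("SELECT /*+ FULL(").append(tableName)
         .append(") PARALLEL(").append(tableName).append(", 4) */ ");
    for (int i = 0; i < fieldNames.length; i++) {
      query.append(fieldNames[i]);
      if (i != fieldNames.length - 1) {
        query.append(", ");
      }
    }
    query.append(" FROM ").append(tableName);
    if (conditions != null && conditions.length() > 0) {
      query.append(" WHERE (").append(conditions).append(")");
    }
    return query.toString();
  }
}
</code>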

Database client tools

This mode uses the mysqldump client tool shipped with MySQL; the prerequisite is that the MySQL client is installed on the cluster machines.

<code>// 1. Splitting: the lower and upper bounds of the split column are obtained as follows.
protected String getBoundingValsQuery() {
// If the user has provided a query, use that instead.
String userQuery = getDBConf().getInputBoundingQuery();
if (null != userQuery) {
return userQuery;
}

// Auto-generate one based on the table name we've been provided with.
StringBuilder query = new StringBuilder();

String splitCol = getDBConf().getInputOrderBy();
query.append("SELECT MIN(").append(splitCol).append("), ");
query.append("MAX(").append(splitCol).append(") FROM ");
query.append(getDBConf().getInputTableName());

String conditions = getDBConf().getInputConditions();
if (null != conditions) {
query.append(" WHERE ( " + conditions + " )");
}

return query.toString();
}
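
// --- Illustrative aside, not Sqoop's exact code: the MIN/MAX pair returned by the
// --- bounding query above is then cut into one WHERE condition per map task; for an
// --- integer split column the idea is roughly the helper below (Sqoop's real logic
// --- lives in its splitter classes, e.g. IntegerSplitter).
protected List<String> splitIntegerColumn(String splitCol, long min, long max, int numSplits) {
List<String> conditions = new ArrayList<String>();
long step = Math.max(1, (max - min + 1) / numSplits);
for (int i = 0; i < numSplits; i++) {
long lo = min + i * step;
long hi = (i == numSplits - 1) ? max + 1 : lo + step;
// each map task gets: splitCol >= lo AND splitCol < hi
conditions.add(splitCol + " >= " + lo + " AND " + splitCol + " < " + hi);
}
return conditions;
}
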
// 2. Read the data. This is the most distinctive part: the data is exported by building a mysqldump command line.
public void map(String splitConditions, NullWritable val, Context context)
throws IOException, InterruptedException {

LOG.info("Beginning mysqldump fast path import");

ArrayList<String> args = new ArrayList<String>();
String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);

// We need to parse the connect string URI to determine the database name.
// Using java.net.URL directly on the connect string will fail because
// Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
// scheme (everything before '://') and replace it with 'http', which we
// know will work.
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);

if (null == databaseName) {
throw new IOException("Could not determine database name");
}

LOG.info("Performing import of table " + tableName + " from database "
+ databaseName);

args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.

String password = DBConfiguration.getPassword((JobConf) conf);
String passwordFile = null;

Process p = null;
AsyncSink sink = null;
AsyncSink errSink = null;
PerfCounters counters = new PerfCounters();
try {
// --defaults-file must be the first argument.
if (null != password && password.length() > 0) {
passwordFile = MySQLUtils.writePasswordFile(conf);
args.add("--defaults-file=" + passwordFile);
}


// Don't use the --where="<whereclause>" version because spaces in it can
// confuse Java, and adding in surrounding quotes confuses Java as well.
String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
+ " AND (" + splitConditions + ")";
args.add("-w");
args.add(whereClause);

args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
}
args.add("--skip-opt");
args.add("--compact");
args.add("--no-create-db");
args.add("--no-create-info");
args.add("--quick"); // no buffering
args.add("--single-transaction");

String username = conf.get(MySQLUtils.USERNAME_KEY);
if (null != username) {
args.add("--user=" + username);
}

// If the user supplied extra args, add them here.
String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
if (null != extra) {
for (String arg : extra) {
args.add(arg);
}
}

args.add(databaseName);
args.add(tableName);
...

// Actually start the mysqldump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
...
}
// 3. Write to HDFS. This is also worth a mention: the exported rows are trimmed down to delimited records such as "A,B,C" (the delimiter can be set via a parameter) and then pushed to HDFS in one go.
private static class CopyingStreamThread extends ErrorableThread {
public static final Log LOG = LogFactory.getLog(
CopyingStreamThread.class.getName());
...

public void run() {
BufferedReader r = null;
...
r = new BufferedReader(new InputStreamReader(this.stream));

// Actually do the read/write transfer loop here.
int preambleLen = -1; // set to this for "undefined"
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF.
}

if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
continue; // comments and empty lines are ignored
}

// this line is of the form "INSERT .. VALUES ( actual value text
// );" strip the leading preamble up to the '(' and the trailing
// ');'.
if (preambleLen == -1) {
// we haven't determined how long the preamble is. It's constant
// across all lines, so just figure this out once.
String recordStartMark = "VALUES (";
preambleLen = inLine.indexOf(recordStartMark)
+ recordStartMark.length();
}

// chop off the leading and trailing text as we write the
// output to HDFS.
int len = inLine.length() - 2 - preambleLen;
context.write(inLine.substring(preambleLen, inLine.length() - 2)
+ "\\n", null);
counters.addBytes(1 + len);
}
...
}
}
}
</code>

Export

For export, Sqoop again provides two modes: 1. JDBC; 2. export via the client tool.

JDBC

The JDBC export builds INSERT statements and inserts rows in batches through the driver. It is fairly conventional; see the material on JDBC batch inserts for details.
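
For reference, here is a minimal, self-contained sketch of that batch-insert pattern; the table name, column layout, and connection parameters are purely illustrative and are not taken from the Sqoop export code.

<code>import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class JdbcBatchInsertExample {

  // Insert rows in batches of batchSize; demo_table/col1/col2 are made-up names.
  public static void insert(String jdbcUrl, String user, String password,
                            List<String[]> rows, int batchSize) throws SQLException {
    String sql = "INSERT INTO demo_table (col1, col2) VALUES (?, ?)";
    try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
         PreparedStatement ps = conn.prepareStatement(sql)) {
      conn.setAutoCommit(false);   // commit per batch, not per row
      int pending = 0;
      for (String[] row : rows) {
        ps.setString(1, row[0]);
        ps.setString(2, row[1]);
        ps.addBatch();             // queue the statement
        if (++pending % batchSize == 0) {
          ps.executeBatch();       // flush a full batch
          conn.commit();
        }
      }
      ps.executeBatch();           // flush the remaining tail
      conn.commit();
    }
  }
}
</code>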

Using the client tool

The MapReduce task first writes the HDFS data to a local path, into a file (in fact a named FIFO) named after the table; mysqlimport then reads this local file and loads the data into MySQL.

<code>private void initMySQLImportProcess() throws IOException {
File taskAttemptDir = TaskId.getLocalWorkPath(conf);

this.fifoFile = new File(taskAttemptDir,
conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
String filename = fifoFile.toString();

// Create the FIFO itself.
try {
new NamedFifo(this.fifoFile).create();
} catch (IOException ioe) {
// Command failed.
LOG.error("Could not mknod " + filename);
this.fifoFile = null;
throw new IOException(
"Could not create FIFO to interface with mysqlimport", ioe);
}

// Now open the connection to mysqlimport.
ArrayList<String> args = new ArrayList<String>();

String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);

if (null == databaseName) {
throw new IOException("Could not determine database name");
}

args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
String password = DBConfiguration.getPassword((JobConf) conf);

if (null != password && password.length() > 0) {
passwordFile = new File(MySQLUtils.writePasswordFile(conf));
args.add("--defaults-file=" + passwordFile);
}

String username = conf.get(MySQLUtils.USERNAME_KEY);

if (null != username) {
args.add("--user=" + username);
}

args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
}

args.add("--compress");
args.add("--local");
args.add("--silent");

// Specify the subset of columns we're importing.
DBConfiguration dbConf = new DBConfiguration(conf);
String [] cols = dbConf.getInputFieldNames();
if (null != cols) {
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String col : cols) {
if (!first) {
sb.append(",");
}
sb.append(col);
first = false;
}

args.add("--columns=" + sb.toString());
}

// Specify the delimiters to use.
int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
(int) ',');
int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
(int) '\n');
int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
boolean encloseRequired = conf.getBoolean(
MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);

args.add("--fields-terminated-by=0x"
+ Integer.toString(outputFieldDelim, 16));
args.add("--lines-terminated-by=0x"
+ Integer.toString(outputRecordDelim, 16));
if (0 != enclosedBy) {
if (encloseRequired) {
args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
} else {
args.add("--fields-optionally-enclosed-by=0x"
+ Integer.toString(enclosedBy, 16));

}
}

if (0 != escapedBy) {
args.add("--escaped-by=0x" + Integer.toString(escapedBy, 16));
}

// These two arguments are positional and must be last.
args.add(databaseName);
args.add(filename);

...

// Actually start mysqlimport.
mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
...

}
</code>

DataX

DataX is a single-machine tool open-sourced by Alibaba. The goal is that all data exchange can go through one generic interface provided by DataX: simple to use, with no need for developers to learn to write MySQL, Oracle, MapReduce, and similar code. A great idea. The structure is as follows:

(Figure: DataX architecture diagram)

Data transfer relies entirely on memory; the basic principle is similar to Flume's MemoryChannel, and under abnormal conditions both may lose data (a minimal sketch of such an in-memory channel follows after the reader code below). Readers who are interested can dig into the Flume source. Back to import and export: for both directions, DataX provides a unified model that connects over JDBC.

<code>public void startRead(Configuration readerSliceConfig,
RecordSender recordSender,
TaskPluginCollector taskPluginCollector, int fetchSize) {
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
String table = readerSliceConfig.getString(Key.TABLE);

PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);

LOG.info("Begin to read record by Sql: [{}\\n] {}.",
querySql, basicMsg);
PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
queryPerfRecord.start();

Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
username, password);

// session config .etc related
DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
this.dataBaseType, basicMsg);

int columnNumber = 0;
ResultSet rs = null;
try {
rs = DBUtil.query(conn, querySql, fetchSize);
queryPerfRecord.end();

ResultSetMetaData metaData = rs.getMetaData();
columnNumber = metaData.getColumnCount();

//this measures the pure ResultSet.next() time, excluding the time spent transporting records
PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
allResultPerfRecord.start();

long rsNextUsedTime = 0;
long lastTime = System.nanoTime();
while (rs.next()) {
rsNextUsedTime += (System.nanoTime() - lastTime);

this.transportOneRecord(recordSender, rs,
metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
lastTime = System.nanoTime();
}

allResultPerfRecord.end(rsNextUsedTime);
//the monitoring dashboard relies on this log line; previously "Finished read record" covered the
//whole time of the SQL query plus all the result-next calls
LOG.info("Finished read record by Sql: [{}\n] {}.",
querySql, basicMsg);

}catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {
DBUtil.closeDBResources(null, conn);
}
}
</code>
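
To make the MemoryChannel analogy above concrete, here is a minimal sketch of a bounded in-memory record channel built on an ArrayBlockingQueue. The class and method names are invented for illustration and are not DataX's actual Channel API; the point is simply that records buffered in the queue exist only in memory, which is why a crash between read and write can lose them.

<code>import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded in-memory channel between a reader thread and a writer thread.
public class MemoryRecordChannel<T> {

  private final BlockingQueue<T> queue;

  public MemoryRecordChannel(int capacity) {
    this.queue = new ArrayBlockingQueue<T>(capacity);
  }

  // Called by the reader; blocks when the writer falls behind (backpressure).
  public void push(T record) throws InterruptedException {
    queue.put(record);
  }

  // Called by the writer; blocks until a record is available.
  public T pull() throws InterruptedException {
    return queue.take();
  }

  // Records still buffered here are lost if the process dies.
  public int inFlight() {
    return queue.size();
  }
}
</code>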

Summary

Both Sqoop and DataX take a plugin approach that makes it convenient for users to develop new reader and writer plugins. Judging from the existing code, though, Sqoop's direct mode delivers higher performance than DataX, and because Sqoop stands on the shoulders of the Hadoop elephant, its stability is also higher than that of the single-machine DataX.

Extensions

  • A rough design for a distributed DataX cluster:
(Figure: sketch of a distributed DataX deployment)

  • A sudden thought: if DataX were not single-machine only, would I consider using it? I sketched a rough design to discuss with everyone. Basic flow: a DataX service is deployed on every node (DataX needs to be wrapped as an online service so jobs can be launched), and each node periodically reports its status (alive or not, memory, CPU load, and so on) to a Consul configuration center. The configuration in the config center is managed centrally by a resource-management module. When a job submits an import/export request, the number of splits and the resources each split needs are computed first, then resources are requested from the resource-management module. With the granted resources, the job manager sends the start-job request to the service registry to launch the job; when a job fails, the failure is reported back to the job manager and the job re-enters the pending-schedule list.
  • When Java calls shell commands, two things must be taken care of: the output stream and the error stream. The latter is the easiest one to overlook and directly affects the robustness of the program; Sqoop's client-tool import and export give us a textbook example (a minimal sketch of the pattern follows after the ForkJoin example below).
  • Splitting data and then operating on the pieces in parallel is a fairly common task in data processing; here is a classic example of it:
    <code>import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    public class AccumulateNum implements Serializable {

      public final static List<Integer> LST_NUM = new ArrayList<Integer>();

      static {
        for (int i = 1; i <= 100000; i++) {
          LST_NUM.add(i);
        }
      }

      public static class AccumulateTask extends RecursiveTask<Long> {

        // granularity below which we stop forking
        private final static int THRESHOLD = 1000;
        private int start;
        private int end;

        public AccumulateTask(int start, int end) {
          this.start = start;
          this.end = end;
        }

        @Override
        public Long compute() {
          long sum = 0;
          if (end < start) return sum;

          if (end - start <= THRESHOLD) {
            // the slice is small enough: sum it directly
            for (int i = start; i <= end; ++i) {
              sum += LST_NUM.get(i);
            }
          } else {
            // still too large: split the range and fork both halves
            int mid = (start + end) / 2;
            AccumulateTask left = new AccumulateTask(start, mid);
            left.fork();
            AccumulateTask right = new AccumulateTask(mid + 1, end);
            right.fork();
            sum = left.join();
            sum += right.join();
          }
          return sum;
        }
      }

      public static void main(String[] args) throws Exception {

        int cpuNum = Runtime.getRuntime().availableProcessors();
        ForkJoinPool pool = new ForkJoinPool(cpuNum);

        long startTime = System.nanoTime();
        Future<Long> future = pool.submit(new AccumulateTask(0, LST_NUM.size() - 1));
        System.out.println(future.get());
        long endTime = System.nanoTime(); // measure after get() so the computation is included
        System.out.println(endTime - startTime + "ns");
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
      }
    }
    </code>
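
Back to the point above about calling shell commands from Java: the sketch below (the class name and the command are illustrative, not taken from Sqoop) shows the pattern Sqoop's direct mode follows, draining stdout and stderr on separate threads so that a filled OS pipe buffer cannot stall or deadlock the child process.

<code>import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class ShellExecExample {

  // Drain one stream of the child process on its own thread.
  static Thread drain(final InputStream in, final String tag) {
    Thread t = new Thread(new Runnable() {
      public void run() {
        try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
          String line;
          while ((line = r.readLine()) != null) {
            System.out.println(tag + line); // in real code: log or forward the line
          }
        } catch (IOException ignored) {
          // the child exited or the stream was closed
        }
      }
    });
    t.start();
    return t;
  }

  public static void main(String[] args) throws Exception {
    // illustrative command; in Sqoop this would be mysqldump/mysqlimport plus its arguments
    Process p = Runtime.getRuntime().exec(new String[] {"ls", "-l"});
    Thread out = drain(p.getInputStream(), "OUT: ");
    Thread err = drain(p.getErrorStream(), "ERR: "); // forgetting this stream can hang the child
    int exitCode = p.waitFor();
    out.join();
    err.join();
    System.out.println("exit code = " + exitCode);
  }
}
</code>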


