DBLE LOAD DATA 功能實現解析

熱愛技術、樂於分享的技術人,目前主要從事數據庫相關技術的研究。

1. 概述

本篇文章主要介紹 DBLE LOAD DATA 大規模數據導入功能的實現,包括方案設計、源碼解讀。

下面就讓我們一起來探秘 DBLE 是如何實現該功能的吧!


2. 方案設計

LOAD DATA 為 MySQL 提供的從文本文件導入數據到表的語法,作為數據庫中間件,當然也需要實現對應的功能,來滿足用戶的導入數據需求。

DBLE 對該功能的實現其實就是直接模擬了 MySQL 對 LOAD DATA 命令相應的處理協議。當然作為數據庫中間件,還需要處理相應數據的存儲、數據路由情況以及與後端 MySQL 的交互等方面的邏輯。

下圖即為 DBLE 對 LOAD DATA 處理的整體流程:



3. 源碼解讀

DBLE 與 LOAD DATA 功能實現相關的類其實主要有兩個,一個是 ServerLoadDataInfileHandler類,一個是 LoadDataUtil 類, ServerLoadDataInfileHandler 類主要處理的是與客戶端交互的邏輯,而 LoadDataUtil 類主要處理的是與後端 MySQL 交互的邏輯。

下面我們就從客戶端發送命令到 DBLE 處理,最後到 DBLE 與後端 MySQL 交互的過程,來詳細看下相應的代碼。

當客戶端發來 LOAD DATA 導入數據到表命令的時候,DBLE 作為服務端會接收到相應的命令並進行處理,對應的代碼在 ServerQueryHandler#query 方法中,這裡會判斷 SQL 的類型為 LOAD DATA,然後進一步處理:

<code>public void query(String sql) { ServerConnection c = this.source; if (LOGGER.isDebugEnabled()) { LOGGER.debug(String.valueOf(c) + sql); } …… int rs = ServerParse.parse(sql); boolean isWithHint = ServerParse.startWithHint(sql); int sqlType = rs & 0xff; …… switch (sqlType) { …… case ServerParse.LOAD_DATA_INFILE_SQL: //對LOAD DATA的處理,調用FrontendConnection#loadDataInfileStart方法 c.loadDataInfileStart(sql); break; …… } }/<code>

繼續看一下 FrontendConnection#loadDataInfileStart 方法:

<code> public void loadDataInfileStart(String sql) { if (loadDataInfileHandler != null) { try { //進一步調用了ServerLoadDataInfileHandler#start方法 loadDataInfileHandler.start(sql); } catch (Exception e) { LOGGER.info("load data error", e); writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.getMessage()); } } else { writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "load data infile sql is not unsupported!"); } }/<code>

下面便進入到了 ServerLoadDataInfileHandler#start 方法,前面講過該類主要處理的是 DBLE 與客戶端的交互邏輯。

該方法比較長,大家可以去細看,主要功能還是解析了客戶端發送過來的 SQL 語句,然後針對 LOAD DATA 語法,如果導入文件是本機文件,則直接進行解析,否則的話會向客戶端發送獲取文件的命令,讓客戶端傳輸文件過來:

<code>public void start(String strSql) { …… parseLoadDataPram(); //如果文件不在本地,則向客戶端發送命令,請求數據文件,這裡的local可能會讓人疑惑,但MySQL語法確實是這麼規定的,load data local用法反而是文件不在本地的用法 if (statement.isLocal()) { isStartLoadData = true; //request file from client ByteBuffer buffer = serverConnection.allocate(); RequestFilePacket filePacket = new RequestFilePacket(); filePacket.setFileName(fileName.getBytes()); filePacket.setPacketId(1); filePacket.write(buffer, serverConnection, true); } else { //如果文件在本地的話,先判斷文件是否存在,不存在則報錯,存在的話需要對文件進行讀取,計算每一行的路由結果,然後對不同節點的數據分別進行存儲 if (!new File(fileName).exists()) { String msg = fileName + " is not found!"; clear(); serverConnection.writeErrMessage(ErrorCode.ER_FILE_NOT_FOUND, msg); } else { if (parseFileByLine(fileName, loadData.getCharset(), loadData.getLineTerminatedBy())) { RouteResultset rrs = buildResultSet(routeResultMap); if (rrs != null) { flushDataToFile(); isStartLoadData = false; serverConnection.getSession2().execute(rrs); } } } } }/<code>

DBLE 發送命令給客戶端後,客戶端便會源源不斷地把數據文件發送過來,對發送過來文件的處理邏輯在 ServerLoadDataInfileHandler#handle 方法中,該方法其實就是對傳輸過來的文件進行轉儲,默認數據小於 200Mb 則存在內存中,否則的話存儲到本地文件:

<code>public void handle(byte[] data) { try { if (sql == null) { clear(); serverConnection.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command"); return; } BinaryPacket packet = new BinaryPacket(); ByteArrayInputStream inputStream = new ByteArrayInputStream(data, 0, data.length); packet.read(inputStream); //這裡就是對發送過來的文件進行轉儲 saveByteOrToFile(packet.getData(), false); } catch (IOException e) { throw new RuntimeException(e); } }/<code>

文件發送完成,客戶端還會發送一個空包過來,告訴 DBLE 數據發送完了,然後 DBLE 會進行下一步處理(其實這裡就是 MySQL 協議中的規定),下一步處理的邏輯在 ServerLoadDataInfileHandler#end方法中。

該方法也比較長,主要處理邏輯是將接受過來的文件進一步計算路由,根據計算結果將文件根據不同節點分別存儲,最後構建路由結果集,通過 DBLE 下發 LOAD DATA 命令到後端不同的 MySQL 節點:

<code>public void end(byte packId) { isStartLoadData = false; this.packID = packId; //empty packet for end saveByteOrToFile(null, true); if (isHasStoreToFile) { //這裡便是計算路由,並根據路由結果存儲不同節點的數據文件 parseFileByLine(tempFile, loadData.getCharset(), loadData.getLineTerminatedBy()); } …… //構建路由結果集,下發後端MySQL,執行LOAD DATA命令 RouteResultset rrs = buildResultSet(routeResultMap); if (rrs != null) { flushDataToFile(); serverConnection.getSession2().execute(rrs); }}/<code>

DBLE 與後端 MySQL 的交互邏輯跟客戶端與 DBLE 的交互邏輯基本一樣,因為都是基於 MySQL 協議嘛,DBLE 這邊還需要做的就是將不同節點的數據文件發送給後端的 MySQL,具體的邏輯在 LoadDataUtil#requestFileDataResponse 方法中,該方法就是將 DBLE 處理過的數據文件,發送到後端的 MySQL 了,由 MySQL 來進行真正的數據存儲:

<code>public static void requestFileDataResponse(byte[] data, BackendConnection conn) { byte packId = data[3]; MySQLConnection c = (MySQLConnection) conn; RouteResultsetNode rrn = (RouteResultsetNode) conn.getAttachment(); LoadData loadData = rrn.getLoadData(); List<string> loadDataData = loadData.getData(); BufferedInputStream in = null; try { //如果數據較小,都在內存中,則直接發送 if (loadDataData != null && loadDataData.size() > 0) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); for (String loadDataDataLine : loadDataData) { String s = loadDataDataLine + loadData.getLineTerminatedBy(); byte[] bytes = s.getBytes(CharsetUtil.getJavaCharset(loadData.getCharset())); bos.write(bytes); } packId = writeToBackConnection(packId, new ByteArrayInputStream(bos.toByteArray()), c); } else { //否則的話,先讀取文件,然後再發送數據 in = new BufferedInputStream(new FileInputStream(loadData.getFileName())); packId = writeToBackConnection(packId, in, c); } } …… }/<string>/<code>

到這裡,整個 DBLE 對 LOAD DATA 的處理流程就講完啦。


4. 總結

本篇文章主要分析講解了 DBLE 對 LOAD DATA 功能的實現,包括方案設計以及源碼解讀,希望大家看完後能對整個 LOAD DATA 功能有更進一步的瞭解。