DBLE LOAD DATA 功能實現解析

熱愛技術、樂於分享的技術人,目前主要從事數據庫相關技術的研究。


1. 概述

本篇文章主要介紹 DBLE LOAD DATA 大規模數據導入功能的實現,包括方案設計、源碼解讀。

下面就讓我們一起來探秘 DBLE 是如何實現該功能的吧!


2. 方案設計

LOAD DATA 為 MySQL 提供的從文本文件導入數據到表的語法,作為數據庫中間件,當然也需要實現對應的功能,來滿足用戶的導入數據需求。

DBLE 對該功能的實現其實就是直接模擬了 MySQL 對 LOAD DATA 命令相應的處理協議。當然作為數據庫中間件,還需要處理相應數據的存儲、數據路由情況以及與後端 MySQL 的交互等方面的邏輯。

下圖即為 DBLE 對 LOAD DATA 處理的整體流程:


分佈式 | DBLE LOAD DATA 功能實現解析


3. 源碼解讀

DBLE 與 LOAD DATA 功能實現相關的類其實主要有兩個,一個是 ServerLoadDataInfileHandler類,一個是 LoadDataUtil 類, ServerLoadDataInfileHandler 類主要處理的是與客戶端交互的邏輯,而 LoadDataUtil 類主要處理的是與後端 MySQL 交互的邏輯。

下面我們就從客戶端發送命令到 DBLE 處理,最後到 DBLE 與後端 MySQL 交互的過程,來詳細看下相應的代碼。

當客戶端發來 LOAD DATA 導入數據到表命令的時候,DBLE 作為服務端會接收到相應的命令並進行處理,對應的代碼在 ServerQueryHandler#query 方法中,這裡會判斷 SQL 的類型為 LOAD DATA,然後進一步處理:

<code>public void query(String sql) {        ServerConnection c = this.source;        if (LOGGER.isDebugEnabled()) {            LOGGER.debug(String.valueOf(c) + sql);        }         ……        int rs = ServerParse.parse(sql);        boolean isWithHint = ServerParse.startWithHint(sql);        int sqlType = rs & 0xff;        ……        switch (sqlType) {        ……            case ServerParse.LOAD_DATA_INFILE_SQL:                    //對LOAD DATA的處理,調用FrontendConnection#loadDataInfileStart方法                    c.loadDataInfileStart(sql);                    break;      ……      }  }/<code>

繼續看一下 FrontendConnection#loadDataInfileStart 方法:

<code>    public void loadDataInfileStart(String sql) {        if (loadDataInfileHandler != null) {            try {                //進一步調用了ServerLoadDataInfileHandler#start方法                loadDataInfileHandler.start(sql);            } catch (Exception e) {                LOGGER.info("load data error", e);                writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.getMessage());            }        } else {            writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "load data infile sql is not  unsupported!");        }    }/<code>

下面便進入到了 ServerLoadDataInfileHandler#start 方法,前面講過該類主要處理的是 DBLE 與客戶端的交互邏輯。

該方法比較長,大家可以去細看,主要功能還是解析了客戶端發送過來的 SQL 語句,然後針對 LOAD DATA 語法,如果導入文件是本機文件,則直接進行解析,否則的話會向客戶端發送獲取文件的命令,讓客戶端傳輸文件過來:

<code>public void start(String strSql) {        ……        parseLoadDataPram();        //如果文件不在本地,則向客戶端發送命令,請求數據文件,這裡的local可能會讓人疑惑,但MySQL語法確實是這麼規定的,load data local用法反而是文件不在本地的用法        if (statement.isLocal()) {            isStartLoadData = true;            //request file from client            ByteBuffer buffer = serverConnection.allocate();            RequestFilePacket filePacket = new RequestFilePacket();            filePacket.setFileName(fileName.getBytes());            filePacket.setPacketId(1);            filePacket.write(buffer, serverConnection, true);        } else {            //如果文件在本地的話,先判斷文件是否存在,不存在則報錯,存在的話需要對文件進行讀取,計算每一行的路由結果,然後對不同節點的數據分別進行存儲            if (!new File(fileName).exists()) {                String msg = fileName + " is not found!";                clear();                serverConnection.writeErrMessage(ErrorCode.ER_FILE_NOT_FOUND, msg);            } else {                if (parseFileByLine(fileName, loadData.getCharset(), loadData.getLineTerminatedBy())) {                    RouteResultset rrs = buildResultSet(routeResultMap);                    if (rrs != null) {                        flushDataToFile();                        isStartLoadData = false;                        serverConnection.getSession2().execute(rrs);                    }                }            }        }    }/<code>

DBLE 發送命令給客戶端後,客戶端便會源源不斷地把數據文件發送過來,對發送過來文件的處理邏輯在 ServerLoadDataInfileHandler#handle 方法中,該方法其實就是對傳輸過來的文件進行轉儲,默認數據小於 200Mb 則存在內存中,否則的話存儲到本地文件:

<code>public void handle(byte[] data) {        try {            if (sql == null) {                clear();                serverConnection.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");                return;            }            BinaryPacket packet = new BinaryPacket();            ByteArrayInputStream inputStream = new ByteArrayInputStream(data, 0, data.length);            packet.read(inputStream);            //這裡就是對發送過來的文件進行轉儲            saveByteOrToFile(packet.getData(), false);        } catch (IOException e) {            throw new RuntimeException(e);        }    }/<code>

文件發送完成,客戶端還會發送一個空包過來,告訴 DBLE 數據發送完了,然後 DBLE 會進行下一步處理(其實這裡就是 MySQL 協議中的規定),下一步處理的邏輯在 ServerLoadDataInfileHandler#end方法中。

該方法也比較長,主要處理邏輯是將接受過來的文件進一步計算路由,根據計算結果將文件根據不同節點分別存儲,最後構建路由結果集,通過 DBLE 下發 LOAD DATA 命令到後端不同的 MySQL 節點:

<code>public void end(byte packId) {        isStartLoadData = false;        this.packID = packId;        //empty packet for end        saveByteOrToFile(null, true);        if (isHasStoreToFile) {            //這裡便是計算路由,並根據路由結果存儲不同節點的數據文件            parseFileByLine(tempFile, loadData.getCharset(), loadData.getLineTerminatedBy());        }        ……        //構建路由結果集,下發後端MySQL,執行LOAD DATA命令        RouteResultset rrs = buildResultSet(routeResultMap);        if (rrs != null) {            flushDataToFile();            serverConnection.getSession2().execute(rrs);        }}/<code> 

DBLE 與後端 MySQL 的交互邏輯跟客戶端與 DBLE 的交互邏輯基本一樣,因為都是基於 MySQL 協議嘛,DBLE 這邊還需要做的就是將不同節點的數據文件發送給後端的 MySQL,具體的邏輯在 LoadDataUtil#requestFileDataResponse 方法中,該方法就是將 DBLE 處理過的數據文件,發送到後端的 MySQL 了,由 MySQL 來進行真正的數據存儲:

<code>public static void requestFileDataResponse(byte[] data, BackendConnection conn) {        byte packId = data[3];        MySQLConnection c = (MySQLConnection) conn;        RouteResultsetNode rrn = (RouteResultsetNode) conn.getAttachment();        LoadData loadData = rrn.getLoadData();        List<string> loadDataData = loadData.getData();        BufferedInputStream in = null;        try {            //如果數據較小,都在內存中,則直接發送            if (loadDataData != null && loadDataData.size() > 0) {                ByteArrayOutputStream bos = new ByteArrayOutputStream();                for (String loadDataDataLine : loadDataData) {                    String s = loadDataDataLine + loadData.getLineTerminatedBy();                    byte[] bytes = s.getBytes(CharsetUtil.getJavaCharset(loadData.getCharset()));                    bos.write(bytes);                }                packId = writeToBackConnection(packId, new ByteArrayInputStream(bos.toByteArray()), c);            } else {                //否則的話,先讀取文件,然後再發送數據                in = new BufferedInputStream(new FileInputStream(loadData.getFileName()));                packId = writeToBackConnection(packId, in, c);            }        }     ……    }/<string>/<code>

到這裡,整個 DBLE 對 LOAD DATA 的處理流程就講完啦。


4. 總結

本篇文章主要分析講解了 DBLE 對 LOAD DATA 功能的實現,包括方案設計以及源碼解讀,希望大家看完後能對整個 LOAD DATA 功能有更進一步的瞭解。


分享到:


相關文章: