MongoDB Stream是如何實現完美數據增量遷移的?

MongoDB Stream是如何实现完美数据增量迁移的?

唐卓章(zale),華為技術專家,多年互聯網研發/架設經驗,關注NoSQL中間件高可用及彈性擴展,在分佈式系統架構性能優化方面有豐富的實踐經驗。目前從事物聯網平臺研發工作,致力於打造大容量高可用的物聯網服務。

一、背景介紹

最近微服務架構火得不行,但本質上也只是風口上的一個熱點詞彙。

作為筆者的經驗來說,想要應用一個新的架構需要帶來的變革成本是非常高的。

儘管如此,目前還是有許多企業踏上了服務化改造的道路,這其中則免不了“舊改”的各種繁雜事。

所謂的“舊改”,就是把現有的系統架構來一次重構,拆分成多個細粒度的服務後,然後找時間升級割接一把,讓新系統上線。這其中,數據的遷移往往會成為一個非常重要且繁雜的活兒。

拆分服務時數據遷移的挑戰在哪?

  • 首先是難度大,做一個遷移方案需要了解項目的前身今世,評估遷移方案、技術工具等;

  • 其次是成本高。由於新舊系統數據結構是不一樣的,需要定製開發遷移轉化功能,很難有一個通用的工具能一鍵遷移;

  • 再者對於一些容量大、可靠性要求高的系統,要能夠不影響業務,出了問題還能追溯,因此方案上還得往復雜了想。

二、常見方案

按照遷移的方案及流程,可將數據遷移分為三類:

1、停機遷移

最簡單的方案,停機遷移的順序如下:

MongoDB Stream是如何实现完美数据增量迁移的?

採用停機遷移的好處是流程操作簡單,工具成本低,然而缺點也很明顯,遷移過程中業務是無法訪問的,因此只適合於規格小、允許停服的場景。

2、業務雙寫

業務雙寫是指對現有系統先進行改造升級,支持同時對新庫和舊庫進行寫入。之後再通過數據遷移工具對舊數據做全量遷移,待所有數據遷移轉換完成後切換到新系統。

示意圖:

MongoDB Stream是如何实现完美数据增量迁移的?

業務雙寫的方案是平滑的,對線上業務影響極小,在出現問題的情況下可重新來過,操作壓力也會比較小。

筆者在早些年前嘗試過這樣的方案,整個遷移過程確實非常順利,但實現該方案比較複雜,需要對現有的代碼進行改造並完成新數據的轉換及寫入,對於開發人員的要求較高。在業務邏輯清晰、團隊對系統有足夠的把控能力的場景下適用。

3、增量遷移

增量遷移的基本思路是先進行全量的遷移轉換,待完成後持續進行增量數據的處理,直到數據追平後切換系統。

示意圖:

MongoDB Stream是如何实现完美数据增量迁移的?

關鍵點:

  • 要求系統支持增量數據的記錄。對於MongoDB可以利用oplog實現這點,為避免全量遷移過程中oplog被沖掉,在開始遷移前就必須開始監聽oplog,並將變更全部記錄下來;如果沒有辦法,需要從應用層上考慮,比如為所有的表(集合)記錄下updateTime這樣的時間戳,或者升級應用並支持將修改操作單獨記錄下來。

  • 增量數據的回放是持續的。在所有的增量數據回放轉換過程中,系統仍然會產生新的增量數據,這要求遷移工具能做到將增量數據持續回放並將之追平,之後才能做系統切換。

MongoDB 3.6版本開始便提供了Change Stream功能,支持對數據變更記錄做監聽。這為實現數據同步及轉換處理提供了更大的便利,下面將探討如何利用Change Stream實現數據的增量遷移。

三、Change Stream介紹

Chang Stream(變更記錄流)是指collection(數據庫集合)的變更事件流,應用程序通過db.collection.watch這樣的命令可以獲得被監聽對象的實時變更。

在該特性出現之前,你可以通過拉取oplog達到同樣的目的;但oplog的處理及解析相對複雜且存在被回滾的風險,如果使用不當的話還會帶來性能問題。Change Stream可以與aggregate framework結合使用,對變更集進行進一步的過濾或轉換。

參考鏈接:https://docs.mongodb.com/manual/aggregation/

由於Change Stream利用了存儲在oplog中的信息,因此對於單進程部署的MongoDB無法支持Change Stream功能,其只能用於啟用了副本集的獨立集群或分片集群。

監聽的目標

MongoDB Stream是如何实现完美数据增量迁移的?

變更事件

一個Change Stream Event的基本結構如下所示:

MongoDB Stream是如何实现完美数据增量迁移的?

字段說明:

MongoDB Stream是如何实现完美数据增量迁移的?

Change Steram支持的變更類型有以下幾個:

MongoDB Stream是如何实现完美数据增量迁移的?

利用以下的shell腳本,可以打印出集合 T_USER上的變更事件:

MongoDB Stream是如何实现完美数据增量迁移的?

下面提供一些樣例,感受一下:

insert事件

MongoDB Stream是如何实现完美数据增量迁移的?

update事件

MongoDB Stream是如何实现完美数据增量迁移的?

replace事件

MongoDB Stream是如何实现完美数据增量迁移的?

delete事件

MongoDB Stream是如何实现完美数据增量迁移的?

invalidate事件

MongoDB Stream是如何实现完美数据增量迁移的?

更多的Change Event信息可以參考:https://docs.mongodb.com/manual/reference/change-events/

四、實現增量遷移

本次設計了一個簡單的論壇帖子遷移樣例,用於演示如何利用Change Stream實現完美的增量遷移方案。

背景如下:

現有的系統中有一批帖子,每個帖子都屬於一個頻道(channel),如下表:

MongoDB Stream是如何实现完美数据增量迁移的?

新系統中頻道字段將採用英文簡稱,同時要求能支持平滑升級。根據前面篇幅的敘述,我們將使用Change Stream功能實現一個增量遷移的方案。

相關表的轉換如下圖:

MongoDB Stream是如何实现完美数据增量迁移的?

原理

topic是帖子原表,在遷移開始前將開啟watch任務持續獲得增量數據,並記錄到 topic_incr表中;接著執行全量的遷移轉換,之後再持續對增量表數據進行遷移,直到無新的增量為止。

接下來我們使用Java程序來完成相關代碼,mongodb-java--driver在3.6版本後才支持watch功能,需要確保升級到對應版本:

MongoDB Stream是如何实现完美数据增量迁移的?

定義Channel頻道的轉換表:

public static enum Channel {

Food("美食"),

Emotion("情感"),

Pet("寵物"),

House("家居"),

Marriage("徵婚"),

Education("教育"),

Travel("旅遊")

;

private final String oldName;

public String getOldName {

return oldName;

}

private Channel(String oldName) {

this.oldName = oldName;

}

/**

* 轉換為新的名稱

*

* @param oldName

* @return

*/

public static String toNewName(String oldName) {

for (Channel channel : values) {

if (channel.oldName.equalsIgnoreCase(oldName)) {

return channel.name;

return "";

* 返回一個隨機頻道

public static Channel random {

Channel channels = values;

int idx = (int) (Math.random * channels.length);

return channels[idx];

為topic表預寫入1w條記錄:

MongoDB Stream是如何实现完美数据增量迁移的?

開啟監聽任務,將topic上的所有變更寫入到增量表:

MongoDB Stream是如何实现完美数据增量迁移的?

代碼中通過watch命令獲得一個MongoCursor對象,用於遍歷所有的變更。

FullDocument.UPDATE_LOOKUP選項啟用後,在update變更事件中將攜帶完整的文檔數據(FullDocument)。

watch命令提交後,mongos會與分片上的mongod(主節點)建立訂閱通道,這可能需要花費一點時間。

為了模擬線上業務的真實情況,啟用幾個線程對topic表進行持續寫操作:

MongoDB Stream是如何实现完美数据增量迁移的?

ChangeTask實現邏輯如下:

MongoDB Stream是如何实现完美数据增量迁移的?

每一個變更任務會不斷對topic產生寫操作,觸發一系列ChangeEvent產生:

  • doInsert:生成隨機頻道的topic後,執行insert;

  • doUpdate:隨機取得一個topic,將其channel字段改為隨機值,執行update;

  • doReplace:隨機取得一個topic,將其channel字段改為隨機值,執行replace;

  • doDelete:隨機取得一個topic,執行delete。

以doUpdate為例,實現代碼如下:

MongoDB Stream是如何实现完美数据增量迁移的?

啟動一個全量遷移任務,將topic表中數據遷移到topic_new新表:

MongoDB Stream是如何实现完美数据增量迁移的?

在全量遷移開始前,先獲得當前時刻的的最大 _id 值(可以將此值記錄下來)作為終點,隨後逐個完成遷移轉換。

在全量遷移完成後,便開始最後一步:增量遷移。

注:增量遷移過程中,變更操作仍然在進行。

final MongoCollection topicIncrCollection = getCollection(coll_topic_incr);

final MongoCollection

topicNewCollection = getCollection(coll_topic_new);

ObjectId currentId = ;

Document sort = new Document("_id", 1);

MongoCursor cursor = ;

// 批量大小

int batchSize = 100;AtomicInteger count = new AtomicInteger(0);

try {

while (true) {

boolean isWatchTaskStillRunning = watchFlag.getCount > 0;

// 按ID增量分段拉取

if (currentId == ) {

cursor = topicIncrCollection.find.sort(sort).limit(batchSize).iterator;

} else {

cursor = topicIncrCollection.find(new Document("_id", new Document("$gt", currentId)))

.sort(sort).limit(batchSize).iterator;

}

boolean hasIncrRecord = false;

while (cursor.hasNext) {

hasIncrRecord = true;

Document incrDoc = cursor.next;

OperationType opType = OperationType.fromString(incrDoc.getString(field_op));

ObjectId docId = incrDoc.getObjectId(field_key);

// 記錄當前ID

currentId = incrDoc.getObjectId("_id");

if (opType == OperationType.DELETE) {

topicNewCollection.deleteOne(new Document("_id", docId));

} else {

Document doc = incrDoc.get(field_data, Document.class);

// channel轉換

String oldChannel = doc.getString(field_channel);

doc.put(field_channel, Channel.toNewName(oldChannel));

// 啟用upsert

UpdateOptions options = new UpdateOptions.upsert(true);

topicNewCollection.replaceOne(new Document("_id", docId),

incrDoc.get(field_data, Document.class), options);

}

if (count.incrementAndGet % 10 == 0) {

logger.info("IncrTransferTask progress, count: {}", count.get);

// 當watch停止工作(沒有更多變更),同時也沒有需要處理的記錄時,跳出

if (!isWatchTaskStillRunning && !hasIncrRecord) {

break;

}

sleep(200);

}

} catch (Exception e) {

logger.error("IncrTransferTask ERROR", e);

}

增量遷移的實現是一個不斷tail的過程,利用 **_id 字段的有序特性 ** 進行分段遷移;即記錄下當前處理的_id值,循環拉取在該_id值之後的記錄進行處理。

增量表(topic_incr)中除了DELETE變更之外,其餘的類型都保留了整個文檔,因此可直接利用replace + upsert追加到新表。

最後,運行整個程序。

MongoDB Stream是如何实现完美数据增量迁移的?

查看topic表和topic_new表,發現兩者數量是相同的。為了進一步確認一致性,我們對兩個表的分別做一次聚合統計:

topic表

MongoDB Stream是如何实现完美数据增量迁移的?

topic_new表

MongoDB Stream是如何实现完美数据增量迁移的?

前者輸出結果:

MongoDB Stream是如何实现完美数据增量迁移的?

後者輸出結果:

MongoDB Stream是如何实现完美数据增量迁移的?

前後對比的結果是一致的。

五、後續優化

前面的章節演示了一個增量遷移的樣例,在投入到線上運行之前,這些代碼還得繼續優化:

  • 寫入性能,線上的數據量可能會達到億級,在全量、增量遷移時應採用合理的批量化處理;另外可以通過增加併發線程,添置更多的Worker,分別對不同業務庫、不同表進行處理以提升效率。增量表存在冪等性,即回放多次其最終結果還是一致的,但需要保證表級有序,即一個表同時只有一個線程在進行增量回放。

  • 容錯能力,一旦watch監聽任務出現異常,要能夠從更早的時間點開始(使用startAtOperationTime參數),而如果寫入時發生失敗,要支持重試。

  • 回溯能力,做好必要的跟蹤記錄,比如將轉換失敗的ID號記錄下來,舊系統的數據需要保留,以免在事後追究某個數據問題時找不著北。

  • 數據轉換,新舊業務的差異不會很簡單,通常需要藉助大量的轉換表來完成。

  • 一致性檢查,需要根據業務特點開發自己的一致性檢查工具,用來證明遷移後數據達到想要的一致性級別。

BTW,數據遷移一定要結合業務特性、架構差異來做考慮,否則還是在耍流氓。

六、小結

服務化系統中擴容、升級往往會進行數據遷移,對於業務量大,中斷敏感的系統通常會採用平滑遷移的方式。

MongoDB 3.6版本後提供了Change Stream功能以支持應用訂閱數據的變更事件流,本文使用Stream功能實現了增量平滑遷移的例子,這是一次嘗試,相信後續這樣的應用場景會越來越多。

附參考文檔

  • 100億數據平滑數據遷移,不影響服務-58沈劍

  • MongoDB-ChangeStream

    https://docs.mongodb.com/manual/changeStreams/

  • Use-ChangeStream To Handle Temperature

    https://www.percona.com/blog/2017/11/22/mongodb-3-6-change-streams-nest-temperature-fan-control-use-case/


分享到:


相關文章: