01.06 MongoDB 4.2 內核解析

MongoDB 從3.6版本開始支持了 Change Stream 能力(4.0、4.2 版本在能力上做了很多增強),用於訂閱 MongoDB 內部的修改操作,change stream 可用於 MongoDB 之間的增量數據遷移、同步,也可以將 MongoDB 的增量訂閱應用到其他的關聯繫統;比如電商場景裡,MongoDB 裡存儲新的訂單信息,業務需要根據新增的訂單信息去通知庫存管理系統發貨。

Change Stream 與 Tailing Oplog 對比

在 change stream 功能之前,如果要獲取 MongoDB 增量的修改,可以通過不斷 tailing oplog 的方式來 拉取增量的 oplog ,然後針對拉取到的 oplog 集合,來過濾滿足條件的 oplog。這種方式也能滿足絕大部分場景的需求,但存在如下的不足。

  1. 使用門檻較高,用戶需要針對 oplog 集合,打開特殊選項的的 tailable cursor ("tailable": true, "awaitData" : true)。
  2. 用戶需要自己管理增量續傳,當拉取應用 crash 時,用戶需要記錄上一條拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再繼續拉取。
  3. 結果過濾必須在拉取側完成,但只需要訂閱部分 oplog 時,比如針對某個 DB、某個 Collection、或某種類型的操作,必須要把左右的 oplog 拉取到再進行過濾。
  4. 對於 update 操作,oplog 只包含操作的部分內容,比如 {$set: {x: 1}} ,而應用經常需要獲取到完整的文檔內容。
  5. 不支持 Sharded Cluster 的訂閱,用戶必須針對每個 shard 進行 tailing oplog,並且這個過程中不能有 moveChunk 操作,否則結果可能亂序。

MongoDB Change Stream 解決了 Tailing oplog 存在的不足

  1. 簡單易用,提供統一的 Change Stream API,一次 API 調用,即可從 MongoDB Server 側獲取增量修改。
  2. 統一的進度管理,通過 resume token 來標識拉取位置,只需在 API 調用時,帶上上次結果的 resume token,即可從上次的位置接著訂閱。
  3. 支持對結果在 Server 端進行 pipeline 過濾,減少網絡傳輸,支持針對 DB、Collection、OperationType 等維度進行結果過濾。
  4. 支持 fullDocument: "updateLookup" 選項,對於 update,返回當時對應文檔的完整內容。
  5. 支持 Sharded Cluster 的修改訂閱,相同的 API 請求發到 mongos ,即可獲取集群維度全局有序的修改。

Change Stream 實戰

以 Mongo shell 為例,使用 Change Stream 非常簡單,mongo shell 封裝了針對整個實例、DB、Collection 級別的訂閱操作。

<code>db.getMongo().watch()    訂閱整個實例的修改
db.watch() 訂閱指定DB的修改
db.collection.watch() 訂閱指定Collection的修改/<code>
  • 新建連接1發起訂閱操作
<code>mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000})  最多阻塞等待 1分鐘/<code>
  • 新建連接2寫入新數據
<code>mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })/<code>
  • 連接1上收到 Change Stream 更新
<code>mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }/<code>
  • 上述 ChangeStream 結果裡,_id 字段的內容即為 resume token,標識著 oplog 的某個位置,如果想從某個位置繼續訂閱,在 watch 時,通過 resumeAfter 指定即可。比如每個應用訂閱了上述3條修改,但只有第一條已經成功消費了,下次訂閱時指定第一條的 resume token 即可再次訂閱到接下來的2條。
<code>mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }/<code>

Change Stream 內部實現

watch() wrapper

db.watch() 實際上是一個 API wrapper,實際上 Change Stream 在 MongoDB 內部實際上是一個 aggregation 命令,只是加了一個特殊的 $changestream 階段,在發起 change stream 訂閱操作後,可通過 db.currentOp() 看到對應的 aggregation/getMore 操作的詳細參數。

<code>{
"op" : "getmore",

"ns" : "test.coll",
"command" : {
"getMore" : NumberLong("233479991942333714"),
"collection" : "coll",
"maxTimeMS" : 50000,
"lsid" : {
"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
},
},
"planSummary" : "COLLSCAN",
"cursor" : {
"cursorId" : NumberLong("233479991942333714"),
"createdDate" : ISODate("2019-12-31T06:35:52.479Z"),
"lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),
"nDocsReturned" : NumberLong(1),
"nBatchesReturned" : NumberLong(1),
"noCursorTimeout" : false,
"tailable" : true,
"awaitData" : true,
"originatingCommand" : {
"aggregate" : "coll",
"pipeline" : [
{
"$changeStream" : {
"fullDocument" : "default"
}
}
],
"cursor" : {

},
"lsid" : {
"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
},
"$clusterTime" : {
"clusterTime" : Timestamp(1577774144, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"$db" : "test"
},
"operationUsingCursorId" : NumberLong(7019500)
},
"numYields" : 2,
"locks" : {

}
}/<code>

resume token

resume token 用來描述一個訂閱點,本質上是 oplog 信息的一個封裝,包含 clusterTime、uuid、documentKey等信息,當訂閱 API 帶上 resume token 時,MongoDB Server 會將 token 轉換為對應的信息,並定位到 oplog 起點繼續訂閱操作。

<code>struct ResumeTokenData {
Timestamp clusterTime;
int version = 0;
size_t applyOpsIndex = 0;
Value documentKey;
boost::optional<uuid> uuid;
};/<uuid>/<code>

ResumeTokenData 結構裡包含 version 信息,在 4.0.7 以前的版本,version 均為0; 4.0.7 引入了一種新的 resume token 格式,version 為 1; 另外在 3.6 版本里,Resume Token 的編碼與 4.0 也有所不同;所以在版本升級後,有可能出現不同版本 token 無法識別的問題,所以儘量要讓 MongoDB Server 所有組件(Replica Set 各個成員,ConfigServer、Mongos)都保持相同的內核版本。

updateLookup

Change Stream 支持針對 update 操作,獲取當前的文檔完整內容,而不是僅更新操作本身,比如

<code>mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })/<code>

上面的 update 操作,默認情況下,change stream 會收到 {_id: 101}, {$set: {age: 20} 的內容,而並不會包含這個文檔其他未更新字段的信息;而加上 fullDocument: "updateLookup" 選項後,Change Stream 會根據文檔 _id 去查找文檔當前的內容並返回。

需要注意的是,updateLookup 選項只能保證最終一致性,比如針對上述文檔,如果連續更新100次,update 的 change stream 並不會按順序收到中間每一次的更新,因為每次都是去查找文檔當前的內容,而當前的內容可能已經被後續的修改覆蓋。

Sharded cluster

Change Stream 支持針對 sharded cluster 進行訂閱,會保證全局有序的返回結果;為了達到全局有序這個目標,mongos 需要從每個 shard 都返回訂閱結果按時間戳進行排序合併返回。

在極端情況下,如果某些 shard 寫入量很少或者沒有寫入,change stream 的返回延時會受到影響,因為需要等到所有 shard 都返回訂閱結果;默認情況下,mongod server 每10s會產生一條 Noop 的特殊oplog,這個機制會間接驅動 sharded cluster 在寫入量不高的情況下也能持續運轉下去。

由於需要全局排序,在 sharded cluster 寫入量很高時,Change Stream 的性能很可能跟不上;如果對性能要求非常高,可以考慮關閉 Balancer,在每個 shard 上各自建立 Change Stream。


分享到:


相關文章: