快手 Druid 精確去重的設計和實現

本次分享內容提綱:

  • 快手 Druid 平臺概覽
  • Druid 精確去重功能設計
  • Druid 其他改進
  • 快手 Druid Roadmap

★ 快手 Druid 平臺概覽

1. 從業務需求角度考慮,快手為什麼選擇 Druid ?

快手的業務特點包括超大數據規模、毫秒級查詢時延、高數據實時性要求、高併發查詢、高穩定性以及較高的 Schema 靈活性要求;因此快手選擇 Druid 平臺作為底層架構。由於 Druid 原生不支持數據精確去重功能,而快手業務中會涉及到例如計費等場景,有精確去重的需求。因此,本文重點講述如何在 Druid 平臺中實現精確去重。另一方面,Druid 對外的接口是 json 形式 ( Druid 0.9 版本之後逐步支持 SQL ) ,對 SQL 並不友好,本文最後部分會簡述 Druid 平臺與 MySQL 交互方面做的一些改進。

2. 下面重點介紹快手 Druid 平臺的架構:

快手 Druid 精確去重的設計和實現


  • 和大多數數據平臺一樣,快手的數據源主要有兩種:基於 kafka 的實時數據源和建立在 hadoop 內的離線數據源。將這兩種數據源分別通過 kafka index 和 hadoop index 導入到平臺核心—— Druid 模型中。
  • 在 Druid 中,將數據進行業務隔離、冷熱分層 ( 例如:冷數據放在硬盤,熱數據放在 SSD )
  • 外部業務接口方面,支持通過 API 自行開發業務模塊,支持通過 Kwai BI 做可視化;同時針對小部分人需求,現已支持 tableau ( tableau 通過 hive 連接 Druid ) 。
  • 數據平臺輔助系統包括 metric 監控 ( 實時監控 CPU 佔用率情況,進程情況,lag 信息等 ) 、探針系統 ( 用來查詢,定義查詢熱度、維度等信息,有助於優化 ) 、管理系統 ( 在原生態 Druid 的 json 接口基礎上定製管理系統,實現任務的可視化導入等管理操作 )

★ Druid 精確去重概述

1. 原生 Druid 去重功能支持情況

a. 維度列

  • cardinality agg,非精確,基於 hll 。查詢時 hash 函數較耗費 CPU
  • 嵌套 group by,精確,耗費資源
  • 社區 DistinctCount 插件,精確,但是侷限很大:
  • ① 僅支持單維度,構建時需要基於該維度做 hash partition
  • ② 不能跨 interval 進行計算

b. 指標列

  • HyperUniques/Sketch,非精確,基於 hll,攝入時做計算,相比 cardinality agg 性能更高

結論:Druid 缺乏一種支持預聚合、資源佔用低、通用性強的精確去重支持。

2. 精確去重方案

a. 精確去重方案:hashset

使用 hashset 的方式存儲,具備簡單通用等優點,但是資源佔用非常大。以下圖所示為例,將左上角的原始數據使用 hashset 方法,Year 作為維度列,City 作為指標列;按照維度列分成 2 個 Segment,轉換為右上角的數據格式;接下來將指標列聚合到 broker 中,計算出 size,即得到最終結果。整個過程如下圖所示:

快手 Druid 精確去重的設計和實現


但是使用 hashset 的去重方式,其資源佔用非常大;具體來說,假設有 5000W 條平均長度為 10B 的 string ( 共 500MB ) ,中間生成的 Hashset 會產生一系列鏈表結構,導致內存可達到 5G,即內存擴展可達 10 倍。因此,直接使用 hashset 的方法可以是完全不可行的。

在 hashset 方法基礎上可以考慮一些優化的思路:例如,通過增加節點,將數據分散到不同機器上,查詢時在每臺機器上分別聚合進而求和;另一種方式類似於 MapReduce,計算前利用 shuffle 模型將數據打散。然而這兩種優化思路和 Druid 平臺屬性有衝突,因此都不適用於 Druid 這個平臺。

b. 精確去重方案:字典編碼 +Bitmap

Bitmap 方法,即用一個位來表示一個數據 ( 這是理論上的最小值 ) 。這種方案的思路是:對要去重的數據列做編碼,將 string 或其他類型的數據轉化成 int 型的編碼,攝入到 Druid 後通過 Bitmap 的形式存儲;查詢時將多個 Bitmap 做交集,獲取結果。Bitmap 的範圍是 42 億,最大佔用空間 500M 左右;還可以使用壓縮算法,使空間佔用大大減少。

因此,字典編碼 +Bitmap 的方式,優點是存儲和查詢資源佔用少,可以將構建和查詢分開 ( 即字典不參與查詢 ) ;缺點是:字典需要做全局編碼,其構建過程較複雜。

考慮到 Kylin 曾使用過這種方案,因此快手的後續優化方案選擇在這種方案的基礎上進行優化。

3. 字典編碼方案

a. 字典編碼方案的使用

① Redis 方案

Redis 可以進行實時編碼,可以同時支持離線任務和實時任務,因此,實時處理數據和離線處理數據可以使用相同的方案。

但是 Redis 的 ID 生成速度有瓶頸,最大 5W/s;查詢上壓力也較大,對交互會造成一定的麻煩;同時對 Redis 的穩定性要求也比較高。

② 離線 MR 方案

離線 MR 方案,利用 MR 的分佈式存儲,編碼和查詢的吞吐量更高,還用用更高的容錯性。但是 MR 僅支持離線導入任務。

綜合這兩種字典編碼方案的優劣,得到以下結論:

  • 離線任務使用離線 MR 編碼,通過小時級任務解決實現準實時
  • 實時任務僅支持原始 int 去重 ( 無需編碼 )

b. 字典編碼方案模型

快手的字典編碼方案參照的是 Kylin 的 AppendTrie 樹模型,模型詳見下圖。

快手 Druid 精確去重的設計和實現


Trie 樹模型主要是使用字符串做編碼;快手支持各種類型數據,可將不同類型數據統一轉換成字符串類型,再使用 Trie 樹模型做編碼。Trie 樹模型可以實現 Append,有兩種方式:

  • 按照節點 ( 而不是位置 ) 保存 ID,從而保持已有 ID 不變。
  • 記錄當前模型最大 ID,便於給新增節點分配 ID 。

但是這種模型會帶來一個問題:假如基數特別大,Trie 樹在內存中就會無限擴張。為了控制內存佔用率,選擇單顆子樹設定閾值,超過即分裂,進而控制單顆子樹內存消耗。分裂後,每棵樹負責一個範圍 ( 類似於 HBase 中的 region 分區 ) ;查詢時候只需要查詢一顆子樹即可。

快手 Druid 精確去重的設計和實現


為節省 CPU 資源,基於 guava LoadingCache,按需加載子樹 ( 即懶加載 ) ,並使用 LRU 方法換出子樹。

LRU ( Least Recently Used ) 是內存管理的一種置換算法,即內存不夠時,將最近不常用的子樹從內存清除,加載到磁盤上。LRU 算法根據數據的歷史訪問記錄來進行淘汰數據,其核心思想是“如果數據最近被訪問過,那麼將來被訪問的幾率也更高”。

c. 字典併發構建:

字典會存在併發構建的問題,分別使用 MVCC 以及 Zookeeper 分佈式鎖的方案。

① 使用 MVCC ( 持久化在 hdfs 上 ) ,在讀取的時候會使用最新版本,拷貝到 working 中進行構建,構建完成生成新的版本號;如歷史數據過多,會根據版本個數及 TTL 進行清理。

② 構建字典的過程中會操作臨時目錄,如果存在多個進程同時去寫臨時目錄,會存在衝突問題;因此引入 Zookeeper 分佈式鎖,基於 DataSource 和列來做唯一的定位,從而保證同一個字典同時只能有一個進程。

d. 精確去重的實現

① 新增 unique 指標存儲

  • 使用 ComplexMetricSerde 定義一個指標如何 Serde,即序列化與反序列化;
  • 使用 Aggregator,讓用戶定義一個指標如何執行聚合,即上卷;
  • 使用 Aggregator 的 Buffer 版本 BufferAggregator 作為 Aggregator 的替代品;
  • 使用 AggregatorFractory 定義指標的名稱,獲取具體的 Aggregator 。

② 精確去重的整體流程介紹

快手 Druid 精確去重的設計和實現


  • 使用 DetermineConfigurationJob 計算最終 Segment 分片情況。
  • 使用 BuildDictJob,Map 將同一去重列發送到一個 reducer 中 ( Map 端可先 combine ) ;每個 reducer 構建一列的全局字典;每列字典構建申請 ZK 鎖。
  • IndexGeneratorJob,在 Map 中加載字典,將去重列編碼成 int;Reducer 將 int 聚合成 Bitmap 存儲;每一個 reducer 生成一個 Segment。

此套精確去重的使用方法:

  • 導入時,定義 unique 類型指標
快手 Druid 精確去重的設計和實現


  • 查詢時,使用 unique 類型查詢
快手 Druid 精確去重的設計和實現


e. 性能優化與測試

① 優化字典查詢

· 存在一個超高基數列 ( UHC ) 的去重指標:在 IndexGenerator 之前增加 ClusterBy UHC 任務,保證每個 map 處理的 uhc 列數據有序,避免多次換入換出。

· 存在多個超高基數列的去重指標:拆成多個 DataSource,調大 IndexGenerator 任務的 map 內存,保證字典能加載到內存

② Bitmap 查詢優化

減少 targetPartitionSize 或調大 numShards,增加 Segment 個數,提升並行度。

使用 Bitmap 做 or 計算時候,建議使用 batchOr 方法,而不是逐行 or,避免 I/O 與 or 計算交替,緩存持續被刷新導致性能降低。關於 batchOr 的選擇,rolling_bitmap 提供一些接口,默認選擇 naive_or ( 儘量延遲計算模型 ) ,通常情況下其性能較好;priorityqueue_or 使用堆排序,每次合併最小兩個 bitmap,消耗更多內存,官方建議 benchmark 後使用;如果結果是較長的連續序列,可以選擇手動按順序依次 inplace or。

另一個思路是 Croaring-high performance low-level implementation

Rolling bitmap 用 C 語言實現了 Simd,相對於 java 版,Smid 版更多地利用了向量指令 ( avx2 ) ,性能提升了 80%。

快手將 Smid 以 JNI 的方式引入,對 100W 的 Bitmap 做 Or 操作,使用 Java 版本單線程操作,計算用時 13s;用 JNI 調用 Croaring,數據進行反序列化(需要 memcpy 用時 6.5s),計算消耗 7.5s,總用時 14s(慢於 java 版本)。Croaring 暫時不支持原地 inplace 反序列化 ( ImmutableRoaringBitmap ) ,導致其實際運行效率與官方稱的 80% 提升不一致。

③ 傳輸層編碼

Broker 與 Historical 之間通過 http 協議交換數據,默認打開 gzip 壓縮 ( 也可以選擇不壓縮,即 plain json ) 。測試發現 gzip 壓縮的過程中會耗用大量 CPU,因此在萬兆的網絡下建議不壓縮。

快手 Druid 精確去重的設計和實現


④ 對上述查詢優化步驟依次性能測試:

8 個維度,10 億基數的數據,選擇列 author_id 作為其去重指標,攝入後達 150W 行;10 臺 historical 機器:

快手 Druid 精確去重的設計和實現


未做優化去重,查詢時間 50s;

增加 Segment 數量至 10,查詢時間為 7s,提升約 7 倍性能;

將 Or 的策略設置為 BatchOr,查詢時間為 4s;

關閉 gzip 壓縮,查詢時間為 2s。

以上是對 10 億基數數據作去重的查詢時間;如果數據基數在 1 億以下,查詢時間為毫秒級。

★ Druid 其他方面做的改進

1. 資源隔離部署方案

快手 Druid 精確去重的設計和實現


該方案為 Druid 官方推薦部署方案,充分利用 Druid 的分層特質性,根據業務,根據數據冷熱,分到不同 proxy 上,保證各個業務相互不受影響。

2. 物化視圖

物化視圖是包括一個查詢結果的數據庫對象,可以將其理解為遠程數據的的本地副本,或者用來生成基於數據表求和的彙總表,這些副本是隻讀的。

如果想修改本地副本,必須用高級複製的功能;如果想從一個表或視圖中抽取數據時,可以用從物化視圖中抽取。物化視圖可以通過數據庫的內部機制可以定期更新,將一些大的耗時的表連接用物化視圖實現,會提高查詢的效率。

a. 維度物化

Druid 社區 0.13 開始具備物化視圖功能,社區實現了維度的物化。應用場景如下:原始數據有很高維度,而實際查詢使用到的維度是原始維度的子集;因此不妨對原始維度做小的聚合 ( 類似 Kylin 中的 cube 和 cuboid ) ;對經常查詢的部分維度做物化。

快手 Druid 精確去重的設計和實現


b. 時序物化

除了維度上的物化,還包括時序上的物化,例如將原始數據按照小時聚合、按照分鐘聚合。

快手 Druid 精確去重的設計和實現


c. 物化效果

快手 Druid 精確去重的設計和實現


這裡,主要關心物化膨脹率和物化命中率,兩項指標會影響最終的查詢效率。物化膨脹率,表示物化後的數據存儲耗用資源情況;物化命中率,可以描述物化後與實際應用場景的匹配度。表格中,ds4 和 ds5 都做到了物化膨脹率很低,物化命中率很高,因此,查詢效率可獲得 7~9 倍的提升。

3. Historical 快速重啟

快手數據平臺硬件資源,大約是 12 塊容量 2T 的 SATA 硬盤,數據量平均為 10W segments,數據佔用 10T 空間,重啟一次大約 40min。

Druid 中,默認重啟的時候會加載全部數據;考慮到超過 10T 的元信息在啟動時並不需要,可以將加載數據推遲到查詢時候。因此,使用 Guava Suppliers.memoize 命令,延遲數據加載信息,只有在查詢的時候才作列的加載。此過程通過一個參數 ( LazyLoad ) 控制。

( 此代碼已提交 Druid 社區:druid.segmentCache.lazyLoadOnStart ( pr: 6988 ) )

優化後,重啟過程只需要 2min,提升 20 倍。

4. Kafka Index 方面的改進

a. TaskCount 自動伸縮

Kafka index 任務數即 TaskCount 為固定,需要按照峰值任務數設定,這樣導致在非高峰時刻會存在資源浪費。

這裡實施的一個優化策略是:

  • KafkaSupervisor 增加 DynamicTaskCountNotice
  • 基於 task 的 cpu use & kafka lag 閾值,進行 25% 增減 task count
  • 無需重啟 supervisor


快手 Druid 精確去重的設計和實現


b. 精細化調度

Middle Manager 的 indexing task 資源分配從 slot 改成按照內存大小分配 ( 類似於 MapReduce 從 1.0 到 2.0 的改進 ) ,具體優化方法如下:

  • 區分 Kafka indexing task 和 Hadoop indexing task ( 一般 Hadoop indexing task 不需要佔用內存 )
  • 允許在提交 task 時指定 task 內存大小。


快手 Druid 精確去重的設計和實現


使用這種方式優化,實時數據任務處理可節省超過 65% 的內存佔用,離線數據任務處理可節省超過 87% 的內存佔用。

5. 元數據交互

元數據 ( Metadata ),又稱中介數據、中繼數據,主要是描述數據屬性 ( property ) 的信息,用來支持如指示存儲位置、歷史數據、資源查找、文件記錄等功能,是關於數據的組織、數據域及其關係的信息,可以看作是一種電子式目錄。簡言之,元數據就是關於數據的數據。

a. Overlord 與 MySQL 交互優化

Overlord 節點負責接收任務,協調和分配任務,為任務創建鎖,並返回任務狀態給任務發送方,Overlord 有兩種運行模式:本地模式或者遠程模式 ( 默認本地模式 ) 。

Overlord 控制檯可以查看等待的任務、運行的任務、可用的 worker,最近創建和結束的 worker 等。

Segment 是 Druid 中最基本的數據存儲單元,採用列式的方式存儲某一個時間間隔 ( interval ) 內某一個數據源 ( dataSource ) 的部分數據所對應的所有維度值、度量值、時間維度以及索引。

Segment 的邏輯名稱結構為:

表示數據源 ( 或表 ) ; 和 分別表示時間段的起止時間; 表示版本號,用於區分多次加載同一數據對應的 Segment; 表示分區編號 ( 在每個 interval 內,可能會有多個 partition )

Segments 在 HDFS 上的物理存儲路徑下包括兩個文件:descriptor.json 和 index.zip 。前者記錄的是 Segment 的描述文件(樣例見下表),其內容也保存在 Druid 集群的元數據的 druid_segments 表中;index.zip 則是數據文件。

複製代碼

 
描述文件 descriptor.json 樣例:
{
"dataSource": "AD_active_user",
"interval": "2018-04-01T00:00:00.000+08:00/2018-04-02T00:00:00.000+08:00",
"version": "2018-04-01T00:04:07.022+08:00",
"loadSpec": {
"type": "hdfs",
"path": "/druid/segments/AD_active_user/20180401T000000.000+0800_20180402T000000.000+0800/2018-04-01T00_04_07.022+08_00/1/index.zip"
},
"dimensions": "appkey,spreadid,pkgid",
"metrics": "myMetrics,count,offsetHyperLogLog",
"shardSpec": {
"type": "numbered",
"partitionNum": 1,
"partitions": 0
},
"binaryVersion": 9,
"size": 168627,
"identifier": "AD_active_user_2018-04-01T00:00:00.000+08:00_2018-04-02T00:00:00.000+08:00_2018-04-01T00:04:07.022+08:00_1"
}
 

將 druid_segments 增加索引 ( dataSource,used,end ) ,查詢時間從 10s 優化到 1s 。

b. Coordinator 與 MySQL 交互優化

Druid 的 Coordinator 節點主要負責 Segment 的管理和分發。具體的就是,Coordinator 會基於配置與歷史節點通信加載或丟棄 Segment 。Druid Coordinator 負責加載新的 Segment,丟棄過期的 Segment,管理 Segment 的副本以及 Segment 的負載均衡。

Coordinator 會根據配置參數裡的設置的事件定期的執行,每次執行具體的操作之前都會估算集群的狀態。與 broker 節點和 historycal 節點類似的,Coordinator 節點也會和 Zookeeper 集群保持連接用於交互集群信息。同時,Coordinator 也會和保存著 Segment 和 rule 信息的數據源保持通信。可用的 Segment 會被存儲在 Segment 的表中,並且列出了所有應該被集群加載的 Segment 。Rule 保存在 rule 的表中,指出了應該如何處理 Segment 。

① Segment 發現

將 Coordinator 中默認的全量掃描 druid_segments 表改成增加讀取,druid_segments 添加索引 ( used,created_date ) ,查詢時間從 1.7min 優化到 30ms 。

② 整個協調週期

  • Segment 之前使用 TreeSet 按時間排序確保優先 load 最近的 segment 現在優化成 ConcurrentHashSet,並分成最近一天和一天前的兩個 set,先操作最近一天的;
  • Apply rules 的時候對 LoadRule 先判斷集群副本和配置的是否一致,是的話就跳過,提升併發能力;
  • Cleanup 和 overshadow 兩個操作合併;
  • 查詢時間從 3min 優化到 30s 。

★ 快手 Druid Roadmap

1. 在線化

今年會實現多集群、高可用,著重保證在線業務的 SLA,同時線上業務也要著重權限的管理。

2. 易用性

實現對 SQL 的支持,同時快手的 BI 產品也會做相應的升級和優化。

3. 性能及優化 ( Druid 內核方面,與社區合作 )

  • 自適應的物化視圖,智能化
  • 數值型索引
  • 向量化引擎

最後附上我們提交社區的代碼:

https://github.com/apache/incubator-druid/pull/7594( 精確去重功能 )

https://github.com/apache/incubator-druid/pull/6988 ( historical 快速重啟 )

作者介紹:

鄧鈁元,快手大數據架構團隊研發工程師,畢業於浙江大學,曾就職於百度、貝殼,目前負責快手 Druid 平臺研發工作,多年底層集群以及 OLAP 引擎研發、分佈式系統的優化經驗,熱衷開源,為 hadoop / kylin / druid 等社區貢獻代碼。

本文來自 DataFun 社區

原文鏈接

https://mp.weixin.qq.com/s/jDW1sordtki-O5-tsVE94g


分享到:


相關文章: