趣頭條基於 Flink+ClickHouse 構建實時數據分析平臺

摘要:本文由趣頭條數據平臺負責人王金海分享,主要介紹趣頭條 Flink-to-Hive 小時級場景和 Flink-to-ClickHouse 秒級場景,內容分為以下四部分:

  • 一、業務場景與現狀分析

  • 二、Flink-to-Hive 小時級場景

  • 三、Flink-to-ClickHouse 秒級場景

  • 四、未來發展與思考

Tips:點擊「閱讀原文」可下載作者分享 PPT~

一、業務場景與現狀分析

趣頭條查詢的頁面分為離線查詢頁面和實時查詢頁面。趣頭條今年所實現的改造是在實時查詢中接入了 ClickHouse 計算引擎。根據不同的業務場景,實時數據報表中會展現數據指標曲線圖和詳細的數據指標表。目前數據指標的採集和計算為每五分鐘一個時間窗口,當然也存在三分鐘或一分鐘的特殊情況。數據指標數據全部從 Kafka 實時數據中導出,並導入 ClickHouse 進行計算。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

二、Flink-to-Hive 小時級場景

1.小時級實現架構圖

如下圖所示,Database 中的 Binlog 導出到 Kafka,同時 Log Server 數據也會上報到 Kafka。所有數據實時落地到 Kafka 之後,通過 Flink 抽取到 HDFS。下圖中 HDFS 到 Hive 之間為虛線,即 Flink 並非直接落地到 Hive,Flink 落地到 HDFS 後,再落地到 Hive 的時間可能是小時級、半小時級甚至分鐘級,需要知道數據的 Event time 已經到何時,再觸發 alter table,add partition,add location 等,寫入其分區。

這時需要有一個程序監控當前 Flink 任務的數據時間已經消費到什麼時候,如9點的數據,落地時需要查看 Kafka 中消費的數據是否已經到達9點,然後在 Hive 中觸發分區寫入。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

2.實現原理

趣頭條主要使用了 Flink 高階版本的一個特性——StreamingFileSink。StreamingFileSink 主要有幾點功能。

  • 第一, forBulkFormat 支持 avro、parquet 格式,即列式存儲格式。

  • 第二, withBucketAssigner 自定義按數據時間分桶,此處會定義一個EventtimeBucket,既按數據時間進行數據落地到離線中。

  • 第三, OnCheckPointRollingPolicy,根據 CheckPoint 時間進行數據落地,在一定的 CheckPoint 時間內數據落地並回穩。按照 CheckPoint 落地還有其它策略,如按照數據大小。

  • 第四, StreamingFileSink 是 Exactly-Once 語義實現。

Flink 中有兩個 Exactly-Once 語義實現,第一個是 Kafka,第二個是 StreamingFileSink。下圖為 OnCheckPointRollingPolicy 設計的每10分鐘落地一次到HDFS文件中的 demo。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

■ 如何實現 Exactly-Once

下圖左側為一個簡單的二 PC 模型。Coordinator 發送一個 prepare,執行者開始觸發 ack 動作,Coordinator 收到 ack 所有消息後,所有 ack 開始觸發 commit,所有執行者進行落地,將其轉化到 Flink 的模型中,Source 收到 checkpoint barrier 流時,開始觸發一個 snapshot。

每個算子的 CheckPoint、snapshot 都完成之後,CheckPoint 會給 Job Manager 發送 notifyCheckpointComplete。下圖中二階段模型和 Flink 模型左側三條線部分是一致的。因此用 Flink 可以實現二階段提交協議。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

■ 如何使用 Flink 實現二階段提交協議

首先,StreamingFileSink 實現兩個接口,CheckpointedFunction 和CheckpointListener。CheckpointedFunction 實現 initializeState 和 snapshotState 函數。CheckpointListener 是 notifyCheckpointComplete 的方法實現,因此這兩個接口可以實現二階段提交語義。

  • initializeState

initializeState 在任務啟動時會觸發三個動作。第一個是 commitPendingFile。實時數據落地到 Hdfs 上有三個狀態。第一個狀態是 in-progress ,正在進行狀態。第二個狀態是 pending 狀態,第三個狀態是 finished 狀態。

initializeState 在任務啟動時還會觸發 restoreInProgressFile,算子實時寫入。如果 CheckPoint 還未成功時程序出現問題,再次啟動時 initializeState 會 commit PendingFile,然後採用 Hadoop 2.7+ 版本的 truncate 方式重置或截斷 in-progress 文件。

  • invoke

實時寫入數據。

  • snapshotState

觸發 CheckPoint 時會將 in-progress 文件轉化為 pending state,同時記錄數據長度(truncate 方式需要截斷長度)。snapshotState 並非真正將數據寫入 HDFS,而是寫入 ListState。Flink 在 Barrier 對齊狀態時內部實現 Exactly-Once 語義,但是實現外部端到端的 Exactly-Once 語義比較困難。Flink 內部實現 Exactly-Once 通過 ListState,將數據全部存入 ListState,等待所有算子 CheckPoint 完成,再將 ListState 中的數據刷到 HDFS 中。

  • notifyCheckpointComplete

notifyCheckpointComplete 會觸發 pending 到 finished state 的數據寫入。實現方法是 rename,Streaming 不斷向 HDFS 寫入臨時文件,所有動作結束後通過 rename 動作寫成正式文件。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

3.跨集群多 nameservices

趣頭條的實時集群和離線集群是獨立的,離線集群有多套,實時集群目前有一套。通過實時集群寫入離線集群,會產生 HDFS nameservices 問題。在實時集群中將所有離線集群的 nameservices 用 namenode HA 的方式全部打入實時集群並不合適。那麼如何在任務中通過實時集群提交到各個離線集群?

如下圖所示,在 Flink 任務的 resource 下面,在 HDFS 的 xml 中間加入 <final>。在 PropertyHong Kong 中添加 nameservices,如 stream 是實時集群的 namenode HA 配置,data 是即將寫入的離線集群的 namenode HA 配置。那麼兩個集群中間的 HDFS set 不需要相互修改,直接可以在客戶端實現。/<final>

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

4.多用戶寫入權限

實時要寫入離線 HDFS,可能會涉及用戶權限問題。實時提交的用戶已經定義好該用戶在所有程序中都是同一個用戶,但離線中是多用戶的,因此會造成實時和離線用戶不對等。趣頭條在 API 中添加了 withBucketUser 寫 HDFS。配置好 nameservices後,接下來只需要知道該 HDFS 路徑通過哪個用戶來寫,比如配置一個 stream 用戶寫入。

API 層級的好處是一個 Flink 程序可以指定多個不同的 HDFS 和不同的用戶。多用戶寫入的實現是在 Hadoop file system 中加一個 ugi.do as ,代理用戶。以上為趣頭條使用 Flink 方式進行實時數據同步到 Hive 的一些工作。其中可能會出現小文件問題,小文件是後臺程序進行定期 merge,如果 CheckPoint 間隔時間較短,如3分鐘一次,會出現大量小文件問題。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

三、Flink-to-ClickHouse 秒級場景

1.秒級實現架構圖

趣頭條目前有很多實時指標,平均每五分鐘或三分鐘計算一次,如果每一個實時指標用一個 Flink 任務,或者一個 Flink SQL 來寫,比如消費一個 Kafka Topic,需要計算其日活、新增、流程等等當用戶提出一個新需求時,需要改當前的 Flink 任務或者啟動一個新的 Flink 任務消費 Topic。

因此會出現 Flink 任務不斷修改或者不斷起新的 Flink 任務的問題。趣頭條嘗試在 Flink 後接入 ClickHouse,實現整體的 OLAP。下圖為秒級實現架構圖。從 Kafka 到 Flink,到 Hive,到 ClickHouse 集群,對接外部 Horizon(實時報表),QE(實時 adhoc 查詢),千尋(數據分析),用戶畫像(實時圈人)。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

2.Why Flink+ClickHouse

  • 指標實現 sql 化描述:分析師提出的指標基本都以 SQL 進行描述。

  • 指標的上下線互不影響:一個 Flink 任務消費 Topic,如果還需要其它指標,可以保證指標的上下線互不影響。

  • 數據可回溯,方便異常排查:當日活下降,需要回溯排查是哪些指標口徑的邏輯問題,比如是報的數據差異或是數據流 Kafka 掉了,或者是因為用戶沒有上報某個指標導致日活下降,而 Flink 則無法進行回溯。

  • 計算快,一個週期內完成所有指標計算:需要在五分鐘內將成百上千的所有維度的指標全部計算完成。

  • 支持實時流,分佈式部署,運維簡單:支持 Kafka 數據實時流。

目前趣頭條 Flink 集群有 100+ 臺 32 核 128 G 3.5T SSD,日數據量 2000+ 億,日查詢量 21w+ 次,80% 查詢在 1s 內完成。下圖為單表測試結果。ClickHouse 單表測試速度快。但受制於架構,ClickHouse 的 Join 較弱。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

下圖是處理相對較為複雜的 SQL,count+group by+order by,ClickHouse 在 3.6s內完成 26 億數據計算。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

3.Why ClickHouse so Fast

ClickHouse 採用列式存儲 +LZ4、ZSTD 數據壓縮。其次,計算存儲結合本地化+向量化執行。Presto 數據可能存儲在 Hadoop 集群或者 HDFS 中,實時拉取數據進行計算。而 ClickHouse 計算存儲本地化是指每一臺計算機器存在本地 SSD 盤,只需要計算自己的數據,再進行節點合併。同時,LSM merge tree+Index。將數據寫入 ClickHouse 之後,會在後臺開始一個線程將數據進行 merge,做 Index 索引。如建常見的 DT 索引和小時級數據索引,以提高查詢性能。第四,SIMD+LLVM 優化。SIMD 是單指令多數據集。第五,SQL 語法及 UDF 完善。ClickHouse 對此有很大需求。在數據分析或者維度下拽時需要更高的特性,如時間窗口的一部分功能點。

  • Merge Tree:如下圖所示。第一層為實時數據寫入。後臺進行每一層級數據的merge。merge 時會進行數據排序,做 Index 索引。

  • ClickHouse Connector:ClickHouse 有兩個概念,Local table 和Distributed table。一般是寫 Local table ,讀 Distributed table。ClickHouse 一般以 5~10w一個批次進行數據寫入,5s一個週期。趣頭條還實現了 RoundRobinClickHouseDataSource。

  • BalancedClickHouseDataSource :MySQL 中配置一個 IP 和端口號就可以寫入數據,而 BalancedClickHouseDataSource 需要寫 Local 表,因此必須要知道該集群有多少個 Local 表,每一個 Local 表的 IP 和端口號。如有一百臺機器,需要將一百臺機器的 IP 和端口號全部配置好,再進行寫入。BalancedClickHouseDataSource 有兩個 schedule。scheduleActualization和 scheduleConnectionsCleaning 。配置一百臺機器的 IP 和端口號,會出現某些機器不連接或者服務不響應問題,scheduleActualization 會定期發現機器無法連接的問題,觸發下線或刪除 IP 等動作。scheduleConnectionsCleaning 會定期清理 ClickHouse 中無用的 http 請求。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台
  • RoundRobinClickHouseDataSource:趣頭條對BalancedClickHouseDataSource 進行加強的結果,實現了三個語義。testOnBorrow 設置為 true,嘗試 ping 看能否獲取連接。用 ClickHouse 寫入時是一個 batch,再將 testOnReturn 設置為 false,testWhileIdel 設置為true,填入官方 scheduleActualization 和 scheduleConnectionsCleaning 的功能。ClickHouse 後臺不斷進行 merge,如果 insert 過快使後臺 merge 速度變慢,跟不上 insert,出現報錯。因此需要儘量不斷往下寫,等寫完當前機器,再寫下一個機器,以5s間隔進行寫入,使 merge 速度能夠儘量與 insert 速度保持一致。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

4.Backfill

Flink 導入 ClickHouse,在數據查詢或展示報表時,會遇到一些問題,比如 Flink 任務出現故障、報錯或數據反壓等,或 ClickHouse 集群出現不可響應,zk 跟不上,insert 過快或集群負載等問題,這會導致整個任務出現問題。

如果流數據量突然暴增,啟動 Flink 可能出現一段時間內不斷追數據的情況,需要進行調整並行度等操作幫助 Flink 追數據。但這時已經出現數據積壓,若還要加大 Flink 併發度處理數據,ClickHouse 限制 insert 不能過快,否則會導致惡性循環。因此當 Flink 故障或 ClickHouse 集群故障時,等待 ClickHouse 集群恢復後,Flink 任務從最新數據開始消費,不再追過去一段時間的數據,通過 Hive 將數據導入到 ClickHouse。

由於之前已經通過 Kafka 將數據實時落地到 Hive,通過 Hive 將數據寫入 ClickHouse 中。ClickHouse 有分區,只需要將上一個小時的數據刪除,導入 Hive 的一小時數據,就可以繼續進行數據查詢操作。Backfill 提供了 Flink 任務小時級容錯以及 ClickHouse 集群小時級容錯機制。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台

未來發展與思考

1.Connector SQL 化

目前, Flink-to-Hive 以及 Flink-to-ClickHouse 都是趣頭條較為固化的場景,只需指定 HDFS 路徑以及用戶,其餘過程都可以通過 SQL 化描述。

2.Delta lake

Flink 是流批一體計算引擎,但是沒有流批一體的存儲。趣頭條會用 HBase、Kudu、Redis 等能夠與 Flink 實時交互的 KV 存儲進行數據計算。如計算新增問題,目前趣頭條的方案是需要將 Hive 歷史用戶刷到 Redis 或 HBase 中,與 Flink 進行實時交互判斷用戶是否新增。

但因為 Hive 中的數據和 Redis 中的數據是存儲為兩份數據。其次 Binlog 抽取數據會涉及 delete 動作,Hbase,Kudu 支持數據修改,定期回到 Hive 中。帶來的問題是 HBase,Kudu 中存在數據,Hive 又保存了一份數據,多出一份或多份數據。如果有流批一體的存儲支持上述場景,當 Flink 任務過來,可以與離線數據進行實時交互,包括實時查詢 Hive 數據等,可以實時判斷用戶是否新增,對數據進行實時修改、更新或 delete,也能支持 Hive 的批的動作存儲。

未來,趣頭條考慮對 Flink 做流批的存儲,使 Flink 生態統一為流批結合。

王金海,10 年互聯網歷練,先後在唯品會負責用戶畫像系統,提供人群的個性化營銷服務;餓了麼擔任架構師,負責大數據任務調度、元數據開發、任務畫像等工作;現為趣頭條數據中心平臺負責人,負責大數據基礎計算層(spark、presto、flink、clickhouse)、平臺服務層(libra 實時計算、kepler 離線調度)、數據產品層(qe即時查詢、horizon 數據報表、metadata 元數據、數據權限等)、以及團隊建設。

Flink 活動推薦

全球首次!阿里雲把 Apache 頂級開源項目會議搬到線上

普惠全球開發者,首個 Apache 頂級項目盛會直播上線。 Flink Forward 在線會議來啦!

聚焦Alibaba、Google、AWS、Uber、Netflix、新浪微博等海內外一線廠商,經典 Flink 應用場景,最新功能、未來規劃一覽無餘。

趣头条基于 Flink+ClickHouse 构建实时数据分析平台


分享到:


相關文章: