如今大數據在各行業的應用越來越廣泛:運營基於數據關注運營效果,產品基於數據分析關注轉化率情況,開發基於數據衡量系統優化效果等。
美圖公司有美拍、美圖秀秀、美顏相機等十幾個 app,每個 app 都會基於數據做個性化推薦、搜索、報表分析、反作弊、廣告等,整體對數據的業務需求比較多、應用也比較廣泛。
因此美圖數據技術團隊的業務背景主要體現在:業務線多以及應用比較廣泛。這也是促使我們搭建數據平臺的一個最主要的原因,由業務驅動。
美圖數據平臺整體架構
如圖所示是我們數據平臺的整體架構。在數據收集這部分,我們構建一套採集服務端日誌系統 Arachnia,支持各 app 集成的客戶端 SDK,負責收集 app 客戶端數據;同時也有基於 DataX 實現的數據集成(導入導出);Mor 爬蟲平臺支持可配置的爬取公網數據的任務開發。
數據存儲層主要是根據業務特點來選擇不同的存儲方案,目前主要有用到 HDFS、MongoDB、Hbase、ES 等。在數據計算部分,當前離線計算主要還是基於 Hive&MR、實時流計算是 Storm 、 Flink 以及還有另外一個自研的 bitmap 系統 Naix。
在數據開發這塊我們構建了一套數據工坊、數據總線分發、任務調度等平臺。數據可視化與應用部分主要是基於用戶需求構建一系列數據應用平臺,包括:A/B 實驗平臺、渠道推廣跟蹤平臺、數據可視化平臺、用戶畫像等等。
右側展示的是一些各組件都可能依賴的基礎服務,包括地理位置、元數據管理、唯一設備標識等。
如下圖所示是基本的數據架構流圖,典型的 lamda 架構,從左端數據源收集開始,Arachnia、AppSDK 分別將服務端、客戶端數據上報到代理服務 collector,通過解析數據協議,把數據寫到 kafka,然後實時流會經過一層數據分發,最終業務消費 kafka 數據進行實時計算。
離線會由 ETL 服務負責從 Kafka dump 數據到 HDFS,然後異構數據源(比如 MySQL、Hbase 等)主要基於 DataX 以及 Sqoop 進行數據的導入導出,最終通過 hive、kylin、spark 等計算把數據寫入到各類的存儲層,最後通過統一的對外 API 對接業務系統以及我們自己的可視化平臺等。
數據平臺的階段性發展
企業級數據平臺建設主要分三個階段:
- 剛開始是基本使用免費的第三方平臺,這個階段的特點是能快速集成並看到 app 的一些統計指標,但是缺點也很明顯,沒有原始數據除了那些第三方提供的基本指標其他分析、推薦等都無法實現。所以有從 0 到 1 的過程,讓我們自己有數據可以用;
- 在有數據可用後,因為業務線、需求量的爆發,需要提高開發效率,讓更多的人參與數據開發、使用到數據,而不僅僅侷限於數據研發人員使用,所以就涉及到把數據、計算存儲能力開放給各個業務線,而不是握在自己手上;
- 在當數據開放了以後,業務方會要求數據任務能否跑得更快,能否秒出,能否更實時;另外一方面,為了滿足業務需求集群的規模越來越大,因此會開始考慮滿足業務的同時,如何實現更節省資源。
美圖現在是處於第二與第三階段的過渡期,在不斷完善數據開放的同時,也逐步提升查詢分析效率,以及開始考慮如何進行優化成本。接下來會重點介紹 0 到 1 以及數據開放這兩個階段我們平臺的實踐以及優化思路。
從 0 到 1
從 0 到 1 解決從數據採集到最終可以使用數據。如圖 4 所示是數據收集的演進過程,從剛開始使用類似 umeng、flurry 這類的免費第三方平臺,到後面快速使用 rsync 同步日誌到一臺服務器上存儲、計算,再到後面快速開發了一個簡單的python腳本支持業務服務器上報日誌,最終我們開發了服務端日誌採集系統 Arachnia 以及客戶端 AppSDK。
數據採集是數據的源頭,在整個數據鏈路中是相對重要的環節,需要更多關注:數據是否完整、數據是否支持實時上報、數據埋點是否規範準確、以及維護管理成本。因此我們的日誌採集系統需要滿足以下需求:
- 能集成管理維護,包括 Agent 能自動化部署安裝升級卸載、配置熱更、延遲方面的監控;
- 在可靠性方面至少需要保證 at least once;
- 美圖現在有多 IDC 的情況,需要能支持多個 IDC 數據採集彙總到數據中心;
- 在資源消耗方面儘量小,儘量做到不影響業務。
基於以上需求我們沒有使用 flume、scribe、fluentd,最終選擇自己開發一套採集系統 Arachnia。
上圖是 Arachnia 的簡易架構圖,它通過系統大腦進行集中式管理。puppet 模塊主要作為單個 IDC 內統一彙總 Agent 的 metrics,中轉轉發的 metrics 或者配置熱更命令。採集器 Agent 主要是運維平臺負責安裝、啟動後從 brain 拉取到配置,並開始採集上報數據到 collector。
接著看 Arachnia 的實踐優化,首先是 at least once 的可靠性保證。不少的系統都是採用把上報失敗的數據通過 WAL 的方式記錄下來,重試再上報,以免上報失敗丟失。我們的實踐是去掉 WAL,增加了 coordinator 來統一的分發管理 tx 狀態。
開始採集前會從 coordinator 發出 txid,source 接收到信號後開始採集,並交由 sink 發送數據,發送後會ack tx,告訴 coordinator 已經 commit。coordinator 會進行校驗確認,然後再發送 commit 的信號給 source、sink 更新狀態,最終 tx 完 source 會更新採集進度到持久層(默認是本地 file)。該方式如果在前面 3 步有問題,則數據沒有發送成功,不會重複執行;如果後面 4 個步驟失敗,則數據會重複,該 tx 會被重放。
基於上文的 at least once 可靠性保證,有些業務方是需要唯一性的,我們這邊支持為每條日誌生成唯一 ID 標識。另外一個數據採集系統的主要實踐是:唯一定位一個文件以及給每條日誌做唯一的 MsgID,方便業務方可以基於 MsgID 在發生日誌重複時能在後面做清洗。
我們一開始是使用 filename,後面發現 filename 很多業務方都會變更,所以改為 inode,但是 inode linux 會回收重複利用,最後是以 inode & 文件頭部內容做 hash 來作為fileID。而 MsgID 是通過 agentID & fileID & offset 來唯一確認。
數據上報之後由 collector 負責解析協議推送數據到 Kafka,那麼 Kafka 如何落地到 HDFS 呢? 首先看美圖的訴求:
- 支持分佈式處理;
- 涉及到較多業務線因此有多種數據格式,所以需要支持多種數據格式的序列化,包括 json、avro、特殊分隔符等;
- 支持因為機器故障、服務問題等導致的數據落地失敗重跑,而且需要能有比較快速的重跑能力,因為一旦這塊故障,會影響到後續各個業務線的數據使用;
- 支持可配置的 HDFS 分區策略,能支持各個業務線相對靈活的、不一樣的分區配置;
- 支持一些特殊的業務邏輯處理,包括:數據校驗、過期過濾、測試數據過濾、注入等;
基於上述訴求痛點,美圖從 Kafka 落地到 HDFS 的數據服務實現方式如圖 7 所示。
基於 Kafka 和 MR 的特點,針對每個 kafka topic 的 partition,組裝 mapper 的 inputsplit,然後起一個 mapper 進程處理消費這個批次的 kafka 數據,經過數據解析、業務邏輯處理、校驗過濾、最終根據分區規則落地寫到目標 HDFS 文件。
落地成功後會把這次處理的 meta 信息(包括 topic、partition、開始的 offset、結束的offset)存儲到 MySQL。下次再處理的時候,會從上次處理的結束的 offset 開始讀取消息,開始新一批的數據消費落地。
實現了基本功能後難免會遇到一些問題,比如不同的業務 topic 的數據量級是不一樣的,這樣會導致一次任務需要等待 partition 數據量最多以及處理時間最長的 mapper 結束,才能結束整個任務。那我們怎麼解決這個問題呢?系統設計中有個不成文原則是:分久必合、合久必分,針對數據傾斜的問題我們採用了類似的思路。
首先對數據量級較小的 partition 合併到一個 inputsplit,達到一個 mapper 可以處理多個業務的 partition 數據,最終落地寫多份文件。
另外對數據量級較大的 partition 支持分段拆分,平分到多個 mapper 處理同一個 partition,這樣就實現了更均衡的 mapper 處理,能更好地應對業務量級的突增。
除了數據傾斜的問題,還出現各種原因導致數據 dump 到 HDFS 失敗的情況,比如因為 kafka 磁盤問題、hadoop 集群節點宕機、網絡故障、外部訪問權限等導致該 ETL 程序出現異常,最終可能導致因為未 close HDFS 文件導致文件損壞等,需要重跑數據。那我們的數據時間分區基本都是以天為單位,用原來的方式可能會導致一個天粒度的文件損壞,解析無法讀取。
我們採用了分兩階段處理的方式:mapper 1 先把數據寫到一個臨時目錄,mapper 2 把 Hdfs 的臨時目錄的數據 append 到目標文件。這樣當 mapper1 失敗的時候可以直接重跑這個批次,而不用重跑整天的數據;當 mapper2 失敗的時候能直接從臨時目錄 merge 數據替換最終文件,減少了重新 ETL 天粒度的過程。
在數據的實時分發訂閱寫入到 kafka1 的數據基本是每個業務的全量數據,但是針對需求方大部分業務都只關注某個事件、某小類別的數據,而不是任何業務都消費全量數據做處理,所以我們增加了一個實時分發 Databus 來解決這個問題。
Databus 支持業務方自定義分發 rules 往下游的 kafka 集群寫數據,方便業務方訂閱處理自己想要的數據,並且支持更小粒度的數據重複利用。
上圖可以看出 Databus 的實現方式,它的主體基於 Storm 實現了 databus topology。Databus 有兩個 spout,一個支持拉取全量以及新增的 rules,然後更新到下游的分發 bolt 更新緩存規則,另外一個是從 kafka 消費的 spout。而 distributionbolt 主要是負責解析數據、規則 match,以及把數據往下游的 kafka 集群發送。
數據開放
有了原始數據並且能做離線、實時的數據開發以後,隨之而來的是數據開發需求的井噴,數據研發團隊應接不暇。所以我們通過數據平臺的方式開放數據計算、存儲能力,賦予業務方有數據開發的能力。
對實現元數據管理、任務調度、數據集成、DAG 任務編排、可視化等不一一贅述,主要介紹數據開放後,美圖對穩定性方面的實踐心得。
數據開放和系統穩定性是相愛相殺的關係:一方面,開放了之後不再是有數據基礎的研發人員來做,經常會遇到提交非法、高資源消耗等問題的數據任務,給底層的計算、存儲集群的穩定性造成了比較大的困擾;另外一方面,其實也是因為數據開放,才不斷推進我們必須提高系統穩定性。
針對不少的高資源、非法的任務,我們首先考慮能否
在 HiveSQL 層面能做一些校驗、限制。如圖 13 所示是 HiveSQL 的整個解析編譯為可執行的 MR 的過程:首先基於 Antlr 做語法的解析,生成 AST,接著做語義解析,基於AST 會生成 JAVA 對象 QueryBlock。基於 QueryBlock 生成邏輯計劃後做邏輯優化,最後生成物理計劃,進行物理優化後,最終轉換為一個可執行的 MR 任務。
我們主要在語義解析階段生成 QueryBlock 後,拿到它做了不少的語句校驗,包括:非法操作、查詢條件限制、高資源消耗校驗判斷等。
第二個在穩定性方面的實踐,主要是對集群的優化,包括:
- 我們完整地對 Hive、Hadoop 集群做了一次升級。主要是因為之前在低版本有 fix 一些問題以及合併一些社區的 patch,在後面新版本都有修復;另外一個原因是新版本的特性以及性能方面的優化。我們把 Hive 從 0.13 版本升級到 2.1 版本,Hadoop 從 2.4 升級到 2.7;
- 對 Hive 做了 HA 的部署優化。我們把 HiveServer 和 MetaStoreServer 拆分開來分別部署了多個節點,避免合併在一個服務部署運行相互影響;
- 之前執行引擎基本都是 On MapReduce 的,我們也在做 Hive On Spark 的遷移,逐步把線上任務從 Hive On MR 切換到 Hive On Spark;
- 拉一個內部分支對平時遇到的一些問題做 bugfix 或合併社區 patch 的特性;
在平臺穩定性方面的實踐最後一部分是提高權限、安全性,防止對集群、數據的非法訪問、攻擊等。提高權限主要分兩塊:API 訪問與集群。
- API Server :上文提到我們有 OneDataAPI,提供給各個業務系統訪問數據的統一 API。這方面主要是額外實現了一個統一認證 CA 服務,業務系統必須接入 CA 拿到 token 後來訪問OneDataAPI,OneDataAPI 在 CA 驗證過後,合法的才允許真正訪問數據,從而防止業務系統可以任意訪問所有數據指標。
- 集群:目前主要是基於 Apache Ranger 來統一各類集群,包括 Kafka、Hbase、Hadoop 等做集群的授權管理和維護;
以上就是美圖在搭建完數據平臺並開放給各個業務線使用後,對平臺穩定性做的一些實踐和優化。
總結
- 首先在搭建數據平臺之前,一定要先瞭解業務,看業務的整體體量是否比較大、業務線是否比較廣、需求量是否多到嚴重影響我們的生產力。如果都是肯定答案,那可以考慮儘快搭建數據平臺,以更高效、快速提高數據的開發應用效率。如果本身的業務量級、需求不多,不一定非得套大數據或者搭建多麼完善的數據平臺,以快速滿足支撐業務優先。
- 在平臺建設過程中,需要重點關注數據質量、平臺的穩定性,比如關注數據源採集的完整性、時效性、設備的唯一標識,多在平臺的穩定性方面做優化和實踐,為業務方提供一個穩定可靠的平臺。
- 在提高分析決策效率以及規模逐漸擴大後需要對成本、資源做一些優化和思考。
閱讀更多 IT技術管理那些事兒 的文章