日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

隨著微博業務線的快速擴張,微博廣告各類業務日誌的數量也隨之急劇增長。傳統基於 Hadoop 生態的離線數據存儲計算方案已在業界形成統一的默契,但受制於離線計算的時效性制約,越來越多的數據應用場景已從離線轉為實時。微博廣告實時數據平臺以此為背景進行設計與構建,目前該系統已支持日均處理日誌數量超過百億,接入產品線、業務日誌類型若干。

一.技術選型

相比於 Spark,目前 Spark 的生態總體更為完善一些,且在機器學習的集成和應用性暫時領先。但作為下一代大數據引擎的有力競爭者-Flink 在流式計算上有明顯優勢,Flink 在流式計算裡屬於真正意義上的單條處理,每一條數據都觸發計算,而不是像 Spark 一樣的 Mini Batch 作為流式處理的妥協。Flink 的容錯機制較為輕量,對吞吐量影響較小,而且擁有圖和調度上的一些優化,使得 Flink 可以達到很高的吞吐量。而 Strom 的容錯機制需要對每條數據進行 ack,因此其吞吐量瓶頸也是備受詬病。

這裡引用一張圖來對常用的實時計算框架做個對比。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

Flink 特點

Flink 是一個開源的分佈式實時計算框架。Flink 是有狀態的和容錯的,可以在維護一次應用程序狀態的同時無縫地從故障中恢復;它支持大規模計算能力,能夠在數千個節點上併發運行;它具有很好的吞吐量和延遲特性。同時,Flink 提供了多種靈活的窗口函數。

1)狀態管理機制

Flink 檢查點機制能保持 exactly-once 語義的計算。狀態保持意味著應用能夠保存已經處理的數據集結果和狀態。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

2)事件機制

Flink 支持流處理和窗口事件時間語義。事件時間可以很容易地通過事件到達的順序和事件可能的到達延遲流中計算出準確的結果。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

3)窗口機制

Flink 支持基於時間、數目以及會話的非常靈活的窗口機制(window)。可以定製 window 的觸發條件來支持更加複雜的流模式。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

4)容錯機制

Flink 高效的容錯機制允許系統在高吞吐量的情況下支持 exactly-once 語義的計算。Flink 可以準確、快速地做到從故障中以零數據丟失的效果進行恢復。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

5)高吞吐、低延遲

Flink 具有高吞吐量和低延遲(能快速處理大量數據)特性。下圖展示了 Apache Flink 和 Apache Storm 完成分佈式項目計數任務的性能對比。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

二.架構演變

初期架構

初期架構僅為計算與存儲兩層,新來的計算需求接入後需要新開發一個實時計算任務進行上線。重複模塊的代碼複用率低,重複率高,計算任務間的區別主要是集中在任務的計算指標口徑上。

在存儲層,各個需求方所需求的存儲路徑都不相同,計算指標可能在不通的存儲引擎上有重複,有計算資源以及存儲資源上的浪費情況。並且對於指標的計算口徑也是僅侷限於單個任務需求裡的,不通需求任務對於相同的指標的計算口徑沒有進行統一的限制於保障。各個業務方也是在不同的存儲引擎上開發數據獲取服務,對於那些專注於數據應用本身的團隊來說,無疑當前模式存在一些弊端。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

後期架構

隨著數據體量的增加以及業務線的擴展,前期架構模式的弊端逐步開始顯現。從當初單需求單任務的模式逐步轉變為通用的數據架構模式。為此,我們開發了一些基於 Flink 框架的通用組件來支持數據的快速接入,並保證代碼模式的統一性和維護性。在數據層,我們基於 Clickhouse 來作為我們數據倉庫的計算和存儲引擎,利用其支持多維 OLAP 計算的特性,來處理在多維多指標大數據量下的快速查詢需求。在數據分層上,我們參考與借鑑離線數倉的經驗與方法,構建多層實時數倉服務,並開發多種微服務來為數倉的數據聚合,指標提取,數據出口,數據質量,報警監控等提供支持。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

整體架構分為五層:

1)接入層:接入原始數據進行處理,如 Kafka、RabbitMQ、File 等。

2)計算層:選用 Flink 作為實時計算框架,對實時數據進行清洗,關聯等操作。

3)存儲層:對清洗完成的數據進行數據存儲,我們對此進行了實時數倉的模型分層與構建,將不同應用場景的數據分別存儲在如 Clickhouse,Hbase,Redis,Mysql 等存儲。服務中,並抽象公共數據層與維度層數據,分層處理壓縮數據並統一數據口徑。

4)服務層:對外提供統一的數據查詢服務,支持從底層明細數據到聚合層數據 5min/10min/1hour 的多維計算服務。同時最上層特徵指標類數據,如計算層輸入到Redis、Mysql 等也從此數據接口進行獲取。

5)應用層:以統一查詢服務為支撐對各個業務線數據場景進行支撐。

  • 監控報警:對 Flink 任務的存活狀態進行監控,對異常的任務進行郵件報警並根據設定的參數對任務進行自動拉起與恢復。根據如 Kafka 消費的 offset 指標對消費處理延遲的實時任務進行報警提醒。
  • 數據質量:監控實時數據指標,對歷史的實時數據與離線 hive 計算的數據定時做對比,提供實時數據的數據質量指標,對超過閾值的指標數據進行報警。

三.數據處理流程

1.整體流程

整體數據從原始數據接入後經過 ETL 處理, 進入實時數倉底層數據表,經過配置化聚合微服務組件向上進行分層數據的聚合。根據不同業務的指標需求也可通過特徵抽取微服務直接配置化從數倉中抽取到如 Redis、ES、Mysql 中進行獲取。大部分的數據需求可通過統一數據服務接口進行獲取。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

2.問題與挑戰

原始日誌數據因為各業務日誌的不同,所擁有的維度或指標數據並不完整。所以需要進行實時的日誌的關聯才能獲取不同維度條件下的指標數據查詢結果。並且關聯日誌的回傳週期不同,有在 10min 之內完成 95% 以上回傳的業務日誌,也有類似於激活日誌等依賴第三方回傳的有任務日誌,延遲窗口可能大於1天。

並且最大日誌關聯任務的日均數據量在 10 億級別以上,如何快速處理與構建實時關聯任務的問題首先擺在我們面前。對此我們基於 Flink 框架開發了配置化關聯組件。對於不同關聯日誌的指標抽取,我們也開發了配置化指標抽取組件用於快速提取複雜的日誌格式。以上兩個自研組件會在後面的內容裡再做詳細介紹。

1)回傳週期超過關聯窗口的日誌如何處理?

對於回傳晚的日誌,我們在關聯窗口內未取得關聯結果。我們採用實時+離線的方式進行數據回刷補全。實時處理的日誌我們會將未關聯的原始日誌輸出到另外一個暫存地(Kafka),同時不斷消費處理這個未關聯的日誌集合,設定超時重關聯次數與超時重關聯時間,超過所設定任意閾值後,便再進行重關聯。離線部分,我們採用 Hive 計算昨日全天日誌與 N 天內的全量被關聯日誌表進行關聯,將最終的結果回寫進去,替換實時所計算的昨日關聯數據。

2)如何提高 Flink 任務性能?

① Operator Chain

為了更高效地分佈式執行,Flink 會盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task。每個 task 在一個線程中執行。將 operators 鏈接成 task 是非常有效的優化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數據在緩衝區的交換,減少了延遲的同時提高整體的吞吐量。

Flink 會在生成 JobGraph 階段,將代碼中可以優化的算子優化成一個算子鏈(Operator Chains)以放到一個 task(一個線程)中執行,以減少線程之間的切換和緩衝的開銷,提高整體的吞吐量和延遲。下面以官網中的例子進行說明。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

圖中,source、map、[keyBy|window|apply]、sink 算子的並行度分別是 2、2、2、2、1,經過 Flink 優化後,source 和 map 算子組成一個算子鏈,作為一個 task 運行在一個線程上,其簡圖如圖中 condensed view 所示,並行圖如 parallelized view 所示。算子之間是否可以組成一個 Operator Chains,看是否滿足以下條件:

  • 上下游算子的並行度一致;
  • 下游節點的入度為 1;
  • 上下游節點都在同一個 slot group 中;
  • 下游節點的 chain 策略為 ALWAYS;
  • 上游節點的 chain 策略為 ALWAYS 或 HEAD;
  • 兩個節點間數據分區方式是 forward;
  • 用戶沒有禁用 chain。

② Flink 異步 IO

流式計算中,常常需要與外部系統進行交互。而往往一次連接中你那個獲取連接等待通信的耗時會佔比較高。下圖是兩種方式對比示例:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

圖中棕色的長條表示等待時間,可以發現網絡等待時間極大地阻礙了吞吐和延遲。為了解決同步訪問的問題,異步模式可以併發地處理多個請求和回覆。也就是說,你可以連續地向數據庫發送用戶 a、b、c 等的請求,與此同時,哪個請求的回覆先返回了就處理哪個回覆,從而連續的請求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現原理。

③ Checkpoint 優化

Flink 實現了一套強大的 checkpoint 機制,使它在獲取高吞吐量性能的同時,也能保證 Exactly Once 級別的快速恢復。

首先提升各節點 checkpoint 的性能考慮的就是存儲引擎的執行效率。Flink 官方支持的三種 checkpoint state 存儲方案中,Memory 僅用於調試級別,無法做故障後的數據恢復。其次還有 Hdfs 與 Rocksdb,當所做 Checkpoint 的數據大小較大時,可以考慮採用 Rocksdb 來作為 checkpoint 的存儲以提升效率。

其次的思路是資源設置,我們都知道 checkpoint 機制是在每個 task 上都會進行,那麼當總的狀態數據大小不變的情況下,如何分配減少單個 task 所分的 checkpoint 數據變成了提升 checkpoint 執行效率的關鍵。

最後,增量快照. 非增量快照下,每次 checkpoint 都包含了作業所有狀態數據。而大部分場景下,前後 checkpoint 裡,數據發生變更的部分相對很少,所以設置增量 checkpoint,僅會對上次 checkpoint 和本次 checkpoint 之間狀態的差異進行存儲計算,減少了 checkpoint 的耗時。

3)如何保障任務的穩定性?

在任務執行過程中,會遇到各種各樣的問題,導致任務異常甚至失敗。所以如何做好異常情況下的恢復工作顯得異常重要。

① 設定重啟策略

Flink 支持不同的重啟策略,以在故障發生時控制作業如何重啟。集群在啟動時會伴隨一個默認的重啟策略,在沒有定義具體重啟策略時會使用該默認策略。如果在工作提交時指定了一個重啟策略,該策略會覆蓋集群的默認策略。

默認的重啟策略可以通過 Flink 的配置文件 flink-conf.yaml 指定。配置參數 restart-strategy 定義了哪個策略被使用。

常用的重啟策略:

  • 固定間隔(Fixed delay);
  • 失敗率(Failure rate);
  • 無重啟(No restart)。

② 設置 HA

Flink 在任務啟動時指定 HA 配置主要是為了利用 Zookeeper 在所有運行的 JobManager 實例之間進行分佈式協調 .Zookeeper 通過 leader 選取和輕量級一致性的狀態存儲來提供高可用的分佈式協調服務。

③ 任務監控報警平臺

在實際環境中,我們遇見過因為集群狀態不穩定而導致的任務失敗。在 Flink 1.6 版本中,甚至遇見過任務出現假死的情況,也就是 Yarn 上的 job 資源依然存在,而 Flink 任務實際已經死亡。為了監測與恢復這些異常的任務,並且對實時任務做統一的提交、報警監控、任務恢復等管理,我們開發了任務提交與管理平臺。通過 Shell 拉取 Yarn 上 Running 狀態與 Flink Job 狀態的列表進行對比,心跳監測平臺上的所有任務,並進行告警、自動恢復等操作。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

④ 作業指標監控

Flink 任務在運行過程中,各 Operator 都會產生各自的指標數據,例如,Source 會產出 numRecordIn、numRecordsOut 等各項指標信息,我們會將這些指標信息進行收集,並展示在我們的可視化平臺上。指標平臺如下圖:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

⑤ 任務運行節點監控

我們的 Flink 任務都是運行在 Yarn 上,針對每一個運行的作業,我們需要監控其運行環境。會收集 JobManager 及 TaskManager 的各項指標。收集的指標有 jobmanager-fullgc-count、jobmanager-younggc-count、jobmanager-fullgc-time、jobmanager-younggc-time、taskmanager-fullgc-count、taskmanager-younggc-count、taskmanager-fullgc-time、taskmanager-younggc-time 等,用於判斷任務運行環境的健康度,及用於排查可能出現的問題。監控界面如下:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

四.數據關聯組件

1.如何選擇關聯方式?

1)Flink Table

從 Flink 的官方文檔,我們知道 Flink 的編程模型分為四層,sql 是最高層的 api, Table api 是中間層,DataSteam/DataSet Api 是核心,stateful Streaming process 層是底層實現。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

剛開始我們直接使用 Flink Table 做為數據關聯的方式,直接將接入進來的 DataStream 註冊為 Dynamic Table 後進行兩表關聯查詢,如下圖:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

但嘗試後發現在做那些日誌數據量大的關聯查詢時往往只能在較小的時間窗口內做查詢,否則會超過 datanode 節點單臺內存限制,產生異常。但為了滿足不同業務日誌延遲到達的情況,這種實現方式並不通用。

2)Rocksdb

之後,我們直接在 DataStream 上進行處理,在 CountWindow 窗口內進行關聯操作,將被關聯的數據 Hash 打散後存儲在各個 datanode 節點的 Rocksdb 中,利用 Flink State 原生支持 Rocksdb 做 Checkpoint 這一特性進行算子內數據的備份與恢復。這種方式是可行的,但受制於 Rocksdb 集群物理磁盤為非 SSD 的因素,這種方式在我們的實際線上場景中關聯耗時較高。

3)外部存儲關聯

如 Redis 類的 KV 存儲的確在查詢速度上提升不少,但類似廣告日誌數據這樣單條日誌大小較大的情況,會佔用不少寶貴的機器內存資源。經過調研後,我們選取了 Hbase 作為我們日誌關聯組件的關聯數據存儲方案。

為了快速構建關聯任務,我們開發了基於 Flink 的配置化組件平臺,提交配置文件即可生成數據關聯任務並自動提交到集群。下圖是任務執行的處理流程。

示意圖如下:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

下圖是關聯組件內的執行流程圖:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

2.問題與優化

1)加入 Interval Join

隨著日誌量的增加,某些需要進行關聯的日誌數量可能達到日均十幾億甚至幾十億的量級。前期關聯組件的配置化生成任務的方式的確解決了大部分線上業務需求,但隨著進一步的關聯需求增加,Hbase 面臨著巨大的查詢壓力。在我們對 Hbase 表包括 rowkey 等一系列完成優化之後,我們開始了對關聯組件的迭代與優化。

第一步,減少 Hbase 的查詢。我們使用 Flink Interval Join 的方式,先將大部分關聯需求在程序內部完成,只有少部分仍需查詢的日誌會去查詢外部存儲(Hbase). 經驗證,以請求日誌與實驗日誌關聯為例,對於設置 Interval Join 窗口在 10s 左右即可減少 80% 的 hbase 查詢請求。

① Interval Join 的語義示意圖

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

  • 數據 JOIN 的區間 - 比如時間為 3 的 EXP 會在 IMP 時間為[2, 4]區間進行JOIN;
  • WaterMark - 比如圖示 EXP 一條數據時間是 3,IMP 一條數據時間是 5,那麼WaterMark是根據實際最小值減去 UpperBound 生成,即:Min(3,5)-1 = 2;
  • 過期數據 - 出於性能和存儲的考慮,要將過期數據清除,如圖當 WaterMark 是 2 的時候時間為 2 以前的數據過期了,可以被清除。

② Interval Join 內部實現邏輯

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

③ Interval Join 改造

因 Flink 原生的 Intervak Join 實現的是 Inner Join,而我們業務中所需要的是 Left Join,具體改造如下:

  • 取消右側數據流的 join 標誌位;
  • 左側數據流有 join 數據時不存 state。

2)關聯率動態監控

在任務執行中,往往會出現意想不到的情況,比如被關聯的數據日誌出現缺失,或者日誌格式錯誤引發的異常,造成關聯任務的關聯率下降嚴重。那麼此時關聯任務雖然繼續在運行,但對於整體數據質量的意義不大,甚至是反向作用。在任務進行恢復的時,還需要清除異常區間內的數據,將 Kafka Offset 設置到異常前的位置再進行處理。

故我們在關聯組件的優化中,加入了動態監控,下面示意圖:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

  • 關聯任務中定時探測指定時間範圍 Hbase 是否有最新數據寫入,如果沒有,說明寫 Hbase 任務出現問題,則終止關聯任務;
  • 當寫 Hbase 任務出現堆積時,相應的會導致關聯率下降,當關聯率低於指定閾值時終止關聯任務;
  • 當關聯任務終止時會發出告警,修復上游任務後可重新恢復關聯任務,保證關聯數據不丟失。

五.數據清洗組件

為了快速進行日誌數據的指標抽取,我們開發了基於 Flink 計算平臺的指標抽取組件Logwash。封裝了基於 Freemaker 的模板引擎做為日誌格式的解析模塊,對日誌進行提取,算術運算,條件判斷,替換,循環遍歷等操作。

下圖是 Logwash 組件的處理流程:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

組件支持文本與 Json 兩種類型日誌進行解析提取,目前該清洗組件已支持微博廣告近百個實時清洗需求,提供給運維組等第三方非實時計算方向人員快速進行提取日誌的能力。

配置文件部分示例:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

六.FlinkStream 組件庫

Flink 中 DataStream 的開發,對於通用的邏輯及相同的代碼進行了抽取,生成了我們的通用組件庫 FlinkStream。FlinkStream 包括了對 Topology 的抽象及默認實現、對 Stream 的抽象及默認實現、對 Source 的抽象和某些實現、對 Operator 的抽象及某些實現、Sink 的抽象及某些實現。任務提交統一使用可執行 Jar 和配置文件,Jar 會讀取配置文件構建對應的拓撲圖。

1.Source 抽象

對於 Source 進行抽象,創建抽象類及對應接口,對於 Flink Connector 中已有的實現,例如 kafka,Elasticsearch 等,直接創建新 class 並繼承接口,實現對應的方法即可。對於需要自己去實現的 connector,直接繼承抽象類及對應接口,實現方法即可。目前只實現了 KafkaSource。

2.Operator 抽象

與 Source 抽象類似,我們實現了基於 Stream 到 Stream 級別的 Operator 抽象。創建抽象 Operate 類,抽象 Transform 方法。對於要實現的 Transform 操作,直接繼承抽象類,實現其抽象方法即可。目前實現的 Operator,直接按照文檔使用。如下:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

3.Sink 抽象

針對 Sink,我們同樣創建了抽象類及接口。對 Flink Connector 中已有的 Sink 進行封裝。目前可通過配置進行數據輸出的 Sink。目前以實現和封裝的 Sink 組件有:Kafka、Stdout、Elasticsearch、Clickhouse、Hbase、Redis、MySQL。

4.Stream 抽象

創建 Stream 抽象類及抽象方法 buildStream,用於構建 StreamGraph。我們實現了默認的 Stream,buildStream 方法讀取 Source 配置生成 DataStream,通過 Operator 配置列表按順序生成拓撲圖,通過 Sink 配置生成數據寫出組件。

5.Topology 抽象

對於單 Stream,要處理的邏輯可能比較簡單,主要讀取一個 Source 進行數據的各種操作並輸出。對於複雜的多 Stream 業務需求,比如多流 Join,多流 Union、Split 流等,因此我們多流業務進行了抽象,產生了 Topology。在 Topology 這一層可以對多流進行配置化操作。對於通用的操作,我們實現了默認 Topology,直接通過配置文件就可以實現業務需求。對於比較複雜的業務場景,用戶可以自己實現 Topology。

6.配置化

我們對抽象的組件都是可配置化的,直接通過編寫配置文件,構造任務的運行拓撲結構,啟動任務時指定配置文件。

  • 正文文本框 Flink Environment 配置化,包括時間處理類型、重啟策略,checkpoint 等;
  • Topology 配置化,可配置不同 Stream 之間的處理邏輯與 Sink;
  • Stream 配置化,可配置 Source,Operator 列表,Sink。

配置示例如下:


run_env:
timeCharacteristic: "ProcessingTime" #ProcessingTime\\IngestionTime\\EventTime
restart: # 重啟策略配置
type: # noRestart, fixedDelayRestart, fallBackRestart, failureRateRestart
checkpoint: # 開啟checkpoint
type: "rocksdb" #
streams:
impStream: #粉絲經濟曝光日誌
type: "DefaultStream"
config:
source:
type: "Kafka011" # 源是kafka011版本
config:
parallelism: 20
operates:
-
type: "StringToMap"
config:

-
type: "SplitElement"
config:
...
-
type: "SelectElement"
config:
transforms:
-
type: "KeyBy"
config:
-
type: "CountWindowWithTimeOut" #Window需要和KeyBy組合使用
config:
-
type: "SplitStream"
config:
-
type: "SelectStream"
config:
sink:
-
type: Kafka
config:
-
type: Kafka
config:

7.部署

在實時任務管理平臺,新建任務,填寫任務名稱,選擇任務類型(Flink)及版本,上傳可執行 Jar 文件,導入配置或者手動編寫配置,填寫 JobManager 及 TaskManager 內存配置,填寫並行度配置,選擇是否重試,選擇是否從 checkpoint 恢復等選項,保存後即可在任務列表中啟動任務,並觀察啟動日誌用於排查啟動錯誤。

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

七.FlinkSQL 擴展

SQL 語言是一門聲明式的,簡單的,靈活的語言,Flink 本身提供了對 SQL 的支持。Flink 1.6 版本和 1.8 版本對 SQL 語言的支持有限,不支持建表語句,不支持對外部數據的關聯操作。因此我們通過 Apache Calcite 對 Flink SQL API 進行了擴展,用戶只需要關心業務需求怎麼用 SQL 語言來表達即可。

1.支持創建源表

擴展了支持創建源表 SQL,通過解析 SQL 語句,獲取數據源配置信息,創建對應的 TableSource 實例,並將其註冊到 Flink environment。示例如下:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

2.支持創建維表

使用 Apache Calcite 對 SQL 進行解析,通過維表關鍵字識別維表,使用 RichAsyncFunction 算子異步讀取維表數據,並通過 flatMap 操作生成關聯後的 DataStream,然後轉換為 Table 註冊到 Flink Environment。示例如下:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

3.支持創建視圖

使用 SQLQuery 方法,支持從上一層表或者視圖中創建視圖表,並將新的視圖表註冊到 Flink Environment。創建語句需要按照順序寫,比如 myView2 是從視圖 myView1 中創建的,則 myView1 創建語句要在myView2語句前面。如下:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

4.支持創建結果表

支持創建結果表,通過解析 SQL 語句,獲取配置信息,創建對應的 AppendStreamTableSink 或者 UpsertStreamTableSink 實例,並將其註冊到 Flink Environment。示例如下:

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

5.支持自定義UDF

支持自定義 UDF 函數,繼承 ScalarFunction 或者 TableFunction。在 resources 目錄下有相應的 UDF 資源配置文件,默認會註冊全部可執行 Jar 包中配置的 UDF。直接按照使用方法使用即可。

6.部署

部署方式同 Flink Stream 組件。

八.實時數據倉庫的構建

為了保證實時數據的統一對外出口以及保證數據指標的統一口徑,我們根據業界離線數倉的經驗來設計與構架微博廣告實時數倉。

1.分層概覽

數據倉庫分為三層,自下而上為:數據引入層(ODS,Operation Data Store)、數據公共層(CDM,Common Data Model)和數據應用層(ADS,Application Data Service)

日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設

  • 數據引入層(ODS,Operation Data Store):將原始數據幾乎無處理的存放在數據倉庫系統,結構上與源系統基本保持一致,是數據倉庫的數據準。
  • 數據公共層(CDM,Common Data Model,又稱通用數據模型層):包含 DIM 維度表、DWD 和 DWS,由 ODS 層數據加工而成。主要完成數據加工與整合,建立一致性的維度,構建可複用的面向分析和統計的明細事實表,以及彙總公共粒度的指標。

公共維度層(DIM):基於維度建模理念思想,建立整個企業的一致性維度。降低數據計算口徑和算法不統一風險。

公共維度層的表通常也被稱為邏輯維度表,維度和維度邏輯表通常一一對應。

公共彙總粒度事實層(DWS,Data Warehouse Service):以分析的主題對象作為建模驅動,基於上層的應用和產品的指標需求,構建公共粒度的彙總指標事實表,以寬表化手段物理化模型。構建命名規範、口徑一致的統計指標,為上層提供公共指標,建立彙總寬表、明細事實表。

公共彙總粒度事實層的表通常也被稱為彙總邏輯表,用於存放派生指標數據。

明細粒度事實層(DWD,Data Warehouse Detail):以業務過程作為建模驅動,基於每個具體的業務過程特點,構建最細粒度的明細層事實表。可以結合企業的數據使用特點,將明細事實表的某些重要維度屬性字段做適當冗餘,也即寬表化處理。

明細粒度事實層的表通常也被稱為邏輯事實表。

  • 數據應用層(ADS,Application Data Service):存放數據產品個性化的統計指標數據。根據 CDM 與 ODS 層加工生成。

2.詳細分層模型


日均百億級日誌處理:微博基於 Flink 的實時計算平臺建設


對於原始日誌數據,ODS 層幾乎是每條日誌抽取字段後進行保留,這樣便能對問題的回溯與追蹤。在 CDM 層對 ODS 的數據僅做時間粒度上的數據壓縮,也就是在指定時間切分窗口裡,對所有維度下的指標做聚合操作,而不涉及業務性的操作。在 ADS 層,我們會有配置化抽取微服務,對底層數據做定製化計算和提取,輸出到用戶指定的存儲服務裡。

  • 呂永衛,微博廣告資深數據開發工程師,實時數據項目組負責人。
  • 黃鵬,微博廣告實時數據開發工程師,負責法拉第實驗平臺數據開發、實時數據關聯平臺、實時算法特徵數據計算、實時數據倉庫、實時數據清洗組件開發工作。
  • 林發明,微博廣告資深數據開發工程師,負責算法實時特徵數據計算、實時數據關聯平臺、實時數據倉庫、Flink Stream 組件開發工作。
  • 崔澤峰,微博廣告資深數據開發工程師,負責實時算法特徵數據計算、實時任務管理平臺、FlinkStream 組件、FlinkSQL 擴展開發工作。


分享到:


相關文章: