網易雲 戴丹:《實時流計算在業務中的應用》

網易雲 戴丹:《實時流計算在業務中的應用》


9月15日FMI-2018人工智能與大數據高峰論壇圓滿落幕,網易雲戴丹老師就實時流計算在業務中的應用進行了深入的分享。

飛馬網將其內容整理如下:

各位早上好,我在這邊跟大家分享一些流式計算技術相關的內容。首先介紹一下網易對大數據這塊數據平臺基本的情況,首先是數據業務架構,網易經過近20多年的發展,已經沉澱了非常豐富的業務數據,本身的業務當前在跑的可能有一兩百個,整個數據業務架構有三層結構,最底下是業務數據的採集,包括日誌數據,數據庫數據,這部分數據可以時時導入,離線導入形成一個離線數據倉,現在也有時時數倉,第二層是數據平臺,相當於管理所有業務數據,在網易有兩個平臺,一個是網易猛獁,是開放給大家使用的運營開發平臺,還有網易有數,是可視化的,這是偏業務層的跟技術稍微遠一點。數據採集有存儲計算,還有離線計算,時時計算,最上面是數據應用這一層,對應到非常具體的業務,網易新聞,雲音樂,考拉,雲課堂之類,以新聞為例,在實時計算這一塊,他在新聞推薦這一塊用的比較多一點,比如用網易新聞客戶端,每天給你推送一些消息,這個也是有實時計算的應用在裡面。每個人的興趣點所處的地理位置,年齡段性別都不一樣,會做一個興趣的推薦,為什麼還需要實時計算,因為這個推薦的效果跟時間和本人的狀態非常相關,跟當前的熱點也有關係,實時計算是用來真實反饋推薦效果的。比如在網易考拉做大促,運營的一些活動促銷做推送的時候也是用到了實時計算,比如考拉618,當前上午給你推送了一波消息,這波消息的推送是通過實時計算它的轉化率,計算推薦效果是否OK,運營再根據這個結果做策略的調整。

接下來是純技術相關的,這是網易大數據的架構,這個各大公司都差不多,這是數據層和管理層具體的展示,底下數據源這一層還是結構化,半結構化,非結構化的數據,數據集成,在網易分為兩類,一類是全量的非實時的,還有增量實時的,採集系統和數據庫增量系統,還有存儲,實時計算有離線計算和在線計算並列的。再往上是平臺層,是給業務開發的,旁邊還有一些輔助的功能,作業流和相關的特性。

下面是今天我要跟大家分享的重點,網易流式計算的平臺,我們這個流式計算平臺叫Sloth,項目立項在2017年初,2016年有一部電影《瘋狂動物城》,裡面有一隻樹懶叫閃電,英文名是Sloth,用這個樹懶類比,是非常便捷非常快的,可能動作很慢,達到的效果是可以做很多的工作,但是表達能力非常強,可以把實時計算的場景跑起來,就像瘋狂動物城裡的情景,樹懶跑的很快。還有網易夢獁在《冰河世紀》裡跟樹懶是三劍客,所以這是簡單高效的平臺。

下面分兩部分介紹一下流式計算的經驗,一部分是產品特性,第二部分是具體關鍵實現的細節。首先是Sloth的特點,首先用SQL開發的流計算任務,比較重要的點是完全兼容離線SQL,因為流式SQL是去年火起來的,我們做這個是去年年初做成第一個版本,流和離線的表達能力比較類似,原來用Having解決大數據計算問題的人,繼續用寫SQL的方式做,但是從原來天級別的計算時效性提升到秒級。

第二個特點,支持UDF,Having、Join、subquery,兼容MySQL,同時也兼容UDF,用戶可以自己提交UDF,實現個性化跟業務場景需求比較相關的自己實現的函數,寫完了之後可以在SQL裡直接使用。

第三,支持DDL、SQL級聯和維表關聯,DDL本身並不參與計算邏輯定義,但是他是計算輸入輸出定義的關鍵因素,大家知道不管是離線計算還是實時計算,計算三要素無非是輸入輸出,DDL是定義輸出輸入的,適配各種樣的存儲郵件,輸入主要是像MQ之類的數據隊列,輸出比較豐富多樣,這個業務場景可能是Kbuka、數據庫之類的。SQL級別,我們在項目之初,絕大部分項目沒有SQL的支持,用戶相當於脫離不了計算引擎的東西,肯定要寫一些代碼,SQL的支持是為了讓業務方寫純SQL的方式解決流式計算的問題,他不需要了解這個代碼怎麼寫。還有一個是維表關聯,這是業務裡用的比較多的,相當於是一個流和表結合計算的場景,可能大家業務中涉及的比較多的是用流式計算,結合靜態的或者變化不頻繁的K1的配置查詢,或者是字段補全,用ID查用戶名之類的,這也是流式計算生態裡比較關鍵的點。

第四,擴展了Flink做計算引擎,Flink已經是不錯的了,而且狀態越來越好,但是在項目之初包括現在還是有很多不足,我們的實時計算平臺而言,我們對Flink做了很多改進,比如支持數據落期,支持數據撤銷,還有Join的性能優化,做了很多工作,為此才足以讓流式計算平臺跑起來。

第五點,我們用了一個技術方案實現SQL到Flink的轉化,因為流式計算本身是一個實時性要求比較高的場景,對運行式程序效率要求比較高一點,用這個技術相當於可以大幅度的提高運行效率,相關技術在後面的PPT展示。倒數第二點是增量計算模型,從我去年做分享的反饋來看,瞭解和理解的人並不是特別多,今年可能會好一點,等一下會做一個解釋,為什麼需要有增量計算,為什麼有撤銷,大家在什麼情況下需要注意撤銷這件事。

最後一個是流式計算平臺本身的數據特性的保證,保證了exactlyonce和at least Once,恰好一次處理和至少一次處理,這是得益於Flink框架的能力才做到這個事情,所有的這些特點構成了Sloth這個平臺所必須的能力和先進的點。

給大家看一下內部流式計算平臺的產品,這個上線一年多了,產品形態比較簡單,左邊這個地方是三個欄目,任務開發、任務運維、郵件管理。任務開發相當於給業務方的產品寫SQL的地方,SQL是S-SQL,允許用戶創建UDF,是一個函數名,有一個類名,在UDF裡自由發揮,包括了檢查和交付式的調試,這個截圖是真實的線上應用,是一個廣告的業務,有十來個任務,任務開發完了之後開始上線運營,這裡主要表達的是流式計算SQL可以解決80%左右的場景,但是用SQL的好處是非常快,可能算個點擊率轉化率,用SQL寫,從開發到上線半天時間就夠了,用平臺不用運維,而且還有一個實時監控,這是平臺裡面集成了單個任務計算的延遲,因為本身對延遲是非常敏感的,所以首先用戶要能夠看到延遲有多少,如果延遲比較高,還是要監控告警之類的。

下面給大家講一個具體的流式Sloth的應用,這是廣告業務的實時報表,廣告業務在各公司都是比較重要的,因為它直接相關營收,這裡舉的例子是報表,這是流式Sloth最典型的應用,最成熟,沒有任何問題。廣告主通過實時報表來觀測廣告流量的強佔情況,根據這個廣告的PV數和廣告投放的好壞,根據投放之後實時的點擊率等等,廣告主會看廣告的點擊情況進行投放策略的變更,如果轉化率比較好,他會繼續加大投入,如果這個轉化率很低的話,他會及時撤下來,減少沒必要的投入,整個場景很明確,我們流式怎麼來表達,剛才我提到,計算的話三個點,輸入、計算、輸出,計算的輸入和輸出先通過TCR表示,這個裡面有兩個數據源,一個是廣告的點擊數據,第二是廣告PV,假設Kafka是原始數據類型,我們並不能直接從裡面提取出A字段B字段C字段,也不需要填寫數據源相關的屬性,比如這個Topic屬於哪一個Topic,在哪一個broker上面,消費的ID是什麼,看哪個數據,本身這個數據是實時更新的,比如現在是11點多,Kafka的數據是11點,並且不斷追加的。

第二部分,定義輸出,就看業務方需要計算哪些東西,在這個場景裡假設用戶,廣告主維度的,包括數據時間,比如十點到十一點,我投的一個廣告,他的轉化率怎麼樣,這裡有一個廣告主ID點擊數,PV、收入,這個輸出表也有相關的屬性,輸入是Kafka,輸出是數據庫,必須支持輸出存儲,他還支持臨時表,臨時表沒有物理的影射,就是一個動態表的概念,所以它的的類型是type,因為我們的輸出場景會計算很多個,廣告主維度,創意維度,計劃維度非常多,一個實時流可以用多次計算,這個實時流可以被用很多次,你的Sql會很複雜,因為源頭計算四個輸出,源頭要重複計算四次,有了這個Type表之後,相當於只需要把寬表輸出一次就可以了,想算多少維度就算多少。

第三部分,計算,這裡面截了一部分,這個SQL本身也很簡單,首先需要把點擊數據和PV數據匯合到同一個流裡,這裡的關鍵字是SUM,點擊數據和PV數據剛才定義的是原始數據,需要解析,平臺不知道你的數據長什麼樣子,這個時候通過用戶寫自定義函數的方式把數據解析出來,點擊數據解析是從原始數據寫一個Click,這是一個函數,用Java實現,來寫這個,點擊ID,這是時間,這是事件的小時,廣告ID,創業ID之類的,點擊收錄一些字段。然後PV數據也是一樣的,只不過中間有一些字段是為零的,因為PV和Click要統一的,沒有的就為零。

輸入的兩個流,作為一個預計算,點擊PV收入相加,相當於得到最細力度空間層的寬表,放在這個臨時表裡面,下一步是根據中間管表計算具體的報表,在右邊小的SQL裡是從廣告維度的實時報表,他是一張輸出表,這個對應的是數據庫,有時間、廣告主、日誌日期,相當於每一個小時做一個實時的聚合,聚合的內容也是點擊PV的融合,這樣相當於SQL跑起來之後,廣告主維度對應的物理存儲,比如MySQL的物理表不斷更新,每秒更新一次,當前時間的轉化率可以實時的看到,如果作為一個頁面呈現,這個頁面是實時動態的。

這個場景計算這部分已經講完了,大家看到的直觀感受是什麼,其實中間沒有提到實時計算相關的字眼,這就是SQL帶來的好處,用了SQL大家可以用解決離線計算的思維解決實時計算的問題,因為SQL是一樣的,無非是數據的輸入是持續輸入的,可以把數據流理解成動態的表,輸出也理解成動態的表,整個實時計算的SQL是很順暢的。

這是總結的第二個為什麼用SQL的好處,開發很便捷,完整的場景有四個維度的實時報表,一共包含兩個UDF,7DDL,5DML,一個DML是算中間寬表,4個DML是四個實時報表維度,300行的SQL,這個開發效率比直接用框架提升了四倍以上。

特別是在網易的業務場景下,這個優勢更明顯,因為網易至少有一百多個業務,直接拿優勢平臺就可以了,不需要每個業務團隊重新做框架的運維業務的開發。

首先分享一下流式計算平臺實現的原理,Sloth是一個SQLFlink的東西,最核心在於SQL怎麼變成Flink代碼,Flink的開源項目裡早就有了,2017年1.1版本就開始有了,現在相對比較成熟了。Sloth原理跟Flink是類似的。整個過程是這樣的,SQL到Flink的代碼分為兩個階段,第一個階段是SQL到執行計劃,第二,執行計劃到真實的Flink代碼,這張圖是SQL到執行計劃的階段,Sloth,SQL是用開源的Parse做的,我們支持了表定義,DML是原先要支持的,SQL解析分成這幾個步驟,第一,把所有的DDL、DML解析成SQL,相當於SQL到內部數據結構,第二個階段,根據信息和語法本身做校驗。第三,語法到語法數的轉化,生成的是邏輯執行計劃數,這是單句SQL生成的,有多少DML就有多少狼藉執行計劃數,一個業務的任務可能包含非常多的DML,操作非常多實時流,輸出到非常多的實時流,他有很多顆。

第三,做Optimise,這是比較難的一點,最後一部是做數的合併和翻譯,根據表的原信息,DML的解析結果,根據你的輸入輸出表,臨時表,把多棵的執行計劃樹串聯成一個完整的很大的執行計劃樹,整個是有效環圖,我們業務裡沒有知識環,但實際上這個環也可以。

上一步得到了邏輯,執行計劃樹,下一步是做一個Flink代碼的翻譯,我們翻譯到了Flink的郵箱,這一塊是Flink最穩定的特性,舉個簡單的例子,怎麼做Select,從這張表裡查詢一千個記錄,這裡是一棵邏輯執行計劃樹,最下面是一個掃描,然後是filter,過濾,再往上是做投影,把你要算的東西投影出來,最後一部分是做聚合,在這裡面做一個創新,這加一的操作。

CodeGen怎麼解決計劃到代碼生成的問題,早些用的是火山模型,在左邊這塊,相當於這個圖裡每一層執行計劃的每一個節點就是一個函數,多層之間是多層的函數調用,這是過濾的節點翻譯出來的代碼,它是不斷的去判斷這個Next是不是需要到上一個節點,這火山模型,代碼比較多,函數調用比較多,直觀上講,函數調用多現在開銷大一點,但是實測下來也還好,我們還有更好的選擇,Whole-stage CodeGen這是在現在的Flink裡都有的技術,過不錯,而且生成的代碼跟人寫的差不多,一個計劃只有一個函數,它的邏輯很直觀,是這個表裡所有的數據,第二層做一個過濾,第三步是聚合,所有滿足的條件都加一,算完了之後直接返回,這就是Whole-stageCodeGen,從執行計劃書的森林到完整的代碼有很多工作要做,比如這個中間表怎麼連接,這個表有多個輸入,也可能有多個輸出,怎麼翻譯代碼,怎麼做新的優化,怎麼把數據實踐,任務相關的特性,SQL以外的東西也要融合到裡面。

下面講一下UDF,UDF的語法跟Hive差不多,兼容Hive的UDF,這個裡面語法上是兼容的,但是Hive代碼不能那來直接用,UDTF可以拿來用,UDF不行,在聚合函數里,我們在Hive原來的基礎上撤銷。再跟大家介紹一下什麼是增量計算,或者什麼是流式計算的數據撤銷,這是一個概念性的東西,相信大家一定能聽懂,我舉了一個實際場景的例子,如果在流式計算裡不考慮撤銷,你有可能得到的結果是錯的。

我們需要做的計算是這樣的,電商平臺要對所有商家的銷售額度分類統計,銷售額0-100之間歸為一類,100-200之間歸為一類,這是一個實時計算,這個結果不斷動態更新,這個場景用SQL很容易表達,我們要處理的數據是商家的ID和每筆訂單的金額,然後整個計算用SQL來表達,兩次查詢,第一步是對商家的ID做一個分組,算這個商家的銷售總額,第二步是外部查詢,對商家做分統,這個商家銷售金額總額是在哪個區間裡,最終回得到每個區間的值和這個區間裡面所有商家的數量。大家一步一步來看這個計算怎麼做,假設輸入數據有四個,包含三個商家,一號商家和二號商家分別有30塊和50塊錢的訂單,三號商家兩筆訂單,用離線計算的模型分兩步,第一步SUM,先按商家的訂單總金額,第一層得到1號30,2號10塊,3號是130,第二步是分統,得到的結果是0-120,第二個區間一位商家,這個就很直白,沒有任何懸念,也沒有任何有問題的地方。

下一個是流式計算的場景,做流式計算的人肯定知道流式計算的數據特點是無界的,你的計算程序並不知道數據流什麼時候結束,有可能出現亂序,A訂單比B訂單先完成,但是計算系統有可能是B訂單先完成,這跟採集傳輸計算模型都有關係,還有一個是亂序、延遲,可能這筆訂單現在生成,但是要過一個半小時兩個小時,因為某些網絡的原因,系統的原因,延遲到了計算引擎,這只是一個延遲的Case。流式計算裡因為你的輸出是不斷更新的,假設三號商家50個訂單延遲了,第一次輸出之前沒有到,第一次輸出,左邊這個部分他的過程跟離線的一樣,第一層做SUM,第二層做分統,之前三個訂單分別是三個商家分別少於一百塊,得到結果0-100區間三個商家,系統算完了這個之後,三號商家第二筆訂單進來了,我們在流式計算場景裡做一個疊加的計算,我們肯定在之前計算結果上做疊加計算,對這條新流入的數據重複應用兩層計算,第一層是計算SUM,三號商家訂單總金額80塊,再加上新的訂單,總金額是130塊,第二步做分統,100到200,裡面剛好有三號商家。這個時候整個系統輸出的結果,第一區間三位商家,第二區間一位商家,顯然跟剛才離線計算的結果不一樣,離線計算結果第一區間只有兩位專家,在這個時代迭代計算的方式得到的結果不一樣,這個問題也很好理解,0-130這個結果是過期的計算結果,這個結果在第一次輸出的時候對的,因為數據不斷更新,這個結果隨著時間推移就失效了,不代表當前時刻數據的狀態,計算引擎怎麼修整這個問題,這就是增量計算或者系統實現裡實現的數據撤銷,數據的輸入在不變的情況下增加撤銷的邏輯。

左邊部分我不再贅述,計算結果,第一區間三位商家,對第四條數據做增量計算的時候,我們需要注意一個問題,在第一層計算的時候,三號商家聚合節點,它的緩存結構三號商家80塊,流入一個訂單50塊,計算結果更新了,得到三號商家訂單總金額130塊,他其實需要做更多事情告訴下一個節點,我現在有新的結果給你,老的結果應該作廢,所謂的作廢是撤銷數據,他應該把什麼數據作廢,就是之前緩存的計算結果作廢,這個地方我用了一個-號表示是特殊的撤銷消息,並不是計算結果,這個時候整個流式計算系統需要對撤銷消息做特殊的處理,來看這個撤銷消息,撤銷消息的分統工作跟正常的保持一致,他也應該被分到0-100區間,在第二層收到撤銷消息的時候需要對應逆操作,撤銷了上游的3、80的結果,第二層是+1的,他應該-1,-1之後更新計算結果,從130更新成120,同時需要繼續輸出一條額外的消息,把130這個消息給撤銷掉,對應到實際的運行態裡面,有可能從數據庫裡把130這條記錄刪掉,或者直接改成120,這兩個操作合併成一個。

最右邊的3-100計算不受影響,仍然放在100-200區間,最終去掉過時階段,得到最終結果是120-210,這個跟離線計算的結果是一致的,這就是為什麼流式計算裡面需要做增量計算,需要撤銷這個概念。總結一下,什麼場景下需要做撤銷,撤銷消息是聚合節點產生的,如果你的系統只有一層計算節點,可能並不一定需要做撤銷,跟現在的計算結果直接在輸出系統裡,直接做覆蓋就可以了,新結果把老結果覆蓋,你看到的一定是最新的有效數據結果,或者你並不Care計算結果是不是精確,差一點就差一點,數據延遲非常小,非常偶然,及時發生也不需要做到完全正確,你也可以不做撤銷。還有一個,你希望結果精確,但是又不會做撤銷,或者不知道怎麼做,或者覺得這個代價太高了,那要犧牲數據的實時性,相當於在這個時刻就不輸出了,那你就不需要做撤銷,犧牲實時性,比如一個小時才輸出一次,等第二次數據進來之後再輸出。所以那些對業務有實時性要求比較高,又希望計算結果精確,這個情況需要撤銷這件事情的。其實最新的版本里,這一塊已經比較完善了,配合API可以解決這個問題,我們Sloth在Flink社區沒有這個東西的時候已經做了。

最後講一下我們成果的總結,因為Sloth整個對Flink的依賴非常重,我們跟社區也有一些互動,為什麼不直接使用FlinkSQL,用FlinkSQL在我們項目之初是不現實的,因為它不支持Join,整個SQL是一個雛形,第二,他並不是一個相當的引擎,如果有比Flink更先進的引擎出來,他只需要把引擎換掉就可以了,用戶並不需要關注引擎是什麼。我們跟社區也有互動,在今年培養了一位Flinkcommiter。以上就是今天分享的內容,謝謝大家。

想要獲取大會PPT的朋友可以關注公眾號【FMI飛馬網】—底部導航欄回覆關鍵詞"ppt"進行查閱哦!


分享到:


相關文章: