Apache Pulsar作為實時和歷史數據分析的一種存儲系統

該博客介紹了統一流和批處理執行的處理引擎的趨勢,並解釋了現有消息傳遞系統滿足統一存儲需求的挑戰。 然後,它解釋了為什麼Apache Pulsar作為一站式解決方案最適合此類需求。 最後,該博客以Pulsar Spark Connector為例,展示了Apache Pulsar如何在一個系統中存儲所有實時數據,以及它如何支持全時範圍的數據分析。


最新的實時數據存儲和處理方法

在大規模並行數據分析領域,AMPLab的"一個統統所有的堆棧"建議使用Apache Spark作為統一引擎,以支持所有常用的數據處理方案,例如批處理,流處理,交互式查詢和機器學習。 。 結構化流是Spark 2.2.0中發佈的新Apache Spark API,可讓您以對靜態數據進行批量計算的方式來表達對流數據的計算,並且Spark SQL引擎在內部針對這兩種方案執行了廣泛的優化。

另一方面,Apache Flink於2016年左右在公眾視野中具有許多吸引人的功能,例如,當時提供了更好的流處理支持,內置水印支持以及一次精確的語義。 Flink已迅速成為Spark的強大競爭對手。 無論使用哪種平臺,如今的用戶都更加關注如何快速發現數據的價值。 流數據和靜態數據不再是單獨的實體,而是整個數據生命週期中的兩種不同表示形式。

一個自然的想法出現了:收集數據時,我可以將所有流數據保留在消息傳遞系統中嗎? 對於傳統系統,答案是否定的。 以Apache Kafka為例,在Kafka中,主題的存儲是基於分區的-主題分區完全存儲在單個代理中並由單個代理訪問,該代理的容量受最小節點的容量限制。 因此,隨著數據大小的增長,只能通過分區重新平衡來實現容量擴展,這又需要重新複製整個分區,以平衡數據和流量到新添加的代理。 重新複製數據非常昂貴且容易出錯,並且會消耗網絡帶寬和IO。 更糟糕的是,隨著我們向基於容器的雲架構邁進,Kafka被設計為在物理機上運行,它缺乏許多關鍵功能,例如I / O隔離,多租戶和可伸縮性。

由於現有流媒體平臺的侷限性,組織將兩個單獨的系統用於流數據存儲:一個用於新導入數據的消息傳遞系統,然後將陳舊的數據卸載到用於長期存儲的冷存儲中。 將數據存儲分為兩個系統不可避免地帶來了兩個主要障礙:

· 一方面,為了保證分析結果的正確性和實時性,要求用戶瞭解每個數據的邊界,並需要對存儲在兩個系統中的數據進行聯合查詢。

· 另一方面,定期將流數據轉儲到文件或對象存儲中需要額外的操作和維護成本以及大量的群集計算資源消耗。

Apache Pulsar的簡短介紹

Apache Pulsar是由Yahoo創建的企業級分佈式消息傳遞系統,現在它是Apache Software Foundation中的頂級開源項目。 Pulsar遵循一般的pub-sub模式,生產者將消息發佈到主題。 消費者可以訂閱主題,處理收到的消息,並在消息處理後發送確認(Ack)。 訂閱是一個命名配置規則,它確定如何將消息傳遞給使用者。 Pulsar啟用四種可以在同一主題上共存的訂閱類型,按訂閱名稱區分:

· 獨佔訂閱-僅允許單個消費者附加到訂閱。

· 共享訂閱-可以由多個使用者訂閱; 每個消費者都收到一部分消息。

· 故障轉移訂閱-多個使用者可以附加到同一訂閱,但是隻有一個使用者可以接收消息。 僅噹噹前使用者失敗時,排隊的下一個使用者才開始接收消息。

· 密鑰共享訂閱-多個使用者可以附加到同一訂閱,並且具有相同密鑰或相同訂購密鑰的消息僅傳遞給一個消費者。

Pulsar是從頭開始創建的多租戶系統。 為了支持多租戶,Pulsar具有租戶的概念。 租戶可以分佈在群集中,並且可以將其身份驗證和授權方案應用於它們。 它們還是可以管理存儲配額,消息TTL和隔離策略的管理單元。 Pulsar的多租戶性質在主題URL中明顯可見,主題URL具有以下結構:persistent://:// tenant / namespace / topic。 如您所見,承租人是主題分類的最基本單位(比名稱空間和主題名稱更基本)。

為什麼Apache Pulsar最適合

基本分層的體系結構和以段為中心的存儲(使用Apache BookKeeper)是兩個關鍵的設計理念,它們使Apache Pulsar與其他消息傳遞系統相比更加先進。 一個Apache Pulsar集群由兩層組成:一個無狀態服務層,由一組接收和傳遞消息的代理組成;以及一個有狀態持久層,由一組稱為Bookies的Apache BookKeeper存儲節點組成,這些存儲節點持久地存儲消息。 讓我們一一調查設計:

分層架構

與Kafka相似,Pulsar基於主題分區存儲消息,每個主題分區都分配給Pulsar中的一個活動代理,該代理稱為該主題分區的所有者代理。 所有者代理服務於從分區讀取消息並將消息寫入分區。 如果代理失敗,Pulsar會自動將其擁有的主題分區移至群集中其餘可用的代理。 由於代理是"無狀態的",因此Pulsar僅在節點故障或代理群集擴展期間將所有權從一個代理轉移到另一個代理,因此在此期間未發生任何數據複製。

以段為中心的存儲

Apache Pulsar作為實時和歷史數據分析的一種存儲系統

> Figure 1. Layered architecture and Segment-centric storage of Pulsar

如圖1所示,Pulsar主題分區上的消息存儲在分佈式日誌中,並且該日誌進一步分為多個段。 每個段都存儲為Apache BookKeeper分類帳,該分類帳已分發並存儲在集群中的多個bookies中。 在寫入前一個段的時間超過配置的時間間隔後(又稱為基於時間的滾動),或者如果前一個段的大小已達到配置的閾值(又稱為基於大小的滾動),則創建新段 主題分區的所有權已更改。 通過分段,主題分區中的消息可以在群集中的所有bookies上平均分配和平衡,這意味著主題分區的容量不僅受一個節點的容量限制。 相反,它可以擴展到整個BookKeeper群集的總容量。

Apache Pulsar中的兩種設計理念提供了許多重要的好處,例如無限的主題分區存儲,無需數據重新平衡的即時擴展以及服務和存儲集群的獨立可伸縮性。 此外,Pulsar 2.0中引入的分層存儲提供了另一種降低舊數據存儲成本的方法。 使用分層存儲,可以將簿冊中的較舊郵件移至更便宜的存儲中,例如HDFS或S3。

最後但並非最不重要的一點是,Pulsar通過Pulsar Schema提供類型化的消息存儲。 因此,您可以在創建主題時指定數據模式,而Pulsar會為您完成其餘複雜的工作,例如消息驗證,消息序列化到有線格式以及消息反序列化。

Apache Pulsar連接器

我們開發了一種Pulsar Spark連接器,該連接器使Spark能夠對存儲在Pulsar中的消息執行流式處理或批處理作業,並將作業結果寫回到Pulsar。

Pulsar Spark連接器API

自從Spark 2.2.0中的結構化流傳輸以來,Spark使SparkSession成為編寫程序的唯一入口,並且您可以使用稱為DataFrame / DataSet的聲明性API來滿足您的需求。 在這樣的程序中,您聲明瞭如何生成,轉換和最終編寫一個DataFrame。 Spark SQL引擎會進行其他優化,並在集群上分佈式運行您的代碼。 以以下代碼為例,將Pulsar用作數據源或數據接收器:

· 使用一個或多個主題構建流媒體源。

· 構造一個批處理源。

· 將流媒體結果不斷匯入Pulsar主題

· 將批處理結果寫入Pulsar。

提示:

Pulsar Spark Connector支持直接讀取和寫入Pulsar消息的DataSet / DataFrame,消息的元數據字段(例如事件時間,消息ID)帶有兩個下劃線(例如__eventTime)作為前綴,以避免潛在的命名衝突。 消息。

Pulsar Spark連接器內部

Apache Pulsar作為實時和歷史數據分析的一種存儲系統

> Figure 2. An overview of Spark Structured Streaming architecture

圖2顯示了結構化流傳輸的主要組成部分(以下簡稱為" SS"):

· 輸入和輸出-提供容錯能力。 SS要求輸入源必須是可重播的,如果節點崩潰,則允許重新讀取最近的輸入數據(Pulsar Spark Connector保證了這一點)。 輸出接收器必須支持冪等寫入才能提供"完全一次"(Pulsar Spark Connector當前無法執行此操作,我們提供"至少一次"保證,並且您可以通過主鍵在以後的Spark作業中對消息進行重複數據刪除)。

· API-批處理程序和流處理程序共享Spark SQL批處理API,具有多個API功能以專門支持流處理。

· 觸發器-控制引擎嘗試計算新結果和更新輸出接收器的頻率。

· 用戶可以使用水印策略來確定何時停止處理延遲到達的數據。

· 有狀態的運算符-允許用戶通過鍵跟蹤和更新可變狀態,以進行復雜的處理。

· 執行層-SS在接收到DataFrame查詢後,確定如何遞增地運行它(針對流查詢),對其進行優化並通過以下一種執行模型來運行它:

· 默認情況下,微批處理模型可通過動態負載平衡,重新調整比例,故障恢復和緩解混亂來提高吞吐量。

· 低延遲情況下的連續模型。

· 日誌和狀態存儲-首先寫入預寫日誌以跟蹤每個源中的消耗位置。 大型狀態存儲可對操作員的內部狀態進行快照,並簡化故障期間的恢復過程。

流作業的執行流程

對於Pulsar Spark Connector,Spark中的源和接收器定義了我們應該如何實現讀/寫邏輯:

Apache Pulsar作為實時和歷史數據分析的一種存儲系統

> Figure 3. An example execution flow of a microbatch

StreamExecution在內部處理執行邏輯。 圖3顯示了StreamExecution內部的微批執行流程:

· 在每個微批處理的最開始,SS會向源詢問可用數據(getOffset)並將其持久保存到WAL。

· 然後,源根據SS提供的開始和結束偏移量在批處理(getBatch)中提供數據。

· SS觸發邏輯計劃的優化和編譯,並將計算結果寫入接收器(addBatch)。 注意:實際數據採集和計算在此處進行。

· 一旦成功將數據寫入接收器,SS就會通知源可以丟棄(提交)數據,並將成功執行的batchId寫入內部commitLog。 回到Pulsar Spark Connector,我們執行以下操作:

· 在查詢計劃階段,將從Pulsar獲取主題的架構,檢查其兼容性(查詢中的主題應共享相同的架構)並轉換為DataFrame架構。

· 為每個主題分區創建使用者,並在(開始,結束)之間返回數據。

· 收到來自SS的提交調用後,我們在主題分區上重置了Cursor,通知Pulsar可以清除數據。

· 對於我們從addBatch收到的DataFrame,記錄通過生產者發送發送到相應的主題。

主題/分區添加/刪除發現

流作業本質上是長期運行的。 在執行過程中,主題或分區可能會被刪除或添加。 Pulsar Spark Connector通過列出所有可用分區並將其與最後一個微批處理或紀元中的分區進行比較,從而在連續執行的每個微批處理或每個紀元的開始處啟用主題/分區發現,我們可以輕鬆地找到新添加的分區或哪個分區 分區不見了,並計劃新任務或相應地刪除現有任務。


(本文翻譯自Yijie Shen的文章《Apache Pulsar as One Storage System for Both Real-time and Historical Data Analysis》,參考:https://medium.com/streamnative/apache-pulsar-as-one-storage-455222c59017)


分享到:


相關文章: