如何將kafka中的數據快速導入Hadoop?

Kafka是一個分佈式發佈—訂閱系統,由於其強大的分佈式和性能特性,迅速成為數據管道的關鍵部分。它可完成許多工作,例如消息傳遞、指標收集、流處理和日誌聚合。Kafka的另一個有效用途是將數據導入Hadoop。使用Kafka的關鍵原因是它將數據生產者和消費者分離,允許擁有多個獨立的生產者(可能由不同的開發團隊編寫)。同樣,還有多個獨立的消費者(也可能由不同的團隊編寫)。此外,消費者可以是實時/同步或批量/離線/異步。當對比RabbitMQ等其他pub-sub工具時,後一個屬性有很大區別。

如何將kafka中的數據快速導入Hadoop?

要使用Kafka,有一些需要理解的概念:

topic—topic是相關消息的訂閱源;

分區—每個topic由一個或多個分區組成,這些分區是由日誌文件支持的有序消息隊列;

生產者和消費者—生產者和消費者將消息寫入分區並從分區讀取。

Brokers—Brokers是管理topic和分區併為生產者和消費者請求提供服務的Kafka流程。

Kafka不保證對topic的“完全”排序,只保證組成topic的各個分區是有序的。消費者應用程序可以根據需要強制執行對“全局”topic排序。


如何將kafka中的數據快速導入Hadoop?

顯示了Kafka的概念模型


如何將kafka中的數據快速導入Hadoop?

示瞭如何在Kafka部署分發分區的示例


為了支持容錯,可以複製topic,這意味著每個分區可以在不同主機上具有可配置數量的副本。這提供了更高的容錯能力,這意味著單個服務器死亡對數據或生產者和消費者的可用性來說不是災難性的。

此處採用Kafka版本0.8和Camus的0.8.X。

實踐:使用Camus將Avro數據從Kafka複製到HDFS

該技巧在已經將數據流入Kafka用於其他目的並且希望將數據置於HDFS中的情況下非常有用。

問題

希望使用Kafka作為數據傳遞機制來將數據導入HDFS。

解決方案

使用LinkedIn開發的解決方案Camus將Kafka中的數據複製到HDFS。

討論

Camus是LinkedIn開發的一個開源項目。Kafka在LinkedIn大量部署,而Camus則用作將數據從Kafka複製到HDFS。

開箱即用,Camus支持Kafka中的兩種數據格式:JSON和Avro。在這種技術中,我們將通過Camus使用Avro數據。Camus對Avro的內置支持要求Kafka發佈者以專有方式編寫Avro數據,因此對於這種技術,我們假設希望在Kafka中使用vanilla序列化數據。

讓這項技術發揮作用需要完成三個部分的工作:首先要將一些Avro數據寫入Kafka,然後編寫一個簡單的類來幫助Camus反序列化Avro數據,最後運行一個Camus作業來執行數據導入。

為了把Avro記錄寫入Kafka,在以下代碼中,需要通過配置必需的Kafka屬性來設置Kafka生成器,從文件加載一些Avro記錄,並將它們寫出到Kafka:

如何將kafka中的數據快速導入Hadoop?

可以使用以下命令將樣本數據加載到名為test的Kafka的topic中:


如何將kafka中的數據快速導入Hadoop?


Kafka控制檯使用者可用於驗證數據是否已寫入Kafka,這會將二進制Avro數據轉儲到控制檯:

如何將kafka中的數據快速導入Hadoop?

完成後,編寫一些Camus代碼,以便可以在Camus中閱讀這些Avro記錄。

實踐:編寫Camus和模式註冊表

首先,需要了解三種Camus概念:

解碼器—解碼器的工作是將從Kafka提取的原始數據轉換為Camus格式。

編碼器—編碼器將解碼數據序列化為將存儲在HDFS中的格式。

Schema註冊表—提供有關正在編碼的Avro數據的schema信息。

正如前面提到的,Camus支持Avro數據,但確實需要Kafka生產者使用Camus KafkaAvroMessageEncoder類來編寫數據,該類為Avro序列化二進制數據添加了部分專有數據,可能是因為Camus中的解碼器可以驗證它是由該類編寫的。

在此示例中,使用 Avro serialization進行序列化,因此需要編寫自己的解碼器。幸運的是,這很簡單:


如何將kafka中的數據快速導入Hadoop?

如何將kafka中的數據快速導入Hadoop?


你可能已經注意到我們在Kafka中寫了一個特定的Avro記錄,但在Camus中我們將該記錄讀作通用的Avro記錄,而不是特定的Avro記錄,這是因為CamusWrapper類僅支持通用Avro記錄。否則,特定的Avro記錄可以更簡單地使用,因為可以使用生成的代碼並具有隨之而來的所有安全特徵。

CamusWrapper對象是從Kafka提取的數據。此類存在的原因是允許將元數據粘貼到envelope中,例如時間戳,服務器名稱和服務詳細信息。強烈建議使用的任何數據都有一些與每條記錄相關的有意義的時間戳(通常這將是創建或生成記錄的時間)。然後,可以使用接受時間戳作為參數的CamusWrapper構造函數:

public CamusWrapper(R record, long timestamp) { ... }

如果未設置時間戳,則Camus將在創建包裝器時創建新的時間戳。在確定輸出記錄的HDFS位置時,在Camus中使用此時間戳和其他元數據。

接下來,需要編寫一個schema註冊表,以便Camus Avro編碼器知道正在寫入HDFS的Avro記錄的schema詳細信息。註冊架構時,還要指定從中拉出Avro記錄的Kafka的topic名稱:


如何將kafka中的數據快速導入Hadoop?


運行Camus

Camus在Hadoop集群上作為MapReduce作業運行,希望在該集群中導入Kafka數據。需要向Camus提供一堆屬性,可以使用命令行或者使用屬性文件來執行此操作,我們將使用此技術的屬性文件:

如何將kafka中的數據快速導入Hadoop?

從屬性中可以看出,無需明確告訴Camus要導入哪些topic。Camus自動與Kafka通信以發現topic(和分區)以及當前的開始和結束偏移。

如果想要精確控制導入的topic,可以分別使用kafka.whitelist.topics和kafka.blacklist.topics列舉白名單(限制topic)和黑名單(排除topic),可以使用逗號作為分隔符指定多個topic,還支持正則表達式,如以下示例所示,其匹配topic的“topic1”或以“abc”開頭,後跟一個或多個數字的任何topic,可以使用與value完全相同的語法指定黑名單:

kafka.whitelist.topics=topic1,abc[0-9]+

一旦屬性全部設置完畢,就可以運行Camus作業了:


如何將kafka中的數據快速導入Hadoop?


如何將kafka中的數據快速導入Hadoop?


這將導致Avro數據在HDFS中著陸。我們來看看HDFS中的內容:

如何將kafka中的數據快速導入Hadoop?

第一個文件包含已導入的數據,其他供Camus管理。

可以使用AvroDump實用程序查看HDFS中的數據文件:


如何將kafka中的數據快速導入Hadoop?


那麼,當Camus工作正在運行時究竟發生了什麼? Camus導入過程作為MapReduce作業執行,如圖5.16所示。

如何將kafka中的數據快速導入Hadoop?

隨著MapReduce中的Camus任務成功,Camus OutputCommitter(允許在任務完成時執行自定義工作的MapReduce構造)以原子方式將任務的數據文件移動到目標目錄。OutputCommitter還為任務正在處理的所有分區創建偏移文件,同一作業中的其他任務可能會失敗,但這不會影響成功任務的狀態——成功任務的數據和偏移輸出仍然存在,因此後續的Camus執行將從最後一個已知的成功狀態恢復處理。

接下來,讓我們看看Camus導入數據的位置以及如何控制行為。

數據分區

之前,我們看到了Camus導入位於Kafka的Avro數據,讓我們仔細看看HDFS路徑結構,如圖5.17所示,看看可以做些什麼來確定位置。


如何將kafka中的數據快速導入Hadoop?


圖5.17 在HDFS中解析導出數據的Camus輸出路徑

路徑的日期/時間由從CamusWrapper中提取的時間戳確定,可以從MessageDecoder中的Kafka記錄中提取時間戳,並將它們提供給CamusWrapper,這將允許按照有意義的日期對數據進行分區,而不是默認值,這只是在MapReduce中讀取Kafka記錄的時間。

Camus支持可插拔分區程序,允許控制圖5.18所示路徑的一部分。

如何將kafka中的數據快速導入Hadoop?

圖5.18 Camus分區路徑

Camus Partitioner接口提供了兩種必須實現的方法:


如何將kafka中的數據快速導入Hadoop?


例如,自定義分區程序可創建用於Hive分區的路徑。

 總結

Camus提供了一個完整的解決方案,可以在HDFS中從Kafka獲取數據,並在出現問題時負責維護狀態和進行錯誤處理。通過將其與Azkaban或Oozie集成,可以輕鬆實現自動化,並根據消息時間組織HDFS數據執行簡單的數據管理。值得一提的是,當涉及到ETL時,與Flume相比,它的功能是無懈可擊的。

Kafka捆綁了一種將數據導入HDFS的機制。它有一個KafkaETLInputFormat輸入格式類,可用於在MapReduce作業中從Kafka提取數據。要求編寫MapReduce作業以執行導入,但優點是可以直接在MapReduce流中使用數據,而不是將HDFS用作數據的中間存儲。接下來,我們將討論如何將駐留在Hadoop中的數據傳輸到其他系統,例如文件系統和其他地方。


分享到:


相關文章: