Kafka能做什麼?十分鐘構建你的實時數據流管道

本文將對Kafka做一個入門簡介,並展示如何使用Kafka構建一個文本數據流管道。通過本文,讀者可以瞭解一個流處理數據管道(Pipeline)的大致結構:數據生產者源源不斷地生成數據流,數據流通過消息隊列投遞,數據消費者異步地對數據流進行處理。

Kafka簡介

2010年,LinkedIn開始了其內部流數據處理平臺的開發,2011年將該系統捐獻給了Apache基金會,取名Apache Kafka(以下簡稱Kafka)。Kafka的創始人Jay Kreps覺得這個系統主要用於優化讀寫,應該用一個作家的名字來命名,加上他很喜歡作家卡夫卡的文學作品,覺得這個名字對於一個開源項目來說很酷,因此取名Kafka。

Kafka是一種面向大數據領域的消息系統。在大數據生態圈中,Hadoop的HDFS或Amazon S3提供數據存儲服務,Hadoop MapReduce、Spark和Flink負責計算,Kafka是被用來連接這些系統和應用的消息系統。

Kafka可以連接多個組件和系統


消息系統的功能

消息系統一般使用“生產者-消費者(Producer-Consumer)”模型來解決問題。如下圖所示,生產者生成數據,將數據發送到一個緩存區域,消費者從緩存區域中消費數據。

生產者消費者模型

消息系統可以解決以下問題:

系統解耦。很多企業內部有眾多系統,即使一個APP也包含眾多模塊,如果將所有的系統和模塊都放在一起作為一個龐大的系統來開發,未來很難維護和擴展。如果將各個模塊獨立出來,模塊之間通過消息系統來通信,未來可以輕鬆擴展每個獨立模塊。另外,假設沒有消息隊列,M個生產者和N個消費者通信,會產生M*N個數據管道,消息隊列將這個複雜度降到了M+N。異步處理。同步是指如果模塊A向模塊B發送消息,必須等待返回結果後才能執行接下來的業務邏輯。異步是消息發送方模塊A無需等待返回結果即可繼續執行,只需要向消息隊列中發送消息,至於誰去處理這些消息,消息等待多長時間才能被處理等一系列問題,都是消費者負責的事情。異步處理更像是發佈通知,發送方不用去關心誰去接收通知,如何對通知做出響應等問題。流量削峰。電商促銷、搶票等場景會對系統產生巨大的壓力,瞬時請求暴漲,消息系統的緩存就像一個蓄水池,以很低的成本將上游的洪峰緩存起來,下游的服務按照自身處理能力從緩存中拉取數據,避免服務崩潰。數據冗餘。很多情況下,下游的數據處理模塊可能發生故障,消息系統將數據緩存起來,直到數據被處理,一定程度上避免了數據丟失風險。

Kafka作為一個消息系統,主要提供三種核心能力:

為數據的生產者提供發佈功能,為數據的消費者提供訂閱功能,即傳統的消息隊列的能力。將數據流緩存在緩存區,為數據提供容錯性,有一定的數據存儲能力。提供了一些輕量級流處理能力。

可見Kafka不僅僅是一個消息隊列,也有數據存儲和流處理的功能,確切地說,Kafka是一個流處理系統。

Kafka的一些核心概念

Topic

Kafka按照Topic來區分不同的數據。以淘寶這樣的電商平臺為例,某個Topic發佈買家用戶在電商平臺的行為日誌,比如搜索、點擊、聊天、購買等行為;另外一個Topic發佈賣家用戶在電商平臺上的行為日誌,比如上新、發貨、退貨等行為。

Producer

多個Producer將某種數據發佈到某個Topic下。比如電商平臺多臺線上服務器將買家行為日誌發送到名為user_behavior的Topic下。

Consumer

多個Consumer被分為一組,名為Cosumer Group,一組Consumer Group訂閱一個Topic下的數據。通常我們可以使用Flink編寫的程序作為Kafka的Consumer來對一個數據流做處理。

使用Kafka構建一個文本數據流

下載和安裝

絕大多數的大數據框架基於Java,因此在進行開發之前要先搭建Java編程環境,主要是下載和配置JDK(Java Development Kit)。網絡上針對不同操作系統安裝JDK的相關教程已經很多,這裡不再贅述。

接下來我們從Kafka官網(https://kafka.apache.org/downloads)下載二進制文件形式的軟件包,軟件包格式為tgz。Windows用戶可以使用7zip或WinRAR軟件解壓tgz文件,Linux和macOS用戶需要使用命令行工具,進入該下載目錄,執行命令解壓。

$ tar -xzf kafka_2.12-2.3.0.tgz$ cd kafka_2.12-2.3.0

注意,$符號表示該行命令在類Unix操作系統(Linux和macOS)命令行中執行,而不是在Python交互命令或其他任何交互界面中。Windows的命令行提示符是大於號>。

注意,bin目錄默認為Linux和macOS設計,本文基於macOS,直接了使用bin目錄中的文件。Windows用戶要進入bin\\windows\\來啟動相應腳本,且腳本文件後綴要改為.bat。

啟動服務

Kafka使用ZooKeeper來管理集群,因此需要先啟動ZooKeeper。剛剛下載的Kafka包裡已經包含了ZooKeeper的啟動腳本,可以使用這個腳本快速啟動一個ZooKeeper服務。

$ bin/zookeeper-server-start.sh config/zookeeper.properties

啟動成功後將有對應日誌打印到屏幕上。

接下來新開啟一個命令行會話,啟動Kafka:

$ bin/kafka-server-start.sh config/server.properties

以上兩個操作均使用config文件夾下的默認配置文件,需要注意配置文件的路徑是否寫錯。生產環境下的配置文件比默認配置文件複雜得多。

創建Topic

新開啟一個命令行會話,創建一個名為Shakespeare的Topic:

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Shakespeare

也可以使用命令查看已有的Topic:

$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Shakespeare

發送消息

接下來我們模擬Producer,假設這個Producer是莎士比亞本人,它不斷向Shakespeare這個Topic下發送自己的最新作品:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Shakespeare
>To be, or not to be, that is the question:

每一行作為一條消息事件,被髮送到了Kafka集群上,雖然這個集群只有本機這一臺服務器。

消費數據

另外一些人想了解莎士比亞向Kafka發送過哪些新作,所以需要使用一個Consumer來消費剛剛發送的數據。我們啟動一個命令行會話來模擬Consumer:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Shakespeare --from-beginning
To be, or not to be, that is the question:

Producer端和Consumer端在不同的命令行會話中,我們可以在Producer的命令行會話裡不斷輸入一些文本,切換到Consumer端後,可以看到相應的文本被髮送了過來。

至此,模擬了一個實時數據流數據管道:不同人可以創建屬於自己的Topic,發佈屬於自己的內容,其他人可以訂閱一到多個Topic,根據自身需求設計後續處理邏輯。

小結

Kafka是一種消息系統,提供了數據流“發佈/訂閱”功能,保證了數據冗餘。