本文將對Kafka做一個入門簡介,並展示如何使用Kafka構建一個文本數據流管道。通過本文,讀者可以瞭解一個流處理數據管道(Pipeline)的大致結構:數據生產者源源不斷地生成數據流,數據流通過消息隊列投遞,數據消費者異步地對數據流進行處理。
Kafka簡介
2010年,LinkedIn開始了其內部流數據處理平臺的開發,2011年將該系統捐獻給了Apache基金會,取名Apache Kafka(以下簡稱Kafka)。Kafka的創始人Jay Kreps覺得這個系統主要用於優化讀寫,應該用一個作家的名字來命名,加上他很喜歡作家卡夫卡的文學作品,覺得這個名字對於一個開源項目來說很酷,因此取名Kafka。
Kafka是一種面向大數據領域的消息系統。在大數據生態圈中,Hadoop的HDFS或Amazon S3提供數據存儲服務,Hadoop MapReduce、Spark和Flink負責計算,Kafka是被用來連接這些系統和應用的消息系統。
Kafka可以連接多個組件和系統
消息系統的功能
消息系統一般使用“生產者-消費者(Producer-Consumer)”模型來解決問題。如下圖所示,生產者生成數據,將數據發送到一個緩存區域,消費者從緩存區域中消費數據。
生產者消費者模型
消息系統可以解決以下問題:
Kafka作為一個消息系統,主要提供三種核心能力:
可見Kafka不僅僅是一個消息隊列,也有數據存儲和流處理的功能,確切地說,Kafka是一個流處理系統。
Kafka的一些核心概念
Topic
Kafka按照Topic來區分不同的數據。以淘寶這樣的電商平臺為例,某個Topic發佈買家用戶在電商平臺的行為日誌,比如搜索、點擊、聊天、購買等行為;另外一個Topic發佈賣家用戶在電商平臺上的行為日誌,比如上新、發貨、退貨等行為。
Producer
多個Producer將某種數據發佈到某個Topic下。比如電商平臺多臺線上服務器將買家行為日誌發送到名為user_behavior的Topic下。
Consumer
多個Consumer被分為一組,名為Cosumer Group,一組Consumer Group訂閱一個Topic下的數據。通常我們可以使用Flink編寫的程序作為Kafka的Consumer來對一個數據流做處理。
使用Kafka構建一個文本數據流
下載和安裝
絕大多數的大數據框架基於Java,因此在進行開發之前要先搭建Java編程環境,主要是下載和配置JDK(Java Development Kit)。網絡上針對不同操作系統安裝JDK的相關教程已經很多,這裡不再贅述。
接下來我們從Kafka官網(https://kafka.apache.org/downloads)下載二進制文件形式的軟件包,軟件包格式為tgz。Windows用戶可以使用7zip或WinRAR軟件解壓tgz文件,Linux和macOS用戶需要使用命令行工具,進入該下載目錄,執行命令解壓。
$ tar -xzf kafka_2.12-2.3.0.tgz$ cd kafka_2.12-2.3.0
注意,$符號表示該行命令在類Unix操作系統(Linux和macOS)命令行中執行,而不是在Python交互命令或其他任何交互界面中。Windows的命令行提示符是大於號>。
注意,bin目錄默認為Linux和macOS設計,本文基於macOS,直接了使用bin目錄中的文件。Windows用戶要進入bin\\windows\\來啟動相應腳本,且腳本文件後綴要改為.bat。
啟動服務
Kafka使用ZooKeeper來管理集群,因此需要先啟動ZooKeeper。剛剛下載的Kafka包裡已經包含了ZooKeeper的啟動腳本,可以使用這個腳本快速啟動一個ZooKeeper服務。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
啟動成功後將有對應日誌打印到屏幕上。
接下來新開啟一個命令行會話,啟動Kafka:
$ bin/kafka-server-start.sh config/server.properties
以上兩個操作均使用config文件夾下的默認配置文件,需要注意配置文件的路徑是否寫錯。生產環境下的配置文件比默認配置文件複雜得多。
創建Topic
新開啟一個命令行會話,創建一個名為Shakespeare的Topic:
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Shakespeare
也可以使用命令查看已有的Topic:
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Shakespeare
發送消息
接下來我們模擬Producer,假設這個Producer是莎士比亞本人,它不斷向Shakespeare這個Topic下發送自己的最新作品:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Shakespeare
>To be, or not to be, that is the question:
每一行作為一條消息事件,被髮送到了Kafka集群上,雖然這個集群只有本機這一臺服務器。
消費數據
另外一些人想了解莎士比亞向Kafka發送過哪些新作,所以需要使用一個Consumer來消費剛剛發送的數據。我們啟動一個命令行會話來模擬Consumer:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Shakespeare --from-beginning
To be, or not to be, that is the question:
Producer端和Consumer端在不同的命令行會話中,我們可以在Producer的命令行會話裡不斷輸入一些文本,切換到Consumer端後,可以看到相應的文本被髮送了過來。
至此,模擬了一個實時數據流數據管道:不同人可以創建屬於自己的Topic,發佈屬於自己的內容,其他人可以訂閱一到多個Topic,根據自身需求設計後續處理邏輯。
小結
Kafka是一種消息系統,提供了數據流“發佈/訂閱”功能,保證了數據冗餘。