Kafka快速入門

一、介紹

是一個分佈式、分區的、多副本的、多訂閱者,基於zookeeper協調的消息隊列系統

1、點對點模式

生產者發送消息到一個隊列中。此時,將有一個或多個消費者消費隊列中的數據。但是一條消息只能被消費一次。當一個消費者消費了隊列中的某條數據之後,該條數據則從消息隊列中刪除。即使有多個消費者同時消費數據,也能保證順序


Kafka快速入門


生產者發送一條消息到queue,只有一個消費者能收到

2、發佈-訂閱模式

生產者生產數據到一個topic中。與點對點消不同的是,消費者可以訂閱一個或多個topic,消費者可以消費該topic中所有的數據,同一條數據可以被多個消費者消費,數據被消費後不會立馬刪除。


Kafka快速入門


發佈者發送到topic的消息,只有訂閱了topic的訂閱者才會收到消息


Kafka快速入門


3、broker

Kafka 集群包含一個或多個服務器,服務器節點稱為broker。

4、Topic

每條發佈到Kafka集群的消息都有一個類別,這個類別被稱為Topic。類似於數據庫的表名

5、partition

topic中的數據分割為一個或多個partition。每個topic至少有一個partition。

6、Producer

生產者即數據的發佈者,該角色將消息發佈到Kafka的topic中。broker接收到生產者發送的消息後,broker將該消息追加到當前用於追加數據的segment文件中。生產者發送的消息,存儲到一個partition中,生產者也可以指定數據存儲的partition。

7、Consumer

消費者可以從broker中讀取數據。消費者可以消費多個topic中的數據。

8、Consumer Group

每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。


9、Leader

每個partition有多個副本,其中有且僅有一個作為Leader,Leader是當前負責數據的讀寫的partition。

10、Follower

Follower跟隨Leader,所有寫請求都通過Leader路由,數據變更會廣播給所有Follower,Follower與Leader保持數據同步。如果Leader失效,則從Follower中選舉出一個新的Leader。當Follower與Leader掛掉、卡住或者同步太慢,leader會把這個follower從“in sync replicas”(ISR)列表中刪除,重新創建一個Follower。



Topic相當於傳統消息系統MQ中的一個隊列queue,producer端發送的message必須指定是發送到哪個topic,但是不需要指定topic下的哪個partition,因為kafka會把收到的message進行load balance,均勻的分佈在這個topic下的不同的partition上( hash(message) % [broker數量] )。

物理上存儲上,這個topic會分成一個或多個partition,每個partiton相當於是一個子queue。在物理結構上,每個partition對應一個物理的目錄(文件夾),文件夾命名是[topicname]_[partition]_[序號],一個topic可以有無數多的partition,根據業務需求和數據量來設置。

在kafka配置文件中可隨時更高num.partitions參數來配置更改topic的partition數量,在創建Topic時通過參數指定parittion數量。Topic創建之後通過Kafka提供的工具也可以修改partiton數量。

一般來說,

(1)一個Topic的Partition數量大於等於Broker的數量,可以提高吞吐率。

(2)同一個Partition的Replica儘量分散到不同的機器,高可用。

當add a new partition的時候,partition裡面的message不會重新進行分配,原來的partition裡面的message數據不會變,新加的這個partition剛開始是空的,隨後進入這個topic的message就會重新參與所有partition的load balance

- Partition Replica:每個partition可以在其他的kafka broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka集群。replica副本的方式是按照kafka broker的順序存。

例如有5個kafka broker節點,某個topic有3個partition,每個partition存2個副本,那麼partition1存broker1,broker2,partition2存broker2,broker3。。。以此類推(replica副本數目不能大於kafka broker節點的數目,否則報錯。這裡的replica數其實就是partition的副本總數,其中包括一個leader,其他的就是copy副本)。

這樣如果某個broker宕機,其實整個kafka內數據依然是完整的。但是,replica副本數越高,系統雖然越穩定,但是回來帶資源和性能上的下降;replica副本少的話,也會造成系統丟數據的風險。

二、Kafka的優點

2.1 解耦

在項目啟動之初來預測將來項目會碰到什麼需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

2.2 冗餘(副本)

有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

2.3 擴展性

因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。

2.4 靈活性&峰值處理能力

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

2.5 可恢復性

系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復後被處理。

2.6 順序保證

在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。

2.7 緩衝

在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩衝層來幫助任務最高效率的執行———寫入隊列的處理會盡可能的快速。該緩衝有助於控制和優化數據流經過系統的速度。

2.8 異步通信

很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。

先安裝zookeeper

<code>#上傳zookeeper,kafka
tar -zxvf zookeeper
tar -zxvf kafka

#修改zookeeper配置文件 zoo.cfg
vi zoo.cfg
#數據文件目錄
dataDir=/opt/module/zookeeper-3.4.11/data
#集群配置,此處沒有集群可以不配置
server.1=hadoop101:2888:3888
server.2=hadoop102:2888:3888
server.3=hadoop1032888:3888
/<code>

安裝kafka

<code>mkdir logs 
#實際的數據也就放在logs
cd config/
vi server.properties
#必須是一個唯一的值
broker.id=0
#添加,是否可以刪除topic,默認是false,不能刪除
delete.topic.enable=true
log.dirs=/opt/module/kafka/logs
#存儲的時間和大小
log.retention.hours=168
log.segment.bytes=1073741824
#zookeeper集群 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
/<code>
<code>啟動kafka
bin/kafka-server-start.sh config/server.properties

創建主題
bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --partitions 1 --replication-factor 1 --topic first

查看主題
bin/kafka-topics.sh --list --zookeeper hadoop102:2181


刪除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

發送消息
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

消費消息
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --from-beginning/<code>


分享到:


相關文章: