kafka原理及應用

kafka原理及應用

producer:消息生產者,發佈消息到kafka集群的終端或服務

broker:kafka集群中包含的服務器。

topic:每條發佈到kafaka集群的消息屬於的類別,即kafka是面向topic的。

partition:partition是物理上的概念,每個topic包含一個或多個partition。kafka分配的單位是partition

consumer:從kafka集群中消費消息的終端或服務器

consumer group:high-level consumer api中,每個consumer都屬於一個consumer group,每條消息只能被consumer group中的一個Consumer消費,但可以被多個consumer group消費

replica:partition的副本,保證partition的高可用。

leader:replica中的一個角色,producer和consumer只跟leader交互

follower:replica中的一個角色,從leader中複製數據

controller:kafka集群中的其中一個服務器,用來進行leader election以及各種failover.

zookeeper:kafka通過zookeeper來存儲集群的meta信息。

kafka原理及應用

producer發佈消息

producer採用push模式將消息發佈到broker,每條消息都被append到patition中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存藥膏,保證kafka吞吐率)

消息路由

producer發送消息到broker時,會根據分區算法選擇將其存儲到哪一個partition,其路由機制為:

指定了partition,則直接使用;未指定partition但指定key,通過key的value進行hash選出一個partition;partition和key都為指定,使用輪詢選出一個partition.

producer先從zookeeper的“brokers/.../state”節點找到該partition的leader

producer將消息發送給該leader

leader將消息寫入本地log

followers從leader pull消息,寫入本地log後leader發送 ACK

leader收到所有ISR中的replica的ACK後,增加HW(high watermark最後commit的offset)並向producer發送ACK

producer delivery guarantee

At most once 消息可能丟失,但絕不會重複傳輸

At least one 消息絕不會丟,但可能會重複傳輸

Exactly once 每條消息肯定會被傳遞一次且僅傳輸一次

當producer向broker發送消息時,一旦這條消息被commit,由於replication的存在,它就不會丟,但是如果producer發送數據給broker後,遇到網絡問題造成通信中斷,那producer就無法判斷該條消息是否已經commit.雖然kafka無法確定網絡故障功能期間發生了什麼,但是producer可以生成一種類似於註解的東西,發生故障時冪等性的重試多次,這樣就做到Exactly once。但目前還並未實現,所以目前默認情況下一條消息從producer到broker是確保了At least once,可通過設置producer的一部發送實現At most once.

broker保存消息

物理上把topic分成一個或多個partition(對於server.properties中的num.partiton=3配置),每個partition物理上對於一個文件夾(該文件夾存儲該partition的所有消息和索引文件)

物理消息是否被消費,kafka都會保留所有消息,有兩種策略可以刪除舊數據:1.基於時間:log.retention.hours=168;2.基於大小:log.retention.bytes=1073741824

需要注意的是,因為kafka讀取特定消息的時間複雜度為O(1),即與文件大小無關,所以刪除過期文件與提高kafka性能無關。

topic創建於刪除

1.controller在zookeeper的/brokers/topics節點上註冊watcher,當topic被創建,則controller會通過watch得到該topic的partition的partition/replica分配。

2.controller從/brokers/ids讀取當前所有可用的broker列表,對於 set_p 中的每一個 partition:1.從分配給該partition的所有replica(AR)中選取一個可用的broker作為新的leader,並將AR設置為新的ISR;2.將新的leader和ISR寫入/brokers/topics/[topic]/partitions/[partition]/state

3.controller通過RPC向相關的broker發送LeaderAndISRRequest.

刪除topic

1.controller在zookeeper的/brokers/topics節點註冊watcher,當topic被刪除,則controller會通過watch得到該topic的partition/replica分配。

2.若delete.topic.enable=false,結束;否則controller註冊在/admin/delete_topics上的watch被fire,controller通過回調向對於的broker發送StopReplicaRequest.

kafka HA

replication

同一個partition可能會有多個replica(對應server.properties中的default.replication.factor=N),沒有replica的情況下,一旦broker單機,其上所有partition的數據都可能不可被消費,同時producer也不能再將數據存於其上的partition,引入replication後,同一個partition可能會有多個replica,這是需要在這些replica之間選出一個leader,producer和consumer只與這個leader交互,其他replica作為follower從leader中複製數據。

Kafka 分配 Replica 的算法如下:

1. 將所有 broker(假設共 n 個 broker)和待分配的 partition 排序

2. 將第 i 個 partition 分配到第(i mod n)個 broker 上

3. 將第 i 個 partition 的第 j 個 replica 分配到第((i + j) mode n)個 broker上

leader failover

當partition對應的leader宕機時,需要從follower中選舉出新的leader,在選舉新leader時,一個基本原則是,新的leader必須擁有舊的leader commit過的所有信息。

kafka在zookeeper中/brokers/..../state動態維護了一個ISR(in-sync replicas),ISR裡面的所有replica都跟上了leader,只有ISR裡面的成員才能選為leader,對於f+1個replica,一個partition可以在容忍f個replica失效的情況下保證消息不丟失。

當所有 replica 都不工作時,有兩種可行的方案:

1.等待ISR中的任一個replica活過來,並選它作為leader,可保證數據不丟失,但時間可能相對較長。

2.選擇第一個活過來的replica(不一定是ISR成員)作為leader,無法保證數據不丟失,但相對不可用時間短。

broker failover

1.controller在zookeeper的/brokers/ids/[brokerId]節點註冊watcher,當broker宕機時zookeeper會fire watch

2.controller從/brokers/ids節點讀取可用broker

3.controller決定set_p,該集合包含宕機broker上的所有partition

4.對set_p中的每個partition:從/brokers/topics/[topic]/partitions/[partition]/state節點讀取ISR;決定新leader;將新leader,ISR,controller_epoch和leader_epoch等信息寫入stae節點

5.通過RPC向相關broker發送leaderAndISRRequest命令

controller failover

當 controller 宕機時會觸發 controller failover。每個 broker 都會在 zookeeper 的 "/controller" 節點註冊 watcher,當 controller 宕機時 zookeeper 中的臨時節點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試創建新的 controller path,只有一個競選成功並當選為 controller。

當新的 controller 當選時,會觸發 KafkaController.onControllerFailover 方法,在該方法中完成如下操作:

1. 讀取並增加 Controller Epoch。

2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上註冊 watcher。

3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上註冊 watcher。

4. 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上註冊 watcher。

5. 若 delete.topic.enable=true(默認值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上註冊 watcher。

6. 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上註冊Watch。

7. 初始化 ControllerContext 對象,設置當前所有 topic,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等。

8. 啟動 replicaStateMachine 和 partitionStateMachine。

9. 將 brokerState 狀態設置為 RunningAsController。

10. 將每個 partition 的 Leadership 信息發送給所有“活”著的 broker。

11. 若 auto.leader.rebalance.enable=true(默認值是true),則啟動 partition-rebalance 線程。

12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。

consumer 消費消息

consumer API

kafka 提供了兩套 consumer API:

1. The high-level Consumer API

2. The SimpleConsumer API

其中 high-level consumer API 提供了一個從 kafka 消費數據的高層抽象,而 SimpleConsumer API 則需要開發人員更多地關注細節。

The high-level consumer API

high-level consumer API 提供了 consumer group 的語義,一個消息只能被 group 內的一個 consumer 所消費,且 consumer 消費消息時不關注 offset,最後一個 offset 由 zookeeper 保存。

使用 high-level consumer API 可以是多線程的應用,應當注意:

1. 如果消費線程大於 patition 數量,則有些線程將收不到消息

2. 如果 patition 數量大於線程數,則有些線程多收到多個 patition 的消息

3. 如果一個線程消費多個 patition,則無法保證你收到的消息的順序,而一個 patition 內的消息是有序的

The SimpleConsumer API

如果你想要對 patition 有更多的控制權,那就應該使用 SimpleConsumer API,比如:

1. 多次讀取一個消息

2. 只消費一個 patition 中的部分消息

3. 使用事務來保證一個消息僅被消費一次

但是使用此 API 時,partition、offset、broker、leader 等對你不再透明,需要自己去管理。你需要做大量的額外工作:

1. 必須在應用程序中跟蹤 offset,從而確定下一條應該消費哪條消息

2. 應用程序需要通過程序獲知每個 Partition 的 leader 是誰

3. 需要處理 leader 的變更

使用 SimpleConsumer API 的一般流程如下:

1. 查找到一個“活著”的 broker,並且找出每個 partition 的 leader

2. 找出每個 partition 的 follower

3. 定義好請求,該請求應該能描述應用程序需要哪些數據

4. fetch 數據

5. 識別 leader 的變化,並對之作出必要的響應

consumer group

如 2.2 節所說, kafka 的分配單位是 patition。每個 consumer 都屬於一個 group,一個 partition 只能被同一個 group 內的一個 consumer 所消費(也就保障了一個消息只能被 group 內的一個 consuemr 所消費),但是多個 group 可以同時消費這個 partition。

kafka 的設計目標之一就是同時實現離線處理和實時處理,根據這一特性,可以使用 spark/Storm 這些實時處理系統對消息在線處理,同時使用 Hadoop 批處理系統進行離線處理,還可以將數據備份到另一個數據中心,只需要保證這三者屬於不同的 consumer group。如下圖所示:

kafka原理及應用

消費方式

consumer 採用 pull 模式從 broker 中讀取數據。

push 模式很難適應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。它的目標是儘可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費消息。

對於 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費消息的速率,同時 consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。

consumer delivery guarantee

如果將 consumer 設置為 autocommit,consumer 一旦讀到數據立即自動 commit。如果只討論這一讀取消息的過程,那 Kafka 確保了 Exactly once。

但實際使用中應用程序並非在 consumer 讀取完數據就結束了,而是要進行進一步處理,而數據處理與 commit 的順序在很大程度上決定了consumer delivery guarantee:

1.讀完消息先 commit 再處理消息。

這種模式下,如果 consumer 在 commit 後還沒來得及處理消息就 crash 了,下次重新開始工作後就無法讀到剛剛已提交而未處理的消息,這就對應於 At most once

2.讀完消息先處理再 commit。

這種模式下,如果在處理完消息之後 commit 之前 consumer crash 了,下次重新開始工作時還會處理剛剛未 commit 的消息,實際上該消息已經被處理過了。這就對應於 At least once。

3.如果一定要做到 Exactly once,就需要協調 offset 和實際操作的輸出。

精典的做法是引入兩階段提交。如果能讓 offset 和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,consumer 拿到數據後可能把數據放到 HDFS,如果把最新的 offset 和數據本身一起寫到 HDFS,那就可以保證數據的輸出和 offset 的更新要麼都完成,要麼都不完成,間接實現 Exactly once。(目前就 high-level API而言,offset 是存於Zookeeper 中的,無法存於HDFS,而SimpleConsuemr API的 offset 是由自己去維護的,可以將之存於 HDFS 中)

總之,Kafka 默認保證 At least once,並且允許通過設置 producer 異步提交來實現 At most once(見文章《

kafka consumer防止數據丟失》)。而 Exactly once 要求與外部存儲系統協作,幸運的是 kafka 提供的 offset 可以非常直接非常容易得使用這種方式。

更多關於 kafka 傳輸語義的信息請參考《Message Delivery Semantics》。

consumer rebalance

當有 consumer 加入或退出、以及 partition 的改變(如 broker 加入或退出)時會觸發 rebalance。consumer rebalance算法如下:

1. 將目標 topic 下的所有 partirtion 排序,存於PT

2. 對某 consumer group 下所有 consumer 排序,存於 CG,第 i 個consumer 記為 Ci

3. N=size(PT)/size(CG),向上取整

4. 解除 Ci 對原來分配的 partition 的消費權(i從0開始)

5. 將第i*N到(i+1)*N-1個 partition 分配給 Ci

在 0.8.*版本,每個 consumer 都只負責調整自己所消費的 partition,為了保證整個consumer group 的一致性,當一個 consumer 觸發了 rebalance 時,該 consumer group 內的其它所有其它 consumer 也應該同時觸發 rebalance。這會導致以下幾個問題:

1.Herd effect

  任何 broker 或者 consumer 的增減都會觸發所有的 consumer 的 rebalance

2.Split Brain

  每個 consumer 分別單獨通過 zookeeper 判斷哪些 broker 和 consumer 宕機了,那麼不同 consumer 在同一時刻從 zookeeper 看到的 view 就可能不一樣,這是由 zookeeper 的特性決定的,這就會造成不正確的 reblance 嘗試。

3. 調整結果不可控

  所有的 consumer 都並不知道其它 consumer 的 rebalance 是否成功,這可能會導致 kafka 工作在一個不正確的狀態。

kafka文件存儲結構

kafka原理及應用

每一個partition目錄下的文件被平均切割成大小相等(默認一個文件是500兆,可以手動去設置)的數據文件,

每一個數據文件都被稱為一個段(segment file),但每個段消息數量不一定相等,這種特性能夠使得老的segment可以被快速清除。

默認保留7天的數據。

Segment file是什麼?

生產者生產的消息按照一定的分組策略被髮送到broker中partition中的時候,這些消息如果在內存中放不下了,就會放在文件中,

partition在磁盤上就是一個目錄,該目錄名是topic的名稱加上一個序號,在這個partition目錄下,有兩類文件,一類是以log為後綴的文件,

一類是以index為後綴的文件,每一個log文件和一個index文件相對應,這一對文件就是一個segment file,也就是一個段。

其中的log文件就是數據文件,裡面存放的就是消息,而index文件是索引文件,索引文件記錄了元數據信息。

kafka原理及應用

Segment文件命名的規則:partition全局的第一個segment從0(20個0)開始,後續的每一個segment文件名是上一個segment文件中最後一條消息的offset值。

索引文件(index文件)中存儲著大量的元數據,而數據文件(log文件)中存儲著大量的消息。

索引文件(index文件)中的元數據指向對應的數據文件(log文件)中消息的物理偏移地址。

kafka原理及應用

其中每個partiton中所持有的segments列表信息會存儲在zookeeper中.

日誌文件的刪除策略非常簡單:啟動一個後臺線程定期掃描log file列表,把保存時間超過閥值的文件直接刪除(根據文件的創建時間).為了避免刪除文件時仍然有read操作(consumer消費),採取copy-on-write方式.

為什麼選kafka?kafka消息會不會丟?

1.利用Partition實現並行處理

Kafka中的每個Topic都包含一個或多個Partition,且它們位於不同節點。同時,Partition在物理上對應一個本地文件夾,每個Partition包含一個或多個Segment,其中包含一個數據文件與一個索引文件。Partition像一個數組,可以通過索引(offset)去訪問其數據。

2.ISR實現CAP中可用性與數據一致性的動態平衡

Kafka的數據複製方案接近於Master-Slave,不同的是,Kafka既不是完全的同步複製,也不是完全的一步複製,而是基於ISR的動態複製方案。

ISR,In-Sync Replica,每個Partition的Leader都會維護這樣一個列表,其中包含了所有與之同步的Replica。每次寫入數據時,只有ISR中的所有Replica都複製完,Leader才會將這條數據置為Commit,它才能被Consumer消費。

3.順序寫磁盤

將寫磁盤的過程變為順序寫,可極大提高對磁盤的利用率。Consumer通過offset順序消費這些數據,且不刪除已經消費的數據,從而避免隨機寫磁盤的過程。

4.減少網絡開銷

批處理減少了網絡傳輸的overhead,又提高了寫磁盤的效率。

Kafka的API中,從send接口來看,一次只能發送一個ProducerRecord,但是send方法並不是立即將消息發送出去,而是通過batch.size和linger.ms控制實際發送頻率,從而實現批量發送。

kafka消息發送分同步,異步兩種方式,默認是同步方式,可通過producer.type屬性進行配置。kafka保證消息被安全生產,有三個選項0,-1,1;通過request.required.acks屬性進行配置.

0代表:不進行消息接收是否成功的確認(默認值);

1代表:當Leader副本接收成功後,返回接收成功確認信息;

-1代表:當Leader和Follower副本都接收成功後,返回接收成功確認信息;

消息丟失的場景

網絡異常

acks設置為0時,不和Kafka集群進行消息接受確認,當網絡發生異常等情況時,存在消息丟失的可能;

客戶端異常

異步發送時,消息並沒有直接發送至Kafka集群,而是在Client端按一定規則緩存並批量發送。在這期間,如果客戶端發生死機等情況,都會導致消息的丟失;

緩衝區滿了

異步發送時,Client端緩存的消息超出了緩衝池的大小,也存在消息丟失的可能;

Leader副本異常

acks設置為1時,Leader副本接收成功,Kafka集群就返回成功確認信息,而Follower副本可能還在同步。這時Leader副本突然出現異常,新Leader副本(原Follower副本)未能和其保持一致,就會出現消息丟失的情況;

以上就是消息丟失的幾種情況,在日常應用中,我們需要結合自身的應用場景來選擇不同的配置。

想要更高的吞吐量就設置:異步、ack=0;想要不丟失消息數據就選:同步、ack=-1策略


分享到:


相關文章: