04.13 架構師基本操作Kafka消息投遞語義-不丟不重

介紹

架構師基本操作Kafka消息投遞語義-不丟不重

kafka支持3種消息投遞語義:

  • At most once——最多一次,消息可能會丟失,但不會重複

  • At least once——最少一次,消息不會丟失,可能會重複

  • Exactly once——只且一次,消息不丟失不重複,只且消費一次。

但是整體的消息投遞語義需要Producer端和Consumer端兩者來保證。

Producer 消息生產者端

一個場景例子:

當producer向broker發送一條消息,這時網絡出錯了,producer無法得知broker是否接受到了這條消息。

網絡出錯可能是發生在消息傳遞的過程中,也可能發生在broker已經接受到了消息,並返回ack給producer的過程中。

這時,producer只能進行重發,消息可能會重複,但是保證了at least once。

0.11.0的版本通過給每個producer一個唯一ID,並且在每條消息中生成一個sequence num,

這樣就能對消息去重,達到producer端的exactly once。

這裡還涉及到producer端的acks設置和broker端的副本數量,以及min.insync.replicas的設置。

比如producer端的acks設置如下:

acks=0 //消息發了就發了,不等任何響應就認為消息發送成功

acks=1 //leader分片寫消息成功就返回響應給producer

acks=all(-1) //當acks=all, min.insync.replicas=2,就要求INSRNC列表中必須要有2個副本都寫成功,才返回響應給producer,

如果INSRNC中已同步副本數量不足2,就會報異常,如果沒有2個副本寫成功,也會報異常,消息就會認為沒有寫成功。

如果想學習Java工程化、高性能及分佈式、微服務、源碼分析的朋友可以加我的Java進階群:582505643,群裡有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給大家。

架構師基本操作Kafka消息投遞語義-不丟不重

Broker 消息接收端

上文說過acks=1,表示當leader分片副本寫消息成功就返回響應給producer,此時認為消息發送成功。

如果leader寫成功單馬上掛了,還沒有將這個寫成功的消息同步給其他的分片副本,那麼這個分片此時的ISR列表為空,

如果unclean.leader.election.enable=true,就會發生log truncation(日誌截取),同樣會發生消息丟失。

如果unclean.leader.election.enable=false,那麼這個分片上的服務就不可用了,producer向這個分片發消息就會拋異常。

所以我們設置min.insync.replicas=2,unclean.leader.election.enable=false,producer端的acks=all,這樣發送成功的消息就絕不會丟失。

Consumer 消息消費者端

所有分片的副本都有自己的log文件(保存消息)和相同的offset值。當consumer沒掛的時候,offset直接保存在內存中,

如果掛了,就會發生負載均衡,需要consumer group中另外的consumer來接管並繼續消費。

consumer消費消息的方式有以下2種;

  1. consumer讀取消息,保存offset,然後處理消息。

    現在假設一個場景:保存offset成功,但是消息處理失敗,consumer又掛了,這時來接管的consumer

    就只能從上次保存的offset繼續消費,這種情況下就有可能丟消息,但是保證了at most once語義。

  2. consumer讀取消息,處理消息,處理成功,保存offset。

    如果消息處理成功,但是在保存offset時,consumer掛了,這時來接管的consumer也只能

    從上一次保存的offset開始消費,這時消息就會被重複消費,也就是保證了at least once語義。

以上這些機制的保證都不是直接一個配置可以解決的,而是你的consumer代碼來完成的,只是一個處理順序先後問題。

第一種對應的代碼:

List<string> messages = consumer.poll();consumer.commitOffset();processMsg(messages);/<string>

第二種對應的代碼:

List<string> messages = consumer.poll();processMsg(messages);consumer.commitOffset();/<string> 

Exactly Once實現原理

下面詳細說說exactly once的實現原理。

Producer端的消息冪等性保證

每個Producer在初始化的時候都會被分配一個唯一的PID,

Producer向指定的Topic的特定Partition發送的消息都攜帶一個sequence number(簡稱seqNum),從零開始的單調遞增的。

Broker會將Topic-Partition對應的seqNum在內存中維護,每次接受到Producer的消息都會進行校驗;

只有seqNum比上次提交的seqNum剛好大一,才被認為是合法的。比它大的,說明消息有丟失;比它小的,說明消息重複發送了。

以上說的這個只是針對單個Producer在一個session內的情況,假設Producer掛了,又重新啟動一個Producer被而且分配了另外一個PID,

這樣就不能達到防重的目的了,所以kafka又引進了Transactional Guarantees(事務性保證)。

Transactional Guarantees 事務性保證

kafka的事務性保證說的是:同時向多個TopicPartitions發送消息,要麼都成功,要麼都失敗。

為什麼搞這麼個東西出來?我想了下有可能是這種例子:

用戶定了一張機票,付款成功之後,訂單的狀態改了,飛機座位也被佔了,這樣相當於是

2條消息,那麼保證這個事務性就是:向訂單狀態的Topic和飛機座位的Topic分別發送一條消息,

這樣就需要kafka的這種事務性保證。

這種功能可以使得consumer offset的提交(也是向broker產生消息)和producer的發送消息綁定在一起。

用戶需要提供一個唯一的全局性TransactionalId,這樣就能將PID和TransactionalId映射起來,就能解決

producer掛掉後跨session的問題,應該是將之前PID的TransactionalId賦值給新的producer。

Consumer端

以上的事務性保證只是針對的producer端,對consumer端無法保證,有以下原因:

  1. 壓實類型的topics,有些事務消息可能被新版本的producer重寫

  2. 事務可能跨坐2個log segments,這時舊的segments可能被刪除,就會丟消息

  3. 消費者可能尋址到事務中任意一點,也會丟失一些初始化的消息

  4. 消費者可能不會同時從所有的參與事務的TopicPartitions分片中消費消息

如果是消費kafka中的topic,並且將結果寫回到kafka中另外的topic,

可以將消息處理後結果的保存和offset的保存綁定為一個事務,這時就能保證

消息的處理和offset的提交要麼都成功,要麼都失敗。

如果是將處理消息後的結果保存到外部系統,這時就要用到兩階段提交(tow-phase commit),

但是這樣做很麻煩,較好的方式是offset自己管理,將它和消息的結果保存到同一個地方,整體上進行綁定,

可以參考Kafka Connect中HDFS的例子。

如果想學習Java工程化、高性能及分佈式、微服務、源碼分析的朋友可以加我的Java進階群:582505643,群裡有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給大家。

架構師基本操作Kafka消息投遞語義-不丟不重


分享到:


相關文章: