KafkaConsumer網絡模型分析

KafkaConsumer網絡模型分析

前面我們介紹了Kafka 生產者客戶端的相關原理和網絡模型,對於客戶端消費者的網絡模型,其實現要比生產者的設計還要複雜一些。今天我們主要講述一下KafkaConsumer 網絡模型的主要設計。

消息消費 Demo

對於消費者而言,在初始化 KafkaConsumer 實例時,通常會為其設置一系列的參數。這其中通常會為消費者指定 groupId,也就是消費者組的ID。通常情況下,多個消費者實例可以設置為同一個groupId。這樣同一個 topic 不同分區上面的消息可以分流到不同的消費者實例上面去,從而實現消費的高吞吐。

核心組件

Kafka 消費者端的設計相對生產者要複雜一些,其中一個主要的原因就是,消費者組等相關概念的引入。其中包含了:ConsumerCoordinator、Fetcher、ConsumerNetworkClient 組件,其組合層級示意圖如下所示:

KafkaConsumer網絡模型分析

從上面主要組件關係可以看出,KafkaConsumer 有 2 個成員變量 ConsumerCoordinator 和 Fetcher。其中 ConsumerCoordinator 用來和服務端 Coordinator 交互通訊,提供消費者加入group 或者 reblance 的能力,也就是說在 Consumer 獲取消息之前,一定是需要在一個 group 當中的。加入 group 完成之後,就是要獲取數據了,Fetcher 組件提供了獲取消息的能力,在其中做了一些增大吞吐量的優化,將在本篇後面介紹。ConsumerNetworkClient 是 ConsumerCoordinator 和 Fetcher 共同依賴的組件,它是基於 NetworkClient 的進一步封裝。實現了Future 模式的結果獲取,和線程安全相關的實現。

消費者調用整體流程

下面我們從 KafkaConsumer.poll() 為入口,看一下核心組件之間的調用關係。

KafkaConsumer網絡模型分析

首先當我們調用 KafkaConsumer.poll() 時,首先會調用 updateAssignmentMetadataIfNeeded(),去確認當前消費者是否已經加入group。其中消費者組的協調工作是由 ConsumerCoordinator 組件提供能力的。之後會調用 pollForFetches() 執行消息拉取,拉取的工作是委派給 Fetcher 組件實現的。下面我們詳細分析一下整體流程圖中的實現。

KafkaConsumer.poll()

我們首先看一下消息發送入口的方法實現:

KafkaConsumer.poll() 方法中首先會調用 updateAssignmentMetadataIfNeeded() 更新metadata元數據信息,保證消費者正確的加入group。然後就是發送拉取的請求 pollForFetches(),下面我們詳細的看一下 pollForFetches() 的實現。

pollForFetches()

對於消息拉取的詳細流程,上面代碼中已經有註釋描述。下面我們總結一下消息拉取時,上面代碼做的一些優化操作。首先對於消息的拉取和處理主要流程如下所示:

KafkaConsumer網絡模型分析

對於KafkaConsumer.poll() 不斷的去拉取消息的場景,此時如果兩次拉取是串行的,這個時候可能就沒有充分的利用KafkaConsumer 客戶端的能力。因此我們從上面的代碼中可以看出,在調用完 fetcher.fetchedRecords() 獲取到結果之後,會異步的再去發起請求(fetcher.sendFetches())和輪詢(client.poll()),以供下次拉取立即返回結果。

KafkaConsumer網絡模型分析

下面我們詳細對發送請求(fetcher.sendFetches())、獲取結果(fetcher.fetchedRecords())做詳細介紹。

發送請求 Fetcher.sendFetches()

消費者 Fetcher 組件發送拉取請求的時候,也是和生產這類似,按照 Broker 的維度去發送請求。對於訂閱的分區所屬的節點信息,是存儲在metadata 元數據信息裡面的;對於消費者分區的消費位移 offset 是存儲在訂閱狀態(SubscriptionState)中的。具體可用下面的流程圖展示:

KafkaConsumer網絡模型分析

簡單的描述一下上面的流程圖:

(1)消費者向協調者申請加入 group,並得到分配給它的分區信息。

(2)集群元數據記錄了分區及所屬主副本節點的信息。

(3)消費者訂閱狀態記錄了分區最近的拉取偏移量 offset 信息。

(4)Fetcher 發送請求時,會將所有分區按照Broker(主副本)的維度進行整理組裝 FetchRequest。

(5)每個主副本對應一個FetchRequest,然後Fetcher 向Broker 發送請求。

下面我們看下具體的代碼實現:

獲取結果 Fetcher.fetchedRecords()

Fetcher 組件獲取結果可能會直接利用上一次 KafkaConsumer.poll() 的 FetchRequest發送。此時如果我們假設 KafkaConsumer 訂閱了 P0、P1、P2 三個分區,每次client.poll() 輪詢會拿到4條消息,而一次fetch() 操作最多隻可以獲取2條消息(max.poll.records 設置的閥值)。此時可能會有如下流程:

KafkaConsumer網絡模型分析

1、第一次調用 KafkaConsumer.poll() 獲取消息時,允許拉取的分區是 P0、P1、P2(因為如果分區有未處理完成的記錄,則不允許從服務端拉取,此時3個分區都沒有未處理的消息在緩存中)。此時假設 Broker 端返回了 P0 的 0、1、2、3 四條消息,並存放在 P0 對應的緩存當中,同時返回的結果集只能給出 2 條,也就是 P0 的 0、1 。此時分區隊列中的順序還是 P0->P1->P2,因為P0 中的消息尚未處理完成,下一次 KafkaConsumer.poll() 還會繼續從 P0 分區緩存中獲取消息。

2、第二次調用 KafkaConsumer.poll() 獲取消息時,允許拉取的分區是 P1、P2(此時P0尚有未處理完成的消息)。此時假設 Broker 返回了 P1 的 0、1、2、3 四條消息,並存放在 P1 對應的緩存中,但是此時給出的確是上面 P0 緩存中剩下的 2、3 兩條消息。之後分區隊列中的順序變為 P1->P2->P0,下一次調用 KafkaConsumer.poll() 獲取消息時,會首先從 P1 對應的緩存中獲取數據。

3、第三次調用 KafkaConsumer.poll() 獲取消息時,允許拉取的分區是 P0、P2(此時P0在緩存中的消息已經拉取完畢)。此時假設 Broker 返回了 P2的 0、1、2、3 四條消息,並存放在P2對應的緩存中,此時返回的是上一次結束分區隊列頭部的分區緩存中的數據,此時返回了 P1的0、1 兩條消息。之後分區隊列中的順序不變,還是 P1->P2->P0,因為此時P1 緩存尚有數據。

4、第四次調用KafkaConsumer.poll() 獲取消息時,只有P0 分區可以被拉取。此時假設 Broker 返回了 P0 的4、5、6、7 四條消息,並存放在P0對應的緩存中,此時返回了P1 的 2、3 兩條消息,分區隊列變為P2->P0->P1。

5、第五次調用KafkaConsumer.poll() 獲取消息時,因為P1 緩存中的數據處理完了,此時只有P1 可被拉取。此時假設 Broker 返回了 P1 的 4、5、6、7 四條消息,並存放到P1對應的緩存中,此時返回了P2 的0、1 兩條消息,分區隊列依然為P2->P0->P1。

6、第六次調用KafkaConsumer.poll() 獲取消息時,此時P0、P1、P2 分區對應的緩存中都有數據,此時沒有分區可被拉取。此時直接返回P2 的2、3 兩條消息。分區隊列變為 P0->P1->P2。

7、第七次調用KafkaConsumer.poll() 獲取消息時,此時只有P2 可以被拉取。此時假設 Broker 返回了 P2的 4、5、6、7 四條消息,並存放到 P2 對應的緩存中。此時返回了P0 的4、5 兩條消息,分區隊列依然為 P0->P1->P2。

8、第八次調用KafkaConsumer.poll() 獲取消息時,此時無分區可被拉取。此時返回P0 的6、7 兩條消息。分區隊列變為P1->P2->P0。

下面我們看一下詳細的代碼實現:

/**
 * 拉取數據(poll())完成後, 存儲在 completedFetches 緩存中的數據尚未解析. 此時調用 fetchedRecords() 解析並返回
 * @return 按照分區維度的消息記錄
 */
public Map>> fetchedRecords() {
	Map>> fetched = new HashMap 
<>(); int recordsRemaining = maxPollRecords; try { while (recordsRemaining > 0) { if (nextInLineRecords == null || nextInLineRecords.isFetched) { /** 上一個分區緩存中數據已處理完,則從分區隊列中獲取下一個分區緩存數據 */ CompletedFetch completedFetch = completedFetches.peek(); if (completedFetch == null) break; try { /** 解析分區緩存數據 CompletedFetch, 得到一個 PartitionRecords */ nextInLineRecords = parseCompletedFetch(completedFetch); } catch (Exception e) { FetchResponse.PartitionData partition = completedFetch.partitionData; if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) { completedFetches.poll(); } throw e; } completedFetches.poll(); } else { // 從分區緩存中獲取指定條數的消息 List> records = fetchRecords(nextInLineRecords, recordsRemaining); TopicPartition partition = nextInLineRecords.partition; if (!records.isEmpty()) { List> currentRecords = fetched.get(partition); if (currentRecords == null) { fetched.put(partition, records); } else { // this case shouldn't usually happen because we only send one fetch at a time per partition, // but it might conceivably happen in some rare cases (such as partition leader changes). // we have to copy to a new list because the old one may be immutable List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); newRecords.addAll(currentRecords); newRecords.addAll(records); fetched.put(partition, newRecords); } recordsRemaining -= records.size(); } } } } catch (KafkaException e) { if (fetched.isEmpty()) throw e; } return fetched; }

參考:《Apache Kafka 源碼剖析》、《Kafka技術內幕》


分享到:


相關文章: