從源碼看RocketMQ的消費端負載均衡和Rebalance機制

Consumer的負載均衡

RocketMQ在消費端的負載均衡如下圖所示,各個partition均勻分佈在各個consumer上,如下圖所示:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

所有consumer依次消費每一個partition,如果partition數量不是consumer的整數倍,則排在前面的consumer會消費更多的partition,下面可以看看consumer的實現。

Rebalance的實現

RocketMQ的consumer負載均衡依賴於RebalanceImpl類,以push的方式為例,在DefaultMQPushConsumerImpl為例,其中包含RebalancePushImpl:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

RebalanceImpl負責消費端的負載均衡,其中的doRebalance方法:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

我們再進入到rebalanceImpl的doRebalance方法,其中調用了rebalanceByTopic,我們看看rebalanceByTopic中的邏輯:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

可以看到,其主體邏輯比較簡單:

· 先獲取topic下的MessageQueue,一個MessageQueue實際上就是一個partition

· 然後獲取當前topic和group的client id,即當前group中消費此topic的客戶端

· 隨後對partition和client id做排序

· 然後調用strategy獲取當前客戶端需要消費的partition

· 最後更新訂閱

因此,負載均衡的主體算法在AllocateMessageQueueStrategy中實現,通過DefaultMQPushConsumer的默認構造器我們可以看到,默認使用的AllocateMessageQueueStrategy是AllocateMessageQueueAveragely實現類:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

找到具體的實現類後,我們可以看到默認使用的負載均衡算法:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

公式寫的非常繞,代幾個數進去算一下就知道,默認情況下,rocketmq使用的是連續分配的方式,示意圖如下:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

AllocateMessageQueueStrategy提供了多個實現:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

· AllocateMessageQueueAveragely是前面講的默認方式

· AllocateMessageQueueAveragelyByCircle則是本文最前面的示意圖,每個消費者依次消費一個partition

· AllocateMessageQueueConsistentHash使用的是一致性hash算法

· AllocateMachineRoomNearby是通過就近元則,離的近的消費

· AllocateMessageQueueByConfig是通過配置的方式

在不同的情況下,我們可以選擇使用不同的負載均衡實現方式。

對於特定場景,甚至可以自己實現負載均衡策略,比如我們的應用需要消費非常多個topic,而每個topic的partition不一定剛才都是機器 數量的整數倍,這個時候,按照rocketmq提供的負載均衡策略,排在前面的consumer消費的partition數量會多於後面的consumer,當topic非常多時,這就導致排在前面的consumer消費的partition比後面的consumer要多很多,造成集群中不同機器的水位相差非常大,這種場景下就知道自己實現負載均衡策略

何時重新Rebalace

這裡先要介紹一個類MQClientInstance,此類在DefaultMQPushConsumerImpl的start方法中有如下代碼:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

這裡的mQClientFactory的實現類實際上就是一個MQClientInstance,進入到MQClientInstance類的構造器中,我們可以看到它創建了一個RebalanceService對象,代碼如下:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

我們一級級的看下去,在RebalanceService的run方法中,可以看到,默認每20s調一次doRebalance:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

而在父類ServiceThread中,我們可以看到run方法的調用方式,實際上是創建了一個線程:

從源碼看RocketMQ的消費端負載均衡和Rebalance機制

因此,當一個consumer出現宕機後,默認最多20s,其它機器將重新消費已宕機的機器消費的partition,同樣當有新的consumer連接上後,20s內也會完成rebalance使得新的consumer有機會消費partition


分享到:


相關文章: