Consumer的負載均衡
RocketMQ在消費端的負載均衡如下圖所示,各個partition均勻分佈在各個consumer上,如下圖所示:
所有consumer依次消費每一個partition,如果partition數量不是consumer的整數倍,則排在前面的consumer會消費更多的partition,下面可以看看consumer的實現。
Rebalance的實現
RocketMQ的consumer負載均衡依賴於RebalanceImpl類,以push的方式為例,在DefaultMQPushConsumerImpl為例,其中包含RebalancePushImpl:
RebalanceImpl負責消費端的負載均衡,其中的doRebalance方法:
我們再進入到rebalanceImpl的doRebalance方法,其中調用了rebalanceByTopic,我們看看rebalanceByTopic中的邏輯:
可以看到,其主體邏輯比較簡單:
· 先獲取topic下的MessageQueue,一個MessageQueue實際上就是一個partition
· 然後獲取當前topic和group的client id,即當前group中消費此topic的客戶端
· 隨後對partition和client id做排序
· 然後調用strategy獲取當前客戶端需要消費的partition
· 最後更新訂閱
因此,負載均衡的主體算法在AllocateMessageQueueStrategy中實現,通過DefaultMQPushConsumer的默認構造器我們可以看到,默認使用的AllocateMessageQueueStrategy是AllocateMessageQueueAveragely實現類:
找到具體的實現類後,我們可以看到默認使用的負載均衡算法:
公式寫的非常繞,代幾個數進去算一下就知道,默認情況下,rocketmq使用的是連續分配的方式,示意圖如下:
AllocateMessageQueueStrategy提供了多個實現:
· AllocateMessageQueueAveragely是前面講的默認方式
· AllocateMessageQueueAveragelyByCircle則是本文最前面的示意圖,每個消費者依次消費一個partition
· AllocateMessageQueueConsistentHash使用的是一致性hash算法
· AllocateMachineRoomNearby是通過就近元則,離的近的消費
· AllocateMessageQueueByConfig是通過配置的方式
在不同的情況下,我們可以選擇使用不同的負載均衡實現方式。
對於特定場景,甚至可以自己實現負載均衡策略,比如我們的應用需要消費非常多個topic,而每個topic的partition不一定剛才都是機器 數量的整數倍,這個時候,按照rocketmq提供的負載均衡策略,排在前面的consumer消費的partition數量會多於後面的consumer,當topic非常多時,這就導致排在前面的consumer消費的partition比後面的consumer要多很多,造成集群中不同機器的水位相差非常大,這種場景下就知道自己實現負載均衡策略
何時重新Rebalace
這裡先要介紹一個類MQClientInstance,此類在DefaultMQPushConsumerImpl的start方法中有如下代碼:
這裡的mQClientFactory的實現類實際上就是一個MQClientInstance,進入到MQClientInstance類的構造器中,我們可以看到它創建了一個RebalanceService對象,代碼如下:
我們一級級的看下去,在RebalanceService的run方法中,可以看到,默認每20s調一次doRebalance:
而在父類ServiceThread中,我們可以看到run方法的調用方式,實際上是創建了一個線程:
因此,當一個consumer出現宕機後,默認最多20s,其它機器將重新消費已宕機的機器消費的partition,同樣當有新的consumer連接上後,20s內也會完成rebalance使得新的consumer有機會消費partition
閱讀更多 無醉 的文章