RocketMQ客戶端加載流程

 這節介紹RocketMQ客戶端的啟動流程,即Consumer和Producer的啟動流程。

1. 客戶端demo

 首先先看下客戶端的demo

Producer:

<code>public class SyncProducer {

    public static void main (String[] args) throws Exception {
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer ("GroupTest");
        // 設置NameServer的地址
        producer.setNamesrvAddr ("localhost:9876");
        // 啟動Producer實例
        producer.start ();
        for (int i = 0; i < 100; i++) {
            // 創建消息,並指定Topic,Tag和消息體
            Message msg = new Message ("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 發送消息到一個Broker
            SendResult sendResult = producer.send (msg);
            // 通過sendResult返回消息是否成功送達
            System.out.printf ("%s%n", sendResult);
        }
        // 如果不再發送消息,關閉Producer實例。
        producer.shutdown ();
    }
}/<code>


Consumer:

<code>public class Consumer {

    public static void main (String[] args) throws InterruptedException, MQClientException {

        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest");

        // 設置NameServer的地址
        consumer.setNamesrvAddr ("localhost:9876");

        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe ("TopicTest", "*");
        // 註冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener (new MessageListenerConcurrently () {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage (List msgs, ConsumeConcurrentlyContext context) {
                System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        consumer.start ();
        System.out.printf ("Consumer Started.%n");
    }
}/<code>

Producer和Consumer的啟動類似,在初始化然後進行必要設置(主要是客戶端所屬的Group和NameServer地址)後,執行start方法啟動後臺監聽服務,事實上Producer和Consumer都是調用同一個類MQClientInstance的start方法,下圖為繼承關係:

RocketMQ客戶端加載流程

DefaultMQproducer和DefaultMQPushConsumer都繼承自ClientConfig,顧名思義ClientConfig表示客戶端的配置,包括NameServer地址、客戶端地址、客戶端實例名等。由於Producer和Consumer都需要同Broker和NameServer交互,所以配置上有很多相同,這兩個將主要功能的實現都委託給了對應的Impl(DefaultMQProducerImpl和DefaultMQPushConsumerImpl)。Impl內部調用了MQClientInstance來完成客戶端同遠程交互的主要功能,而Producer和Consumer則封裝自己相關的行為,MQClientInstance內部又委託忒了MQClientAPIImpl。

2. Producer的啟動

 DefaultMQProducer的啟動如下:

RocketMQ客戶端加載流程

DefaultMQProducer將start委託給了DefaultMQProducerImpl來完成,主要過程為:

  • DefaultMQProducerImpl先標記客戶端當前狀態為START_FAILED(初始狀態為CREATE_JUST)
  • 調用MQClientManager的getAndCreateMQClientInstance方法獲取MQClientInstance,每個客戶端實例都會對應一個MQClientInstance,並由MQClientManager管理。MQClientManager內部使用一個Map維護各客戶端的關係,key為clientId(格式為ip@instName,instName為pid),value為MQClientInstance實例。當key不存在時則會初始化一個實例,在初始化時連帶初始化MQClientAPIImpl、NettyRemoteClient等。
  • 調用MQClientInstance的registerProducer方法,註冊當前客戶端自身。實現上是客戶端放入client實例緩存中,定時器定時上報,後面會說。
  • 調用MQClientInstance的start方法,啟動客戶端的後臺任務,該方法是重點,後面會介紹。
  • 標記客戶端當前狀態為RUNNING
  • 調用MQClientInstance的sendHeartbeatToAllBrokerWithLock方法,向所有Broker上報心跳

3. Consumer的啟動

 DefaultMQPushConsumer的啟動如下:

RocketMQ客戶端加載流程

DefaultMQPushConsumer同樣將start委託給了DefaultMQPushConsumerImpl來完成,流程上也相似。但相比DefaultMQProducer多了很多其他組件來輔助消費過程,如rebalance、offset管理等,主要過程為:

  • DefaultMQPushConsumerImpl先標記客戶端當前狀態為START_FAILED(初始狀態為CREATE_JUST)
  • 同步設置RebalanceImpl的topic(Map*topic*/String,/*sub expression*/String>)信息
  • 同DefaultMQProducer一致,調用MQClientManager的getAndCreateMQClientInstance方法獲取MQClientInstance,每個客戶端實例都會對應一個MQClientInstance,並由MQClientManager管理。MQClientManager內部使用一個Map維護各客戶端的關係,key為clientId(格式為ip@instName),value為MQClientInstance實例。當key不存在時則會初始化一個實例,在初始化時連帶初始化MQClientAPIImpl、NettyRemoteClient等。這裡需要說明的是,RocketMQ中Consumer的消費模式分為CLUSTERING和BROADCASTING,即集群消費和廣播消費。區別在於集群消費時,一條消息只會被一個實例消費,即各實例會平分所有的消息;而廣播消費時所有實例都會收到同一條消息。體現在clientId的是,集群模式下instName為pid,而廣播模式instName為DEFAULT。
  • 設置RebalanceImpl屬性,包括所在Group、消費模式、消息分配策略(平均分配q的策略)
  • 初始化PlullAPIWrapper,設置消息過濾器鉤子列表
  • 初始化OffsetStore,設置offset的存儲模式,廣播模式使用本地存儲;集群模式使用遠程存儲
  • 初始化ConsumeMessageService,根據監聽器類型設定消息消費模式(順序消費/並行消費),pull模式需要自己指定offset,push不需要設定。
  • 啟動ConsumeMessageService
  • 同DefaultMQProducer一致,調用MQClientInstance的registerProducer方法,註冊當前客戶端自身。實現上是客戶端放入client實例緩存中,定時器定時上報,後面會說。
  • 調用MQClientInstance的start方法,啟動客戶端的後臺任務,該方法是重點,後面會介紹。
  • 標記客戶端當前狀態為RUNNING
  • 判斷監聽信息是否發生改變,從namesrv更新topic的路由信息
  • 調用MQClientInstance的checkClientInBroker方法,確認該實例已經在broker註冊成功,否則拋異常
  • 調用MQClientInstance的sendHeartbeatToAllBrokerWithLock方法,向所有Broker上報心跳
  • 調用MQClientInstance的rebalanceImmediately方法,觸發一次rebalance

 DefaultMQPushConsumer為推模式,RocketMQ還提供了拉模式來消費消息,實現類為DefaultMQPullConsumer,啟動過程類似,推模式是用拉模式來實現的,重點實現都在MQClientInstace中。

4. MQClientInstance

 MQClientInstance為一個門戶類,組合了各功能,如下,包括Rebalance、消費數據統計、生產消息、消費消息等,這些都有對應的實現。

RocketMQ客戶端加載流程

 上面說過,Producer和Consumer在啟動的時候,都會在內部先初始化一個MQClientInstance對象,然後調用其start方法啟動對應的後臺程序,如下:

RocketMQ客戶端加載流程

MQClientInstance的start方法除了調用自身進行準備工作外,也調用了其他組件的start方法開始它們的準備工作,主要流程為:

  1. 先標記客戶端當前狀態為START_FAILED(初始狀態為CREATE_JUST)
  • 若沒有指定nameserver地址,則調用MQClientAPIImpl同步獲取一次(通過設置的Http endpoint同步)
  • 調用MQClientAPIImpl的start方法,主要是初始化Netty客戶端,啟動netty client初始化任務,連接的建立發生在第一次請求時
  • 開啟MQClientInstance的定時任務,包括: 如果沒有指定nameserver地址,每兩分鐘從配置的endpoint處同步nameserver地址
  • 定時從namesrv同步topic路由信息
  • 定時清除下線的broker信息;發送心跳
  • 定時持久化消費者消費的offset信息
  • 每1分鐘調整線程池的大小 調用PullMessageService的start方法,啟動拉取消息線程 調用RebalanceService的start方法,啟動rebalance線程 調用內部Producer(CLIENT_INNER_PRODUCER)的start方法 標記客戶端當前狀態為RUNNING

下面詳細介紹下各個過程。

4.2. MQClientAPIImpl.fetchNameServerAddr

 該方法用於更新NameServer地址,該方法會從
http://xxx:port/rocketmq/yyy,默認8080端口(如果xxx中沒有:,即不帶端口時)中獲取NameServer地址(xxx為域名,由系統配置項rocketmq.namesrv.domain控制,默認為jmenv.tbsite.net;yyy為訪問路徑,由系統配置項
rocketmq.namesrv.domain.subgroup控制,默認為nsaddr)。該地址要求返回結果為一個ip列表,以;隔開,如果獲取回來的地址跟現有的地址不一致則會更新緩存的NameServer地址列表。解析出來的地址列表用於根據NettyRemotingClient內部持有的變量:

<code>private final AtomicReference> namesrvAddrList = new AtomicReference>();/<code>


4.3. MQClientAPIImpl.start

 該方法在內部調用了NettyRemotingClient的start方法,用於初始化Netty客戶端。NettyRemotingClient是基於Netty實現的tcp協議客戶端,主要流程為:

  • 初始化客戶端bootstrap連接池
  • 設置處理鏈:編碼、解碼、空閒處理、連接管理(服務端)、請求分發
  • 每3秒清除超時的請求(netty主線程不處理邏輯)
  • 啟動客戶端的事件處理器,處理IDLE、CLOSE、CONNECT、EXCEPTION事件

關於NettyRemotingClient後面會專門進行講解,這裡只介紹在客戶端啟動時其做了哪些動作。

4.4.2. MQClientInstance.updateTopicRouteInfoFromNameServer

 該方法用於根據客戶端實例關注的所有topic的路由信息,包括客戶端監聽的topic以及producer生產的topic。首先會遍歷從MQClientInstance內部的consumerTable和consumerTable的客戶端實例,拿到所有的topic信息,然後挨個更新topic的路由。

 同步topic路由時,會通過NettyRemotingClient選擇一個NameServer獲取topic路由信息,然後判斷topic信息是否發生了更改,主要比較topic所對應的Queue和Broker是否發生了更改。若路由信息發生了更改則會同步topic所在的broker地址列表,即內部的brokerAddrTable屬性;接著同步produer關注的topic路由信息,即producerTable屬性;接著同步consumer訂閱的topic路由信息,即consumerTable屬性;最後更新本地topic信息,即topicRouteTable屬性。

4.4.3. MQClientInstance.sendHeartbeatToAllBrokerWithLock

 該方法會遍歷MQClient所持有的各個producer和consumer,將客戶端信息構造為HeartbeatData對象,然後調用MQClientAPIImpl的sendHearbeat方法,向所有的broker上報心跳數據。心跳內容包括:

  • Consumer:所有Consumer的Group、消費類型、消息模式、消費起始offset、訂閱消息的篩選類型等
  • Producer:所有Producer的group
4.4.4. MQClientInstance.persistAllConsumerOffset

 該方法會遍歷consumerTable裡的所有MQConsumer對象,獲取每個隊列處理的MessageQueue,然後調用OffsetStore持久化所有的MessageQueue。OffsetStore後面會專門進行講解。

4.4.5. MQClientInstance.adjustThreadPool

 該方法主要是動態調整DefaultMQPushConsumerImpl(推模式)客戶端消費線程池的大小。前面說過推模式是通過包裝拉模式來實現的,內部都依賴PullAPIWrapper。實現上推模式多了一個ConsumeMessageService定時使用拉模式消費消息,該實現需要一個線程池,adjustThreadPool就是動態調整該線程池的大小。關於客戶端消費消息的過程,後面也會專門進行講解。

4.5. PullMessageService.start

 PullMessageService用於封裝拉模式以實現推模式。它會循環從內部的LinkedBlockingQueue 中拿出PullRequest對象(消費q消息封裝的對象),選取一個可用的客戶端實例DefaultMQPushConsumerImpl,調用其pullMessage方法.該方法會判斷消費進度,決定是立即消費還是延遲消費,如果是延遲消費則再放回LinkedBlockingQueue中等待消費;如果是直接消費,則調用PullMessageService(拉模式)的
executePullRequestImmediately消費消息.

 PullMessageService的基礎關係如下:

RocketMQ客戶端加載流程

PullMessageService.start內部主要是啟動線程,該線程會循環執行執行任務,具體實現會在後續介紹消息消費的時候提及。

4.6. RebalanceService.start

 該方法用於啟動rebalance任務。RebalanceService同PullMessageService相同,都繼承自ServiceThread類,,並實現了run方法。RebalanceService在run方法中等待一定時間(默認20S,可以通過
rocketmq.client.rebalance.waitInterval配置具體時間)後會調用
MQClientInstance.doRebalance執行具體的動作。具體實現會在後續介紹rebalance實現的時候提及。

4.7. DefaultMQPushConsumerImpl.start

 在上面2.時有提及該流程,這裡的DefaultMQPushConsumerImpl對象是Group為CLIENT_INNER_PRODUCER的內部對象。

 客戶端的啟動過程就如上面介紹,下面附上該部分當時源碼閱讀過程做的筆記簡圖,該圖描述了客戶端啟動過程的大致過程:

RocketMQ客戶端加載流程

更多原創內容請搜索微信公眾號:啊駝(doubaotaizi)


RocketMQ客戶端加載流程


分享到:


相關文章: