RocketMQ 生產者和消息存儲

1. 前言

本篇主要從源碼分析消息的發送及存儲。rocketmq發送消息分為三種實現方式:可靠同步發送、可靠異步發送、單向發送。目前的MQ中間件從存儲模型來看,分為需要持久化和不需要持久化兩種。本篇文章會分析rocketmq的消息存儲機制。

2. RocketMQ 消息

先看看rocketmq 消息封裝類org.apache.rocketmq.common.message.Message


RocketMQ 生產者和消息存儲


基本屬性:主題topic、消息flag、消息體、擴展屬性

隱藏屬性:

  • tag:消息TAG,用於消息過濾
  • keys:消息索引鍵
  • waitStoreMsgOK:消息發送時是否等消息存儲完成後再返回
  • delayTimeLevel:消息延遲級別,用於定時消息或消息重試

擴展屬性都存在Message的properties中。

3. 生產者啟動流程

我們從DefaultMQProducerImpl 的start 方法追蹤。

第一步:檢查productGroup 是否符合要求,並改變生產者的instanceName為進程ID

<code>//DefaultMQProducerImpl::startpublic void start() throws MQClientException {    this.start(true);//默認為true}public void start(final boolean startFactory) throws MQClientException {    switch (this.serviceState) {        case CREATE_JUST:            this.serviceState = ServiceState.START_FAILED;            this.checkConfig();            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {                this.defaultMQProducer.changeInstanceNameToPID();            }            //第一步            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);            if (!registerOK) {                this.serviceState = ServiceState.CREATE_JUST;                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),                    null);            }            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());            if (startFactory) {                mQClientFactory.start();            }            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),                this.defaultMQProducer.isSendMessageWithVIPChannel());            this.serviceState = ServiceState.RUNNING;            break;        case RUNNING:        case START_FAILED:        case SHUTDOWN_ALREADY:            throw new MQClientException("The producer service state not OK, maybe started once, "                + this.serviceState                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),                null);        default:            break;    }    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}/<code>

第二步:創建MQClientInstance實例。

第三步:向MQClientInstance註冊,將當前生產者加入MQClientInstance管理中,方便後續調用網絡請求、進行心跳檢測等。

第四步:啟動MQClientInstance,如果MQClientInstance已經啟動,如果已經啟動則本次不啟動。

4. 消息發送基本流程

消息發送流程主要是:驗證消息、查找路由、消息發送(包含異常處理機制)。

消息驗證,主要是進行消息的長度驗證,我們主要講解一下查找路由和消息發送。

4.1 查找路由

消息發送之前,首先需要獲取主題的路由信息

<code>//DefaultMQProducerImpl::tryToFindTopicPublishInfoprivate TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);        if (null == topicPublishInfo || !topicPublishInfo.ok()) {            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);            topicPublishInfo = this.topicPublishInfoTable.get(topic);        }        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {            return topicPublishInfo;        } else {            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);            topicPublishInfo = this.topicPublishInfoTable.get(topic);            return topicPublishInfo;        }    }/<code>

如果生產者緩存了該 topic 路由信息,包含了消息隊列,則直接返回該路由信息,如果沒有緩存或沒有包含消息隊列,則向NameServer查詢該topic的路由信息。如果是第一次發送消息,未找到會嘗試用默認topic去查詢。沒找到則報錯。

4.2 選擇消息

根據路由信息選擇消息隊列,返回的消息隊列按照broker、序號排序。首先消息發送採取重試機制,循環執行,選擇消息隊列、發送消息,發送成功則返回,發送失敗則重試。消息選擇有兩種方式。

  • sendLatencyFaultEnable=false,默認機制
  • sendLatencyFaultEnable=true,啟用Broker故障延遲機制
<code>//MQFaultStrategy::selectOneMessageQueuepublic MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    if (this.sendLatencyFaultEnable) {        try {            int index = tpInfo.getSendWhichQueue().getAndIncrement();            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                if (pos < 0)                    pos = 0;                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                        return mq;                }            }            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);            if (writeQueueNums > 0) {                final MessageQueue mq = tpInfo.selectOneMessageQueue();                if (notBestBroker != null) {                    mq.setBrokerName(notBestBroker);                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                }                return mq;            } else {                latencyFaultTolerance.remove(notBestBroker);            }        } catch (Exception e) {            log.error("Error occurred when selecting message queue", e);        }        return tpInfo.selectOneMessageQueue();    }    return tpInfo.selectOneMessageQueue(lastBrokerName);}/<code>

4.3 消息發送

消息發送API核心入口:DefaultMQProducerImpl::sendKernelImpl

<code>private SendResult sendKernelImpl(final Message msg,                                    final MessageQueue mq,                                    final CommunicationMode communicationMode,                                    final SendCallback sendCallback,                                    final TopicPublishInfo topicPublishInfo,                                    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//省略}/<code>

參數詳解:

  • Message msg:待發送消息
  • MessageQueue mq:消息將發送到消息隊列上
  • CommunicationMode communicationMode:消息發送模式,SYNC、ASYNC、ONEWAY
  • SendCallback sendCallback:異步消息回調函數
  • TopicPublishInfo topicPublishInfo:主題路由消息
  • long timeout:消息發送超時時間

發送步驟:

  1. 根據MessageQueue獲取Broker的網絡地址
  2. 為消息分配全局唯一ID
  3. 如果註冊了消息發送鉤子函數,則執行消息發送之前的增強邏輯
  4. 構建消息發送請求包
  5. 根據消息發送方式,同步、異步、單向方式進行網絡傳輸
  6. 如果註冊了消息發送鉤子函數,執行after邏輯

4.3.1 同步發送

MQ客戶端發送消息的入口是MQClientAPIImpl::sendMessage

同步發送步驟

  1. 檢查消息發送是否合理
<code>//AbstractSendMessageProcessor::msgCheckprotected RemotingCommand msgCheck(final ChannelHandlerContext ctx,    final SendMessageRequestHeader requestHeader, final RemotingCommand response) {    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {        response.setCode(ResponseCode.NO_PERMISSION);        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()            + "] sending message is forbidden");        return response;    }    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";        log.warn(errorMsg);        response.setCode(ResponseCode.SYSTEM_ERROR);        response.setRemark(errorMsg);        return response;    }    TopicConfig topicConfig =        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());    if (null == topicConfig) {        int topicSysFlag = 0;        if (requestHeader.isUnitMode()) {            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);            } else {                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);            }        }        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(            requestHeader.getTopic(),            requestHeader.getDefaultTopic(),            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),            requestHeader.getDefaultTopicQueueNums(), topicSysFlag);        if (null == topicConfig) {            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                topicConfig =                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(                        requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,                        topicSysFlag);            }        }        if (null == topicConfig) {            response.setCode(ResponseCode.TOPIC_NOT_EXIST);            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));            return response;        }    }    int queueIdInt = requestHeader.getQueueId();    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());    if (queueIdInt >= idValid) {        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",            queueIdInt,            topicConfig.toString(),            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));        log.warn(errorInfo);        response.setCode(ResponseCode.SYSTEM_ERROR);        response.setRemark(errorInfo);        return response;    }    return response;}/<code>
  1. 如果消息重試次數超過允許的最大重試次數,消息將進入DLD延遲隊列
  2. 調用DefaultMessageStore::putMessage進行消息存儲

4.3.2 異步發送

異步發送,無須阻塞等待消息服務器返回消息發送結果,只需要提供一個回調函數供消息發送客戶端在收到響應結果回調。異步方式相比同步發送,發送端的發送性能提高了不少。

4.3.3 單向發送

單向發送,無須等待結果,也無須提供回調函數,消息發送端壓根不關心消息是否發送成功,原理和異步發送相同,只是消息發送端收到結果後什麼也不做。

4.3.4 批量發送

批量消息發送是將同一主題的多條信息一起打包發送給消息服務端,減少網絡調用次數,提高網絡傳輸速率。

單條消息發送時,消息體的內容將保存在body中,批量消息發送,需要將多條消息的內容存儲在body中,RocketMQ 對多條消息內容進行固定格式進行存儲。


RocketMQ 生產者和消息存儲


批量發送:

<code>//DefaultMQProducer::sendpublic SendResult send(    Collection<message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return this.defaultMQProducerImpl.send(batch(msgs));}/<message>/<code>

**發送流程:**首先在消息發送端,調用batch方法,將一批消息封裝成MessageBatch對象,MessageBatch內部持有Listmessages,這樣批量發送就和單條發送流程完全一樣了。


RocketMQ 生產者和消息存儲


循跡一下:

<code>//DefaultMQProducer::sendpublic SendResult send(    Collection<message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return this.defaultMQProducerImpl.send(batch(msgs));}//DefaultMQProducer::batchprivate MessageBatch batch(Collection<message> msgs) throws MQClientException {    MessageBatch msgBatch;    try {        msgBatch = MessageBatch.generateFromList(msgs);        for (Message message : msgBatch) {            Validators.checkMessage(message, this);            MessageClientIDSetter.setUniqID(message);            message.setTopic(withNamespace(message.getTopic()));        }        msgBatch.setBody(msgBatch.encode());    } catch (Exception e) {        throw new MQClientException("Failed to initiate the MessageBatch", e);    }    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));    return msgBatch;}//DefaultMQProducerImpl::sendpublic SendResult send(    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return send(msg, this.defaultMQProducer.getSendMsgTimeout());}/<message>/<message>/<code>

5. 消息存儲

業務系統大多需要MQ有持久存儲的能力,能大大增加系統的高可用性。

我們先看看rocketmq 數據流向:


RocketMQ 生產者和消息存儲


  • CommitLog:消息存儲文件,所有消息主題的消息都存儲在CommitLog文件中
  • ConsumeQueue:消息消費隊列,消息到達CommitLog文件後,將異步轉發到消息消費隊列,供消息消費者消費
  • IndexFile:消息索引文件,主要存儲消息Key與Offset的對應關係
  • 事務狀態服務:存儲每條消息的事務狀態
  • 定時消息服務:每一個延遲級別對應一個消息消費隊列,存儲延遲隊列的消息拉去進度

RocketMQ的存儲架構:


RocketMQ 生產者和消息存儲


消息存儲實現類: org.apache.rocketmq.store.DefaultMessageStore

RocketMQ 生產者和消息存儲

介紹核心屬性:

  • MessageStoreConfig messageStoreConfig:消息存儲配置屬性
  • CommitLog commitLog:CommitLog 文件存儲的實現類
  • ConcurrentMap<string>> consumeQueueTable :消息隊列存儲緩存表,按消息主題分組/<string>
  • FlushConsumeQueueService flushConsumeQueueService:消息隊列文件ConsumeQueue刷盤線程
  • CleanCommitLogService cleanCommitLogService:清除CommitLog問價服務
  • CleanConsumeQueueService cleanConsumeQueueService:清除ConsumeQueue文件服務
  • IndexService indexService:索引文件實現類
  • AllocateMappedFileService allocateMappedFileService:MappedFile分配服務
  • ReputMessageService reputMessageService:CommitLog消息分發,根據CommitLog文件構建ConsumeQueue、IndexFile文件
  • HAService haService:存儲HA機制
  • TransientStorePool transientStorePool:消息堆內存緩存
  • MessageArrivingListener messageArrivingListener:消息拉取長輪詢模式消息達到監聽器
  • BrokerConfig brokerConfig:Broker配置屬性
  • StoreCheckpoint storeCheckpoint:文件刷盤檢測點
  • LinkedList dispatcherList:CommitLog文件轉發請求

5.1 消息發送存儲流程

消息存儲入口:org.apache.rocketmq.store.DefaultMessageStore::putMessage

  1. 如果當前Broker停止工作或Broker為SLAVE 角色或當前Rocket不支持寫入則拒絕消息寫入,如果消息長度超過256字符、消息屬性長度超過65536個字符將拒絕該消息寫入
  2. 驗證消息延遲級別
  3. 獲取當前可以寫入的CommitLog文件
  4. 寫入CommitLog之前,先申請putMessageLock,也就是將消息存儲到CommitLog文件中是串行
  5. 設計消息的存儲時間
  6. 將消息追加到MappedFile中
  7. 創建全局唯一消息ID
  8. 獲取該消息在消息隊列的偏移量
  9. 根據消息體的長度、主題的長度、屬性的長度結合消息存儲格式計算消息的總長度
  10. 如果消息長度 +END_FILE_MIN_BLANK_LENGTH 大於CommitLog文件
  11. 將消息內存存儲到ByteBuffer中,然後創建AppendMessageResult。
  12. 更新消息隊列邏輯偏移量
  13. 處理完消息追加邏輯後將釋放putMessageLock鎖
  14. DefaultAppendMessageCallback::doAppend 只是將消息追加到內存中,需要根據同步刷盤還是異步刷盤方式,將內存中的數據持久化到磁盤

簡化成如下時序圖


RocketMQ 生產者和消息存儲


5.2 內存映射流程

RocketMQ通過使用內存映射文件來提高IO訪問性能,無論是CommitLog、ConsumeQueue還是IndexFile,單個文件都被設計為固定長度,如果一個文件寫滿後再創建一個新文件,文件名就為第一條消息對應的全局物力偏移量。


RocketMQ 生產者和消息存儲


步驟:

  1. 內存映射文件MappedFile通過AllocateMappedFileService創建
  2. MappedFile的創建是典型的生產者-消費者模型
  3. MappedFileQueue調用getLastMappedFile獲取MappedFile時,將請求放入隊列中
  4. AllocateMappedFileService線程持續監聽隊列,隊列有請求時,創建出MappedFile對象
  5. 最後將MappedFile對象預熱,底層調用force方法和mlock方法

5.3 刷盤流程

消息在調用MapedFile的appendMessage後,也只是將消息裝載到了ByteBuffer中,也就是內存中,還沒有落盤。落盤需要將內存flush到磁盤上,針對commitLog,rocketMQ提供了兩種落盤方式。


RocketMQ 生產者和消息存儲


  • producer發送給broker的消息保存在MappedFile中,然後通過刷盤機制同步到磁盤中
  • 刷盤分為同步刷盤和異步刷盤
  • 異步刷盤後臺線程按一定時間間隔執行
  • 同步刷盤也是生產者-消費者模型。broker保存消息到MappedFile後,創建GroupCommitRequest請求放入列表,並阻塞等待。後臺線程從列表中獲取請求並刷新磁盤,成功刷盤後通知等待線程。

同步刷盤(CommitLog.java):

<code>//封裝的一次刷盤請求public static class GroupCommitRequest {    //這次請求要刷到的offSet位置,比如已經刷到2,    private final long nextOffset;    //控制flush的拴    private final CountDownLatch countDownLatch = new CountDownLatch(1);    private volatile boolean flushOK = false;    public GroupCommitRequest(long nextOffset) {        this.nextOffset = nextOffset;    }    public long getNextOffset() {        return nextOffset;    }    //刷完了喚醒    public void wakeupCustomer(final boolean flushOK) {        this.flushOK = flushOK;        this.countDownLatch.countDown();    }    public boolean waitForFlush(long timeout) {        try {            this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);            return this.flushOK;        } catch (InterruptedException e) {            e.printStackTrace();            return false;        }    }}/**    * GroupCommit Service    * 批量刷盤服務    */class GroupCommitService extends FlushCommitLogService {    //用來接收消息的隊列,提供寫消息    private volatile List<groupcommitrequest> requestsWrite = new ArrayList<groupcommitrequest>();    //用來讀消息的隊列,將消息從內存讀到硬盤    private volatile List<groupcommitrequest> requestsRead = new ArrayList<groupcommitrequest>();    //添加一個刷盤的request    public void putRequest(final GroupCommitRequest request) {        synchronized (this) {            //添加到寫消息的list中            this.requestsWrite.add(request);            //喚醒其他線程            if (!this.hasNotified) {                this.hasNotified = true;                this.notify();            }        }    }    //交換讀寫隊列,避免上鎖    private void swapRequests() {        List<groupcommitrequest> tmp = this.requestsWrite;        this.requestsWrite = this.requestsRead;        this.requestsRead = tmp;    }    private void doCommit() {        //讀隊列不為空        if (!this.requestsRead.isEmpty()) {            //遍歷            for (GroupCommitRequest req : this.requestsRead) {                // There may be a message in the next file, so a maximum of                // two times the flush                boolean flushOK = false;                for (int i = 0; (i < 2) && !flushOK; i++) {                    //                    flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());                    //如果沒刷完 即flushOK為false則繼續刷                    if (!flushOK) {                        CommitLog.this.mapedFileQueue.commit(0);                    }                }                //刷完了喚醒                req.wakeupCustomer(flushOK);            }            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();            if (storeTimestamp > 0) {                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);            }            //清空讀list            this.requestsRead.clear();        } else {            // Because of individual messages is set to not sync flush, it            // will come to this process            CommitLog.this.mapedFileQueue.commit(0);        }    }    public void run() {        CommitLog.log.info(this.getServiceName() + " service started");        while (!this.isStoped()) {            try {                this.waitForRunning(0);                this.doCommit();            } catch (Exception e) {                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);            }        }        // Under normal circumstances shutdown, wait for the arrival of the        // request, and then flush        //正常關閉時要把沒刷完的刷完        try {            Thread.sleep(10);        } catch (InterruptedException e) {            CommitLog.log.warn("GroupCommitService Exception, ", e);        }        synchronized (this) {            this.swapRequests();        }        this.doCommit();        CommitLog.log.info(this.getServiceName() + " service end");    }    }/<groupcommitrequest>/<groupcommitrequest>/<groupcommitrequest>/<groupcommitrequest>/<groupcommitrequest>/<code>

異步刷盤(CommitLog.java):

<code>public void run() {    CommitLog.log.info(this.getServiceName() + " service started");    //不停輪詢    while (!this.isStoped()) {        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();        //拿到要刷盤的頁數        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();        int flushPhysicQueueThoroughInterval =                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();        boolean printFlushProgress = false;        // Print flush progress        long currentTimeMillis = System.currentTimeMillis();        //控制刷盤間隔,如果當前的時間還沒到刷盤的間隔時間則不刷        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {            this.lastFlushTimestamp = currentTimeMillis;            flushPhysicQueueLeastPages = 0;            printFlushProgress = ((printTimes++ % 10) == 0);        }        try {            //是否需要刷盤休眠            if (flushCommitLogTimed) {                Thread.sleep(interval);            } else {                this.waitForRunning(interval);            }            if (printFlushProgress) {                this.printFlushProgress();            }            //commit開始刷盤            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();            if (storeTimestamp > 0) {                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);            }        } catch (Exception e) {            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);            this.printFlushProgress();        }    }    // Normal shutdown, to ensure that all the flush before exit    boolean result = false;    for (int i = 0; i < RetryTimesOver && !result; i++) {        result = CommitLog.this.mapedFileQueue.commit(0);        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));    }    this.printFlushProgress();    CommitLog.log.info(this.getServiceName() + " service end");}/<code>

6. 小結

小結

消息發送流程圖:


RocketMQ 生產者和消息存儲


消息存儲流程圖:


RocketMQ 生產者和消息存儲


分享到:


相關文章: