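1. Preface
This article walks through the RocketMQ source code for sending and storing messages. RocketMQ offers three ways to send a message: reliable synchronous send, reliable asynchronous send, and one-way send. In terms of storage model, today's MQ middleware falls into two groups: products that persist messages and products that do not. This article also analyzes RocketMQ's message storage mechanism.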
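2. RocketMQ Messages
Start with the class that wraps a RocketMQ message, org.apache.rocketmq.common.message.Message.
Basic attributes: topic, flag, body, and extended properties.
Hidden attributes:
- tag: the message tag, used for message filtering
- keys: the message index keys
- waitStoreMsgOK: whether the send waits until the message has been stored before returning
- delayTimeLevel: the delay level, used for scheduled messages and for message retries
These hidden attributes are all kept as extended properties in the Message's properties map.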
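To make the "hidden attributes live in properties" point concrete, here is a minimal sketch. `SketchMessage` and its property key strings are hypothetical stand-ins written for this article, not the real `Message` class or its constants; only the idea (tag, keys, waitStoreMsgOK, and delayTimeLevel are entries in one string map) comes from the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for org.apache.rocketmq.common.message.Message.
class SketchMessage {
    // Illustrative key names chosen for this sketch.
    static final String KEY_TAGS = "TAGS";
    static final String KEY_KEYS = "KEYS";
    static final String KEY_WAIT = "WAIT";
    static final String KEY_DELAY = "DELAY";

    final String topic;
    final byte[] body;
    int flag;
    private final Map<String, String> properties = new HashMap<>();

    SketchMessage(String topic, byte[] body) {
        this.topic = topic;
        this.body = body;
    }

    void setTags(String tags) { properties.put(KEY_TAGS, tags); }
    String getTags() { return properties.get(KEY_TAGS); }

    void setKeys(String keys) { properties.put(KEY_KEYS, keys); }
    String getKeys() { return properties.get(KEY_KEYS); }

    void setDelayTimeLevel(int level) { properties.put(KEY_DELAY, String.valueOf(level)); }
    int getDelayTimeLevel() {
        String v = properties.get(KEY_DELAY);
        return v == null ? 0 : Integer.parseInt(v); // 0 means "no delay" in this sketch
    }

    void setWaitStoreMsgOK(boolean wait) { properties.put(KEY_WAIT, String.valueOf(wait)); }
    boolean isWaitStoreMsgOK() {
        String v = properties.get(KEY_WAIT);
        return v == null || Boolean.parseBoolean(v); // assumed default: wait for the store
    }

    Map<String, String> getProperties() { return properties; }

    public static void main(String[] args) {
        SketchMessage msg = new SketchMessage("TopicTest", "hello".getBytes());
        msg.setTags("TagA");
        msg.setDelayTimeLevel(3);
        System.out.println(msg.getProperties());
    }
}
```

Because every hidden attribute is just a map entry, adding a new one does not change the shape of the message on the wire; only the properties section grows.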
3. Producer Startup
We trace the flow from the start method of DefaultMQProducerImpl.
Step 1: check that the producerGroup is valid, and change the producer's instanceName to the process ID.
<code>
// DefaultMQProducerImpl::start
public void start() throws MQClientException {
    this.start(true); // startFactory defaults to true
}

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            // Step 1: validate the config and switch instanceName to the PID
            this.checkConfig();
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // Step 2: create (or reuse) the MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance()
                .getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // Step 3: register this producer with the MQClientInstance
            boolean registerOK = mQClientFactory.registerProducer(
                this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group["
                    + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please."
                    + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
            }

            this.topicPublishInfoTable.put(
                this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // Step 4: start the MQClientInstance
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}",
                this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
</code>
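Step 2: create the MQClientInstance.
Step 3: register with the MQClientInstance so that the current producer is managed by it, which makes later network requests, heartbeats, and similar work easier.
Step 4: start the MQClientInstance. If it has already been started, this call does not start it again.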
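4. Basic Send Flow
Sending a message consists of three main stages: validating the message, looking up the route, and sending (including the error-handling mechanism).
Validation mostly checks the message length, so we focus on route lookup and sending.
4.1 Route Lookup
Before a message is sent, the producer first has to obtain the route information for the topic.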
<code>
// DefaultMQProducerImpl::tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
</code>
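If the producer has cached route information for the topic and that information contains message queues, it is returned directly. If nothing is cached, or the cached entry has no queues, the producer queries the NameServer for the topic's route. When a message is sent for the first time and no route is found, the producer tries again using the default topic. If that also fails, an error is raised.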
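The lookup order described above (cache, then NameServer, then default topic, then error) can be sketched independently of RocketMQ. Everything in this example is hypothetical: `RouteLookupSketch`, the `DEFAULT_TOPIC` string, and the `nameServer` function, which stands in for the remote query and returns an empty list when a topic is unknown.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of "cache first, then NameServer, then default topic".
class RouteLookupSketch {
    static final String DEFAULT_TOPIC = "DEFAULT_TOPIC"; // placeholder name for this sketch

    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServer; // stand-in for the remote query

    RouteLookupSketch(Function<String, List<String>> nameServer) {
        this.nameServer = nameServer;
    }

    List<String> find(String topic) {
        // 1. Use the cached route if it actually contains queues.
        List<String> queues = cache.get(topic);
        if (queues != null && !queues.isEmpty()) {
            return queues;
        }
        // 2. Ask the name server for this topic.
        queues = nameServer.apply(topic);
        // 3. Fall back to the default topic's route.
        if (queues.isEmpty()) {
            queues = nameServer.apply(DEFAULT_TOPIC);
        }
        // 4. Still nothing: report the failure to the caller.
        if (queues.isEmpty()) {
            throw new IllegalStateException("No route info of this topic: " + topic);
        }
        cache.put(topic, queues);
        return queues;
    }
}
```

A second call to `find` for the same topic is served from the cache, so the name server is only contacted when the cached entry is missing or empty.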
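4.2 Selecting a Message Queue
A message queue is selected based on the route information. The queues returned are sorted by broker and then by queue index. Sending uses a retry loop: select a queue, send the message, return on success, and retry on failure. There are two queue selection modes.
- sendLatencyFaultEnable=false: the default mechanism
- sendLatencyFaultEnable=true: enables the broker fault-latency mechanism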
<code>
// MQFaultStrategy::selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
</code>
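The default branch delegates to `tpInfo.selectOneMessageQueue(lastBrokerName)`, whose body is not shown here. As I understand it, the idea is a round-robin walk that skips queues on the broker that just failed. The sketch below illustrates that idea only; `QueueSelectSketch` and its nested `Queue` type are hypothetical and are not the RocketMQ implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: round-robin selection that avoids the last failed broker.
class QueueSelectSketch {
    // Minimal queue descriptor: broker name plus queue id.
    static final class Queue {
        final String broker;
        final int id;
        Queue(String broker, int id) {
            this.broker = broker;
            this.id = id;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger counter = new AtomicInteger();

    QueueSelectSketch(List<Queue> queues) {
        this.queues = queues;
    }

    Queue select(String lastBrokerName) {
        int n = queues.size();
        for (int i = 0; i < n; i++) {
            // floorMod keeps the index non-negative even after the counter overflows.
            Queue q = queues.get(Math.floorMod(counter.getAndIncrement(), n));
            if (lastBrokerName == null || !q.broker.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue sits on the failed broker: fall back to plain round-robin.
        return queues.get(Math.floorMod(counter.getAndIncrement(), n));
    }

    public static void main(String[] args) {
        QueueSelectSketch s = new QueueSelectSketch(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1), new Queue("broker-b", 0)));
        Queue first = s.select(null);
        Queue retry = s.select(first.broker); // a retry avoids the broker that just failed
        System.out.println(first.broker + " -> " + retry.broker);
    }
}
```

With a single shared counter, consecutive sends spread across queues, and a retry after a failure lands on a different broker whenever one exists.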
4.3 Sending the Message
The core entry point of the send API is DefaultMQProducerImpl::sendKernelImpl.
<code>
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // body omitted
}
</code>
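Parameters:
- Message msg: the message to send
- MessageQueue mq: the message queue the message is sent to
- CommunicationMode communicationMode: the send mode, one of SYNC, ASYNC, ONEWAY
- SendCallback sendCallback: the callback for asynchronous sends
- TopicPublishInfo topicPublishInfo: the topic's route information
- long timeout: the send timeout
Send steps:
- Resolve the broker's network address from the MessageQueue
- Assign a globally unique ID to the message
- If a send hook is registered, run its pre-send logic
- Build the send request packet
- Transmit over the network according to the send mode: synchronous, asynchronous, or one-way
- If a send hook is registered, run its after logic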
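4.3.1 Synchronous Send
On the MQ client, the entry point for sending a message is MQClientAPIImpl::sendMessage.
Once the request reaches the broker, a synchronous send is handled in these steps:
- Check whether the send request is valid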
<code>
// AbstractSendMessageProcessor::msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
    final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending message is forbidden");
        return response;
    }

    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
        log.warn(errorMsg);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorMsg);
        return response;
    }

    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
            }
        }

        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
            requestHeader.getTopic(),
            requestHeader.getDefaultTopic(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

        if (null == topicConfig) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicConfig =
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
                        topicSysFlag);
            }
        }

        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }

    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) {
        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
            queueIdInt,
            topicConfig.toString(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    }
    return response;
}
</code>
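- If the message's retry count exceeds the maximum allowed, the message goes to the DLQ (dead-letter queue)
- Call DefaultMessageStore::putMessage to store the message
4.3.2 Asynchronous Send
With an asynchronous send, the caller does not block waiting for the broker's result. It only supplies a callback, which the client invokes once the response arrives. Compared with a synchronous send, this noticeably improves throughput on the sending side.
4.3.3 One-way Send
With a one-way send, the caller neither waits for a result nor supplies a callback. The sender does not care whether the message was delivered. The mechanism is the same as an asynchronous send, except that the sender does nothing when a result comes back.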
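The difference between the three modes is easiest to see side by side. The sketch below is not RocketMQ code: `SendModesSketch` and its `transmit` method are hypothetical, with `transmit` standing in for the network round trip by completing a `CompletableFuture` on a background thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch of SYNC, ASYNC and ONEWAY on top of one transport call.
class SendModesSketch {
    // Stand-in for the network round trip to the broker.
    static CompletableFuture<String> transmit(String msg) {
        return CompletableFuture.supplyAsync(() -> "SEND_OK:" + msg);
    }

    // SYNC: block the caller until the response arrives or the timeout expires.
    static String sendSync(String msg, long timeoutMillis) {
        try {
            return transmit(msg).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("send failed: " + msg, e);
        }
    }

    // ASYNC: return immediately; the callback receives the result or the error later.
    static void sendAsync(String msg, BiConsumer<String, Throwable> callback) {
        transmit(msg).whenComplete(callback);
    }

    // ONEWAY: hand the message to the transport and ignore whatever comes back.
    static void sendOneway(String msg) {
        transmit(msg);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("hello", 1000));
    }
}
```

All three share the same transmission step. They differ only in what the caller does with the response, which matches the description of one-way send as an asynchronous send whose result is dropped.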
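4.3.4 Batch Send
A batch send packs several messages of the same topic together and sends them to the broker in one request, which reduces the number of network calls and improves transfer efficiency.
For a single message, the content is stored in body. For a batch, the content of several messages has to be stored in body, so RocketMQ encodes the messages in a fixed format.
Batch send: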
<code>
// DefaultMQProducer::send
public SendResult send(Collection<Message> msgs)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}
</code>
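**Send flow:** on the sending side, the batch method wraps the messages into a MessageBatch object, which holds a `List<Message> messages` internally. From there on, a batch send follows exactly the same path as a single-message send.
Tracing through the code: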
<code>
// DefaultMQProducer::send
public SendResult send(Collection<Message> msgs)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

// DefaultMQProducer::batch
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
    MessageBatch msgBatch;
    try {
        msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}

// DefaultMQProducerImpl::send
public SendResult send(Message msg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
</code>
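The call `msgBatch.encode()` is where several message bodies become one body. The exact byte layout RocketMQ uses is not shown in this article, so the sketch below assumes a deliberately simple layout (a 4-byte length followed by the body bytes, repeated) purely to illustrate the idea of a fixed, self-describing format. `BatchEncodeSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: pack several bodies into one byte array and unpack them again.
class BatchEncodeSketch {
    // Assumed layout for illustration: [4-byte length][body bytes], repeated per message.
    static byte[] encode(List<byte[]> bodies) {
        int total = 0;
        for (byte[] b : bodies) {
            total += 4 + b.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] b : bodies) {
            buf.putInt(b.length);
            buf.put(b);
        }
        return buf.array();
    }

    // The receiver walks the buffer, reading one length prefix at a time.
    static List<byte[]> decode(byte[] packed) {
        ByteBuffer buf = ByteBuffer.wrap(packed);
        List<byte[]> out = new ArrayList<>();
        while (buf.hasRemaining()) {
            byte[] b = new byte[buf.getInt()];
            buf.get(b);
            out.add(b);
        }
        return out;
    }
}
```

Because each entry carries its own length, the receiving side can split the single body back into individual messages without any extra metadata.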
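5. Message Storage
Most business systems need the MQ to store messages durably, which greatly improves the availability of the overall system.
First, a look at the data flow in RocketMQ:
- CommitLog: the message storage file. Messages of every topic are stored in the CommitLog files
- ConsumeQueue: the message consume queue. After a message reaches the CommitLog, it is forwarded asynchronously to the consume queue, where consumers read it
- IndexFile: the message index file, which mainly stores the mapping between message keys and offsets
- Transaction state service: stores the transaction state of each message
- Scheduled message service: each delay level maps to one consume queue, and the service stores the pull progress of the delay queues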
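The relationship between CommitLog and ConsumeQueue can be modelled in a few lines: one shared append-only log for all topics, plus a small per-queue index that records where each message sits in that log. This is a hypothetical in-memory sketch (`StoreLayoutSketch` and `IndexEntry` are made-up names), and it builds the index inline, whereas the text above says the broker forwards to the consume queue asynchronously.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one shared log, one lightweight index per topic/queue.
class StoreLayoutSketch {
    // Index entry: physical offset and size of a message in the shared log.
    static final class IndexEntry {
        final long offset;
        final int size;
        IndexEntry(long offset, int size) {
            this.offset = offset;
            this.size = size;
        }
    }

    private final ByteArrayOutputStream commitLog = new ByteArrayOutputStream(); // stand-in for CommitLog
    private final Map<String, List<IndexEntry>> consumeQueues = new HashMap<>(); // key: topic-queueId

    // Append to the shared log, then record the position in the queue's index.
    long put(String topic, int queueId, byte[] body) {
        long offset = commitLog.size();
        commitLog.write(body, 0, body.length);
        consumeQueues.computeIfAbsent(topic + "-" + queueId, k -> new ArrayList<>())
            .add(new IndexEntry(offset, body.length));
        return offset;
    }

    // A consumer reads by logical index; the entry points back into the shared log.
    byte[] get(String topic, int queueId, int logicIndex) {
        IndexEntry e = consumeQueues.get(topic + "-" + queueId).get(logicIndex);
        byte[] all = commitLog.toByteArray();
        return Arrays.copyOfRange(all, (int) e.offset, (int) e.offset + e.size);
    }

    public static void main(String[] args) {
        StoreLayoutSketch store = new StoreLayoutSketch();
        store.put("TopicA", 0, "a1".getBytes());
        store.put("TopicB", 0, "b1".getBytes());
        store.put("TopicA", 0, "a2".getBytes());
        System.out.println(new String(store.get("TopicA", 0, 1)));
    }
}
```

Writes stay sequential because every topic appends to the same log, while consumers of one topic only scan their own small index.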
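RocketMQ's storage architecture:
The message store implementation class is org.apache.rocketmq.store.DefaultMessageStore.
Its core fields:
- MessageStoreConfig messageStoreConfig: message store configuration
- CommitLog commitLog: the implementation class for CommitLog file storage
- ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable: the consume queue cache table, grouped by topic
- FlushConsumeQueueService flushConsumeQueueService: the thread that flushes ConsumeQueue files to disk
- CleanCommitLogService cleanCommitLogService: the service that cleans up CommitLog files
- CleanConsumeQueueService cleanConsumeQueueService: the service that cleans up ConsumeQueue files
- IndexService indexService: the index file implementation class
- AllocateMappedFileService allocateMappedFileService: the MappedFile allocation service
- ReputMessageService reputMessageService: dispatches CommitLog messages, building the ConsumeQueue and IndexFile files from the CommitLog
- HAService haService: the storage HA mechanism
- TransientStorePool transientStorePool: the off-heap memory buffer pool for messages
- MessageArrivingListener messageArrivingListener: the message-arrival listener used by long-polling pulls
- BrokerConfig brokerConfig: broker configuration
- StoreCheckpoint storeCheckpoint: the flush checkpoint file
- LinkedList<CommitLogDispatcher> dispatcherList: the CommitLog dispatch handlers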
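5.1 Storing a Sent Message
The entry point for storing a message is org.apache.rocketmq.store.DefaultMessageStore::putMessage.
- If the broker has stopped working, is in the SLAVE role, or is currently not writable, the write is rejected. The write is also rejected if the message topic is longer than 256 characters or the message properties are longer than 65536 characters
- Validate the message delay level
- Get the CommitLog file that can currently be written to
- Before writing to the CommitLog, acquire putMessageLock, which means writes to the CommitLog file are serialized
- Set the message's store timestamp
- Append the message to the MappedFile
- Create the globally unique message ID
- Get the message's offset in its message queue
- Compute the total message length from the body length, topic length, and properties length, following the message storage format
- If the message length + END_FILE_MIN_BLANK_LENGTH is greater than the free space left in the CommitLog file, return END_OF_FILE so that a new CommitLog file is created to hold the message
- Write the message content into the ByteBuffer, then create the AppendMessageResult
- Update the logical offset of the message queue
- Release putMessageLock once the append logic is done
- DefaultAppendMessageCallback::doAppend only appends the message in memory. Depending on whether synchronous or asynchronous flushing is configured, the in-memory data is then persisted to disk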
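The serialized append with its end-of-file check can be sketched as follows. This is a hypothetical reduction (`AppendSketch` is a made-up class): a heap `ByteBuffer` plays the role of one fixed-size file, each record is just a 4-byte length plus the body, and the value 8 for `END_FILE_MIN_BLANK_LENGTH` is an assumption chosen for the example.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of "lock, check free space, append, unlock".
class AppendSketch {
    enum Status { PUT_OK, END_OF_FILE }

    // Space reserved at the end of a file for an end marker (value assumed for this sketch).
    static final int END_FILE_MIN_BLANK_LENGTH = 8;

    private final ByteBuffer buffer; // stand-in for one fixed-size mapped file
    private final ReentrantLock putMessageLock = new ReentrantLock();

    AppendSketch(int fileSize) {
        this.buffer = ByteBuffer.allocate(fileSize);
    }

    Status append(byte[] body) {
        putMessageLock.lock(); // appends to the log are serialized
        try {
            int msgLen = 4 + body.length; // length prefix + body in this simplified format
            if (msgLen + END_FILE_MIN_BLANK_LENGTH > buffer.remaining()) {
                // Not enough room: the caller is expected to roll over to a new file.
                return Status.END_OF_FILE;
            }
            buffer.putInt(msgLen);
            buffer.put(body);
            return Status.PUT_OK;
        } finally {
            putMessageLock.unlock();
        }
    }

    int wrotePosition() {
        return buffer.position();
    }

    public static void main(String[] args) {
        AppendSketch file = new AppendSketch(32);
        System.out.println(file.append(new byte[10]));
        System.out.println(file.append(new byte[10]));
    }
}
```

Note that a rejected append leaves the write position untouched, so the same message can be retried against the next file. Nothing here reaches disk; that is the job of the flush step described later.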
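Simplified as the following sequence diagram:
5.2 Memory Mapping
RocketMQ uses memory-mapped files to improve I/O performance. For CommitLog, ConsumeQueue, and IndexFile alike, each file has a fixed length. When a file is full, a new file is created, and its name is the global physical offset of the first message it holds.
Steps:
- The memory-mapped file MappedFile is created by AllocateMappedFileService
- Creating a MappedFile follows a typical producer-consumer model
- When MappedFileQueue calls getLastMappedFile to obtain a MappedFile, it puts a request into a queue
- The AllocateMappedFileService thread keeps watching the queue and creates a MappedFile object when a request arrives
- Finally the MappedFile is warmed up, which calls the force and mlock methods underneath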
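For readers who have not used memory-mapped files in Java, the sketch below shows the basic mechanics with the standard `FileChannel.map` API: create a fixed-length file named after its starting offset, map it, write through the mapping, and force it to disk. `MappedFileSketch` is hypothetical. The 20-digit zero padding of the file name reflects how I understand RocketMQ names its files and should be read as an assumption, and the warm-up and mlock steps are not reproduced because mlock needs native calls.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch: a fixed-length, offset-named, memory-mapped file.
class MappedFileSketch {
    // File name derived from the starting offset (20-digit padding is an assumption).
    static String fileNameFor(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // Maps fileSize bytes of the file, writes data at the start, and forces it to disk.
    static Path writeMapped(Path dir, long startOffset, int fileSize, byte[] data) {
        Path file = dir.resolve(fileNameFor(startOffset));
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            // Mapping in READ_WRITE mode grows the file to fileSize if it is shorter.
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            mapped.put(data); // a plain memory write, no write() system call
            mapped.force();   // flush the dirty pages to the storage device
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return file;
    }

    public static void main(String[] args) {
        Path dir = Paths.get(System.getProperty("java.io.tmpdir"));
        Path file = writeMapped(dir, 0L, 1024, "hello".getBytes());
        System.out.println(file.getFileName() + " size=" + file.toFile().length());
    }
}
```

Naming each file by its starting offset means that, given any physical offset, the file containing it can be located by a simple search over file names, with no separate lookup table.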
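5.3 Flushing to Disk
After MappedFile's appendMessage is called, the message has only been placed into a ByteBuffer, that is, into memory. It has not reached the disk yet. Persisting it requires flushing memory to disk, and for the CommitLog RocketMQ provides two flush modes.
- Messages a producer sends to the broker are first saved in a MappedFile and then synced to disk by the flush mechanism
- Flushing is either synchronous or asynchronous
- For asynchronous flushing, a background thread runs at a fixed interval
- Synchronous flushing is also a producer-consumer model. After the broker saves a message to the MappedFile, it creates a GroupCommitRequest, puts it into a list, and blocks. A background thread takes requests from the list and flushes to disk, then notifies the waiting thread once the flush succeeds.
Synchronous flush (CommitLog.java):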
<code>
// Wraps a single flush request
public static class GroupCommitRequest {
    // The offset that must be flushed for this request to be satisfied
    private final long nextOffset;
    // Latch the waiting thread blocks on until the flush completes
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    public long getNextOffset() {
        return nextOffset;
    }

    // Called after the flush to wake up the waiting thread
    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }

    public boolean waitForFlush(long timeout) {
        try {
            this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }
}

/**
 * GroupCommit Service
 * Flushes requests to disk in groups
 */
class GroupCommitService extends FlushCommitLogService {
    // List that receives new flush requests
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    // List the flush thread reads from when writing memory to disk
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

    // Add a flush request
    public void putRequest(final GroupCommitRequest request) {
        synchronized (this) {
            // Add it to the write list
            this.requestsWrite.add(request);
            // Wake up the flush thread
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }
    }

    // Swap the read and write lists so the flush loop can work on its own list
    // without holding the lock while new requests keep arriving
    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }

    private void doCommit() {
        // The read list is not empty
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; (i < 2) && !flushOK; i++) {
                    flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());
                    // Not flushed far enough yet, so flush again
                    if (!flushOK) {
                        CommitLog.this.mapedFileQueue.commit(0);
                    }
                }
                // Done, wake up the waiting thread
                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            // Clear the read list
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mapedFileQueue.commit(0);
        }
    }

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            try {
                this.waitForRunning(0);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        // On a normal shutdown, flush whatever is still pending
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }

        synchronized (this) {
            this.swapRequests();
        }

        this.doCommit();

        CommitLog.log.info(this.getServiceName() + " service end");
    }
}
</code>
Asynchronous flush (CommitLog.java):
<code>
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    // Keep polling until the service is stopped
    while (!this.isStoped()) {
        boolean flushCommitLogTimed =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
        int interval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum number of dirty pages required before a flush is performed
        int flushPhysicQueueLeastPages =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        int flushPhysicQueueThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        // Once the thorough-flush interval has elapsed, flush everything
        // regardless of how many dirty pages there are (least pages = 0)
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = ((printTimes++ % 10) == 0);
        }

        try {
            // Either sleep for a fixed interval or wait until woken up
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }

            // Start the flush
            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RetryTimesOver && !result; i++) {
        result = CommitLog.this.mapedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times "
            + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}
</code>
6. Summary
Message send flow chart:
Message storage flow chart: