從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

上次的 Kafka 重啟失敗事件,對為什麼重啟失敗的原因似乎並沒有解釋清楚,那麼我就在這裡按照我對 Kafka 的認識,從源碼和日誌文件結構去嘗試尋找原因。

從源碼中定位到問題的根源

首先把導致 Kafka 進程退出的異常棧貼出來:

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

注:以下源碼基於 kafka 0.11.0.2 版本。

我們直接從 index 文件損壞警告日誌的位置開始:

kafka.log.Log#loadSegmentFiles

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

從前一篇文章中已經說到,Kafka 在啟動的時候,會檢查kafka是否為 cleanshutdown,判斷依據為 ${log.dirs} 目錄中是否存在 .kafka_cleanshutDown 的文件,如果非正常退出就沒有這個文件,接著就需要 recover log 處理,在處理中會調用 。

在 recover 前,會調用 sanityCheck() 方法用於檢驗每個 log sement 的 index 文件,確保索引文件的完整性 ,如果發現索引文件損壞,刪除並調用 recoverSegment() 方法進行索引文件的重構,最終會調用 recover() 方法:

kafka.log.LogSegment#recover

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

源碼中相關變量說明:

  • log:當前日誌 Segment 文件的對象;
  • batchs:一個 log segment 的消息壓縮批次;
  • batch:消息壓縮批次;
  • indexIntervalBytes:該參數決定了索引文件稀疏間隔打底有多大,由 broker 端參數 log.index.interval.bytes 決定,默認值為 4 KB,即表示當前分區 log 文件寫入了 4 KB 數據後才會在索引文件中增加一個索引項(entry);
  • validBytes:當前消息批次在 log 文件中的物理地址。

知道相關參數的含義之後,那麼這段代碼的也就容易解讀了:循環讀取 log 文件中的消息批次,並讀取消息批次中的 baseOffset 以及在 log 文件中物理地址,將其追加到索引文件中,追加的間隔為 indexIntervalBytes 大小。

我們再來解讀下消息批次中的 baseOffset:

我們知道一批消息中,有最開頭的消息和末尾消息,所以一個消息批次中,分別有 baseOffset 和 lastOffset,源碼註釋如下:

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

其中最關鍵的描述是:它可以是也可以不是第一條記錄的偏移量

kafka.log.OffsetIndex#append

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

以上是追加索引塊核心方法,在這裡可以看到 Kafka 異常棧的詳細信息,Kafka 進程也就是在這裡被異常中斷退出的(這裡吐槽一下,為什麼一個分區有損壞,要整個 broker 掛掉?寧錯過,不放過?就不能標記該分區不能用,然後讓 broker 正常啟動以提供服務給其他分區嗎?建議 Kafka 在日誌恢復期間加強異常處理,不知道後續版本有沒有優化,後面等我拿 2.x 版本源碼分析一波

),退出的條件是:

<code>_entries == 0 || offset > _lastOffset = false
/<code>

也就是說,假設索引文件中的索引條目為 0,說明索引文件內容為空,那麼直接可以追加索引,而如果索引文件中有索引條目了,需要消息批次中的 baseOffset 大於索引文件最後一個條目中的位移,因為索引文件是遞增的,因此不允許比最後一個條目的索引還小的消息位移。

現在也就很好理解了,產生這個異常報錯的根本原因,是因為後面的消息批次中,有位移比最後索引位移還要小(或者等於)。

前面也說過了,消息批次中的 baseOffset 不一定是第一條記錄的偏移量,那麼問題是不是出在這裡?我的理解是這裡有可能會造成兩個消息批次獲取到的 baseOffset 有相交的值?對此我並沒有繼續研究下去了,但我確定的是,在 kafka 2.2.1 版本中,append() 方法中的 offset 已經改成 消息批次中的 lastOffset 了:

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

這裡我也需要吐槽一下,**如果出現這個 bug,意味著這個問題除非是將這些故障的日誌文件和索引文件刪除,否則該節點永遠啟動不了,這也太暴力了吧?**我花了非常多時間去專門看了很多相關 issue,目前還沒看到有解決這個問題的方案?或者我需要繼續尋找?我把相關 issue 貼出來:

https://issues.apache.org/jira/browse/KAFKA-1211

https://issues.apache.org/jira/browse/KAFKA-3919

https://issues.apache.org/jira/browse/KAFKA-3955

嚴重建議各位儘快把 Kafka 版本升級到 2.x 版本,舊版本太多問題了,後面我著重研究 2.x 版本的源碼。

下面我從日誌文件結構中繼續分析。

從日誌文件結構中看到問題的本質

我們用 Kafka 提供的 DumpLogSegments 工具打開 log 和 index 文件:

<code>$ ~/kafka_2.11-0.11.0.2/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /dfs5/kafka/data/secLog-2/00000000000110325000.log > secLog.log

$ ~/kafka_2.11-0.11.0.2/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /dfs5/kafka/data/secLog-2/00000000000110325000.index > secLog-index.log
/<code>

用 less -Nm 命令查看,log 和 index 對比:

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

如上圖所示,index最後記錄的 offset = 110756715,positioin=182484660,與異常棧顯示的一樣,說明在進行追加下一個索引塊的時候,發現下一個索引塊的 offset 索引不大於最後一個索引塊的 offset,因此不允許追加,報異常並退出進程,那麼問題就出現在下一個消息批次的 baseOffset,根據 log.index.interval.bytes 默認值大小為 4 KB(4096),而追加的條件前面也說了,需要大於 log.index.interval.bytes,因此我們 DumpLogSegments 工具查詢:

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

從 dump 信息中可知,在 positioin=182484660 往後的幾個消息批次中,它們的大小加起來大於 4096 的消息批次的 offset=110756804,postion=182488996,它的 baseOffset 很可能就是 110756715,與索引文件最後一個索引塊的 Offset 相同,因此出現錯誤。

接著我們繼續用 DumpLogSegments 工具查看消息批次內容:

我們先查看 offset = 110756715,positioin=182484660 的消息塊詳情:

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

接著尋找 offset = 110756715,的消息批次塊:

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

終於找到你了,跟我預測的一樣!postion=182488996,在將該消息批次追加到索引文件中,發生 offset 混亂了。

總結

如果還是沒找到官方的處理方案,就只能刪除這些錯誤日誌文件和索引文件,然後重啟節點?非常遺憾,我在查看了相關的 issue 之後,貌似還沒看到官方的解決辦法,所幸的是該集群是日誌集群,數據丟失也沒有太大問題。

我也嘗試發送郵件給 Kafka 維護者,期待大佬的回應:

從源碼和日誌文件結構中分析 Kafka 重啟失敗事件

不過呢,0.11.0.2 版本屬於很舊的版本了,因此,升級 Kafka 版本才是長久之計啊!我已經迫不及待地想擼 kafka 源碼了!

在這個過程中,我學到了很多,同時也意識到想要繼續深入研究 Kafka,必須要學會 Scala,才能從源碼中一探 Kafka 的各種細節。

接下來我還要對關於 Kafka 分區不可用的一些思考,在下一篇章節會講到,敬請期待!


分享到:


相關文章: