Flink 原理與實現:內存管理

如今,大數據領域的開源框架(Hadoop,Spark,Storm)都使用的 JVM,當然也包括 Flink。基於 JVM 的數據分析引擎都需要面對將大量數據存到內存中,這就不得不面對 JVM 存在的幾個問題:

  1. Java 對象存儲密度低。一個只包含 boolean 屬性的對象佔用了16個字節內存:對象頭佔了8個,boolean 屬性佔了1個,對齊填充佔了7個。而實際上只需要一個bit(1/8字節)就夠了。
  2. Full GC 會極大地影響性能,尤其是為了處理更大數據而開了很大內存空間的JVM來說,GC 會達到秒級甚至分鐘級。
  3. OOM 問題影響穩定性。OutOfMemoryError是分佈式計算框架經常會遇到的問題,當JVM中所有對象大小超過分配給JVM的內存大小時,就會發生OutOfMemoryError錯誤,導致JVM崩潰,分佈式框架的健壯性和性能都會受到影響。

所以目前,越來越多的大數據項目開始自己管理JVM內存了,像 Spark、Flink、HBase,為的就是獲得像 C 一樣的性能以及避免 OOM 的發生。本文將會討論 Flink 是如何解決上面的問題的,主要內容包括內存管理、定製的序列化工具、緩存友好的數據結構和算法、堆外內存、JIT編譯優化等。

積極的內存管理

Flink 並不是將大量對象存在堆上,而是將對象都序列化到一個預分配的內存塊上,這個內存塊叫做 MemorySegment,它代表了一段固定長度的內存(默認大小為 32KB),也是 Flink 中最小的內存分配單元,並且提供了非常高效的讀寫方法。你可以把 MemorySegment 想象成是為 Flink 定製的 java.nio.ByteBuffer。它的底層可以是一個普通的 Java 字節數組(byte[]),也可以是一個申請在堆外的 ByteBuffer。每條記錄都會以序列化的形式存儲在一個或多個MemorySegment中。

Flink 中的 Worker 名叫 TaskManager,是用來運行用戶代碼的 JVM 進程。TaskManager 的堆內存主要被分成了三個部分:

Flink 原理與實現:內存管理

  • ●Network Buffers: 一定數量的32KB大小的 buffer,主要用於數據的網絡傳輸。在 TaskManager 啟動的時候就會分配。默認數量是 2048 個,可以通過 taskmanager.network.numberOfBuffers 來配置。(閱讀這篇文章瞭解更多Network Buffer的管理)
  • ●Memory Manager Pool: 這是一個由 MemoryManager 管理的,由眾多MemorySegment組成的超大集合。Flink 中的算法(如 sort/shuffle/join)會向這個內存池申請 MemorySegment,將序列化後的數據存於其中,使用完後釋放回內存池。默認情況下,池子佔了堆內存的 70% 的大小。
  • ●Remaining (Free) Heap: 這部分的內存是留給用戶代碼以及 TaskManager 的數據結構使用的。因為這些數據結構一般都很小,所以基本上這些內存都是給用戶代碼使用的。從GC的角度來看,可以把這裡看成的新生代,也就是說這裡主要都是由用戶代碼生成的短期對象。

注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,該池子不會預分配內存,也不會向該池子請求內存塊。也就是說該部分的內存都是可以給用戶代碼使用的。不過社區是打算在 Streaming 模式下也能將該池子利用起來。

Flink 採用類似 DBMS 的 sort 和 join 算法,直接操作二進制數據,從而使序列化/反序列化帶來的開銷達到最小。所以 Flink 的內部實現更像 C/C++ 而非 Java。如果需要處理的數據超出了內存限制,則會將部分數據存儲到硬盤上。如果要操作多塊MemorySegment就像操作一塊大的連續內存一樣,Flink會使用邏輯視圖(AbstractPagedInputView)來方便操作。下圖描述了 Flink 如何存儲序列化後的數據到內存塊中,以及在需要的時候如何將數據存儲到磁盤上。

從上面我們能夠得出 Flink 積極的內存管理以及直接操作二進制數據有以下幾點好處:

  1. 減少GC壓力。顯而易見,因為所有常駐型數據都以二進制的形式存在 Flink 的MemoryManager中,這些MemorySegment一直呆在老年代而不會被GC回收。其他的數據對象基本上是由用戶代碼生成的短生命週期對象,這部分對象可以被 Minor GC 快速回收。只要用戶不去創建大量類似緩存的常駐型對象,那麼老年代的大小是不會變的,Major GC也就永遠不會發生。從而有效地降低了垃圾回收的壓力。另外,這裡的內存塊還可以是堆外內存,這可以使得 JVM 內存更小,從而加速垃圾回收。
  2. 避免了OOM。所有的運行時數據結構和算法只能通過內存池申請內存,保證了其使用的內存大小是固定的,不會因為運行時數據結構和算法而發生OOM。在內存吃緊的情況下,算法(sort/join等)會高效地將一大批內存塊寫到磁盤,之後再讀回來。因此,OutOfMemoryErrors可以有效地被避免。
  3. 節省內存空間。Java 對象在存儲上有很多額外的消耗(如上一節所談)。如果只存儲實際數據的二進制內容,就可以避免這部分消耗。
  4. 高效的二進制操作 & 緩存友好的計算。二進制數據以定義好的格式存儲,可以高效地比較與操作。另外,該二進制形式可以把相關的值,以及hash值,鍵值和指針等相鄰地放進內存中。這使得數據結構可以對高速緩存更友好,可以從 L1/L2/L3 緩存獲得性能的提升(下文會詳細解釋)。

為 Flink 量身定製的序列化框架

目前 Java 生態圈提供了眾多的序列化框架:Java serialization, Kryo, Apache Avro 等等。但是 Flink 實現了自己的序列化框架。因為在 Flink 中處理的數據流通常是同一類型,由於數據集對象的類型固定,對於數據集可以只保存一份對象Schema信息,節省大量的存儲空間。同時,對於固定大小的類型,也可通過固定的偏移位置存取。當我們需要訪問某個對象成員變量的時候,通過定製的序列化工具,並不需要反序列化整個Java對象,而是可以直接通過偏移量,只是反序列化特定的對象成員變量。如果對象的成員變量較多時,能夠大大減少Java對象的創建開銷,以及內存數據的拷貝大小。

Flink支持任意的Java或是Scala類型。Flink 在數據類型上有很大的進步,不需要實現一個特定的接口(像Hadoop中的org.apache.hadoop.io.Writable),Flink 能夠自動識別數據類型。Flink 通過 Java Reflection 框架分析基於 Java 的 Flink 程序 UDF (User Define Function)的返回類型的類型信息,通過 Scala Compiler 分析基於 Scala 的 Flink 程序 UDF 的返回類型的類型信息。類型信息由 TypeInformation 類表示,TypeInformation 支持以下幾種類型:

  • ●BasicTypeInfo: 任意Java 基本類型(裝箱的)或 String 類型。
  • ●BasicArrayTypeInfo: 任意Java基本類型數組(裝箱的)或 String 數組。
  • ●WritableTypeInfo: 任意 Hadoop Writable 接口的實現類。
  • ●TupleTypeInfo: 任意的 Flink Tuple 類型(支持Tuple1 to Tuple25)。Flink tuples 是固定長度固定類型的Java Tuple實現。
  • ●CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
  • ●PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java對象的所有成員變量,要麼是 public 修飾符定義,要麼有 getter/setter 方法。
  • ●GenericTypeInfo: 任意無法匹配之前幾種類型的類。

前六種數據類型基本上可以滿足絕大部分的Flink程序,針對前六種類型數據集,Flink皆可以自動生成對應的TypeSerializer,能非常高效地對數據集進行序列化和反序列化。對於最後一種數據類型,Flink會使用Kryo進行序列化和反序列化。每個TypeInformation中,都包含了serializer,類型會自動通過serializer進行序列化,然後用Java Unsafe接口寫入MemorySegments。對於可以用作key的數據類型,Flink還同時自動生成TypeComparator,用來輔助直接對序列化後的二進制數據進行compare、hash等操作。對於 Tuple、CaseClass、POJO 等組合類型,其TypeSerializer和TypeComparator也是組合的,序列化和比較時會委託給對應的serializers和comparators。如下圖展示 一個內嵌型的Tuple3 對象的序列化過程。

Flink 原理與實現:內存管理

可以看出這種序列化方式存儲密度是相當緊湊的。其中 int 佔4字節,double 佔8字節,POJO多個一個字節的header,PojoSerializer只負責將header序列化進去,並委託每個字段對應的serializer對字段進行序列化。

Flink 的類型系統可以很輕鬆地擴展出自定義的TypeInformation、Serializer以及Comparator,來提升數據類型在序列化和比較時的性能。

Flink 如何直接操作二進制數據

Flink 提供瞭如 group、sort、join 等操作,這些操作都需要訪問海量數據。這裡,我們以sort為例,這是一個在 Flink 中使用非常頻繁的操作。

首先,Flink 會從 MemoryManager 中申請一批 MemorySegment,我們把這批 MemorySegment 稱作 sort buffer,用來存放排序的數據。

Flink 原理與實現:內存管理

我們會把 sort buffer 分成兩塊區域。一個區域是用來存放所有對象完整的二進制數據。另一個區域用來存放指向完整二進制數據的指針以及定長的序列化後的key(key+pointer)。如果需要序列化的key是個變長類型,如String,則會取其前綴序列化。如上圖所示,當一個對象要加到 sort buffer 中時,它的二進制數據會被加到第一個區域,指針(可能還有key)會被加到第二個區域。

將實際的數據和指針加定長key分開存放有兩個目的。第一,交換定長塊(key+pointer)更高效,不用交換真實的數據也不用移動其他key和pointer。第二,這樣做是緩存友好的,因為key都是連續存儲在內存中的,可以大大減少 cache miss(後面會詳細解釋)。

排序的關鍵是比大小和交換。Flink 中,會先用 key 比大小,這樣就可以直接用二進制的key比較而不需要反序列化出整個對象。因為key是定長的,所以如果key相同(或者沒有提供二進制key),那就必須將真實的二進制數據反序列化出來,然後再做比較。之後,只需要交換key+pointer就可以達到排序的效果,真實的數據不用移動。

Flink 原理與實現:內存管理

最後,訪問排序後的數據,可以沿著排好序的key+pointer區域順序訪問,通過pointer找到對應的真實數據,並寫到內存或外部(更多細節可以看這篇文章 Joins in Flink)。

緩存友好的數據結構和算法

隨著磁盤IO和網絡IO越來越快,CPU逐漸成為了大數據領域的瓶頸。從 L1/L2/L3 緩存讀取數據的速度比從主內存讀取數據的速度快好幾個量級。通過性能分析可以發現,CPU時間中的很大一部分都是浪費在等待數據從主內存過來上。如果這些數據可以從 L1/L2/L3 緩存過來,那麼這些等待時間可以極大地降低,並且所有的算法會因此而受益。

在上面討論中我們談到的,Flink 通過定製的序列化框架將算法中需要操作的數據(如sort中的key)連續存儲,而完整數據存儲在其他地方。因為對於完整的數據來說,key+pointer更容易裝進緩存,這大大提高了緩存命中率,從而提高了基礎算法的效率。這對於上層應用是完全透明的,可以充分享受緩存友好帶來的性能提升。

走向堆外內存

Flink 基於堆內存的內存管理機制已經可以解決很多JVM現存問題了,為什麼還要引入堆外內存?

  1. 啟動超大內存(上百GB)的JVM需要很長時間,GC停留時間也會很長(分鐘級)。使用堆外內存的話,可以極大地減小堆內存(只需要分配Remaining Heap那一塊),使得 TaskManager 擴展到上百GB內存不是問題。
  2. 高效的 IO 操作。堆外內存在寫磁盤或網絡傳輸時是 zero-copy,而堆內存的話,至少需要 copy 一次。
  3. 堆外內存是進程間共享的。也就是說,即使JVM進程崩潰也不會丟失數據。這可以用來做故障恢復(Flink暫時沒有利用起這個,不過未來很可能會去做)。

但是強大的東西總是會有其負面的一面,不然為何大家不都用堆外內存呢。

  1. 堆內存的使用、監控、調試都要簡單很多。堆外內存意味著更復雜更麻煩。
  2. Flink 有時需要分配短生命週期的 MemorySegment,這個申請在堆上會更廉價。
  3. 有些操作在堆內存上會快一點點。

Flink用通過ByteBuffer.allocateDirect(numBytes)來申請堆外內存,用 sun.misc.Unsafe 來操作堆外內存。

基於 Flink 優秀的設計,實現堆外內存是很方便的。Flink 將原來的 MemorySegment 變成了抽象類,並生成了兩個子類。HeapMemorySegment 和 HybridMemorySegment。從字面意思上也很容易理解,前者是用來分配堆內存的,後者是用來分配堆外內存和堆內存的。是的,你沒有看錯,後者既可以分配堆外內存又可以分配堆內存。為什麼要這樣設計呢?

首先假設HybridMemorySegment只提供分配堆外內存。在上述堆外內存的不足中的第二點談到,Flink 有時需要分配短生命週期的 buffer,這些buffer用HeapMemorySegment會更高效。那麼當使用堆外內存時,為了也滿足堆內存的需求,我們需要同時加載兩個子類。這就涉及到了 JIT 編譯優化的問題。因為以前 MemorySegment 是一個單獨的 final 類,沒有子類。JIT 編譯時,所有要調用的方法都是確定的,所有的方法調用都可以被去虛化(de-virtualized)和內聯(inlined),這可以極大地提高性能(MemroySegment的使用相當頻繁)。然而如果同時加載兩個子類,那麼 JIT 編譯器就只能在真正運行到的時候才知道是哪個子類,這樣就無法提前做優化。實際測試的性能差距在 2.7 被左右。

Flink 使用了兩種方案:

方案1:只能有一種 MemorySegment 實現被加載

代碼中所有的短生命週期和長生命週期的MemorySegment都實例化其中一個子類,另一個子類根本沒有實例化過(使用工廠模式來控制)。那麼運行一段時間後,JIT 會意識到所有調用的方法都是確定的,然後會做優化。

方案2:提供一種實現能同時處理堆內存和堆外內存

這就是 HybridMemorySegment 了,能同時處理堆與堆外內存,這樣就不需要子類了。這裡 Flink 優雅地實現了一份代碼能同時操作堆和堆外內存。這主要歸功於 sun.misc.Unsafe提供的一系列方法,如getLong方法:

sun.misc.Unsafe.getLong(Object reference, long offset)

  • ●如果reference不為空,則會取該對象的地址,加上後面的offset,從相對地址處取出8字節並得到 long。這對應了堆內存的場景。
  • ●如果reference為空,則offset就是要操作的絕對地址,從該地址處取出數據。這對應了堆外內存的場景。

這裡我們看下 MemorySegment 及其子類的實現。

public abstract class MemorySegment {

// 堆內存引用

protected final byte[] heapMemory;

// 堆外內存地址

protected long address;

//堆內存的初始化

MemorySegment(byte[] buffer, Object owner) {

//一些先驗檢查

...

this.heapMemory = buffer;

this.address = BYTE_ARRAY_BASE_OFFSET;

...

}

//堆外內存的初始化

MemorySegment(long offHeapAddress, int size, Object owner) {

//一些先驗檢查

...

this.heapMemory = null;

this.address = offHeapAddress;

...

}

public final long getLong(int index) {

final long pos = address + index;

if (index >= 0 && pos <= addressLimit - 8) {

// 這是我們關注的地方,使用 Unsafe 來操作 on-heap & off-heap

return UNSAFE.getLong(heapMemory, pos);

}

else if (address > addressLimit) {

throw new IllegalStateException("segment has been freed");

}

else {

// index is in fact invalid

throw new IndexOutOfBoundsException();

}

}

...

}

public final class HeapMemorySegment extends MemorySegment {

// 指向heapMemory的額外引用,用來如數組越界的檢查

private byte[] memory;

// 只能初始化堆內存

HeapMemorySegment(byte[] memory, Object owner) {

super(Objects.requireNonNull(memory), owner);

this.memory = memory;

}

...

}

public final class HybridMemorySegment extends MemorySegment {

private final ByteBuffer offHeapBuffer;

// 堆外內存初始化

HybridMemorySegment(ByteBuffer buffer, Object owner) {

super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);

this.offHeapBuffer = buffer;

}

// 堆內存初始化

HybridMemorySegment(byte[] buffer, Object owner) {

super(buffer, owner);

this.offHeapBuffer = null;

}

...

}

可以發現,HybridMemorySegment 中的很多方法其實都下沉到了父類去實現。包括堆內堆外內存的初始化。MemorySegment 中的 getXXX/putXXX 方法都是調用了 unsafe 方法,可以說MemorySegment已經具有了些 Hybrid 的意思了。HeapMemorySegment只調用了父類的MemorySegment(byte[] buffer, Object owner)方法,也就只能申請堆內存。另外,閱讀代碼你會發現,許多方法(大量的 getXXX/putXXX)都被標記成了 final,兩個子類也是 final 類型,為的也是優化 JIT 編譯器,會提醒 JIT 這個方法是可以被去虛化和內聯的。

對於堆外內存,使用 HybridMemorySegment 能同時用來代表堆和堆外內存。這樣只需要一個類就能代表長生命週期的堆外內存和短生命週期的堆內存。既然HybridMemorySegment已經這麼全能,為什麼還要方案1呢?因為我們需要工廠模式來保證只有一個子類被加載(為了更高的性能),而且HeapMemorySegment比heap模式的HybridMemorySegment要快。

下方是一些性能測試數據,更詳細的數據請參考這篇文章。

Flink 原理與實現:內存管理

總結

本文主要總結了 Flink 面對 JVM 存在的問題,而在內存管理的道路上越走越深。從自己管理內存,到序列化框架,再到堆外內存。其實縱觀大數據生態圈,其實會發現各個開源項目都有同樣的趨勢。比如最近炒的很火熱的 Spark Tungsten 項目,與 Flink 在內存管理上的思想是及其相似的。


分享到:


相關文章: