餓了麼輕量級分布式時序資料庫的設計與探索

作者介紹黃傑,2015年加入餓了麼,現任框架工具部高級開發經理,主要負責餓了麼的監控系統及監控系統周邊的工具。

一、背景

餓了麼對時序數據庫的需求主要來自各監控系統,主要用於存儲監控指標。原來使用的是graphite,後來慢慢對指標有了多維的需求,主要體現在對一個指標加多個Tag來組成Series,然後對Tag進行Filter和Group進行計算,這時graphite基本很難滿足需求。

業界現在用的比較多的主要有如下幾類TSDB:

  • InfluxDB:很多公司都在用,包括餓了麼有部分監控系統也是用的InfluxDB。其優點在於支持多維和多字段,存儲也根據TSDB的特點做了優化,不過開源的部分並不支持。很多公司自己做集群化,但大多基於指標名來,這樣就會有單指的熱點問題。現在餓了麼也是類似的做法,但熱點問題很嚴重,大的指標已經用了最好的服務器,可查詢性能還是不夠理想,如果做成按Series Sharding,那成本還是有一點高;
  • Graphite:根據指標寫入及查詢,計算函數很多,但很難支持多維,包括機房或多集群的查詢。原來餓了麼把業務層的監控指標存儲在Graphite中,並工作的很好,不過多活之後基本已經很難滿足一些需求了,由於其存儲結構的特點,很佔IO,根據目前線上的數據寫放大差不多幾十倍以上;
  • OpenTSDB:基於HBase,優點在於存儲層不用自己考慮,做好查詢聚合就可以,也會存在HBase的熱點問題等。在以前公司也用基於HBase實現的TSDB來解決OpenTSDB的一些問題, 如熱點、部分查詢聚合下放到HBase等,目的是優化其查詢性能,但依賴HBase/HDFS還是很重;
  • HiTSDB:阿里提供的TSDB,存儲也是用HBase,在數據結構及Index上面做了很多優化,具體沒有研究,有興趣的同學可以在阿里雲上試一下;
  • Druid:Druid其實是一個OLAP系統,但也可以用來存儲時間序列數據,不過看到它的架構圖時已經放棄了;
  • ElasticSearch(ES):也有公司直接用ES來存儲,沒有實際測試,但總覺得ES不是一個真正的TSDB;
  • atlas:Netflix出品,全內存TSDB,最近幾小時數據全在內存中,歷史數據需要外部存儲,具體沒有詳細研究;
  • beringei:Facebook出品,同樣是全內存TSDB,最近的數據也在內存,目前應該還在孵化期。

最終我們還是決定自己實現一套分佈式時序數據庫,具體需要解決如下問題:

  • 輕量,目前只依賴於Zookeeper;
  • 基於Series進行Sharding,解決熱點,可以真正水平擴展;
  • 實時寫入、實時查詢,由於大多用於監控系統,所以查詢性能要好;
  • 由於餓了麼目前是多活,監控系統也是多活,所以要支持單機房寫入,多機房聚合查詢等;
  • 要有自動的Rollup功能,如用戶可以寫10s的精度,系統自動Rollup到分鐘、小時、天級別,以支持大時間範圍的查詢,如報表等;
  • 支持類SQL的查詢方式;
  • 支持多副本,以提高整個系統的可靠性,只要還有一個副本存活就可以正常提供服務,副本數指定。

二、整體設計

採用計算和存儲分離的架構,分為計算層LinProxy和存儲層LinStorage。

餓了麼輕量級分佈式時序數據庫的設計與探索

說明:

  • LinProxy主要做一些SQL的解析及一些中間結合的再聚合計算,如果不是跨集群,LinProxy可以不需要,對於單集群的每個節點都內嵌了一個LinProxy來提供查詢服務;
  • LinDB Client主要用於數據的寫入,也有一些查詢的API;
  • LinStorage的每個節點組成一個集群,節點之間進行復制,並有副本的Leader節點提供讀寫服務,這點設計主要是參考Kafka的設計,可以把LinDB理解成類Kafka的數據寫入複製+底層時間序列的存儲層;
  • LinMaster主要負責database、shard、replica的分配,所以LinStorage存儲的調度及MetaData(目前存儲Zookeeper中)的管理;由於LinStorage Node都是對等的,所以我們基於Zookeeper在集群的節點選一個成為Master,每個Node把自身的狀態以心跳的方式上報到Master上,Master根據這些狀態進行調度,如果Master掛了,自動再選一個Master出來,這個過程基本對整個服務是無損的,所以用戶基本無感知。

1、寫入

整個寫過程分為如下2部分組成:

  • WAL複製,這部分設計上參考了Kafka,用戶的寫入只要寫入WAL成功,就認為成功(由於主要用於監控系統,所以對數據的一致性沒有做太多的保證),這樣就可以提供系統的寫入吞吐;
  • 本地寫入,這個過程是把WAL的數據解析寫入到自己的存儲結構中,只有寫入本地存儲的數據才可以查到。

整個過程不像一些系統在每次寫的過程中完成,我們是把這個過程分2步,並異步化了。

WAL複製

目前LinDB的replica複製協議採用多通道複製協議,主要基於WAL在多節點之間的複製,WAL在每個節點上的寫入有獨立的寫操作完成,所以對於Client寫入對應Leader的WAL成功就認為本次寫操作是成功的,Leader所在的節點負責把相應的WAL複製到對應的Follower,同理寫WAL成功認為複製成功,如下所示:

餓了麼輕量級分佈式時序數據庫的設計與探索

多通道複製協議

寫入Leader副本成功就算成功,提高了寫入速率,也帶來了以下問題:

  • 數據一致性的問題;
  • 數據的丟失問題。

以上圖Server1為Leader,3個Replication來複制1-WAL為例來說:

當前Server1是該shard的Leader接受Client的寫入,Server2和Server3都是Follower接受Server1的複製請求,此時1-WAL通道作為當前的數據寫入通道,Server2和Server3可能落後於Server1。

餓了麼輕量級分佈式時序數據庫的設計與探索

說明:

整個過程需要注意以下幾個Index:

  • Client寫入時的Append Index,表示當前Client寫入到哪裡;
  • 對應每個Follower都會有一個Replica Index,表示對應Follower消費Leader上面同步到哪裡;
  • Follower的Ack Index,表示Follower已經成功複製到本地的WAL;
  • 對於Follower的複製請求,其實相當於一個特殊Client的寫入,所以也有一個對應的Append Index。

只有被Ack過的Index,才標示為已經處理完成,對於Leader來說,小於最小的Ack Index的WAL數據是可以被刪除。在這個過程中,如果Server2或者Server3中有一臺出問題,這時對應的Consume Index不會移動,只有等到相應服務恢復之後才會繼續處理。

在整個過程中可能出現如下情況:

  • Leader Replica Index > Follower Append Index,這時需要根據Follower Append Index重置Leader Replica Index,可能存在2種情況,具體情況在複製順序性中描述;
  • Leader Replica Index < Follower Append Index,也同樣存在2種情況,具體情況在複製順序性中描述。

假如此時Server1掛了,從Server2和Server3中選出新的Leader,如此時選為Server2為Leader:

  • Server2就會開啟2-WAL複製通道,向Server1和Server3複製,由於當前Server1掛了,所以暫時只往Server3複製,此時數據的寫入通道為2-WAL;
  • Server1啟動恢復後,Server2會開啟向Server1的2-WAL複製通道,同時Server1會將1-WAL中剩餘的還未向Server2和Server3複製的數據複製給它們。

對於異常情況,WAL中的數據不能正常,由於ACK之後刪除導致WAL佔用過多磁盤,所以對WAL需要有一個SIZE和TTL的清理過程,一旦WAL因為SIZE和TTL清理之後,會導致幾個Index錯亂,具體錯亂情況如上所述。

多通道複製協議帶來的問題:

每個通道都有對應的Index序列,保存每個通道的Last Index。而單通道複製只需要保存1個Last Index即可。這個代價其實還好。

本地寫入

背景

做到Shard級別的寫入隔離,即每個Shard都會有獨立的線程來負責寫入,不會因為某個數據庫或者某個Shard寫入量劇增而導致別的數據庫的寫入,但可能會因為單機承載的Shard數過多導致線程數過多。如果遇到這種情況,應該通過擴機器來解決,或者在新建數據庫的時候合理分配Shard數。

由於是單線程的寫操作,所以在很多情況下,不需要考慮多線程寫帶來的鎖競爭問題。

數據存儲結構

餓了麼輕量級分佈式時序數據庫的設計與探索

說明,以單個數據庫在單節點上的數據結構為例:

  • 一個數據庫在單節點上會存在多個Shard,所有Shard共享一個索引數據;
  • 所有的數據根據數據庫的Interval來計算按時間片,存儲具體的數據,包括數據文件和索引文件。

這樣的設計主要為了方便處理TTL,數據如果過期,直接刪除相應的目錄就可以。每個Shard下面會存在segment,segment根據Interval來存儲相應時間片的數據。

那麼為什麼每個segment下面又按Interval存儲很多個data family?

這個主要由於LinDB主要解決的問題是存儲海量的監控數據,一般的監控數據基本是最新時間寫入,不會寫歷史數據,而整個LinDB的數據存儲類似LSM方式,所以為了減少數據文件之間的合併操作導致寫放大,最終衡量下來,再對segment時間片進行分片。

下面以Interval為10s為例說明:

  • segment按天來存儲;
  • 每個segment按小時來分data family,每個小時一個family,每個family中的文件再按列存儲具體的數據。

寫入流程

餓了麼輕量級分佈式時序數據庫的設計與探索

說明:

  • 系統會為每一個Shard啟一個寫線程,該線程負責這個Shard的所有寫操作。
  • 首先把measurement,tags,fields對應的數據寫入數據庫的索引文件,並生成相應的Measurement ID,Time Series ID及Field ID,主要完成string->int的轉換。這樣的好處是所有的數據存儲都以數據類型來存儲,從而可以減少整個存儲大小。因為對於每個數據點,Measurement,Tags,Field這樣元數據佔用,如cpu{host=1.1.1.1} load=1 1514214168614,其實轉換成ID之後,cpu => 1(measurement id),host=1.1.1.1 => 1(time series id),load => 1(field id),所以最終的數據存儲為1 1 1514214168614=>1,這個考慮OpenTSDB的設計。
  • 如果寫索引失敗,認為本次寫入失敗。失敗分為2種,一種是數據寫入格式有問題,這類失敗直接標示失敗;另外一種由於內部問題,這時寫入失敗需要重試。
  • 使用根據索引得到的ID,再結合寫入時間和數據庫Interval計算,得到需要寫入到哪個segment下的哪個family,寫family的過程,直接寫內存以達到高吞吐量的要求,內存數據到達內存限制之後,會觸發Flush操作。
  • 整個寫過程先寫內存,再由Flusher線程把內存中的數據dump到相應的文件中,這樣就做到了對一批數據順序寫入,同時對於最近的數據根據Field Type進行Rollup操作,從而進一步減少磁盤IO操作。

2、查詢引擎

LinDB查詢需要解決如下問題:

  • 解決多個機房之間的查詢;
  • 高效的流式查詢計算。
餓了麼輕量級分佈式時序數據庫的設計與探索

說明:

  • 由於需要支持多機房或者多集群的查詢,所以引入了LinProxy,LinProxy主要負責面向用戶的查詢請求;
  • SQL Plan負責具體SQL的解析,生成最終的執行計劃及需要計算的中間結果的函數;
  • 通過Zookeeper中的Metadata,把請求路由給具體的LinDB集群中對應的服務;
  • 每個LinConnect負責與一個LinDB集群之間的通信,每個LinConnect內部保存了一份對應集群的Metadata,該Metadata信息在每個Metadata變更的時候由Server端推送給LinConnect,這樣LinConnect基本做到近實時的更新Metadata;
  • Aggregator Stream主要負責把各個LinConnect的中間結果進行最終的合併計算操作;
  • 整個LinProxy處理過程都是異步化,這樣可以利用線程在IO等待的時候做計算。

每個Node接收LinConnect過來的請求,在內部查詢計算成中間結果返回給LinConnect,詳細的過程後面要介紹。

Node查詢

餓了麼輕量級分佈式時序數據庫的設計與探索

說明:

  • 如圖所示,Client過來的一個查詢請求,會產生很多小的查詢任務,每個任務所承擔的職責很單一,只做它所自己的任務,然後把結果給下一個任務,所以需要所有的查詢計算任務都是異常無阻塞處理,IO/CPU任務分離;
  • 整個服務端查詢使用Actor模式來簡化整個Pipeline的處理;
  • 任何一個任務執行完成,如果沒有結果產生,則不會生產下游的任務,所有下游的任務都是根據上游任務是否有結果來決定;
  • 最終把底層結果通過Reduce Aggregate聚合成最終的結果。

3、存儲結構

倒排索引

倒排索引分兩部分,目前索引相關的數據還是存儲在RocksDB中。

  • 根據Time Series的Measurement+Tags生成對應的唯一ID(類似Luence裡面的doc ID);
  • 根據Tags倒排索引,指向一個ID列表。TSID列表以BitMap的方式存儲,以方便查詢的時候通過BitMap操作來過濾出想要的數據。BitMap使用RoaringBitMap;
  • 每一類數據都存儲在獨立的RocksDB Family中。

內存結構

餓了麼輕量級分佈式時序數據庫的設計與探索

為了提高寫入性能,把當前一段時間的數據寫入到內存中,內存到達一定限制或者時間後把內存中的數據Dump到文件中。

內存存儲分為當前可寫和不可寫,當前可寫用於接收正常的數據寫入,不可寫用於Dump到文件中,如果Dump成功,則清空不可寫部分。

如果可寫的Memory Table也到達了內存容量的限制,但不可寫部分還沒有完成Dump,這時寫入會被Block住,直到有可用的內存供數據寫入,目的是為了不會因為佔用過多內存而導致OOM。

MemoryTable內部通過一個Map來存儲Measurement ID→Measurement Store關係,即每個Measurement都存儲在一個獨立的Store中。

在Measurement Store內存儲對應Measurement下面每個TSID的數據,每個TSID對應的數據用一個Memory Block來存儲,每個Memory Block按TSID的順序存儲在Array List中,把TSID存儲在一個BitMap中,通過TSID在Bitmap中位置來定位Memory Block在Array List中的具體位置。

這裡說明一下為什麼不直接使用Map來存儲,因為整個系統是用Java實現的,Java中的Map結構不適合存儲小對象的數據,存在內存放多倍的存儲。

由於每個TSID都會對應一個時間線,每個時間線可能會存在多個數據點的情況,如count時只有一個count值,timer時會有count、sum、min、max等多個值。

每個數據類型以Chunk的方式存儲。Chunk內部又以堆內和堆外2部分內存來存儲,最近一段時間的數據放在堆內,歷史數據壓縮之後放在堆外,在內存中儘量多放一些最近的數據,因為LinDB的目的主要是存儲一些監控類的數據,而監控類的數據主要關心最近一段時間的數據。

文件存儲結構

餓了麼輕量級分佈式時序數據庫的設計與探索

文件存儲跟內存存儲類似,同一個Measurement的數據以Block的方式存儲在一起,查詢時通過Measurement ID定位出該Measurement的數據存儲在哪個Block中。

  • Measurement Block後存儲一個Offset Block,即存儲每個Measurement Block所在的Offset,每個Offset以4 bytes存儲。
  • Offset Block存儲一個Measurement Index Block,按順序存儲每個Measurement ID,以Bitmap的方式存儲。
  • 文件的尾存儲一個Footer Block,主要存儲Version(2 bytes) + Measurement Index Offset(4 bytes) + Measurement Index Length(4 bytes)。
  • Data數據塊都是數值,所以使用xor壓縮,參考facebook的gorilla論文;

Measurement Block:

  • 每個Measurement Block類似Measurement的方式存儲,只是把Measurement ID換成Measurement內的TSID。
  • TS Entry存儲該TSID對應每一列的數據,一列數據對應存儲一段時間的數據點。

查詢邏輯:

  • DataFile在第一次加載的時候會把Measurement Index放在內存中,查詢輸入Measurement ID通過Measurement Index中的第幾個位置,然後通過這個位置N,在Offset Block查詢具體的Measurement Block的Offset,由於每個Offset都是4 bytes,所以offset position = (N-1) * 4,再讀取4 bytes得到真正的Offset。
  • 同樣的道理可以通過TSID,找到具體的TS Entry,再根據條件過濾具體的列數據,最終得到需要讀取的數據。

三、發展歷程

LinDB從2年前正式慢慢服務於公司的監控系統起,從1.0發展到2.0,已經穩定運行2年多,除了一次RocksDB的問題,幾乎沒出過什麼問題。到現在,3.0性能大幅提升,我們基本都是站在業界一些成熟方案的基礎上,慢慢演進而來。

也有人問,LinDB為什麼這麼快,其實我們是參考了很多TSDB的作法,然後取其好的設計,再結合時序的特徵做一些優化。

  • 時序一般都是最新寫入,但也是一種隨機寫,我們會先在內存中把隨寫變成循序寫,最終到寫文件都是順序寫,所有數據都是有序,這樣查詢的時候也是順序讀,這一點很關鍵;
  • 把寫入的measurement/tags/fields都轉化成Int,再生成倒排索引,最終生成一個TSID(類似Luence的doc ID),這樣就大大減少了最終的數據量,畢竟指標這樣字符串是佔絕對的大頭,這點很像OpenTSDB,雖然InfluxDB已經把一段時間的按Block來存儲,但還是在Block的頭放這些數據,這些都是成本,特別是在compact的時候;
  • 不像別的TSDB會把timestamp直接存下來,一般timestamp到毫秒級別佔8個節點,雖然根據時間有序的優勢再用delta-encoded壓縮也是很好,但我們想做到極致,我們是用一個bit來表示時間,具體的做法就是根據上面的描述,把時間的高位和存儲Interval,把高位的時間放在目錄上,再結合高位算一個delta,把delta以1bit的格式存儲,來表示有沒有數據,因為監控數據絕大部分都是連續的數據, 所以這樣做也是合理的,因此在時間這個數據上的存儲也大大減少了空間;
  • 我們發現對一個指標的多個Field的數據,每個Field的數據相鄰的一些點基本是很相近的,LinDB 2.0存儲直接是用RocksDB,多個Field放在一起存儲,再把相鄰的點進行壓縮,這樣其實壓縮率不會很高, 而且每取查詢取Field的時候都要把所有的數據讀出來,這也是LinDB 3.0我們考慮自己實現列式存儲的原因。我們把相同列存在一塊,以提高壓縮率,查詢的時候只讀需要的數據。整個壓縮我們也沒有用gzip、snappy、zlib,因為這些不大適合用於數值類型,我們是直接參考了facebook的gorilla論文的xor的方式來的,這個現在已經被很多TSDB採用;
  • 基於上面這些基本的順序讀已經不成問題,基於TSID查詢的更不是問題,因為整個設計都是基於TSID→data來設計的,所以還要解決一個根據倒排查出一組TSID對數據的隨機讀,如上我們是把TSID放在Bitmap,然後通過Bitmap計算出Offset,直接找到數據,通過存儲時的優化,做到TSID查詢精準查找,而不是通過二分查找;
  • 還有一點就是LinDB在新建數據庫時指定完Interval之後,系統會自己Rollup,不像InfluxDB要寫很多Continue Query,LinDB所有的這一切都是自動化的;
  • 查詢計算並行流式處理。

所以用一句話來總結就是——一個高效的索引外加一堆數值,然後怎麼玩好這堆數值。

自身監控

LinDB也自帶了自身的一些監控功能:

Overview

餓了麼輕量級分佈式時序數據庫的設計與探索

Dashboard

餓了麼輕量級分佈式時序數據庫的設計與探索

未來的展望

  • 豐富查詢函數;
  • 優化內存使用率;
  • 自身監控的提升;
  • 如果有可能,計劃開源。

對比測試

下面是與InfluxDB和LinDB2.0的一些查詢性能對比。由於InfluxDB集群化要商業版,所以都是單機默認配置下,無Cache的測試。服務器配置阿里雲機器:8 Core 16G Memory

大維度

Tags:host(40000),disk(4),partition(20),模擬服務器磁盤的監控,總的Series數為320W,每個Series寫一個數據點:

餓了麼輕量級分佈式時序數據庫的設計與探索

小維度的1天內的聚合測試

Tags:host(400),disk(2),partition(10),模擬服務器磁盤的監控,總的Series數為8K,每個Series寫一天的數據 每個維度每2s寫入1個點,每個維度一天內總共43200個點,所有維度總共43200 * 8000個點,共345600000即3億多數據:

餓了麼輕量級分佈式時序數據庫的設計與探索

小維度的7天內的聚合測試

Tags:host(400),disk(2),partition(10),模擬服務器磁盤的監控,總的Series數為8K,每個Series寫7天的數據,每個維度每5s寫入1個點,每個維度一天內總共17280個點,所有天數所有維度總共172808000 7 個點,即967680000,9億多個點。

這個測試要說明一下,得利於LinDB自動的Rollup,如果InfluxDB開Continue Query的話相信應該也還好。

餓了麼輕量級分佈式時序數據庫的設計與探索


分享到:


相關文章: