自定義Spark Partitioner提升es-hadoop Bulk效率

問題描述

當你bulk數據到集群,按照ElasticSearch Bulk 源碼解析所描述的:

接著通過executeBulk方法進入原來的流程。在該方法中,對bulkRequest.requests 進行了兩次for循環。

第一次判定如果是IndexRequest就調用IndexRequest.process方法,主要是為了解析出timestamp,routing,id,parent 等字段。

第二次是為了對數據進行分揀。大致是為了形成這麼一種結構:

第二次就是對提交的數據進行分揀,然後根據route/_id 等值找到每個數據所屬的Shard,最後將數據發送到對應Shard所在的Node節點上。

然而這導致了兩個問題:

  1. ES Node之間會形成N*N個連接,消耗掉過多的bulk線程
  2. 出現了很多並不需要的網絡IO

所以我們希望能夠避免這種情況。

Spark Partition to ES Shard

我們希望能夠將分揀的邏輯放到Spark端,保證Spark 的Partition 和ES的Shard 一一對應,並且實現特定的Partitoner 保證數據到達ES都會被對應的Shard所在的節點直接消費,而不會再被轉發到其他節點。
經過我的實際測試,做了該調整後,寫入QPS有兩倍以上的提升

理論基礎

這裡的理論基礎自然是es-hadoop項目。

類的調用路徑關係為:

EsSpark -> 
 EsRDDWriter -> 
 RestService -> 
 RestRepository -> 
 RestClient ->
 NetworkClient -> 
 CommonsHttpTransport

簡單介紹下他們的作用:

  • EsSpark, 讀取ES和存儲ES的入口。通過隱式轉換,會顯得更Spark.
  • EsRDDWriter ,調用RestService創建PartitionWriter,對ES進行數據寫入
  • RestService,負責創建 RestRepository,PartitionWriter
  • RestRepository,bulk高層抽象,底層利用NetworkClient做真實的http請求,另外也維護Buffer相關的,典型比如積攢了多少條,多少M之後進行flush等。
  • NetworkClient 對 CommonsHttpTransport的封裝,主要添加了一些節點校驗功能。
  • CommonsHttpTransport 你可以認為是對HttpClient的一個封裝

原來我以為需要對es-hadoop項目的源碼進行修改才能實現前面提到的邏輯。事實上基於es-hadoop很容易實現上面提到的需求。

我們現在解釋下為什麼不需要修改源碼。

在RestService類裡,構建RestRepository的時候,會判定是多索引還是單索引。對應代碼如下:

RestRepository repository = (iformat.hasPattern() ?
 initMultiIndices(settings, currentSplit, resource, log) : 
initSingleIndex(settings, currentSplit, resource, log));

這裡我們只解析單索引部分代碼,在對應的initSingleIndex方法裡有如下代碼:

int bucket = currentInstance % targetShards.size();
Shard chosenShard = orderedShards.get(bucket);
Node targetNode = targetShards.get(chosenShard);

先簡要說明下幾個參數變量。

  • targetShards 是索引所有的主分片到對應Node節點的映射。
  • orderedShards 則是根據shardId 順序排序Shard集合
  • currentInstance 是partitionId

因為我們已經通過partitioner 將partitionId 轉化為shardId,
,也就是partitionId X 裡的數據,都是屬於shardId 為X。 也就是說currentInstance == partitionId == shardId。
下面是我們推導出來的關係:

  • currentInstance < targetShards.size()
  • bucket == currentInstance == partitionId == shardId
  • targetNode 持有ShardId=currentInstance 的Primary Shard

所以這段代碼實際完成了partitionId 到 targetNode的映射關係。

ESShardPartitioner 實現

涉及到這塊的主要有 es-hadoop 的mr以及 spark模塊。在mr模塊裡包含了ES的分片規則實現。 spark 模塊則包含ESShardPartitioner類。

代碼如下:

package org.elasticsearch.spark
import ....
class ESShardPartitioner(settings:String) extends Partitioner {
 protected val log = LogFactory.getLog(this.getClass())
 
 protected var _numPartitions = -1 
 
 override def numPartitions: Int = { 
 val newSettings = new PropertiesSettings().load(settings)
 val repository = new RestRepository(newSettings)
 val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly())
 repository.close()
 _numPartitions = targetShards.size()
 _numPartitions
 }

 override def getPartition(key: Any): Int = {
 val shardId = ShardAlg.shard(key.toString(), _numPartitions)
 shardId
 }
}

public class ShardAlg {
 public static int shard(String id, int shardNum) {
 int hash = Murmur3HashFunction.hash(id);
 return mod(hash, shardNum);
 }

 public static int mod(int v, int m) {
 int r = v % m;
 if (r < 0) {
 r += m;
 }
 return r;
 }
}

使用方式如下:

......partitionBy(new ESShardPartitioner(settings)).foreachPartition { iter =>
 try {
 val newSettings = new PropertiesSettings().load(settings)
 //創建EsRDDWriter
 val writer = EsRDDCreator.createWriter(newSettings.save())
 writer.write(TaskContext.get(), iter.map(f => f._2)) 
 }

不過這種方式也是有一點問題,經過partition 後,Spark Partition Num==ES Primary Shard Num,這樣會使得Spark寫入併發性會受到影響。

這個和Spark Streaming 裡KafkaRDD 的partition數受限於Kafka Partition Num 非常類似。我之前也對這個做了擴展,是的多個Spark Partition 可以映射到同一個Kafka Partition.

所以這裡有第二套方案:

  1. 修改ESShardPartitioner,可以讓多個分區對應一個Shard,並且通過一個Map維護這個關係
  2. 每個分區通過EsRDDWriter指定shardId進行寫入。

第二點可能需要修改es-hadoop源碼了,不過修改也很簡單,通過settings傳遞shardId,然後在RestService.initSingleIndex添加如下代碼:

if(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID) != null){ 
 targetNode = targetShards.get(orderedShards.get(Integer.parseInt(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID))));
 }

在創建EsRDDWriter時拷貝settings的副本並且加入對應的ConfigurationOptions.ES_BULK_SHARDID.

使用時類似下面這個例子:

//val settings = new SparkSettings(conf).save()
.partitionBy(new ESShardPartitioner(settings)).mapPartitionsWithIndex { (partitionIndex, iter) =>
 try {
 val writer = EsSpark.createEsRDDWriter[Map[String,String]](settings, resource)
 //shardToPartitions個 Spark partition 對應一個ES Shard
 val shardId = ESShardPartitioner.shardIdFromPartitionId(partionId, shardToPartitions)
 //強制該分片寫入到特定的Shard裡
 val stats = writer.writeToSpecificPrimaryShard(TaskContext.get(), shardId, iter.map(f => f._2))
 List(NewStats(stats.bulkTotalTime, stats.docsSent)).iterator
 } catch {

這樣可以把一份數據切成多分,併發寫入ES的某個Shard.

自定義Spark Partitioner提升es-hadoop Bulk效率


分享到:


相關文章: