問題描述
當你bulk數據到集群,按照ElasticSearch Bulk 源碼解析所描述的:
接著通過executeBulk方法進入原來的流程。在該方法中,對bulkRequest.requests 進行了兩次for循環。
第一次判定如果是IndexRequest就調用IndexRequest.process方法,主要是為了解析出timestamp,routing,id,parent 等字段。
第二次是為了對數據進行分揀。大致是為了形成這麼一種結構:
第二次就是對提交的數據進行分揀,然後根據route/_id 等值找到每個數據所屬的Shard,最後將數據發送到對應Shard所在的Node節點上。
然而這導致了兩個問題:
- ES Node之間會形成N*N個連接,消耗掉過多的bulk線程
- 出現了很多並不需要的網絡IO
所以我們希望能夠避免這種情況。
Spark Partition to ES Shard
我們希望能夠將分揀的邏輯放到Spark端,保證Spark 的Partition 和ES的Shard 一一對應,並且實現特定的Partitoner 保證數據到達ES都會被對應的Shard所在的節點直接消費,而不會再被轉發到其他節點。
經過我的實際測試,做了該調整後,寫入QPS有兩倍以上的提升
理論基礎
這裡的理論基礎自然是es-hadoop項目。
類的調用路徑關係為:
EsSpark -> EsRDDWriter -> RestService -> RestRepository -> RestClient -> NetworkClient -> CommonsHttpTransport
簡單介紹下他們的作用:
- EsSpark, 讀取ES和存儲ES的入口。通過隱式轉換,會顯得更Spark.
- EsRDDWriter ,調用RestService創建PartitionWriter,對ES進行數據寫入
- RestService,負責創建 RestRepository,PartitionWriter
- RestRepository,bulk高層抽象,底層利用NetworkClient做真實的http請求,另外也維護Buffer相關的,典型比如積攢了多少條,多少M之後進行flush等。
- NetworkClient 對 CommonsHttpTransport的封裝,主要添加了一些節點校驗功能。
- CommonsHttpTransport 你可以認為是對HttpClient的一個封裝
原來我以為需要對es-hadoop項目的源碼進行修改才能實現前面提到的邏輯。事實上基於es-hadoop很容易實現上面提到的需求。
我們現在解釋下為什麼不需要修改源碼。
在RestService類裡,構建RestRepository的時候,會判定是多索引還是單索引。對應代碼如下:
RestRepository repository = (iformat.hasPattern() ? initMultiIndices(settings, currentSplit, resource, log) : initSingleIndex(settings, currentSplit, resource, log));
這裡我們只解析單索引部分代碼,在對應的initSingleIndex方法裡有如下代碼:
int bucket = currentInstance % targetShards.size(); Shard chosenShard = orderedShards.get(bucket); Node targetNode = targetShards.get(chosenShard);
先簡要說明下幾個參數變量。
- targetShards 是索引所有的主分片到對應Node節點的映射。
- orderedShards 則是根據shardId 順序排序Shard集合
- currentInstance 是partitionId
因為我們已經通過partitioner 將partitionId 轉化為shardId,
,也就是partitionId X 裡的數據,都是屬於shardId 為X。 也就是說currentInstance == partitionId == shardId。
下面是我們推導出來的關係:
- currentInstance < targetShards.size()
- bucket == currentInstance == partitionId == shardId
- targetNode 持有ShardId=currentInstance 的Primary Shard
所以這段代碼實際完成了partitionId 到 targetNode的映射關係。
ESShardPartitioner 實現
涉及到這塊的主要有 es-hadoop 的mr以及 spark模塊。在mr模塊裡包含了ES的分片規則實現。 spark 模塊則包含ESShardPartitioner類。
代碼如下:
package org.elasticsearch.spark import .... class ESShardPartitioner(settings:String) extends Partitioner { protected val log = LogFactory.getLog(this.getClass()) protected var _numPartitions = -1 override def numPartitions: Int = { val newSettings = new PropertiesSettings().load(settings) val repository = new RestRepository(newSettings) val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly()) repository.close() _numPartitions = targetShards.size() _numPartitions } override def getPartition(key: Any): Int = { val shardId = ShardAlg.shard(key.toString(), _numPartitions) shardId } } public class ShardAlg { public static int shard(String id, int shardNum) { int hash = Murmur3HashFunction.hash(id); return mod(hash, shardNum); } public static int mod(int v, int m) { int r = v % m; if (r < 0) { r += m; } return r; } }
使用方式如下:
......partitionBy(new ESShardPartitioner(settings)).foreachPartition { iter => try { val newSettings = new PropertiesSettings().load(settings) //創建EsRDDWriter val writer = EsRDDCreator.createWriter(newSettings.save()) writer.write(TaskContext.get(), iter.map(f => f._2)) }
不過這種方式也是有一點問題,經過partition 後,Spark Partition Num==ES Primary Shard Num,這樣會使得Spark寫入併發性會受到影響。
這個和Spark Streaming 裡KafkaRDD 的partition數受限於Kafka Partition Num 非常類似。我之前也對這個做了擴展,是的多個Spark Partition 可以映射到同一個Kafka Partition.
所以這裡有第二套方案:
- 修改ESShardPartitioner,可以讓多個分區對應一個Shard,並且通過一個Map維護這個關係
- 每個分區通過EsRDDWriter指定shardId進行寫入。
第二點可能需要修改es-hadoop源碼了,不過修改也很簡單,通過settings傳遞shardId,然後在RestService.initSingleIndex添加如下代碼:
if(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID) != null){ targetNode = targetShards.get(orderedShards.get(Integer.parseInt(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID)))); }
在創建EsRDDWriter時拷貝settings的副本並且加入對應的ConfigurationOptions.ES_BULK_SHARDID.
使用時類似下面這個例子:
//val settings = new SparkSettings(conf).save() .partitionBy(new ESShardPartitioner(settings)).mapPartitionsWithIndex { (partitionIndex, iter) => try { val writer = EsSpark.createEsRDDWriter[Map[String,String]](settings, resource) //shardToPartitions個 Spark partition 對應一個ES Shard val shardId = ESShardPartitioner.shardIdFromPartitionId(partionId, shardToPartitions) //強制該分片寫入到特定的Shard裡 val stats = writer.writeToSpecificPrimaryShard(TaskContext.get(), shardId, iter.map(f => f._2)) List(NewStats(stats.bulkTotalTime, stats.docsSent)).iterator } catch {
這樣可以把一份數據切成多分,併發寫入ES的某個Shard.
關鍵字: ElasticSearch ES 源碼