怎麼在Hadoop集羣中新增ElasticSearch操作和Hadoop插件使用

怎麼在Hadoop集群中新增ElasticSearch操作和Hadoop插件使用

在沒有引入elasticsearch-hadoop-xxx.jar相應的Jar包時,的在Hive中執行ElasticSearch外部表操作,會報如下的異常:

[plain] view plain copy

  1. Exception in thread "main" java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "ip-172-17-30-146/172.17.30.146"; destination host is: "ip-172-17-30-146":9000;

通過Spark查看執行任務的MR日誌,報錯如下:

[plain] view plain copy

  1. .hadoop.mapreduce.v2.app.MRAppMaster failed in state INITED; cause: org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.elasticsearch.hadoop.mr.EsOutputFormat not found

  2. org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.elasticsearch.hadoop.mr.EsOutputFormat not found

此時報的是Yarn服務器上面找不到ES-Hadoop相關的類,此時需要做的將elasticsearch-hadoop-xxx.jar增加到Hadoop相關應用的環境中,根據目前我所用到的環境,需要增加的應用有:

[plain] view plain copy

  1. 1.Hive

  2. 2.Spark

  3. 3.Yarn

需要將elasticsearch-hadoop-xxx.jar增加到所有服務器這些應用的環境中,然後重新執行執行就不會報這個問題了。

另外:目前我的做法是手動將elasticsearch-hadoop-6.2.4.jar一臺一臺複製到Yarn服務器的lib目錄下,不知道CDH是否有簡化的管理功能,可以直接上傳對應的Jar包?

為了操作上的操作,我準備了一個批命令,命令記錄如下:

[plain] view plain copy

  1. #/data/share_libs是我的第三共享jar包的目錄

  2. cd /data/share_libs

  3. wget https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-6.2.4.zip

  4. unzip elasticsearch-hadoop-6.2.4.zip

  5. cd elasticsearch-hadoop-6.2.4/dist

  6. #注:這裡不要把所有elasticsearch-hadoop*.jar文件都拷貝過去了,否則Yarn中會報這些不同的Jar包的版本衝突

  7. mv elasticsearch-hadoop-6.2.4.jar /opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/jars/

  8. cd /data/share_libs

  9. #刪除不必要的資源

  10. rm -f elasticsearch-hadoop-6.2.4.zip

  11. rm -rf elasticsearch-hadoop-6.2.4

怎麼在Hadoop集群中新增ElasticSearch操作和Hadoop插件使用

  1. #注:目錄/data/share_libs,在Spark中設置為了其第三庫的目錄,在Hive中也設置為了其auxlib目錄,因而在這裡建立軟件鏈接後,Spark和Hive都可以使用

  2. #Spark中設置第三庫的目錄,可以參看前面一篇文章:https://blog.csdn.net/fenglibing/article/details/80437246

  3. ln -s /opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/jars/elasticsearch-hadoop-6.2.4.jar elasticsearch-hadoop-6.2.4.jar

  4. cd /opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop-yarn/lib

  5. ln -s /opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/jars/elasticsearch-hadoop-6.2.4.jar elasticsearch-hadoop-6.2.4.jar

以下是通過創建一個外部表,然後測試查詢的語句:

[sql] view plain copy

  1. create external table test_in_es

  2. (

  3. id string,

  4. k string,

  5. v string

  6. )

  7. STORED BY'org.elasticsearch.hadoop.hive.EsStorageHandler'

  8. TBLPROPERTIES(

  9. 'es.nodes' = 'http://vpc-es-xxxxxxxxx.eu-west-1.es.amazonaws.com:80',

  10. 'es.index.auto.create' = 'false',

  11. 'es.nodes.wan.only' = 'true',

  12. 'es.resource' = 'test/test',

  13. 'es.read.metadata' = 'true',

  14. 'es.mapping.names' = 'id:_metadata._id,k:k, v:v');

  15. select * from test_in_es;

如果遇到“EsHadoopIllegalArgumentException:No data nodes with HTTP-enabled available”這樣的異常,請查看這篇文章:https://blog.csdn.net/fenglibing/article/details/80478551。

利用spark讀取es數據源的簡單示例

import org.elasticsearch.spark.sql._val esOptions = Map("es.nodes"->"192.168.1.2,192.168.1.3", "es.scroll.size"->"1000", "es.field.read.as.array.include"->"SampleField")val esDF = sqlContext.read.format("org.elasticsearch.spark.sql").options(esOptions).load("sampleindex/es-spark")esDF.registerTempTable("esdemotbl")

es.scroll.size 一次性讀入的記錄數,默認是10, 如果不設置為大一點的值,要從es中讀取1億條數據,那將是一個漫長的過程

es.field.read.as.array.include 有的字段在es中是以string類型存儲,但其中包含逗號(,), spark默認認為這是數組類型,如果讀取這種字段的話,就會報錯,怎麼辦,那就用es.field.read.as.array.include來顯式指明

spark讀取es中數據的時候,partition數目取決於es中指定index的shard數目,為了獲得比較高的併發讀取性能,建議適當設置shard數目,為什麼是適當,因為具體取決於集群規模等多種因素。

字段名的大小寫問題

在hive中,字段名是_大小寫不敏感_的, 但在ES中是大小寫敏感的

你說,這又怎麼樣。 呵呵, 這意味著不做特殊處理,永遠無法讀出es中大寫字段名的內容,你看到的將是滿屏的_NULL_

這該怎麼破,很簡單,指定 es.mapping.names

比如在es中,字段名為DemoField, 要讀出其中的內容,hive表的字義就該這樣寫

create external table es_demo_tbl(demofield string)STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'TBLPROPERTIES('es.nodes'='192.168.1.2,192.168.1.3', 'es.resource'='demoindex/sample','es.mapping.names'='demofield:DemoField')

注意是先hive中的字段名,然後是es中的字段名


分享到:


相關文章: