06.25 Druid in practice: batch-loading data from local files and from HDFS

This post shows how to batch-load data with the indexing service: how to write the task files and how to point them at local files or at HDFS. Much of this is not spelled out in the manual, which makes the configuration tricky.

First set up and start the following nodes: coordinator, historical, overlord, and middleManager.

Prerequisites: MySQL (see http://my.oschina.net/u/2460844/blog/637334 for the MySQL setup), an HDFS cluster, and ZooKeeper (a standalone instance is enough).
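
A minimal sketch of preparing the metadata store, assuming MySQL 5.x on 10.70.27.12 and the druid database with the fool/fool account referenced by the _common configuration below; adjust host and credentials to your environment:

    # create the metadata database and the account used by the _common config
    mysql -h 10.70.27.12 -u root -p -e "CREATE DATABASE druid DEFAULT CHARACTER SET utf8; GRANT ALL ON druid.* TO 'fool'@'%' IDENTIFIED BY 'fool';"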

1. _common configuration:

    druid.extensions.loadList=["mysql-metadata-storage","druid-hdfs-storage"]
    druid.startup.logging.logProperties=true
    druid.zk.service.host=10.70.27.8:2181,10.70.27.10:2181,10.70.27.12:2181
    druid.zk.paths.base=/druid
    druid.metadata.storage.type=mysql
    druid.metadata.storage.connector.connectURI=jdbc:mysql://10.70.27.12:3306/druid
    druid.metadata.storage.connector.user=fool
    druid.metadata.storage.connector.password=fool
    druid.storage.type=hdfs
    druid.storage.storageDirectory=hdfs://10.70.27.3:9000/data/druid/segments
    druid.indexer.logs.type=hdfs
    druid.indexer.logs.directory=/data/druid/indexing-logs
    druid.monitoring.monitors=["io.druid.java.util.metrics.JvmMonitor"]
    druid.emitter=logging
    druid.emitter.logging.logLevel=info
    druid.indexing.doubleStorage=double

2. coordinator configuration:

    druid.host=druid01
    druid.port=8081
    druid.service=coordinator
    druid.coordinator.startDelay=PT5M

3. historical configuration:

    druid.host=druid02
    druid.port=8082
    druid.service=druid/historical
    druid.historical.cache.useCache=true
    druid.historical.cache.populateCache=true
    druid.processing.buffer.sizeBytes=100000000
    druid.processing.numThreads=3
    druid.server.http.numThreads=5
    druid.server.maxSize=300000000000
    druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize": 300000000000}]
    druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor", "com.metamx.metrics.JvmMonitor"]

4. overlord configuration:

    druid.host=druid03
    druid.port=8090
    druid.service=overlord
    druid.indexer.autoscale.doAutoscale=true
    druid.indexer.autoscale.strategy=ec2
    druid.indexer.autoscale.workerIdleTimeout=PT90m
    druid.indexer.autoscale.terminatePeriod=PT5M
    druid.indexer.autoscale.workerVersion=0
    druid.indexer.logs.type=local
    druid.indexer.logs.directory=/tmp/druid/indexlog
    druid.indexer.runner.type=remote
    druid.indexer.runner.minWorkerVersion=0
    # Store all task state in the metadata storage
    druid.indexer.storage.type=metadata
    #druid.indexer.fork.property.druid.processing.numThreads=1
    #druid.indexer.fork.property.druid.computation.buffer.size=100000000

5. middleManager configuration:

    druid.host=druid04
    druid.port=8091
    druid.service=druid/middlemanager
    druid.indexer.logs.type=local
    druid.indexer.logs.directory=/tmp/druid/indexlog
    druid.indexer.fork.property.druid.processing.numThreads=5
    druid.indexer.fork.property.druid.computation.buffer.size=100000000
    # Resources for peons
    druid.indexer.runner.javaOpts=-server -Xmx3g
    druid.indexer.task.baseTaskDir=/tmp/persistent/task/

6. Start each node. If a node fails to start, the cause is most likely memory; adjust the JVM options accordingly.
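
For reference, a typical startup command for this generation of Druid is sketched below; the heap sizes and the config/ directory layout are assumptions based on the default distribution layout, and historical is replaced with coordinator, overlord, or middleManager on the other nodes:

    # start a historical node from the Druid installation directory (e.g. /root/druid-0.8.3)
    java -Xms2g -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
      -classpath "config/_common:config/historical:lib/*" \
      io.druid.cli.Main server historical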

7. The data to import: wikipedia_data.csv and wikipedia_data.json

---wikipedia_data.json:

    {"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
    {"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
    {"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
    {"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
    {"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "cancer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}

---wikipedia_data.csv:

    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Francisco, 57, 200, -143
    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Francisc, 57, 200, -143
    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Francis, 57, 200, -143
    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Franci, 57, 200, -143
    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Franc, 57, 200, -143
    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Fran, 57, 200, -143
    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Fra, 57, 200, -143
    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Fr, 57, 200, -143
    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San F, 57, 200, -143
    2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, Sa , 57, 200, -143

Note: when importing from local disk, the data files must be stored on the middleManager node, otherwise the task will not be able to find them after it is submitted. When importing from HDFS, you only need to put the files into HDFS first. In this setup the overlord node is druid03 (you may substitute its IP).

8. Importing data into Druid from local files

On any node that can reach druid03, create a JSON index task.

8.1 Importing a locally stored JSON file. The task JSON is shown below.

8.1.1 First save wikipedia_data.json into the Druid directory on the middleManager node (for example /root/druid-0.8.3).

vi wikipedia_index_local_json_task.json, with the following content:

    {
        "type" : "index",
        "spec" : {
            "dataSchema" : {
                "dataSource" : "wikipedia",
                "parser" : {
                    "type" : "string",
                    "parseSpec" : {
                        "format" : "json",
                        "timestampSpec" : {
                            "column" : "timestamp",
                            "format" : "auto"
                        },
                        "dimensionsSpec" : {
                            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
                            "dimensionExclusions" : [],
                            "spatialDimensions" : []
                        }
                    }
                },
                "metricsSpec" : [
                    {
                        "type" : "count",
                        "name" : "count"
                    },
                    {
                        "type" : "doubleSum",
                        "name" : "added",
                        "fieldName" : "added"
                    },
                    {
                        "type" : "doubleSum",
                        "name" : "deleted",
                        "fieldName" : "deleted"
                    },
                    {
                        "type" : "doubleSum",
                        "name" : "delta",
                        "fieldName" : "delta"
                    }
                ],
                "granularitySpec" : {
                    "type" : "uniform",
                    "segmentGranularity" : "DAY",
                    "queryGranularity" : "NONE",
                    "intervals" : [ "2013-08-31/2013-09-01" ]
                }
            },
            "ioConfig": {
                "type": "index",
                "firehose": {
                    "type": "local",
                    "baseDir": "./",
                    "filter": "wikipedia_data.json"
                }
            },
            "tuningConfig": {
                "type": "index",
                "targetPartitionSize": 0,
                "rowFlushBoundary": 0
            }
        }
    }

8.1.2 Submit the task. As mentioned earlier, the overlord node runs on the druid03 host, so the task must be submitted to druid03 with the following command:

# curl -X 'POST' -H 'Content-Type:application/json' -d @wikipedia_index_local_json_task.json druid03:8090/druid/indexer/v1/task
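
If the task is accepted, the overlord replies with a small JSON object containing the task id. A quick way to check its state, assuming the standard overlord task API and with <taskId> as a placeholder, is:

    # poll the task status on the overlord (replace <taskId> with the id returned above)
    curl http://druid03:8090/druid/indexer/v1/task/<taskId>/status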

You can also follow the task in the overlord node's log; messages like the following indicate that the task completed successfully:

    2016-03-29T17:35:11,385 INFO [forking-task-runner-1] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_hadoop_NN_2016-03-29T17:35:11.510+08:00 output to: /tmp/persistent/task/index_hadoop_NN_2016-03-29T17:35:11.510+08:00/log
    2016-03-29T17:42:15,263 INFO [forking-task-runner-1] io.druid.indexing.overlord.ForkingTaskRunner - Process exited with status[0] for task: index_hadoop_NN_2016-03-29T17:35:11.510+08:00
    2016-03-29T17:42:15,265 INFO [forking-task-runner-1] io.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: /tmp/druid/indexlog/index_hadoop_NN_2016-03-29T17:35:11.510+08:00.log
    2016-03-29T17:42:15,267 INFO [forking-task-runner-1] io.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: /tmp/persistent/task/index_hadoop_NN_2016-03-29T17:35:11.510+08:00
    2016-03-29T17:42:15,284 INFO [WorkerTaskMonitor-1] io.druid.indexing.worker.WorkerTaskMonitor - Job's finished. Completed [index_hadoop_NN_2016-03-29T17:35:11.510+08:00] with status [SUCCESS]
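
Once the task succeeds and the segments are published, the coordinator (druid01:8081 in this setup) should pick them up. A quick check against the standard coordinator API is sketched below:

    # list datasources known to the coordinator; "wikipedia" should appear
    curl http://druid01:8081/druid/coordinator/v1/datasources
    # list the segments loaded for the wikipedia datasource
    curl http://druid01:8081/druid/coordinator/v1/datasources/wikipedia/segments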

8.2 Example task file for importing a local CSV file.

wikipedia_data.csv must first be saved into the Druid directory on the middleManager node (for example /root/druid-0.8.3).

8.2.1 The task file wikipedia_index_local_csv_task.json contains:

    {
        "type": "index",
        "spec": {
            "dataSchema": {
                "dataSource": "wikipedia",
                "parser": {
                    "type": "string",
                    "parseSpec": {
                        "format" : "csv",
                        "timestampSpec" : {
                            "column" : "timestamp"
                        },
                        "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],
                        "dimensionsSpec" : {
                            "dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
                        }
                    }
                },
                "metricsSpec": [
                    {
                        "type": "count",
                        "name": "count"
                    },
                    {
                        "type": "doubleSum",
                        "name": "added",
                        "fieldName": "added"
                    },
                    {
                        "type": "doubleSum",
                        "name": "deleted",
                        "fieldName": "deleted"
                    },
                    {
                        "type": "doubleSum",
                        "name": "delta",
                        "fieldName": "delta"
                    }
                ],
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": "DAY",
                    "queryGranularity": "NONE",
                    "intervals": ["2013-08-31/2013-09-01"]
                }
            },
            "ioConfig": {
                "type": "index",
                "firehose": {
                    "type": "local",
                    "baseDir": "./",
                    "filter": "wikipedia_data.csv"
                }
            },
            "tuningConfig": {
                "type": "index",
                "targetPartitionSize": 0,
                "rowFlushBoundary": 0
            }
        }
    }

8.2.2 Submit the task. As before, the overlord runs on druid03, so submit the task to druid03 with the following command:

# curl -X 'POST' -H 'Content-Type:application/json' -d @wikipedia_index_local_csv_task.json druid03:8090/druid/indexer/v1/task

The next section shows how to import CSV and JSON files from HDFS.

9. Importing data into Druid from HDFS

9.1 Importing a JSON file from HDFS.

First put wikipedia_data.json into HDFS (for example as sketched below), note the target directory, and use that path in the task file; the HDFS path must include the NameNode's hostname or IP. Here vm1.cci stands in for the NameNode address.

Pay close attention to the differences from the local-import task files, because those differences decide whether the import succeeds. Then, just as with the local import, submit the task to the overlord.
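
A minimal sketch of staging the files, assuming the /tmp/druid/datasource directory referenced by the task files below:

    # put the sample files into HDFS so the indexing task can read them
    hdfs dfs -mkdir -p /tmp/druid/datasource
    hdfs dfs -put wikipedia_data.json /tmp/druid/datasource/
    hdfs dfs -put wikipedia_data.csv  /tmp/druid/datasource/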

The task.json file is as follows:

    {
        "type" : "index_hadoop",
        "spec" : {
            "dataSchema" : {
                "dataSource" : "wikipedia",
                "parser" : {
                    "type" : "string",
                    "parseSpec" : {
                        "format" : "json",
                        "timestampSpec" : {
                            "column" : "timestamp",
                            "format" : "auto"
                        },
                        "dimensionsSpec" : {
                            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
                            "dimensionExclusions" : [],
                            "spatialDimensions" : []
                        }
                    }
                },
                "metricsSpec" : [
                    {
                        "type" : "count",
                        "name" : "count"
                    },
                    {
                        "type" : "doubleSum",
                        "name" : "added",
                        "fieldName" : "added"
                    },
                    {
                        "type" : "doubleSum",
                        "name" : "deleted",
                        "fieldName" : "deleted"
                    },
                    {
                        "type" : "doubleSum",
                        "name" : "delta",
                        "fieldName" : "delta"
                    }
                ],
                "granularitySpec" : {
                    "type" : "uniform",
                    "segmentGranularity" : "DAY",
                    "queryGranularity" : "NONE",
                    "intervals" : [ "2013-08-31/2013-09-01" ]
                }
            },
            "ioConfig" : {
                "type" : "hadoop",
                "inputSpec" : {
                    "type" : "static",
                    "paths" : "hdfs://vm1.cci/tmp/druid/datasource/wikipedia_data.json"
                }
            },
            "tuningConfig" : {
                "type": "hadoop"
            }
        }
    }

9.2 Importing a CSV file from HDFS.

The task.json file is as follows:

    {
        "type": "index_hadoop",
        "spec": {
            "dataSchema": {
                "dataSource": "wikipedia",
                "parser": {
                    "type": "string",
                    "parseSpec": {
                        "format" : "csv",
                        "timestampSpec" : {
                            "column" : "timestamp"
                        },
                        "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],
                        "dimensionsSpec" : {
                            "dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
                        }
                    }
                },
                "metricsSpec": [
                    {
                        "type": "count",
                        "name": "count"
                    },
                    {
                        "type": "doubleSum",
                        "name": "added",
                        "fieldName": "added"
                    },
                    {
                        "type": "doubleSum",
                        "name": "deleted",
                        "fieldName": "deleted"
                    },
                    {
                        "type": "doubleSum",
                        "name": "delta",
                        "fieldName": "delta"
                    }
                ],
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": "DAY",
                    "queryGranularity": "NONE",
                    "intervals": ["2013-08-31/2013-09-01"]
                }
            },
            "ioConfig" : {
                "type" : "hadoop",
                "inputSpec" : {
                    "type" : "static",
                    "paths" : "hdfs://vm1.cci/tmp/druid/datasource/wikipedia_data.csv"
                }
            },
            "tuningConfig" : {
                "type": "hadoop"
            }
        }
    }

Summary: druid.io has a very large number of configuration options, and an oversight in any one of them can cause a task to fail.

The four examples above are worth comparing carefully; beginners almost inevitably stumble over the differences between them.

Note: if the Hadoop version used by your Druid extensions differs from the version running on the target Hadoop cluster, you must use the Hadoop version bundled with Druid, otherwise the Hadoop MapReduce job will not start. The two settings below select Druid's bundled Hadoop; the Druid used here ships with Hadoop 2.7.3.

"tuningConfig" : {

"type" : "hadoop",

"partitionsSpec" : {

"type" : "hashed",

"targetPartitionSize" : 5000000

},

"jobProperties" : {

"mapreduce.job.classloader":"true"

}

}

},

"hadoopDependencyCoordinates": [

"org.apache.hadoop:hadoop-client:2.7.3"

]
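
To confirm which Hadoop client version your Druid distribution actually bundles, and assuming the usual hadoop-dependencies/ layout inside the installation directory, a quick check is:

    # list the bundled hadoop-client versions shipped with this Druid installation
    ls hadoop-dependencies/hadoop-client/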
