大數據之HBase MapReduce的實例分析

跟Hadoop的無縫集成使得使用MapReduce對HBase的數據進行分佈式計算非常方便,本文將以前面的blog示例,介紹HBase下MapReduce開發要點。很好理解本文前提是你對Hadoop MapReduce有一定的瞭解。

HBase MapReduce核心類介紹

首先一起來回顧下MapReduce的基本編程模型,

大數據之HBase MapReduce的實例分析

可以看到最基本的是通過Mapper和Reducer來處理KV對,Mapper的輸出經Shuffle及Sort後變為Reducer的輸入。除了Mapper和Reducer外,另外兩個重要的概念是InputFormat和OutputFormat,定義了Map-Reduce的輸入和輸出相關的東西。HBase通過對這些類的擴展(繼承)來方便MapReduce任務來讀寫HTable中的數據。

大數據之HBase MapReduce的實例分析

實例分析

我們還是以最初的blog例子來進行示例分析,業務需求是這樣:找到具有相同興趣的人,我們簡單定義為如果author之間article的tag相同,則認為兩者有相同興趣,將分析結果保存到HBase。除了上面介紹的blog表外,我們新增一張表tag_friend,RowKey為tag,Value為authors,大概就下面這樣。

大數據之HBase MapReduce的實例分析

我們省略了一些跟分析無關的Column數據,上面的數據按前面描述的業務需求經過MapReduce分析,應該得到下面的結果

大數據之HBase MapReduce的實例分析

實際的運算過程分析如下

大數據之HBase MapReduce的實例分析

代碼實現

有了上面的分析,代碼實現就比較簡單了。只需以下幾步

  1. 定義Mapper類繼承TableMapper,map的輸入輸出KV跟上面的分析一致。public static class Mapper extends TableMapper {
  2. public Mapper() {}
  3. @Override
  4. public void map(ImmutableBytesWritable row, Result values,Context context) throws IOException {
  5. ImmutableBytesWritable value = null;
  6. String[] tags = null;
  7. for (KeyValue kv : values.list()) {
  8. if ("author".equals(Bytes.toString(kv.getFamily()))
  9. && "nickname".equals(Bytes.toString(kv.getQualifier()))) {
  10. value = new ImmutableBytesWritable(kv.getValue());
  11. }
  12. if ("article".equals(Bytes.toString(kv.getFamily()))
  13. && "tags".equals(Bytes.toString(kv.getQualifier()))) {
  14. tags = Bytes.toString(kv.getValue()).split(",");
  15. }
  16. }
  17. for (int i = 0; i < tags.length; i++) {
  18. ImmutableBytesWritable key = new ImmutableBytesWritable(
  19. Bytes.toBytes(tags[i].toLowerCase()));
  20. try {
  21. context.write(key,value);
  22. } catch (InterruptedException e) {
  23. throw new IOException(e);
  24. }
  25. }
  26. }
  27. }

複製代碼

  1. 定義Reducer類繼承TableReducer,reduce的輸入輸出KV跟上面分析的一致。public static class Reducer extends TableReducer {
  2. @Override
  3. public void reduce(ImmutableBytesWritable key,Iterable values,
  4. Context context) throws IOException, InterruptedException {
  5. String friends="";
  6. for (ImmutableBytesWritable val : values) {
  7. friends += (friends.length()>0?",":"")+Bytes.toString(val.get());
  8. }
  9. Put put = new Put(key.get());
  10. put.add(Bytes.toBytes("person"), Bytes.toBytes("nicknames"),
  11. Bytes.toBytes(friends));
  12. context.write(key, put);
  13. }
  14. }

複製代碼

  1. 在提交作業時設置inputFormat為TableInputFormat,設置outputFormat為TableOutputFormat,可以藉助TableMapReduceUtil類來簡化編碼。public static void main(String[] args) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf = HBaseConfiguration.create(conf);
  4. Job job = new Job(conf, "HBase_FindFriend");
  5. job.setJarByClass(FindFriend.class);
  6. Scan scan = new Scan();
  7. scan.addColumn(Bytes.toBytes("author"),Bytes.toBytes("nickname"));
  8. scan.addColumn(Bytes.toBytes("article"),Bytes.toBytes("tags"));
  9. TableMapReduceUtil.initTableMapperJob("blog", scan,FindFriend.Mapper.class,
  10. ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
  11. TableMapReduceUtil.initTableReducerJob("tag_friend",FindFriend.Reducer.class, job);
  12. System.exit(job.waitForCompletion(true) ? 0 : 1);
  13. }

複製代碼

小結

本文通過實例分析演示了使用MapReduce分析HBase的數據,需要注意的這只是一種常規的方式(分析表中的數據存到另外的表中),實際上不侷限於此,不過其他方式跟此類似。如果你進行到這裡,你肯定想要馬上運行它看看結果,希望大家多多關注哦。


分享到:


相關文章: