Hbase原理與使用

hbase

hbase簡介

1.1.什麼是hbase

HBASE是一個高可靠性、高性能、面向列、可伸縮的分佈式存儲系統,利用HBASE技術可在廉價PC Server上搭建起大規模結構化存儲集群。

HBASE的目標是存儲並處理大型的數據,更具體來說是僅需使用普通的硬件配置,就能夠處理由成千上萬的行和列所組成的大型數據。

HBASE是Google Bigtable的開源實現,但是也有很多不同之處。比如:Google Bigtable利用GFS作為其文件存儲系統,HBASE利用Hadoop HDFS作為其文件存儲系統;Google運行MAPREDUCE來處理Bigtable中的海量數據,HBASE同樣利用Hadoop MapReduce來處理HBASE中的海量數據;Google Bigtable利用Chubby作為協同服務,HBASE利用Zookeeper作為對應。

1.2.與傳統數據庫的對比

1、傳統數據庫遇到的問題:

1)數據量很大的時候無法存儲

2)沒有很好的備份機制

3)數據達到一定數量開始緩慢,很大的話基本無法支撐

2、HBASE優勢:

1)線性擴展,隨著數據量增多可以通過節點擴展進行支撐

2)數據存儲在hdfs上,備份機制健全

3)通過zookeeper協調查找數據,訪問速度塊。

1.3.hbase集群中的角色

1、一個或者多個主節點,Hmaster

2、多個從節點,HregionServer

habse安裝

2.1.hbase安裝

2.1.1.上傳

用工具上傳

2.1.2.解壓

su – hadoop

tar -zxvf hbase-0.94.6.tar.gz

2.1.3.重命名

mv hbase-0.94.6 hbase

2.1.4.修改環境變量(每臺機器都要執行)

su – root

vi /etc/profile

添加內容:

export HBASE_HOME=/home/hadoop/hbase

export PATH=PATH: PATH: PATH:PATH:PATH:PATH:HBASE_HOME/bin

source /etc/proflie

su - hadoop

2.1.5.修改配置文件

上傳配置文件

2.1.6.分發到其他節點

scp -r /home/hadoop/hbase hadoop@slave1:/home/hadoop/

scp -r /home/hadoop/hbase hadoop@slave2:/home/hadoop/

scp -r /home/hadoop/hbase hadoop@slave3:/home/hadoop/

2.1.7.啟動

注意:啟動hbase之前,必須保證hadoop集群和zookeeper集群是可用的。

start-hbase.sh

2.1.8.監控

1、進入命令行

hbase shell

2、頁面監控

http://master:60010/

hbase數據模型

3.1.hbase數據模型

3.1.1.Row Key

與nosql數據庫們一樣,row key是用來檢索記錄的主鍵。訪問HBASE table中的行,只有三種方式:

1.通過單個row key訪問

2.通過row key的range(正則)

3.全表掃描

Row key行鍵 (Row key)可以是任意字符串(最大長度 是 64KB,實際應用中長度一般為 10-100bytes),在HBASE內部,row key保存為字節數組。存儲時,數據按照Row key的字典序(byte order)排序存儲。設計key時,要充分排序存儲這個特性,將經常一起讀取的行存儲放到一起。(位置相關性)

3.1.2.Columns Family

列簇 :HBASE表中的每個列,都歸屬於某個列族。列族是表的schema的一部 分(而列不是),必須在使用表之前定義。列名都以列族作為前綴。例如 courses:history,courses:math都屬於courses 這個列族。

3.1.3.Cell

由{row key, columnFamily, version} 唯一確定的單元。cell中 的數據是沒有類型的,全部是字節碼形式存貯。

關鍵字:無類型、字節碼

3.1.4.Time Stamp

HBASE 中通過rowkey和columns確定的為一個存貯單元稱為cell。每個 cell都保存 著同一份數據的多個版本。版本通過時間戳來索引。時間戳的類型是 64位整型。時間戳可以由HBASE(在數據寫入時自動 )賦值,此時時間戳是精確到毫秒 的當前系統時間。時間戳也可以由客戶顯式賦值。如果應用程序要避免數據版 本衝突,就必須自己生成具有唯一性的時間戳。每個 cell中,不同版本的數據按照時間倒序排序,即最新的數據排在最前面。

為了避免數據存在過多版本造成的的管理 (包括存貯和索引)負擔,HBASE提供 了兩種數據版本回收方式。一是保存數據的最後n個版本,二是保存最近一段 時間內的版本(比如最近七天)。用戶可以針對每個列族進行設置。

hbase命令

4.1.命令的進退

1、hbase提供了一個shell的終端給用戶交互

#KaTeX parse error: Expected 'EOF', got '#' at position 44: …2、如果退出執行quit命令

#̲HBASE_HOME/bin/hbase shell

……

quit

4.2.命令

名稱命令表達式

創建表:create ‘表名’, ‘列族名1’,‘列族名2’,‘列族名N’

查看所有表:list

描述表:describe ‘表名’

判斷表存在:exists ‘表名’

判斷是否禁用/啟用表:is_enabled ‘表名’/is_disabled ‘表名’

添加記錄:put ‘表名’, ‘rowKey’, ‘列族 : 列‘ , ‘值’

查看記錄rowkey下的所有數據:get ‘表名’ , ‘rowKey’

查看錶中的記錄總數:count ‘表名’

獲取某個列族:get ‘表名’,‘rowkey’,‘列族’

獲取某個列族的某個列 :get ‘表名’,‘rowkey’,‘列族:列’

刪除記錄: delete ‘表名’ ,‘行名’ , ‘列族:列’

刪除整行 :deleteall ‘表名’,‘rowkey’

刪除一張表:先要屏蔽該表,才能對該表進行刪除 第一步: disable ‘表名’ ,第二步 : drop ‘表名’

清空表:truncate ‘表名’

查看所有記錄 :scan “表名”

查看某個表某個列中所有數據:scan “表名” , {COLUMNS=>‘列族名:列名’}

更新記錄: 就是重寫一遍,進行覆蓋,hbase沒有修改,都是追加

hbase依賴zookeeper

1、保存Hmaster的地址和backup-master地址

hmaster:

a)管理HregionServer

b)做增刪改查表的節點

c)管理HregionServer中的表分配

2、保存表-ROOT-的地址

hbase默認的根表,檢索表。

3、HRegionServer列表

表的增刪改查數據。

和hdfs交互,存取數據。

hbase開發

6.1.配置

HBaseConfiguration

包:org.apache.hadoop.hbase.HBaseConfiguration

作用:通過此類可以對HBase進行配置

用法實例:

Configuration config = HBaseConfiguration.create();

說明: HBaseConfiguration.create() 默認會從classpath 中查找 hbase-site.xml 中的配置信息,初始化 Configuration。

使用方法:

static Configuration config = null;

static {

config = HBaseConfiguration.create();

config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");

config.set("hbase.zookeeper.property.clientPort", "2181");

}

123456

6.2.表管理類

HBaseAdmin

包:org.apache.hadoop.hbase.client.HBaseAdmin

作用:提供接口關係HBase 數據庫中的表信息

用法:

HBaseAdmin admin = new HBaseAdmin(config);

1

6.3.表描述類

HTableDescriptor

包:org.apache.hadoop.hbase.HTableDescriptor

作用:HTableDescriptor 類包含了表的名字以及表的列族信息

表的schema(設計)

用法:

HTableDescriptor htd =new HTableDescriptor(tablename);

htd.addFamily(new HColumnDescriptor(“myFamily”));

12

6.4.列族的描述類

HColumnDescriptor

包:org.apache.hadoop.hbase.HColumnDescriptor

作用:HColumnDescriptor 維護列族的信息

用法:

htd.addFamily(new HColumnDescriptor(“myFamily”));

6.5.創建表的操作

CreateTable(一般我們用shell創建表)

static Configuration config = null;

static {

config = HBaseConfiguration.create();

config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");

config.set("hbase.zookeeper.property.clientPort", "2181");

}

HBaseAdmin admin = new HBaseAdmin(config);

HTableDescriptor desc = new HTableDescriptor(tableName);

HColumnDescriptor family1 = new HColumnDescriptor(“f1”);

HColumnDescriptor family2 = new HColumnDescriptor(“f2”);

desc.addFamily(family1);

desc.addFamily(family2);

admin.createTable(desc);

1234567891011121314

6.6.刪除表

HBaseAdmin admin = new HBaseAdmin(config);

admin.disableTable(tableName);

admin.deleteTable(tableName);

123

6.7.創建一個表的類

HTable

包:org.apache.hadoop.hbase.client.HTable

作用:HTable 和 HBase 的表通信

用法:

// 普通獲取表

HTable table = new HTable(config,Bytes.toBytes(tablename);

// 通過連接池獲取表

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));

12345

6.8.單條插入數據

Put

包:org.apache.hadoop.hbase.client.Put

作用:插入數據

用法:

Put put = new Put(row);

p.add(family,qualifier,value);

12

說明:向表 tablename 添加 “family,qualifier,value”指定的值。

示例代碼:

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));

Put put = new Put(Bytes.toBytes(rowKey));

put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),Bytes.toBytes(value));

table.put(put);

12345

6.9.批量插入

批量插入

List list = new ArrayList();

Put put = new Put(Bytes.toBytes(rowKey));//獲取put,用於插入

put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),Bytes.toBytes(value));//封裝信息

list.add(put);

table.put(list);//添加記錄

12345

6.10.刪除數據

Delete

包:org.apache.hadoop.hbase.client.Delete

作用:刪除給定rowkey的數據

用法:

Delete del= new Delete(Bytes.toBytes(rowKey));

table.delete(del);

代碼實例

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));

Delete del= new Delete(Bytes.toBytes(rowKey));

table.delete(del);

1234

6.11.單條查詢

Get

包:org.apache.hadoop.hbase.client.Get

作用:獲取單個行的數據

用法:

HTable table = new HTable(config,Bytes.toBytes(tablename));

Get get = new Get(Bytes.toBytes(row));

Result result = table.get(get);

123

說明:獲取 tablename 表中 row 行的對應數據

代碼示例:

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));

Get get = new Get(rowKey.getBytes());

Result row = table.get(get);

for (KeyValue kv : row.raw()) {

System.out.print(new String(kv.getRow()) + " ");

System.out.print(new String(kv.getFamily()) + ":");

System.out.print(new String(kv.getQualifier()) + " = ");

System.out.print(new String(kv.getValue()));

System.out.print(" timestamp = " + kv.getTimestamp() + "\n");

}

1234567891011

6.12.批量查詢

ResultScanner

包:org.apache.hadoop.hbase.client.ResultScanner

作用:獲取值的接口

用法:

ResultScanner scanner = table.getScanner(scan);

For(Result rowResult : scanner){

Bytes[] str = rowResult.getValue(family,column);

}

說明:循環獲取行中列值。

代碼示例:

Connection connection = ConnectionFactory.createConnection(config);

HTableInterface table = connection.getTable(TableName.valueOf("user"));

Scan scan = new Scan();

scan.setStartRow("a1".getBytes());

scan.setStopRow("a20".getBytes());

ResultScanner scanner = table.getScanner(scan);

for (Result row : scanner) {

System.out.println("\nRowkey: " + new String(row.getRow()));

for (KeyValue kv : row.raw()) {

System.out.print(new String(kv.getRow()) + " ");

System.out.print(new String(kv.getFamily()) + ":");

System.out.print(new String(kv.getQualifier()) + " = ");

System.out.print(new String(kv.getValue()));

System.out.print(" timestamp = " + kv.getTimestamp() + "\n");

}

}

12345678910111213141516

6.13.hbase過濾器

6.13.1.FilterList

FilterList 代表一個過濾器列表,可以添加多個過濾器進行查詢,多個過濾器之間的關係有:

與關係(符合所有):FilterList.Operator.MUST_PASS_ALL

或關係(符合任一):FilterList.Operator.MUST_PASS_ONE

使用方法:

FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);

Scan s1 = new Scan();

filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(“f1”), Bytes.toBytes(“c1”), CompareOp.EQUAL,Bytes.toBytes(“v1”) ) );

filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(“f1”), Bytes.toBytes(“c2”), CompareOp.EQUAL,Bytes.toBytes(“v2”) ) );

// 添加下面這一行後,則只返回指定的cell,同一行中的其他cell不返回

s1.addColumn(Bytes.toBytes(“f1”), Bytes.toBytes(“c1”));

s1.setFilter(filterList); //設置filter

ResultScanner ResultScannerFilterList = table.getScanner(s1); //返回結果列表

12345678

6.13.2.過濾器的種類

過濾器的種類:

列植過濾器—SingleColumnValueFilter

過濾列植的相等、不等、範圍等

列名前綴過濾器—ColumnPrefixFilter

過濾指定前綴的列名

多個列名前綴過濾器—MultipleColumnPrefixFilter

過濾多個指定前綴的列名

rowKey過濾器—RowFilter

通過正則,過濾rowKey值。

6.13.3.列植過濾器—SingleColumnValueFilter

SingleColumnValueFilter 列值判斷

相等 (CompareOp.EQUAL ),

不等(CompareOp.NOT_EQUAL),

範圍 (e.g., CompareOp.GREATER)…………

下面示例檢查列值和字符串’values’ 相等…

SingleColumnValueFilter f = new SingleColumnValueFilter(

Bytes.toBytes(“cFamily”) Bytes.toBytes(“column”), CompareFilter.CompareOp.EQUAL,

Bytes.toBytes(“values”));

s1.setFilter(f);

注意:如果過濾器過濾的列在數據表中有的行中不存在,那麼這個過濾器對此行無法過濾。

6.13.4.列名前綴過濾器—ColumnPrefixFilter

過濾器—ColumnPrefixFilter

ColumnPrefixFilter 用於指定列名前綴值相等

ColumnPrefixFilter f = new ColumnPrefixFilter(Bytes.toBytes(“values”));

s1.setFilter(f);

6.13.5.多個列值前綴過濾器—MultipleColumnPrefixFilter

MultipleColumnPrefixFilter 和 ColumnPrefixFilter 行為差不多,但可以指定多個前綴

byte[][] prefixes = new byte[][] {Bytes.toBytes(“value1”),Bytes.toBytes(“value2”)};

Filter f = new MultipleColumnPrefixFilter(prefixes);

s1.setFilter(f);

6.13.6.rowKey過濾器—RowFilter

RowFilter 是rowkey過濾器

通常根據rowkey來指定範圍時,使用scan掃描器的StartRow和StopRow方法比較好。

Filter f = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^1234")); //匹配以1234開頭的rowkey

s1.setFilter(f);

7.hbase原理

7.1.體系圖

Hbase原理與使用

7.1.1.寫流程

1、client向hregionserver發送寫請求。

2、hregionserver將數據寫到hlog(write ahead log)。為了數據的持久化和恢復。

3、hregionserver將數據寫到內存(memstore)

4、反饋client寫成功。

7.1.2.數據flush過程

1、當memstore數據達到閾值(默認是64M),將數據刷到硬盤,將內存中的數據刪除,同時刪除Hlog中的歷史數據。

2、並將數據存儲到hdfs中。

3、在hlog中做標記點。

7.1.3.數據合併過程

1、當數據塊達到4塊,hmaster將數據塊加載到本地,進行合併

2、當合並的數據超過256M,進行拆分,將拆分後的region分配給不同的hregionserver管理

3、當hregionser宕機後,將hregionserver上的hlog拆分,然後分配給不同的hregionserver加載,修改.META.

4、注意:hlog會同步到hdfs

7.1.4.hbase的讀流程

1、通過zookeeper和-ROOT- .META.表定位hregionserver。

2、數據從內存和硬盤合併後返回給client

3、數據塊會緩存

7.1.5.hmaster的職責

1、管理用戶對Table的增、刪、改、查操作;

2、記錄region在哪臺Hregion server上

3、在Region Split後,負責新Region的分配;

4、新機器加入時,管理HRegion Server的負載均衡,調整Region分佈

5、在HRegion Server宕機後,負責失效HRegion Server 上的Regions遷移。

7.1.6.hregionserver的職責

HRegion Server主要負責響應用戶I/O請求,向HDFS文件系統中讀寫數據,是HBASE中最核心的模塊。

HRegion Server管理了很多table的分區,也就是region。

7.1.7.client職責

Client

HBASE Client使用HBASE的RPC機制與HMaster和RegionServer進行通信

管理類操作:Client與HMaster進行RPC;

數據讀寫類操作:Client與HRegionServer進行RPC。

8.MapReduce操作Hbase

8.1.實現方法

Hbase對MapReduce提供支持,它實現了TableMapper類和TableReducer類,我們只需要繼承這兩個類即可。

1、寫個mapper繼承TableMapper

參數:Text:mapper的輸出key類型; IntWritable:mapper的輸出value類型。

其中的map方法如下:

map(ImmutableBytesWritable key, Result value,Context context)

參數:key:rowKey;value: Result ,一行數據; context上下文

2、寫個reduce繼承TableReducer

參數:Text:reducer的輸入key; IntWritable:reduce的輸入value;

ImmutableBytesWritable:reduce輸出到hbase中的rowKey類型。

其中的reduce方法如下:

reduce(Text key, Iterable values,Context context)

參數: key:reduce的輸入key;values:reduce的輸入value;

8.2.準備表

1、建立數據來源表‘word’,包含一個列族‘content’

向表中添加數據,在列族中放入列‘info’,並將短文數據放入該列中,如此插入多行,行鍵為不同的數據即可

2、建立輸出表‘stat’,包含一個列族‘content’

3、通過Mr操作Hbase的‘word’表,對‘content:info’中的短文做詞頻統計,並將統計結果寫入‘stat’表的‘content:info中’,行鍵為單詞

8.3.實現

<code>package com.itcast.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
/**
 * mapreduce操作hbase
 * @author wilson
 *
 */
public class HBaseMr {
	/**
	 * 創建hbase配置
	 */
	static Configuration config = null;
	static {
		config = HBaseConfiguration.create();
		config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
		config.set("hbase.zookeeper.property.clientPort", "2181");
	}
	/**
	 * 表信息
	 */
	public static final String tableName = "word";//表名1
	public static final String colf = "content";//列族
	public static final String col = "info";//列
	public static final String tableName2 = "stat";//表名2
	/**
	 * 初始化表結構,及其數據
	 */
	public static void initTB() {
		HTable table=null;
		HBaseAdmin admin=null;
		try {
			admin = new HBaseAdmin(config);//創建表管理
			/*刪除表*/
			if (admin.tableExists(tableName)||admin.tableExists(tableName2)) {
				System.out.println("table is already exists!");
				admin.disableTable(tableName);
				admin.deleteTable(tableName);
				admin.disableTable(tableName2);
				admin.deleteTable(tableName2);
			}
			/*創建表*/
				HTableDescriptor desc = new HTableDescriptor(tableName);
				HColumnDescriptor family = new HColumnDescriptor(colf);
				desc.addFamily(family);
				admin.createTable(desc);
				HTableDescriptor desc2 = new HTableDescriptor(tableName2);
				HColumnDescriptor family2 = new HColumnDescriptor(colf);
				desc2.addFamily(family2);
				admin.createTable(desc2);
			/*插入數據*/
				table = new HTable(config,tableName);
				table.setAutoFlush(false);
				table.setWriteBufferSize(5);
				List lp = new ArrayList();
				Put p1 = new Put(Bytes.toBytes("1"));
				p1.add(colf.getBytes(), col.getBytes(),	("The Apache Hadoop software library is a framework").getBytes());
				lp.add(p1);
				Put p2 = new Put(Bytes.toBytes("2"));p2.add(colf.getBytes(),col.getBytes(),("The common utilities that support the other Hadoop modules").getBytes());
				lp.add(p2);
				Put p3 = new Put(Bytes.toBytes("3"));
				p3.add(colf.getBytes(), col.getBytes(),("Hadoop by reading the documentation").getBytes());
				lp.add(p3);
				Put p4 = new Put(Bytes.toBytes("4"));
				p4.add(colf.getBytes(), col.getBytes(),("Hadoop from the release page").getBytes());
				lp.add(p4);
				Put p5 = new Put(Bytes.toBytes("5"));
				p5.add(colf.getBytes(), col.getBytes(),("Hadoop on the mailing list").getBytes());
				lp.add(p5);
				table.put(lp);
				table.flushCommits();
				lp.clear();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if(table!=null){
					table.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	/**
	 * MyMapper 繼承 TableMapper
	 * TableMapper 
	 * Text:輸出的key類型,
	 * IntWritable:輸出的value類型
	 */
	public static class MyMapper extends TableMapper {
		private static IntWritable one = new IntWritable(1);
		private static Text word = new Text();
		@Override
		//輸入的類型為:key:rowKey; value:一行數據的結果集Result
		protected void map(ImmutableBytesWritable key, Result value,
				Context context) throws IOException, InterruptedException {
			//獲取一行數據中的colf:col
			String words = Bytes.toString(value.getValue(Bytes.toBytes(colf), Bytes.toBytes(col)));// 表裡面只有一個列族,所以我就直接獲取每一行的值
			//按空格分割
			String itr[] = words.toString().split(" ");
			//循環輸出word和1
			for (int i = 0; i < itr.length; i++) {
				word.set(itr[i]);
				context.write(word, one);
			}
		}
	}
	/**
	 * MyReducer 繼承 TableReducer
	 * TableReducer 
	 * Text:輸入的key類型,
	 * IntWritable:輸入的value類型,
	 * ImmutableBytesWritable:輸出類型,表示rowkey的類型
	 */
	public static class MyReducer extends
			TableReducer {
		@Override
		protected void reduce(Text key, Iterable values,
				Context context) throws IOException, InterruptedException {
			//對mapper的數據求和
			int sum = 0;
			for (IntWritable val : values) {//疊加
				sum += val.get();
			}
			// 創建put,設置rowkey為單詞
			Put put = new Put(Bytes.toBytes(key.toString()));
			// 封裝數據
			put.add(Bytes.toBytes(colf), Bytes.toBytes(col),Bytes.toBytes(String.valueOf(sum)));
			//寫到hbase,需要指定rowkey、put
			context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put);
		}
	}
	
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		config.set("df.default.name", "hdfs://master:9000/");//設置hdfs的默認路徑
		config.set("hadoop.job.ugi", "hadoop,hadoop");//用戶名,組
		config.set("mapred.job.tracker", "master:9001");//設置jobtracker在哪
		//初始化表
		initTB();//初始化表
		//創建job
		Job job = new Job(config, "HBaseMr");//job
		job.setJarByClass(HBaseMr.class);//主類
		//創建scan
		Scan scan = new Scan();
		//可以指定查詢某一列
		scan.addColumn(Bytes.toBytes(colf), Bytes.toBytes(col));
		//創建查詢hbase的mapper,設置表名、scan、mapper類、mapper的輸出key、mapper的輸出value
		TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,Text.class, IntWritable.class, job);
		//創建寫入hbase的reducer,指定表名、reducer類、job
		TableMapReduceUtil.initTableReducerJob(tableName2, MyReducer.class, job);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
/<code>


分享到:


相關文章: