目錄
安裝
本指南假設您正在使用SBT。使用類似的工具,如Maven或Leiningen,也應該有細微的區別。
HBASE RDD可以作為SBT中的依賴項添加,其中:
<code>dependencies += "eu.unicredit" %% "hbase-rdd" % "0.9.0"/<code>
目前,該項目依賴於以下工件:
<code>"org.apache.spark" %% "spark-core" % "2.4.3" % "provided","org.apache.hbase" % "hbase-common" % "2.1.0-cdh6.2.0" % "provided","org.apache.hbase" % "hbase-mapreduce" % "2.1.0-cdh6.2.0" % "provided","org.apache.hbase" % "hbase-server" % "2.1.0-cdh6.2.0" % "provided",/<code>
所有依賴項都顯示為provided範圍,所以您必須要麼在項目中擁有這些依賴項,要麼在集群中具有本地可用的相應工件。其中大多數都可以在Cloudera存儲庫中獲得,您可以在下面的行中添加這些存儲庫:
<code>resolvers ++= Seq( "Cloudera repos" at "https://repository.cloudera.com/artifactory/cloudera-repos", "Cloudera releases" at "https://repository.cloudera.com/artifactory/libs-release")/<code>
使用
初步
首先,添加以下導入以獲得必要的關聯:
<code>import unicredit.spark.hbase._/<code>
然後,您必須給出連接到HBASE的配置參數。這是通過提供unicredit.spark.hbase.HBaseConfig...這可以通過幾種方式來實現,即增加普遍性。
帶著hbase-site.xml
如果你碰巧在類路徑上hbase-site.xml使用正確的配置參數,您可以只需
<code>implicit val config = HBaseConfig()/<code>
否則,您將不得不以編程方式配置HBASE RDD。
有一個案例類
最簡單的方法是擁有一個包含兩個字符串成員的case類。quorum和rootdir...然後,下面這樣的東西就能工作了
<code>case class Config( quorum: String, rootdir: String, ... // Possibly other parameters)val c = Config(...)implicit val config = HBaseConfig(c)/<code>
帶著地圖
為了自定義更多的參數,可以提供(String, String),就像
<code>implicit val config = HBaseConfig( "hbase.rootdir" -> "...", "hbase.zookeeper.quorum" -> "...", ...)/<code>
具有Hadoop配置對象
最後,可以從現有的org.apache.hadoop.conf.Configuration
<code>val conf: Configuration = ...implicit val config = HBaseConfig(conf)/<code>
關於類型的註記
在HBASE中,每個數據,包括表和列名,都存儲為Array[Byte]...為了簡單起見,我們假設所有表、列和列的姓氏實際上都是字符串。
另一方面,單元格的內容可以具有任何類型,這些類型可以轉換為Array[Byte]...為了做到這一點,我們定義了兩個特徵unicredit.spark.hbase:
<code>trait Reads[A] { def read(data: Array[Byte]): A }trait Writes[A] { def write(data: A): Array[Byte] }/<code>
讀取類型的方法。A從HBASE將需要一個隱式Reads[A]在作用域上,以及寫入HBASE的對稱方法都需要隱式的。Writes[A].
默認情況下,我們為String, org.json4s.JValue而那些微不足道的Array[Byte].
讀HBASE
一些方法被添加到SparkContext為了讀到HBASE。
如果您知道要讀取哪些列,則可以使用sc.hbase()...假設列cf1:col1, cf1:col2和cf2:col3表中t1,並將內容序列化為utf-8字符串,則可以這樣做。
<code>val table = "t1"val columns = Map( "cf1" -> Set("col1", "col2"), "cf2" -> Set("col3"))val rdd = sc.hbase[String](table, columns)/<code>
總體而言,sc.hbase[K, Q, V]具有表示單元格的行鍵、限定符和內容類型的類型參數,並返回RDD[(K, Map[String, Map[Q, V]])]...結果RDD的每個元素都是一個鍵/值對,其中鍵是來自HBASE的行鍵,值是一個嵌套映射,它將列族和列與值關聯起來。缺少的列將從映射中省略,因此,例如,可以將上面的列投影到col2列做類似的事情
<code>rdd.flatMap({ case (k, v) => v("cf1") get "col2" map { col => k -> col } // or equivalently // Try(k -> v("cf1")("col2")).toOption})/<code>
你也可以用sc.hbase[K, V](table, columns)其中只有行鍵和值的類型參數是顯式的,而限定符是字符串,或者sc.hbase[V](table, columns),其中行鍵和限定符的類型是字符串。這些選擇適用於所有sc.hbase()和sc.hbaseTS()方法。
第二種可能是獲得整個列族。如果您事先不知道哪個是列名,這將是有用的。您可以使用方法來完成此操作。sc.hbase[A],就像
<code>val table = "t1"val families = Set("cf1", "cf2")val rdd = sc.hbase[String](table, families)/<code>
輸出,就像sc.hbase[A],是一個RDD[(String, Map[String, Map[String, A]])].
如果您需要同時讀取時間戳,則可以在這兩種情況下使用。sc.hbaseTS[K, Q, V]並獲得一個RDD[(K, Map[String, Map[Q, (V, Long)]])]...生成的RDD的每個元素都是一個鍵/值對,其中鍵是來自HBASE的行鍵,值是一個嵌套映射,它將列族和列關聯到元組(value,時間戳)。
最後,有一個較低級別的原始訪問權限。org.apache.hadoop.hbase.client.Result實例。為了這個,就這麼做
<code>val table = "t1"val rdd = sc.hbase[K](table)/<code>
的返回值sc.hbase是RDD[(K, Result)]...第一個元素是行鍵,第二個元素是org.apache.hadoop.hbase.client.Result,所以您可以使用原始的HBASE API來查詢它。
還通過提供自定義過濾器或掃描對象來支持HBASE側過濾器:
<code>val filter = new PrefixFilter(Bytes.toBytes("abc"))val table = "t1"val families = Set("cf1", "cf2")val rdd = sc.hbase[String](table, families, filter)/<code>
寫信給HBASE
為了寫入HBASE,在某些類型的RDD上添加了一些方法。
第一個是與你從HBASE讀取的方式平行的。假設你有一個RDD[(K, Map[String, Map[Q, V]])]還有Writes[K], Writes[Q],和Writes[V]在範圍內。然後,您可以使用該方法寫入HBASE。toHBase,就像
<code>val table = "t1"val rdd: RDD[(K, Map[String, Map[Q, V]])] = ...rdd.toHBase(table)/<code>
在只需在單個列系列上編寫的情況下,可以使用簡化的表單。然後,一個類似的方法可以在RDD[(K, Map[Q, V])],如下所示
<code>val table = "t1"val cf = "cf1"val rdd: RDD[(K, Map[Q, V])] = ...rdd.toHBase(table, cf)/<code>
或者,如果您有一組固定的列,如
<code>val table = "t1"val cf = "cf1"val headers: Seq[Q] = ...val rdd: RDD[(K, Seq[V])] = ...rdd.toHBase(table, cf, headers)/<code>
哪裡headers的列名Seq[V]價值。
如果需要編寫時間戳,可以在RDD中使用元組(V,Long),其中第二個元素表示時間戳,如
<code>val rdd: RDD[(K, Map[String, Map[Q, (V, Long)]])] = .../<code>
或者,對於簡化的形式,比如
<code>val rdd: RDD[(K, Map[Q, (V, Long)])] = .../<code>
或者,用一組固定的列
<code>val rdd: RDD[(K, Seq[(V, Long)])] = .../<code>
你可以看看WriteTsvToHBase.scala在……裡面HBASE-RDD-示例項目如何從Hdfs到HBase
從HBASE刪除
為了從HBASE中刪除,在某些類型的RDD上添加了一些方法。
假設你有一個RDD[(K, Map[String, Set[Q])]行鍵和家庭/一組列的映射。然後可以使用方法從HBASE中刪除deleteHBase,就像
<code>val table = "t1"val rdd: RDD[(K, Map[String, Set[Q])] = ...rdd.deleteHBase(table)/<code>
在只需從單個列家族中刪除的情況下,可以使用簡化的表單。然後,一個類似的方法可以在RDD[(K, Set[Q])]行鍵和一組列,如下所示
<code>val table = "t1"val cf = "cf1"val rdd: RDD[(K, Set[Q])] = ...rdd.deleteHBase(table, cf)/<code>
或者,如果要刪除一個列族、整個列族或整行的固定列集,則可以在RDD[K]中的行鍵,可以如下所示
<code>val table = "t1"val cf = "cf1"val headers: Set[Q] = ...val rdd: RDD[K] = ...rdd.deleteHBase(table, cf, headers)/<code>
或
<code>val cfs = Set("cf1", "cf2")rdd.deleteHBase(table, cfs)/<code>
或
<code>rdd.deleteHBase(table)/<code>
如果需要使用時間戳刪除,可以在rdd中使用元組(String,long),其中第一個元素是列,第二個元素表示時間戳,如下所示
<code>val rdd: RDD[(K, Map[String, Set[(Q, Long)]])] = .../<code>
或者,對於簡化的形式,比如
<code>val rdd: RDD[(K, Set[(Q, Long)])] = .../<code>
使用HFiles將批量加載到HBASE
在大量寫入HBASE的情況下,直接將對象寫入表中可能效率低下,並可能導致HBASE沒有響應(例如,它會觸發區域分裂)。更好的方法是創建HFiles,而不是調用LoadIncrementalHFiles作業將它們移動到HBASE的文件系統。不幸的是,這種方法相當繁瑣,因為它意味著以下步驟:
- 確保該表存在並具有區域分割,以便將行均勻地分配到區域中(以獲得更好的性能)。
- 使用HFileOutputFormat2輸出格式,實現並執行映射(並減少)作業,將有序的PUT或KeyValue對象寫入HFile文件。還原階段是通過調用HFileOutputFormat2.figreIncrementalLoad在幕後配置的。
- 執行LoadIncrementalHFiles作業,將HFile文件移動到HBASE的文件系統。
- 清理臨時文件和文件夾
現在,您可以通過調用toHBaseBulk,就像
<code>val table = "t1"val rdd: RDD[(K, Map[String, Map[Q, V]])] = ...rdd.toHBaseBulk(table)/<code>
在只需在單個列系列上寫入的情況下,可以使用簡化的表單。
<code>val table = "t1"val cf = "cf1"val rdd: RDD[(K, Map[Q, V])] = ...rdd.toHBaseBulk(table, cf)/<code>
或者,如果您有一組固定的列,如
<code>val table = "t1"val cf = "cf1"val headers: Seq[Q] = ...val rdd: RDD[(K, Seq[V])] = ...rdd.toHBaseBulk(table, cf, headers)/<code>
哪裡headers的列名Seq[A]價值。
如果需要編寫時間戳,可以使用元組。(A, Long)在你的RDD,其中第二個元素表示時間戳,如
<code>val rdd: RDD[(K, Map[String, Map[Q, (V, Long)]])] = .../<code>
或者,對於簡化的形式,比如
<code>val rdd: RDD[(K, Map[Q, (V, Long)])] = .../<code>
或者,如果是一組固定的列,如
<code>val rdd: RDD[(K, Seq[(V, Long)])] = .../<code>
但是第一步呢?為此,一個Admin對象使用一些幫助方法來挽救。必須通過實例打開到HBASE的連接(如1.0.0版以來所要求的那樣)
<code> val admin = Admin()/<code>
然後
- admin.tableExists(tableName: String, family: String)*檢查表是否存在,並相應地返回true或false。如果表tableName存在但列族family不,一個IllegalArgumentException拋出
- admin.tableExists(tableName: String, families: Set[String])*檢查表是否存在,並相應地返回true或false。如果表tableName存在但至少有一個families不,一個IllegalArgumentException拋出
- admin.snapshot(tableName: String)*創建表的快照tableName,命名<tablename>_yyyyMMddHHmmss(後綴是快照操作的日期和時間)/<tablename>
- admin.snapshot(tableName: String, snapshotName: String)*創建表的快照tableName,命名為“快照名稱”。
- admin.createTable(tableName: String, family: String, splitKeys: Seq[K])*創建一個表tableName列族family和由已排序的拆分鍵序列定義的區域。splitKeys
- admin.createTable(tableName: String, families: Set[String], splitKeys: Seq[K])*創建一個表tableName列族families和由已排序的拆分鍵序列定義的區域。splitKeys
- admin.createTable(tableName: String, families: Set[String])*創建一個表tableName列族families
- admin.createTable(tableName: String, families: String*)*創建一個表tableName列族families
- admin.disableTable(tableName: String)*禁用表tableName(刪除前必須禁用表)
- admin.deleteTable(tableName: String)*刪除表tableName
- admin.truncateTable(tableName: String, preserveSplits: Boolean)截斷表tableName,可選擇地保持區域分裂。
- admin.computeSplits(rdd: RDD[K], regionsCount: Int)*給予RDD鍵和所需區域的數目(regionsCount),返回一個已排序的拆分鍵序列,用於createTable()
最後,必須關閉與HBASE的連接。
<code>admin.close/<code>
你可以看看ImportTsvToHFiles.scala在……裡面HBASE-RDD-示例項目如何從Hdfs到HBase
設置每個家庭每個區域的HFiles數
為了獲得最佳的性能,HBASE應該使用每個家庭每個區域1 HFile。另一方面,您使用的HFiles越多,SPark作業中的分區就越多,因此SPark任務運行得更快,佔用的內存堆也更少。您可以通過將額外的可選參數傳遞給toHBaseBulk()方法,numFilesPerRegionPerFamily=
<code>rdd.toHBaseBulk(table, numFilesPerRegionPerFamily=32)/<code>
或
<code>rdd.toHBaseBulk(table, cf, numFilesPerRegionPerFamily=32)/<code>
或
<code>rdd.toHBaseBulk(table, cf, headers, numFilesPerRegionPerFamily=32)/<code>
閱讀更多 大數據全棧工程師 的文章