大數據乾貨丨激發RDD從HBase中讀取、寫入和刪除


大數據乾貨丨激發RDD從HBase中讀取、寫入和刪除


目錄

  • 安裝
  • 初步
  • 關於類型的註記
  • 讀HBASE
  • 寫信給HBASE
  • 從HBASE刪除
  • 大量裝載到HBASE
  • 示例項目
  • 安裝

    本指南假設您正在使用SBT。使用類似的工具,如Maven或Leiningen,也應該有細微的區別。

    HBASE RDD可以作為SBT中的依賴項添加,其中:

    <code>dependencies += "eu.unicredit" %% "hbase-rdd" % "0.9.0"/<code>

    目前,該項目依賴於以下工件:

    <code>"org.apache.spark" %% "spark-core" % "2.4.3" % "provided","org.apache.hbase" % "hbase-common" % "2.1.0-cdh6.2.0" % "provided","org.apache.hbase" % "hbase-mapreduce" % "2.1.0-cdh6.2.0" % "provided","org.apache.hbase" % "hbase-server" % "2.1.0-cdh6.2.0" % "provided",/<code>

    所有依賴項都顯示為provided範圍,所以您必須要麼在項目中擁有這些依賴項,要麼在集群中具有本地可用的相應工件。其中大多數都可以在Cloudera存儲庫中獲得,您可以在下面的行中添加這些存儲庫:

    <code>resolvers ++= Seq(  "Cloudera repos" at "https://repository.cloudera.com/artifactory/cloudera-repos",  "Cloudera releases" at "https://repository.cloudera.com/artifactory/libs-release")/<code>

    使用

    初步

    首先,添加以下導入以獲得必要的關聯:

    <code>import unicredit.spark.hbase._/<code>

    然後,您必須給出連接到HBASE的配置參數。這是通過提供unicredit.spark.hbase.HBaseConfig...這可以通過幾種方式來實現,即增加普遍性。

    帶著hbase-site.xml

    如果你碰巧在類路徑上hbase-site.xml使用正確的配置參數,您可以只需

    <code>implicit val config = HBaseConfig()/<code>

    否則,您將不得不以編程方式配置HBASE RDD。

    有一個案例類

    最簡單的方法是擁有一個包含兩個字符串成員的case類。quorum和rootdir...然後,下面這樣的東西就能工作了

    <code>case class Config(  quorum: String,  rootdir: String,  ... // Possibly other parameters)val c = Config(...)implicit val config = HBaseConfig(c)/<code>

    帶著地圖

    為了自定義更多的參數,可以提供(String, String),就像

    <code>implicit val config = HBaseConfig(  "hbase.rootdir" -> "...",  "hbase.zookeeper.quorum" -> "...",  ...)/<code>

    具有Hadoop配置對象

    最後,可以從現有的org.apache.hadoop.conf.Configuration

    <code>val conf: Configuration = ...implicit val config = HBaseConfig(conf)/<code>

    關於類型的註記

    在HBASE中,每個數據,包括表和列名,都存儲為Array[Byte]...為了簡單起見,我們假設所有表、列和列的姓氏實際上都是字符串。

    另一方面,單元格的內容可以具有任何類型,這些類型可以轉換為Array[Byte]...為了做到這一點,我們定義了兩個特徵unicredit.spark.hbase:

    <code>trait Reads[A] { def read(data: Array[Byte]): A }trait Writes[A] { def write(data: A): Array[Byte] }/<code>

    讀取類型的方法。A從HBASE將需要一個隱式Reads[A]在作用域上,以及寫入HBASE的對稱方法都需要隱式的。Writes[A].

    默認情況下,我們為String, org.json4s.JValue而那些微不足道的Array[Byte].

    讀HBASE

    一些方法被添加到SparkContext為了讀到HBASE。

    如果您知道要讀取哪些列,則可以使用sc.hbase()...假設列cf1:col1, cf1:col2和cf2:col3表中t1,並將內容序列化為utf-8字符串,則可以這樣做。

    <code>val table = "t1"val columns = Map(  "cf1" -> Set("col1", "col2"),  "cf2" -> Set("col3"))val rdd = sc.hbase[String](table, columns)/<code>

    總體而言,sc.hbase[K, Q, V]具有表示單元格的行鍵、限定符和內容類型的類型參數,並返回RDD[(K, Map[String, Map[Q, V]])]...結果RDD的每個元素都是一個鍵/值對,其中鍵是來自HBASE的行鍵,值是一個嵌套映射,它將列族和列與值關聯起來。缺少的列將從映射中省略,因此,例如,可以將上面的列投影到col2列做類似的事情

    <code>rdd.flatMap({ case (k, v) =>  v("cf1") get "col2" map { col =>    k -> col  }  // or equivalently  // Try(k -> v("cf1")("col2")).toOption})/<code>

    你也可以用sc.hbase[K, V](table, columns)其中只有行鍵和值的類型參數是顯式的,而限定符是字符串,或者sc.hbase[V](table, columns),其中行鍵和限定符的類型是字符串。這些選擇適用於所有sc.hbase()和sc.hbaseTS()方法。

    第二種可能是獲得整個列族。如果您事先不知道哪個是列名,這將是有用的。您可以使用方法來完成此操作。sc.hbase[A],就像

    <code>val table = "t1"val families = Set("cf1", "cf2")val rdd = sc.hbase[String](table, families)/<code>

    輸出,就像sc.hbase[A],是一個RDD[(String, Map[String, Map[String, A]])].

    如果您需要同時讀取時間戳,則可以在這兩種情況下使用。sc.hbaseTS[K, Q, V]並獲得一個RDD[(K, Map[String, Map[Q, (V, Long)]])]...生成的RDD的每個元素都是一個鍵/值對,其中鍵是來自HBASE的行鍵,值是一個嵌套映射,它將列族和列關聯到元組(value,時間戳)。

    最後,有一個較低級別的原始訪問權限。org.apache.hadoop.hbase.client.Result實例。為了這個,就這麼做

    <code>val table = "t1"val rdd = sc.hbase[K](table)/<code>

    的返回值sc.hbase是RDD[(K, Result)]...第一個元素是行鍵,第二個元素是org.apache.hadoop.hbase.client.Result,所以您可以使用原始的HBASE API來查詢它。

    還通過提供自定義過濾器或掃描對象來支持HBASE側過濾器:

    <code>val filter = new PrefixFilter(Bytes.toBytes("abc"))val table = "t1"val families = Set("cf1", "cf2")val rdd = sc.hbase[String](table, families, filter)/<code>

    寫信給HBASE

    為了寫入HBASE,在某些類型的RDD上添加了一些方法。

    第一個是與你從HBASE讀取的方式平行的。假設你有一個RDD[(K, Map[String, Map[Q, V]])]還有Writes[K], Writes[Q],和Writes[V]在範圍內。然後,您可以使用該方法寫入HBASE。toHBase,就像

    <code>val table = "t1"val rdd: RDD[(K, Map[String, Map[Q, V]])] = ...rdd.toHBase(table)/<code>

    在只需在單個列系列上編寫的情況下,可以使用簡化的表單。然後,一個類似的方法可以在RDD[(K, Map[Q, V])],如下所示

    <code>val table = "t1"val cf = "cf1"val rdd: RDD[(K, Map[Q, V])] = ...rdd.toHBase(table, cf)/<code>

    或者,如果您有一組固定的列,如

    <code>val table = "t1"val cf = "cf1"val headers: Seq[Q] = ...val rdd: RDD[(K, Seq[V])] = ...rdd.toHBase(table, cf, headers)/<code>

    哪裡headers的列名Seq[V]價值。

    如果需要編寫時間戳,可以在RDD中使用元組(V,Long),其中第二個元素表示時間戳,如

    <code>val rdd: RDD[(K, Map[String, Map[Q, (V, Long)]])] = .../<code>

    或者,對於簡化的形式,比如

    <code>val rdd: RDD[(K, Map[Q, (V, Long)])] = .../<code>

    或者,用一組固定的列

    <code>val rdd: RDD[(K, Seq[(V, Long)])] = .../<code>

    你可以看看WriteTsvToHBase.scala在……裡面HBASE-RDD-示例項目如何從Hdfs到HBase

    從HBASE刪除

    為了從HBASE中刪除,在某些類型的RDD上添加了一些方法。

    假設你有一個RDD[(K, Map[String, Set[Q])]行鍵和家庭/一組列的映射。然後可以使用方法從HBASE中刪除deleteHBase,就像

    <code>val table = "t1"val rdd: RDD[(K, Map[String, Set[Q])] = ...rdd.deleteHBase(table)/<code>

    在只需從單個列家族中刪除的情況下,可以使用簡化的表單。然後,一個類似的方法可以在RDD[(K, Set[Q])]行鍵和一組列,如下所示

    <code>val table = "t1"val cf = "cf1"val rdd: RDD[(K, Set[Q])] = ...rdd.deleteHBase(table, cf)/<code>

    或者,如果要刪除一個列族、整個列族或整行的固定列集,則可以在RDD[K]中的行鍵,可以如下所示

    <code>val table = "t1"val cf = "cf1"val headers: Set[Q] = ...val rdd: RDD[K] = ...rdd.deleteHBase(table, cf, headers)/<code>

    <code>val cfs = Set("cf1", "cf2")rdd.deleteHBase(table, cfs)/<code>

    <code>rdd.deleteHBase(table)/<code>

    如果需要使用時間戳刪除,可以在rdd中使用元組(String,long),其中第一個元素是列,第二個元素表示時間戳,如下所示

    <code>val rdd: RDD[(K, Map[String, Set[(Q, Long)]])] = .../<code>

    或者,對於簡化的形式,比如

    <code>val rdd: RDD[(K, Set[(Q, Long)])] = .../<code>

    使用HFiles將批量加載到HBASE

    在大量寫入HBASE的情況下,直接將對象寫入表中可能效率低下,並可能導致HBASE沒有響應(例如,它會觸發區域分裂)。更好的方法是創建HFiles,而不是調用LoadIncrementalHFiles作業將它們移動到HBASE的文件系統。不幸的是,這種方法相當繁瑣,因為它意味著以下步驟:

    1. 確保該表存在並具有區域分割,以便將行均勻地分配到區域中(以獲得更好的性能)。
    2. 使用HFileOutputFormat2輸出格式,實現並執行映射(並減少)作業,將有序的PUT或KeyValue對象寫入HFile文件。還原階段是通過調用HFileOutputFormat2.figreIncrementalLoad在幕後配置的。
    3. 執行LoadIncrementalHFiles作業,將HFile文件移動到HBASE的文件系統。
    4. 清理臨時文件和文件夾

    現在,您可以通過調用toHBaseBulk,就像

    <code>val table = "t1"val rdd: RDD[(K, Map[String, Map[Q, V]])] = ...rdd.toHBaseBulk(table)/<code>

    在只需在單個列系列上寫入的情況下,可以使用簡化的表單。

    <code>val table = "t1"val cf = "cf1"val rdd: RDD[(K, Map[Q, V])] = ...rdd.toHBaseBulk(table, cf)/<code>

    或者,如果您有一組固定的列,如

    <code>val table = "t1"val cf = "cf1"val headers: Seq[Q] = ...val rdd: RDD[(K, Seq[V])] = ...rdd.toHBaseBulk(table, cf, headers)/<code>

    哪裡headers的列名Seq[A]價值。

    如果需要編寫時間戳,可以使用元組。(A, Long)在你的RDD,其中第二個元素表示時間戳,如

    <code>val rdd: RDD[(K, Map[String, Map[Q, (V, Long)]])] = .../<code>

    或者,對於簡化的形式,比如

    <code>val rdd: RDD[(K, Map[Q, (V, Long)])] = .../<code>

    或者,如果是一組固定的列,如

    <code>val rdd: RDD[(K, Seq[(V, Long)])] = .../<code>

    但是第一步呢?為此,一個Admin對象使用一些幫助方法來挽救。必須通過實例打開到HBASE的連接(如1.0.0版以來所要求的那樣)

    <code>  val admin = Admin()/<code>

    然後

    • admin.tableExists(tableName: String, family: String)*檢查表是否存在,並相應地返回true或false。如果表tableName存在但列族family不,一個IllegalArgumentException拋出
    • admin.tableExists(tableName: String, families: Set[String])*檢查表是否存在,並相應地返回true或false。如果表tableName存在但至少有一個families不,一個IllegalArgumentException拋出
    • admin.snapshot(tableName: String)*創建表的快照tableName,命名<tablename>_yyyyMMddHHmmss(後綴是快照操作的日期和時間)/<tablename>
    • admin.snapshot(tableName: String, snapshotName: String)*創建表的快照tableName,命名為“快照名稱”。
    • admin.createTable(tableName: String, family: String, splitKeys: Seq[K])*創建一個表tableName列族family和由已排序的拆分鍵序列定義的區域。splitKeys
    • admin.createTable(tableName: String, families: Set[String], splitKeys: Seq[K])*創建一個表tableName列族families和由已排序的拆分鍵序列定義的區域。splitKeys
    • admin.createTable(tableName: String, families: Set[String])*創建一個表tableName列族families
    • admin.createTable(tableName: String, families: String*)*創建一個表tableName列族families
    • admin.disableTable(tableName: String)*禁用表tableName(刪除前必須禁用表)
    • admin.deleteTable(tableName: String)*刪除表tableName
    • admin.truncateTable(tableName: String, preserveSplits: Boolean)截斷表tableName,可選擇地保持區域分裂。
    • admin.computeSplits(rdd: RDD[K], regionsCount: Int)*給予RDD鍵和所需區域的數目(regionsCount),返回一個已排序的拆分鍵序列,用於createTable()

    最後,必須關閉與HBASE的連接。

    <code>admin.close/<code>

    你可以看看ImportTsvToHFiles.scala在……裡面HBASE-RDD-示例項目如何從Hdfs到HBase

    設置每個家庭每個區域的HFiles數

    為了獲得最佳的性能,HBASE應該使用每個家庭每個區域1 HFile。另一方面,您使用的HFiles越多,SPark作業中的分區就越多,因此SPark任務運行得更快,佔用的內存堆也更少。您可以通過將額外的可選參數傳遞給toHBaseBulk()方法,numFilesPerRegionPerFamily=其中N(默認值為1)是介於1和hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily參數(默認值為32)。

    <code>rdd.toHBaseBulk(table, numFilesPerRegionPerFamily=32)/<code>

    <code>rdd.toHBaseBulk(table, cf, numFilesPerRegionPerFamily=32)/<code>

    <code>rdd.toHBaseBulk(table, cf, headers, numFilesPerRegionPerFamily=32)/<code>


    分享到:


    相關文章: