Delta Lake的使用

Delta Lake 的使用初步:

一:数据库是Oracle


二:将Oracle的数据导入到hdfs上,并且使用 Delta 的格式进行保存,需要支持合并、删除等。

步骤上:

  1. 首先需要一个spark的jdbc程序,连接数据库进行数据导入操作。spark.read.jdbc ,这个不需要赘述,这里的窍门是 加入 hint 的操作。hint在数据库中可以告诉 数据库 走 用户指定的索引。而在spark的jdbc的API上,看上去并没有什么地方可以加hint。
    之前做过尝试,要不在 column中加入hint。因为spark去读取数据库的数据,肯定是执行了数据库的sql的。
Delta Lake的使用

JDBCRDD

这里可以看到,是生成了 sqlText 这句sql进行去数据库执行,hint肯定是加在 ${columnList} 之前的,那要是我第一个column中加入hint是不是就可以了。
结果是差强人意,因为这样sql执行没有问题,但是spark的dataset中,column的校验是同步过的,怎么可能会有column的名称叫 /*index*/开头的字段。
后来看到可以对 options.tableQuery 做处理。


Delta Lake的使用

这里的tableOrQuery 就是可以处理成一句查询的sql。将表名改成一句查询的sql,而在sql中加入hint,这样就可以做到了。

<code>println(s"sql===> (select ${baseColumns} from ${lTable} where ${sql._1})t")
spark.read.jdbc(prop.getProperty("url"),
s"(select ${baseColumns} from ${lTable} where ${sql._1})t", prop)
.withColumn("pt", expr(s"date_format(${partitionKey},'yyyyMMdd')"))
.write.mode(SaveMode.Overwrite).partitionBy("pt").parquet(commonWritePath + "/" + lTable + "/z_index=" + sql._2)/<code>

这里,,我们可以在 ${baseColumns}中加入hint,例如

baseColumns=/*+index(GE_BALANCE_DETAIL IDX_BALANCE_DETAIL_BALANCE_DAT) */id,created_time,bill_code,balance_type_id,site_id,balance_confirm_money,settlement_center_id,other_site_id,balance_date

这样去数据库查询的时候,就可以返回我们想要的结果,hint也使用起来了,而且返回的 column 也不会有问题。
这样结果返回后可以根据分区字段往hdfs写数据了。此外因为spark的机制原因,执行action动作的时候,其他的任务无法运行,这样比较浪费。因此可以通过

<code>val jobExecutor = Executors.newFixedThreadPool(inputParams.getOrElse("queryCnt", "2").toInt)
val latch = new CountDownLatch(sqls.length)/<code>

来开启多线程,这样可以同时跑多个action任务了,不过这里需要解决 临时文件被删除,导致出现

<code>WARN TaskSetManager: Lost task: org.apache.spark.SparkException: Task failed while writing rows.* WARN TaskSetManager: Lost task:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):* No lease on /<code>

这样的问题,可以通过写不同的目录来避免。

2.需要合并的数据写入到hdfs之后,需要下一步,数据合并来。

首先是判断合并的模式,模式如果是override,就说明是全覆盖模式,这种最简单来,直接使用

<code>val data = spark.read.jdbc(prop.getProperty("url"), lTable, prop)
(columns match {
case Array("*") => data
case other => data.selectExpr(columns: _*)
}).write.format("delta").mode("overwrite").option("mergeSchema", "true").option("overwriteSchema", "true")
.save(deltaTablePath)/<code>

就可以将结果以Delta的形式保存到hdfs上。

如果模式是 merge,这种就复杂点来。

需要进行delta的merge操作。

<code>updateDS.select("pt").distinct().collect().map(_.getAs[Int]("pt")).sortWith(_ < _)
.grouped(inputParams.getOrElse("groupCnt", "30").toInt)
.foreach(rows => {

println(s"pts = ${rows.mkString(",")}")

DeltaTable.forPath(spark, deltaTablePath).as("events")
.merge(
updateDS.filter(s"pt in (${rows.mkString(",")})")
.as("updates"),
s"events.pt in (${rows.mkString(",")}) and events.pt=updates.pt and events.${uniKey} = updates.${uniKey}")
.whenMatched.updateExpr(
updateColumns.filter(r => r.equalsIgnoreCase(uniKey) == false)
.map(r => {
r -> ("updates." + r)
}).toMap)
.whenNotMatched.insertExpr(
updateColumns
.map(r => {
r -> ("updates." + r)
}).toMap)
.execute()
})/<code>

这里的 pt是分区,首先获取 导入的数据含有的分区有哪些,然后以默认30个分区作为一个批次进行合并。合并的条件就是 分区相同,且 唯一性主键相同,进行合并。如果匹配到了,执行updated操作,记录需要合并的字段。如果没有匹配到就可以执行insert操作,这个有点像 oracle 的 merge into 操作。

3.数据都合并完之后,就是对于历史数据的清算了,Delta Lake的delete操作是不会删除hdfs上的文件的,而是添加墓碑标记,证明这些数据已经被删除了,那么通过delta的形式去读取的时候是不会读取到这些 delete 的数据的。
但是如果是通过parquet的格式去读取,依旧可以读取到这些已经被打上delete操作的数据的。
为了完全的进行物理删除,需要执行Delta 的 vacuum 方法,执行这个方法后,Delta会完全物理删除已经被打上标签的 文件 和 目录。

4.这样就可以完成Delta的存储了,不过是最基本的,后面还需要解决小文件合并,表结构修改等问题,就不在这里讲述了。


分享到:


相關文章: