大數據實戰:SparkSQL 三種方法實現變長參數的用戶自定義函數

大數據實戰:SparkSQL 三種方法實現變長參數的用戶自定義函數

apache spark

用戶自定義函數(User Define Function,簡稱UDF)在Spark1.1版本的時候推出。用戶可以在Spark SQL 裡自定義UDF函數來實現複雜的業務邏輯。org.apache.spark.sql.functions中重載了udf函數,最多可支持10個參數。但是日常業務中,我們可能需要不止十個參數。如:有一個DataFrame共有10行100列,每一列表示某個青少年的體重,我們需要增加一列表示每一行的平均體重。如何用udf實現呢?其實,我們可以通過以下3種方式實現支持任意多個參數。

  1. 變長參數函數

  2. Seq類型參數

  3. Row類型參數

我們以剛求平均體重為例,首先定義一個DataFrame,簡化起見只有2行5列。

val df= List[(Double,Double,Double,Double,Double)](

(51,53,52,53,51),

(51,53,52,53,51)

).toDF("w1","w2","w3","w4","w5")

使用變長參數實現變長參數的UDF定義

def avgVarargs(ws: Double*): Double= ws.sum / ws.length

val avgVarargsUDF= udf(avgVarargs _)

df.withColumn("avg",avgVarargsUDF(array($"w1",$"w2",$"w3",$"w4",$"w5")).show //需要用array轉換

使用Seq類型參數實現變長參數的UDF定義和調用

def avgSeq(ws: Seq[Double]): Double= ws.sum / ws.length

val avgSeqUDF = udf(avgSeq)

df.withColumn("avg",avgSeqUDF (array($"w1",$"w2",$"w3",$"w4",$"w5")).show //需要用array轉換

使用Row定義變長參數的UDF定義和調用

def avgRow: (Row => Double) = row => row.toSeq.sum / row.toSeq.length

val avgRowUDF = udf(

avgRow)

df.withColumn("avg",avgRowUDF (struct($"w1",$"w2",$"w3",$"w4",$"w5")).show //用struct轉換

3種實現方法比較

由於array和seq都只能保存相同類型的數據,因此變長參數和Seq類型參數這兩種實現只支持多個相同類型的參數,使用範圍較窄。業務邏輯通常比較複雜,多列數據類型通常不完全一樣,因此Row的方式更靈活可靠,適用範圍更廣,可以支持多種不同類型的列,同時Row類型可以使用模式提取。推薦使用這種方式實現可變參數。


分享到:


相關文章: