大數據為什麼那麼火?一文帶你瞭解Spark與SQL結合的力量

Spark是一種大規模、快速計算的集群平臺,本頭條號試圖通過學習Spark官網的實戰演練筆記提升筆者實操能力以及展現Spark的精彩之處。有關框架介紹和環境配置可以參考以下內容:

本文的參考配置為:Deepin 15.11、Java 1.8.0_241、Hadoop 2.10.0、Spark 2.4.4、scala 2.11.12

一、Spark SQL入門

Spark SQL 是 Spark 處理結構化數據的一個模塊。與基礎的 Spark RDD API 不同,Spark SQL 提供了查詢結構化數據計算結果等信息的接口。在內部,Spark SQL 使用這個額外的信息去執行額外的優化。有幾種方式可以跟 Spark SQL 進行交互,包括 SQL Dataset API。當使用相同執行引擎進行計算時,無論使用哪種 API / 語言都可以快速的計算。這種統一意味著開發人員能夠在基於提供最自然的方式來表達一個給定的 transformation API 之間實現輕鬆的來回切換不同的。

1.Spark Session

Spark SQL中所有功能的入口點是SparkSession 類。要創建一個 SparkSession,僅使用 SparkSession.builder()就可以了。如果提示已創建的Warning,則代表之前有創建SparkSession,有些設置不會生效,可以通過.stop方法先停止當前SparkSession。

2.創建DataFrames

在一個 SparkSession中,應用程序可以從一個已經存在的 RDD,從hive表,或者從 Spark數據源中創建一個DataFrames。

一個 Dataset 是一個分佈式的數據集合 Dataset 是在 Spark 1.6 中被添加的新接口,它提供了 RDD 的優點(強類型化,能夠使用強大的 lambda 函數)與Spark SQL執行引擎的優點。一個 Dataset 可以從 JVM 對象來 構造 並且使用轉換功能(map,flatMap,filter,等等)。一個 DataFrame 是一個 Dataset 組成的指定列。

3.SQL語句運行

SparkSession 的 sql 函數可以讓應用程序以編程的方式運行 SQL 查詢,並將結果作為一個 DataFrame 返回。

Spark SQL中的臨時視圖是session級別

的,也就是會隨著session的消失而消失。如果你想讓一個臨時視圖在所有session中相互傳遞並且可用,直到Spark 應用退出,你可以建立一個全局的臨時視圖。全局的臨時視圖存在於系統數據庫 global_temp中,我們必須加上庫名去引用它,比如。SELECT * FROM global_temp.view1。

4.創建DataSets

Dataset 與 RDD 相似,然而,並不是使用 Java 序列化或者 Kryo 編碼器來序列化用於處理或者通過網絡進行傳輸的對象。雖然編碼器和標準的序列化都負責將一個對象序列化成字節,編碼器是動態生成的代碼,並且使用了一種允許 Spark 去執行許多像 filtering,sorting 以及 hashing 這樣的操作,不需要將字節反序列化成對象的格式。

5.RDD互操作性

Spark SQL 支持兩種不同的方法用於轉換已存在的 RDD 成為 Dataset,分別是使用反射推斷Schema和以編程的方式指定Schema。

Spark SQL 的 Scala 接口支持自動轉換一個包含

case classes 的 RDD 為 DataFrame。Case class 定義了表的 Schema。Case class 的參數名使用反射讀取並且成為了列名。Case class 也可以是嵌套的或者包含像 Seq 或者 Array 這樣的複雜類型。這個 RDD 能夠被隱式轉換成一個 DataFrame 然後被註冊為一個表。表可以用於後續的 SQL 語句。

6.UDF自定義函數

內置的DataFrames函數提供常見的聚合,例如count()countDistinct()avg()max()min()等。儘管這些函數是為DataFrames設計的,但用戶不限於預定義的聚合功能,還可以創建自己的功能。

二、數據源

Spark SQL 支持通過 DataFrame 接口對各種 data sources(數據源)進行操作。DataFrame 可以使用 relational transformations(關係轉換)操作,也可用於創建 temporary view(臨時視圖)。將 DataFrame 註冊為 temporary view(臨時視圖)允許您對其數據運行 SQL 查詢。本節 描述了使用 Spark Data Sources

加載和保存數據的一般方法,然後涉及可用於 built-in data sources(內置數據源)的 specific options(特定選項)。

1.通用功能

在最簡單的形式中,默認數據源(parquet,除非另有配置 spark.sql.sources.default)將用於所有操作。還可以手動指定數據源格式。對於內置的源,你也可以使用它們的 短名稱(json,parquet,jdbc,orc,libsvm,csv,text)。從任何 data source type(數據源類型)加載 DataFrames 可以使用此 syntax(語法)轉換為其他類型。

保存操作可以選擇使用 SaveMode,它指定如何處理現有數據如果存在的話。重要的是這些保存模式不使用任何鎖定。另外,當執行 Overwrite 時,數據將在新數據寫出之前被刪除。DataFrames 也可以使用 saveAsTable 命令作為 persistent tables(持久表)保存到 Hive metastore 中。對於基於文件的數據源,也可以對 output(輸出)進行 bucket sort 或者 partitionBucketing 和

sorting 僅適用於 persistent tables 。

2.Hive表

Spark SQL 還支持讀取和寫入存儲在 Apache Hive 中的數據。但是,由於 Hive 具有大量依賴關係,因此這些依賴關係不包含在默認 Spark 分發中。如果在類路徑中找到 Hive 依賴項,Spark 將自動加載它們。請注意,這些 Hive 依賴關係也必須存在於所有工作節點上,因為它們將需要訪問 Hive 序列化和反序列化庫(SerDes),以訪問存儲在 Hive 中的數據。

創建 Hive 表時,需要定義如何 從/向 文件系統 read/write 數據,即 “輸入格式” 和 “輸出格式”。您還需要定義該表如何將數據反序列化為行,或將行序列化為數據,即 “serde”。以下選項可用於指定存儲格式(“serde”, “input format”, “output format”),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默認情況下,我們將以純文本形式讀取表格文件。請注意,Hive 存儲處理程序在創建表時不受支持,您可以使用 Hive 端的存儲處理程序創建一個表,並使用 Spark SQL 來讀取它。

3.JDBC數據庫

Spark SQL 還包括可以使用 JDBC 從其他數據庫讀取數據的數據源。此功能應優於使用 JdbcRDD。這是因為結果作為 DataFrame 返回,並且可以輕鬆地在 Spark SQL 中處理或與其他數據源連接。JDBC 數據源也更容易從 Java 或 Python 使用,因為它不需要用戶提供 ClassTag。(請注意,這不同於 Spark SQL JDBC 服務器,允許其他應用程序使用 Spark SQL 運行查詢)。

有關Spark SQL的內容至此結束,下文將進一步對Spark Streaming即Spark流處理的內容做詳細介紹。前文筆記請參考下面的鏈接:


分享到:


相關文章: