2018年前100名Apache Spark面試問題和解答(下)

接著上半部繼續,

Apache Spark面試問題答案

一, Spark Driver在spark應用程序中的作用是什麼?

Spark驅動程序是定義知識RDD轉換和操作並向主服務器提交請求的程序。Spark驅動程序是在機器的主節點上運行的程序,它聲明對知識RDD的轉換和操作。

簡單來說,Spark中的驅動程序創建SparkContext,連接到給定的Spark Master。它將RDD圖表聯合提供給Master,無論獨立集群管理器在哪裡運行。

二, Apache Spark集群中的工作節點是什麼?

Apache Spark遵循主/從架構,具有一個主或驅動程序進程以及多個從屬或工作進程

1. master是運行main()程序的驅動程序,其中創建了spark上下文。然後,它與集群管理器交互以安排作業執行並執行任務。

2.工作程序由可以並行運行的進程組成,以執行驅動程序安排的任務。這些過程稱為執行程序。

每當客戶端運行應用程序代碼時,驅動程序都會實例化Spark Context,將轉換和操作轉換為執行的邏輯DAG。然後將此邏輯DAG轉換為物理執行計劃,然後將其細分為較小的物理執行單元。然後,驅動程序與集群管理器交互,以協商執行應用程序代碼任務所需的資源。然後,集群管理器與每個工作節點交互,以瞭解每個節點中運行的執行程序的數量。

工作節點/執行者的角色:

1.執行應用程序代碼的數據處理

2.讀取數據並將數據寫入外部源

3.將計算結果存儲在內存或磁盤中。

執行程序在Spark應用程序的整個生命週期中運行。這是執行者的靜態分配。用戶還可以決定運行任務需要多少執行程序,具體取決於工作負載。這是執行者的動態分配。

在執行任務之前,執行程序通過集群管理器向驅動程序註冊,以便驅動程序知道有多少執行程序正在運行以執行計劃任務。然後,執行程序通過集群管理器開始執行工作節點調度的任務。

每當任何工作節點發生故障時,需要執行的任務將自動分配給任何其他工作節點

三,為什麼變換在Spark中變得懶散?

每當在Apache Spark中執行轉換操作時,它都會被懶惰地評估。在執行操作之前不會執行。Apache Spark只是將變換操作的條目添加到計算的DAG(有向無環圖),這是一個沒有循環的有向有限圖。在該DAG中,所有操作都被分類到不同的階段,在單個階段中沒有數據的混亂。

通過這種方式,Spark可以通過完整地查看DAG來優化執行,並將適當的結果返回給驅動程序。

例如,考慮HDFS中的1TB日誌文件,其中包含錯誤,警告和其他信息。以下是驅動程序中正在執行的操作:

1. 創建此日誌文件

的RDD

2.對此RDD執行flatmap()操作,以根據製表符分隔符拆分日誌文件中的數據。

3.執行filter()操作以提取僅包含錯誤消息的數據

4.執行first()操作以僅獲取第一條錯誤消息。

如果熱切評估上述驅動程序中的所有轉換,那麼整個日誌文件將被加載到內存中,文件中的所有數據將根據選項卡進行拆分,現在要麼需要在某處寫入FlatMap的輸出或將其保存在記憶中。Spark需要等到執行下一個操作,並且資源被阻塞以進行即將進行的操作。除此之外,每個操作spark都需要掃描所有記錄,比如FlatMap處理所有記錄然後再次在過濾操作中處理它們。

另一方面,如果所有轉換都被懶惰地評估,Spark將整體查看DAG併為應用程序準備執行計劃,現在該計劃將被優化,操作將被合併/合併到階段然後執行將開始。Spark創建的優化計劃可提高作業效率和整體吞吐量。

通過Spark中的這種惰性評估,驅動程序和集群之間的切換次數也減少了,從而節省了內存中的時間和資源,並且計算速度也有所提高。

四,我可以在沒有Hadoop的情況下運行Apache Spark嗎?

是的,Apache Spark可以在沒有Hadoop,獨立或在雲中運行。Spark不需要Hadoop集群就可以工作。Spark還可以讀取並處理來自其他文件系統的數據。HDFS只是Spark支持的文件系統之一。

Spark沒有任何存儲層,因此它依賴於分佈式存儲系統之一,用於分佈式計算,如HDFS,Cassandra等。

但是,在Hadoop(HDFS(用於存儲)+ YARN(資源管理器))上運行Spark有很多優點,但這不是強制性要求。Spark是一種用於分佈式計算的。在這種情況下,數據分佈在計算機上,Hadoop的分佈式文件系統HDFS用於存儲不適合內存的數據。

使用Hadoop和Spark的另一個原因是它們都是開源的,並且與其他數據存儲系統相比,它們可以相當容易地相互集成。

五,.在Spark中解釋累加器。

  • 這個討論是繼續問題,命名Apache Spark中可用的兩種類型的共享變量。

累加器介紹:

  • Accumulator是Apache Spark中的共享變量,用於聚合群集中的信息。
  • 換句話說,將來自工作節點的信息/值聚合回驅動程序。(我們將在下面的會議中看到)
  • 為什麼累加器:
  • 當我們在map(),filter()等操作中使用函數時,這些函數可以使用驅動程序中這些函數作用域外定義的變量。
  • 當我們將任務提交到集群時,集群上運行的每個任務都會獲得這些變量的新副本,並且這些變量的更新不會傳播回驅動程序。
  • 累加器降低了此限制。
  • 用例 :
  • 累加器最常見的用途之一是計算作業執行期間發生的事件以進行調試。
  • 意思是數不了。輸入文件中的空白行,沒有。在會話期間來自網絡的錯誤數據包,在奧運會數據分析期間,我們必須找到我們在SQL查詢中所說的年齡(年齡!='NA'),簡短地查找錯誤/損壞的記錄。
  • 例子 :
  • scala> val record = spark.read.textFile("/home/hdadmin/wc-data-blanklines.txt")
  • record: org.apache.spark.sql.Dataset[String] = [value: string]
  • scala> val emptylines = sc.accumulator(0)

  • warning: there were two deprecation warnings; re-run with -deprecation for details
  • emptylines: org.apache.spark.Accumulator[Int] = 0
  • scala> val processdata = record.flatMap(x =>

  • {
  • if(x == "")
  • emptylines += 1
  • x.split(" ")
  • })
  • processdata: org.apache.spark.sql.Dataset[String] = [value: string]

  • scala> processdata.collect
  • 16/12/02 20:55:15 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
  • 輸出:
  • res0:Array [String] = Array(DataFlair,提供,培訓,開,切,邊緣,技術。,“”,DataFlair,是,領導,培訓,提供者,我們,有,訓練,1000s, ,候選人,培訓,焦點,實踐,方面,哪些,工業,需要,而不是理論,知識。,“”,DataFlair,幫助,組織,解決,BigData,問題。, “”,Javadoc,是一個工具,用於生成API,文檔,HTML,格式,文檔,註釋,內容,源代碼,代碼。,它,可以,只下載,僅作為,部分,Java,2,SDK。,To,see,documentation,generated,by,Javadoc,tool ,, go,to,J2SE,1.5.0,API,Documentation。,“”,Javadoc,常見問題解答, - ,這,常見問題,涵蓋,在哪裡,到,下載,Javadoc,工具,如何,到,找到,列表,已知,錯誤和功能,reque ...
  • scala> println(“空行數:”+ emptylines.value)
  • 空行數:10
  • 程序的解釋和結論:
  • 在上面的例子中,我們創建了一個Accumulator [Int] 'emptylines'
  • 在這裡,我們想找到沒有。我們處理過程中的空白行。
  • 之後,我們應用flatMap()轉換來處理我們的數據,但我們想要找出沒有。空行(空白行)所以在flatMap()函數中,如果我們遇到任何空行,累加器空行增加1,否則我們按空格分割行。
  • 之後,我們檢查輸出和否。的空白行。
  • 我們通過調用sc.accumulator(0)來創建具有初始值的累加器,通過調用sc.accumulator(0)即spark Context.accumulator(初始值),其中返回類型為initalValue {org.apache.spark.Accumulator [T],其中T為initalValue]
  • 最後,我們調用累加器上的value()屬性來訪問它的值。
  • 請注意,工作節點上的任務不能訪問累加器的value屬性,因此在任務的上下文中,累加器是隻寫變量。
  • accumulator的value()屬性僅在驅動程序中可用。
  • 我們也可以算不上。在變換/動作的幫助下,我們需要一個額外的操作,但是在累加器的幫助下,我們可以計算一下。我們加載/處理數據時的空行(或更廣泛的事件)。

六, Driver程序在Spark Application中的作用是什麼?

  • 驅動程序負責在集群上啟動各種並行操作。
  • 驅動程序包含應用程序的main()函數。
  • 它是運行用戶代碼的過程,用戶代碼又創建SparkContext對象,
    創建RDD在RDD上執行轉換和操作操作
  • 驅動程序通過SparkContext對象訪問Apache Spark,該對象表示與計算集群的連接(從Spark 2.0開始,我們可以通過SparkSession訪問SparkContext對象)。
  • 驅動程序負責將用戶程序轉換為稱為任務的物理執行單元。
  • 它還定義了集群上的分佈式數據集,我們可以對數據集(轉換和操作)應用不同的操作。
  • Spark程序創建一個名為Directed Acyclic graph的邏輯計劃,當驅動程序運行時,該計劃由驅動程序轉換為物理執行計劃。

七,如何識別給定的操作是程序中的Transformation / Action?

為了識別操作,需要查看操作的返回類型。

  • 如果操作在這種情況下返回一個新的RDD,則操作是“轉換”
  • 如果操作返回除RDD之外的任何其他類型,則操作為“Action”

因此,Transformation從現有RDD(前一個)構造新的RDD,而Action根據應用的轉換計算結果,並將結果返回給驅動程序或將其保存到外部存儲

八,命名Apache Spark中可用的兩種類型的共享變量。

Apache Spark中有兩種類型的共享變量:

(1)累加器:用於聚合信息。

(2)廣播變量:有效地分配大值。

當我們將函數傳遞給Spark時,比如說filter(),這個函數可以使用在函數外部但在Driver程序中定義的變量,但是當我們將任務提交給Cluster時,每個工作節點都獲得一個新的變量副本並更新從這些變量不會傳播回Driver程序。

累加器和廣播變量用於消除上述缺點(即我們可以將更新的值恢復到我們的驅動程序)

九,使用Apache Spark時開發人員常見的錯誤是什麼?

1)DAG的管理- 人們經常在DAG控制中犯錯誤。始終嘗試使用reducebykey而不是groupbykey。ReduceByKey和GroupByKey可以執行幾乎相似的功能,但GroupByKey包含大數據。因此,儘量使用ReduceByKey。始終儘量減少地圖的側面。儘量不要在分區中浪費更多時間。儘量不要隨便洗牌。儘量遠離Skews和分區。

2)保持隨機塊的所需大小

十,默認情況下,Apache Spark中的RDD中創建了多少個分區?

  • 默認情況下,Spark為文件的每個塊創建一個分區(對於HDFS)
  • HDFS塊的默認塊大小為64 MB(Hadoop版本1)/ 128 MB(Hadoop版本2)。
  • 但是,可以明確指定要創建的分區數。
  • 例1:
  • 沒有指定分區
  • val rdd1 = sc.textFile("/home/hdadmin/wc-data.txt")
  • 例2:
  • 以下代碼創建了10個分區的RDD,因為我們指定了no。分區。
  • val rdd1 = sc.textFile("/home/hdadmin/wc-data.txt", 10)
  • 可以通過以下方式查詢分區數:
  • rdd1.partitions.length
  • OR
  • rdd1.getNumPartitions
  • 最佳情況是我們應該按照以下方式製作RDD:
  • Cluster中的核心數量=否。分區

十一,.為什麼我們需要壓縮以及支持的不同壓縮格式是什麼?

  • 在Big Data中,當我們使用壓縮時,它可以節省存儲空間並減少網絡開銷。
  • 可以在將數據寫入HDFS
    時指定壓縮編碼(Hadoop格式)
  • 人們也可以讀取壓縮數據,因為我們也可以使用壓縮編解碼器。
  • 以下是BigData中不同的壓縮格式支持:
  • * gzip
  • * lzo
  • * bzip2
  • * Zlib
  • * Snappy

十二,解釋過濾器轉換。

  • Apache Spark中的filter()轉換將函數作為輸入。
  • 它返回一個RDD,它只有通過輸入函數中提到的條件的元素。
  • 示例
  • val rdd1 = sc.parallelize(List(10,20,40,60))
  • val rdd2 = rdd2.filter(x => x !=10)
  • println(rdd2.collect())
  • 產量
  • 10

十三,如何在交互式shell中啟動和停止scala?

在Scala中啟動交互式shell的命令:

>>>> bin / spark-shell

首先進入spark目錄即

hdadmin@ubuntu:~$ cd spark-1.6.1-bin-hadoop2.6/

hdadmin@ubuntu:~/spark-1.6.1-bin-hadoop2.6$ bin/spark-shell

shisi

-------------------------------------------------- -------------------------------------------------- --------------------------

在Scala中停止交互式shell的命令:

scala>Press (Ctrl+D)

可以看到以下消息

scala> Stopping spark context.

十四,解釋sortByKey()操作

>

sortByKey()是一種轉換。

>它返回按鍵排序的RDD。

>排序可以在(1)升序OR(2)降序OR(3)自定義排序中完成

從:

http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#212_SortByKey

他們將適用於範圍內具有隱式排序[K]的任何鍵類型K. 對於所有標準基元類型,已經存在排序對象。用戶還可以為自定義類型定義自己的排序,或覆蓋默認排序。將使用最近範圍內的隱式排序。

當調用 (K,V)數據集

(其中k為Ordered)時,返回按鍵按升序或降序排序的(K,V)對數據集,如升序參數中所指定。

示例:


val rdd1 = sc.parallelize(Seq(("India",91),("USA",1),("Brazil",55),("Greece",30),("China",86),("Sweden",46),("Turkey",90),("Nepal",977)))

val rdd2 = rdd1.sortByKey()

rdd2.collect();

輸出:

數組[(String,Int)] =(數組(巴西,55),(中國,86),(希臘,30),(印度,91),(尼泊爾,977),(瑞典,46),(火雞,90),(美國,1)


val rdd1 = sc.parallelize(Seq(("India",91),("USA",1),("Brazil",55),("Greece",30),("China",86),("Sweden",46),("Turkey",90),("Nepal",977)))

val rdd2 = rdd1.sortByKey(false)

rdd2.collect();

輸出:

Array [(String,Int)] =(Array(USA,1),(Turkey,90),(Sweden,46),(Nepal,977),(India,91),(Greece,30),(中國,86),(巴西,55)

十五,解釋Spark中的distnct(),union(),intersection()和substract()轉換

union()轉換

  • 最簡單的設定操作。
  • rdd1.union(rdd2),它輸出一個包含兩個來源數據的RDD。
  • 如果輸入RDD中存在重複項,則union()轉換的輸出也將包含重複項,可以使用distinct()進行修復。

val u1 = sc.parallelize(List("c","c","p","m","t"))

val u2 = sc.parallelize(List("c","m","k"))

val result = u1.union(u2)

result.foreach{println}

輸出:

c

c

p

m

t

c

m

k

十六,

在apache spark中解釋foreach()操作

> foreach()操作是一個動作。

>它不會返回任何值。

>它對RDD的每個元素執行輸入功能。

來自:http:

//data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#39_Foreach

它在RDD中的每個項目上執行該功能。它適用於編寫數據庫或發佈到Web服務。它為每個數據項執行參數減少功能。

例:

val mydata = Array(1,2,3,4,5,6,7,8,9,10)

val rdd1 = sc.parallelize(mydata)

rdd1.foreach{x=>println(x)}

OR

rdd1.foreach{println}

輸出:

1

2

3

4

5

6

7

8

9

10

十七,Apache Spark中的groupByKey vs reduceByKey

在對(K,V)對的數據集應用groupByKey()時,數據根據另一個RDD中的鍵值K進行混洗。在這種轉變中,許多不必要的數據通過網絡傳輸。

Spark提供了將數據存儲到單個執行程序機器上的數據多於內存中數據時保存到磁盤的功能。

例:

val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)

val group = data.groupByKey().collect()

group.foreach(println)

在對數據集(K,V)應用reduceByKey時,在對數據進行混合之前,組合具有相同密鑰的同一機器上的對。

例:

val words = Array("one","two","two","four","five","six","six","eight","nine","ten")

val data = spark.sparkContext.parallelize(words).map(w => (w,1)).reduceByKey(_+_)

data.foreach(println)

十八,.解釋mapPartitions()和mapPartitionsWithIndex()

Mappartitions是一種類似於Map的轉換。

在Map中,函數應用於RDD的每個元素,並返回結果RDD的每個其他元素。對於mapPartitions,該函數將應用於RDD的每個分區,而不是每個元素,並返回結果RDD的多個元素。在mapPartitions轉換中,性能得到改善,因為在地圖轉換中消除了每個元素的對象創建。

由於mapPartitions轉換適用於每個分區,因此它將字符串或int值的迭代器作為分區的輸入。

請考慮以下示例:

val data = sc.parallelize(List(1,2,3,4,5,6,7,8), 2)

Map:

def sumfuncmap(numbers : Int) : Int =

{

var sum = 1

return sum + numbers

}

data.map(sumfuncmap).collect

returns Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9) //Applied to each and every element

MapPartitions:

def sumfuncpartition(numbers : Iterator[Int]) : Iterator[Int] =

{

var sum = 1

while(numbers.hasNext)

{

sum = sum + numbers.next()

}

return Iterator(sum)

}

data.mapPartitions(sumfuncpartition).collect

returns

Array[Int] = Array(11, 27) // Applied to each and every element partition-wise

MapPartitionsWithIndex類似於mapPartitions,除了它還需要一個參數作為輸入,它是分區的索引。

十九,Apache Spark中的Map是什麼?

Map是應用於RDD中每個元素的轉換,它提供了一個新的RDD作為結果。在Map轉換中,用戶定義的業務邏輯將應用於RDD中的所有元素。它類似於FlatMap,但與可以產生0,1或多個輸出的FlatMap不同,Map只能產生一對一的輸出。 映射操作將長度為N的RDD轉換為另一個長度為N的RDD。

A -------> a

B -------> b

C -------> c

Map Operation

映射轉換不會將數據從一個分區變為多個分區。它將使操作保持狹窄

二十,Apache Spark中的FlatMap是什麼?

FlatMap是Apache Spark中轉換操作,用於從現有RDD 創建RDD。它需要RDD中的一個元素,並且可以根據業務邏輯生成0,1或多個輸出。它類似於Map操作,但Map產生一對一輸出。如果我們對長度為N的RDD執行Map操作,則輸出RDD的長度也為N.但對於FlatMap操作,輸出RDD可以根據業務邏輯的不同長度

X ------ A x ----------- a

Y ------ B y ----------- b,c

Z ----- -C z ----------- d,e,f

地圖操作FlatMap操作

我們也可以說flatMap將長度為N的RDD轉換為N個集合的集合,然後將其展平為單個RDD結果。

如果我們觀察下面的示例data1 RDD,它是Map操作的輸出,具有與數據RDD相同的元素,

但是data2 RDD沒有相同數量的元素。我們還可以在這裡觀察data2 RDD是data1 RDD的平坦輸出

pranshu @ pranshu-virtual-machine:〜$ cat pk.txt

1 2 3 4

5 6 7 8 9

10 11 12

13 14 15 16 17

18 19 20

scala> val data = sc.textFile(“/ home / pranshu / pk.txt”)

17/05/17 07:08:20 WARN SizeEstimator:無法檢查是否設置了UseCompressedOops; 假設是

數據:org.apache.spark.rdd.RDD [String] = /home/pranshu/pk.txt MapPartitionsRDD [1] at textFile at :24

scala> data.collect

res0:Array [String] = Array(1 2 3 4,5 6 7 8 9,10 11 12,13 14 15 16 17,18 19 20)

斯卡拉>

scala> val data1 = data.map(line => line.split(“”))

data1:org.apache.spark.rdd.RDD [Array [String]] = MapPartitionsRDD [2] at map at :26

斯卡拉>

scala> val data2 = data.flatMap(line => line.split(“”))

data2:org.apache.spark.rdd.RDD [String] =在mapMap at 的MapPartitionsRDD [3]:26

斯卡拉>

scala> data1.collect

res1:Array [Array [String]] = Array(數組(1,2,3,4),數組(5,6,7,8,9 ),數組(10,11,12),數組(13,14,15,16,17),數組(18,19,20))

斯卡拉>

scala> data2.collect

res2:Array [String] =數組(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18, 19,20)

二十一,在Spark中解釋fold()操作。

  • fold()是一個動作。它是廣泛的操作(即跨多個分區的shuffle數據並輸出單個值)
  • 它將函數作為輸入,具有兩個相同類型的參數,並輸出單個輸入類型的值。
  • 它類似於reduce,但還有一個參數'ZERO VALUE'(比如初始值),它將在每個分區的初始調用中使用。

def fold(zeroValue:T)(op:(T,T)⇒T):T

使用給定的關聯函數和中性“零值”聚合每個分區的元素,然後聚合所有分區的結果。函數op(t1,t2)允許修改t1並將其作為結果值返回以避免對象分配; 但是,它不應該修改t2。

這與在Scala等函數式語言中為非分佈式集合實現的摺疊操作有所不同。該摺疊操作可以單獨地應用於分區,然後將這些結果摺疊成最終結果,而不是以某種定義的順序將摺疊順序地應用於每個元素。對於不可交換的函數,結果可能與應用於非分佈式集合的摺疊的結果不同。

zeroValue:op運算符的每個分區的累積結果的初始值,以及組合的初始值來自op運算符的不同分區 - 這通常是中性元素(例如,列表連接為Nil或0為總結)

操作:用於在分區內累積結果並組合來自不同分區的結果的運算符

示例:

val rdd1 = sc.parallelize(List(1,2,3,4,5),3)

rdd1.fold(5)(_+_)

輸出:

Int = 35

val rdd1 = sc.parallelize(List(1,2,3,4,5))

rdd1.fold(5)(_+_)

輸出:

Int = 25

val rdd1 = sc.parallelize(List(1,2,3,4,5),3)

rdd1.fold(3)(_+_)

Int = 27

二十二,解釋API createOrReplaceTempView()

  • 它的基本數據集功能。
  • 它位於org.apache.spark.sql下
  • def createOrReplaceTempView(viewName:String):Unit
  • 使用給定名稱創建臨時視圖。
  • 此臨時視圖的生命週期與用於創建此數據集的SparkSession相關聯。


scala> val df = spark.read.csv("/home/hdadmin/titanic_data.txt")

df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 9 more fields]

scala> df.printSchema

root

|-- _c0: string (nullable = true)

|-- _c1: string (nullable = true)

|-- _c2: string (nullable = true)

|-- _c3: string (nullable = true)

|-- _c4: string (nullable = true)

|-- _c5: string (nullable = true)

|-- _c6: string (nullable = true)

|-- _c7: string (nullable = true)

|-- _c8: string (nullable = true)

|-- _c9: string (nullable = true)

|-- _c10: string (nullable = true)

scala> df.show

+---+---+---+--------------------+-------+-----------+--------------------+-------+-----------------+-----+------+

|_c0|_c1|_c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10|

+---+---+---+--------------------+-------+-----------+--------------------+-------+-----------------+-----+------+


| 1|1st| 1|Allen, Miss Elisa...|29.0000|Southampton| St Louis, MO| B-5| 24160 L221| 2|female|

| 2|1st| 0|Allison, Miss Hel...| 2.0000|Southampton|Montreal, PQ / Ch...| C26| null| null|female|

| 3|1st| 0|Allison, Mr Hudso...|30.0000|Southampton|Montreal, PQ / Ch...| C26| null|(135)| male|

| 4|1st| 0|Allison, Mrs Huds...|25.0000|Southampton|Montreal, PQ / Ch...| C26| null| null|female|

| 5|1st| 1|Allison, Master H...| 0.9167|Southampton|Montreal, PQ / Ch...| C22| null| 11| male|

| 6|1st| 1| Anderson, Mr Harry|47.0000|Southampton| New York, NY| E-12| null| 3| male|

| 7|1st| 1|Andrews, Miss Kor...|63.0000|Southampton| Hudson, NY| D-7| 13502 L77| 10|female|

| 8|1st| 0|Andrews, Mr Thoma...|39.0000|Southampton| Belfast, NI| A-36| null| null| male|

| 9|1st| 1|Appleton, Mrs Edw...|58.0000|Southampton| Bayside, Queens, NY| C-101| null| 2|female|

| 10|1st| 0|Artagaveytia, Mr ...|71.0000| Cherbourg| Montevideo, Uruguay| null| null| (22)| male|

| 11|1st| 0|Astor, Colonel Jo...|47.0000| Cherbourg| New York, NY| null|17754 L224 10s 6d|(124)| male|

| 12|1st| 1|Astor, Mrs John J...|19.0000| Cherbourg| New York, NY| null|17754 L224 10s 6d| 4|female|

| 13|1st| 1|Aubert, Mrs Leont...| NA| Cherbourg| Paris, France| B-35| 17477 L69 6s| 9|female|

| 14|1st| 1|Barkworth, Mr Alg...| NA|Southampton| Hessle, Yorks| A-23| null| B| male|

| 15|1st| 0| Baumann, Mr John D.| NA|Southampton| New York, NY| null| null| null| male|

| 16|1st| 1|Baxter, Mrs James...|50.0000| Cherbourg| Montreal, PQ|B-58/60| null| 6|female|

| 17|1st| 0|Baxter, Mr Quigg ...|24.0000| Cherbourg| Montreal, PQ|B-58/60| null| null| male|

| 18|1st| 0| Beattie, Mr Thomson|36.0000| Cherbourg| Winnipeg, MN| C-6| null| null| male|

| 19|1st| 1|Beckwith, Mr Rich...|37.0000|Southampton| New York, NY| D-35| null| 5| male|

| 20|1st| 1|Beckwith, Mrs Ric...|47.0000|Southampton| New York, NY| D-35| null| 5|female|

+---+---+---+--------------------+-------+-----------+--------------------+-------+-----------------+-----+------+

only showing top 20 rows

scala> df.createOrReplaceTempView("titanicdata")

二十三,解釋Apache Spark中的values()操作

  • values()是一種轉換。
  • 它僅返回值的RDD。


val rdd1 = sc.parallelize(Seq((2,4),(3,6),(4,8),(5,10),(6,12),(7,14),(8,16),(9,18),(10,20)))

val rdd2 = rdd1.values

rdd2.collect

輸出:

Array[Int] = Array(4, 6, 8, 10, 12, 14, 16, 18, 20)

示例2:數據集中的值重複


val rdd1 = sc.parallelize(Seq((2,4),(3,6),(4,8),(2,6),(4,12),(5,10),(5,40),(10,40)))

val rdd2 = rdd1.keys

rdd2.collect

val rdd3 = rdd1.values

rdd3.collect

輸出:

Array[Int] = Array(2, 3, 4, 2, 4, 5, 5, 10)

Array[Int] = Array(4, 6, 8, 6, 12, 10, 40, 40)

二十四,解釋Apache spark中的keys()操作。

  • keys()是一種轉換
  • 它返回一個密鑰的RDD
  • val rdd1 = sc.parallelize(Seq((2,4),(3,6),(4,8),(5,10),(6,12),(7,14),(8,16),(9,18),(10,20)))
  • val rdd2 = rdd1.keys
  • rdd2.collect
  • 輸出:
  • Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10)

示例2 :(重複鍵 - 數據集中存在重複鍵)

val rdd1 = sc.parallelize(Seq((2,4),(3,6),(4,8),(2,6),(4,12),(5,10),(5,40),(10,40)))

val rdd2 = rdd1.keys

rdd2.collect

輸出:

Array[Int] = Array(2, 3, 4, 2, 4, 5, 5, 10)

二十五,在Spark中解釋textFile與fullTextFile

  • 兩者都是org.apache.spark.SparkContext的方法。
  • 文本文件() :
  • def textFile(path:String,minPartitions:Int = defaultMinPartitions):RDD [String]
  • 從HDFS讀取文本文件,本地文件系統(在所有節點上都可用)或任何Hadoop支持的文件系統URI,並將其作為字符串的RDD返回
  • 例如sc.textFile(“/ home / hdadmin / wc-data.txt”)因此它將創建RDD,其中每個單獨的行都是一個元素。
  • 每個人都知道textFile的用法。
  • wholeTextFiles():
  • def wholeTextFiles(path:String,minPartitions:Int = defaultMinPartitions):RDD [(String,String)]
  • 從HDFS讀取文本文件目錄,本地文件系統(在所有節點上都可用)或任何支持Hadoop的文件系統URI。
  • 而不是創建基本RDD,wholeTextFile()返回pairRDD。
  • 例如,目錄中的文件很少,因此通過使用wholeTextFile()方法,
  • 它創建了帶有文件名的對RDD,路徑為鍵,
  • 值為整個文件為字符串
  • val myfilerdd = sc.wholeTextFiles("/home/hdadmin/MyFiles")
  • val keyrdd = myfilerdd.keys
  • keyrdd.collect
  • val filerdd = myfilerdd.values
  • filerdd.collect
  • 輸出:
  • Array [String] = Array(
  • 文件:/home/hdadmin/MyFiles/JavaSparkPi.java,
  • 文件:/home/hdadmin/MyFiles/sumnumber.txt,
  • 文件:/home/hdadmin/MyFiles/JavaHdfsLR.java,
  • 文件: /home/hdadmin/MyFiles/JavaPageRank.java,
  • 文件:/home/hdadmin/MyFiles/JavaLogQuery.java,
  • 文件:/home/hdadmin/MyFiles/wc-data.txt,
  • 文件:/ home / hdadmin / MyFiles / nosum。文本)
  • Array [String] =
  • Array(“/ *
  • *根據一個或多個
  • *貢獻者許可協議許可給Apache Software Foundation(ASF)。
  • 有關版權所有權的其他信息,請參閱隨*此工作分發的NOTICE文件。
  • * ASF許可此根據Apache許可證2.0版
  • *(“許可證”)向您提交;除非符合
  • *許可,否則您不得使用此文件。您可以在以下位置獲取許可副本:
  • http://www.apache.org/licenses/LICENSE-2.0
  • *除非適用法律要求或書面同意,否則
  • 根據許可證分發的軟件*按“原樣”分發
  • *,不附帶任何明示或暗示的擔保或條件。
  • *有關權限和
  • * 的特定語言,請參閱許可證。

二十六,解釋Spark中的cogroup()操作

>這是一個轉變。

>它位於org.apache.spark.rdd.PairRDDFunctions包中

def cogroup [W1,W2,W3](other1:RDD [(K,W1)],other2:RDD [(K,W2)],other3:RDD [(K,W3)]):RDD [(K,( Iterable [V],Iterable [W1],Iterable [W2],Iterable [W3]))]

對於this或other1或other2或other3中的每個鍵k,返回包含元組的結果RDD,該元組具有該鍵,other1,other2和other3中該鍵的值列表。

例:

val myrdd1 = sc.parallelize(List((1,"spark"),(2,"HDFS"),(3,"Hive"),(4,"Flink"),(6,"HBase")))

val myrdd2 = sc.parallelize(List((4,"RealTime"),(5,"Kafka"),(6,"NOSQL"),(1,"stream"),(1,"MLlib")))

val result = myrdd1.cogroup(myrdd2)

result.collect

輸出:

Array [(Int,(Iterable [String],Iterable [String]))] =

Array((4,(CompactBuffer(Flink),CompactBuffer(RealTime))),

(1,(CompactBuffer(spark),CompactBuffer( stream,MLlib))),

(6,(CompactBuffer(HBase),CompactBuffer(NOSQL))),

(3,(CompactBuffer(Hive),CompactBuffer())),

(5,(CompactBuffer(),CompactBuffer(Kafka) )),

(2,(CompactBuffer(HDFS),CompactBuffer())))

二十七,解釋Apache Spark中的pipe()操作

  • 這是一種轉變。
  • def pipe(command:String):RDD [String]
  • 將由管道元素創建的RDD返回給分叉的外部進程。
  • 通常,Spark使用Scala,Java和Python來編寫程序。但是,如果這還不夠,並且想要管道(注入)用其他語言(如'R')編寫的數據,Spark會以pipe()方式的形式提供一般機制
  • Spark在RDD上提供了pipe()方法。
  • 使用Spark的pipe()方法,可以編寫RDD的轉換,可以將RDD中的每個元素從標準輸入讀取為String。
  • 它可以將結果作為String寫入標準輸出。

二十八,.解釋Spark coalesce()操作

>這是一種轉變

>它位於org.apache.spark.rdd.ShuffledRDD包中

DEF聚結(numPartitions:中等,洗牌:布爾=假,partitionCoalescer:選項[PartitionCoalescer] = Option.empty)(隱式ORD:訂購[(K,C)] = NULL):RDD [(K,C)]

返回一個縮減為numPartitions分區的新RDD

這會導致較窄的依賴性,例如,如果從1000個分區到100個分區,則不會進行隨機播放,而是100個新分區中的每個分區將聲明10個當前分區。

但是,如果你正在進行激烈的合併,例如對numPartitions = 1,這可能導致你的計算發生在比你想要的更少的節點上(例如,在numPartitions = 1的情況下,一個節點)。為避免這種情況,您可以傳遞shuffle = true。這將添加一個shuffle步驟,但意味著當前的上游分區將並行執行(無論當前分區是什麼)。

注意:使用shuffle = true,您實際上可以合併到更大數量的分區。如果您有少量分區(例如100),這可能會使一些分區異常大,這很有用。調用coalesce(1000,shuffle = true)將導致1000個分區,並使用散列分區程序分發數據。

來自:http

//data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#214_Coalesce

它會更改存儲數據的分區數。它將原始分區與新數量的分區相結合,因此可以減少分區數量。它是重新分區的優化版本,允許數據移動,但前提是您要減少RDD分區的數量。過濾大型數據集後,它可以更有效地運行操作。

示例:

val myrdd1 = sc.parallelize(1 to 1000, 15)

myrdd1.partitions.length

val myrdd2 = myrdd1.coalesce(5,false)

myrdd2.partitions.length

Int = 5

輸出:

Int = 15

Int = 5

二十九,解釋Spark中的repartition()操作

repartition()是一種轉變。

>此函數更改參數numPartitions(numPartitions:Int)中提到的分區數

>它位於包org.apache.spark.rdd.ShuffledRDD中

def repartition(numPartitions:Int)(隱式ord:Ordering [(K,C)] = null):RDD [(K,C)]

返回一個具有正好numPartitions分區的新RDD

可以增加或減少此RDD中的並行度。在內部,它使用shuffle重新分配數據。

如果要減少此RDD中的分區數,請考慮使用coalesce,這可以避免執行shuffle。

來自

http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/

重新分區將重新調整RDD中的數據,以生成您請求的最終分區數。它可以減少或增加整個網絡中的分區數量和數據。

示例

val rdd1 = sc.parallelize(1 to 100, 3)

rdd1.getNumPartitions

val rdd2 = rdd1.repartition(6)

rdd2.getNumPartitions

輸出:

Int = 3

Int = 6

三十,.解釋Apache Spark中的fullOuterJoin()操作

>這是轉型。

>它位於org.apache.spark.rdd.PairRDDFunctions包中

def fullOuterJoin [W](其他:RDD [(K,W)]):RDD [(K,(Option [V],Option [W]))]

執行此和其他的完全外部聯接。

對於此中的每個元素(k,v),得到的RDD將包含所有對(k,(Some(v),Some(w)))用於其他w,

或對(k,(Some(v)) ,無)))如果其他元素沒有密鑰k。

類似地,對於其他元素(k,w),得到的RDD將包含所有對(k,(Some(v),Some(w)))中的v,

或者對(k,(None,一些(w)))如果其中沒有元素具有密鑰k。

使用現有的分區程序/並行級別對生成的RDD進行散列分區。

示例:

val frdd1 = sc.parallelize(Seq(("Spark",35),("Hive",23),("Spark",45),("HBase",89)))

val frdd2 = sc.parallelize(Seq(("Spark",74),("Flume",12),("Hive",14),("Kafka",25)))

val fullouterjoinrdd = frdd1.fullOuterJoin(frdd2)

fullouterjoinrdd.collect

輸出:

Array [(String,(Option [Int],Option [Int]))] = Array((Spark,(Some(35),Some(74))),(Spark,(Some(45),Some( 74))),(Kafka,(無,有些(25))),(Flume,(無,有些(12))),(Hive,(Some(23),Some(14))),(HBase, (一些(89),無)))

三十一. Expain Spark leftOuterJoin()和rightOuterJoin()操作

> leftOuterJoin()和rightOuterJoin()都是轉換。

>兩者都在org.apache.spark.rdd.PairRDDFunctions包中

leftOuterJoin():

def leftOuterJoin [W](其他:RDD [(K,W)]):RDD [(K,(V,Option [W]))]

執行此和其他的左外連接。對於此中的每個元素(k,v),得到的RDD將包含w中的所有對(k,(v,Some(w))),或者對(k,(v,None))如果不包含其他元素有關鍵k。使用現有分區程序/並行級別對輸出進行散列分區。

leftOuterJoin()在兩個RDD之間執行連接,其中鍵必須存在於第一個RDD中

示例:

val rdd1 = sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))

val rdd2 = sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))

val leftjoinrdd = rdd1.leftOuterJoin(rdd2)

leftjoinrdd.collect

輸出:

Array [(String,(Int,Option [Int]))] = Array((s,(59,Some(61))),(s,(59,Some(62))),(s,( 54,Some(61))),(s,(54,Some(62))),(e,(57,None)),(e,(58,None)),(m,(55,Some( 60))),(m,(55,Some(65))),(m,(56,Some(60))),(m,(56,Some(65))))

rightOuterJoin():

def rightOuterJoin [W](其他:RDD [(K,W)]):RDD [(K,(Option [V],W))]

執行此和其他的右外連接。對於其他元素(k,w),得到的RDD將包含所有對(k,(Some(v),w))的v,或者對(k,(None,w))如果沒有其中的元素有關鍵k。使用現有的分區程序/並行級別對生成的RDD進行散列分區。

它執行兩個RDD之間的連接,其中密鑰必須存在於其他RDD中

例:

val rdd1 = sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))

val rdd2 = sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))

val rightjoinrdd = rdd1.rightOuterJoin(rdd2)

rightjoinrdd.collect

Array [(String,(Option [Int],Int))] = Array((s,(Some(59),61)),(s,(Some(59),62)),(s,(Some(( 54),61)),(s,(Some(54),62)),(h,(None,63)),(h,(None,64)),(m,(Some(55),60 )),(m,(Some(55),65)),(m,(Some(56),60)),(m,(Some(56),65)))

三十二,解釋Spark join()操作

> join()是轉型。

>它在包org.apache.spark.rdd.pairRDDFunction

def join [W](其他:RDD [(K,W)]):RDD [(K,(V,W))]固定鏈接

返回包含所有元素對的RDD,其中包含匹配鍵和其他元素。

每對元素將作為(k,(v1,v2))元組返回,其中(k,v1)在此,而(k,v2)在其他元素中。在整個群集中執行散列連接。

它正在連接兩個數據集。當調用類型(K,V)和(K,W)的數據集時,返回(K,(V,W))對的數據集以及每個鍵的所有元素對。通過leftOuterJoin,rightOuterJoin和fullOuterJoin支持外連接。

例1:

val rdd1 = sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))

val rdd2 = sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))

val joinrdd = rdd1.join(rdd2)

joinrdd.collect

輸出:

Array [(String,(Int,Int))] = Array((m,(54,60)),(m,(54,65)),(m,(56,60)),(m,(56 ,65)),(s,(59,61)),(s,(59,62)),(s,(54,61)),(s,(54,62)))

例2:

val myrdd1 = sc.parallelize(Seq((1,2),(3,4),(3,6)))

val myrdd2 = sc.parallelize(Seq((3,9)))

val myjoinedrdd = myrdd1.join(myrdd2)

myjoinedrdd.collect

輸出:

數組[(Int,(Int,Int))] =數組((3,(4,9)),(3,(6,9)))

三十三,解釋top()和takeOrdered()操作

  • top()和takeOrdered()都是動作。
  • 兩者都返回基於默認排序或基於用戶提供的自定義排序的RDD元素。
  • def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  • 返回此RDD中的前k個(最大)元素,由指定的隱式Ordering [T]定義並維護排序。這與takeOrdered相反。
  • def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  • 返回此RDD中的第一個k(最小)元素,由指定的隱式Ordering [T]定義並維護排序。這與top相反。
  • 示例:
  • val myrdd1 = sc.parallelize(List(5,7,9,13,51,89))
  • myrdd1.top(3)
  • myrdd1.takeOrdered(3)
  • myrdd1.top(3)
  • 輸出:
  • Array[Int] = Array(89, 51, 13)
  • Array[Int] = Array(5, 7, 9)
  • Array[Int] = Array(89, 51, 13)

三十四,解釋Spark中的first()操作

>這是一個動作。

>它返回RDD的第一個元素。

示例

val rdd1 = sc.textFile("/home/hdadmin/wc-data.txt")

rdd1.count

rdd1.first

輸出:

長:20

字符串:DataFlair是領先的技術培訓提供商

三十五,.解釋Apache Spark中的sum(),max(),min()操作

sum():

>它將RDD中的值相加

>它是一個包org.apache.spark.rdd.DoubleRDDFunctions。

>它的返回類型是Double

例:

val rdd1 = sc.parallelize(1 to 20)

rdd1.sum

輸出:

Double = 210.0

max():

>它從隱式排序(元素順序)定義的RDD元素返回一個最大值

>它是一個包org.apache.spark.rdd

例:

val rdd1 = sc.parallelize(List(1,5,9,0,23,56,99,87))

rdd1.max

輸出:

Int = 99

min():

>它從隱式排序(元素順序)定義的RDD元素返回一個min

>它是一個包org.apache.spark.rdd

例:

val rdd1 = sc.parallelize(List(1,5,9,0,23,56,99,87))

rdd1.min

輸出:

Int = 0

三十六,.解釋Apache Spark RDD中的countByValue()操作

  • 這是一個動作
  • 它返回RDD中每個唯一值的計數作為本地Map(作為Map to driver program)
    (value,countofvalues)
  • 必須小心使用此API,因為它將值返回給驅動程序,因此它僅適用於較小的值。
  • 例:
  • val rdd1 = sc.parallelize(Seq(("HR",5),("RD",4),("ADMIN",5),("SALES",4),("SER",6),("MAN",8)))
  • rdd1.countByValue
  • 輸出:
  • scala.collection.Map [(String,Int),Long] = Map((HR,5) - > 1,(RD,4) - > 1,(SALES,4) - > 1,(ADMIN,5 ) - > 1,(MAN,8) - > 1,(SER,6) - > 1)
  • val rdd2 = sc.parallelize{Seq(10,4,3,3)}
  • rdd2.countByValue
  • 輸出:
  • scala.collection.Map [Int,Long] = Map(4 - > 1,3 - > 2,10 - > 1)

三十七,.解釋Spark中的lookup()操作

>這是一個動作

>它返回RDD中鍵值'key'的值列表

val rdd1 = sc.parallelize(Seq(("Spark",78),("Hive",95),("spark",15),("HBase",25),("spark",39),("BigData",78),("spark",49)))

rdd1.lookup("spark")

rdd1.lookup("Hive")

rdd1.lookup("BigData")

輸出:

Seq [Int] = WrappedArray(15,39,49)

Seq [Int] = WrappedArray(95)

Seq [Int] = WrappedArray(78)

三十八,解釋Spark countByKey()操作

>它是一個動作操作

>返回(key,noofkeycount)對。

它計算RDD的值,該值由每個不同鍵的兩個組件元組組成。它實際上計算每個鍵的元素數,並將結果作為(鍵,計數)對的列表返回給主鍵。

val rdd1 = sc.parallelize(Seq(("Spark",78),("Hive",95),("spark",15),("HBase",25),("spark",39),("BigData",78),("spark",49)))

rdd1.countByKey

輸出:

scala.collection.Map [String,Long] = Map(Hive - > 1,BigData - > 1,HBase - > 1,spark - > 3,Spark - > 1)

三十九,解釋Spark saveAsTextFile()操作

它將RDD的內容寫入文本文件,或使用字符串表示將RDD保存為文件路徑目錄中的文本文件。

四十,解釋reduceByKey()Spark操作

> reduceByKey()是對pairRDD(包含Key / Value)進行轉換的轉換。

> PairRDD包含元組,因此我們需要傳遞元組上的運算符而不是每個元素。

>它使用關聯reduce函數將值與相同的鍵合併。

>它是廣泛的操作,因為數據混洗可能發生在多個分區上。

>它在跨分區發送數據之前在本地合併數據,以優化數據混洗。

>它將函數作為一個輸入,它有兩個相同類型的參數(與同一個鍵相關的值)和一個輸入類型的元素輸出(值)

>我們可以說它有三個重載函數:

reduceBykey(function)

reduceByKey(功能,分配數量)

reduceBykey(partitioner,function)

它使用關聯reduce函數,它合併每個鍵的值。它只能與鍵值對中的Rdd一起使用。它是一種廣泛的操作,可以從多個分區/分區中混洗數據並創建另一個RDD。它使用關聯函數在本地合併數據,以優化數據混洗。組合的結果(例如,和)與值的類型相同,並且當從不同分區組合時的操作也與在分區內組合值時的操作相同。

示例:

val rdd1 = sc.parallelize(Seq(5,10),(5,15),(4,8),(4,12),(5,20),(10,50)))

val rdd2 = rdd1.reduceByKey((x,y)=>x+y)

OR

rdd2.collect()

輸出:

數組[(Int,Int)] =數組((4,20),(10,50),(5,45)

四十一,解釋Spark中的reduce()操作

> reduce()是一個動作。它是寬操作(即跨越多個分區的隨機數據並輸出單個值)

>它將函數作為具有兩個相同類型參數的輸入,並輸出單個輸入類型的值。

>即將RDD的元素組合在一起。

示例1:

val rdd1 = sc.parallelize(1到100)

val rdd2 = rdd1.reduce((x,y)=> x + y)

OR

val rdd2 = rdd1.reduce(_ + _)

輸出:

rdd2:Int = 5050

示例2:

val rdd1 = sc.parallelize(1到5)

val rdd2 = rdd1.reduce(_ * _)

輸出:

rdd2:Int = 120

四十二,在Spark RDD中解釋動作count()

  • count()是Apache Spark RDD操作中的一個操作
  • count()返回RDD中的元素數
  • 示例:
  • val rdd1 = sc.parallelize(List(10,20,30,40))
  • println(rdd1.count())
  • 輸出:
  • 4
  • 它返回RDD中的多個元素或項目。因此,它基本上計算數據集中存在的項目數,並在計數後返回一個數字。

四十三.解釋Spark map()轉換

> map()轉換將函數作為輸入,並將該函數應用於RDD中的每個元素。

>函數的輸出將是每個輸入元素的新元素(值)。

防爆。

val rdd1 = sc.parallelize(List(10,20,30,40))

val rdd2 = rdd1.map(x => x * x)

println(rdd2.collect()。mkString(“,”))

四十四,解釋Apache Spark中的flatMap()轉換

  • 當想要為每個輸入元素生成多個元素(值)時,使用flatMap()。
  • 與map()一樣,flatMap()也將函數作為輸入。
  • 函數的輸出是我們可以迭代的元素的List。(即函數可以為每個輸入元素返回0或更多元素)
  • 簡單地使用flatMap()將輸入行(字符串)拆分為單詞。

val fm1 = sc.parallelize(List("Good Morning", "Data Flair", "Spark Batch"))

val fm2 = fm1.flatMap(y => y.split(" "))

fm2.foreach{println}

輸出如下:

Good

Morning

Data

Flair

Spark

Batch

四十五,Apache Spark有哪些限制?

在,Apache Spark被認為是行業廣泛使用的下一代Gen Big數據工具。但Apache Spark存在一定的侷限性。他們是:

Apache Spark的侷限性:

1.無文件管理系統

Apache Spark依賴於其他平臺,如Hadoop或其他基於雲的平臺文件管理系統。這是Apache Spark的主要問題之一。

2.延遲

使用Apache Spark時,它具有更高的延遲。

3.不支持實時處理

在Spark Streaming中,到達的實時數據流被分成預定義間隔的批次,每批數據被視為Spark Resilient Distributed Database(RDD)。然後使用map,reduce,join等操作處理這些RDD。這些操作的結果是批量返回的。因此,它不是實時處理,但Spark接近實時數據的實時處理。微批處理在Spark Streaming中進行

4.手動優化

手動優化是優化Spark作業所必需的。此外,它適用於特定數據集。如果我們想要在Spark中進行分區和緩存是正確的,我們需要手動控制。

少一點。算法

Spark MLlib在Tanimoto距離等許多可用算法方面落後。

6.窗口標準

Spark不支持基於記錄的窗口標準。它只有基於時間的窗口標準。

7.迭代處理

在Spark中,數據分批迭代,每次迭代都是單獨調度和執行的。

8.

當我們想要經濟高效地處理大數據時,昂貴內存容量可能成為瓶頸,因為在內存中保存數據非常昂貴。此時內存消耗非常高,並且不以用戶友好的方式處理。Spark的成本非常高,因為Apache Spark需要大量的RAM才能在內存中運行。

四十六,什麼是Spark SQL?

Spark SQL是一個Spark接口,用於處理結構化和半結構化數據(定義字段即表格的數據)。它提供了一個名為DataFrame

DataSet的抽象層,我們可以輕鬆處理數據。可以說DataFrame就像關係數據庫中的表。Spark SQL可以以Parquets,JSON,Hive等各種結構化和半結構化格式讀寫數據。在Spark應用程序中使用SparkSQL是使用它的最佳方式。這使我們能夠加載數據並使用SQL進行查詢。我們也可以將它與Python,Java或Scala中的 “常規”程序代碼結合起來。

四十七,解釋Spark SQL緩存和解除

當我們嘗試在另一個用戶使用該表時解凍Spark SQL中的表時會發生什麼?因為我們可以在Spark SQL JDBC服務器中的多個用戶之間使用共享緩存表。

四十八,解釋Spark流媒體

rk Streaming

數據流定義為以無界序列的形式連續到達的數據。為了進一步處理,Streaming將連續流動的輸入數據分離為離散單元。它是一種低延遲處理和分析流數據。

在2013年,Apache Spark Streaming被添加到Apache Spark

。通過Streaming,我們可以對實時數據流進行容錯,可擴展的流處理。從許多來源,如Kafka,Apache Flume,Amazon Kinesis或TCP套接字,可以進行數據攝取。此外,通過使用複雜算法,可以進行處理。用高級函數表示,例如map,reduce,join和window。最後,處理後的數據可以推送到文件系統,數據庫和實時儀表板。

在內部,通過Spark流,接收實時輸入數據流並將其分成批次。然後,這些批次由Spark引擎處理,以批量生成最終結果流。

Discretized Stream或簡稱Spark DStream是它的基本抽象。這也代表了分成小批量的數據流。DStreams構建於Spark的核心數據抽象Spark RDD之上。Streaming可以與Spark MLlibSpark SQL等任何其他Apache Spark組件集成。

四十九,解釋Spark Streaming

Spark Streaming

數據流定義為以無界序列的形式連續到達的數據。為了進一步處理,Streaming將連續流動的輸入數據分離為離散單元。它是一種低延遲處理和分析流數據。

在2013年,Apache Spark Streaming被添加到Apache Spark。通過Streaming,我們可以對實時數據流進行容錯,可擴展的流處理。從許多來源,如Kafka,Apache Flume,Amazon Kinesis或TCP套接字,可以進行數據攝取。此外,通過使用複雜算法,可以進行處理。用高級函數表示,例如map,reduce,join和window。最後,處理後的數據可以推送到文件系統,數據庫和實時儀表板。

在內部,通過Spark流,接收實時輸入數據流並將其分成批次。然後,這些批次由Spark引擎處理,以批量生成最終結果流。

Discretized Stream或簡稱Spark DStream是它的基本抽象。這也代表了分成小批量的數據流。DStreams構建於Spark的核心數據抽象Spark RDD之上。Streaming可以與Spark MLlibSpark SQL等任何其他Apache Spark組件集成。

五十,在Apache Spark Streaming中解釋DStream中的不同轉換

Apache Spark StreamingDStream

中的不同轉換是:

1- map(func) - 通過函數func傳遞源DStream的每個元素來返回一個新的DStream。

2- flatMap(func) - 與map類似,但每個輸入項可以映射到0個或更多輸出項。

3- filter(func) - 通過僅選擇func返回true的源DStream的記錄來返回新的DStream。

4- repartition(numPartitions) - 通過創建更多或更少的分區來更改此DStream中的並行度級別。

5- union(otherStream) - 返回一個新的DStream,它包含源DStream和

otherDStream中元素的並集。

6- 計數() -返回單元素的一個新的DSTREAM RDDS通過計數在源DSTREAM的每個RDD元件的數量。

7- reduce(func) - 通過使用函數func(它接受兩個參數並返回一個)聚合源DStream的每個RDD中的元素,返回單元素RDD的新DStream。

8- countByValue() - 當在類型為K的元素的DStream上調用時,返回(K,Long)對的新DStream,其中每個鍵的值是其在源DStream的每個RDD中的頻率。

9- reduceByKey(func,[numTasks]) - 當在(K,V)對的DStream上調用時,返回一個(K,V)對的新DStream,其中使用給定的reduce函數聚合每個鍵的值。

10- join(otherStream,[numTasks]) - 當在(K,V)和(K,W)對的兩個DStream上調用時,返回一個新的DStream(K,(V,W))對與所有對每個鍵的元素。

11- cogroup(otherStream,[numTasks]) - 當在(K,V)和(K,W)對的DStream上調用時,返回(K,Seq [V],Seq [W])元組的新DStream。

12- transform(func) - 通過將RDD-to-RDD函數應用於源DStream的每個RDD來返回一個新的DStream。

13- updateStateByKey(func) - 返回一個新的“狀態”DStream,其中通過在密鑰的先前狀態和密鑰的新值上應用給定函數來更新每個密鑰的狀態。

希望以上這些多大家有所幫助,能夠幫得到您說明我的努力是沒有白費的,最後,希望大家多多關注下,更多精彩的文章帶給大家!


分享到:


相關文章: