「大數據」(八十三)Spark之Streaming實時流

【導讀:數據是二十一世紀的石油,蘊含巨大價值,這是·情報通·大數據技術系列第[83]篇文章,歡迎閱讀和收藏】

1 基本概念

Spark Streaming 是 Spark 核心 API 的一個擴展,可以實現高吞吐量的、具備容錯機制的實時流數據的處理。支持從多種數據源獲取數據,包括 Kafk 、 Flume 、 Twitter 、 ZeroMQ 、 Kinesis 以及 TCP sockets ,從數據源獲取數據之後,可以使用諸如 map 、 reduce 、 join 和 window 等高級函數進行復雜算法的處理。最後還可以將處理結果存儲到文件系統,數據庫和現場儀表盤。在“ One Stack rule them all ”的基礎上,還可以使用 Spark 的其他子框架,如集群學習、圖計算等,對流數據進行處理。

2 原理介紹

2.1 Spark Streaming 處理的數據流圖

「大數據」(八十三)Spark之Streaming實時流

Spark Streaming 的基本原理是將輸入數據流以時間片(秒級)為單位進行拆分 ,

然後以類似批處理的方式處理每個時間片數據

首先, Spark Streaming 把實時輸入數據流以時間片Δ t (如 1 秒)為單位切分成塊。 Spark Streaming 會把每塊數據作為一個 RDD ,並使用 RDD 操作處理每一小塊數據。每個塊都會生成一個 Spark Job 處理,最終結果也返回多塊。

2.2 SparkStreaming 支持 的業務場景

目前而言 SparkStreaming 主要支持以下三種業務場景 :

1. 無狀態操作:只關注當前的 DStream 中的實時數據,例如 只對當前 DStream 中的數據做正確性校驗

2. 有狀態操作:對有狀態的 DStream 進行操作時 , 需要依賴之前的數據 例如 統計網站各個模塊總的訪問量

3. 窗口操作 : 對指定時間段範圍內的 DStream 數據進行操作,例如 需要統計一天之內網站各個模塊的訪問數量

2.3 SparkStreaming 支持的操作

Discretized Stream 是 Spark Streaming 的基礎抽象,代表持續性的數據流和經過各種 Spark 原語操作後的結果數據流。在內部實現上, DStream 由連續的序列化 RDD 來表示。支持的操作主要包含以下幾種 :

1. Action

當某個 Output Operations 原語被調用時, stream 才會開始真正的計算過程。現階段支持的 Output 方式有以下幾種

print()

foreachRDD(func)

saveAsObjectFiles(prefix, [suffix])

saveAsTextFiles(prefix, [suffix])

saveAsHadoopFiles(prefix, [suffix])

2. 常規 RDD 的 Transformation 操作


對常規 RDD 使用的 transformation 操作,在 DStream 上都適用


3. 有狀態的 Transformation

UpdateStateByKey: 使用該方法主要是使用目前的 DStream 數據來更新歷史數據

4. 窗口的 Transformation
Window Operations 有點類似於 Storm 中的 State ,可以設置窗口的大小和滑動窗口的間隔來動態的獲取當前 Steaming 的允許狀態。

主要支持的操作有:

0. window(windowLength, slideInterval)

1. countByWindow(windowLength, slideInterval)

2. reduceByWindow(func, windowLength, slideInterval)

3. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

4. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

5. countByValueAndWindow(windowLength, slideInterval, [numTasks])

2.4 Spark Streaming 優化

監控手段:

一般來說,使用 Spark 自帶的 Web UI 就能滿足大部分的監控需求。對於 Spark Streaming 來說,以下兩個度量指標尤為重要(在 Batch Processing Statistics 標籤下):

Processing Time :處理每個 batch 的時間

Scheduling Delay: 每個 batch 在隊列中等待前一個 batch 完成處理所等待的時間

若 Processing Time 的值一直大於 Scheduling Delay ,或者 Scheduling Delay 的值持續增長,代表系統已經無法處理這樣大的數據輸入量了,這時就需要考慮各種優化方法來增強系統的負載。

優化方式 :

1. 利用集群資源,減少處理每個批次的數據的時間

a. 控制 reduce 數量,太多的 reducer, 造成很多的小任務 , 以此產生很多啟動任務的開銷。太 少的 reducer, 任務執行行慢 !

b. 序列化:包含輸入數據序列化、 RDD 序列化、 TASK 序列化

2. 在 Standalone 及 coarse-grained 模式下的任務啟動要比 fine-grained 省時

3. 給每個批次的數據量的設定一個合適的大小,原則 : 要來得及消化流進系統的數據

4. 內存調優

a. 清理緩存的 RDD

b. 在 spark.cleaner.ttl 之前緩存的 RDD 都會被清除掉

c. 設置 spark.streaming.unpersis, 系統為你分憂

d. 使用併發垃圾收集器


分享到:


相關文章: