spark系列:Spark Streaming官方文档译文

概述

spark stream是对spark core api的扩展;对于spark core不太了解的请阅读: 。所以本质上是通过批处理来模拟流处理。

spark stream的流数据源可以来自Kafka, Flume, Kinesis, 或 TCP sockets甚至是文件。

spark系列:Spark Streaming官方文档译文

spark stream

对于流数据可以做很多复杂的处理(只有想不到,没有做不到的),如map操作、reduce操作、join操作,甚至是在线训练机器学习模型等等。

最终处理完之后的数据可以写入各种文件系统,如HDFS、数据库等等。

内部机制

spark stream模块接收流数据,并按照时间维度将其分割成一段段的小量的批数据,然后通过spark core引擎来处理。

spark系列:Spark Streaming官方文档译文

批处理模拟流处理

对外提供的接口本质上是对离散小批量数据(discretized stream or DStream)的处理来模拟的流数据。

spark系列:Spark Streaming官方文档译文

基本概念

jar依赖

开发spark stream流处理程序,需要添加如下依赖:

<dependency>
<groupid>org.apache.spark/<groupid>
<artifactid>spark-streaming_2.11/<artifactid>
<version>2.4.0/<version>
/<dependency>

对于外部数据源是kafka、Flume、Flume的,需要额外添加如下依赖:

spark系列:Spark Streaming官方文档译文

jar 依赖

初始化上下文

val spark = SparkSession.builder()
.appName("wordCount")

.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

关键参数:Seconds(2) 是流数据的窗口长度;

接入数据源

val lines = ssc.textFileStream("/home/panteng/桌面/stream")//文件
val lines = ssc.socketTextStream("localhost", 9999)//TcpSocket

定义计算流程

/**
* DEMO:每隔6秒钟统计最近30秒的数据,每隔1分钟存储一次
* 应用场景:每天更新用户最近30天的行为数据
*
* @param ssc StreamingContext
*/
def socketStreamWindow(ssc: StreamingContext): Unit = {
val lines = ssc.socketTextStream("localhost", 9999)
val statistics = lines.flatMap(_.split(" "))
.map(w => (w, 1))
.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(6))
statistics.print()
statistics.window(Minutes(1), Minutes(1))
.repartition(1).saveAsTextFiles("/home/xxx/IdeaProjects/hadoop-ecological/output/time")
ssc.start()
ssc.awaitTermination()
}

启动

ssc.start()

一旦启动之后,就不能在定义新的处理流程。

上下文被关闭,则不能够重新启动

一个JVM虚拟机中只能有一个StreamingContext处于活跃(active)状态

默认情况下关闭StreamingContext,也会关闭sparkContext;可以通过参数设置只关闭前者。

核心概念(Dstream)

Dstream是由一系列连续的RDD构成的有序集合。每一个RDD代表一段固定长度时间间隔内的数据。

spark系列:Spark Streaming官方文档译文

Dtream

对Dtream的操作实际上是对一系列的RDD操作。

spark系列:Spark Streaming官方文档译文

接收者(Receivers)

receiver的作用是从流数据源接受数据,并存储曹spark应用的内存中,等待后期处理。

一般来讲,在一个worker/excutor中,一个数据流对应一个receiver,如果一个worker有多个数据流,那么需要对应多个receiver,这是必须保证worker的core数量大于receiver的数量,否则只能接受数据,但不能处理。

Transform操作

transform是将一个RDD转成另一个RDD的操作,接受一个rdd->rdd的函数,并且可以引用外部的rdd。好神奇的一个操作,

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}

窗口操作

两个关键参数窗口长度和移动长度。并且提供了与窗口有关的一系列曹组。如:reduceByWindow、reduceByKeyAndWindow等。

spark系列:Spark Streaming官方文档译文

Window Operations


分享到:


相關文章: