概述
spark stream是对spark core api的扩展;对于spark core不太了解的请阅读: 。所以本质上是通过批处理来模拟流处理。
spark stream的流数据源可以来自Kafka, Flume, Kinesis, 或 TCP sockets甚至是文件。
对于流数据可以做很多复杂的处理(只有想不到,没有做不到的),如map操作、reduce操作、join操作,甚至是在线训练机器学习模型等等。
最终处理完之后的数据可以写入各种文件系统,如HDFS、数据库等等。
内部机制
spark stream模块接收流数据,并按照时间维度将其分割成一段段的小量的批数据,然后通过spark core引擎来处理。
对外提供的接口本质上是对离散小批量数据(discretized stream or DStream)的处理来模拟的流数据。
基本概念
jar依赖
开发spark stream流处理程序,需要添加如下依赖:
<dependency>
<groupid>org.apache.spark/<groupid>
<artifactid>spark-streaming_2.11/<artifactid>
<version>2.4.0/<version>
/<dependency>
对于外部数据源是kafka、Flume、Flume的,需要额外添加如下依赖:
初始化上下文
val spark = SparkSession.builder()
.appName("wordCount")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
关键参数:Seconds(2) 是流数据的窗口长度;
接入数据源
val lines = ssc.textFileStream("/home/panteng/桌面/stream")//文件
val lines = ssc.socketTextStream("localhost", 9999)//TcpSocket
定义计算流程
/**
* DEMO:每隔6秒钟统计最近30秒的数据,每隔1分钟存储一次
* 应用场景:每天更新用户最近30天的行为数据
*
* @param ssc StreamingContext
*/
def socketStreamWindow(ssc: StreamingContext): Unit = {
val lines = ssc.socketTextStream("localhost", 9999)
val statistics = lines.flatMap(_.split(" "))
.map(w => (w, 1))
.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(6))
statistics.print()
statistics.window(Minutes(1), Minutes(1))
.repartition(1).saveAsTextFiles("/home/xxx/IdeaProjects/hadoop-ecological/output/time")
ssc.start()
ssc.awaitTermination()
}
启动
ssc.start()
一旦启动之后,就不能在定义新的处理流程。
上下文被关闭,则不能够重新启动
一个JVM虚拟机中只能有一个StreamingContext处于活跃(active)状态
默认情况下关闭StreamingContext,也会关闭sparkContext;可以通过参数设置只关闭前者。
核心概念(Dstream)
Dstream是由一系列连续的RDD构成的有序集合。每一个RDD代表一段固定长度时间间隔内的数据。
对Dtream的操作实际上是对一系列的RDD操作。
接收者(Receivers)
receiver的作用是从流数据源接受数据,并存储曹spark应用的内存中,等待后期处理。
一般来讲,在一个worker/excutor中,一个数据流对应一个receiver,如果一个worker有多个数据流,那么需要对应多个receiver,这是必须保证worker的core数量大于receiver的数量,否则只能接受数据,但不能处理。
Transform操作
transform是将一个RDD转成另一个RDD的操作,接受一个rdd->rdd的函数,并且可以引用外部的rdd。好神奇的一个操作,
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}
窗口操作
两个关键参数窗口长度和移动长度。并且提供了与窗口有关的一系列曹组。如:reduceByWindow、reduceByKeyAndWindow等。
閱讀更多 IT技術百貨 的文章