Spark Streaming 和 Flink 谁是数据开发者的最爱?

ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd)

}

第一行就是计算得到该批次生成 KafkaRDD 每个分区要消费的最大 offset。 接着看 latestLeaderOffsets(maxRetries)。

@tailrec protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {// 可以看到的是用来指定获取最大偏移分区的列表还是只有currentOffsets,没有发现关于新增的分区的内容。

val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) // Either.fold would confuse @tailrec, do it manually

if (o.isLeft) { val err = o.left.get.toString if (retries <= 0) { throw new SparkException(err)

} else {

logError(err) Thread.sleep(kc.config.refreshLeaderBackoffMs)

latestLeaderOffsets(retries - 1)

}

} else {

o.right.get

}

}

其中 protected var currentOffsets = fromOffsets,这个仅仅是在构建 DirectKafkaInputDStream 的时候初始化,并在 compute 里面更新:

currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

中间没有检测 Kafka 新增 topic 或者分区的代码,所以可以确认 Spark Streaming 与 kafka 0.8 的版本结合不支持动态分区检测。

Spark Streaming 与 Kafka 0.10 版本结合

入口同样是 DirectKafkaInputDStream 的 compute 方法,捡主要的部分说,Compute 里第一行也是计算当前 job 生成 kafkardd 要消费的每个分区的最大 offset:

// 获取当前生成job,要用到的KafkaRDD每个分区最大消费偏移值

val untilOffsets = clamp(latestOffsets())

具体检测 Kafka 新增 topic 或者分区的代码在 latestOffsets()

/**

* Returns the latest (highest) available offsets, taking new partitions into account. */

protected def latestOffsets(): Map[TopicPartition, Long] = { val c = consumer

paranoidPoll(c) // 获取所有的分区信息

val parts = c.assignment().asScala // make sure new partitions are reflected in currentOffsets

// 做差获取新增的分区信息

val newPartitions = parts.diff(currentOffsets.keySet) // position for new partitions determined by auto.offset.reset if no commit

// 新分区消费位置,没有记录的化是由auto.offset.reset决定

currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap // don't want to consume messages, so pause

c.pause(newPartitions.asJava) // find latest available offsets

c.seekToEnd(currentOffsets.keySet.asJava)

parts.map(tp => tp -> c.position(tp)).toMap

}

该方法内有获取 Kafka 新增分区,并将其更新到 currentOffsets 的过程,所以可以验证 Spark Streaming 与 Kafka 0.10 版本结合支持动态分区检测。

Flink

入口类是 FlinkKafkaConsumerBase,该类是所有 Flink 的 Kafka 消费者的父类。

Spark Streaming 和 Flink 谁是数据开发者的最爱?

(图 10)

在 FlinkKafkaConsumerBase 的 run 方法中,创建了 kafkaFetcher,实际上就是消费者:

this.kafkaFetcher = createFetcher(

sourceContext,

subscribedPartitionsToStartOffsets,

periodicWatermarkAssigner,

punctuatedWatermarkAssigner,

(StreamingRuntimeContext) getRuntimeContext(),

offsetCommitMode,

getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),

useMetrics);

接是创建了一个线程,该线程会定期检测 Kafka 新增分区,然后将其添加到 kafkaFetcher 里。

if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { final AtomicReference discoveryLoopErrorRef = new AtomicReference<>(); this.discoveryLoopThread = new Thread(new Runnable() { @Override

public void run() { try { // --------------------- partition discovery loop ---------------------

List discoveredPartitions; // throughout the loop, we always eagerly check if we are still running before

// performing the next operation, so that we can escape the loop as soon as possible

while (running) { if (LOG.isDebugEnabled()) { LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());

} try {

discoveredPartitions = partitionDiscoverer.discoverPartitions();

} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { // the partition discoverer may have been closed or woken up before or during the discovery;

// this would only happen if the consumer was canceled; simply escape the loop

break;

} // no need to add the discovered partitions if we were closed during the meantime

if (running && !discoveredPartitions.isEmpty()) {

kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);

} // do not waste any time sleeping if we're not running anymore

if (running && discoveryIntervalMillis != 0) { try { Thread.sleep(discoveryIntervalMillis);

} catch (InterruptedException iex) { // may be interrupted if the consumer was canceled midway; simply escape the loop

break;

}

}

}

} catch (Exception e) {

discoveryLoopErrorRef.set(e);

} finally { // calling cancel will also let the fetcher loop escape

// (if not running, cancel() was already called)

if (running) {

cancel();

}

}

}

}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

discoveryLoopThread.start();

kafkaFetcher.runFetchLoop();

上面,就是 Flink 动态发现 Kafka 新增分区的过程。不过与 Spark 无需做任何配置不同的是,Flink 动态发现 Kafka 新增分区,这个功能需要被使能的。

也很简单,需要将 flink.partition-discovery.interval-millis 该属性设置为大于 0 即可。

Spark Streaming 和 Flink 谁是数据开发者的最爱?

容错机制及处理语义

本节内容主要是想对比两者在故障恢复及如何保证仅一次的处理语义。这个时候适合抛出一个问题:实时处理的时候,如何保证数据仅一次处理语义?

Spark Streaming 保证仅一次处理

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。

对于 Spark Streaming 与 Kafka 结合的 direct Stream 可以自己维护 offset 到 Zookeeper、Kafka 或任何其它外部系统,每次提交完结果之后再提交 offset,这样故障恢复重启可以利用上次提交的 offset 恢复,保证数据不丢失。

但是假如故障发生在提交结果之后、提交 offset 之前会导致数据多次处理,这个时候我们需要保证处理结果多次输出不影响正常的业务。

由此可以分析,假设要保证数据恰一次处理语义,那么结果输出和 offset 提交必须在一个事务内完成。在这里有以下两种做法:

  • repartition(1) Spark Streaming 输出的 action 变成仅一个 partition,这样可以利用事务去做:

Dstream.foreachRDD(rdd=>{

rdd.repartition(1).foreachPartition(partition=>{ // 开启事务

partition.foreach(each=>{// 提交数据

}) // 提交事务

})

})

  • 将结果和 offset 一起提交。

也就是结果数据包含 offset。这样提交结果和提交 offset 就是一个操作完成,不会数据丢失,也不会重复处理。故障恢复的时候可以利用上次提交结果带的 offset。

Flink 与 kafka 0.11 保证仅一次处理

若要 sink 支持仅一次语义,必须以事务的方式写数据到 Kafka,这样当提交事务时两次 checkpoint 间的所有写入操作作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。

在一个分布式且含有多个并发执行 sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一致性的结果。

Flink 使用两阶段提交协议以及预提交(pre-commit)阶段来解决这个问题。

本例中的 Flink 应用如图 11 所示包含以下组件:

  • 一个source,从Kafka中读取数据(即KafkaConsumer);
  • 一个时间窗口化的聚会操作;
  • 一个sink,将结果写回到Kafka(即KafkaProducer)。
Spark Streaming 和 Flink 谁是数据开发者的最爱?

(图 11)

下面详细讲解 Flink 的两段提交思路:

Spark Streaming 和 Flink 谁是数据开发者的最爱?

(图 12)

如图 12 所示,Flink checkpointing 开始时便进入到 pre-commit 阶段。

具体来说,一旦 checkpoint 开始,Flink 的 JobManager 向输入流中写入一个 checkpoint barrier ,将流中所有消息分割成属于本次 checkpoint 的消息以及属于下次 checkpoint 的,barrier 也会在操作算子间流转。

对于每个 operator 来说,该 barrier 会触发 operator 状态后端为该 operator 状态打快照。

data source 保存了 Kafka 的 offset,之后把 checkpoint barrier 传递到后续的 operator。

这种方式仅适用于 operator 仅有它的内部状态。内部状态是指 Flink state backends 保存和管理的内容(如第二个 operator 中 window 聚合算出来的 sum)。

当一个进程仅有它的内部状态的时候,除了在 checkpoint 之前将需要将数据更改写入到 state backend,不需要在预提交阶段做其他的动作。

在 checkpoint 成功的时候,Flink 会正确的提交这些写入,在 checkpoint 失败的时候会终止提交,过程可见图 13。

Spark Streaming 和 Flink 谁是数据开发者的最爱?

(图 13)

当结合外部系统的时候,外部系统必须要支持可与两阶段提交协议捆绑使用的事务。

显然本例中的 sink 由于引入了 kafka sink,因此在预提交阶段 data sink 必须预提交外部事务。如下图:

Spark Streaming 和 Flink 谁是数据开发者的最爱?

(图 14)

当 barrier 在所有的算子中传递一遍,并且触发的快照写入完成,预提交阶段完成。

所有的触发状态快照都被视为 checkpoint 的一部分,也可以说 checkpoint 是整个应用程序的状态快照,包括预提交外部状态。出现故障可以从 checkpoint 恢复。

下一步就是通知所有的操作算子 checkpoint 成功。该阶段 jobmanager 会为每个 operator 发起 checkpoint 已完成的回调逻辑。

本例中 data source 和窗口操作无外部状态,因此该阶段,这两个算子无需执行任何逻辑,但是 data sink 是有外部状态的,因此,此时我们必须提交外部事务,如下图:

Spark Streaming 和 Flink 谁是数据开发者的最爱?

(图 15)

以上就是 Flink 实现恰一次处理的基本逻辑。

Spark Streaming 和 Flink 谁是数据开发者的最爱?

Back pressure

消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少。(*back pressure 后面一律称为背压。)

Spark Streaming 的背压

Spark Streaming 跟 Kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 Kafka 消息的条数。

为了达到这个目的,Spark Streaming 在原有的架构上加入了一个 RateController,利用的算法是 PID,需要的反馈数据是任务处理的结束时间、调度时间、处理时间、消息条数。

这些数据是通过 SparkListener 体系获得,然后通过 PIDRateEsimator 的 compute 计算得到一个速率,进而可以计算得到一个 offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大 offset。

PIDRateEsimator 的 compute 方法如下:

def compute( time: Long, // in milliseconds

numElements: Long, processingDelay: Long, // in milliseconds

schedulingDelay: Long // in milliseconds

): Option[Double] = {

logTrace(s"\ntime = $time, # records = $numElements, " +

s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000

val processingRate = numElements.toDouble / processingDelay * 1000

val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2)

val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error -

integral * historicalError -

derivative * dError).max(minRate)

logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin)

latestTime = time if (firstRun) {

latestRate = processingRate

latestError = 0D

firstRun = false

logTrace("First run, rate estimation skipped") None

} else {

latestRate = newRate

latestError = error

logTrace(s"New rate = $newRate") Some(newRate)

}

} else {

logTrace("Rate estimation skipped") None

}

}

}

Flink 的背压

与 Spark Streaming 的背压不同的是,Flink 背压是 jobmanager 针对每一个 task 每 50ms 触发 100 次 Thread.getStackTrace() 调用,求出阻塞的占比。过程如图 16 所示:

Spark Streaming 和 Flink 谁是数据开发者的最爱?

(图 16)

阻塞占比在 web 上划分了三个等级:

  • OK: 0 <= Ratio <= 0.10,表示状态良好;
  • LOW: 0.10 < Ratio <= 0.5,表示有待观察;
  • HIGH: 0.5 < Ratio <= 1,表示要处理了。

“征稿啦!”

CSDN 公众号秉持着「与千万技术人共成长」理念,不仅以「极客头条」、「畅言」栏目在第一时间以技术人的独特视角描述技术人关心的行业焦点事件,更有「技术头条」专栏,深度解读行业内的热门技术与场景应用,让所有的开发者紧跟技术潮流,保持警醒的技术嗅觉,对行业趋势、技术有更为全面的认知。

如果你有优质的文章,或是行业热点事件、技术趋势的真知灼见,或是深度的应用实践、场景方案等的新见解,欢迎联系 CSDN 投稿,联系方式:微信(guorui_1118,请备注投稿+姓名+公司职位),邮箱([email protected])。


分享到:


相關文章: