有状态流处理:Apache Flink状态后端和如何管理Kafka消费偏移量

有状态流处理:Apache Flink状态后端

Apache Flink的3个状态后端,它们的局限性以及何时根据特定于案例的要求使用它们。


通过有状态流处理,当开发人员启用Flink应用程序的检查点时,状态将持续存在以防止数据丢失并确保在发生故障时完全恢复。为应用程序选择状态后端将影响状态持久化的方式和位置。
了解管理Apache Flink状态和开发有状态流应用程序的实际示例。
Apache Flink附带三个可用的状态后端:MemoryStateBackend,FsStateBackend和RocksDBStateBackend。


有状态流处理:Apache Flink状态后端和如何管理Kafka消费偏移量

该MemoryStateBackend

MemoryStateBackend是一个内部状态后端,用于维护Java堆上的状态。键/值状态和窗口运算符包含存储值和计时器的哈希表。
当您的应用程序检查点时,此后端将在您将状态发送到Apache Flink的作业管理器之前拍摄您的状态的快照,该作业管理器也将其存储在Java堆上。
默认情况下,MemoryStateBackend配置为支持异步快照。异步快照可避免可能导致流应用程序背压的潜在阻塞管道。

使用MemoryStateBackend时需要注意什么:

  • 默认情况下,每个单独的状态的大小默认为5 MB。您可以在MemoryStateBackend构造函数中进一步增加大小。
  • 状态大小受akka帧大小的限制,无论您在配置中设置为最大状态大小,都不能大于akka帧大小(您可以在配置中找到更多信息)。
  • 聚合状态必须适合JobManager内存。

何时使用MemoryStateBackend:

  • 建议使用MemoryStateBackend进行本地开发或调试,因为它的状态有限
  • MemoryStateBackend最适合具有小状态大小的用例和有状态流处理应用程序,例如仅包含一次记录功能(Map,FlatMap或Filter)的作业或使用Kafka 使用者。


该FsStateBackend

FsStateBackend配置使用文件系统完成,例如URL(类型,地址,路径)。一些示例文件系统可能是:

  • “hdfs://namenode:40010/flink/checkpoints” 或
  • “s3://flink/checkpoints”.

当您选择FsStateBackend时,正在进行的数据保存在任务管理器的内存中。在检查点上,此后端将状态快照写入配置的文件系统和目录中的文件,同时它将在JobManager的内存或Zookeeper中存储最少的元数据(对于高可用性情况)。
默认情况下,FsStateBackend配置为提供异步快照,以避免在写入状态检查点时阻塞处理管道。可以通过将构造函数中相应的布尔标志设置为false来实例化FsStateBackend来禁用该功能,例如:

1

new FsStateBackend(path, false);

何时使用FsStateBackend:

  • FsStateBackend最适合处理大状态,长窗口或大键/值状态的Apache Flink有状态流处理作业。
  • FsStateBackend最适合每个高可用性设置。

该RocksDBStateBackend

使用文件系统(类型,地址,路径)执行RocksDBStateBackend的配置,如下例所示:

  • “hdfs://namenode:40010/flink/checkpoints”或
  • “s3://flink/checkpoints”

RocksDBStateBackend使用RocksDB数据库在本地磁盘上保存飞行中的数据。在检查点上,整个RocksDB数据库将被检查点到配置的文件系统中,或者在非常大的状态作业的情况下增量差异。同时,Apache Flink将一些最小的元数据存储在JobManager的内存或Zookeeper中(对于高可用性情况)。RocksDB默认配置为执行异步快照。

使用RocksDBStateBackend时需要注意什么:

  • RocksDB的每个密钥和每个值的最大支持大小为每个2 ^ 31个字节。这是因为RocksDB的JNI桥API基于byte []。
  • 我们需要在此强调,对于使用具有合并操作的状态(例如ListState)的有状态流处理应用程序,可以累积超过2 ^ 31字节超时的值大小,这将导致它们在任何后续检索时失败。

何时使用RocksDBStateBackend:

  • RocksDBStateBackend最适合处理大状态,长窗口或大键/值状态的 Apache Flink有状态流处理作业。
  • RocksDBStateBackend最适合每个高可用性设置。
  • RocksDBStateBackend是目前唯一可用于支持有状态流处理应用程序的增量检查点的状态后端。

使用RocksDB时,状态大小仅受可用磁盘空间量的限制,这使RocksDBStateBackend成为管理超大状态的绝佳选择。使用RocksDB时的权衡是所有状态访问和检索都需要序列化(或反序列化)才能跨越JNI边界。与上面提到的堆上后端相比,这可能会影响应用程序的吞吐量。
不同的状态后端服务于多个开发人员要求,应在开始开发应用程序之前仔细考虑和进行广泛规划后选择。这可确保选择正确的状态后端以最好地满足应用程序和业务需求。

Apache Flink如何管理Kafka消费偏移量


在我们《Flink Friday Tip》的这一集中,我们将逐步说明Apache Flink如何与Apache Kafka协同工作,以确保Kafka主题的记录以一次性保证进行处理。

检查点是Apache Flink的内部机制,可以从故障中恢复。检查点是Flink应用程序状态的一致副本,包括输入的读取位置。如果发生故障,Flink将通过从检查点加载应用程序状态并从恢复的读取位置继续恢复应用程序,就像没有发生任何事情一样。您可以将检查点视为保存计算机游戏的当前状态。如果你在游戏中保存了自己的位置后发生了什么事情,你可以随时回过头再试一次。

检查点使Apache Flink具有容错能力,并确保在发生故障时保留流应用程序的语义。应用程序可以定期触发检查点。

Apache Flink中的Kafka消费者将Flink的检查点机制与有状态运算符集成在一起,其状态是所有Kafka分区中的读取偏移量。触发检查点时,每个分区的偏移量都存储在检查点中。Flink的检查点机制确保所有操作员任务的存储状态是一致的,即它们基于相同的输入数据。当所有操作员任务成功存储其状态时,检查点完成。因此,当从潜在的系统故障重新启动时,系统提供一次性状态更新保证。

下面我们将介绍Apache Flink如何在逐步指南中检查Kafka消费者抵消。在我们的示例中,数据存储在Flink的Job Master中。值得注意的是,在POC或生产用例下,数据通常存储在外部文件存储器(如HDFS或S3)中。

步骤1:

下面的示例从Kafka主题中读取两个分区,每个分区包含“A”,“B”,“C”,“D”,“E”作为消息。我们将两个分区的偏移量设置为零。

有状态流处理:Apache Flink状态后端和如何管理Kafka消费偏移量

第2步:

在第二步中,Kafka消费者开始从分区0读取消息。消息“A”在“飞行中”处理,第一个消费者的偏移量变为1。


有状态流处理:Apache Flink状态后端和如何管理Kafka消费偏移量


第3步:

在第三步中,消息“A”到达Flink Map Task。两个消费者都读取他们的下一个记录(分区0的消息“B”和分区1的消息“A”)。两个分区的偏移量分别更新为2和1。与此同时,Flink的Job Master决定在源头触发检查点。

有状态流处理:Apache Flink状态后端和如何管理Kafka消费偏移量

第4步:

在接下来的步骤中,Kafka使用者任务已经创建了状态的快照(“offset = 2,1”),现在存储在Apache Flink的Job Master中。源分别在来自分区0和1的消息“B”和“A”之后发出检查点屏障。检查点障碍用于对齐所有操作员任务的检查点,并保证整个检查点的一致性。消息“A”到达Flink Map Task,而顶级消费者继续读取其下一条记录(消息“C”)。

有状态流处理:Apache Flink状态后端和如何管理Kafka消费偏移量

第5步:

此步骤显示Flink Map Task从两个源和检查点接收检查点障碍,其状态为Job Master。与此同时,消费者继续从Kafka分区阅读更多活动。


有状态流处理:Apache Flink状态后端和如何管理Kafka消费偏移量


第6步:

此步骤显示Flink Map Task在检查其状态后与Flink Job Master进行通信。当作业的所有任务确认其状态为检查点时,作业主管完成检查点。从现在开始,检查点可用于从故障中恢复。值得一提的是,Apache Flink不依赖于Kafka偏移来恢复潜在的系统故障。


有状态流处理:Apache Flink状态后端和如何管理Kafka消费偏移量


在发生故障时恢复

如果发生故障(例如,工作人员故障),则重新启动所有操作员任务,并将其状态重置为上次完成的检查点。如下图所示。


有状态流处理:Apache Flink状态后端和如何管理Kafka消费偏移量


Kafka源分别从偏移量2和1开始,因为这是完成的检查点的偏移量。当作业重新启动时,我们可以期待正常的系统操作,就好像之前没有发生故障一样。

你可以找到关于如何使用的详细信息和常见问题最好的Apache Flink 和Apache Kafka 在我们以前的博客文章的一个位置。


分享到:


相關文章: