kafka原理及应用

kafka原理及应用

producer:消息生产者,发布消息到kafka集群的终端或服务

broker:kafka集群中包含的服务器。

topic:每条发布到kafaka集群的消息属于的类别,即kafka是面向topic的。

partition:partition是物理上的概念,每个topic包含一个或多个partition。kafka分配的单位是partition

consumer:从kafka集群中消费消息的终端或服务器

consumer group:high-level consumer api中,每个consumer都属于一个consumer group,每条消息只能被consumer group中的一个Consumer消费,但可以被多个consumer group消费

replica:partition的副本,保证partition的高可用。

leader:replica中的一个角色,producer和consumer只跟leader交互

follower:replica中的一个角色,从leader中复制数据

controller:kafka集群中的其中一个服务器,用来进行leader election以及各种failover.

zookeeper:kafka通过zookeeper来存储集群的meta信息。

kafka原理及应用

producer发布消息

producer采用push模式将消息发布到broker,每条消息都被append到patition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存药膏,保证kafka吞吐率)

消息路由

producer发送消息到broker时,会根据分区算法选择将其存储到哪一个partition,其路由机制为:

指定了partition,则直接使用;未指定partition但指定key,通过key的value进行hash选出一个partition;partition和key都为指定,使用轮询选出一个partition.

producer先从zookeeper的“brokers/.../state”节点找到该partition的leader

producer将消息发送给该leader

leader将消息写入本地log

followers从leader pull消息,写入本地log后leader发送 ACK

leader收到所有ISR中的replica的ACK后,增加HW(high watermark最后commit的offset)并向producer发送ACK

producer delivery guarantee

At most once 消息可能丢失,但绝不会重复传输

At least one 消息绝不会丢,但可能会重复传输

Exactly once 每条消息肯定会被传递一次且仅传输一次

当producer向broker发送消息时,一旦这条消息被commit,由于replication的存在,它就不会丢,但是如果producer发送数据给broker后,遇到网络问题造成通信中断,那producer就无法判断该条消息是否已经commit.虽然kafka无法确定网络故障功能期间发生了什么,但是producer可以生成一种类似于注解的东西,发生故障时幂等性的重试多次,这样就做到Exactly once。但目前还并未实现,所以目前默认情况下一条消息从producer到broker是确保了At least once,可通过设置producer的一部发送实现At most once.

broker保存消息

物理上把topic分成一个或多个partition(对于server.properties中的num.partiton=3配置),每个partition物理上对于一个文件夹(该文件夹存储该partition的所有消息和索引文件)

物理消息是否被消费,kafka都会保留所有消息,有两种策略可以删除旧数据:1.基于时间:log.retention.hours=168;2.基于大小:log.retention.bytes=1073741824

需要注意的是,因为kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以删除过期文件与提高kafka性能无关。

topic创建于删除

1.controller在zookeeper的/brokers/topics节点上注册watcher,当topic被创建,则controller会通过watch得到该topic的partition的partition/replica分配。

2.controller从/brokers/ids读取当前所有可用的broker列表,对于 set_p 中的每一个 partition:1.从分配给该partition的所有replica(AR)中选取一个可用的broker作为新的leader,并将AR设置为新的ISR;2.将新的leader和ISR写入/brokers/topics/[topic]/partitions/[partition]/state

3.controller通过RPC向相关的broker发送LeaderAndISRRequest.

删除topic

1.controller在zookeeper的/brokers/topics节点注册watcher,当topic被删除,则controller会通过watch得到该topic的partition/replica分配。

2.若delete.topic.enable=false,结束;否则controller注册在/admin/delete_topics上的watch被fire,controller通过回调向对于的broker发送StopReplicaRequest.

kafka HA

replication

同一个partition可能会有多个replica(对应server.properties中的default.replication.factor=N),没有replica的情况下,一旦broker单机,其上所有partition的数据都可能不可被消费,同时producer也不能再将数据存于其上的partition,引入replication后,同一个partition可能会有多个replica,这是需要在这些replica之间选出一个leader,producer和consumer只与这个leader交互,其他replica作为follower从leader中复制数据。

Kafka 分配 Replica 的算法如下:

1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序

2. 将第 i 个 partition 分配到第(i mod n)个 broker 上

3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

leader failover

当partition对应的leader宕机时,需要从follower中选举出新的leader,在选举新leader时,一个基本原则是,新的leader必须拥有旧的leader commit过的所有信息。

kafka在zookeeper中/brokers/..../state动态维护了一个ISR(in-sync replicas),ISR里面的所有replica都跟上了leader,只有ISR里面的成员才能选为leader,对于f+1个replica,一个partition可以在容忍f个replica失效的情况下保证消息不丢失。

当所有 replica 都不工作时,有两种可行的方案:

1.等待ISR中的任一个replica活过来,并选它作为leader,可保证数据不丢失,但时间可能相对较长。

2.选择第一个活过来的replica(不一定是ISR成员)作为leader,无法保证数据不丢失,但相对不可用时间短。

broker failover

1.controller在zookeeper的/brokers/ids/[brokerId]节点注册watcher,当broker宕机时zookeeper会fire watch

2.controller从/brokers/ids节点读取可用broker

3.controller决定set_p,该集合包含宕机broker上的所有partition

4.对set_p中的每个partition:从/brokers/topics/[topic]/partitions/[partition]/state节点读取ISR;决定新leader;将新leader,ISR,controller_epoch和leader_epoch等信息写入stae节点

5.通过RPC向相关broker发送leaderAndISRRequest命令

controller failover

当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:

1. 读取并增加 Controller Epoch。

2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。

3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。

4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。

5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。

6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。

7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。

8. 启动 replicaStateMachine 和 partitionStateMachine。

9. 将 brokerState 状态设置为 RunningAsController。

10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。

11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。

12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

consumer 消费消息

consumer API

kafka 提供了两套 consumer API:

1. The high-level Consumer API

2. The SimpleConsumer API

其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多地关注细节。

The high-level consumer API

high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。

使用 high-level consumer API 可以是多线程的应用,应当注意:

1. 如果消费线程大于 patition 数量,则有些线程将收不到消息

2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息

3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

The SimpleConsumer API

如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如:

1. 多次读取一个消息

2. 只消费一个 patition 中的部分消息

3. 使用事务来保证一个消息仅被消费一次

但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:

1. 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息

2. 应用程序需要通过程序获知每个 Partition 的 leader 是谁

3. 需要处理 leader 的变更

使用 SimpleConsumer API 的一般流程如下:

1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader

2. 找出每个 partition 的 follower

3. 定义好请求,该请求应该能描述应用程序需要哪些数据

4. fetch 数据

5. 识别 leader 的变化,并对之作出必要的响应

consumer group

如 2.2 节所说, kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。

kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。如下图所示:

kafka原理及应用

消费方式

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

consumer delivery guarantee

如果将 consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit。如果只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。

但实际使用中应用程序并非在 consumer 读取完数据就结束了,而是要进行进一步处理,而数据处理与 commit 的顺序在很大程度上决定了consumer delivery guarantee:

1.读完消息先 commit 再处理消息。

这种模式下,如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once

2.读完消息先处理再 commit。

这种模式下,如果在处理完消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。

3.如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。

精典的做法是引入两阶段提交。如果能让 offset 和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,无法存于HDFS,而SimpleConsuemr API的 offset 是由自己去维护的,可以将之存于 HDFS 中)

总之,Kafka 默认保证 At least once,并且允许通过设置 producer 异步提交来实现 At most once(见文章《

kafka consumer防止数据丢失》)。而 Exactly once 要求与外部存储系统协作,幸运的是 kafka 提供的 offset 可以非常直接非常容易得使用这种方式。

更多关于 kafka 传输语义的信息请参考《Message Delivery Semantics》。

consumer rebalance

当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法如下:

1. 将目标 topic 下的所有 partirtion 排序,存于PT

2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci

3. N=size(PT)/size(CG),向上取整

4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)

5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci

在 0.8.*版本,每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。这会导致以下几个问题:

1.Herd effect

  任何 broker 或者 consumer 的增减都会触发所有的 consumer 的 rebalance

2.Split Brain

  每个 consumer 分别单独通过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不同 consumer 在同一时刻从 zookeeper 看到的 view 就可能不一样,这是由 zookeeper 的特性决定的,这就会造成不正确的 reblance 尝试。

3. 调整结果不可控

  所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,这可能会导致 kafka 工作在一个不正确的状态。

kafka文件存储结构

kafka原理及应用

每一个partition目录下的文件被平均切割成大小相等(默认一个文件是500兆,可以手动去设置)的数据文件,

每一个数据文件都被称为一个段(segment file),但每个段消息数量不一定相等,这种特性能够使得老的segment可以被快速清除。

默认保留7天的数据。

Segment file是什么?

生产者生产的消息按照一定的分组策略被发送到broker中partition中的时候,这些消息如果在内存中放不下了,就会放在文件中,

partition在磁盘上就是一个目录,该目录名是topic的名称加上一个序号,在这个partition目录下,有两类文件,一类是以log为后缀的文件,

一类是以index为后缀的文件,每一个log文件和一个index文件相对应,这一对文件就是一个segment file,也就是一个段。

其中的log文件就是数据文件,里面存放的就是消息,而index文件是索引文件,索引文件记录了元数据信息。

kafka原理及应用

Segment文件命名的规则:partition全局的第一个segment从0(20个0)开始,后续的每一个segment文件名是上一个segment文件中最后一条消息的offset值。

索引文件(index文件)中存储着大量的元数据,而数据文件(log文件)中存储着大量的消息。

索引文件(index文件)中的元数据指向对应的数据文件(log文件)中消息的物理偏移地址。

kafka原理及应用

其中每个partiton中所持有的segments列表信息会存储在zookeeper中.

日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).为了避免删除文件时仍然有read操作(consumer消费),采取copy-on-write方式.

为什么选kafka?kafka消息会不会丢?

1.利用Partition实现并行处理

Kafka中的每个Topic都包含一个或多个Partition,且它们位于不同节点。同时,Partition在物理上对应一个本地文件夹,每个Partition包含一个或多个Segment,其中包含一个数据文件与一个索引文件。Partition像一个数组,可以通过索引(offset)去访问其数据。

2.ISR实现CAP中可用性与数据一致性的动态平衡

Kafka的数据复制方案接近于Master-Slave,不同的是,Kafka既不是完全的同步复制,也不是完全的一步复制,而是基于ISR的动态复制方案。

ISR,In-Sync Replica,每个Partition的Leader都会维护这样一个列表,其中包含了所有与之同步的Replica。每次写入数据时,只有ISR中的所有Replica都复制完,Leader才会将这条数据置为Commit,它才能被Consumer消费。

3.顺序写磁盘

将写磁盘的过程变为顺序写,可极大提高对磁盘的利用率。Consumer通过offset顺序消费这些数据,且不删除已经消费的数据,从而避免随机写磁盘的过程。

4.减少网络开销

批处理减少了网络传输的overhead,又提高了写磁盘的效率。

Kafka的API中,从send接口来看,一次只能发送一个ProducerRecord,但是send方法并不是立即将消息发送出去,而是通过batch.size和linger.ms控制实际发送频率,从而实现批量发送。

kafka消息发送分同步,异步两种方式,默认是同步方式,可通过producer.type属性进行配置。kafka保证消息被安全生产,有三个选项0,-1,1;通过request.required.acks属性进行配置.

0代表:不进行消息接收是否成功的确认(默认值);

1代表:当Leader副本接收成功后,返回接收成功确认信息;

-1代表:当Leader和Follower副本都接收成功后,返回接收成功确认信息;

消息丢失的场景

网络异常

acks设置为0时,不和Kafka集群进行消息接受确认,当网络发生异常等情况时,存在消息丢失的可能;

客户端异常

异步发送时,消息并没有直接发送至Kafka集群,而是在Client端按一定规则缓存并批量发送。在这期间,如果客户端发生死机等情况,都会导致消息的丢失;

缓冲区满了

异步发送时,Client端缓存的消息超出了缓冲池的大小,也存在消息丢失的可能;

Leader副本异常

acks设置为1时,Leader副本接收成功,Kafka集群就返回成功确认信息,而Follower副本可能还在同步。这时Leader副本突然出现异常,新Leader副本(原Follower副本)未能和其保持一致,就会出现消息丢失的情况;

以上就是消息丢失的几种情况,在日常应用中,我们需要结合自身的应用场景来选择不同的配置。

想要更高的吞吐量就设置:异步、ack=0;想要不丢失消息数据就选:同步、ack=-1策略


分享到:


相關文章: