Kafka消费与心跳机制

作者:哥不是小萝莉

出处:


http://www.cnblogs.com/smartloli/

1.概述

最近有同学咨询Kafka的消费和心跳机制,今天笔者将通过这篇博客来逐一介绍这些内容。

2.内容

2.1 Kafka消费

首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的数据。一个简单的Kafka消费实例代码如下所示:

<code>

public

class

JConsumerSubscribe

extends

Thread {

public

static

void

main(

String

[] args) { JConsumerSubscribe jconsumer =

new

JConsumerSubscribe(); jconsumer.start(); }

private

Properties configure() { Properties props =

new

Properties(); props.put(

"bootstrap.servers"

,

"dn1:9092,dn2:9092,dn3:9092"

); props.put(

"group.id"

,

"ke"

); props.put(

"enable.auto.commit"

,

"true"

); props.put(

"auto.commit.interval.ms"

,

"1000"

); props.put(

"key.deserializer"

,

"org.apache.kafka.common.serialization.StringDeserializer"

); props.put(

"value.deserializer"

,

"org.apache.kafka.common.serialization.StringDeserializer"

);

return

props; }

public

void

run() { KafkaConsumer<

String

,

String

> consumer =

new

KafkaConsumer<>(configure()); consumer.subscribe(Arrays.asList(

"test_kafka_topic"

));

boolean

flag =

true

;

while

(flag) { ConsumerRecords<

String

,

String

> records = consumer.poll(Duration.ofMillis(

100

));

for

(ConsumerRecord<

String

,

String

> record : records) System.out.printf(

"offset = %d, key = %s, value = %s%n"

, record.offset(), record.key(), record.value()); } consumer.close(); } } /<code>

上述代码我们就可以非常便捷的拿到Topic中的数据。但是,当我们调用poll方法拉取数据的时候,Kafka Broker Server做了那些事情。接下来,我们可以去看看源代码的实现细节。核心代码如下:

org.apache.kafka.clients.consumer.KafkaConsumer

<code>

private

ConsumerRecords

poll

(

final

long

timeoutMs,

final

boolean

includeMetadataInTimeout)

{ acquireAndEnsureOpen();

try

{

if

(timeoutMs

0

)

throw

new

IllegalArgumentException(

"Timeout must not be negative"

);

if

(

this

.subscriptions.hasNoSubscriptionOrUserAssignment()) {

throw

new

IllegalStateException(

"Consumer is not subscribed to any topics or assigned any partitions"

); }

long

elapsedTime =

0L

;

do

{ client.maybeTriggerWakeup();

final

long

metadataEnd;

if

(includeMetadataInTimeout) {

final

long

metadataStart = time.milliseconds();

if

(!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {

return

ConsumerRecords.empty(); } metadataEnd = time.milliseconds(); elapsedTime += metadataEnd - metadataStart; }

else

{

while

(!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) { log.warn(

"Still waiting for metadata"

); } metadataEnd = time.milliseconds(); }

final

Map>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));

if

(!records.isEmpty()) {

if

(fetcher.sendFetches() >

0

|| client.hasPendingRequests()) { client.pollNoWakeup(); }

return

this

.interceptors.onConsume(

new

ConsumerRecords<>(records)); }

final

long

fetchEnd = time.milliseconds(); elapsedTime += fetchEnd - metadataEnd; }

while

(elapsedTime < timeoutMs);

return

ConsumerRecords.empty(); }

finally

{ release(); } } /<code>

上述代码中有个方法pollForFetches,它的实现逻辑如下:

<code>

private

Map>> pollForFetches(

final

long timeoutMs) {

final

long startMs = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

final

Map>> records = fetcher.fetchedRecords();

if

(!records.isEmpty()) {

return

records; } fetcher.sendFetches();

if

(!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { pollTimeout = retryBackoffMs; } client.poll(pollTimeout, startMs, () -> {

return

!fetcher.hasCompletedFetches(); });

if

(coordinator.rejoinNeededOrPending()) {

return

Collections.emptyMap(); }

return

fetcher.fetchedRecords(); } /<code>

上述代码中加粗的位置,我们可以看出每次消费者客户端拉取数据时,通过poll方法,先调用fetcher中的fetchedRecords函数,如果获取不到数据,就会发起一个新的sendFetches请求。而在消费数据的时候,每个批次从Kafka Broker Server中拉取数据是有最大数据量限制,默认是500条,由属性(max.poll.records)控制,可以在客户端中设置该属性值来调整我们消费时每次拉取数据的量。

<code>提示:
这里需要注意的是,

max

.poll

.records

返回的是一个

poll

请求的数据总和,与多少个分区无关。因此,每次消费从所有分区中拉取

Topic

的数据的总条数不会超过

max

.poll

.records

所设置的值。 /<code>

而在Fetcher的类中,在sendFetches方法中有限制拉取数据容量的限制,由属性(max.partition.fetch.bytes),默认1MB。可能会有这样一个场景,当满足max.partition.fetch.bytes限制条件,如果需要Fetch出10000条记录,每次默认500条,那么我们需要执行20次才能将这一次通过网络发起的请求全部Fetch完毕。

这里,可能有同学有疑问,我们不能将默认的max.poll.records属性值调到10000吗?可以调,但是还有个属性需要一起配合才可以,这个就是每次poll的超时时间(Duration.ofMillis(100)),这里需要根据你的实际每条数据的容量大小来确定设置超时时间,如果你将最大值调到10000,当你每条记录的容量很大时,超时时间还是100ms,那么可能拉取的数据少于10000条。

而这里,还有另外一个需要注意的事情,就是会话超时的问题。session.timeout.ms默认是10s,group.min.session.timeout.ms默认是6s,group.max.session.timeout.ms默认是30min。当你在处理消费的业务逻辑的时候,如果在10s内没有处理完,那么消费者客户端就会与Kafka Broker Server断开,消费掉的数据,产生的offset就没法提交给Kafka,因为Kafka Broker Server此时认为该消费者程序已经断开,而即使你设置了自动提交属性,或者设置auto.offset.reset属性,你消费的时候还是会出现重复消费的情况,这就是因为session.timeout.ms超时的原因导致的。

2.2 心跳机制

上面在末尾的时候,说到会话超时的情况导致消息重复消费,为什么会有超时?有同学会有这样的疑问,我的消费者线程明明是启动的,也没有退出,为啥消费不到Kafka的消息呢?消费者组也查不到我的ConsumerGroupID呢?这就有可能是超时导致的,而Kafka是通过心跳机制来控制超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。

在org.apache.kafka.clients.consumer.internals.AbstractCoordinator中会启动一个HeartbeatThread线程来定时发送心跳和检测消费者的状态。每个消费者都有个org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,而每个ConsumerCoordinator都会启动一个HeartbeatThread线程来维护心跳,心跳信息存放在org.apache.kafka.clients.consumer.internals.Heartbeat中,声明的Schema如下所示:

<code>    

private

final

int

sessionTimeoutMs;

private

final

int

heartbeatIntervalMs;

private

final

int

maxPollIntervalMs;

private

final

long

retryBackoffMs;

private

volatile

long

lastHeartbeatSend;

private

long

lastHeartbeatReceive;

private

long

lastSessionReset;

private

long

lastPoll;

private

boolean

heartbeatFailed; /<code>

心跳线程中的run方法实现代码如下:

<code>

public

void run() {

try

{ log.debug(

"Heartbeat thread started"

);

while

(

true

) { synchronized (AbstractCoordinator.

this

) {

if

(closed)

return

;

if

(!enabled) { AbstractCoordinator.

this

.wait();

continue

; }

if

(state != MemberState.STABLE) { disable();

continue

; } client.pollNoWakeup(); long now = time.milliseconds();

if

(coordinatorUnknown()) {

if

(findCoordinatorFuture !=

null

|| lookupCoordinator().failed()) AbstractCoordinator.

this

.wait(retryBackoffMs); }

else

if

(heartbeat.sessionTimeoutExpired(now)) { markCoordinatorUnknown(); }

else

if

(heartbeat.pollTimeoutExpired(now)) { maybeLeaveGroup(); }

else

if

(!heartbeat.shouldHeartbeat(now)) { AbstractCoordinator.

this

.wait(retryBackoffMs); }

else

{ heartbeat.sentHeartbeat(now); sendHeartbeatRequest().addListener(new RequestFutureListener<

Void

>() {

public

void onSuccess(

Void

value) { synchronized (AbstractCoordinator.

this

) { heartbeat.receiveHeartbeat(time.milliseconds()); } }

public

void onFailure(RuntimeException e) { synchronized (AbstractCoordinator.

this

) {

if

(e instanceof RebalanceInProgressException) { heartbeat.receiveHeartbeat(time.milliseconds()); }

else

{ heartbeat.failHeartbeat(); AbstractCoordinator.

this

.notify(); } } } }); } } } }

catch

(AuthenticationException e) { log.error(

"An authentication error occurred in the heartbeat thread"

, e);

this

.failed.

set

(e); }

catch

(GroupAuthorizationException e) { log.error(

"A group authorization error occurred in the heartbeat thread"

, e);

this

.failed.

set

(e); }

catch

(InterruptedException | InterruptException e) { Thread.interrupted(); log.error(

"Unexpected interrupt received in heartbeat thread"

, e);

this

.failed.

set

(new RuntimeException(e)); }

catch

(Throwable e) { log.error(

"Heartbeat thread failed due to unexpected error"

, e);

if

(e instanceof RuntimeException)

this

.failed.

set

((RuntimeException) e);

else

this

.failed.

set

(new RuntimeException(e)); }

finally

{ log.debug(

"Heartbeat thread has closed"

); } } View Code/<code>

在心跳线程中这里面包含两个最重要的超时函数,它们是sessionTimeoutExpired和pollTimeoutExpired。

<code>

public

boolean

sessionTimeoutExpired

(

long

now)

{

return

now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs; }

public

boolean

pollTimeoutExpired

(

long

now)

{

return

now - lastPoll > maxPollIntervalMs; } /<code>

2.2.1 sessionTimeoutExpired

如果是sessionTimeout超时,则会被标记为当前协调器处理断开,此时,会将消费者移除,重新分配分区和消费者的对应关系。在Kafka Broker Server中,Consumer Group定义了5中(如果算上Unknown,应该是6种状态)状态,org.apache.kafka.common.ConsumerGroupState,如下图所示:

Kafka消费与心跳机制

2.2.2 pollTimeoutExpired

如果触发了poll超时,此时消费者客户端会退出ConsumerGroup,当再次poll的时候,会重新加入到ConsumerGroup,触发RebalanceGroup。而KafkaConsumer Client是不会帮我们重复poll的,需要我们自己在实现的消费逻辑中不停的调用poll方法。

3.分区与消费线程

关于消费分区与消费线程的对应关系,理论上消费线程数应该小于等于分区数。之前是有这样一种观点,一个消费线程对应一个分区,当消费线程等于分区数是最大化线程的利用率。直接使用KafkaConsumer Client实例,这样使用确实没有什么问题。但是,如果我们有富裕的CPU,其实还可以使用大于分区数的线程,来提升消费能力,这就需要我们对KafkaConsumer Client实例进行改造,实现消费策略预计算,利用额外的CPU开启更多的线程,来实现消费任务分片。具体实现,留到下一篇博客,给大家分享《基于Kafka的分布式查询SQL引擎》。

4.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!


作者:哥不是小萝莉

出处:http://www.cnblogs.com/smartloli/


分享到:


相關文章: