spring-kafka-2.6.1~1. Preface


1. Preface

在基于Kafka的消息传递方案中,Spring for Apache Kafka project应用了Spring的经典核心概念。

我们以“template”来作为发送消息的高级抽象,而且我们还提供了Message-driven POJO支持。


2. What’s new?

2.1. What’s New in 2.6 Since 2.5

本节涵盖了2.5到2.6所做的所有变更。有关先前版本的变更,请参考Change History.


2.1.1. Kafka Client Version

本版需要2.6.0 kafka-clients。


2.1.2. Listener Container Changes

默认的EOSMode当前是BETA。参考Exactly Once Semantics来了解详细信息。

当前若恢复失败,则错误处理器(error handler,继承至FailedRecordProcessor)和DefaultAfterRollbackProcessor默认会重置BackOff。

当然,你也可以根据失败记录和异常来自由定制BackOff。

参考Seek To Current Container Error Handlers, Recovering Batch Error Handler, Publishing Dead-letter Records和After-rollback Processor来了解详情。

除此之外,还可以在容器属性中配置adviceChain。参考Listener Container Properties来了解详情。


2.1.3. @KafkaLisener Changes

现在当使用手动分区分配时,您可以指定一个通配符来决定哪些分区需要重置为初始偏移。

此外,如果listener实现了ConsumerSeekAware,那么手动分区后还会调用onPartitionsAssigned()方法(2.5.5版本加入的新功能)。参考Explicit Partition Assignment了解详情。

为方便查找,AbstractConsumerSeekAware中还添加了许多便利方法。

参考Seeking to a Specific Offset来了解详情。

3. Introduction

参考文档的第一部分对Spring for Apache Kafka和底层概念进行高层概述,并提供了一些代码片段来让您快速上手。

3.1. Quick Tour for the Impatient

下面是Spring Kafka的五分钟教程。

前提: 必须先安装、运行Apache Kafka.然后,您必须获得spring-kafka JAR及其所有依赖项。

最简单的方法是在构建工具中声明依赖。

下面是Maven示例:

org.springframework.kafka

spring-kafka

2.6.1

下面是如何在Gradle添加依赖的示例:

compile 'org.springframework.kafka:spring-kafka:2.6.1'

提示

使用Spring Boot时可省略version,因为它能自动匹配正确的spring-kafka版本:

org.springframework.kafka

spring-kafka

下面是Gradle示例:

compile 'org.springframework.kafka:spring-kafka'


3.1.1. Compatibility

该快速教程所需的最小版本如下:

Apache Kafka Clients 2.4.1

Spring Framework 5.3.x

Minimum Java version: 8

3.1.2. A Very, Very Quick Example

如下所示,你可以直接使用Java来收发消息:

@Test public void testAutoCommit() throws Exception {

logger.info("Start auto");

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");

final CountDownLatch latch = new CountDownLatch(4);

containerProps.setMessageListener(new MessageListener() {

@Override public void onMessage(ConsumerRecord message) {

logger.info("received: " + message);

latch.countDown();

}

});

KafkaMessageListenerContainer container = createContainer(containerProps);

container.setBeanName("testAuto");

container.start(); Thread.sleep(1000); // wait a bit for the container to start

KafkaTemplate template = createTemplate();

template.setDefaultTopic("topic1");

template.sendDefault(0, "foo");

template.sendDefault(2, "bar");

template.sendDefault(0, "baz");

template.sendDefault(2, "qux");

template.flush();

assertTrue(latch.await(60, TimeUnit.SECONDS));

container.stop();

logger.info("Stop auto");

}

private KafkaMessageListenerContainer

createContainer( ContainerProperties containerProps) {

Map props = consumerProps();

DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props);

KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps);

return container;

}

private KafkaTemplate createTemplate() {

Map senderProps = senderProps();

ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps);

KafkaTemplate template = new KafkaTemplate<>(pf);

return template;

}

private Map consumerProps() {

Map props =

new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(ConsumerConfig.GROUP_ID_CONFIG, group);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return props;

}

private Map senderProps() {

Map props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(ProducerConfig.RETRIES_CONFIG, 0);

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return props;

}

3.1.3. With Java Configuration

您也可以使用Java配置来完成上述示例的等价功能。如下所示:

@Autowired

private Listener listener;

@Autowired

private KafkaTemplate template;

@Test

public void testSimple() throws Exception {

template.send("annotated1", 0, "foo");

template.flush();

assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));

}

@Configuration

@EnableKafka

public class Config {

@Bean

ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

return factory;

}

@Bean

public ConsumerFactory consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs());

}

@Bean

public Map consumerConfigs() {

Map props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());

... return props;

}

@Bean

public Listener listener() {

return new Listener();

}

@Bean

public ProducerFactory producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());

}

@Bean

public Map producerConfigs() {

Map props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());

... return props;

}

@Bean public KafkaTemplate kafkaTemplate() {

return new KafkaTemplate(producerFactory());

}

}

public class Listener {

private final CountDownLatch latch1 = new CountDownLatch(1);

@KafkaListener(id = "foo", topics = "annotated1")

public void listen1(String foo) {

this.latch1.countDown();

}

}

3.1.4. Even Quicker, with Spring Boot

Spring Boot可以让事情更简单。

下面的Spring Boot应用程序会向topic发送三条消息,然后再接收消息,最后再停止:

@SpringBootApplication

public class Application implements CommandLineRunner {

public static Logger logger = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {

SpringApplication.run(Application.class, args).close();

}

@Autowired

private KafkaTemplate template;

private final CountDownLatch latch = new CountDownLatch(3);

@Override

public void run(String... args) throws Exception {

this.template.send("myTopic", "foo1");

this.template.send("myTopic", "foo2");

this.template.send("myTopic", "foo3");

latch.await(60, TimeUnit.SECONDS);

logger.info("All received");

}

@KafkaListener(topics = "myTopic")

public void

listen(ConsumerRecord, ?> cr) throws Exception {

logger.info(cr.toString());

latch.countDown();

}

}

Boot能完成大多数配置。当使用local broker时,只需配置如下属性:

Example 1. application.properties

spring.kafka.consumer.group-id=foo

spring.kafka.consumer.auto-offset-reset=earliest

之所以需要第1个属性,是因为我们需要根据组来为消费者分配topic分区。

第2个属性可确保新的消费组能获得先前发送的消息,因为容器可能会在发送完成后启动。


分享到:


相關文章: