1. Preface
在基于Kafka的消息传递方案中,Spring for Apache Kafka project应用了Spring的经典核心概念。
我们以“template”来作为发送消息的高级抽象,而且我们还提供了Message-driven POJO支持。
2. What’s new?
2.1. What’s New in 2.6 Since 2.5
本节涵盖了2.5到2.6所做的所有变更。有关先前版本的变更,请参考Change History.
2.1.1. Kafka Client Version
本版需要2.6.0 kafka-clients。
2.1.2. Listener Container Changes
默认的EOSMode当前是BETA。参考Exactly Once Semantics来了解详细信息。
当前若恢复失败,则错误处理器(error handler,继承至FailedRecordProcessor)和DefaultAfterRollbackProcessor默认会重置BackOff。
当然,你也可以根据失败记录和异常来自由定制BackOff。
参考Seek To Current Container Error Handlers, Recovering Batch Error Handler, Publishing Dead-letter Records和After-rollback Processor来了解详情。
除此之外,还可以在容器属性中配置adviceChain。参考Listener Container Properties来了解详情。
2.1.3. @KafkaLisener Changes
现在当使用手动分区分配时,您可以指定一个通配符来决定哪些分区需要重置为初始偏移。
此外,如果listener实现了ConsumerSeekAware,那么手动分区后还会调用onPartitionsAssigned()方法(2.5.5版本加入的新功能)。参考Explicit Partition Assignment了解详情。
为方便查找,AbstractConsumerSeekAware中还添加了许多便利方法。
参考Seeking to a Specific Offset来了解详情。
3. Introduction
参考文档的第一部分对Spring for Apache Kafka和底层概念进行高层概述,并提供了一些代码片段来让您快速上手。
3.1. Quick Tour for the Impatient
下面是Spring Kafka的五分钟教程。
前提: 必须先安装、运行Apache Kafka.然后,您必须获得spring-kafka JAR及其所有依赖项。
最简单的方法是在构建工具中声明依赖。
下面是Maven示例:
org.springframework.kafka
spring-kafka
2.6.1
下面是如何在Gradle添加依赖的示例:
compile 'org.springframework.kafka:spring-kafka:2.6.1'
提示
使用Spring Boot时可省略version,因为它能自动匹配正确的spring-kafka版本:
org.springframework.kafka
spring-kafka
下面是Gradle示例:
compile 'org.springframework.kafka:spring-kafka'
3.1.1. Compatibility
该快速教程所需的最小版本如下:
Apache Kafka Clients 2.4.1
Spring Framework 5.3.x
Minimum Java version: 8
3.1.2. A Very, Very Quick Example
如下所示,你可以直接使用Java来收发消息:
@Test public void testAutoCommit() throws Exception {
logger.info("Start auto");
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener(new MessageListener() {
@Override public void onMessage(ConsumerRecord message) {
logger.info("received: " + message);
latch.countDown();
}
});
KafkaMessageListenerContainer container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start(); Thread.sleep(1000); // wait a bit for the container to start
KafkaTemplate template = createTemplate();
template.setDefaultTopic("topic1");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertTrue(latch.await(60, TimeUnit.SECONDS));
container.stop();
logger.info("Stop auto");
}
private KafkaMessageListenerContainer
createContainer( ContainerProperties containerProps) {Map props = consumerProps();
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props);
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
}
private KafkaTemplate createTemplate() {
Map senderProps = senderProps();
ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps);
KafkaTemplate template = new KafkaTemplate<>(pf);
return template;
}
private Map consumerProps() {
Map props =
new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
private Map senderProps() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
3.1.3. With Java Configuration
您也可以使用Java配置来完成上述示例的等价功能。如下所示:
@Autowired
private Listener listener;
@Autowired
private KafkaTemplate template;
@Test
public void testSimple() throws Exception {
template.send("annotated1", 0, "foo");
template.flush();
assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
... return props;
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map producerConfigs() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
... return props;
}
@Bean public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}
public class Listener {
private final CountDownLatch latch1 = new CountDownLatch(1);
@KafkaListener(id = "foo", topics = "annotated1")
public void listen1(String foo) {
this.latch1.countDown();
}
}
3.1.4. Even Quicker, with Spring Boot
Spring Boot可以让事情更简单。
下面的Spring Boot应用程序会向topic发送三条消息,然后再接收消息,最后再停止:
@SpringBootApplication
public class Application implements CommandLineRunner {
public static Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Autowired
private KafkaTemplate template;
private final CountDownLatch latch = new CountDownLatch(3);
@Override
public void run(String... args) throws Exception {
this.template.send("myTopic", "foo1");
this.template.send("myTopic", "foo2");
this.template.send("myTopic", "foo3");
latch.await(60, TimeUnit.SECONDS);
logger.info("All received");
}
@KafkaListener(topics = "myTopic")
public void
listen(ConsumerRecord, ?> cr) throws Exception {logger.info(cr.toString());
latch.countDown();
}
}
Boot能完成大多数配置。当使用local broker时,只需配置如下属性:
Example 1. application.properties
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest
之所以需要第1个属性,是因为我们需要根据组来为消费者分配topic分区。
第2个属性可确保新的消费组能获得先前发送的消息,因为容器可能会在发送完成后启动。