Kafka入门-Producer API 使用

生产者 API 允许应用程序将数据流发送到 Kafka 集群中的主题。

注:使用的是新版本的 Java 语言编写的 API,不对 0.8.x 及以前版本进行讨论

maven 依赖

<dependency>
<groupid>org.apache.kafka/<groupid>
<artifactid>kafka-clients/<artifactid>
<version>2.3.0/<version>
/<dependency>

简单示例

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* 2019年10月25日 superz add
*/
public class ProducerDemo
{
public static String brokerServers="localhost:9092";
public static String topic="test";

public static void main(String[] args) {
Properties props=new Properties(); // ①
props.put("bootstrap.servers", brokerServers);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 构建KafkaProducer实例
Producer<string> producer = new KafkaProducer<>(props); // ②

// 构建ProducerRecord对象消息进行发送
ProducerRecord<string> record=new ProducerRecord<>(topic,"Hello World"); // ③
producer.send(record); // ④

// 关闭客户端
producer.close(); // ⑤
}
}
/<string>/<string>

示例分析

参数配置

在简单示例中的 ① 可以看到,在构建 KafkaProducer 实例前需要配置相关参数,其中有 3 个参数是必要参数:

bootstrap.server:该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。key.serializer:指定的类将键序列化。Kafka 客户端默认提供了 ByteArraySerializer、StringSerializer 和 IntegerSerizlizer;若需要自定义序列化器,则必须实现 org.apache.kafka.common.serialization.Serializer 接口。value.serializer:与key.serializer一样,指定的类将值序列化。

KafkaProducer 的绝非上面 3 个必要参数,详细的参数见本文最后的参考,开发人员可以根据实际情况来修改这些参数的默认值来解决开发过程遇到的问题。

在实际使用情况下很难记住所有的参数名称,只能有个大概的印象,可以直接使用客户端中的 org.apache.clients.producer.ProducerConfig 类来获取对应的参数,如下:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

通过 ProducerConfig 类的使用,代码简洁了许多,同时进一步降低了拼写方面的错误,因此推荐使用此种方式来完成参数的配置。

构建生产者实例

简单示例的 ② 构建了一个 KafkaProducer 实例。KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例。

构建消息

简单示例的 ③ 构建了一个 ProducerRecord 对象,ProducerRecord 有很多构造方法,如下:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)
/<header>/<header>

可根据实际的需求,构建不同的消息,也就是需要构建不同的 ProducerRecord 对象。

发送消息

简单示例的 ④ 表明在创建生产者实例和构建消息之后就可以开始发送消息了。发送消息主要有三种模式:发后即忘(fire-and-forget)同步(sync)异步(async)

简单示例④就是发送即忘的方式,它只管往 Kafka 中发送消息而并不关心消息是否正确到达。这种方式的可靠性最差,因为可能在某些时候出现发送失败,失败消息没有相应的后续处理而造成消息丢失,但是这种发送方式的性能是最高的。

KafkaProducer 的 send() 方法并非是 void 类型,而是 Future 类型,send() 方法有 2 个重载方法,具体定义如下:

Future<recordmetadata> send(ProducerRecord record);
Future<recordmetadata> send(ProducerRecord record, Callback callback);
/<recordmetadata>/<recordmetadata>

要实现同步的发送方式,可以利用返回的 Future 对象实现,即直接在 send() 方法之后链式调用 get() 方法来阻塞等待 Kafka 的响应,直到消息发送成功,或者发生异常,实现同步的示例代码如下:

producer.send(record).get();

同步发送的方式可靠性高,消息要么发送成,要么发送异常。如果发生异常则可以捕获并进行相应的后续处理,这个就不会造成消息的丢失。不过同步发送模式的性能比较差,需要阻塞等待一条消息发送完成之后才能发送下一条。

要实现异步的发送方式,只需要在 send() 方法里指定一个 Callback 的回调函数,Kafka 在返回响应时调用该函数来实现异步发送确认。异步发送方式的示例如下:

producer.send(record, new Callback()
{
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(null!=exception)
exception.printStackTrace();
else
System.out.println(metadata.offset());
}
});


onCompletion() 方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而 exception 为 null;消息发送异常时,metadata 为 null 而 exception 不为 null。

消息发送的顺序性

示例:

producer.send(record1,callback1);
producer.send(record2,callback2);

对于同一个分区而言,如果消息 record1 比 record2 先发送,那么 KafkaProducer 就可以保证对应的 callback1 比 callback2 先响应,即回调函数的调用也可以保证分区有序。

关闭客户端实例

在简单示例 ⑤ 进行了 close() 操作,因为一般一个 KafkaProducer 实例不会只负责发送单条消息,更多的时发送多条消息,在发送完这些消息之后,可以一次性调用 KafkaProducer 的 close() 方法来回收资源。

close() 方法会阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer。

KafkaProducer 还提供了一个带超时时间的 close() 方法,源码中定义如下:

public void close(Duration timeout)

KafkaProducer 只会等待 timeout 时间去完成请求处理,然后强制退出。

参考

Producer API 的 javadoc:http://kafka.apachecn.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

生产者参数配置列表见:http://kafka.apachecn.org/documentation.html#producerconfigs

本文由博客一文多发平台 https://openwrite.cn?from=article_bottom 发布!