Kafka入門-Producer API 使用

生產者 API 允許應用程序將數據流發送到 Kafka 集群中的主題。

注:使用的是新版本的 Java 語言編寫的 API,不對 0.8.x 及以前版本進行討論

maven 依賴

<dependency>
<groupid>org.apache.kafka/<groupid>
<artifactid>kafka-clients/<artifactid>
<version>2.3.0/<version>
/<dependency>

簡單示例

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* 2019年10月25日 superz add
*/
public class ProducerDemo
{
public static String brokerServers="localhost:9092";
public static String topic="test";

public static void main(String[] args) {
Properties props=new Properties(); // ①
props.put("bootstrap.servers", brokerServers);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 構建KafkaProducer實例
Producer<string> producer = new KafkaProducer<>(props); // ②

// 構建ProducerRecord對象消息進行發送
ProducerRecord<string> record=new ProducerRecord<>(topic,"Hello World"); // ③
producer.send(record); // ④

// 關閉客戶端
producer.close(); // ⑤
}
}
/<string>/<string>

示例分析

參數配置

在簡單示例中的 ① 可以看到,在構建 KafkaProducer 實例前需要配置相關參數,其中有 3 個參數是必要參數:

  • bootstrap.server:該屬性指定 broker 的地址清單,地址的格式為 host:port。清單裡不需要包含所有的 broker 地址,生產者會從給定的 broker 裡查找到其他 broker 的信息。不過建議至少要提供兩個 broker 的信息,一旦其中一個宕機,生產者仍然能夠連接到集群上。
  • key.serializer:指定的類將鍵序列化。Kafka 客戶端默認提供了 ByteArraySerializer、StringSerializer 和 IntegerSerizlizer;若需要自定義序列化器,則必須實現 org.apache.kafka.common.serialization.Serializer 接口。
  • value.serializer:與key.serializer一樣,指定的類將值序列化。

KafkaProducer 的絕非上面 3 個必要參數,詳細的參數見本文最後的參考,開發人員可以根據實際情況來修改這些參數的默認值來解決開發過程遇到的問題。

在實際使用情況下很難記住所有的參數名稱,只能有個大概的印象,可以直接使用客戶端中的 org.apache.clients.producer.ProducerConfig 類來獲取對應的參數,如下:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerServers); 

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

通過 ProducerConfig 類的使用,代碼簡潔了許多,同時進一步降低了拼寫方面的錯誤,因此推薦使用此種方式來完成參數的配置。

構建生產者實例

簡單示例的 ② 構建了一個 KafkaProducer 實例。KafkaProducer 是線程安全的,可以在多個線程中共享單個 KafkaProducer 實例。

構建消息

簡單示例的 ③ 構建了一個 ProducerRecord 對象,ProducerRecord 有很多構造方法,如下:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)
/<header>/<header>

可根據實際的需求,構建不同的消息,也就是需要構建不同的 ProducerRecord 對象。

發送消息

簡單示例的 ④ 表明在創建生產者實例和構建消息之後就可以開始發送消息了。發送消息主要有三種模式:

發後即忘(fire-and-forget)同步(sync)異步(async)

簡單示例④就是發送即忘的方式,它只管往 Kafka 中發送消息而並不關心消息是否正確到達。這種方式的可靠性最差,因為可能在某些時候出現發送失敗,失敗消息沒有相應的後續處理而造成消息丟失,但是這種發送方式的性能是最高的。

KafkaProducer 的 send() 方法並非是 void 類型,而是 Future 類型,send() 方法有 2 個重載方法,具體定義如下:

Future<recordmetadata> send(ProducerRecord record);
Future<recordmetadata> send(ProducerRecord record, Callback callback);
/<recordmetadata>
/<recordmetadata>

要實現同步的發送方式,可以利用返回的 Future 對象實現,即直接在 send() 方法之後鏈式調用 get() 方法來阻塞等待 Kafka 的響應,直到消息發送成功,或者發生異常,實現同步的示例代碼如下:

producer.send(record).get();

同步發送的方式可靠性高,消息要麼發送成,要麼發送異常。如果發生異常則可以捕獲並進行相應的後續處理,這個就不會造成消息的丟失。不過同步發送模式的性能比較差,需要阻塞等待一條消息發送完成之後才能發送下一條。

要實現異步的發送方式,只需要在 send() 方法裡指定一個 Callback 的回調函數,Kafka 在返回響應時調用該函數來實現異步發送確認。異步發送方式的示例如下:

producer.send(record, new Callback()
{
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(null!=exception)
exception.printStackTrace();
else
System.out.println(metadata.offset());
}
});

onCompletion() 方法的兩個參數是互斥的,消息發送成功時,metadata 不為 null 而 exception 為 null;消息發送異常時,metadata 為 null 而 exception 不為 null。

消息發送的順序性

示例:

producer.send(record1,callback1);
producer.send(record2,callback2);

對於同一個分區而言,如果消息 record1 比 record2 先發送,那麼 KafkaProducer 就可以保證對應的 callback1 比 callback2 先響應,即回調函數的調用也可以保證分區有序。

關閉客戶端實例

在簡單示例 ⑤ 進行了 close() 操作,因為一般一個 KafkaProducer 實例不會只負責發送單條消息,更多的時發送多條消息,在發送完這些消息之後,可以一次性調用 KafkaProducer 的 close() 方法來回收資源。

close() 方法會阻塞等待之前所有的發送請求完成後再關閉 KafkaProducer。

KafkaProducer 還提供了一個帶超時時間的 close() 方法,源碼中定義如下:

public void close(Duration timeout)

KafkaProducer 只會等待 timeout 時間去完成請求處理,然後強制退出。

參考

Producer API 的 javadoc:http://kafka.apachecn.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

生產者參數配置列表見:http://kafka.apachecn.org/documentation.html#producerconfigs

本文由博客一文多發平臺 https://openwrite.cn?from=article_bottom 發佈!


分享到:


相關文章: