01.29 Spring Boot Kafka Integration Example

Local Kafka and ZooKeeper Environment

We need a single-node Kafka and ZooKeeper environment running locally. The Kafka distribution ships with ZooKeeper, so we can start it directly; the detailed installation steps are not the focus of this article.

My local environment is as follows:

  • Windows 10
  • kafka_2.12-1.1.1
  • zookeeper-3.4.9
  • Spring Boot 2.1.6.RELEASE

Start ZooKeeper on port 2181:

<code>C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
</code>

Start Kafka on port 9092 (the default, as used by the console producer below):

<code>C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-server-start.bat .\config\server.properties
</code>

Check the startup logs to confirm both processes started successfully.

Create a topic with Kafka's built-in tooling to try things out:

<code>C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test20190713
Created topic "test20190713".
</code>

The topic was created successfully. Now list Kafka's topics:

<code>C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
test20190713
</code>

Next we can use Kafka's built-in console producer and consumer to further verify the local environment.

Start the producer and the consumer in separate windows:

<code>C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test20190713
>this is a test
>
</code>
<code>C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test20190713
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
</code>

Type a message in the producer window, and it shows up in the consumer window almost immediately. Alternatively, the consumer can be started like this:

<code>C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test20190713
</code>

For the reason behind the deprecation warning, see the article:

Kafka中的broker-list,bootstrap-server以及zookeeper (covering what these settings mean and how to configure them)

Create the Demo Project

Dependency

<code><dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
</code>

Configuration

<code>#============== kafka ===================
# Kafka broker address; multiple addresses are comma-separated

spring.kafka.bootstrap-servers=127.0.0.1:9092

#=============== producer configuration =======================

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer configuration =======================
# default consumer group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
</code>

Let's go over these properties first:

  • bootstrap-servers: the Kafka broker address(es) to connect to; separate multiple addresses with commas
  • batch-size: when multiple records are sent to the same partition, the producer tries to batch them into fewer requests, which improves performance on both the client and the server. This setting controls the default batch size in bytes; 16384 is the default
  • retries: if set to a value greater than 0, the client resends any record whose send failed
  • buffer-memory: the total bytes of memory the producer can use to buffer records waiting to be sent to the server; 33554432 is the default
  • key-serializer: the serializer class for keys
  • value-serializer: the serializer class for values

That is all the configuration needed. spring-kafka now integrates seamlessly with Spring Boot and auto-configures itself from the properties file, so we don't have to define a configuration class ourselves.
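
For reference, the sketch below shows roughly the producer setup that the auto-configuration assembles from those properties, in case you ever need to customize it by hand. This is my own illustration (the class name KafkaProducerConfig is arbitrary), not code the demo requires:

<code>import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Not needed for this demo -- a hand-written equivalent of what Spring Boot
// builds automatically from the spring.kafka.producer.* properties.
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
</code>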

Test Code

First we define a message entity shared by the producer and the consumer.

<code>@Data
public class Message {
    private Long id;        // id
    private String msg;     // message body
    private Date sendTime;  // timestamp
}
</code>

Then the producer:

<code>import java.util.Date;
import java.util.UUID;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    // build a message and send it to the "malu" topic as JSON
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
        kafkaTemplate.send("malu", gson.toJson(message));
    }
}
</code>

The code is straightforward and needs little explanation.
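
The demo still needs something to call send(). The article doesn't show that driver, so here is a minimal sketch under one plausible assumption: a CommandLineRunner on the main thread that sends a message every 3 seconds, which matches the timing and the [main] thread tag in the logs further below (the class name KafkaDemoApplication and the loop count are my own):

<code>import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

    // Hypothetical driver: send a few messages, pausing between sends,
    // so we can watch the consumer pick them up in the log output.
    @Bean
    public CommandLineRunner sendLoop(KafkaSender sender) {
        return args -> {
            for (int i = 0; i < 3; i++) {
                sender.send();
                Thread.sleep(3000);
            }
        };
    }
}
</code>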

Then the consumer:

<code>import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaReceiver {

    @KafkaListener(topics = {"malu"})
    public void listen(ConsumerRecord<?, ?> record) {
        // record.value() holds the actual message payload
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }
    }
}
</code>

We only need to annotate a method with @KafkaListener and specify the topic(s) to listen to.

On the receiving side, each Kafka message is wrapped in a ConsumerRecord object; its value property holds the actual message payload.
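
Since the payload is the JSON string we produced, a listener can also deserialize it straight back into the Message entity with Gson. A minimal sketch of that variant, which is my own addition rather than part of the demo (the separate groupId keeps it from competing with KafkaReceiver for messages):

<code>import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical variant: receive the raw JSON value and map it
// back onto the Message entity instead of logging the ConsumerRecord.
@Component
@Slf4j
public class MessageJsonReceiver {

    private final Gson gson = new Gson();

    @KafkaListener(topics = {"malu"}, groupId = "json-consumer-group")
    public void listen(String value) {
        Message message = gson.fromJson(value, Message.class);
        log.info("deserialized message: id={}, msg={}", message.getId(), message.getMsg());
    }
}
</code>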

Testing

Start the Spring Boot application; the logs show that messages are sent and received correctly.

<code>2019-07-29 15:02:28.812  INFO 13468 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-07-29 15:02:28.812  INFO 13468 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-07-29 15:02:28.819  INFO 13468 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: eGkIiJuNTHGwNkZy31j2NQ
2019-07-29 15:02:31.831  INFO 13468 --- [           main] c.p.github.kafka.producer.KafkaSender    : +++++++++++++++++++++ message = {"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"}
2019-07-29 15:02:31.839  INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver  : ----------------- record =ConsumerRecord(topic = malu, partition = 0, offset = 1, CreateTime = 1564383751831, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"})
2019-07-29 15:02:31.839  INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver  : ------------------ message ={"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"}
2019-07-29 15:02:34.833  INFO 13468 --- [           main] c.p.github.kafka.producer.KafkaSender    : +++++++++++++++++++++ message = {"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"}
2019-07-29 15:02:34.848  INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver  : ----------------- record =ConsumerRecord(topic = malu, partition = 0, offset = 2, CreateTime = 1564383754834, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"})
2019-07-29 15:02:34.849  INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver  : ------------------ message ={"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"}
</code>

Our code produces to a topic named "malu"; we can verify that it was created with the list command:

<code>C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
__consumer_offsets
malu
test20190713
</code>

Other Notes

If the application fails to start, consider version compatibility between Spring Boot and spring-kafka. For example, the first time I started it I got this error:

<code>Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration'
</code>

Upgrading the spring-kafka version fixed it. The official compatibility matrix is documented here:

https://spring.io/projects/spring-kafka

References:

http://kafka.apachecn.org/documentation.html

http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/


分享到:


相關文章: