本地kafka和zk環境
我們需要在本地啟動一個單機版的kafka和zookeeper環境。kafka的安裝包自帶zookeeper,直接啟動即可,這個詳細過程不是本文的重點,不詳細說了。
我的本地環境配置如下:
- win10系統
- kafka_2.12-1.1.1
- zookeeper-3.4.9
- spring boot 2.1.6.RELEASE
啟動zk,端口是2181
<code>C:\\kafka\\kafka_2.12-1.1.1
λ .\\bin\\windows\\zookeeper-server-start.bat .\\config\\zookeeper.properties
/<code>
啟動kafka,端口是
<code>C:\\kafka\\kafka_2.12-1.1.1
λ .\\bin\\windows\\kafka-server-start.bat .\\config\\server.properties
/<code>
記得查看啟動日誌確認啟動成功才行。
用kafka自帶的工具創建一個topic試試:
<code>C:\\kafka\\kafka_2.12-1.1.1
λ .\\bin\\windows\\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test20190713
Created topic "test20190713".
/<code>
可以看到創建成功了。然後我們查詢下kafka的topic,
<code>C:\\kafka\\kafka_2.12-1.1.1
λ .\\bin\\windows\\kafka-topics.bat --list --zookeeper localhost:2181
test20190713
/<code>
然後我們可以用kafka自帶的生產者和消費者工具進行測試,進一步驗證本地環境。
首先分別啟動生產者和消費者,
<code>C:\\kafka\\kafka_2.12-1.1.1
λ .\\bin\\windows\\kafka-console-producer.bat --broker-list localhost:9092 --topic test20190713
>this is a test
>
/<code>
<code>C:\\kafka\\kafka_2.12-1.1.1
λ .\\bin\\windows\\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test20190713
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
/<code>
在消費者的窗口輸入消息,很快消費者窗口就會顯示出該消息了。或者消費者啟動也可以用下面的方式:
<code>C:\\kafka\\kafka_2.12-1.1.1
λ .\\bin\\windows\\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test20190713
/<code>
原因可以參考:
Kafka中的broker-list,bootstrap-server以及zookeeper
下面兩個如何配置
創建demo項目工程
依賴
<code><dependency>
<groupid>org.springframework.kafka/<groupid>
<artifactid>spring-kafka/<artifactid>
/<dependency>
/<code>
配置
<code>#============== kafka ===================
# 指定kafka 代理地址,可以多個
spring.kafka.bootstrap-servers=127.0.0.1:9092
#=============== 生產者配置=======================
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#===============消費者配置=======================
# 指定默認消費者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
/<code>
先來解釋下這幾個配置,
- bootstrap-servers:連接kafka的地址,多個地址用逗號分隔
- batch-size:當將多個記錄被髮送到同一個分區時, Producer 將嘗試將記錄組合到更少的請求中。這有助於提升客戶端和服務器端的性能。這個配置控制一個批次的默認大小(以字節為單位)。16384是缺省的配置
- retries:若設置大於0的值,客戶端會將發送失敗的記錄重新發送
- buffer-memory:Producer 用來緩衝等待被髮送到服務器的記錄的總字節數,33554432是缺省配置
- key-serializer:關鍵字的序列化類
- value-serializer:值的序列化類
到這裡配置就可以結束了,目前spring-kafka已經和spring boot無縫對接,可以自動加載配置文件進行配置,我們不需要再單獨定義配置類。
測試代碼
我們先定義一個消息實體,方便消費者和生產者共享。
<code>@Data
public class Message {
private Long id; //id
private String msg; //消息
private Date sendTime; //時間戳
}
/<code>
然後是生產者,
<code>@Component
@Slf4j
public class KafkaSender {
@Autowired
private KafkaTemplate<string> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
//發送消息方法
public void send() {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(UUID.randomUUID().toString());
message.setSendTime(new Date());
log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
kafkaTemplate.send("malu", gson.toJson(message));
}
}
/<string>/<code>
代碼很簡單,不做過多解釋。
然後是消費者,
<code>@Component
@Slf4j
public class KafkaReceiver {
@KafkaListener(topics = {"malu"})
public void listen(ConsumerRecord, ?> record) {
Optional> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
}
/<code>
只需要在監聽的方法上通過註解配置一個監聽器即可,另外就是指定需要監聽的topic。
kafka的消息再接收端會被封裝成ConsumerRecord對象返回,它內部的value屬性就是實際的消息。
測試
啟動springboot項目,通過日誌可以看出消息的收發都是正常的。
<code>2019-07-29 15:02:28.812 INFO 13468 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2019-07-29 15:02:28.812 INFO 13468 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2019-07-29 15:02:28.819 INFO 13468 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: eGkIiJuNTHGwNkZy31j2NQ
2019-07-29 15:02:31.831 INFO 13468 --- [ main] c.p.github.kafka.producer.KafkaSender : +++++++++++++++++++++ message = {"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"}
2019-07-29 15:02:31.839 INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver : ----------------- record =ConsumerRecord(topic = malu, partition = 0, offset = 1, CreateTime = 1564383751831, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"})
2019-07-29 15:02:31.839 INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver : ------------------ message ={"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"}
2019-07-29 15:02:34.833 INFO 13468 --- [ main] c.p.github.kafka.producer.KafkaSender : +++++++++++++++++++++ message = {"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"}
2019-07-29 15:02:34.848 INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver : ----------------- record =ConsumerRecord(topic = malu, partition = 0, offset = 2, CreateTime = 1564383754834, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"})
2019-07-29 15:02:34.849 INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver : ------------------ message ={"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"}
/<code>
我們在代碼裡創建了一個名為 “malu” 的topic,可以通過命令查詢下:
<code>C:\\kafka\\kafka_2.12-1.1.1
λ .\\bin\\windows\\kafka-topics.bat --list --zookeeper localhost:2181
__consumer_offsets
malu
test20190713
/<code>
其它說明
如果啟動的時候報錯,需要考慮springboot和spring-kafka兼容性問題。比如一開始我啟動的時候就報錯:
<code>Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration'
/<code>
後來把spring-kafka的版本升級下就好了。具體的版本對應關係可以看下官方的說明:
https://spring.io/projects/spring-kafka
參考:
http://kafka.apachecn.org/documentation.html
http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/
閱讀更多 科技伍小黑 的文章