Spring Boot2.0 整合 Kafka

Kafka 概述

Apache Kafka 是一個分佈式流處理平臺,用於構建實時的數據管道和流式的應用.它可以讓你發佈和訂閱流式的記錄,可以儲存流式的記錄,並且有較好的容錯性,可以在流式記錄產生時就進行處理。

Apache Kafka是分佈式發佈-訂閱消息系統,在 kafka官網上對 Kafka 的定義:一個分佈式發佈-訂閱消息傳遞系統。

Kafka 特性

  1. 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作;
  2. 可擴展性:kafka集群支持熱擴展;
  3. 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失;
  4. 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗);
  5. 高併發:支持數千個客戶端同時讀寫;
  6. 支持實時在線處理和離線處理:可以使用Storm這種實時流處理系統對消息進行實時進行處理,同時還可以使用Hadoop這種批處理系統進行離線處理;

Kafka 使用場景

  1. 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如Hadoop、Hbase、Solr等;
  2. 消息系統:解耦和生產者和消費者、緩存消息等;
  3. 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到Hadoop、數據倉庫中做離線分析和挖掘;
  4. 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分佈式應用的數據,生產各種操作的集中反饋,比如報警和報告;
  5. 流式處理:比如spark streaming和storm;
  6. 事件源;

Spring Boot2.0 + Kafka

1,安裝配置Kafka ,Zookeeper

安裝和配置過程很簡單,就不詳細說了,參考官網:http://kafka.apache.org/quickstart

使用命令啟動Kafka: bin``/kafka-server-start``.sh config``/server``.properties

下面給出我的環境:

Centos 7.5, Kafka 2.11, Zookeeper-3.4.13, JDK1.8+

2,創建 Spring Boot 項目

注意版本:該項目使用Spring Boot 2.0 +,低版本可能不對

  1. pom.xml引用

org.springframework.boot

spring-boot-starter

org.springframework.kafka

spring-kafka

com.alibaba

fastjson

1.2.47

  1. 定義消息生產者
  2. 直接使用 KafkaTemplate 發送消息 ,Spring Boot自動裝配,不需要自己定義一個Kafka配置類,吐槽一下網站的文章,全都是互相抄,全都寫一個 ProduceConfig Consumerconfig 類, Kafka 的參數配置 硬編碼在代碼中,簡直無法直視。。
  3. 定義一個泛型類KafkaSender T 就是你需要發送的消息 對象,序列化使用阿里的 fastjson

消息發送後,可以在回調類裡面處理自己的業務,ListenableFutureCallback 類有兩個方法,分別是 onFailureon 和 onSuccess ,實際場景可以在這兩個方法,處理自己的具體業務,這裡不做實現。

/**

* 消息生產者

*

* @author Jarvis

* @date 2018/8/3

*/

@Component

public class KafkaSender {

private Logger logger = LoggerFactory.getLogger(KafkaSender.class);

@Autowired

private KafkaTemplate kafkaTemplate;

/**

* kafka 發送消息

*

* @param obj 消息對象

*/

public void send(T obj) {

String jsonObj = JSON.toJSONString(obj);

logger.info("------------ message = {}", jsonObj);

//發送消息

ListenableFuture> future = kafkaTemplate.send("kafka.tut", jsonObj);

future.addCallback(new ListenableFutureCallback>() {

@Override

public void onFailure(Throwable throwable) {

logger.info("Produce: The message failed to be sent:" + throwable.getMessage());

}

@Override

public void onSuccess(SendResult stringObjectSendResult) {

//TODO 業務處理

logger.info("Produce: The message was sent successfully:");

logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());

}

});

}

}

  1. 定義消息消費者
  2. 使用@KafkaListener 註解監聽 topics 消息,此處的topics 必須和 send 函數中的 一致

@Header(KafkaHeaders.RECEIVED_TOPI 直接獲取 topic

/**

* 監聽kafka.tut 的 topic

*

* @param record

* @param topic topic

*/

@KafkaListener(id = "tut", topics = "kafka.tut")

public void listen(ConsumerRecord, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

//判斷是否NULL

Optional> kafkaMessage = Optional.ofNullable(record.value());

if (kafkaMessage.isPresent()) {

//獲取消息

Object message = kafkaMessage.get();

logger.info("Receive: +++++++++++++++ Topic:" + topic);

logger.info("Receive: +++++++++++++++ Record:" + record);

logger.info("Receive: +++++++++++++++ Message:" + message);

}

}

  1. 配置文件 application.yml

spring:

application:

name: kafka-tutorial

kafka:

# 指定kafka 代理地址,可以多個

bootstrap-servers: 192.168.10.100:9092

producer:

retries: 0

# 每次批量發送消息的數量

batch-size: 16384

# 緩存容量

buffer-memory: 33554432

# 指定消息key和消息體的編解碼方式

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

consumer:

# 指定默認消費者group id

group-id: consumer-tutorial

auto-commit-interval: 100

auto-offset-reset: earliest

enable-auto-commit: true

# 指定消息key和消息體的編解碼方式

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

# 指定listener 容器中的線程數,用於提高併發量

listener:

concurrency: 3

  1. 直接使用 @Autowired 對類 KafkaSender 自動裝配,然後調用 send 方法發送消息即可,下面給出代碼:

@Autowired

private KafkaSender kafkaSender;

@Test

public void kafkaSend() throws InterruptedException {

//模擬發消息

for (int i = 0; i < 5; i++) {

User user = new User();

user.setId(System.currentTimeMillis());

user.setMsg(UUID.randomUUID().toString());

user.setSendTime(new Date());

kafkaSender.send(message);

Thread.sleep(3000);

}

}

控制檯可以看到執行成功:

Spring Boot2.0 整合 Kafka

在服務器執行 bin/kafka-topics.sh --list --zookeeper localhost:2181 可以看到topic

Spring Boot2.0 整合 Kafka

Kafka如何保證數據的不丟失

1.生產者數據的不丟失

  • 新版本的producer採用異步發送機制。KafkaProducer.send(ProducerRecord)方法僅僅是把這條消息放入一個緩存中(即RecordAccumulator,本質上使用了隊列來緩存記錄),同時後臺的IO線程會不斷掃描該緩存區,將滿足條件的消息封裝到某個batch中然後發送出去。顯然,這個過程中就有一個數據丟失的窗口:若IO線程發送之前client端掛掉了,累積在accumulator中的數據的確有可能會丟失。 kafka的ack機制:在kafka發送數據的時候,每次發送消息都會有一個確認反饋機制,確保消息正常的能夠被收到。
  • 如果是同步模式:ack機制能夠保證數據的不丟失,如果ack設置為0,風險很大,一般不建議設置為0
  • producer.type=sync
  • request.required.acks=1
  • 如果是異步模式:通過buffer來進行控制數據的發送,有兩個值來進行控制,時間閾值與消息的數量閾值,如果buffer滿了數據還沒有發送出去,如果設置的是立即清理模式,風險很大,一定要設置為阻塞模式
  • producer.type=async
  • request.required.acks=1
  • queue.buffering.max.ms=5000
  • queue.buffering.max.messages=10000
  • queue.enqueue.timeout.ms = -1
  • batch.num.messages=200
  • 結論:producer有丟數據的可能,但是可以通過配置保證消息的不丟失
  • 2.消費者數據的不丟失
  • 如果在消息處理完成前就提交了offset,那麼就有可能造成數據的丟失。由於Kafka consumer默認是自動提交位移的,所以在後臺提交位移前一定要保證消息被正常處理了,因此不建議採用很重的處理邏輯,如果處理耗時很長,則建議把邏輯放到另一個線程中去做。為了避免數據丟失,現給出兩點建議:
  • enable.auto.commit=false 關閉自動提交位移
  • 在消息被完整處理之後再手動提交位移
  • 如果使用了storm,要開啟storm的ackfail機制;
  • 如果沒有使用storm,確認數據被完成處理之後,再更新offset值。低級API中需要手動控制offset值。通過offset commit 來保證數據的不丟失,kafka自己記錄了每次消費的offset數值,下次繼續消費的時候,接著上次的offset進行消費即可。


分享到:


相關文章: