Kafka 概述
Apache Kafka 是一個分佈式流處理平臺,用於構建實時的數據管道和流式的應用.它可以讓你發佈和訂閱流式的記錄,可以儲存流式的記錄,並且有較好的容錯性,可以在流式記錄產生時就進行處理。
Apache Kafka是分佈式發佈-訂閱消息系統,在 kafka官網上對 Kafka 的定義:一個分佈式發佈-訂閱消息傳遞系統。
Kafka 特性
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作;
- 可擴展性:kafka集群支持熱擴展;
- 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失;
- 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗);
- 高併發:支持數千個客戶端同時讀寫;
- 支持實時在線處理和離線處理:可以使用Storm這種實時流處理系統對消息進行實時進行處理,同時還可以使用Hadoop這種批處理系統進行離線處理;
Kafka 使用場景
- 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如Hadoop、Hbase、Solr等;
- 消息系統:解耦和生產者和消費者、緩存消息等;
- 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到Hadoop、數據倉庫中做離線分析和挖掘;
- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分佈式應用的數據,生產各種操作的集中反饋,比如報警和報告;
- 流式處理:比如spark streaming和storm;
- 事件源;
Spring Boot2.0 + Kafka
1,安裝配置Kafka ,Zookeeper
安裝和配置過程很簡單,就不詳細說了,參考官網:http://kafka.apache.org/quickstart
使用命令啟動Kafka: bin``/kafka-server-start``.sh config``/server``.properties
下面給出我的環境:
Centos 7.5, Kafka 2.11, Zookeeper-3.4.13, JDK1.8+
2,創建 Spring Boot 項目
注意版本:該項目使用Spring Boot 2.0 +,低版本可能不對
- pom.xml引用
- 定義消息生產者
- 直接使用 KafkaTemplate 發送消息 ,Spring Boot自動裝配,不需要自己定義一個Kafka配置類,吐槽一下網站的文章,全都是互相抄,全都寫一個 ProduceConfig Consumerconfig 類, Kafka 的參數配置 硬編碼在代碼中,簡直無法直視。。
- 定義一個泛型類KafkaSender
T 就是你需要發送的消息 對象,序列化使用阿里的 fastjson
消息發送後,可以在回調類裡面處理自己的業務,ListenableFutureCallback 類有兩個方法,分別是 onFailureon 和 onSuccess ,實際場景可以在這兩個方法,處理自己的具體業務,這裡不做實現。
/**
* 消息生產者
*
* @author Jarvis
* @date 2018/8/3
*/
@Component
public class KafkaSender
private Logger logger = LoggerFactory.getLogger(KafkaSender.class);
@Autowired
private KafkaTemplate
/**
* kafka 發送消息
*
* @param obj 消息對象
*/
public void send(T obj) {
String jsonObj = JSON.toJSONString(obj);
logger.info("------------ message = {}", jsonObj);
//發送消息
ListenableFuture
future.addCallback(new ListenableFutureCallback
@Override
public void onFailure(Throwable throwable) {
logger.info("Produce: The message failed to be sent:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult
//TODO 業務處理
logger.info("Produce: The message was sent successfully:");
logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
}
});
}
}
- 定義消息消費者
- 使用@KafkaListener 註解監聽 topics 消息,此處的topics 必須和 send 函數中的 一致
@Header(KafkaHeaders.RECEIVED_TOPI 直接獲取 topic
/**
* 監聽kafka.tut 的 topic
*
* @param record
* @param topic topic
*/
@KafkaListener(id = "tut", topics = "kafka.tut")
public void listen(ConsumerRecord, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
//判斷是否NULL
Optional> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
//獲取消息
Object message = kafkaMessage.get();
logger.info("Receive: +++++++++++++++ Topic:" + topic);
logger.info("Receive: +++++++++++++++ Record:" + record);
logger.info("Receive: +++++++++++++++ Message:" + message);
}
}
- 配置文件 application.yml
spring:
application:
name: kafka-tutorial
kafka:
# 指定kafka 代理地址,可以多個
bootstrap-servers: 192.168.10.100:9092
producer:
retries: 0
# 每次批量發送消息的數量
batch-size: 16384
# 緩存容量
buffer-memory: 33554432
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 指定默認消費者group id
group-id: consumer-tutorial
auto-commit-interval: 100
auto-offset-reset: earliest
enable-auto-commit: true
# 指定消息key和消息體的編解碼方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 指定listener 容器中的線程數,用於提高併發量
listener:
concurrency: 3
- 直接使用 @Autowired 對類 KafkaSender 自動裝配,然後調用 send 方法發送消息即可,下面給出代碼:
@Autowired
private KafkaSender
@Test
public void kafkaSend() throws InterruptedException {
//模擬發消息
for (int i = 0; i < 5; i++) {
User user = new User();
user.setId(System.currentTimeMillis());
user.setMsg(UUID.randomUUID().toString());
user.setSendTime(new Date());
kafkaSender.send(message);
Thread.sleep(3000);
}
}
控制檯可以看到執行成功:
在服務器執行 bin/kafka-topics.sh --list --zookeeper localhost:2181 可以看到topic
Kafka如何保證數據的不丟失
1.生產者數據的不丟失
- 新版本的producer採用異步發送機制。KafkaProducer.send(ProducerRecord)方法僅僅是把這條消息放入一個緩存中(即RecordAccumulator,本質上使用了隊列來緩存記錄),同時後臺的IO線程會不斷掃描該緩存區,將滿足條件的消息封裝到某個batch中然後發送出去。顯然,這個過程中就有一個數據丟失的窗口:若IO線程發送之前client端掛掉了,累積在accumulator中的數據的確有可能會丟失。 kafka的ack機制:在kafka發送數據的時候,每次發送消息都會有一個確認反饋機制,確保消息正常的能夠被收到。
- 如果是同步模式:ack機制能夠保證數據的不丟失,如果ack設置為0,風險很大,一般不建議設置為0
- producer.type=sync
- request.required.acks=1
- 如果是異步模式:通過buffer來進行控制數據的發送,有兩個值來進行控制,時間閾值與消息的數量閾值,如果buffer滿了數據還沒有發送出去,如果設置的是立即清理模式,風險很大,一定要設置為阻塞模式
- producer.type=async
- request.required.acks=1
- queue.buffering.max.ms=5000
- queue.buffering.max.messages=10000
- queue.enqueue.timeout.ms = -1
- batch.num.messages=200
- 結論:producer有丟數據的可能,但是可以通過配置保證消息的不丟失
- 2.消費者數據的不丟失
- 如果在消息處理完成前就提交了offset,那麼就有可能造成數據的丟失。由於Kafka consumer默認是自動提交位移的,所以在後臺提交位移前一定要保證消息被正常處理了,因此不建議採用很重的處理邏輯,如果處理耗時很長,則建議把邏輯放到另一個線程中去做。為了避免數據丟失,現給出兩點建議:
- enable.auto.commit=false 關閉自動提交位移
- 在消息被完整處理之後再手動提交位移
- 如果使用了storm,要開啟storm的ackfail機制;
- 如果沒有使用storm,確認數據被完成處理之後,再更新offset值。低級API中需要手動控制offset值。通過offset commit 來保證數據的不丟失,kafka自己記錄了每次消費的offset數值,下次繼續消費的時候,接著上次的offset進行消費即可。
閱讀更多 JAVA高級開發 的文章