12.25 kafka0.9.0 新特性(對比0.8.2)

kafka0.9.0 新特性(對比0.8.2)

image.png

1、引入新的Consumer API

0.9.0相比0.8.2,引入了一個新的Consumer API,這個API不再使用high level和low level的基於zookeeper的client;不過仍然支持0.8.0的client。

新的API通過如下方式引入依賴:

 <dependency>
<groupid>org.apache.kafka/<groupid>
<artifactid>kafka-clients/<artifactid>
<version>0.9.0.0/<version>
/<dependency>

1.1 consumer的offset

kafka0.8.0 的 consumer客戶端需要不斷與kafka集群的zookeeper交互,以獲取最新的offset。而新的consumer的offset則是交給kafka來管理,kafka通過創建專用的topic進行管理不同partition的offset。kafka自己維護了partition的offset,以供同一個partition的不同consumer使用。(圖中last commit offset 就是已經確認消費的offset)

kafka0.9.0 新特性(對比0.8.2)

假設現在某一個consumer消費到current position時,未來得及確認已消費就掛掉了,那麼下次其他consumer來拉數據時,就從last commit offset開始,重複消費1~6.Consumer的commit。

如果配置了enable.auto.commit為true和auto.commit.interval.ms=xxx,那麼就按照這個頻率進行commit;

為false時,就需要手動進行commit,可以使用同步方式commitSync,也可以使用 commitAsync 進行異步commit,對於異步確認的話,會返回一個hook,可以利用這個hook進行一定的業務邏輯處理。

consumer通過subscribe方法來訂閱它感興趣的topic,每次訂閱之後kafka又有新的consumer加進來的話,那麼就要對該topic的position進行重新分配(consumer和partition的比例最好是一個1:1)。

一般而言這個過程是consumer不感興趣的,因此無需知道;但是如果consumer願意感知這個事情,那麼就可以使用 ConsumerRebalanceListener這個類來進行監聽。

另外Consumer可以訂閱特殊的partition,實現指定消費partition的功能。適用於一些特殊的場景,比如:消費者所要消費的partition與消費者具有某種聯繫;或者消費者本身具有高可用性,如果消費者掛掉了,沒有必要讓kafka來重新分配partition。使用TopicPartition來表示某一個topic的指定partition。

1.2 在kafka外部存儲offset

允許在kafka外部存儲offset,也就是consumer和kafka同時維護一個offset,消費者程序不一定要使用kafka內置的offset存儲,而是可以自主選擇offset的存儲方式。如果能夠實現offset和result的原子性保存,將會實現exactly once的事務性保證,要比kafka的offset提交機制所提供的at-least once更加強壯。比如使用外部數據庫的事務來保存數據處理結果和offset的一致性,要麼共同成功並存儲,要麼失敗回滾。

使用方法:首先將auto.commit提交設置為false,然後使用 ConsumerRecord 來存儲offset,需要定位時,使用seek即可。

1.3 支持多線程

通過引入wakeupException實現,原理類似於多線程中的InterruptException(通過WakeupException就可以對Consumer進行優雅的控制。而且多個線程公用一個Consumer,Consumer本身非線程安全,因此如果不加外部控制,會導致跑出ConcurrentModificationException。多線程很可能導致非順序消費數據的問題,但是將消費和業務處理分離,耦合性降低

2、引入了安全管理機制:

a.客戶端(producer和consumer)連接broker時,可以使用SSL或者SASL進行驗證。

b.驗證從broker到zookeeper的連接

c.使用SSL對broker和client之間,broker之間以及使用SSL的工具進行數據編碼(這有可能導致性能惡化,取決於CPU和JVM實現)

d.驗證客戶端的讀寫操作

e.驗證是一個可插拔式的服務,並且支持統一整個驗證服務。

SSL或者SASL都是可選擇項,如果需要使用,那麼就需要進行額外的配置。

3、引入了Kafka Connect:

kafka connect是一個支持Scala的可靠工具。使用它來定義一個數據導入與導出的connector很容易。具有時延低,API操作簡單的特徵,支持分佈式或單機模式。


分享到:


相關文章: